Hi raphael,
I tried your code and it works (of course). I have been working on extending it to my case.
(Meanwhile, I read the manual and the source code of Lwt.) Here are some of my thoughts and experiments.
This demo code has a workload function of type unit -> 'a, and since 'a is _ Lwt.t, the workload becomes unit -> _ Lwt.t. The scheduling function run picks a task and runs it, and when it's done, it handles its result and picks another task to run.
My case may be similar to the one you mention, where the run function never returns. When a task runs, the scheduler run does not wait for its result, because the first task actually is the whole computation.
My demo case is to sum over a binary tree, where
let limit = 10
let rec sum_node n : unit -> 'a Lwt.t =
  fun () ->
    if n > limit then
      Lwt.return 0
    else
      (* spawn two child tasks *)
      let s1 = sum_node (2*n)... in
      let s2 = sum_node (2*n+1)... in
      (* add s1,s2 to the queue *)
      ...
      heavy_computation ()
      ...
      Lwt.return (n + result_of_s1 + result_of_s2)
Therefore, I modified your code as follows:
let rec run q =
  match pull q with
  | None -> Lwt.return ()
  | Some t ->
    t ();
    run q
Now the question becomes: how should I get the result of the computation at a node, given that s1 and s2 are functions?
# aside: what if the workload function is 'a Lwt.t instead of unit -> 'a Lwt.t?
let rec sum_node n : 'a Lwt.t =
  ...
  let s1 = sum_node (2*n)... in
  let s2 = sum_node (2*n+1)... in
  heavy_computation ();
  (* s1 and s2 are already promises here, so they can be joined directly *)
  Lwt.both s1 s2 >>= fun (t1, t2) ->
  Lwt.return (n+t1+t2)
The dependency is correct but the scheduling collapses. I totally bypass the queue, and the only freedom left to adjust the computation priority is the order of s1 and s2. Therefore, I need to wrap the workload into unit -> 'a Lwt.t.
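To make the aside concrete, here is a minimal runnable sketch of the direct-promise version. It assumes the lwt and lwt.unix libraries are available; heavy_computation is a hypothetical stand-in that does nothing. Because each call to sum_node starts working as soon as it is called, there is no point at which a queue could reorder anything.

```ocaml
(* Sketch only: requires the lwt and lwt.unix libraries. *)
open Lwt.Infix

let limit = 10

(* Hypothetical placeholder for the real workload. *)
let heavy_computation () = ()

(* Direct version: children start running as soon as sum_node is called,
   so the only ordering control is which child is called first. *)
let rec sum_node n : int Lwt.t =
  if n > limit then Lwt.return 0
  else begin
    let s1 = sum_node (2 * n) in
    let s2 = sum_node (2 * n + 1) in
    heavy_computation ();
    Lwt.both s1 s2 >>= fun (t1, t2) ->
    Lwt.return (n + t1 + t2)
  end

(* The tree rooted at 1 contains exactly the nodes 1..10. *)
let total = Lwt_main.run (sum_node 1)
```

With limit = 10 the tree rooted at 1 visits the nodes 1 to 10, so total should be 55.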
# back to the main
The problem is:
- the parent node needs to hold the promise of the child's result, to allow more control and flexibility
- the child node needs to notify the parent node of the result
- use unit -> 'a Lwt.t to postpone the heavy computation in the queue
My rescue is Lwt.task, and the code is:
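let fn n = ... (* priority assignment function *)
let rec sum_node parent_resolver n : unit -> 'a Lwt.t =
  fun () ->
    if n > limit then begin
      (* leaf: report 0 to the parent, as in the first sketch *)
      Lwt.wakeup parent_resolver 0;
      Lwt.return_unit
    end else begin
      (* one promise/resolver pair per child *)
      let p1, r1 = Lwt.task () in
      let p2, r2 = Lwt.task () in
      let s1 = sum_node r1 (2*n) in
      let s2 = sum_node r2 (2*n+1) in
      push queue (fn (2*n)) s1;
      push queue (fn (2*n+1)) s2;
      (* wait for both children, then notify the parent *)
      Lwt.both p1 p2 >>= fun (t1, t2) ->
      heavy_computation ();
      Lwt.wakeup parent_resolver (n+t1+t2);
      Lwt.return_unit
    end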
The return value (Lwt.return_unit) is of less interest here.
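For anyone who wants to run the idea end to end, here is a self-contained sketch under a few assumptions: the queue is a hypothetical sorted list (not the one from the earlier code in this thread), fn simply uses the node index as the priority, heavy_computation does nothing, and leaves report 0 to their parent. The scheduler starts each task with Lwt.async and does not wait for it.

```ocaml
(* Sketch only: requires the lwt and lwt.unix libraries. *)
open Lwt.Infix

let limit = 10

(* Hypothetical priority queue: a list kept sorted by ascending priority. *)
type 'a queue = (int * 'a) list ref

let push (q : 'a queue) prio task =
  let rec insert = function
    | [] -> [ (prio, task) ]
    | ((p, _) as hd) :: tl when p <= prio -> hd :: insert tl
    | rest -> (prio, task) :: rest
  in
  q := insert !q

let pull (q : 'a queue) =
  match !q with
  | [] -> None
  | (_, task) :: tl ->
    q := tl;
    Some task

let queue : (unit -> unit Lwt.t) queue = ref []

(* Hypothetical priority assignment: smaller node index runs first. *)
let fn n = n

(* Hypothetical placeholder for the real workload. *)
let heavy_computation () = ()

let rec sum_node parent_resolver n : unit -> unit Lwt.t =
 fun () ->
  if n > limit then begin
    (* Leaf: nothing to sum, report 0 to the parent. *)
    Lwt.wakeup parent_resolver 0;
    Lwt.return_unit
  end
  else begin
    let p1, r1 = Lwt.task () in
    let p2, r2 = Lwt.task () in
    push queue (fn (2 * n)) (sum_node r1 (2 * n));
    push queue (fn (2 * n + 1)) (sum_node r2 (2 * n + 1));
    (* Runs only after both children have resolved their promises. *)
    Lwt.both p1 p2 >>= fun (t1, t2) ->
    heavy_computation ();
    Lwt.wakeup parent_resolver (n + t1 + t2);
    Lwt.return_unit
  end

(* Start each task without waiting for it: a task only completes once
   its children, which are still in the queue, have been run. *)
let rec run q =
  match pull q with
  | None -> Lwt.return_unit
  | Some t ->
    Lwt.async t;
    run q

let total =
  let root_promise, root_resolver = Lwt.task () in
  push queue (fn 1) (sum_node root_resolver 1);
  Lwt_main.run (run queue >>= fun () -> root_promise)
```

Changing fn (for example to fun n -> -n) changes the order in which nodes are visited without touching sum_node, which is the flexibility the queue is meant to provide.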
I hope I haven't made it over-complex. When I posted my original question, I had little experience with either Lwt or Async. The example-based experiments helped me understand how they work.
My next experiment will be how to deal with a sub-computation that never returns, and with a sub-computation that returns multiple answers.
edit: done with the stream demo. Though a resolver and a stream look different, the code is quite similar for a push-stream. By replacing Lwt.task () with Lwt_stream.create (), I get a reusable ref instead of a write-once one. Now I can turn to my own project.
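A rough sketch of what the stream variant might look like, not the actual demo code: each child gets the push function of its own stream, reports several answers, and closes the stream by pushing None. The child and parent functions and the values pushed are made up for illustration.

```ocaml
(* Sketch only: requires the lwt and lwt.unix libraries. *)
open Lwt.Infix

(* Hypothetical child task: reports two answers, then closes its stream. *)
let child push_to_parent n : unit -> unit Lwt.t =
 fun () ->
  push_to_parent (Some n);
  push_to_parent (Some (n * 10));
  push_to_parent None;
  Lwt.return_unit

(* The parent holds one stream per child, where the Lwt.task version
   held one promise per child. *)
let parent () : int Lwt.t =
  let stream1, push1 = Lwt_stream.create () in
  let stream2, push2 = Lwt_stream.create () in
  Lwt.async (child push1 1);
  Lwt.async (child push2 2);
  (* to_list resolves once the corresponding stream has been closed. *)
  Lwt.both (Lwt_stream.to_list stream1) (Lwt_stream.to_list stream2)
  >>= fun (l1, l2) ->
  Lwt.return (List.fold_left ( + ) 0 (l1 @ l2))

let total = Lwt_main.run (parent ())
```

The push function can be called any number of times before the stream is closed, whereas a resolver from Lwt.task can only be woken up once.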