How can I set the priorities of tasks in Async or Lwt?

Hi all,

I am porting my handwritten coroutine code to Async or Lwt. I have read their manuals, the RWO chapter, and the Lwt tutorial, but I can't figure out how to assign a priority to a Deferred.t or Lwt.t so that the scheduler picks the task with the highest priority.

I found Normal and Low priorities for the Async job queue, but that granularity is too coarse for me.
I also found that Lwt has an Lwt_pqueue, but it is marked deprecated and looks like a plain data structure rather than a scheduler hook.

In general, the code is doing search, and the search space is usually unbounded (only bounded by a timeout), so I need to assign priorities derived from the search policy. In my naive implementation, I have a priority queue of tasks. I want to know how to achieve this using Async or Lwt.

Hi,

There’s no support within Lwt for priority queues of tasks. One of the things to note is that “tasks” is not a concept that’s actually implemented by Lwt. (There is no data-structure that retains tasks somewhere in the code — that’s kind of only partially true, but it’s true enough.) Lwt deals with promises.

That being said, you can easily implement a task system with priorities on top of Lwt. Here are some pointers:

Say your priority queue has the following interface:

type priority
type 'a priority_queue
val push : 'a priority_queue -> priority -> (unit -> 'a) -> unit
val pull : 'a priority_queue -> (unit -> 'a) option
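
(For concreteness, a minimal and deliberately naive implementation of this interface could look as follows. This is only a sketch: it assumes integer priorities where a larger number means higher priority, adds a create function that the interface above does not mention, and uses a sorted list where a real implementation would use a proper heap.)

(* naive sketch: a mutable list kept sorted by decreasing priority *)
type priority = int

type 'a priority_queue = (priority * (unit -> 'a)) list ref

let create () : 'a priority_queue = ref []

let push (q : 'a priority_queue) (prio : priority) (f : unit -> 'a) : unit =
  (* keep the list sorted so the highest priority comes first *)
  q := List.merge (fun (p1, _) (p2, _) -> compare p2 p1) [ (prio, f) ] !q

let pull (q : 'a priority_queue) : (unit -> 'a) option =
  match !q with
  | [] -> None
  | (_, f) :: rest ->
      q := rest;
      Some f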

Then you can use your priority queue with 'a being a _ Lwt.t:

type 'a priority_queue_lwt = 'a Lwt.t priority_queue

You can use push normally, and then you can have a simple run function that takes each task one after the other:

let rec run handle_result q =
    match pull q with
    | None -> Lwt.return ()
    | Some t ->
        t () >>= fun r ->
        handle_result r >>= fun () ->
        run handle_result q
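
For example, with the naive queue sketched earlier, this could be driven like so (illustrative only; run and handle_result are as above):

let () =
  let q : int Lwt.t priority_queue = create () in
  push q 1 (fun () -> Lwt.return 1);
  push q 5 (fun () -> Lwt.return 5);  (* higher priority, runs first *)
  let handle_result r = Lwt_io.printf "got %d\n" r in
  Lwt_main.run (run handle_result q)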

That’s a good start. There are incremental changes you can make:

  • support concurrent execution of tasks (you can do that by adding an int argument to run and using it to instantiate an Lwt_pool or some other such resource limiter); alternatively, you can change the priority queue to return multiple values that you can Lwt_list.iter_p over
  • add an Lwt_condition so that the run function never returns; instead, it waits for more tasks to become available (you then need to wrap the push function so that it also signals the condition); a rough sketch follows below
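
As a rough sketch of that second point (untested, names illustrative): wrap push so that it signals an Lwt_condition, and make run wait on that condition instead of returning when the queue is empty.

open Lwt.Infix

(* the condition is signalled every time a new task is pushed *)
let more_tasks : unit Lwt_condition.t = Lwt_condition.create ()

let push_and_signal q prio f =
  push q prio f;
  Lwt_condition.signal more_tasks ()

let rec run handle_result q =
  match pull q with
  | None ->
      (* nothing to do: wait until push_and_signal tells us there is *)
      Lwt_condition.wait more_tasks >>= fun () -> run handle_result q
  | Some t ->
      t () >>= fun r ->
      handle_result r >>= fun () ->
      run handle_result q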

Hi raphael,

I tried your code and it works (of course). I have been working to extend it towards my case
(meanwhile, I read the manual and the source code of Lwt). Here are some of my thoughts and experiments.

This demo code has a workload function of type unit -> 'a, and since 'a is a _ Lwt.t, the workload becomes unit -> _ Lwt.t. The scheduling run function picks a task and runs it; when it's done, it handles its result and picks another task to run.

My case may be similar to what you describe as "the run function never returns". When a task runs, the scheduler run will not wait for its result, because the first task actually is the whole computation.

My demo case is to sum over a binary tree, where

let limit = 10
let rec sum_node n : unit -> 'a Lwt.t =
  fun () ->
  if n > limit then
    Lwt.return 0
  else
    (* spawn two child tasks *)
    let s1 = sum_node (2*n)... in
    let s2 = sum_node (2*n+1)... in
    (* add s1,s2 to the queue *)
    ...
    heavy_computation ()
    ...
    Lwt.return (n + result_of_s1 + result_of_s2)

Therefore, I modified your code as:

let rec run q =
    match pull q with
    | None -> Lwt.return ()
    | Some t ->
        (* start the task, but do not wait for its result *)
        let (_ : _ Lwt.t) = t () in
        run q

Now the question becomes: how should I get the result of the computation at this node, given that s1 and s2 are functions?

# aside: what if the workload function is 'a Lwt.t instead of unit -> 'a Lwt.t?

let rec sum_node n : 'a Lwt.t =
  ...
  let s1 = sum_node (2*n)... in
  let s2 = sum_node (2*n+1)... in
  heavy_computation ();
  Lwt.both s1 s2 >>= fun (t1, t2) ->
  Lwt.return (n+t1+t2)

The dependencies are correct, but the scheduling collapses: I completely bypass the queue, and the only freedom left to adjust the computation priority is the order of s1 and s2. Therefore, I need to wrap the workload into unit -> 'a Lwt.t.

# back to the main question

The problems are

  1. the parent node needs to hold the promises of the child results, to allow more control and flexibility
  2. the child node needs to notify the parent node of its result
  3. use unit -> 'a Lwt.t to postpone the heavy computation in the queue

My rescue is Lwt.task, and the code is:

let fn n = (* priority assignment function *)
let rec sum_node parent_resolver n : unit -> 'a Lwt.t =
  fun () ->
  let p1, r1 = Lwt.task () in
  let p2, r2 = Lwt.task () in
  
  let s1 = sum_node r1 (2*n) in
  let s2 = sum_node r2 (2*n+1) in

  push queue (fn (2*n))   s1;
  push queue (fn (2*n+1)) s2;

  Lwt.both p1 p2 >>= fun (t1, t2) ->
  heavy_computation ();
  Lwt.wakeup parent_resolver (n+t1+t2);
  Lwt.return_unit

The return value (Lwt.return_unit) is of less interest here.
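
For completeness, here is roughly how the whole thing could be kicked off (a sketch only: queue, push, run, fn and sum_node are the hypothetical pieces from the snippets above):

open Lwt.Infix

let main () =
  (* create the root promise/resolver, push the root task, run the
     scheduler, then read the final sum off the root promise *)
  let root_promise, root_resolver = Lwt.task () in
  push queue (fn 1) (sum_node root_resolver 1);
  run queue >>= fun () ->
  root_promise >>= fun total ->
  Lwt_io.printf "sum = %d\n" total

let () = Lwt_main.run (main ())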

I hope I am not making it over-complex. When I posted my original question, I had little experience with either Lwt or Async. These example-based experiments helped me understand how it works.

My next experiment will be how to deal with a sub-computation that never returns, and with a sub-computation that returns multiple answers.

edit: done with the stream demo. Though a resolver and a stream look different, the code is quite similar for the push-stream. By replacing Lwt.task () with Lwt_stream.create (), I get a reusable channel instead of a write-once one. Now I can turn to my own project.
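
A minimal sketch of the stream variant (illustrative only, not the exact demo code): Lwt_stream.create returns a stream together with a push function, so a child can emit several answers over time and the parent can fold over whatever arrives.

open Lwt.Infix

let demo () =
  let answers, send = Lwt_stream.create () in
  (* child side: emit several results, then close the stream with None *)
  send (Some 1);
  send (Some 2);
  send None;
  (* parent side: consume all the answers *)
  Lwt_stream.fold ( + ) answers 0 >>= fun total ->
  Lwt_io.printf "total = %d\n" total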


Your answer is of great value to me. I understood it only vaguely when I first read it, but after spending some time with Lwt, I can understand most of it.


It’s pretty good. I think you can simplify a bit the way you bubble the results back up the call chain. One thing you can do is use the priority queue but wrap the calls to return promises instead.

(** [push_and_wait] is like [push] but it returns a promise that is resolved
    when [f] does. Note that the priority queue must carry [_ Lwt.t] tasks. *)
let push_and_wait prio_q prio f =
  (* create synchronisation promise/resolver *)
  let p, r = Lwt.task () in
  (* wrap `f` so that it sends its result to the synchronisation resolver *)
  let f =
    f () >>= fun x ->
    Lwt.wakeup r x;
    Lwt.return_unit
  in
  (* push the wrapped `f` *)
  push prio_q prio f;
  (* return the synchronisation promise *)
  p

With this, your code doesn’t pass around its own resolvers and such:

let fn n = (* priority assignment function *)
let rec sum_node n : unit -> 'a Lwt.t = fun () ->
  let p_2n = push_and_wait q (fn (2 * n)) (sum_node (2*n)) in
  let p_2nplus1 = push_and_wait q (fn (2 * n + 1)) (sum_node (2*n+1)) in
  Lwt.both p_2n p_2nplus1 >>= fun (t1, t2) ->
  heavy_computation ();
  Lwt.return (n+t1+t2)

In general, in Lwt, you should try to push the task/waiter/wakener (resolver) close to the generic parts of the code that provide abstractions, and away from the entry point of your programs. Generally speaking, task should be used for building new abstractions. (Exceptions apply; some uses of task near the entry point of your programs are fine.)


Thanks for your reply!

I now understand and appreciate your guiding idea that Lwt operations should stay close to the generic abstractions and away from the entry point.

One point I am not clear on is why the priority queue must carry _ Lwt.t tasks rather than unit -> _ Lwt.t. I tested your code and found that work_f (the f argument) has already been called in order to build the resolver binding, which means the computation for the node is already partly done. If I use unit -> _ Lwt.t instead:

let push_and_wait prio_q prio work_f =
  let p, r = Lwt.task () in
  (* wrap `work_f` so that it sends its result to the synchronisation resolver *)
  let f () =
    work_f () >>= fun x ->
    Lwt.wakeup r x;
    Lwt.return_unit
  in
  push prio_q prio f;
  p

work_f won’t be called until the task is pulled from the priority queue.

let fn n = (* priority assignment function *)
let rec sum_node n : unit -> 'a Lwt.t = fun () ->
  lwt_log_enter n;

  some_heavy_computation_1 ();

  let p_2nplus1 = push_and_wait q (fn (2 * n + 1)) (sum_node (2*n+1)) in
  let p_2nplus2 = push_and_wait q (fn (2 * n + 2)) (sum_node (2*n+2)) in
  Lwt.both p_2nplus1 p_2nplus2 >>= fun (t1, t2) ->

  some_heavy_computation_2 ();

  Lwt.return (n+t1+t2)

P.S. I found one mistake in my previous reply that might mislead, so I now show two heavy computations. There is some initialisation work when entering sum_node, and the intention of using a priority queue is to reduce unnecessary computation.

You are right, the queue should hold unit -> _ Lwt.t values. Note, however, what I mentioned in the early messages of this thread:

It was intended that an 'a priority_queue is a priority queue that carries elements of type unit -> 'a. I.e., internally, the data structure stores unit -> 'a Lwt.t values, but the interface exposes this as an 'a queue.


In the latest code samples I wrote, note that the resolver is not created when f is called; it is created when push_and_wait is called. I did make a mistake in the definition of push_and_wait, though: I forgot to add a unit parameter to the definition of the wrapped function. What I should have written is:

let push_and_wait prio_q prio f =
  let p, r = Lwt.task () in
  let f () = (* THIS UNIT PARAMETER IS IMPORTANT!! *)
    f () >>= fun x ->
    Lwt.wakeup r x;
    Lwt.return_unit
  in
  push prio_q prio f;
  p

Note that the incorrect code should not even type-check: it calls push with a value of type 'a Lwt.t whilst the function expects a unit -> 'a Lwt.t (see the comment on the interface at the beginning of this post).


I may have made other mistakes. I haven't tested the code, and it's intended more as "giving an idea of what a solution might look like" than as a ready-to-use library. Let me know if you find other problems in the code I posted.


I may have missed something, but I wonder about the usefulness of this, and indeed about the relevance of the original question. If you have CPU-bound tasks, here apparently search tasks, why bother with Lwt at all? You can have a priority queue of functions which are pulled off the queue and executed sequentially in a loop. Lwt is really intended for programs which are predominantly I/O-bound. In such programs, priorities are rarely relevant: an action will be carried out when something else on which it is waiting has become ready, such as a file descriptor becoming ready for reading or writing, or a timeout expiring.
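
For example, a minimal sketch of that plain loop, reusing the push/pull interface from earlier in the thread (assuming tasks are plain unit -> unit thunks, with no Lwt involved):

let rec run_plain (q : unit priority_queue) : unit =
  match pull q with
  | None -> ()
  | Some f ->
      f ();
      run_plain q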

When ocaml-multicore arrives, Lwt_preemptive.detach might become particularly useful for organizing cpu-bound parallel computation, and a version of that function which can take a priority argument might be of real value: but in such a case it is the underlying thread pool which offers priorities, not Lwt’s promises.


You have got this precisely right, cvine.

I have CPU-bound search tasks; the model is like searching for exits in a maze. At each node, I may spawn search functions (threads) for this. Since most searches can be unbounded for my problem, I hope to have a priority queue in which I can specify the search policy.

I didn't mention here that we do already have a priority queue of functions executed sequentially. The legacy code is based on handwritten free monads for nondeterminism, and we also have pub/sub functions in it. (It's an academic project, though, and people take turns maintaining it; please believe I do need to update it, or at least replace some parts on my own.)

Therefore, I hope there is an existing public library, or one I can adapt, giving me a framework in which I can make coroutines, yield them, compose them and their callbacks, broadcast signals, and pick the high-priority ones to run for a bit.

As for Lwt (or Async): Lwt has a core library and a Unix binding library, and the core library has no I/O at all. The APIs look appealing, if only I can confirm that priority-based scheduling is achievable; that is the main purpose of this post. I might go a bit further in this thread, and it's possible that in the end I will come back to writing my own scheduling.

I also agree with your point that priority might be of real value with multicore. And I admit that the critical reason for priorities here is not efficiency, but avoiding getting trapped in some searches.

If you want asymmetrical coroutines constructed using delimited continuations, I have toyed with delimcc (on opam) and it worked fine for that. I have never benchmarked delimcc for performance and only experimented with it in order to compare it with delimited continuations in scheme, so I cannot confirm that it is suitable for what you want. But it does offer you asymmetrical coroutines.

OCaml's multicore project also used to support effects, from which you could construct coroutines, but I think they have taken a back seat in multicore development at present. It has been 3 or 4 years since I played with effects and with delimcc with coroutines in mind.

Do you mean the delimcc on opam, which links to a webpage with a newer version on it?

I know delimcc a bit and have read Oleg's pages and papers a little. My recollection is that the library needs to hack the compiler in order to be used. If that is not the case, I will give it a try.

As I said, I am going back a few years when it did happen to work. I have just tried out some of my old coroutine code using delimcc-2018.03.16 as supplied by opam and ocaml-4.11.1 and it ran OK with bytecode (as compiled by ocamlc) but not with native code (as compiled by ocamlopt) where there was a stack overrun. I don’t know whether the latest version works with ocaml-4.11.1 native. I agree it does look like something of a hack.

Edit: I have tried pinning opam’s delimcc to the latest version on Oleg’s webpage (caml-shift.tar.gz) and it works OK with native code with ocaml-4.11.1. So you could give it a go and see if it meets your needs.


In the case where the program is indeed CPU bound, Lwt may not be the most appropriate tool for the job. It’s possibly a tad overkill. In that case, a simple priority queue may be sufficient in itself.

(There are specific cases where Lwt may still make sense:

  • if the heavy computations are actually I/O-bound tasks that need to run exclusively from one another (e.g., hitting a remote API that does rate-limiting, hitting a remote component that is very stateful, or some other reason);

  • if the computation is one part of a whole pipeline, the rest of which is I/O-bound and thus benefits from Lwt.)