How can I set the priorities of tasks in async or lwt

Hi all,

I am porting my handwritten coroutine code to Async or Lwt. I have read their manuals, the RWO chapter and the Lwt tutorial, but I can't figure out how to assign a priority to a Deferred.t or Lwt.t so that the scheduler picks the task with the highest priority.

I found the Normal and Low priorities for the Async job queue, but that is too coarse-grained for me.
I found that Lwt has an Lwt_pqueue, but it is marked deprecated and it also looks like a stdlib-style data structure.

In general, the code performs a search, and the search space is usually unbounded (limited only by a timeout), so I need to assign each task a priority derived from the search policy. In my naive implementation, I have a priority queue of tasks. I want to know how to achieve this using Async or Lwt.


There’s no support within Lwt for priority queues of tasks. One of the things to note is that “tasks” is not a concept that’s actually implemented by Lwt. (There is no data-structure that retains tasks somewhere in the code — that’s kind of only partially true, but it’s true enough.) Lwt deals with promises.

That being said, you can easily implement a task system with priorities on top of Lwt. Here are some pointers:

Say your priority queue has the following interface:

type priority
type 'a priority_queue
val push : 'a priority_queue -> priority -> (unit -> 'a) -> unit
val pull : 'a priority_queue -> (unit -> 'a) option
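
As a rough sketch, one possible implementation of that interface uses only the standard library. Everything here is an assumption made for illustration: priorities are plain ints where a larger value is more urgent, the queue is a mutable leftist heap, and `create` is a hypothetical constructor that the interface above does not list.

```ocaml
(* Assumption: priorities are ints, and a larger int is more urgent. *)
type priority = int

(* Leftist heap node: rank, priority, task, left child, right child. *)
type 'a heap =
  | Empty
  | Node of int * priority * (unit -> 'a) * 'a heap * 'a heap

type 'a priority_queue = { mutable heap : 'a heap }

(* Hypothetical constructor, not part of the interface above. *)
let create () = { heap = Empty }

let rank = function
  | Empty -> 0
  | Node (r, _, _, _, _) -> r

(* Merge two heaps, keeping the most urgent task at the root. *)
let rec merge a b =
  match a, b with
  | Empty, h | h, Empty -> h
  | Node (_, pa, ta, la, ra), Node (_, pb, _, _, _) ->
    if pa >= pb then begin
      let merged = merge ra b in
      (* Keep the child with the larger rank on the left. *)
      if rank la >= rank merged then Node (rank merged + 1, pa, ta, la, merged)
      else Node (rank la + 1, pa, ta, merged, la)
    end
    else merge b a

let push q prio task =
  q.heap <- merge q.heap (Node (1, prio, task, Empty, Empty))

(* Remove and return the most urgent task, if any. *)
let pull q =
  match q.heap with
  | Empty -> None
  | Node (_, _, task, l, r) ->
    q.heap <- merge l r;
    Some task
```

Both `push` and `pull` are logarithmic in the number of queued tasks. Tasks with equal priority come out in no particular order, so add a sequence number to the priority if you need FIFO behaviour among ties.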

Then you can use your priority queue with 'a being a _ Lwt.t:

type 'a priority_queue_lwt = 'a Lwt.t priority_queue

You can use push normally, and then you can have a simple run function that takes one task after another:

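open Lwt.Infix  (* provides >>= *)

(* Run tasks one at a time, in the order pull returns them, until the queue is empty. *)
let rec run handle_result q =
    match pull q with
    | None -> Lwt.return ()
    | Some t ->
        t () >>= fun r ->
        handle_result r >>= fun () ->
        run handle_result q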
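
To make this concrete, here is a small end-to-end sketch of how the run function could be driven. It assumes the `lwt` and `lwt.unix` libraries are installed. The list-backed queue, the `create` function, the int priorities and the `seen` reference are stand-ins invented for this example, not anything Lwt provides.

```ocaml
open Lwt.Infix

(* Stand-in queue (assumption): a list kept sorted by descending priority. *)
type priority = int
type 'a priority_queue = { mutable items : (priority * (unit -> 'a)) list }

let create () = { items = [] }

let push q prio task =
  (* Insert before the first strictly less urgent entry (FIFO among ties). *)
  let rec insert = function
    | [] -> [ (prio, task) ]
    | (p, _) :: _ as rest when prio > p -> (prio, task) :: rest
    | x :: rest -> x :: insert rest
  in
  q.items <- insert q.items

let pull q =
  match q.items with
  | [] -> None
  | (_, task) :: rest ->
    q.items <- rest;
    Some task

(* The sequential run loop: one task at a time until the queue is empty. *)
let rec run handle_result q =
  match pull q with
  | None -> Lwt.return_unit
  | Some t ->
    t () >>= fun r ->
    handle_result r >>= fun () ->
    run handle_result q

(* Records results, most recent first. *)
let seen = ref []

let () =
  let q = create () in
  push q 1 (fun () -> Lwt.return "low");
  push q 9 (fun () -> Lwt_unix.sleep 0.01 >>= fun () -> Lwt.return "high");
  push q 5 (fun () -> Lwt.return "mid");
  Lwt_main.run (run (fun r -> seen := r :: !seen; Lwt.return_unit) q)
```

Note that the tasks are thunks returning promises, so nothing starts until the loop pulls it. The "high" task sleeps, yet "mid" and "low" still wait for it, because this version runs strictly one task at a time.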

That’s a good start. There are incremental changes you can make:

  • support concurrent execution of tasks: add an int argument to run and use it to instantiate an Lwt_pool or some other resource limiter; alternatively, change the priority queue to return multiple values at once and process them with Lwt_list.iter_p
  • add an Lwt_condition so that the run function never returns and instead waits for more tasks to become available (you also need to wrap the push function so that it signals this condition).
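
The Lwt_condition suggestion could look roughly like the sketch below, again assuming `lwt` and `lwt.unix` are available. The record layout, `create`, `run_forever`, the int priorities and the `seen` reference are hypothetical names for illustration. The queue is the same sorted-list stand-in, extended with a condition that push signals.

```ocaml
open Lwt.Infix

type priority = int

(* Assumption: a sorted-list queue plus a condition signalled on every push. *)
type 'a priority_queue = {
  mutable items : (priority * (unit -> 'a)) list;
  nonempty : unit Lwt_condition.t;
}

let create () = { items = []; nonempty = Lwt_condition.create () }

let push q prio task =
  let rec insert = function
    | [] -> [ (prio, task) ]
    | (p, _) :: _ as rest when prio > p -> (prio, task) :: rest
    | x :: rest -> x :: insert rest
  in
  q.items <- insert q.items;
  (* Wake the run loop if it is waiting; a no-op when nobody waits. *)
  Lwt_condition.signal q.nonempty ()

let pull q =
  match q.items with
  | [] -> None
  | (_, task) :: rest ->
    q.items <- rest;
    Some task

(* Never returns: when the queue is empty, wait for the next push. *)
let rec run_forever handle_result q =
  match pull q with
  | None ->
    Lwt_condition.wait q.nonempty >>= fun () ->
    run_forever handle_result q
  | Some t ->
    t () >>= fun r ->
    handle_result r >>= fun () ->
    run_forever handle_result q

(* Records results, most recent first. *)
let seen = ref []

let () =
  let q = create () in
  push q 1 (fun () -> Lwt.return "low");
  push q 9 (fun () -> Lwt.return "high");
  let worker = run_forever (fun r -> seen := r :: !seen; Lwt.return_unit) q in
  (* A producer that adds one more task after the queue has drained. *)
  let producer =
    Lwt_unix.sleep 0.01 >>= fun () ->
    push q 5 (fun () -> Lwt.return "late");
    Lwt_unix.sleep 0.01
  in
  (* The worker never finishes, so stop the demo when the producer does. *)
  Lwt_main.run (Lwt.pick [ worker; producer ])
```

A signal sent while the loop is busy is simply dropped, which is harmless here: the loop always calls pull again before it waits, and there is no yield point between the two, so a pushed task cannot be missed.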