Print asynchronously but in the right order

Hi,
I have a program that uses basically

let for_loop_print print f init i =
  let rec aux acc k =
    if k < i then
      f acc k >>= fun out ->
      print out >>= fun () ->
      aux out (succ k)
    else Lwt.return acc in
  aux init 0

and I would love to not wait for print to finish before starting the next iteration but if I replace

print out >>= fun () ->

by

let () = Lwt.ignore_result (print out) in

I lost the property that out_0 is printed before out_1 which is printed before out_2 … Right?

Is there a (canonical) way to get the best of both worlds?
My intuition would be to go in the direction of having this loop pushing on a stack instead of printing and having an other thread popping from the stack to print but that feels not right…

Thank you

@pirbo Your intuition is right. The stack would be an Lwt_stream. Here is an overly verbose example of how to use it for this purpose. I hope the inline comments explain everything, but if not, please do ask :slight_smile:

open Lwt.Infix

let for_loop_print print f init i =
  let values_to_print, push_value = Lwt_stream.create () in
   (* values_to_print : 'a Lwt_stream.t, and
      push_value : 'a option -> unit. *)

  let all_values_printed = Lwt_stream.iter_s print values_to_print in
   (* Each time a value is pushed to the stream, print will be called
      on it. The returned promise, all_values_printed, resolves with ()
      when the stream is finished (you push None into it), and all the
      prints complete. iter_s is for iterating in series, as opposed to
      iter_p, which is allowed to run prints in parallel. *)

  let computation =
    let rec aux acc k =
      if k < i then
        f acc k >>= fun out ->
        push_value (Some out);
        aux out (succ k)
      else Lwt.return acc in
    aux init 0
  in
   (* Your computation promise, of course, with pushes into the stream.
      The computation starts running eagerly: in Lwt, you don't have to
      wait on a promise with Lwt.bind or some other function, for Lwt to
      start trying to resolve it, but we are going to do it anyway: *)

  computation >>= fun result ->
   (* Wait for the computation to complete. *)

  push_value None;
  all_values_printed >>= fun () ->
   (* End the value stream, and wait for printing to complete. Note that
      printing has been happening concurrently with the computation
      already. *)

  Lwt.return result
   (* One could also return just "computation" here, because we know it
      has completed. I'm just trying not to be cryptic :) *)

let () =
  Lwt_main.run
    (for_loop_print
      (fun i -> Lwt_io.print (string_of_int i))
      (fun x i -> Lwt.return x) 0 10
    >|= ignore)

https://ocsigen.org/lwt/api/Lwt_stream

1 Like

One concern with using Lwt_stream is that I’m not sure what its behaviour is if there is an exception raised during the processing of values_to_print. There is lwt#250 with a lot of interesting discussion about how to improve this interface, for the interested reader.

1 Like