In an attempt to understand and use Lwt, I’m trying to create a basic program that runs several sleeps in parallel:
open Lwt.Infix

let wait_time n =
  Lwt_io.printf "%f started\n" n >>= fun () ->
  Lwt_unix.sleep (4.0 -. n) >>= fun () ->
  Lwt_io.printf "%f done\n" n

let () =
  let start_time = Unix.gettimeofday () in
  Printf.printf "Start time: %f\n" start_time;
  Lwt_main.run (Lwt_list.iter_s wait_time [1.0; 2.0; 3.0]);
  let end_time = Unix.gettimeofday () in
  Printf.printf "End time: %f\n" end_time;
  Printf.printf "Took: %f\n" (end_time -. start_time)
Output:
1.000000 started
1.000000 done
2.000000 started
2.000000 done
3.000000 started
3.000000 done
Start time: 1500196376.614725
End time: 1500196382.617343
Took: 6.002618
Issues:
I expected the Lwt_list.iter_s to execute wait_time for each float in the list in parallel. The total time would then be ~3 seconds instead of ~6.
Why is start_time not printed before the wait_time list is iterated? Is the only way to solve this the Lwt_stream example in "Print asynchronously but in the right order?"
Thanks, I should have read the comments in lwt_list.mli:
(** Note: this module use the same naming convention as
{!Lwt_stream}. *)
Then from the comments in lwt_stream.mli:
Naming convention: in this module, all functions applying a function
to each element of a stream are suffixed by:
- [_s] when the function returns a thread and calls are serialised
- [_p] when the function returns a thread and calls are parallelised
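To make the naming convention concrete, here is a minimal sketch that times the same list of sleeps under Lwt_list.iter_s and Lwt_list.iter_p. The helper name timed and the short durations are my own choices for illustration, not part of the original program; it assumes the lwt and lwt.unix libraries are linked.

```ocaml
(* Sketch only: [timed] and [durations] are illustrative names.
   Build with the lwt and lwt.unix libraries. *)

(* Run the Lwt job produced by [job ()] and return the elapsed
   wall-clock time in seconds. *)
let timed job =
  let t0 = Unix.gettimeofday () in
  Lwt_main.run (job ());
  Unix.gettimeofday () -. t0

(* Short sleeps so the comparison finishes quickly. *)
let durations = [0.1; 0.2; 0.3]

let () =
  (* [_s]: each sleep starts only after the previous one has resolved. *)
  let serial = timed (fun () -> Lwt_list.iter_s Lwt_unix.sleep durations) in
  (* [_p]: all sleeps are started up front and run concurrently. *)
  let parallel = timed (fun () -> Lwt_list.iter_p Lwt_unix.sleep durations) in
  Printf.printf "iter_s: %.2fs, iter_p: %.2fs\n%!" serial parallel
```

With iter_s the elapsed time should be roughly the sum of the durations, and with iter_p roughly the longest one, which matches the ~6 seconds versus ~3 seconds discussed above.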
Thanks @Armael, I’ve added %! at the end of all Printf.printf and Lwt_io.printf statements.
It’s pretty close now, but the "done" output from the last Lwt_io.printf thread to complete sometimes appears after the output of a Printf.printf call that was executed later:
Start time: 1500426619.207419
1.000000 started
2.000000 started
3.000000 started
3.000000 done
2.000000 done
End time: 1500426622.208663
Took: 3.001244
1.000000 done
Another execution:
Start time: 1500427025.013387
1.000000 started
2.000000 started
3.000000 started
3.000000 done
2.000000 done
End time: 1500427028.014688
1.000000 done
Took: 3.001301
It looks like a race condition between the flushing of the different internal buffers.
It’s not really important, as the printing is only used for this example, but I’m curious to know whether it can be solved.
@bramford, this is basically a bug. Thanks for finding it.
The reason this happens is that Lwt_io.printf is implemented as sprintf internally (actually, ksprintf). As a result, when you try to flush using %!, it only causes that ksprintf to flush to that internal Lwt_io string. It basically has no effect. We need to document this.
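The effect can be reproduced with the standard library alone. The sketch below is not Lwt's actual code: buffered_printf is a hypothetical stand-in for a printf built on ksprintf that appends to an in-memory buffer, which is the situation described above.

```ocaml
(* With a string as the target, [%!] has nothing to flush, so it
   contributes nothing to the result. *)
let formatted = Printf.sprintf "%f done\n%!" 1.0

(* Hypothetical stand-in (not Lwt's implementation): a printf built on
   ksprintf that appends the formatted string to [buf]. *)
let buffered_printf buf fmt = Printf.ksprintf (Buffer.add_string buf) fmt

let () =
  let buf = Buffer.create 64 in
  buffered_printf buf "%f done\n%!" 1.0;
  (* The text is still sitting in [buf]; [%!] did not push it anywhere. *)
  assert (Buffer.contents buf = formatted)
```

The text only leaves such a buffer when the buffer itself is flushed, which is why an explicit flush on the channel is needed.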
There are two ways to solve this:
Insert explicit Lwt_io.(flush stdout) any time Lwt_io can interact with something else writing to stdout.
Adding the explicit Lwt_io.flush solved the issue:
let wait_time n =
  Lwt_io.printf "%f started\n%!" n >>= fun () ->
  Lwt_unix.sleep (4.0 -. n) >>= fun () ->
  Lwt_io.printf "%f done\n%!" n >>= fun () ->
  Lwt_io.flush Lwt_io.stdout
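For reference, here is a sketch of the whole program with both changes from this thread applied: Lwt_list.iter_p instead of iter_s, and an explicit Lwt_io.flush after the last Lwt_io.printf. Binding the elapsed time to a top-level elapsed value is my own restructuring, and I have dropped %! from the Lwt_io.printf format strings on the assumption that, as explained above, it has no effect there.

```ocaml
(* Build with the lwt and lwt.unix libraries. *)
open Lwt.Infix

let wait_time n =
  Lwt_io.printf "%f started\n" n >>= fun () ->
  Lwt_unix.sleep (4.0 -. n) >>= fun () ->
  Lwt_io.printf "%f done\n" n >>= fun () ->
  (* Push Lwt_io's buffer to stdout before anything else writes there. *)
  Lwt_io.flush Lwt_io.stdout

(* Elapsed wall-clock seconds for the parallel run. *)
let elapsed =
  let start_time = Unix.gettimeofday () in
  (* [%!] does flush here, because Printf.printf writes to a real channel. *)
  Printf.printf "Start time: %f\n%!" start_time;
  Lwt_main.run (Lwt_list.iter_p wait_time [1.0; 2.0; 3.0]);
  let end_time = Unix.gettimeofday () in
  Printf.printf "End time: %f\n%!" end_time;
  end_time -. start_time

let () = Printf.printf "Took: %f\n%!" elapsed
```

The longest sleep is 3 seconds (for n = 1.0), so the run should take about 3 seconds in total.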