I am trying to understand Lwt_process by reading some of the examples included with the Lwt source. Specifically, I am surprised that an example works when it seemingly should not (see below).
Here is an example from <ocsigen-lwt-git>/test/unix/test_lwt_process.ml (showing only the interesting parts of the file):
(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)
open Test
open Lwt.Infix
...
let pwrite ~stdin pout =
let args = [|"dummy.exe"; "read"|] in
let proc = Lwt_process.exec ~stdin ("./dummy.exe", args) in
let write = Lwt.finalize
(fun () -> Lwt_unix.write pout expected 0 expected_len)
(fun () -> Lwt_unix.close pout) in
proc >>= fun r ->
write >>= fun n -> (* ### QUESTION: Why is write running after the proc would have exited *)
assert (n = expected_len);
check_status r
...
let suite = suite "lwt_process" [
...
test "can write to subproc stdin"
(fun () ->
let pin, pout = Lwt_unix.pipe_out ~cloexec:true () in
pwrite ~stdin:(`FD_move pin) pout);
...
]
I am trying to understand how the test "can write to subproc stdin" can possibly run successfully.
Note that proc is a Unix.process_status Lwt.t, which means that after the Lwt.bind (i.e. >>=) the process should have exited.
So why is write (to the stdin of the process) running after the process has already exited with exit code r?
The only way for this to run successfully would be that the let proc = ... causes an exec to run immediately. This goes against my understanding because let proc = is merely a description of the IO. The calls are only executed where the Lwt.bind are located.
For easy reference, here is <ocsigen-lwt-git>/test/unix/dummy.ml which corresponds to dummy.exe. Though this is not critical to understanding my question.
(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)
let test_input_str = "the quick brown fox jumps over the lazy dog"
let test_input = Bytes.of_string test_input_str
let test_input_len = Bytes.length test_input
let read () =
let buf = Bytes.create test_input_len in
let rec aux n =
let i = Unix.read Unix.stdin buf n (Bytes.length buf - n) in
if i = 0 || n + i = test_input_len then
Bytes.equal buf test_input
else aux (n + i)
in
if aux 0 then
(* make sure there's nothing more to read *)
0 = Unix.read Unix.stdin buf 0 1
else false
let write fd =
assert (test_input_len = Unix.write fd test_input 0 test_input_len)
let () =
match Sys.argv.(1) with
| "read" -> exit @@ if read () then 0 else 1
| "write" -> write Unix.stdout
| "errwrite" -> write Unix.stderr
| _ -> invalid_arg "Sys.argv"
This is not quite right: Lwt has “eager” semantics in the sense that it will not yield until the current thread blocks (Async makes a different choice). Concretely, what is probably happening is that the whole payload is written when the write thread is created.
What does eager mean here – so the promises start executing as soon as they are created? So the process exec in let proc = ... and pipe write in let write = ... start executing right away?
I still am trying to get a mental model here of why this example works…
Yes, that’s right; this is the basic model and it is the same for Lwt and Async; but that’s not what “eagerness” referred to. Rather what it means is that at bind points Lwt will keep going in the current thread if it is possible to do so without blocking, while Async will always try to yield to another thread.
You’ve already got two promises registered with the scheduler. The bind on proc finds an unresolved promise, so control goes back to the scheduler. The scheduler will then make progress (or wait for progress) on both promises. Once the proc promise is resolved, the scheduler triggers the bind callback, which binds write, likely finds that the promise has already resolved, and proceeds straight to the assertion. If I understand @nojb’s answer correctly, for the second bind Async would still have let the scheduler choose whether to run the asserting callback right away or switch to another promise in progress, while Lwt doesn’t go back to the scheduler when binding a resolved promise.
As mentioned in the other answers, the write starts executing as soon as the promise for it is created. The promise does not affect the execution of the write; it is merely a synchronisation primitive which witnesses the completion of the write.
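To make this concrete, here is a minimal sketch using only the core lwt package. The names log, record, resolved, pending and events are made up for this illustration. It records the order in which things happen: the body of a promise runs when the promise is created, a bind on an already-resolved promise runs its callback on the spot, and a bind on a pending promise only registers the callback.

```ocaml
(* Build with the lwt package, for example:
   ocamlfind ocamlopt -package lwt -linkpkg eager.ml *)

(* Hypothetical helpers that record the order of events. *)
let log : string list ref = ref []
let record s = log := s :: !log

(* Creating the promise runs its body immediately. *)
let resolved : int Lwt.t =
  record "body runs at creation";
  Lwt.return 42

(* A promise that stays pending until it is resolved by hand. *)
let (pending : unit Lwt.t), resolver = Lwt.wait ()

let () =
  record "before binds";
  (* Binding an already-resolved promise: the callback runs right away. *)
  let _ : unit Lwt.t =
    Lwt.bind resolved (fun _ ->
      record "callback on resolved";
      Lwt.return_unit)
  in
  (* Binding a pending promise: the callback is only registered. *)
  let _ : unit Lwt.t =
    Lwt.bind pending (fun () ->
      record "callback on pending";
      Lwt.return_unit)
  in
  record "after binds";
  (* Resolving the promise triggers the registered callback. *)
  Lwt.wakeup resolver ();
  record "after wakeup"

(* Events in chronological order. *)
let events = List.rev !log
```

Here "callback on resolved" is recorded before "after binds", whereas "callback on pending" only appears once Lwt.wakeup has been called. No scheduler is involved, because none of these promises pauses or makes a system call.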
In most cases, promises are bound where they are created: Lwt_process.exec … >>= …. In those cases there are no questions about the synchronisation.
Declaring promises outside of a bind is a way to start a background task. (Provided the scheduler is running.)
However, in this case it is possible to get the best of both worlds with a simple both:
Lwt.both (* `both` waits for both of the promises to resolve *)
(Lwt_process.exec …)
(Lwt.finalize …) (* plot twist: OCaml's argument evaluation order is unspecified and in practice right-to-left, so the promises start in the reverse order compared to the original example! This shouldn't matter in this case but still… Programming is such fun! *)
>>= fun (r, n) ->
…
or with more recent syntax
let* r = Lwt_process.exec …
and* n = Lwt.finalize …
in
…
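For a version that can be run without dummy.exe, here is a rough sketch in which a read from the other end of a pipe stands in for the child process. This is an assumption made for the sake of a self-contained example, not what the Lwt test does, and the names payload, roundtrip and prefix_ok are hypothetical. It assumes the lwt.unix package.

```ocaml
(* Build with the lwt.unix package, for example:
   ocamlfind ocamlopt -package lwt.unix -linkpkg both.ml *)
let payload = Bytes.of_string "the quick brown fox"
let payload_len = Bytes.length payload

(* Hypothetical helper: write [payload] into a pipe while reading from the
   other end, and wait for both promises with [Lwt.both]. The read stands in
   for the child process of the original test. *)
let roundtrip () : (int * int * bool) Lwt.t =
  let pin, pout = Lwt_unix.pipe () in
  let buf = Bytes.create payload_len in
  let both =
    Lwt.both
      (Lwt_unix.read pin buf 0 payload_len)
      (Lwt.finalize
         (fun () -> Lwt_unix.write pout payload 0 payload_len)
         (fun () -> Lwt_unix.close pout))
  in
  Lwt.bind both (fun (r, n) ->
    Lwt.bind (Lwt_unix.close pin) (fun () ->
      (* A single read may return fewer bytes than requested, so only the
         prefix that was actually read is compared. *)
      let prefix_ok = Bytes.sub buf 0 r = Bytes.sub payload 0 r in
      Lwt.return (r, n, prefix_ok)))

let bytes_read, bytes_written, prefix_ok = Lwt_main.run (roundtrip ())
```

Both promises are started when the arguments of Lwt.both are evaluated, and the callback only runs once both have resolved, so the order in which they finish no longer matters.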
One additional remark:
The following variation of the test should work (most times! see below).
let pwrite ~stdin pout =
let args = [|"dummy.exe"; "read"|] in
let proc = Lwt_process.exec ~stdin ("./dummy.exe", args) in
let write = Lwt.finalize
(fun () -> Lwt_unix.write pout expected 0 expected_len)
(fun () -> Lwt_unix.close pout) in
proc >>= fun r ->
assert (match Lwt.state write with | Lwt.Return _ -> true | _ -> false); (* ← write should be finished! *)
write >>= fun n ->
assert (n = expected_len);
check_status r
However, this stricter test might depend on some scheduler internals. Indeed, whilst the underlying system call for the write must finish before the system call for the proc does, there is no guarantee that these completions are propagated immediately to the promises. There could be a delay between a system call finishing and the associated promise resolving.
This was a very helpful remark and helped crystallise the concept for me. Thank you!
So the scheduler is some ambient global object? The very creation of a promise registers it already with the scheduler then… ?
This brings me to questions about Lwt.async and where it comes into the picture if at all here. Here proc and write are not top level promises because they are later used in >>= so we don’t need to worry about Lwt.async, correct?
The scheduler is a global object which only lives for the duration of the Lwt_main.run call. Or rather, the data-structure which holds the state of the scheduler is an ambient global object, but the control which loops over this state is driven by the Lwt_main.run. The implementation of this run function is roughly as follows:
let rec run (p : 'a Lwt.t) : 'a =
match Lwt.state p with
| Return x -> x
| Fail exc -> raise exc
| Sleep ->
resolve_all_paused_promises ();
resolve_promises_for_system_calls_that_have_completed ();
run p
This description holds true for the Unix scheduler, i.e., when using Lwt_main.run. It is not the case for js_of_ocaml: there the scheduler starts when the program starts and doesn’t stop until the program exits.
The Lwt.async function (the use of which is discouraged in favour of Lwt.dont_wait, see below) is a way to (a) explicitly ignore the resolution of a promise and (b) dispatch exceptions from that ignored promise onto a global exception handler. Essentially Lwt.async is
let async f =
Lwt.on_failure (f ()) (fun exc -> !async_exn_handler exc);
() (* ← return `unit` to ignore the promise altogether *)
(The actual implementation is better at catching exceptions, also it uses some internals for efficiency.)
If you could be certain that a promise would not raise an exception, then you could simply ignore the promise altogether: let _ = … in …. But you can never be certain so you use let () = Lwt.async (fun () -> …) in ….
Note that Lwt.async uses a global mutable exception handler. If you want something more local (which you probably do), then you can use Lwt.dont_wait. It takes an explicit exception handler as an additional parameter.
Here we couldn’t use Lwt.async because we use the values that the promises resolve to (n and r) to check that the promises did what they were supposed to do. In order to get these values (n and r) we must bind the promises. (We can do away with bind with trickery but I wouldn’t recommend it.)
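As a small illustration of the difference between Lwt.async and Lwt.dont_wait described above, here is a sketch using only the core lwt package. The names handled, describe and reports are invented for this example, and the background tasks fail immediately so that the handlers run on the spot.

```ocaml
(* Build with the lwt package, for example:
   ocamlfind ocamlopt -package lwt -linkpkg background.ml *)

(* Hypothetical log of the exceptions seen by the handlers. *)
let handled : string list ref = ref []

let describe = function
  | Failure msg -> msg
  | exn -> Printexc.to_string exn

let () =
  (* Lwt.dont_wait takes its exception handler as an explicit argument. *)
  Lwt.dont_wait
    (fun () -> Lwt.fail (Failure "local"))
    (fun exn -> handled := ("dont_wait: " ^ describe exn) :: !handled)

let () =
  (* Lwt.async reports to the global, mutable Lwt.async_exception_hook.
     The hook is replaced for the duration of the call and then restored. *)
  let previous = !Lwt.async_exception_hook in
  Lwt.async_exception_hook :=
    (fun exn -> handled := ("hook: " ^ describe exn) :: !handled);
  Lwt.async (fun () -> Lwt.fail (Failure "global"));
  Lwt.async_exception_hook := previous

(* Reports in chronological order. *)
let reports = List.rev !handled
```

In both cases the promise itself is discarded and only its failure is observed, which is why neither function can be used when the resolved value is needed.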
A few notes:
It is possible to pass a promise to async and also bind on it. I don’t think it’d be useful in and of itself, but it can happen if you pass promises around to various combinators that do their own things.
I don’t quite understand what you mean by promises being “top-level” so I don’t think I can answer that part of your question very precisely.
Very helpful background information. Thanks for taking the time! I’ve got a better idea of what is going on now – thanks again.
I was using the term “top-level” in the sense of the documentation of Lwt.async in the lwt.mli file. (Repeated below for convenience – see right at the end, where the phrase “top-level” is used.)
val async : (unit -> unit t) -> unit
(** [Lwt.async f] applies [f ()], which returns a promise, and then makes it so
that if the promise is {{!t} {e rejected}}, the exception is passed to
[!]{!Lwt.async_exception_hook}.
In addition, if [f ()] raises an exception, it is also passed to
[!]{!Lwt.async_exception_hook}.
[!]{!Lwt.async_exception_hook} typically prints an error message and
terminates the program. If you need a similar behaviour with a different
exception handler, you can use {!Lwt.dont_wait}.
[Lwt.async] is misleadingly named. Itself, it has nothing to do with
asynchronous execution. It's actually a safety function for making Lwt
programs more debuggable.
For example, take this program, which prints messages in a loop, while
waiting for one line of user input:
{[
let () =
let rec show_nag () : _ Lwt.t =
let%lwt () = Lwt_io.printl "Please enter a line" in
let%lwt () = Lwt_unix.sleep 1. in
show_nag ()
in
ignore (show_nag ()); (* Bad – see note for (1)! *)
Lwt_main.run begin
let%lwt line = Lwt_io.(read_line stdin) in
Lwt_io.printl line
end
(* ocamlfind opt -linkpkg -thread -package lwt_ppx,lwt.unix code.ml && ./a.out *)
]}
If one of the I/O operations in [show_nag] were to fail, the promise
representing the whole loop would get rejected. However, since we are
ignoring that promise at {b (1)}, we never find out about the rejection. If
this failure and resulting rejection represents a bug in the program, we
have a harder time finding out about the bug.
A safer version differs only in using [Lwt.async] instead of
[Stdlib.ignore]:
{[
let () =
let rec show_nag () : _ Lwt.t =
let%lwt () = Lwt_io.printl "Please enter a line" in
let%lwt () = Lwt_unix.sleep 1. in
show_nag ()
in
Lwt.async (fun () -> show_nag ());
Lwt_main.run begin
let%lwt line = Lwt_io.(read_line stdin) in
Lwt_io.printl line
end
(* ocamlfind opt -linkpkg -thread -package lwt_ppx,lwt.unix code.ml && ./a.out *)
]}
In this version, if I/O in [show_nag] fails with an exception, the exception
is printed by [Lwt.async], and then the program exits.
The general rule for when to use [Lwt.async] is:
- Promises which are {e not} passed {e to} {!Lwt.bind}, {!Lwt.catch},
{!Lwt.join}, etc., are {b top-level} promises.
- One top-level promise is passed to {!Lwt_main.run}, as can be seen in most
examples in this manual.
- Every other top-level promise should be wrapped in [Lwt.async]. *)
One thing that should be asked here in this example:
let () =
let rec show_nag () : _ Lwt.t =
let%lwt () = Lwt_io.printl "Please enter a line" in
let%lwt () = Lwt_unix.sleep 1. in
show_nag ()
in
Lwt.async (fun () -> show_nag ());
Lwt_main.run begin
let%lwt line = Lwt_io.(read_line stdin) in
Lwt_io.printl line
end
The scheduler should not exist when the Lwt.async (fun () -> show_nag ()); is called because Lwt_main.run has not yet been called. Yet it seems everything works properly…
The data structure in which promises are registered does exist: Lwt.pause does register your promises, Lwt_unix.* does register your system calls, etc.
However, the registered promises will not resolve until the code that iterates over this data structure executes, which happens when Lwt_main.run is called.
Also note that async does not register any promise with the scheduler. In fact, in a typical program, most promises are unknown to the scheduler. The only promises which are registered to the scheduler are the promises which pause (Lwt.pause) or make a system call (Lwt_unix.*, Lwt_process.*, Lwt_io.*, etc.). These promises specifically are the ones that will hang until the scheduler is started, i.e., until Lwt_main.run is called.
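A minimal sketch of this behaviour, assuming the lwt.unix package (the names is_pending, is_fulfilled and the three boolean results are made up for this example): a promise created with Lwt.return is resolved without any scheduler, whereas one created with Lwt.pause stays pending until Lwt_main.run drives the scheduler.

```ocaml
(* Build with the lwt.unix package, for example:
   ocamlfind ocamlopt -package lwt.unix -linkpkg paused.ml *)

(* Hypothetical helpers to inspect a promise without binding on it. *)
let is_pending p =
  match Lwt.state p with Lwt.Sleep -> true | _ -> false

let is_fulfilled p =
  match Lwt.state p with Lwt.Return _ -> true | _ -> false

(* Never registered with the scheduler: resolved as soon as it is created. *)
let already_resolved : int Lwt.t = Lwt.return 1

(* Registered with the scheduler: needs Lwt_main.run to make progress. *)
let paused : unit Lwt.t = Lwt.pause ()

let resolved_without_scheduler = is_fulfilled already_resolved
let paused_before_run = is_pending paused

(* Running the scheduler resolves the paused promise. *)
let () = Lwt_main.run paused

let paused_after_run = is_fulfilled paused
```

All three booleans end up true: the paused promise is created and registered before Lwt_main.run is called, but it only resolves once the scheduler loop runs.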