How is Deferred.any in async supposed to work?

Is there an api similar to Lwt.pick in Async.Deferred? The closest that I managed to find is Deferred.any, however it seems the other Deferred.t doesn’t seem to be determined or cancelled, i.e. in

let any = Deferred.any [d1;d2] ....

When d1 is determined any also becomes determined, however, d2 is still not determined. How do we cancel Deferred.t in async. Is this not possible?

Consider the following example,
$ cat async_test.ml

open Async

let print_determined l =
  Print.print_string "\nDeferred.t statuses\n";
  List.iter ~f:(fun d -> Print.printf "%b\n" (Deferred.is_determined d)) l

let determine l =
  List.iter l ~f:(fun (d, ivar) ->
      if not (Deferred.is_determined d) then Ivar.fill ivar ())

let () =
  let ivar1 = ref @@ Ivar.create () in
  let ivar2 = ref @@ Ivar.create () in

  let d1 = Deferred.create (fun i -> ivar1 := i) in
  let d2 =
    let stdin = Lazy.force Reader.stdin in
    let stdout = Lazy.force Writer.stdout in
    Deferred.create (fun i ->
        ivar2 := i;
        let rec loop () =
          Print.print_string "Enter message: ";
          Reader.read_line stdin >>> function
          | `Ok line ->
              Writer.writef stdout "%s\n" line;
              loop ()
          | `Eof -> Writer.write stdout "Eof\n"
        in
        loop ())
  in

  let l = [ d1; d2 ] in
  let any = Deferred.any l in
  let _ =
    any >>= fun _ ->
    Print.printf "\n[any] is determined now.\n";

    determine [ (d1, !ivar1); (d2, !ivar2) ];

    print_determined (any :: l);
    Deferred.unit
  in

  print_determined (any :: l);

  Ivar.fill !ivar2 ();
  Core.never_returns (Scheduler.go ())

$ cat dune

(executable
 (name async_any)
 (libraries core async))

A sample run as follows,

$ dune exec ./async_test.exe

Enter message:
Deferred.t statuses
false
false
false

[any] is determined now.

Deferred.t statuses
true
true
true
hello
hello
Enter message: hello
hello
Enter message: ^C⏎

In the above, even after any, d1 and d2 are determined the code in d2 is still executing. Isn’t this supposed to stop after d2 is determined?

Hey there, yeah, Async can be a little mind-bending at first.

Firstly, the body of a Deferred.create (fun ivar -> ...) doesn’t just stop existing because the ivar was filled. In general, although OS threads can be killed, deferred computations in Async are not mapped entirely to a single thread so you can’t kill them without horrific unintended collateral damage. Async is concurrency library that is a hybrid between cooperative event-driven multi-tasking and pre-emptive threading. That means whatever happens between the Async operators is not interruptible.[1]

Next, the loop you construct underneath d2 actually creates a third deferred (in fact, more, one for each iteration)… The reason you’re not confronted with it is because you’re ignoring the deferred result by using >>>. Using >>> in a loop like that lets you fire off a chain of deferreds untethered from your main control logic. If you used >>= or >>| it would be more apparent, but see next point.

Note the signature differences between the functions.

utop # ( >>> ) ;;
 'a Deferred.t -> ('a -> unit) -> unit = <fun>
utop # ( >>= ) ;;
 'a Deferred.t -> ('a -> 'b Deferred.t) -> 'b Deferred.t = <fun>
utop # ( >>| ) ;;
 'a Deferred.t -> ('a -> 'b) -> 'b Deferred.t = <fun>

Finally, as an Async programming rule-of-thumb, if you find yourself manipulating ivars directly, you’re probably doing something wrong. They’re low-level and most programs don’t deal with them.

A more typical Async program that uses any might look like this.

let d1 = 
 (* timeout after 60 seconds *) 
  Clock.after (sec 60.)
in
let d2 =
  let rec loop () =
    Print.print_string "Enter message: ";
    Reader.read_line stdin >>= function
       | `Ok line ->
         Writer.writef stdout "%s\n" line;
         loop ()
       | `Eof ->
         Writer.write stdout "Eof\n";
         Deferred.return ()
  in
  loop ()
in
let l = [d1; d2] in
let any = Deferred.any l in

Hope this is helpful!

  1. Although you do see a pattern in things like the Async library where some functions take a ?stop deferred that stops them from running if they become determined. That’s functionality provided entirely by the builder of that interface, and it may even use something like Deferred.any inside to fulfill that feature.
1 Like

I’m not super familiar with Lwt, so I may have some details wrong. Reading the docs on cancellation suggests that Lwt promises have to be implemented in a specific way to make them cancellable. Cancellation basically causes a promise to “reject” with a specific cancellation exception, making dependent promises also cancelled. It can’t abort computation within a promise, though, AFAICT. So, it’s “best effort”.

The documentation also has the following paragraph:

It is recommended to avoid Lwt.cancel, and handle cancelation by tracking the needed extra state explicitly within your library or application.

This would be equivalent to the ?stop parameter which aborts computation in some Async deferreds. The implementer of a “cancellable” deferred must check ?stop periodically to see if the computation should be aborted, usually at the top of a polling loop or something similar.

1 Like

You also might want to try the Deferred.choose API. It doesn’t do any general purpose cancellation, but you can associated with each deferred you’re choosing between a function for handling the result and transforming it into a uniform type, and that final function will only be run for the chosen deferred.

3 Likes