Async.after hanging forever unless explicitly blocked on

Hello all, I’m hoping that someone might have an idea of what is going wrong here. I’m putting together a simple asynchronous agent (like F#'s MailboxProcessor) using Async, and I’ve run into a weird issue where Async.after seems to hang forever (while working as expected in other parts of my code).

open Core
open Async

type 'a t = ('a, Pipe.Writer.phantom) Pipe.t

type 'b inbox = ('b, Pipe.Reader.phantom) Pipe.t

type 'b answer = State of 'b | Async of 'b Deferred.t | Stop

let ( >> ) f g x = g (f x)

let ( >?= ) answer loop =
  match answer with
  | State s -> loop s
  | Async d -> d >>= loop
  (* | Async d -> Thread_safe.block_on_async_exn (fun () -> d) |> loop *)
  | Stop    -> return ()

let create ?(size=100) ~init body : 'a t =
  let (inbox, mailslot) = Pipe.create () in
  let () = Pipe.set_size_budget inbox size in
  let rec msg_loop state =
    Pipe.read inbox >>= function
    | `Eof -> return ()
    | `Ok msg -> body state msg >?= msg_loop in
  let () = msg_loop init >>= (fun () -> return (Pipe.close mailslot))
           |> don't_wait_for in
  mailslot

let post : 'a t -> 'a -> unit Deferred.t = Pipe.write

let close : 'a t -> unit = Pipe.close

let result_of_timeout = function
  | `Result a -> Result.return a
  | `Timeout  -> Result.fail "Timeout."

let post_and_reply ?(timeout=86400.) t closure =
  let times_up = Time.Span.of_sec timeout |> with_timeout in
  let (reply, channel) = Pipe.create () in
  let msg = closure channel in
  post t msg >>= fun () ->
  Pipe.read reply |> times_up >>| result_of_timeout >>=? function
  | `Eof  -> Deferred.Result.fail "Reply channel unexpectedly closed (`Eof)"
  | `Ok r -> Pipe.close_read reply; Deferred.Result.return r

let post_and_reply_sync t closure =
  (fun () -> post_and_reply t closure)
  |> Thread_safe.block_on_async
  |> Result.map_error ~f:(Error.of_exn >> Error.to_string_hum)
  |> Result.join

let post_and_reply_exn t closure =
  post_and_reply t closure >>| Result.ok_or_failwith

let post_and_reply_sync_exn t closure =
  (fun () -> post_and_reply_exn t closure) |> Thread_safe.block_on_async_exn

module Test = struct
  type msg =
    | Add of int
    | Show
    | BlockUp of float
    | BindUp of float
    | SlowShow of float
    | Fetch of int t
    | Stop

  let body state = function
    | Add i -> State (state + i)
    | Show  -> printf "state = %d\n" state; State state
    | BlockUp secs -> begin fun () ->
        return (print_endline "(block) just a sec...") >>= fun () ->
        after (Time.Span.of_sec secs) >>= fun () ->
        return state
      end |> Thread_safe.block_on_async_exn |> fun s -> State s
    | BindUp 0. -> Async begin
        return (print_endline "(bind) zero secs...") >>= fun () ->
        return state
      end
    | BindUp secs -> Async begin
        return (print_endline "(bind) just a sec...") >>= fun () ->
        after (Time.Span.of_sec secs) >>= fun () ->
        return state
      end
    | SlowShow secs -> begin
        after (Time.Span.of_sec secs) >>= fun () ->
        return (printf "(slow) state = %d\n" state)
      end |> don't_wait_for; State state
    | Stop  -> Stop
    | Fetch chan -> Pipe.write chan state |> don't_wait_for; State state

  let run () =
    let mailbox = create ~init:0 body in
    post mailbox (Add 1)                          >>= fun () ->
    post mailbox (Add 1)                          >>= fun () ->
    post mailbox Show                             >>= fun () ->
    post mailbox (BindUp 0.0)                     >>= fun () ->
    post mailbox (BindUp 3.0)                     >>= fun () ->
    post mailbox (Add 10)                         >>= fun () ->
    post mailbox Show                             >>= fun () ->
    post mailbox (Add 1)                          >>= fun () ->
    post mailbox (Add 1)                          >>= fun () ->
    post mailbox (Add 1)                          >>= fun () ->
    post mailbox (Add 1)                          >>= fun () ->
    post mailbox (SlowShow 3.0)                   >>= fun () ->
    post mailbox (SlowShow 3.0)                   >>= fun () ->
    post mailbox (SlowShow 3.0)                   >>= fun () ->
    post_and_reply_exn mailbox (fun c -> Fetch c)   >>= fun reply ->
    return (printf "Fetched state = %i\n" reply)  >>= fun () ->
    post mailbox Stop |> don't_wait_for;
    never_returns (Scheduler.go ())
end

The perma-hang happens at the (BindUp 3.0) message, since it preventing continuation of the agent’s msg_loop. I know the offending Deferred is the after since (BindUp 0.0) which simply wraps the determined state doesn’t cause any problems.

The commented out version of Async answer handling (in >?= ) works, as does the BlockUp message, which explicitly uses Thread_safe.block_on_async.

My naive suspicion is that the scheduler is just not running the after contained within the BindUp handler at all, but I don’t know why that might be, since this is the first time I’ve used JS Async. Hope there is someone out there who can enlighten me!

That’s not the behavior I observe running this code… Well, the first thing is to get the code compiling by fixing the parentheses in run:

  let run () =
    let mailbox = create ~init:0 body in
    don't_wait_for
      (post mailbox (Add 1)                          >>= fun () ->
       post mailbox (Add 1)                          >>= fun () ->
       post mailbox Show                             >>= fun () ->
       post mailbox (BindUp 0.0)                     >>= fun () ->
       post mailbox (BindUp 3.0)                     >>= fun () ->
       post mailbox (Add 10)                         >>= fun () ->
       post mailbox Show                             >>= fun () ->
       post mailbox (Add 1)                          >>= fun () ->
       post mailbox (Add 1)                          >>= fun () ->
       post mailbox (Add 1)                          >>= fun () ->
       post mailbox (Add 1)                          >>= fun () ->
       post mailbox (SlowShow 3.0)                   >>= fun () ->
       post mailbox (SlowShow 3.0)                   >>= fun () ->
       post mailbox (SlowShow 3.0)                   >>= fun () ->
       post_and_reply_exn mailbox (fun c -> Fetch c)   >>= fun reply ->
       return (printf "Fetched state = %i\n" reply)  >>= fun () ->
       post mailbox Stop);
    never_returns (Scheduler.go ())

Once that’s done, I observe that the Fetched state message is printed; if you replace don't_wait_for x with upon x (fun () -> Shutdown.shutdown 0), the program completes successfully.

By the way, I would recommend using Command.async to run your program. This will set up the appropriate at-shutdown handlers to flush any readers/writers and do a couple other cleanup things; it will also automatically shut down your program after the main function completes.

In addition, you can use Pipe.Writer.t and Pipe.Reader.t to avoid explicitly naming the phantom types. Also, you probably don’t want to use Thread_safe.block_on_async in your program. From its documentation:

This function can be called from the main thread (before Async is
started) or from a thread outside Async.

I think it is not supported to call that function within an Async thread.

Huh, I must have made an error in pasting, since it was compiling and running fine on my end, other than hanging after the (bind) just a sec... print. I know that it will always hang without returning after the Fetched state print if it makes it that far due to the lack of shutdown.

Even though it doesn’t make a difference in compilation (for me), throwing all the posts into a block before applying don't_wait_for seems to have done the trick! I naively thought that it was already being treated like that since it was compiling fine (and running, until I added in the Async answer binding to >?=.

Thanks for the warning about Thread_safe.block_on_async, I only added pieces using it as part of the debugging process after my issues with handing BindUp (which is for testing out answering with an 'a Deferred.t). I intended to strip it out anyway, but I’ll make sure to read more into it and avoid using it in the middle of programs in the future.

Ah forgot that I also threw block_on_async in the “synchronous” post_and_reply calls to the agent. Since those would be being used in a synchronous context, with the agent running in the background, would that still cause problems? Otherwise I’m not sure if there is an equivalent solution available to the synchronous post_and_reply of the F# MailboxProcessor.

I think the key thing is what thread you’re running block_on_async from. I don’t remember the exact rules, but fortunately, if you call block_on_async where you aren’t supposed to, you at least get an exception instead of, say, deadlock (excerpt from thread_safe.ml):

  (* We disallow calling [block_on_async] if the caller is running inside async.  This can
     happen if one is the scheduler, or if one is in some other thread that has used, e.g.
     [run_in_async] to call into async and run a cycle.  We do however, want to allow the
     main thread to call [block_on_async], in which case it should release the lock and
     allow the scheduler, which is running in another thread, to run. *)
  if i_am_the_scheduler t || (am_holding_lock t && not (is_main_thread ()))
  then raise_s [%message "called [block_on_async] from within async"];

Oh thanks, I should have just checked the source myself I guess. I haven’t gotten around to testing out the _sync versions yet, but it sounds like they might work for the intended purpose from that blurb. I’ll have to make a similar note/caution about never calling it from within an async thread though of course.

I just skimmed this but FYI: IME you don’t want an async agent but, rather, a bounded async message queue with async enqueue and dequeue that suspends until there is room in that queue. Otherwise you have no back pressure and slow intermediate agents become a memory leak.