Hello all, I’m hoping that someone might have an idea of what is going wrong here. I’m putting together a simple asynchronous agent (like F#'s MailboxProcessor
) using Async
, and I’ve run into a weird issue where Async.after
seems to hang forever (while working as expected in other parts of my code).
open Core
open Async
type 'a t = ('a, Pipe.Writer.phantom) Pipe.t
type 'b inbox = ('b, Pipe.Reader.phantom) Pipe.t
type 'b answer = State of 'b | Async of 'b Deferred.t | Stop
let ( >> ) f g x = g (f x)
let ( >?= ) answer loop =
match answer with
| State s -> loop s
| Async d -> d >>= loop
(* | Async d -> Thread_safe.block_on_async_exn (fun () -> d) |> loop *)
| Stop -> return ()
let create ?(size=100) ~init body : 'a t =
let (inbox, mailslot) = Pipe.create () in
let () = Pipe.set_size_budget inbox size in
let rec msg_loop state =
Pipe.read inbox >>= function
| `Eof -> return ()
| `Ok msg -> body state msg >?= msg_loop in
let () = msg_loop init >>= (fun () -> return (Pipe.close mailslot))
|> don't_wait_for in
mailslot
let post : 'a t -> 'a -> unit Deferred.t = Pipe.write
let close : 'a t -> unit = Pipe.close
let result_of_timeout = function
| `Result a -> Result.return a
| `Timeout -> Result.fail "Timeout."
let post_and_reply ?(timeout=86400.) t closure =
let times_up = Time.Span.of_sec timeout |> with_timeout in
let (reply, channel) = Pipe.create () in
let msg = closure channel in
post t msg >>= fun () ->
Pipe.read reply |> times_up >>| result_of_timeout >>=? function
| `Eof -> Deferred.Result.fail "Reply channel unexpectedly closed (`Eof)"
| `Ok r -> Pipe.close_read reply; Deferred.Result.return r
let post_and_reply_sync t closure =
(fun () -> post_and_reply t closure)
|> Thread_safe.block_on_async
|> Result.map_error ~f:(Error.of_exn >> Error.to_string_hum)
|> Result.join
let post_and_reply_exn t closure =
post_and_reply t closure >>| Result.ok_or_failwith
let post_and_reply_sync_exn t closure =
(fun () -> post_and_reply_exn t closure) |> Thread_safe.block_on_async_exn
module Test = struct
type msg =
| Add of int
| Show
| BlockUp of float
| BindUp of float
| SlowShow of float
| Fetch of int t
| Stop
let body state = function
| Add i -> State (state + i)
| Show -> printf "state = %d\n" state; State state
| BlockUp secs -> begin fun () ->
return (print_endline "(block) just a sec...") >>= fun () ->
after (Time.Span.of_sec secs) >>= fun () ->
return state
end |> Thread_safe.block_on_async_exn |> fun s -> State s
| BindUp 0. -> Async begin
return (print_endline "(bind) zero secs...") >>= fun () ->
return state
end
| BindUp secs -> Async begin
return (print_endline "(bind) just a sec...") >>= fun () ->
after (Time.Span.of_sec secs) >>= fun () ->
return state
end
| SlowShow secs -> begin
after (Time.Span.of_sec secs) >>= fun () ->
return (printf "(slow) state = %d\n" state)
end |> don't_wait_for; State state
| Stop -> Stop
| Fetch chan -> Pipe.write chan state |> don't_wait_for; State state
let run () =
let mailbox = create ~init:0 body in
post mailbox (Add 1) >>= fun () ->
post mailbox (Add 1) >>= fun () ->
post mailbox Show >>= fun () ->
post mailbox (BindUp 0.0) >>= fun () ->
post mailbox (BindUp 3.0) >>= fun () ->
post mailbox (Add 10) >>= fun () ->
post mailbox Show >>= fun () ->
post mailbox (Add 1) >>= fun () ->
post mailbox (Add 1) >>= fun () ->
post mailbox (Add 1) >>= fun () ->
post mailbox (Add 1) >>= fun () ->
post mailbox (SlowShow 3.0) >>= fun () ->
post mailbox (SlowShow 3.0) >>= fun () ->
post mailbox (SlowShow 3.0) >>= fun () ->
post_and_reply_exn mailbox (fun c -> Fetch c) >>= fun reply ->
return (printf "Fetched state = %i\n" reply) >>= fun () ->
post mailbox Stop |> don't_wait_for;
never_returns (Scheduler.go ())
end
The perma-hang happens at the (BindUp 3.0)
message, since it preventing continuation of the agent’s msg_loop
. I know the offending Deferred is the after
since (BindUp 0.0)
which simply wraps the determined state doesn’t cause any problems.
The commented out version of Async
answer handling (in >?=
) works, as does the BlockUp
message, which explicitly uses Thread_safe.block_on_async
.
My naive suspicion is that the scheduler is just not running the after contained within the BindUp
handler at all, but I don’t know why that might be, since this is the first time I’ve used JS Async. Hope there is someone out there who can enlighten me!