Async and recursive loop

Hello everyone!
In the code below I am trying to use Async in a recursive loop, by creating an async task that should start, with a delay of 1 second, after the 3rd iteration of the loop.
It is obvious that such a solution will not work, because the scheduler is launched only after the program exits the loop at the 100th iteration.

open Async

(** Some computational load to stretch out the time of each iteration of the loop *)
let start_heavy_computation diff =
  List.init 1000000 Fun.id
  |> List.find_all (fun n -> n > 900000 - diff && n < 900000 + diff)
  |> List.fold_left (fun a x -> a + x) 0
  |> ignore

let submit_async_task () =
  Stdio.printf "submit_async_task\n%!";
  let task = after (Core.Time_float.Span.of_sec 1.) >>= 
    (fun _ -> return @@ Stdio.printf "Executing Async task\n%!")
  in
  upon task (fun _ -> Stdio.printf "Async task executed\n%!")

let rec loop count = 
  Stdio.printf "Iter %d\n%!" count;
  start_heavy_computation count;

  match count with
  | _ when count = 5 ->
    submit_async_task ();
    loop (count + 1)
  | _ when count = 100 ->
    return ()
  | _ -> 
    loop (count + 1)

let () = 
  upon (loop 1) (fun _ -> Stdio.printf "Done\n%!");
  Core.never_returns @@ Scheduler.go ()

And the output confirms this:

Iter 1
Iter 2
Iter 3
submit_async_task
Iter 4
...
Iter 99
Iter 100
Done
Executing Async task
Async task executed

I have worked a little with F# and its async computation expressions, and I know that there I can start an async function via Async.Start; it is then executed on the thread pool provided by the .NET runtime, without my having to start that thread pool manually.

Is it possible to do something similar in OCaml, with a recursive loop like this, using the Async library?


It is obvious that such a solution will not work, because the scheduler is launched only after the program exits the loop at the 100th iteration.

It actually is not because of this. Rather, you are running into the issue that code between binds cannot be interrupted. The whole loop will iterate until it is done, because there is no bind (or anything like it) that would allow other work, such as the deferred created by submit_async_task, to be scheduled and determined.

To get the behavior I think you’re looking for (e.g., the message printing to the screen 1 second after the proper iteration rather than all the way at the end), one option is to add a bind point with a short wait to your loop.

let rec loop count =
  Stdio.printf "Iter %d\n%!" count ;
  start_heavy_computation count ;
  match count with
  | _ when count = 5 ->
      submit_async_task () ;
      loop (count + 1)
  | _ when count = 100 ->
      return ()
  | _ ->
      (* Right here will allow your other jobs to get scheduled and determined. *)
      after (Core.Time_float.Span.of_ms 100.) 
      >>= fun () -> loop (count + 1)

Then you will get something like this:

Iter 1                             
Iter 2
Iter 3
Iter 4
Iter 5
submit_async_task
Iter 6
Iter 7
Iter 8
Iter 9
Iter 10
Iter 11
Executing Async task
Async task executed
Iter 12
Iter 13
Iter 14
Iter 15
...
...

Which I think is what you’re after.

Thanks! Works like a charm!
I realized that Async will only work if the chain of binds is not interrupted.
For my case, using your trick, but with a delay of 0 ns, works great.

after (Core.Time_float.Span.of_ns 0.) 
      >>= fun () -> loop (count + 1)

There’s a general observation here that might be worth noting:

In all green-threaded systems (hence also Lwt and Async), threads that do anything nontrivial need to “yield” periodically in order to give other threads a chance at the scheduler. That 0 ns “after” is a yield. I remember a LISP machine guy once telling me that some old version of the LISPm would implicitly yield on backward jumps and subroutine returns, because that was enough to guarantee that every thread of execution would sooner or later yield.
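
For reference, and if I remember the API correctly, Async exposes this as an explicit operation: Scheduler.yield (defined in Async_kernel.Scheduler) returns a deferred that becomes determined once the scheduler has had a chance to run other ready jobs, so the 0 ns “after” can be spelled as a real yield. A minimal sketch, assuming that function is re-exported under Async.Scheduler (the loop name is mine):

open Async

(* A simplified version of the loop above, with an explicit yield between
   iterations instead of `after (Core.Time_float.Span.of_ns 0.)`.
   Scheduler.yield returns a deferred that is determined after other
   ready jobs have had a turn. *)
let rec yielding_loop count =
  Stdio.printf "Iter %d\n%!" count;
  if count >= 100 then return ()
  else Scheduler.yield () >>= fun () -> yielding_loop (count + 1)

let () =
  upon (yielding_loop 1) (fun () -> Stdio.printf "Done\n%!");
  Core.never_returns @@ Scheduler.go ()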


I do wonder about this solution. I remember that in systems that supported async concurrency back in the day, we would do this sort of thing: yield() periodically in the “main thread”. But there was always a caveat: the async tasks themselves must not do something like what this main thread is doing, or you’d end up with a stack of threads, each of which was doing a yield. We might call what this main thread is doing “complex computation”, and one might say that in a system using this “main thread + a bunch of async tasks” pattern, the async tasks must not engage in “complex computation”.

It seems like that dictum applies here, unless the main task (and any other complex computation) is CPS-translated (or the equivalent “promise-invoking translation”, or whatever).


This is interesting. While it gets the behavior that’s being looked for, I’m honestly not sure whether that “yield in the loop function” is the best idea, whether it scales, or whether it is something that should be avoided at all costs… I would be very interested in learning about a better solution!

(I deleted the post directly above this one as I tried to use the reply function, but failed.)

If you’re looking for something that actually works under all conditions, and allows you to write code that can run “under the main thread” or as an async task, then you have two choices:

  1. CPS-translate your code.

  2. Use a green-threads package that supports multiple stacks and switching between them.

#1 is … well, it’s pretty obvious: you write your code using promises throughout. #2 is (IIUC) what all the algebraic effects stuff gives you: people have used it to re-implement Lwt-like concurrency while still using real stacks.
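
To make #1 a bit more concrete, here is a minimal sketch of my own (the function name, chunk size, and the summing task are all made up for illustration): a CPU-heavy computation written with promises throughout, cut into fixed-size chunks, with a bind point between chunks so the scheduler can run other jobs in between. It reuses the 0 ns “after” trick from earlier in the thread as the bind point.

open Async

(* Hypothetical example: sum the integers in [lo, hi), doing at most
   [chunk] additions between bind points.  Each bind boundary lets the
   Async scheduler run other ready jobs. *)
let rec chunked_sum ?(chunk = 10_000) ~lo ~hi acc =
  if lo >= hi then return acc
  else begin
    let stop = min hi (lo + chunk) in
    (* Plain OCaml work for this chunk; nothing else runs while it does. *)
    let acc = ref acc in
    for i = lo to stop - 1 do
      acc := !acc + i
    done;
    (* Bind point: hand control back to the scheduler, then continue. *)
    after (Core.Time_float.Span.of_ns 0.) >>= fun () ->
    chunked_sum ~chunk ~lo:stop ~hi !acc
  end

let () =
  upon (chunked_sum ~lo:0 ~hi:10_000_000 0) (fun total ->
    Stdio.printf "total = %d\n%!" total);
  Core.never_returns @@ Scheduler.go ()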

But if you expect that function to only be called “in the main thread”, then there’s nothing wrong with “yield in the loop”, seriously. It’s a standard way of doing it, going back to when “real threads” on UNIX machines were rare and people were coming up with all sorts of hacks to get them. I’ve seen RPC systems that do this, so that even though a process can only be handling one RPC at a time, it can be accumulating inbound requests, or pushing out replies, while handling that one RPC, because those two tasks are fully async and don’t involve significant computation.


Thanks for the useful insights!
Async in OCaml has become clearer to me. However, I would like to clarify one point.
For a simple experiment, I started printing
Core_thread.(self () |> id)
and observed that all computations, both in the loop and in submit_async_task (), occur in the main thread with id 0.
Hence I have two questions:

  1. Is it possible to execute async tasks in other threads?
  2. Do I understand correctly that, if they are executed in the main thread, then it is reasonable to split any long async operation into several parts connected via bind, so that the scheduler can switch to other tasks from time to time?

I’m not an expert in the use of Lwt and Async: I have used Lwt once. But from reading posts on this forum:

  1. I think both Lwt and Async can be configured with multiple native threads for executing async tasks; others would know better how this is done, but one related mechanism is sketched after this list.

  2. I would think that this is close to the recommended practice, yes.
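
On question 1, one concrete mechanism I’m aware of in Async (worth double-checking against the docs) is In_thread.run: it does not run ordinary Async jobs on other threads, but it does let you hand a plain, blocking or CPU-heavy thunk to one of Async’s worker threads and get the result back as a deferred on the main Async thread. A minimal sketch, using a variant of start_heavy_computation that returns its sum:

open Async

(* Sketch: push the heavy, non-Async computation onto one of Async's
   worker threads.  The deferred becomes determined back on the main
   Async thread once the thunk returns. *)
let heavy_in_worker diff : int Deferred.t =
  In_thread.run (fun () ->
    (* Plain OCaml code; it may block or burn CPU without starving the
       Async scheduler running on the main thread. *)
    List.init 1_000_000 Fun.id
    |> List.filter (fun n -> n > 900_000 - diff && n < 900_000 + diff)
    |> List.fold_left ( + ) 0)

let () =
  upon (heavy_in_worker 5) (fun sum -> Stdio.printf "sum = %d\n%!" sum);
  Core.never_returns @@ Scheduler.go ()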

Something to remember about running async tasks in multiple threads: OCaml code has “grown up” in a world of single-threading. So, for instance, in the case of Lwt you pretty much assume that non-I/O code between two binds runs uninterrupted, and you don’t worry about putting mutexes around data structures in such cases. But if you switch to multiple native threads, that can and probably will change, and you’ll need to be aware of which data structures are accessible from multiple threads and use mutexes appropriately. It all gets more complicated.
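
As a tiny (and entirely hypothetical) sketch of the kind of thing that changes: a ref touched both by ordinary Async jobs on the main thread and by a thunk running under In_thread.run needs explicit locking. The stdlib Mutex module should be linkable here since Async already depends on the threads library, though you may need to list threads.posix in your dune file.

open Async

(* Hypothetical shared state, touched from the main Async thread and
   from a worker thread, so it needs a lock. *)
let hits = ref 0
let hits_mutex = Mutex.create ()

let bump () =
  Mutex.lock hits_mutex;
  incr hits;
  Mutex.unlock hits_mutex

let () =
  (* A job on the main Async thread... *)
  upon (return ()) bump;
  (* ...and a job on a worker thread may both run [bump], hence the mutex. *)
  don't_wait_for (In_thread.run bump);
  Core.never_returns @@ Scheduler.go ()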

Just something to be aware of.
