Tail-recursive, efficient ways to dispatch ~1M requests via Lwt

Hi all, as part of a benchmark I’m trying to dispatch a lot of requests (~1M) that come from time-series data.

The constraints unfortunately are that I need the latency of each request (in order to subsequently analyse them); however, I am having some difficulty actually doing that.

I’ve tried a couple of different approaches (Lwt_stream.iter_p, a custom array-based iteration), however I just keep hitting different stack overflows (mainly list-based ones).

Is there a good standard approach for this kind of problem that I just don’t know about?

Each dispatch looks something like:

let apply start f =
  let%lwt () = Lwt_unix.sleep (start -. Unix.gettimeofday ()) in
  let start = Unix.gettimeofday () in
  let%lwt () = f () in
  Unix.gettimeofday () -. start |> Lwt.return

The aim is that each call to f occurs at a known time taken from the time-series input.
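For concreteness, the naive shape of the whole dispatch looks something like this (a hypothetical sketch, not my exact code; run_all is a made-up name):

(* Dispatch every request at its scheduled time, in parallel, and collect
   the latencies. As far as I can tell Lwt_list.map_p builds its result
   with a non-tail-recursive traversal, so at ~1M elements this is exactly
   the kind of thing that overflows for me. *)
let run_all (series : (float * (unit -> unit Lwt.t)) list) : float list Lwt.t =
  Lwt_list.map_p (fun (start, f) -> apply start f) series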

I’ve tried using Lwt_stream.iter_n ~max_concurrency, however it seems to artificially limit the throughput.
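Roughly, that attempt looked like this (sketch; record is a made-up helper that stashes each latency):

(* Limit how many requests are in flight at once. The catch: because each
   element starts with its sleep, a request cannot even begin waiting for
   its scheduled dispatch time until a concurrency slot frees up, so
   dispatch times slip and throughput is capped artificially. *)
let run_with_limit (requests : (float * (unit -> unit Lwt.t)) Lwt_stream.t) =
  Lwt_stream.iter_n ~max_concurrency:1000
    (fun (start, f) ->
       let%lwt latency = apply start f in
       record latency;
       Lwt.return_unit)
    requests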

I’ve also tried using a single generator thread, however that implementation ended up being quite low-throughput as well… (though that may have been for other reasons tbh)


I have a few ideas for solutions to your problem, but I’m not entirely sure what you are trying to solve. Can you answer the following questions, and then I can give you some pointers?

  • Is the time-series data stored as a file? An array? A list? What format is it in?
  • Is your time-series data sorted? Specifically, is it sorted by time-stamps?
  • Are there identifiers attached to the data that you are handling?
  • What kind of output are you looking for? Do you want to stream the results as they are made available, or is a chunk of results at the very end OK?

Thanks for the general offer of help. For various reasons I’ve got the time-series data pre-generated, and it is currently stored in a list. The results should fit within working memory, so it should be fine to store them in a single chunk (I also have concerns that streaming them would introduce another bottleneck). The time-series data can be sorted before executing the benchmark.

The main issue I’m having is that I don’t know what is causing the stack overflow… In several places it’s relatively obvious that it is a call to List.map etc. which is throwing it; however, at other times it seems like I’m dispatching too many promises to Lwt, which is causing it…
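(One thing I have confirmed along the way: the stdlib’s List.map is not tail-recursive, so on a ~1M-element list it can overflow on its own. Where I control the traversal I’ve been swapping in the classic workaround:

(* Tail-recursive map: List.rev_map is tail-recursive, so reversing its
   output gives List.map semantics without growing the stack. *)
let map_tr f l = List.rev (List.rev_map f l)

…but the overflows that seem to come from inside Lwt are the ones I can’t explain.)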

Lwt does use some lists internally, so the stack overflow could be coming from there. It could also be due to some specifics of how you make all those calls. Let’s see if the design below solves your issue.

The idea is that, with the time series sorted, you know the timestamps are in increasing order, so you can sleep from one to the next. You do not need to create all the promises in one go. Hopefully each element is processed fast enough that you won’t have too much ongoing work, and latency stays small.

open Lwt.Infix

let series : (float * 'a) list = <your data here>
let series = List.sort (fun (t1, _) (t2, _) -> compare t1 t2) series
let n = List.length series
(* Placeholder promises; each slot is overwritten when its request is dispatched. *)
let results = Array.make n (Lwt.return 0.)

let rec iter n = function
  | [] -> Lwt.return_unit
  | (next_time, next_input) :: rest_of_series ->
    let delay = next_time -. Unix.gettimeofday () in
    (if delay <= 0. then Lwt.return_unit else Lwt_unix.sleep delay) >>= fun () ->
    results.(n) <- (
      let starts = Unix.gettimeofday () in
      Lwt.apply f next_input >>= fun () ->   (* f is your request function *)
      let ends = Unix.gettimeofday () in
      Lwt.return (ends -. starts)
    ) ;
    iter (n + 1) rest_of_series

let run () =
  iter 0 series >>= fun () ->
  wait_on_all_array results >>= fun () ->
  Lwt.return results

Some notes:

  • The results array is used both to store the results (once the whole thing has run you can Array.map Lwt.state to get the results) and as a synchronisation point (you’ll have to write wait_on_all_array; I can help, and a sketch follows these notes).
  • The function Lwt.apply is just for exception management: it transforms raised exceptions into rejected promises.
  • If this still overflows, it could be in one of the calls to f, or it could be because some internal error management prevents the recursive call from being tail-optimised. In that case, let me know and I can try to work around it.
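For reference, a minimal wait_on_all_array could look like this (my sketch; note that it becomes rejected as soon as any element is rejected):

(* Resolve once every promise in the array has resolved, discarding the
   values; Lwt.join does the actual waiting. *)
let wait_on_all_array (arr : 'a Lwt.t array) : unit Lwt.t =
  arr
  |> Array.map (fun p -> Lwt.map ignore p)
  |> Array.to_list
  |> Lwt.join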

Brilliant thanks!!

I just tried a similar approach earlier today, however it was very slow (~2k ops/s, vs Lwt_stream.iter_n’s 10k and Lwt_stream.iter_p’s 20k before a stack overflow). This, though, is working a lot better: ~25k before it ends up in a stack overflow. The stack overflow is on the Lwt end of things; however, at most there should be ~250k promises being created, which should be within the range Lwt is designed to deal with…

(I suspected that the slow performance of my first generator approach was delaying the dispatching thread, so I put the generator into a preemptive thread and got a “Libev cannot allocate -37 bytes” error…)

I was also using flambda (other parts of the system could benefit from it), which I imagine could potentially be interfering with the tail recursion… so I am currently rerunning the tests with that removed…

Could I ask: is your goal to actually process this rate of real-time events from an external source, or are you using this approach as a sort of time-series discrete-time simulator? The reason I ask is, if it’s the latter, it might be worth thinking about building your own version of a “concurrency” monad that isn’t tied to the wall clock, and that you could optimize directly.
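To sketch what I mean (all names invented; this is a toy discrete-event scheduler, not a finished design):

(* Events live in a map keyed by virtual timestamp; "sleeping" until a
   time is just registering an action there, and running the simulation
   jumps the clock from event to event with no real sleeps. *)
module Sim = struct
  module Q = Map.Make (Float)

  type t = { mutable now : float; mutable pending : (unit -> unit) list Q.t }

  let create () = { now = 0.; pending = Q.empty }

  (* Schedule [action] to run at virtual time [time]. *)
  let at sim time action =
    let existing = Option.value ~default:[] (Q.find_opt time sim.pending) in
    sim.pending <- Q.add time (action :: existing) sim.pending

  (* Drain the queue in timestamp order, advancing the virtual clock. *)
  let run sim =
    let rec loop () =
      match Q.min_binding_opt sim.pending with
      | None -> ()
      | Some (time, actions) ->
        sim.pending <- Q.remove time sim.pending;
        sim.now <- time;
        List.iter (fun act -> act ()) actions;
        loop ()
    in
    loop ()
end

Measured “latencies” would then come from the virtual clock, so you could replay the whole series as fast as the CPU allows.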

Unfortunately this is real time, onto a real system, so everything is um ‘fun’ :slight_smile:

Just to make sure, I added [@tailcall] annotations to the actual recursive call, and it is definitely a tail call, even with the internal exception management.

Because it still stack-overflows, you can try to take the recursion entirely out of Lwt. It does mean creating many more sleeping promises (in a way, shifting the responsibility of scheduling from Lwt to Lwt_unix). I have low hopes, but it’s worth a shot; if anything, the different stack-overflow threshold might help pinpoint the source of the problem.

let rec iter n = function
  | [] -> ()
  | (next_time, next_input) :: rest_of_series ->
    results.(n) <- (
      let delay = next_time -. Unix.gettimeofday () in
      (if delay <= 0. then Lwt.return_unit else Lwt_unix.sleep delay) >>= fun () ->
      let starts = Unix.gettimeofday () in
      Lwt.apply f next_input >>= fun () ->
      let ends = Unix.gettimeofday () in
      Lwt.return (ends -. starts)
    ) ;
    iter (n + 1) rest_of_series

Some questions / remarks:

  • What Lwt_engine.t are you running on?
  • How many concurrent f are ever executing simultaneously? I’m thinking the issue might not be the total number of promises, but the number of simultaneously unresolved promises.
  • You can use [@tailcall] to make sure a given function call is tail-optimised. (Search “tailcall” in https://caml.inria.fr/pub/docs/manual-ocaml/attributes.html for more details.) The syntax is (iter[@tailcall]) (n + 1) rest_of_series.

Just to reiterate, thanks for helping me look into this!

So, running the test with the Lwt_unix.sleep dispatcher, it dies after ~20k ops/s with a stack overflow in Lwt.

If I explicitly limit the number of concurrent promises (via Lwt_stream.iter_n) I don’t get stack overflows, however that obviously isn’t quite right from a benchmarking-correctness perspective…

This is all on the libev engine. Also, I think the problem may actually be the number of simultaneously resolved promises (with attached callbacks) rather than the number of unresolved ones, though proving this would be difficult. Specifically, if Lwt_unix.read doesn’t allow other callbacks to run while the socket still has data available (this already causes annoying issues where it doesn’t pause answering RPCs, and thus doesn’t let other Lwt threads run in a FIFO manner), that could be building up the number of resolved-but-not-yet-called-back promises…
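(If that theory is right, I suppose one way to test it would be to force an explicit yield point into the read loop; something like the following, where read_request and handle are stand-ins for my actual RPC code:

let rec serve () =
  let%lwt req = read_request () in   (* hypothetical read *)
  let%lwt () = handle req in         (* hypothetical handler *)
  let%lwt () = Lwt.pause () in       (* let other resolved callbacks run *)
  serve ()

Lwt.pause re-queues the rest of the loop behind already-resolved promises, which should restore some fairness.)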


I’ve been looking in this direction: trying to find a non-tail-recursive function that could overflow the stack when a lot of promises are resolved “simultaneously”, but no luck so far.

Is there a way you could share the code with me so I can dive a bit deeper into this?
You can DM me about it, or you can open an issue on the bug tracker of the project with some links to your code.