Recommended Practice for handling background/asynchronous tasks with Dream

I was reading this blog post by @jsthomas and had a question regarding best practices for handling asynchronous/background worker jobs in Dream/OCaml web servers in general.

In the post, in order to send emails without blocking the web server, @jsthomas uses RabbitMQ to provide a job queue between the main web server and a background worker process that periodically pulls tasks from the queue and operates on them.

I suppose that because Lwt threads are cooperative, if you want to ensure that pending tasks on the background worker never affect the main web server, then you need to start a separate process or system-level thread. Also, reading the page on Lwt_preemptive, I gather that even if you spawn a system-level thread, any Lwt operations have to be run back on the main thread:

Note that Lwt thread-local storage (i.e., Lwt.with_value) cannot be safely used from within f. The same goes for most of the rest of Lwt. If you need to run an Lwt thread in f, use Lwt_preemptive.run_in_main.

val run_in_main : (unit -> 'a Lwt.t) -> 'a

run_in_main f can be called from a detached computation to execute f () in the main preemptive thread, i.e. the one executing Lwt_main.run. run_in_main f blocks until f () completes, then returns its result. If f () raises an exception, run_in_main f raises the same exception.
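To make the quoted API concrete, here is a minimal sketch of how detach and run_in_main fit together, assuming the lwt.unix library; expensive_job is a hypothetical placeholder for real blocking work:

```ocaml
(* Sketch: run blocking work on a system thread with
   Lwt_preemptive.detach, bouncing any Lwt call back to the
   main thread with run_in_main. *)

let expensive_job () =
  (* runs on a preemptive (system) thread; plain blocking code only *)
  Unix.sleepf 0.5;
  (* any Lwt operation must be executed on the main thread *)
  Lwt_preemptive.run_in_main (fun () -> Lwt_io.printl "job done")

let main () =
  let open Lwt.Syntax in
  (* the Lwt scheduler keeps serving requests while the job runs *)
  let* () = Lwt_preemptive.detach expensive_job () in
  Lwt_io.printl "back on the Lwt scheduler"

let () = Lwt_main.run (main ())
```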

So it seems like using a message broker (and thereby also having to come up with a serialisation/deserialisation process for your jobs) is fundamentally necessary for implementing asynchronous tasks for a web server in OCaml.

Is my understanding correct? Is usage of a message broker necessary for implementing non-trivial web servers in OCaml, or are there alternatives?

Would my performance suck if I just used Lwt.async?


There are other reasons for offloading such background tasks to a message-queue for processing by outboard processes. For instance, the message-queue software might have its own persistence/durability, so if it crashes, the tasks won’t be lost, whereas the main web-app server process probably doesn’t have that. Also, if there are a ton of background tasks, that can slow down the main web-app server; if you spool them to a queue, they can be processed by a fixed number of background processes, with known load characteristics. Those processes can be on other machines, too.

All of this is independent of OCaml and LWT: these and other considerations apply to all OLTP application servers, going back all the way to the original TPF (“TWA airline control program” – you can read about it in Spector & Gifford’s seminal paper in CACM back in the mid '80s). And independently, these ideas were rediscovered by eBay – something-something Ratzesberger wrote a nice blog post about it … 20 years ago.

Hope this helps. Please rest assured, it has nothing to do with OCaml and LWT.


I was thinking of this recently. I'm planning to just spin off a recursive Lwt promise that sleeps for some time, does its job, and repeats. If I understand correctly, Lwt.async should work fine for this. EDIT: but when talking about performance it’s best to measure real-world performance and optimize only if actually necessary.
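A minimal sketch of that sleep-and-repeat loop, assuming Lwt; do_work and the 5-second interval are placeholders:

```ocaml
(* Hypothetical job body: replace with real work. *)
let do_work () = Lwt_io.printl "processing pending jobs"

(* Recursive background loop started with Lwt.async: sleep,
   do the job, repeat. Note that if the loop ever rejects, the
   exception goes to !Lwt.async_exception_hook, so real code
   should wrap the body in Lwt.catch. *)
let start_background_loop () =
  let rec loop () =
    let open Lwt.Syntax in
    let* () = Lwt_unix.sleep 5.0 in
    let* () = do_work () in
    loop ()
  in
  Lwt.async loop
```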

Right, I was aware of the persistence/durability argument, but at the moment, I’m just playing around with some experiments, so I’m not too concerned about the robustness of the server, I just wanted to make sure I wasn’t completely shooting myself in the foot (…I suppose partially doing so would be fine :slight_smile: ).

Yup, that was the direction I seemed to have converged upon as well.

I guess I’ll wrap it up in a Worker interface that I can send jobs to (queue_job : task -> unit), internally using a task Lwt_mvar.t as the queue of tasks (with Lwt.async (fun () -> Lwt_mvar.put queue task) to enqueue, and a recursive worker, started on initialisation, that takes from the queue and works on each task).

Then, if I run into a case where I need persistence/durability, I can transparently modify the internals of the worker module to use a message queue instead.
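A rough sketch of that worker module, assuming Lwt; the task type and handle function are placeholders. One caveat worth noting: an Lwt_mvar holds at most one value, so concurrent Lwt.async'd puts queue up as blocked promises rather than in a real buffer.

```ocaml
module Worker : sig
  type task = string                  (* placeholder task type *)
  val queue_job : task -> unit
end = struct
  type task = string

  (* the "queue": an mvar the worker takes tasks from *)
  let queue : task Lwt_mvar.t = Lwt_mvar.create_empty ()

  (* hypothetical handler for one task *)
  let handle t = Lwt_io.printlf "working on %s" t

  (* recursive worker: take a task, process it, repeat *)
  let rec run () =
    let open Lwt.Syntax in
    let* t = Lwt_mvar.take queue in
    let* () = handle t in
    run ()

  (* start the worker when the module is initialised *)
  let () = Lwt.async run

  (* enqueue without blocking the caller *)
  let queue_job t = Lwt.async (fun () -> Lwt_mvar.put queue t)
end
```

Swapping the internals for a message broker later only requires changing queue_job and run; the queue_job signature stays the same.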


In this scenario I usually create a pipe (Lwt Pipe or Async_kernel.Pipe), then I “spawn” a recursive promise via Lwt.async (or Deferred.don't_wait_for), passing the reading end of the pipe to the new thread.

The newly spawned promise is as simple as an infinite loop that tries to read from the pipe, waiting for messages written by the “parent” promise: no “sleep” is needed.

With both Lwt Pipe and Async Pipe you can read in a blocking way, or read “with timeout” in case you want your thread to perform some other action at least every n seconds.
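For reference, the same pattern can be sketched with Lwt_stream, which ships with Lwt itself (the pipe libraries mentioned above offer a similar, richer interface); the message strings and the 5-second timeout are arbitrary:

```ocaml
(* "pipe": a stream plus its push function *)
let stream, push = Lwt_stream.create ()

(* "child": recursive reader; Lwt_stream.next blocks until a
   message arrives, so no sleep is needed *)
let rec reader () =
  let open Lwt.Syntax in
  let* msg = Lwt_stream.next stream in
  let* () = Lwt_io.printl ("got: " ^ msg) in
  reader ()

(* variant: a read with a 5-second timeout, for the "do something
   at least every n seconds" case; returns None on timeout *)
let read_with_timeout () =
  Lwt.pick
    [ Lwt.map Option.some (Lwt_stream.next stream);
      Lwt.map (fun () -> None) (Lwt_unix.sleep 5.0) ]

let () =
  Lwt.async reader;
  (* "parent": write into the pipe *)
  push (Some "hello");
  push (Some "world")
```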

This “architecture” is a way to reproduce the super classical Unix scenario: open a (Unix) pipe, fork, and pass the two ends of the pipe to the parent and child. See Classical inter-process communication: pipes.

My eternal gratitude to Xavier Leroy and Didier Rémy for that book.