Lwt, message queue, multiple writer, single reader

Dear All,

I want to write some Lwt code, where there is a single “reader thread” that services messages, and multiple “writer threads” that create messages. I want the messages from the writers to go on a queue and be served by the reader thread in FIFO order.

Using system threads, it is possible to construct such a queue using mutexes and condition variables.
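For reference, a queue of this kind for system threads could be sketched roughly as below. This is only an illustration, not the code from the repo: the names `msg_queue`, `create`, `enqueue` and `dequeue` are made up, and it assumes the program is built with the threads library.

```ocaml
(* Sketch only: hypothetical names, assumes linking with the threads library
   (e.g. ocamlfind ocamlopt -package threads.posix). *)
type 'a msg_queue = {
  q : 'a Queue.t;          (* FIFO storage, protected by [m] *)
  m : Mutex.t;
  nonempty : Condition.t;  (* signalled whenever a message is pushed *)
}

let create () =
  { q = Queue.create (); m = Mutex.create (); nonempty = Condition.create () }

(* Called by any writer thread. *)
let enqueue t x =
  Mutex.lock t.m;
  Queue.push x t.q;
  Condition.signal t.nonempty;
  Mutex.unlock t.m

(* Called by the single reader thread; blocks until a message is available. *)
let dequeue t =
  Mutex.lock t.m;
  while Queue.is_empty t.q do
    (* Releases [m] while waiting and re-acquires it before returning. *)
    Condition.wait t.nonempty t.m
  done;
  let x = Queue.pop t.q in
  Mutex.unlock t.m;
  x
```

Each writer would call `enqueue` from its own `Thread.create`, and the reader would loop on `dequeue`.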

My question is: Is there an equivalent “inter-thread message queue” for Lwt? I’m aware of Lwt_stream, and the suggestion to use Lwt_pipe, but from very brief scanning of the code, both look a bit complicated for what I want. But perhaps I should just use Lwt_pipe anyway?


Using OCaml’s threads library I wrote this, which is basically what I want (but I was hoping for something for Lwt):

I suppose if I replicate the code in Lwt, using the standard Queue module, this will work fine? And I can avoid some of the locking as well, since I know Lwt threads are cooperative.
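One way this could look, as a rough sketch rather than the version in the repo: the standard `Queue` holds the messages and `Lwt_condition` wakes the reader. The names `msg_queue`, `create`, `enqueue` and `dequeue` are hypothetical, and the sketch assumes a single reader. No mutex is used, on the assumption that nothing else can run between the push and the signal.

```ocaml
(* Sketch only: hypothetical names, assumes the lwt package is available. *)
type 'a msg_queue = {
  q : 'a Queue.t;                   (* FIFO storage *)
  nonempty : unit Lwt_condition.t;  (* signalled whenever a message is pushed *)
}

let create () = { q = Queue.create (); nonempty = Lwt_condition.create () }

(* Called by any writer. It never blocks, so it returns unit, not a promise. *)
let enqueue t x =
  Queue.push x t.q;
  Lwt_condition.signal t.nonempty ()

(* Called by the single reader. If the queue is empty, wait for a signal and
   check again; otherwise return the oldest message. *)
let rec dequeue t =
  if Queue.is_empty t.q then
    Lwt.bind (Lwt_condition.wait t.nonempty) (fun () -> dequeue t)
  else
    Lwt.return (Queue.pop t.q)
```

The reader would repeatedly bind on `dequeue t`, and each writer would call `enqueue t msg` wherever it produces a message.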

I added an Lwt version of the code in the repo above.

Benchmarking indicated that the Lwt version was about 2.5 times slower than the native OCaml thread version, which was a bit unexpected. This may be because the Lwt scheduler seems to be fairer than the native pre-emptive thread scheduler, resulting in more thread transitions.

@thomas_ridge, the Lwt test is slower because it is calling Lwt.pause too often.

If all the “tasks” in an Lwt program are “asleep” (loosely speaking), Lwt checks for I/O. Your three writers put themselves to “sleep” with Lwt.pause after each enqueue. As a result, Lwt is making one call to select/epoll/kevent for every three (actually, six) enqueues. That’s why it’s very slow.

I opened a PR that makes the Lwt test go about 10x faster. It has each writer do 1000 enqueues before calling Lwt.pause.
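The idea is roughly the following. This is a sketch, not the code from the PR: `writer`, `batch_size` and the `enqueue` argument are hypothetical names standing in for whatever the test actually uses.

```ocaml
(* Sketch only: hypothetical names, assumes the lwt package is available. *)
let batch_size = 1000

(* Calls [enqueue] for 0 .. total - 1, yielding to the scheduler with
   Lwt.pause once per [batch_size] enqueues instead of after every one. *)
let writer ~enqueue ~total =
  let rec loop i =
    if i >= total then Lwt.return_unit
    else begin
      enqueue i;
      if (i + 1) mod batch_size = 0 then
        (* End of a batch: let other tasks (and the I/O check) run. *)
        Lwt.bind (Lwt.pause ()) (fun () -> loop (i + 1))
      else
        loop (i + 1)
    end
  in
  loop 0
```

With three such writers, the scheduler gets control once per few thousand enqueues rather than once per few enqueues, at the cost of the messages arriving in larger bursts per writer.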

So, basically, with Lwt, if you have a CPU-bound “task,” you should avoid calling Lwt.pause in it too often. :)

We’ve had some problems with this in the past, so maybe Lwt should be more intelligent about whether, at each iteration, it chooses to poll for I/O, or resume paused tasks.


“maybe Lwt should be more intelligent”

I opened an issue for addressing this in the near future: “Improve performance in CPU-bound programs” (ocsigen/lwt issue #622).

@antron Thank you for the feedback. Your explanation makes sense.

If I didn’t have any pauses, would Lwt still execute each thread reasonably often?

@antron Actually I’m not sure I understand… why is Lwt trying to use select/epoll/kevent? There isn’t any I/O happening here.

Unless… are mutexes and condition vars implemented this way somehow?

Just another observation: if I remove the pause altogether, then the system hangs. So presumably bind is not used as a possible point at which to schedule another thread.

@thomas_ridge Lwt’s scheduler doesn’t fully know if you’ve started I/O yet, mainly because some I/O is implemented by running calls in worker threads, and the functions that dispatch work to those threads currently don’t notify the scheduler when work starts. The completions from these I/O calls are communicated on a pipe, and Lwt has to monitor at least that pipe.

Note that you would also get bad performance if you started one very long-running I/O operation in the “background” in your test case, so the issue isn’t about whether Lwt knows there is any I/O completion to check for. It’s about how often it calls select/etc. when there is CPU work left (the Lwt.pause queue is not empty), but I/O happens not to be completing for any reason.

“Just another observation: if I remove the pause altogether, then the system hangs. So presumably bind is not used as a possible point at which to schedule another thread.”

Indeed. We had an issue about changing that (“Lwt semantic improvements: exception safety and stack overflows”, ocsigen/lwt pull request #500), but eventually decided not to do it.

To be more precise, if the promise you are calling bind on is already resolved, the callback is called right away.

Lwt.pause is the only call that introduces pending promises in your test, so removing it makes your program completely synchronous. I think, then, the first writer to run loops forever, or until memory runs out.
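A small experiment along these lines shows the difference. It is only a sketch: `callback_ran_during_bind` is a made-up helper, and it assumes the lwt package is available.

```ocaml
(* Sketch only: hypothetical helper, assumes the lwt package is available. *)

(* Returns true if the callback passed to Lwt.bind ran before bind returned. *)
let callback_ran_during_bind (p : unit Lwt.t) : bool =
  let ran = ref false in
  let _ : unit Lwt.t =
    Lwt.bind p (fun () -> ran := true; Lwt.return_unit) in
  !ran

let () =
  (* Already-resolved promise: the callback runs right away. *)
  Printf.printf "resolved promise: %b\n"
    (callback_ran_during_bind (Lwt.return ()));
  (* Pending promise from Lwt.pause: the callback has to wait until the
     scheduler resumes paused promises, which does not happen here. *)
  Printf.printf "paused promise: %b\n"
    (callback_ran_during_bind (Lwt.pause ()))
```

This should print `true` for the resolved promise and `false` for the paused one, which is why a loop built only from binds on resolved promises never gives other tasks a chance to run.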

And I hope only the program hangs, not your whole system :)

@antron Thank you very much!