Async or Lwt. Multiple sources, single reader

Hey, I am trying to figure out how I can use Async or Lwt instead of threads in the scenario with multiple writers and a single reader. Basically, if it were threads I would do the following:


open_and_connect socket1
open_and_connect socket2

thread_safe_q q

thread1:
while true:
  s = socket1.read()
  q.push({source: 1, message: s})

thread2:
while true:
  s = socket2.read()
  q.push({source: 2, message: s})

thread3:
while true:
  service(q.pop())

I read the Lwt chapter in the 3110 OCaml course and read through the Real World OCaml Async page, but still can’t figure this out. I was thinking of using an Async pipe for this, e.g.:


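open Core
open Async

(* Placeholders, as in the pseudocode above: socket1 and socket2 are
   assumed to be connected Reader.t values. *)
(* open_and_connect socket1 *)
(* open_and_connect socket2 *)

(* The pipe plays the role of the thread-safe queue. *)
let r, w = Pipe.create ()

let rec read_and_forward source_name socket =
  let buffer = Bytes.create 4096 in
  match%bind Reader.read socket buffer with
  | `Eof -> return ()
  | `Ok bytes_read ->
    let message = Bytes.To_string.sub buffer ~pos:0 ~len:bytes_read in
    (* Pipe.write waits if the consumer is falling behind. *)
    let%bind () = Pipe.write w (source_name ^ message) in
    read_and_forward source_name socket

let rec read_and_parse () =
  match%bind Pipe.read r with
  | `Eof -> return ()
  | `Ok message ->
    printf "%s\n" message;
    read_and_parse ()

let run1 () = read_and_forward "Source1" socket1
let run2 () = read_and_forward "Source2" socket2
let run3 () = read_and_parse ()

let () =
  don't_wait_for (run1 ());
  don't_wait_for (run2 ());
  don't_wait_for (run3 ());
  never_returns (Scheduler.go ())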



Sorry I’m not answering about Async or Lwt but in a related fashion you may have a look at ocaml 5 lockfree structures, see

With Lwt you can use the module Lwt_stream which allows any number of readers and any number of writers.

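open Lwt.Syntax

type msg = { source : int; message : string }

let s, push = Lwt_stream.create () ;;

let rec loop source socket =
  let* message = read socket in
  (* push takes an option: Some x adds an element, None closes the stream *)
  push (Some { source; message });
  loop source socket
;;

let w1 = loop 1 socket1 ;;
let w2 = loop 2 socket2 ;;
let reader = Lwt_stream.iter_s service s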

There are variations on the above but it should work as you expect. You can use the opam package lwt-pipe if you want to keep a bound on the size of the holding data-structure. Or you can roll your own queue/stream/pipe/whaddyawannacallit.

You can even go stream-less and just wait on the writers’ promises directly in the reader, but that’s a bit finicky to get right and it imposes some constraints on how the sockets are distributed.
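To illustrate the bounded variant without pulling in lwt-pipe, here is a minimal sketch using Lwt_stream.create_bounded from Lwt itself. The names produce and run_bounded, the msg record, and the capacity are made up for the example. As far as I recall from the Lwt_stream documentation, a bounded push raises Lwt_stream.Full if another push is already blocked, so with several writers the sketch serialises pushes through an Lwt_mutex; treat that as an assumption to check against your Lwt version.

```ocaml
(* Sketch only: needs the lwt and lwt.unix libraries. *)
open Lwt.Syntax

type msg = { source : int; message : string }

(* Push [count] numbered messages for one source. The mutex ensures that at
   most one writer is ever blocked inside the bounded push. *)
let produce lock pusher source count =
  let rec go i =
    if i > count then Lwt.return_unit
    else
      let m = { source; message = string_of_int i } in
      let* () = Lwt_mutex.with_lock lock (fun () -> pusher#push m) in
      go (i + 1)
  in
  go 1

(* Two writers, one reader, at most [capacity] messages held in the stream.
   Returns the number of messages the reader received. *)
let run_bounded ~capacity ~count =
  let stream, pusher = Lwt_stream.create_bounded capacity in
  let lock = Lwt_mutex.create () in
  let writers =
    let* () =
      Lwt.join [ produce lock pusher 1 count; produce lock pusher 2 count ]
    in
    pusher#close;
    Lwt.return_unit
  in
  let received = ref 0 in
  let reader = Lwt_stream.iter (fun _ -> incr received) stream in
  let* () = Lwt.join [ writers; reader ] in
  Lwt.return !received

let () =
  let n = Lwt_main.run (run_bounded ~capacity:4 ~count:10) in
  Printf.printf "received %d messages\n" n
```

When the stream is full, the writers wait instead of growing the buffer, which is the back-pressure you lose with the unbounded Lwt_stream.create.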

Thank you, sanette, I believe this is for the system threads rather than promises. Anyways, thanks =D

Thank you, Raphael. Can you please take a look at a couple follow up questions here:

read
Regarding the read function you mention here: should I use Lwt_io.read here so that I get promises?

Service callback
Is it OK that the service function isn’t a fancy Lwt.t-style one, but an ordinary sequential function? Will Lwt_stream.iter_s service the requests one after another as they arrive? Essentially, performing the following:

It depends on what you are trying to read. The Lwt_io.read_* functions read from an input_channel, which you can get for files and pipes. There are Lwt_unix.read (and Lwt_unix.pread) for reading from a file_descr, which you can get for files, pipes and sockets. You might even need to write your own wrapper around those provided functions to get a message of the correct type.

I just put read as a placeholder for whatever function is appropriate for the type of input you are handling.
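As a rough idea of what such a wrapper could look like, here is a sketch built on Lwt_unix.read. The name read_message, the 4096-byte default and the "one chunk is one message" convention are assumptions made for the example; a real protocol would need its own framing (length prefix, delimiter, ...), because a socket read can return part of a message or several messages at once.

```ocaml
(* Sketch only: needs the lwt and lwt.unix libraries. *)
open Lwt.Syntax

(* Hypothetical wrapper: read one chunk from a file descriptor.
   Returns None at end of input, Some chunk otherwise. *)
let read_message ?(max_len = 4096) fd =
  let buf = Bytes.create max_len in
  let* n = Lwt_unix.read fd buf 0 max_len in
  if n = 0 then Lwt.return_none
  else Lwt.return_some (Bytes.sub_string buf 0 n)

(* Small demonstration on a Unix pipe instead of a socket. *)
let demo () =
  let r, w = Lwt_unix.pipe () in
  let* _written = Lwt_unix.write_string w "hello" 0 5 in
  let* () = Lwt_unix.close w in
  let* first = read_message r in
  let* second = read_message r in
  let* () = Lwt_unix.close r in
  Lwt.return (first, second)

let () =
  match Lwt_main.run (demo ()) with
  | Some chunk, None -> Printf.printf "read %S, then end of input\n" chunk
  | _ -> print_endline "unexpected result"
```

A function of this shape can stand in for read in the loop above, with the None case used to stop the loop.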

If service has a type that doesn’t involve Lwt.t, then you should use Lwt_stream.iter (no _s). But beware that if your service function performs blocking IO, then it will block the writers as well.

Yes. Lwt_stream.iter(_s) will be applying service in the order that the messages are pushed into the stream, one after another.

You can use Lwt_stream.iter_n to allow multiple messages to be processed concurrently (provided service is cooperative).
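A small sketch of the non-Lwt case, to make the ordering concrete. The function collect_in_order is made up for the example; its service just records each message, standing in for any ordinary function that returns unit.

```ocaml
(* Sketch only: needs the lwt and lwt.unix libraries. *)

(* Push all messages, close the stream, then let Lwt_stream.iter apply a
   plain (non-Lwt) service function to each one. Returns the messages in the
   order the service saw them. *)
let collect_in_order messages =
  let s, push = Lwt_stream.create () in
  List.iter (fun m -> push (Some m)) messages;
  push None;
  let seen = ref [] in
  let service m = seen := m :: !seen in
  Lwt_main.run (Lwt_stream.iter service s);
  List.rev !seen

let () =
  collect_in_order [ "first"; "second"; "third" ]
  |> List.iter print_endline
```

The service sees the messages in push order, one at a time.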

Thanks again for the information, Raphael. Some more questions arise, can you please address them?

If service has a type that doesn’t involve Lwt.t, then you should use Lwt_stream.iter (no _s). But beware that if your service function performs blocking IO, then it will block the writers as well.

Well, my service does some computation, e.g. fibonacci,


let rec fibonacci n =
  if n < 3 then
    1
  else
    fibonacci (n-1) + fibonacci (n-2)

and then writes. Can you please check my understanding? Suppose I use the Lwt_stream.iter_s along with Lwt_io’s val write : output_channel -> string -> unit Lwt.t. My fibonacci computation is time consuming. What if I wrap my time consuming fibonacci function with Lwt.map to be accepting and outputting promises like this:

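open Lwt.Infix

let s, push = Lwt_stream.create () ;;

let lwt_fibonacci = Lwt.map fibonacci

(* n is the number taken from the stream; out_channel is an Lwt_io output channel *)
let service n =
  lwt_fibonacci (Lwt.return n) >>= fun x ->
  Lwt_io.write out_channel (string_of_int x)

let reader = Lwt_stream.iter_s service s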

It looks to me that the IO isn’t blocking the stream writers since I use the Lwt_io.write function that produces a promise. Will the fibonacci computation be blocking the stream writers? The function is actively computing rather than waiting on IO.

This won’t do you any good. Specifically, Lwt is a cooperative concurrency library which uses promises as an abstraction to encapsulate the concurrency.

  • It’s concurrency-only (specifically, it doesn’t do parallelism), which means that nothing else runs whilst the fibonacci is burning your CPU.
  • It’s cooperative, which means that unless your computation explicitly pauses, nothing else runs: nothing will preemptively interrupt your fibonacci to do other work; everything else is just suspended. Note that whenever you perform IO through Lwt (e.g., Lwt_io.write), the library takes care of running that in the background and then giving control back to the scheduler: IO operations are points of cooperation.


Depending on your program, it might be OK not to pause in service. For example, if you receive messages from the sockets at a predictably low rate it might be fine, because what it essentially does is push the buffering back onto the sockets themselves. (Whereas if you pause enough in service, then the stream is where the messages are held.)

Still I wouldn’t recommend that, and in this case you might be better off using a different abstraction than Lwt.


There are multiple ways to make service cooperate with the scheduler so that the writers and the reader can all live in harmony. In order from the one I’d recommend the most to the one I’d recommend the least:

  • Use Lwt_domain.detach to wrap fibonacci. Requires OCaml 5 and domainslib. It makes the CPU-intensive computation run in a separate domain (i.e., in parallel).

  • Add some explicit Lwt.pause () in your fibonacci function. That’s a bit annoying because you are pushing your I/O monad into your CPU-only code and it might be difficult to find the perfect granularity (how often should you pause?), but it does the job.

  • Use Lwt_preemptive.detach to wrap fibonacci. Same as Lwt_domain.detach but it uses a system thread and it doesn’t do parallelism (instead it adds some preemptive concurrency on top of your cooperative concurrency).
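For the second option, a sketch of what "explicit Lwt.pause ()" might look like. The names fib_coop and demo and the threshold of 20 are made up for the example: subproblems at or below the threshold run as plain code, larger ones yield to the scheduler first, which is one (arbitrary) answer to the granularity question. The ticker only exists to show that other promises get to run while the computation is in progress.

```ocaml
(* Sketch only: needs the lwt and lwt.unix libraries. *)
open Lwt.Syntax

let rec fib n = if n < 3 then 1 else fib (n - 1) + fib (n - 2)

(* Cooperative variant: pause before every "large" subproblem so that other
   promises (socket readers, for instance) get a turn. *)
let rec fib_coop ?(threshold = 20) n =
  if n <= threshold then Lwt.return (fib n)
  else
    let* () = Lwt.pause () in
    let* a = fib_coop ~threshold (n - 1) in
    let* b = fib_coop ~threshold (n - 2) in
    Lwt.return (a + b)

(* Run the computation next to a ticker that counts how many turns it got
   while the computation was still pending. *)
let demo n =
  let ticks = ref 0 in
  let computation = fib_coop n in
  let rec ticker () =
    if Lwt.is_sleeping computation then begin
      incr ticks;
      let* () = Lwt.pause () in
      ticker ()
    end
    else Lwt.return_unit
  in
  let* () = ticker () in
  let* result = computation in
  Lwt.return (result, !ticks)

let () =
  let result, ticks = Lwt_main.run (demo 30) in
  Printf.printf "fib 30 = %d, ticker ran %d times meanwhile\n" result ticks
```

With the plain fib called directly inside service, the ticker would not get a single turn until the computation finished. The third option has the same shape as the first: Lwt_preemptive.detach fib n returns a promise for the result computed on a system thread.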


Thank you, Raphael, for taking the time to explain all these to me. This is enlightening =D

Hey Raphael, thanks for the information. Can you please take a look at my implementation of:

  • multiple producers. In the real world these will be sockets.
  • multiple consumers
  • pool of threads available to consumers
  • consumers write to a single stdout. In the real world it will be a single socket

as a result:

  • the service threads don’t block reading from multiple inputs. The multiple inputs are modeled as a stream [39; 40; 42; 43] |> Lwt_stream.of_list. In the real world the stream will be populated by sockets
  • the service threads don’t block writing to a stdout
  • no race conditions in writing to stdout

open Lwt.Infix

let rec fib = function
  | 1 -> 1
  | 2 -> 1
  | n -> fib (n - 1) + fib (n - 2)

let pool = Lwt_domain.setup_pool 6

let t0 = Unix.gettimeofday ()

let fib_and_print n =
  let start = Unix.gettimeofday () in
  let x = fib n in
  let finish = Unix.gettimeofday () in
  Printf.sprintf "fib %d = %d computed in %f \n " n x (finish -. start)

let time_and_print s =
  let t1 = Unix.gettimeofday () in
  let t_s = string_of_float (t1 -. t0) in
  Lwt_io.print ("current_time: " ^ t_s ^ ". " ^ s)

let () =
  [39; 40; 42; 43]
  |> Lwt_stream.of_list
  |> Lwt_stream.iter_p (fun x ->
       Lwt_domain.detach pool fib_and_print x >>= time_and_print)
  |> Lwt_main.run;
  Lwt_domain.teardown_pool pool


This is its output:

current_time: 1.15529894829. fib 39 = 63245986 computed in 1.155168 
 current_time: 1.86044502258. fib 40 = 102334155 computed in 1.860324 
 current_time: 4.75877690315. fib 42 = 267914296 computed in 4.758645 
 current_time: 7.61613297462. fib 43 = 433494437 computed in 7.615996 

It looks OK. The timings show that the work is done in parallel.

If you want a single stdout / a single socket, you might need to add safeguards for the side-effect showing the result. Currently you perform that side-effect inside a detached computation which means you can have parallel accesses to the socket you want to send the result to. So you’ll probably need some kind of synchronisation there (possibly a lock).

I believe that there are no race conditions when writing to a single output, and that there is no need for a lock around it, because in the lwt_domain package documentation I found the following:

This module provides the necessary function (detach) to schedule some computations to be ran in parallel in a separate domain. The result of such a computation is exposed to the caller of detach as a promise. Thus, this module allows to mix multicore parallelism with the concurrent-only scheduling of the rest of Lwt.

So, Lwt_domain.detach pool fib_and_print x returns a promise, and I don’t use any Lwt calls inside the fib_and_print function.
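One way to check this claim rather than rely on the documentation is to compare domain ids. The sketch below is only an experiment, not something taken from the lwt_domain docs: the function name callback_runs_on_main_domain is made up, and it assumes OCaml 5 with the lwt_domain package installed. The continuation after >>= plays the role of time_and_print.

```ocaml
(* Sketch only: needs OCaml 5 and the lwt, lwt.unix and lwt_domain libraries. *)
open Lwt.Infix

(* Detach a trivial computation, then check in which domain the continuation
   attached with >>= is executed. *)
let callback_runs_on_main_domain () =
  let pool = Lwt_domain.setup_pool 2 in
  let main_id = Domain.self () in
  let same =
    Lwt_main.run
      ( Lwt_domain.detach pool (fun n -> n * 2) 21 >>= fun _result ->
        Lwt.return (Domain.self () = main_id) )
  in
  Lwt_domain.teardown_pool pool;
  same

let () =
  Printf.printf "continuation ran on the main domain: %b\n"
    (callback_runs_on_main_domain ())
```

If this prints true, the writes done after detach are all issued from the domain running Lwt_main.run, which is the property the argument above depends on. Anything done inside the detached function itself would not have that guarantee.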