How to create an `Lwt_stream` from a C-thread

Hi, I’d like to create an 'a Lwt_stream.t, where 'a is an OCaml value created from a C-stub function, asynchronously via a separate thread in C. What is the best way to do this? Are there any examples where this has been done previously? Thanks.

Here’s a way to do this using the ctypes library, which has support for generating code that integrates with Lwt via either the Lwt jobs or Lwt preemptive systems.

First, here’s an example C function, sleepy_time, which sleeps for a specified number of milliseconds then returns the current time:

#include <time.h>
#include <stdint.h>
#include <unistd.h>

int64_t sleepy_time(int delay)
{
  usleep(delay * 1000);
  return (int64_t)time(NULL);
}

(This returns an int64_t from C to OCaml, rather than a more structured OCaml value. Constructing complex OCaml values in C threads is likely to be delicate, and better avoided if possible.)

We’ll compile the function using the system C compiler:

cc -c stubs.c

Next, here’s a binding to the sleepy_time function using ctypes. Creating C function bindings with ctypes involves describing the names and types of the functions using OCaml values:

open Ctypes
module B(F: Ctypes.FOREIGN) =
struct
  let sleepy_time = F.(foreign "sleepy_time" (int @-> returning int64_t))
end

And here’s some code that generates OCaml and C code from the above description. There’s a little noise here (mostly a result of the fact that the OCaml standard library IO functions are quite low-level); the main points of interest are the calls to write_ml and write_c, and the option lwt_jobs, which requests generated code that integrates with Lwt:

let () =
  let mlfd, cfd = open_out "generated_ml.ml", open_out "generated_c.c" in
  let mlfmt = Format.formatter_of_out_channel mlfd
  and cfmt  = Format.formatter_of_out_channel cfd in
  let open Cstubs in begin
      write_ml ~concurrency:lwt_jobs ~prefix:"foo"  mlfmt (module Bindings.B);
      Format.fprintf cfmt "#include \"stubs.h\"@\n";
      write_c  ~concurrency:lwt_jobs ~prefix:"foo"  cfmt  (module Bindings.B);
      close_out mlfd;
      close_out cfd;
  end

Compiling and running these two modules (bindings.ml and gen.ml) generates OCaml and C code:

ocamlfind opt -o gen.exe -package ctypes.stubs -linkpkg bindings.ml gen.ml
./gen.exe

Finally, here’s a “main” module that invokes the binding. It contains:

  • a function sleepy_stream that builds an Lwt_stream.t value from repeated calls to the C function sleepy_time
  • a helper <|> that merges together two infinite streams, and
  • a main function that creates two infinite streams with different delay periods and prints out the values they build

open Lwt
module C = Bindings.B(Generated_ml)

let sleepy_stream id delay =
  Lwt_stream.from (fun () -> (C.sleepy_time delay).lwt >|= fun t -> Some (id, t))

let (<|>) l r = Lwt_stream.(from @@ fun () -> get l <?> get r)

let () =
  Lwt_main.run @@
    Lwt_stream.iter (fun (s, t) -> Printf.printf "[%s]: %Ld\n" s t; flush stdout)
      (sleepy_stream "fast" 100 <|> sleepy_stream "slow" 500)

And here’s the compilation step that builds an executable from the C function, the generated code, the bindings, and this main function:

ocamlfind opt -o main.exe -thread -package ctypes.stubs,lwt.unix -linkpkg \
   stubs.c generated_c.c  generated_ml.ml bindings.ml main.ml

Running the resulting executable shows the interleaved execution, as calls to the function execute concurrently (in multiple C threads):

$ ./main.exe
[fast]: 1584650096
[fast]: 1584650096
[fast]: 1584650096
[fast]: 1584650096
[slow]: 1584650096
[fast]: 1584650096
[fast]: 1584650096
[fast]: 1584650096
[fast]: 1584650096
[fast]: 1584650097
[slow]: 1584650097
[fast]: 1584650097
[fast]: 1584650097

Thanks @yallop, this example is quite instructive.

Perhaps my question should have been clearer though. Not only is the C thread creating the OCaml values, it is also responsible for generating the stream of data (i.e. only it knows when new data is available). Rather than the pull being triggered from OCaml, as in Lwt_stream.from, I need the push for new data to be triggered from C, as in Lwt_stream.create.
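For reference, the push-based API in question is small: Lwt_stream.create returns the stream together with its push function. A minimal self-contained sketch (the values 1 and 2 are just placeholders):

```ocaml
(* Lwt_stream.create returns the stream together with its push function:
   pushing (Some v) adds an element; pushing None closes the stream. *)
let stream, push = Lwt_stream.create ()

let () =
  push (Some 1);
  push (Some 2);
  push None  (* close the stream *)

(* Consuming the stream sees exactly the pushed elements. *)
let elements = Lwt_main.run (Lwt_stream.to_list stream)
let () = Printf.printf "%d elements\n" (List.length elements)
```

The difficulty described above is precisely about where it is safe to call this push function from.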

Previously, I was trying to trigger the push from C using caml_callback. I was registering the thread and acquiring the runtime lock (to play nicely with the GC), but this didn’t work because the Lwt scheduler needs to hold the lock for the push to work.

What I’m attempting now instead is to use the lwt_unix_send_notification mechanism to notify the Lwt scheduler to ask for data from C whenever new data is available.
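For what it’s worth, that notification mechanism is designed for this situation: Lwt_unix.make_notification registers a callback with the scheduler and returns an integer id, and the C thread calls lwt_unix_send_notification(id) (declared in lwt_unix.h) when data is ready; the callback then runs in the main thread, where pushing into a stream is safe. A rough sketch, in which pop_pending_value and start_producer are hypothetical C stubs standing in for your own:

```ocaml
(* Hypothetical stubs: pop_pending_value retrieves the next datum the
   C thread has queued (behind its own mutex); start_producer launches
   the C thread, handing it the notification id it should pass to
   lwt_unix_send_notification(id). *)
external pop_pending_value : unit -> int64 = "pop_pending_value"
external start_producer : int -> unit = "start_producer"

let stream, push = Lwt_stream.create ()

let () =
  (* The callback runs in the main thread with the runtime lock held,
     so it is safe to interact with Lwt state here. *)
  let id =
    Lwt_unix.make_notification
      (fun () -> push (Some (pop_pending_value ())))
  in
  start_producer id
```

The key point is that the C thread never touches Lwt or the OCaml heap itself; it only queues data and sends the notification, and all pushes happen on the Lwt side.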

I hope the question is clearer now. Please let me know if the second approach has any obvious pitfalls. Thanks.


There’s a trick I learned from Mark Hayden (he did it in C code, but it applies to ML too). I ran the design by someone I know in “storage systems” and he confirmed that it’s a well-known design pattern:
(1) You have a single central thread that does all your “heavyweight” computation; this is the one running Lwt, or whatever.
(2) You spawn off N threads doing whatever-you-want.
(3) Each of them communicates with the main thread using a socketpair (and a control block; more on that later). The main thread writes a single char to tell a worker thread to start work, and when the thread is done, it writes a single char back.
(4) So the main thread can integrate these sockets into its I/O select loop.
(5) Of course, you can’t communicate the work that the subsidiary thread needs to do with a single char, so you use a control block (in the C heap) to communicate arguments and results.

This doesn’t apply directly to your example only because you specified that the C thread -pushes- its result, but I think you can see how this can be adapted pretty easily: just start the thread in the “I’m already working” state, and the main thread knows this (so it starts by waiting to read from the socket in the select-loop). etc, etc, etc.
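The pattern above can be sketched in OCaml with the threads library; this is a toy version with one worker that starts in the “already working” state, deposits its result in a control block, and signals completion with a single byte over a socketpair (the 6 * 7 computation stands in for real work):

```ocaml
(* The socketpair connects the worker to the main thread's select loop. *)
let main_sock, worker_sock = Unix.(socketpair PF_UNIX SOCK_STREAM 0)

(* Control block: the worker deposits its result here before signalling. *)
let result = ref 0

let () =
  (* Spawn the worker in the "already working" state. *)
  let _worker =
    Thread.create
      (fun () ->
         result := 6 * 7;  (* stand-in for real work *)
         (* Signal completion with a single byte. *)
         ignore (Unix.write worker_sock (Bytes.make 1 'x') 0 1))
      ()
  in
  (* The main thread integrates the socket into its select loop. *)
  let readable, _, _ = Unix.select [main_sock] [] [] (-1.0) in
  assert (readable = [main_sock]);
  ignore (Unix.read main_sock (Bytes.create 1) 0 1);
  (* The byte arriving after the write guarantees the result is visible. *)
  Printf.printf "result = %d\n" !result
```

With N workers you keep one socket per worker in the select set, and the byte on each socket tells you which control block to collect.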

I’ve done this in ocaml, but … lost the code in the mists of time.

BTW, it’s used in storage systems b/c there you need to have a bunch of threads actually -write- (or read) against the disk subsystem in order to express enough I/O concurrency for the kernel/controller/disks to do their job well.
