What is the preferred way to write a lwt_stream which is the cartesian product of two other lwt_streams?
I figured out an answer though it doesn’t look very pretty to me. The code uses async
and iter_p
.
open Lwt.Infix
let product_stream s1 s2 =
let s, f = Lwt_stream.create () in
Lwt.async (fun () ->
Lwt_stream.iter_p
(fun v2 ->
Lwt_stream.iter_p
(fun v1 ->
f (Some (v1, v2)) ;
Lwt.return_unit)
(Lwt_stream.clone s1))
(Lwt_stream.clone s2)
>|= fun () -> f None) ;
s
I prepared a few test cases here which cover situations where streams are closed and not closed; stream values are in one shot and interleaved.