In the long EIO thread I inadvertently started a thread of discussion, here is a proper topic for it.
Agnostic: independent of the user-chosen concurrency abstraction, but compatible with it.
One approach may be to use OCaml threads. (By “threads” we mean what we get with the `Thread` module from the standard library: concurrency-only threads with no parallelism. They existed in OCaml 4, and they still exist in OCaml 5, with a bunch of threads running in alternation on each domain. They were sort of supposed to go away, but they ended up being ported for backward-compatibility reasons, and they are carefully integrated in the Multicore runtime.)
If I understand correctly, the `Mutex.t` type of the standard library does not actually block the whole domain, only one thread on the domain. By default (unless the code explicitly uses threads) there is only one thread per domain, so the difference is not important. But what if your favorite concurrency library (Lwt, Eio, etc.) was careful to set up several threads, and had its scheduler manage them to ensure that it can make progress at any point, even if some of the threads are blocked?
So: the user-library scheduler manages a pool of threads on top of which its own fibers are run, each thread could block at any time and the scheduler has to migrate its fibers to a non-blocked thread, and ensure there is at least one non-blocked thread at any time. It sounds like a classic work-stealing design, only with no parallelism involved, just concurrency.
(Note: I’m no expert, but I think the OCaml runtime actually does something slightly similar: it creates a “tick” thread on each domain whose purpose, if I understand correctly, is to service GC-related requests coming from other domains. So: an extra thread on the side for liveness.)
@art-w suggests adding a `Yield` effect, and indeed it’s a natural idea: the stdlib could define basic concurrency effects like `Yield`, define default handlers for them (“at the domain level”), and let user-level concurrency libraries override them with their preferred semantics. (Those libraries even have the choice of sending some effects up to the default handler, etc.)
But what would be the API? The first idea is just

```ocaml
effect Yield : unit eff
```

This effect suspends the current computation, which can be resumed by just passing a `unit` value. But I think that this is not good enough, because this doesn’t give us any way to tell the scheduler when to restart the computation.
The protocol would be that if you resume a yielding fiber and it’s not ready yet (in the case of concurrent lazy: the other domain is not done computing the value yet), it yields again.
I don’t know much about implementing efficient schedulers for cooperative concurrency, but my intuition is that this approach does not scale. If you have many fibers, many of which yielded, you are going to waste a lot of time checking which can run again. We rather want a design where we are “told” when a fiber is ready again (push-based rather than pull-based, if you want). (I guess one could do clever things with exponentially-decreasing priorities, etc., but it sounds like we are working hard to solve a problem that we created ourselves in the first place with a bad API.)
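To make the scaling concern concrete, here is a minimal pull-based scheduler; this is a sketch using the OCaml 5 `Effect` module (whose syntax differs from the `effect … : … eff` declarations quoted in this thread), and all names are mine. A fiber that is not ready has no choice but to `Yield` again, so the scheduler pays a poll every time it revisits it:

```ocaml
open Effect
open Effect.Deep

type _ Effect.t += Yield : unit Effect.t

(* Run a list of fibers round-robin; count how many times the scheduler
   has to pick up a fiber (including re-polls of not-ready ones). *)
let run_all fibers =
  let q = Queue.create () in
  let polls = ref 0 in
  List.iter (fun f -> Queue.push f q) fibers;
  let rec step () =
    match Queue.take_opt q with
    | None -> ()
    | Some f ->
        incr polls;
        match_with f ()
          { retc = (fun () -> step ());
            exnc = raise;
            effc = (fun (type a) (e : a Effect.t) ->
              match e with
              | Yield ->
                  Some (fun (k : (a, _) continuation) ->
                    (* not ready: put the fiber back and try the next one *)
                    Queue.push (fun () -> continue k ()) q;
                    step ())
              | _ -> None) }
  in
  step ();
  !polls

(* A fiber that waits for a flag can only busy-yield until it is set. *)
let polls_for_demo () =
  let ready = ref false in
  let waiter () = while not !ready do perform Yield done in
  let setter () = ready := true in
  run_all [ waiter; setter ]

let () = Printf.printf "scheduler polled fibers %d times\n" (polls_for_demo ())
```

With two fibers and one wait condition the waste is negligible, but with many blocked fibers every scheduling round re-polls all of them: exactly the pull-based cost described above.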
So we rather want to have an effect like

```ocaml
effect Yield : notifier -> unit eff
```

where the yielding computation provides a “notifier” (a name I just made up) that the scheduler can subscribe to, to be notified when the fiber can be resumed. (It should be efficient to subscribe to a large set of notifiers.) So the scheduler would typically have a poll/select-like loop to wait for the next blocked fiber to become ready.
Another way to think about the problem is that we should not design for “yielding”, we should design for “blocking”. In the concurrent-lazy example: instead of thinking of an API for when we notice that another domain is already forcing the value, we should think of the API for the first domain to say: “I’m entering a blocking section that other domains/fibers/whatever will want to wait on.”
Basically that sounds like a “promise” API. We could implement a concurrent lazy thunk as follows:
```ocaml
type 'a lazy = 'a cell Atomic.t
and 'a cell =
  | Thunk of (unit -> 'a)
  | Forcing of ('a, exn) result Promise.t
  | Done of ('a, exn) result

let force laz =
  match Atomic.get laz with
  | Thunk f -> ...
  | Forcing prom -> Promise.await prom
  | Done res -> res
```
Of course, we are just shifting to a slightly different problem: instead of designing agnostic blocking, now we need to design agnostic promises. A “promise” is a blocking computation that also returns a value (instead of just “becoming ready”), but maybe this point of view can help with the API design.
I’ve thought about the blocking problem a bit in the context of interop between different concurrency libraries. Unlike GHC Haskell or Go, OCaml does not bake in a thread scheduler and a primitive synchronization structure (MVars in GHC and channels in Go). While this allows users the freedom to develop their own concurrency libraries, interop between them becomes tricky as there is no fundamental synchronization mechanism. Specifically, I have been thinking about applications that might want to utilise both domainslib and eio at the same time. Currently, there is no clean way for a user-level thread in eio (a fibre) to synchronize with a user-level thread in domainslib (a task), since the libraries are not aware of each other’s blocking and wake-up semantics. Of course, one can build a custom channel that is specialised for those two libraries. This is similar to this Rust crate, which allows mixing between Tokio and Rayon. These pointwise solutions are unsatisfactory.
Ideally, I would like the synchronization structure, that which allows different lightweight threads to communicate in a blocking fashion, to be independent of the actual scheduler that they belong to. You should be able to use such a structure to communicate in a blocking fashion between user-level threads that belong to different schedulers.
It turns out we can do this thanks to effect handlers if we agree on an interface. Every library that implements its own scheduler handles the effect:

```ocaml
type 'a resumer = 'a -> unit
type _ eff += Suspend : ('a resumer -> unit) -> 'a eff
```
Whenever the synchronization structure (say MVar or Channel) wants to block the current user-level thread on a particular condition, it performs `Suspend f`. At the handler, i.e., the scheduler, `f` is applied to a “resumer” function. This resumer function, when applied to the result, adds the blocked thread to the scheduler so that it resumes with the result. The blocking operation squirrels away this resumer in the synchronization structure’s state. Now that the current thread is blocked in the synchronization structure, the handler switches control to the next runnable thread in the scheduler.
Rather than trying to understand the semantics from this English description, it may be easier to see the interface in action. Here is an implementation of a scheduler-parametric MVar. The MVar implementation does not refer to a concrete scheduler. We use this MVar to communicate between a LIFO and a FIFO scheduler. If both domainslib and eio handle the `Suspend` effect, then we can use the same MVar to coordinate between them (of course, the MVar implementation needs to be made thread-safe).
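To complement the linked repository, here is a self-contained sketch of the protocol using the OCaml 5 `Effect` module. The `Ivar` module (a one-shot cell) and all other names are invented for illustration, and the code is single-domain only, not the thread-safe version a real library would need:

```ocaml
open Effect
open Effect.Deep

type 'a resumer = 'a -> unit
type _ Effect.t += Suspend : ('a resumer -> unit) -> 'a Effect.t

(* Scheduler-agnostic one-shot cell: readers block until [fill]. *)
module Ivar = struct
  type 'a state = Empty of 'a resumer list | Full of 'a
  type 'a t = 'a state ref
  let create () = ref (Empty [])
  let read t =
    match !t with
    | Full v -> v
    | Empty _ ->
        perform (Suspend (fun resume ->
          (* squirrel the resumer away in the structure's own state *)
          match !t with
          | Empty rs -> t := Empty (resume :: rs)
          | Full v -> resume v))
  let fill t v =
    match !t with
    | Full _ -> invalid_arg "Ivar.fill"
    | Empty rs -> t := Full v; List.iter (fun r -> r v) rs
end

(* A FIFO scheduler handling Suspend: the resumer it hands out simply
   re-enqueues the suspended continuation as a runnable thread. *)
let run fibers =
  let run_q : (unit -> unit) Queue.t = Queue.create () in
  let enqueue f = Queue.push f run_q in
  let rec schedule () =
    match Queue.take_opt run_q with Some f -> f () | None -> ()
  and exec f =
    match_with f ()
      { retc = (fun () -> schedule ());
        exnc = raise;
        effc = (fun (type a) (e : a Effect.t) ->
          match e with
          | Suspend register ->
              Some (fun (k : (a, _) continuation) ->
                register (fun v -> enqueue (fun () -> continue k v));
                schedule ())
          | _ -> None) }
  in
  List.iter (fun f -> enqueue (fun () -> exec f)) fibers;
  schedule ()

(* A reader blocks on the empty ivar; a second fiber fills it. *)
let demo () =
  let iv = Ivar.create () in
  let log = ref [] in
  run [
    (fun () -> log := ("got " ^ string_of_int (Ivar.read iv)) :: !log);
    (fun () -> log := "filling" :: !log; Ivar.fill iv 42);
  ];
  List.rev !log

let () = List.iter print_endline (demo ())
```

Note that `Ivar` never mentions the scheduler: it only performs `Suspend`, so the same code would run unchanged under any scheduler that handles the effect.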
The core scheduler interface also includes a

```ocaml
type _ eff += Stuck : unit eff (* needs a better illustrative name *)
```
in order to distinguish between the cases where (a) the scheduler queue is empty and there are no threads blocked on a synchronization structure, and (b) the scheduler queue is empty but there are threads blocked on a synchronization structure. The former indicates the end of the scheduler’s execution, while the latter must be handled specially so that the scheduler continues when a blocked thread resumes. It turns out that the `Stuck` effect is also nice for constructing hierarchical schedulers (as is done in the example code).
If we agree that something like this is a good interface, then the effects should be declared in the standard library (or at a similar level) so that different concurrency libraries may coordinate.
Thanks @kayceesrk! It’s great that you have concrete proposals to offer instead of my very incomplete musings. I need to wrap my head around the type and example code to understand if schedulers really have enough flexibility with this interface, it’s a bit magical.
Another note: I already touched this question of “agnostic blocking” in
which might be of interest to some people as well. (But at the time there was not much discussion, and no concrete suggestions that could lead to working proposals.)
Another thing: there are different variants of the problem, depending on whether we only consider inside-domain synchronization (between different schedulers on a single domain), or cross-domain synchronization (synchronizing with events that occur in parallel in other domains).
@kayceesrk’s proposal seems to work very well for inside-domain synchronization, but I’m not sure how it works for cross-domain synchronization (there is none in the code examples), which would be needed for my example of (cross-domain) concurrent lazy values. I hope it can be extended to handle this as well!
Thanks @gasche. I would be very curious to hear your thoughts about the interface. The current interface has not been validated in the large. It would be nice to modify domainslib and eio to handle this effect and see if it works.
I don’t think it makes any difference. If you implement your lazy values using @kayceesrk’s agnostic MVar (or something simpler, since you do not need to trigger it again), even if you have several domains, the code will be the same. The forced lazy function puts the actual value into the MVar, which triggers all the resume functions that were stored up to that point, which enqueues the continuations into their respective schedulers.
This is a rather contrived interface, but I blame the lack of pointers in OCaml. Short of pointers, passing a setter function as an argument to the suspending effect is the best approximation. So, `Suspend` looks good to me.
I am less convinced by `Stuck`, though, as I don’t understand how the scheduler can ever recover from it. But perhaps I am missing something.
Note that this is very close to the effect that Eio defines already:

```ocaml
type 'a enqueue = ('a, exn) result -> unit
type _ eff += Suspend : (Fibre_context.t -> 'a enqueue -> unit) -> 'a eff
```
The two differences are that Eio’s version also passes the suspended fibre’s `Fibre_context.t` to the callback (used for cancellation, as described below), and that the `enqueue` function takes an `('a, exn) result`, so a fibre can be resumed with an exception.
The idea with cancellation is that you can call set_cancel_fn to attach a function that cancels the operation to the fibre’s context. If the user cancels (e.g. a timeout happens), this function will be called.
When your operation finishes, you call `clear_cancel_fn` to remove it again. If that returns `true`, then you have removed the unused cancellation function and can enqueue the operation’s result. If it returns `false`, then the operation was cancelled concurrently and you should do nothing.
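The set/clear protocol can be sketched with an invented one-slot context. This is not Eio’s actual `Fibre_context`, just a model of the rules described above, using `Atomic.exchange` so that clearing and cancelling race safely:

```ocaml
(* Hypothetical stand-in for a fibre context: one slot for the cancel fn. *)
type fibre_ctx = { cancel_fn : (unit -> unit) option Atomic.t }

let make_ctx () = { cancel_fn = Atomic.make None }

let set_cancel_fn ctx fn = Atomic.set ctx.cancel_fn (Some fn)

(* true iff the function was still installed, i.e. we were not cancelled:
   the caller may then enqueue the operation's result *)
let clear_cancel_fn ctx =
  match Atomic.exchange ctx.cancel_fn None with
  | Some _ -> true
  | None -> false

(* fires the cancel function at most once; a no-op after a clear *)
let cancel ctx =
  match Atomic.exchange ctx.cancel_fn None with
  | Some fn -> fn ()
  | None -> ()
```

Because both `clear_cancel_fn` and `cancel` atomically take the slot, exactly one of the two sides wins, which is what makes the `true`/`false` answer reliable.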
The `Stuck` effect is used in the LIFO and FIFO schedulers to implement hierarchical scheduling in code-snippets/scheduler_parateric_mvar at master · kayceesrk/code-snippets · GitHub. But I’m not married to this interface. The important observation is that effects are useful here.
It seems to me that there should be two flavors of `lazy` at this point. One that never crosses domain barriers (basically what we currently have), so it can remain simple and fast for programs that use a lot of lazy thunks (for example lazy streams, tying the knot, etc.); and one that is typically used at toplevel for lazily initialized values, which needs to be domain-safe because it will probably be accessed from any of them. That one can be a single-use MVar or something approaching it.
The interface defined by @kayceesrk, if indeed sufficiently expressive to accommodate the various existing concurrency libraries, should really be added to the stdlib (or at least a sort of little compatibility package that all the libraries can depend on, at first). This is exactly what reducing fragmentation by promoting interoperability looks like.
If I understand correctly, the idea of @kayceesrk’s

```ocaml
effect Suspend : (('a -> unit) -> unit) -> 'a eff
```

is that the handler/scheduler passes a callback to the promise (the computation currently ongoing that will eventually produce an `'a`) to be woken up when the value becomes available.
I may be missing something (I often do when I disagree with @silene), but to me this is not the same thing as a pointer or a setter function. The callback passed to the scheduler may do much more than a write, it could update whatever notification structure the scheduler uses internally. A pointer, a reference etc. would not do.
I think we do need to think carefully about how to do this across domains, because it means that the callback may be invoked on another domain than the scheduler, while the scheduler is running in parallel. That may still be the right API (I love the simplicity of it), but at least the code using the API will need to be written differently (using atomics, etc.). Maybe a slightly different API would be preferable for cross-domain suspensions; at least a different name to force people to think about the fact that they need to write concurrent-safe code.
(Unrelated to the post I’m replying to.) This API forces a fairly imperative style onto the users of `Suspend`, because the user of the effect needs to “remember” the scheduler callback by mutating its own state, to invoke it once its computation is finished. This makes sense for `MVar`, and it would also blend fine within a definition of concurrent lazy thunks (`Forcing` would take a queue of waiters), but I wonder if there is a different API, probably not using `unit` in the double negation, that allows a more functional style.
My musings in this direction go back to something fairly similar to Eio’s promises, except with a sort of “non-blocking await”:

```ocaml
(* on the user side *)
effect Suspend : (('a -> unit) -> unit) -> 'a eff

type 'a thunk = 'a cell ref
and 'a cell =
  ...
  | Forcing of ('a -> unit) list ref

let force thunk =
  match !thunk with
  | Done res -> res
  | Forcing waitlist ->
    perform (Suspend (fun cb -> waitlist := cb :: !waitlist))
  | Thunk f ->
    let waitlist = ref [] in
    thunk := Forcing waitlist;
    let res = f () in
    thunk := Done res;
    List.iter (fun callback -> callback res) !waitlist;
    res

(* on the scheduler side *)
match fiber_state with
| Suspend wait ->
  wait (fun v -> <notify completion>);
  <continue scheduling>
```
```ocaml
(* on the user side *)
effect Suspend : 'a Promise.t -> 'a eff

type 'a thunk = 'a cell ref
and 'a cell =
  ...
  | Forcing of 'a Promise.t

let force thunk =
  match !thunk with
  | Done res -> res
  | Forcing prom -> perform (Suspend prom)
  | Thunk f ->
    let prom = Promise.create () in
    thunk := Forcing prom;
    let res = f () in
    thunk := Done res;
    Promise.resolve prom res;
    res

(* on the scheduler side *)
match fiber_state with
| Suspend prom ->
  Promise.non_blocking_await prom (fun v -> <notify completion>);
  <continue scheduling>
```
Basically this form of `Promise` is exactly a queue of `('a -> unit)` callbacks, under a very slightly more abstract interface. This could still be a nice addition to @kayceesrk’s library, to reduce redundancy among users. The scheduler code is similar, but the user code is more high-level.
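That remark can be made concrete: a `Promise` in this sense is just a reference holding either a value or the callbacks accumulated so far. A hypothetical sketch (names chosen to match the snippet above, not any existing library):

```ocaml
module Promise : sig
  type 'a t
  val create : unit -> 'a t
  val resolve : 'a t -> 'a -> unit
  val non_blocking_await : 'a t -> ('a -> unit) -> unit
end = struct
  type 'a state = Pending of ('a -> unit) list | Resolved of 'a
  type 'a t = 'a state ref

  let create () = ref (Pending [])

  (* Resolving fires every callback registered so far, exactly once. *)
  let resolve t v =
    match !t with
    | Resolved _ -> invalid_arg "Promise.resolve: already resolved"
    | Pending cbs -> t := Resolved v; List.iter (fun cb -> cb v) cbs

  (* Never blocks: runs [cb] now if the value is there, or records it
     to be run later by [resolve]. *)
  let non_blocking_await t cb =
    match !t with
    | Resolved v -> cb v
    | Pending cbs -> t := Pending (cb :: cbs)
end
```

This single-domain version keeps no scheduler state at all; a cross-domain variant would need the `Pending`/`Resolved` transition to be atomic.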
No, it cannot. We are talking about scheduler-agnostic code. The callback cannot do anything interesting at that point, except storing the value it receives. Any other action could (and should) have been done before performing the effect. For instance, the MVar example just does `(fun r -> Queue.push (v, r) q)`. Sure, it does more than just storing `r`; it also stores `v`. But nothing forces the code to do that during the callback; it could have been done before performing the effect. That is why I am saying that the callback is nothing more than a setter, and the same functionality could have been achieved using a pointer.
I do not think the user of the suspend effect needs to be aware of that. As far as the examples are concerned, the code will always be resumed on the same domain the effect was originally triggered. (I am assuming that each domain has its own scheduler.)
Now, the user code might want to be reentrant. But that is orthogonal to the effect itself.
Ah, I see it now. I wrote “the callback passed to the scheduler”, but I meant the callback passed by the scheduler. I now understand that what you had in mind was that the user of `Suspend` could provide a pointer (to store the scheduler’s callback), when I thought that you meant that the scheduler could provide a pointer (… which wouldn’t work).
Maybe a slightly different API would be preferable for cross-domain suspensions
I do not think the user of the suspend effect needs to be aware of that. […] (I am assuming that each domain has its own scheduler.)
The scenario that I have in mind is that I’m implementing a datastructure that (1) may need to block its readers in some circumstances and (2) can be shared across domains. (For example cross-domain lazy thunks.)
When I implement an operation that needs to block on the completion of something, the operation is going to run in some domain O, but the completion may happen on another domain C. (In my example, O is the domain forcing a thunk that is already being forced by some other domain C.)
The `Suspend` effect requires storing the demand from O in my own state. This state needs to be domain-safe / synchronized, which is expected given that I’m implementing a domain-concurrent data structure. Instead of a `Queue.t` as in the (domain-local) MVar example, I needed an atomic queue.
But then, when C decides to complete and wake up the suspensions that were accumulated, it will fire all the callbacks provided by the blocked computations. So the callback provided by O will be invoked by C, and O’s scheduler may at this point be running other code in parallel. In particular, if the callback provided by O accesses its internal scheduler datastructures, it needs to do so in a domain-concurrent synchronized way.
Unless I’m missing something, both the performer of the Suspend effect (the user) and the handler of the Suspend effect (the scheduler) need to be written in domain-concurrent style in this case. I don’t know if we expect scheduler authors to always provide domain-safe callbacks, or if we want to distinguish “suspensions that are known to be local” and “suspensions that may be awaken from another domain” for performance, simplicity (in the local case) or reasoning reasons.
I would say that it is the user’s fault in this case. They used non-reentrant schedulers inside a multi-domain program.
Having an effect `SuspendButDontSuspendIfYouAreNotReentrant` is a bit overprotective.
I wondered about an API like

```ocaml
effect Suspend : (local:('a -> unit) -> remote:('a -> unit) -> unit) -> 'a eff
```

where the scheduler offers two callbacks, one (`local`) to be invoked when the completer owns the scheduler state, and the other (`remote`) when it does not / is running in parallel.
But in fact I think that this can be avoided with a dynamic check: the scheduler can pass a callback function that looks like:

```ocaml
fun result ->
  if Domain.self () = scheduler_domain () then <local completion>
  else <remote completion>
```
This is safe (safer than hoping that the user will correctly guess whether completion happens across domains or not) and preserves the simpler API.
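A sketch of what such a dynamically dispatching callback could look like. All names here are invented: the scheduler record and its two queues are assumptions, with a plain `Mutex` standing in for whatever synchronization a real scheduler would use:

```ocaml
(* Hypothetical scheduler state: a fast unsynchronized queue for
   completions arriving from the scheduler's own domain, and a
   mutex-protected queue for completions arriving from other domains. *)
type 'job scheduler = {
  domain : Domain.id;
  local_q : 'job Queue.t;     (* only touched from [domain] *)
  remote_q : 'job Queue.t;    (* shared; protected by [lock] *)
  lock : Mutex.t;
}

(* The resumer handed to a suspending fiber: it decides at call time
   whether it is running in the scheduler's domain. *)
let make_resumer sched k =
  fun v ->
    let job = fun () -> k v in
    if Domain.self () = sched.domain then
      (* completer owns the scheduler state: no locking needed *)
      Queue.push job sched.local_q
    else begin
      (* completer runs in parallel: go through the synchronized queue *)
      Mutex.lock sched.lock;
      Queue.push job sched.remote_q;
      Mutex.unlock sched.lock
    end

(* Exercise both paths: once from this domain, once from a spawned one. *)
let demo () =
  let sched = { domain = Domain.self (); local_q = Queue.create ();
                remote_q = Queue.create (); lock = Mutex.create () } in
  let resume = make_resumer sched (fun () -> ()) in
  resume ();                          (* same domain: local path *)
  Domain.join (Domain.spawn resume);  (* other domain: remote path *)
  (Queue.length sched.local_q, Queue.length sched.remote_q)
```

The point is that the suspending code never has to guess where completion will happen; the single callback routes itself.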
It is quite nice that a similar interface has been arrived at independently. I haven’t thought enough about cancellation semantics that cut across different libraries. Is cancellation something we should be considering at the same level as the ability to block a thread? Is it as fundamental as blocking?
Apart from blocking and cancelling a thread, are there other properties we should be considering? Is there a way to modularly introduce such additional properties?
Yes, that could work. For what it’s worth, this is what Eio_linux uses:

```ocaml
let enqueue_thread st k x =
  Lf_queue.push st.run_q (Thread (k, x));
  if Atomic.get st.need_wakeup then wakeup st
```
`Lf_queue` is a lock-free queue that can be added to safely from multiple domains (note that even if this operation is local, it might be running in parallel with a remote one). `need_wakeup` will always be `false` when running in the scheduler’s domain. I don’t know whether doing the `Domain.self` check first would save any time compared to checking the atomic in all cases.
Fibre_context also contains a unique fibre ID for tracing (which still needs a bit of work; this is mostly waiting for eventring to go in). We’ll probably add fibre-local variables here too. Not sure what would be the best way to make this modular, or which features should be common to all schedulers.