Running transactions in caqti with Dream and Rapper?


I’m trying to figure out how to run transactions in Dream with Caqti sql.

Say I have a few insert queries made with rapper

let insert_user id = [%rapper get_one {sql| INSERT INTO.... |sql}] 
let insert-post id = [%rapper get_one {sql| INSERT INTO.... |sql}]

Is there a way to run multiple insert queries all as a transaction in Dream? What’s the best way to do so?

The second argument of Dream.sql is Caqti_lwt.connection -> 'a promise, and Caqti_lwt.connection is defined as type connection = (module CONNECTION) wich means it’s actually the module you need to run your transaction.
So if you unpack the module in the signature of your function, you’ll have all you need:

let f ((module DB : Caqti_lwt.CONNECTION) as connection) =

So now you can use the DB module to access with_transaction, start, commit, rollback or use the connection variable if you need to pass the argument to another function.

1 Like

Awesome, thank you!

I was also able to find a usage example in a PR that was unmerged:

(** Runs transaction from left to right *)
let sql_transaction queries =
  let open Lwt.Infix in
  let aux (module Connection : Caqti_lwt.CONNECTION) =
    match%lwt Connection.start () with
    > Ok () ->
      let query_result =
          (fun a b -> a >>= fun _ -> b (module Connection : Caqti_lwt.CONNECTION))
          (Lwt.return (Ok ()))
      (match%lwt query_result with
       > Ok _ -> Connection.commit ()
       > Error _ as e -> Connection.rollback () >>= fun _ -> Lwt.return e)
    > Error _ as e -> Lwt.return e

Which you can then run

let store_user = [%rapper execute {sql| INSERT INTO users ... |sql}]
let store_something_else = [%rapper execute {sql| INSERT INTO something_else ... |sql}]
Dream.sql req (sql_transaction [store_user params_except_connection; store_something_else params])

If I understand this correctly

I don’t think (module Connection : Caqti_lwt.CONNECTION) carries enough information to connect to the database.

With Dream, there is a Dream.sql function which is used in the following template:

Dream.sql request (fun db -> 
    match%lwt query ~arg1 ~arg2 db with
    | Ok result -> do something with the result
    | Error err -> do something about the error

(Nota err convert it to string)

where query is a [%rapper …] built query.

Each other queries must be preceded by let%lwt or match%lwt (or other %lwt tokens)

The pool connection must be initialized:

let () = ?interface:(Some "")
  @@   Dream.sql_pool ~size:50 "mariadb://user:password@localhost/db"

The fold_left and the bind function (>>=) are needed if we want to iterate the list AND run a new transaction for every item of the list. But I guess an higher level redaction can be prefered:

let%lwt results = Lwt_list.map_s
   (function which convert an item to a query call)
   list in
  [analyze the results]

(Not tried…)

Note : I guess in fun a b -> a >>= fun _ -> b the _ means that we ignore the previous result/error at each step… and return the last result/error ! I will try to find a better approach… (stopping at the first error). Perhaps simply opening Lwt_result.Infix instead of Lwt.Infix.

Well, I am unhappy with the

let aux (module Connection : Caqti_lwt.CONNECTION)

My understanding is that this sentence can’t have an idea of the location, credentials of the database.

I guess a good alternative is:

let sql_transaction queries cnx =
  let module Connection = (val cnx : Caqti_lwt.CONNECTION) in

The Lwt.Infix doesn’t stop the query chain on the first error. I prefer the use of the Lwt_result monad.

I have updated my tutorial. index (caqti_ppx_rapper.index) It includes my version of transaction which is heavily inspired.


Dream.sql req (sql_transaction [store_user params_except_connection; store_something_else params])

will evalute:

sql_transaction [store_user params_except_connection; store_something_else params] db_cnx

Where db_cnx is the connexion handler… You will need a function with this extra parameter, like I have proposed.