Async socket question

Hi all. This is my first post, and I am quite new to OCaml.

I am working on a small TCP client utility, using Async/Core.

The connection is opened using

Tcp.with_connection (Tcp.Where_to_connect.of_host_and_port { host = "localhost"; port = myPort })

I need to be able to accept keyboard input as well as read input from the socket. I use Deferred.any for this purpose.

Calling Reader.read reader buf on the socket results in `Eof, which is OK, but when the function containing the Deferred.any code is called recursively, I get an exception:

"unhandled exception in Async scheduler"
("unhandled exception"
((monitor.ml.Error
("can not read from reader" (reason "in use")
(reader
((id 0) (state In_use) (available 0) (pos 0)
(open_flags (Full (Ok (rdwr () (unrecognized_bits 0x8000)))))
(last_read_time (2019-12-09 11:34:02.122535-05:00))
(close_may_destroy_buf Not_now) (close_finished Empty)
(fd
((file_descr 0) (info ) (kind Char) (supports_nonblock false)
(have_set_nonblock false) (state (Open Empty))
(watching ((read Not_watching) (write Not_watching)))
(watching_has_changed false) (num_active_syscalls 1)
(close_finished Empty))))))

Reader.is_closed on the reader returns false.
How can I “monitor” the socket recursively without this exception?

Michael

It’s a bit difficult to know what you mean by this. Can you share the code you are using to try to read from the socket?

let logIn reader writer _ =
  let buf = Bytes.create 1024 in
  Writer.write writer (generateLogonMessage 1);
  Writer.flushed writer >>= function () ->
  Reader.read reader buf >>| function
  | `Eof -> None
  | `Ok len -> Some (buf)

let readInput reader =
  let buf = Bytes.create 2048 in
  Deferred.any
    [ (Reader.read_line inn >>= function
       | `Eof -> return None
       | `Ok line -> return (Some (Bytes.of_string line))) (* user input *)
    ; (Reader.read reader buf >>= function
       | `Eof -> return None
       | `Ok len -> return (Some (Bytes.sub ~pos:0 ~len:len buf))) (* data from server *)
    ]

let onConnect _ reader writer =
  let rec loop connected =
    let seqNum = 0 in
    match connected with
    | false ->
      logIn reader writer (seqNum + 1) >>= (function
        | None -> loop true
        | Some _ -> loop false)
    | true ->
      readInput reader >>= function
      | None -> loop true
      | Some command ->
        begin
          match String.substr_index command ~pattern:"cmd:" with
          | None -> Writer.write out command; Writer.flushed out
          | Some _ -> Writer.write_bytes writer command; Writer.flushed writer
        end
  in
  loop false

readInput is supposed to take input from either the keyboard or the socket.

Ah, I believe what is happening is that you have two simultaneous calls to Reader.read, which is disallowed (see the module comment on Reader):

Each of the read functions returns a deferred that will become determined when the read completes. It is an error to have two simultaneous reads. That is, if you call a read function, you should not call another read function until the first one completes.

One suggestion would be to use the Pipe interface rather than directly using Reader. You can call Reader.lines or Reader.pipe and then use Pipe.read_choice to “try” to read from the socket (without actually consuming the data if the other input arrives first).
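
For what it's worth, here is a minimal sketch of a single "step" along those lines. It assumes keyboard and socket are string pipes that were obtained once (for example via Reader.lines or Reader.pipe); the name read_either and the `Keyboard / `Socket tags are made up for illustration. Pipe.read_choice only consumes a value from the pipe whose choice is actually taken, so nothing is lost from the other one.

```ocaml
open Async

(* Hypothetical helper: wait until either pipe has something to offer, then
   consume a value only from the pipe whose choice was taken. The other pipe
   keeps its data for a later call. *)
let read_either keyboard socket =
  Deferred.choose
    [ Deferred.Choice.map (Pipe.read_choice keyboard) ~f:(fun r -> `Keyboard r)
    ; Deferred.Choice.map (Pipe.read_choice socket) ~f:(fun r -> `Socket r)
    ]
```

Each result carries `Ok value, `Eof, or `Nothing_available. As I understand it, the last case can come up when another consumer took the value first, so a caller should be prepared to simply try again.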

So, it is not possible to read from 2 different sources simultaneously?

You can read from two different sources simultaneously. What you cannot do is call Reader.read on the same Reader.t twice simultaneously.

What I think is happening is that on the first call to loop, you read from user input and also from the socket. One of them completes, which causes loop to be called again. But that calls Reader.read again on the same two Reader.t's. If the one that was waiting for input in the first call is still waiting, then you have called Reader.read twice simultaneously on the same Reader.t, which is disallowed. (There is also a correctness bug: Deferred.any ignores the result of the read that finishes second, so whatever data that read consumes is thrown away.)

Instead, you can do something like:

let readInput in1 in2 =
  Deferred.choose
    [ Pipe.read_choice_single_consumer_exn in1 [%here] |> Deferred.Choice.map ~f:Either.first
    ; Pipe.read_choice_single_consumer_exn in2 [%here] |> Deferred.Choice.map ~f:Either.second
    ]
  >>= function
  | First input_from_in1 -> do_something ()
  | Second input_from_in2 -> do_something_else ()

You can get Pipe.Reader.t's from the Reader.t using Reader.pipe (call once and reuse the same Pipe.Reader.t).
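
To make that concrete, here is a rough sketch of the whole loop, not a drop-in replacement for your code. The names read_loop and on_connect and the on_line / on_data callbacks are hypothetical, and it assumes the [%here] ppx (ppx_here, included in ppx_jane) is enabled. The important part is that each pipe is created exactly once, outside the recursion.

```ocaml
open Core
open Async

(* Hypothetical loop: [on_line] and [on_data] are placeholder callbacks.
   It stops as soon as either source reports end of input. *)
let rec read_loop ~keyboard ~socket ~on_line ~on_data =
  Deferred.choose
    [ Pipe.read_choice_single_consumer_exn keyboard [%here]
      |> Deferred.Choice.map ~f:Either.first
    ; Pipe.read_choice_single_consumer_exn socket [%here]
      |> Deferred.Choice.map ~f:Either.second
    ]
  >>= function
  | First `Eof | Second `Eof -> return ()
  | First (`Ok line) ->
    on_line line;
    read_loop ~keyboard ~socket ~on_line ~on_data
  | Second (`Ok data) ->
    on_data data;
    read_loop ~keyboard ~socket ~on_line ~on_data

(* Callback in the shape Tcp.with_connection expects. Each pipe is created
   once here and then reused by every iteration of the loop. *)
let on_connect _socket reader _writer =
  let keyboard = Reader.lines (Lazy.force Reader.stdin) in
  let socket = Reader.pipe reader in
  read_loop ~keyboard ~socket
    ~on_line:(fun line -> printf "keyboard: %s\n" line)
    ~on_data:(fun data -> printf "socket: %d bytes\n" (String.length data))
```

Stopping on the first `Eof is just one possible policy; you may prefer to keep reading from the source that is still open.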

Thanks a lot!
I really like working with OCaml, but it’s hard to come across code examples, especially for advanced topics such as this.