From ba3832d0944551f5c1e53e88926493511823e855 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loi=CC=88c=20Correnson?= <loic.correnson@cea.fr> Date: Thu, 13 Feb 2020 15:01:41 +0100 Subject: [PATCH] [server] signal machinery --- src/plugins/server/main.ml | 47 +++++++++++++++---- src/plugins/server/main.mli | 6 +++ src/plugins/server/server_zmq.ml | 3 ++ src/plugins/server/share/protocol/server.md | 25 +++++++++- .../server/share/protocol/server_zmq.md | 11 +++-- 5 files changed, 78 insertions(+), 14 deletions(-) diff --git a/src/plugins/server/main.ml b/src/plugins/server/main.ml index 4644360254d..30f6d863983 100644 --- a/src/plugins/server/main.ml +++ b/src/plugins/server/main.ml @@ -59,6 +59,8 @@ type 'a request = [ | `Poll | `Request of 'a * string * json | `Kill of 'a + | `SigOn of string + | `SigOff of string | `Shutdown ] @@ -66,6 +68,7 @@ type 'a response = [ | `Data of 'a * json | `Error of 'a * string | `Killed of 'a + | `Signal of string | `Rejected of 'a ] @@ -85,6 +88,8 @@ type 'a process = { mutable killed : bool ; } +module Sigs = Set.Make(String) + type 'a server = { polling : int ; pretty : Format.formatter -> 'a -> unit ; @@ -93,6 +98,8 @@ type 'a server = { q_in : 'a process Queue.t ; q_out : 'a response Stack.t ; mutable daemon : Db.daemon option ; + mutable s_active : Sigs.t ; (* active *) + mutable s_signal : Sigs.t ; (* signaled *) mutable shutdown : bool ; mutable running : 'a process option ; } @@ -107,7 +114,9 @@ let pp_request pp fmt (r : _ request) = match r with | `Poll -> Format.fprintf fmt "Poll" | `Shutdown -> Format.fprintf fmt "Shutdown" - | `Kill id -> Format.fprintf fmt "Kill %a" pp id + | `SigOn sg -> Format.fprintf fmt "Signal %S : on" sg + | `SigOff sg -> Format.fprintf fmt "Signal %S : off" sg + | `Kill id -> Format.fprintf fmt "Kill [%a]" pp id | `Request(id,request,data) -> if Senv.debug_atleast 2 then Format.fprintf fmt "@[<hov 2>Request [%a] %s@ %a@]" @@ -123,6 +132,7 @@ let pp_response pp fmt (r : _ response) = | `Error(id,err) -> Format.fprintf fmt "Error %a: %s" pp id err | `Rejected id -> Format.fprintf fmt "Rejected %a" pp id | `Killed id -> Format.fprintf fmt "Killed %a" pp id + | `Signal sg -> Format.fprintf fmt "Signal %S" sg | `Data(id,data) -> if Senv.debug_atleast 2 then Format.fprintf fmt "@[<hov 2>Replies [%a]@ %a@]" @@ -185,6 +195,8 @@ let process_request (server : 'a server) (request : 'a request) : unit = Stack.clear server.q_out ; server.shutdown <- true ; end + | `SigOn sg -> server.s_active <- Sigs.add sg server.s_active + | `SigOff sg -> server.s_active <- Sigs.remove sg server.s_active | `Kill id -> begin let set_killed = kill_request server.equal id in @@ -225,17 +237,29 @@ let communicate server = let pool = ref [] in Stack.iter (fun r -> pool := r :: !pool) server.q_out ; Stack.clear server.q_out ; + server.s_active <- Sigs.empty ; message.callback !pool ; Extlib.may raise error ; true (* -------------------------------------------------------------------------- *) -(* --- Yielding --- *) +(* --- Yielding & Signaling --- *) (* -------------------------------------------------------------------------- *) let do_yield server () = Extlib.may raise_if_killed server.running ; ignore ( communicate server ) +let do_signal server s = + if Sigs.mem s server.s_active && not (Sigs.mem s server.s_signal) then + begin + server.s_signal <- Sigs.add s server.s_signal ; + Stack.push (`Signal s) server.q_out ; + end + +let nop _s = () +let emitter : (string -> unit) ref = ref nop +let signal s = !emitter s + (* -------------------------------------------------------------------------- *) (* --- One Step Process --- *) (* -------------------------------------------------------------------------- *) @@ -264,7 +288,7 @@ let kill () = raise Killed let daemons = ref [] let on callback = daemons := !daemons @ [ callback ] -let signal activity = +let set_active activity = List.iter (fun f -> try f activity with _ -> ()) !daemons let create ~pretty ?(equal=(=)) ~fetch () = @@ -273,6 +297,8 @@ let create ~pretty ?(equal=(=)) ~fetch () = fetch ; polling ; equal ; pretty ; q_in = Queue.create () ; q_out = Stack.create () ; + s_active = Sigs.empty ; + s_signal = Sigs.empty ; daemon = None ; running = None ; shutdown = false ; @@ -295,7 +321,7 @@ let start server = (do_yield server) in server.daemon <- Some daemon ; - signal true ; + set_active true ; end let stop server = @@ -306,7 +332,7 @@ let stop server = Senv.feedback "Server disabled." ; server.daemon <- None ; Db.off_progress daemon ; - signal false ; + set_active false ; end let foreground server = @@ -327,9 +353,10 @@ let run server = ( (* TODO: catch-break to be removed once Why3 signal handler is fixed *) Sys.catch_break true ) ; - Senv.feedback "Server running." ; foreground server ; - signal true ; + emitter := do_signal server ; + set_active true ; + Senv.feedback "Server running." ; begin try while not server.shutdown do @@ -339,12 +366,14 @@ let run server = with Sys.Break -> () (* Ctr+C, just leave the loop normally *) end; Senv.feedback "Server shutdown." ; - signal false ; + emitter := nop ; + set_active false ; with | Killed -> () | exn -> Senv.feedback "Server interruped (fatal error)." ; - signal false ; + emitter := nop ; + set_active false ; raise exn (* -------------------------------------------------------------------------- *) diff --git a/src/plugins/server/main.mli b/src/plugins/server/main.mli index 531149ff711..a524000e972 100644 --- a/src/plugins/server/main.mli +++ b/src/plugins/server/main.mli @@ -47,6 +47,8 @@ type 'a request = [ | `Poll | `Request of 'a * string * json | `Kill of 'a + | `SigOn of string + | `SigOff of string | `Shutdown ] @@ -57,6 +59,7 @@ type 'a response = [ | `Error of 'a * string | `Killed of 'a | `Rejected of 'a + | `Signal of string ] (** A paired request-response message. @@ -105,4 +108,7 @@ val kill : unit -> 'a They shall {i never} raise any exception. *) val on : (bool -> unit) -> unit +(** Emits the server signal *) +val signal : string -> unit + (* -------------------------------------------------------------------------- *) diff --git a/src/plugins/server/server_zmq.ml b/src/plugins/server/server_zmq.ml index c736e542210..a870c27440c 100644 --- a/src/plugins/server/server_zmq.ml +++ b/src/plugins/server/server_zmq.ml @@ -98,6 +98,8 @@ let rec decode = function | ("GET"|"SET"|"EXEC")::id::request::data :: w -> `Request(id,request,jdecode data) :: decode w | "KILL"::id:: w -> `Kill id :: decode w + | "SIGON" :: sg :: w -> `SigOn sg :: decode w + | "SIGOFF" :: sg :: w -> `SigOff sg :: decode w | "POLL" :: w -> `Poll :: decode w | "SHUTDOWN" :: _ -> [`Shutdown] | cmd::_ -> raise (WrongEncoding cmd) @@ -108,6 +110,7 @@ let rec encode = function | `Error(id,msg) :: w -> "ERROR" :: id :: msg :: encode w | `Killed id :: w -> "KILLED" :: id :: encode w | `Rejected id :: w -> "REJECTED" :: id :: encode w + | `Signal sg :: w -> "SIGNAL" :: sg :: encode w | [] -> [] (* -------------------------------------------------------------------------- *) diff --git a/src/plugins/server/share/protocol/server.md b/src/plugins/server/share/protocol/server.md index 1e479798cef..8d308a85a4d 100644 --- a/src/plugins/server/share/protocol/server.md +++ b/src/plugins/server/share/protocol/server.md @@ -52,6 +52,26 @@ To summarize: | `SET` | - | - | fast, side-effects | | `EXEC` | - | ✓ | resource demanding | +## Server Signals + +In response to a logical requests, the server might also emit _signals_ +to the client. However, since a lot of signals might be emitted, the server +must be aware of which signals the client is listening to. +Signal are identified by unique strings. + +The server and client can exchange two special commands to manage signals: + +| Command | Issued by | Effect | +|:--------|:---------:|:----------------------------| +| `SIGON s` | client | start listening to signal `<s>` | +| `SIGOFF s` | client | stop listening to signal `<s>` | +| `SIGNAL s` | server | signal `<s>` has been emitted | + +When one or many requests emit some signal `<s>` several times, +the client would be notified only once per cycle of data exchange. +Signals will be notified in addition to responses or logical requests +or server polling. + ## Transport Messages From the entry points layer, the asynchronous behavior of the Server makes @@ -70,10 +90,12 @@ a list of _commands_: | Commands | Parameters | Description | |:--------:|:----------:|:------------| -| `POLL` | - | Ask for pending responses, if any | +| `POLL` | - | Ask for pending responses and signals, if any | | `GET` | `id,request,data` | En-queue the given GET request | | `SET` | `id,request,data` | En-queue the given SET request | | `EXEC` | `id,request,data` | En-queue the given EXEC request | +| `SIGON` | `id` | Start listening to the given signal | +| `SIGOFF` | `id` | Stop listening to the given signal | | `KILL` | `id` | Cancel the given request or interrupt its execution | | `SHUTDOWN` | - | Makes the server to stop running | @@ -84,6 +106,7 @@ of _replies_, listed in table below: |:--------:|:----------:|:------------| | `DATA` | `id,data` | Response data from the identified request | | `ERROR` | `id,message` | Error message from the identified request | +| `SIGNAL` | `id` | The identified signal has been emitted since last exchange | | `KILLED` | `id` | The identified request has been killed or interrupted | | `REJECTED` | `id` | The identified request was not registered on the Server | diff --git a/src/plugins/server/share/protocol/server_zmq.md b/src/plugins/server/share/protocol/server_zmq.md index f510a31888e..67999c67a59 100644 --- a/src/plugins/server/share/protocol/server_zmq.md +++ b/src/plugins/server/share/protocol/server_zmq.md @@ -24,11 +24,13 @@ of each command is a single string identifying the command: | Commands | Parts | Part 1 | Part 2 | Part 3 | Part 4 | |:--------|:-----:|:-------|:-------|:-------|:-------| | `POLL()` | 1 | `"POLL"` | | | | -| `GET(id,request,data)` | 4 | `"GET"` | id | request | data | -| `SET(id,request,data)` | 4 | `"SET"` | id | request | data | +| `GET(id,request,data)` | 4 | `"GET"` | id | request | data | +| `SET(id,request,data)` | 4 | `"SET"` | id | request | data | | `EXEC(id,request,data)` | 4 | `"EXEC"` | id | request | data | -| `KILL(id)` | 2 | `"KILL"` | id | | | -| `SHUTDOWN` | 1 | `"SHUTDOWN"` | | | | +| `SIGON(id)` | 2 | `"SIGON"` | id | | | +| `SIGOFF(id)` | 2 | `"SIGOFF"` | id | | | +| `KILL(id)` | 2 | `"KILL"` | id | | | +| `SHUTDOWN` | 1 | `"SHUTDOWN"` | | | | ## Output Message Format @@ -42,6 +44,7 @@ of each reply is a finel string identifying the reply: | `ERROR(id,message)` | 4 | `"ERROR"` | id | message | | `KILLED(id)` | 2 | `"KILLED"` | id | | | `REJECTED(id)` | 2 | `"REJECTED"` | id | | +| `SIGNAL(id)` | 2 | `"SIGNAL"` | id | | | (special) | 2 | `"WRONG"` | message | | | (special) | 1 | `"NONE"` | | | -- GitLab