From 02cb6253342c65516b524c8488f95f81b23dc8fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loi=CC=88c=20Correnson?= <loic.correnson@cea.fr> Date: Fri, 10 Dec 2021 18:50:26 +0100 Subject: [PATCH] [ivette/server] make server-zmq separated --- ivette/package.json | 4 +- ivette/src/frama-c/client.ts | 75 +++++++++ ivette/src/frama-c/client_zmq.ts | 259 +++++++++++++++++++++++++++++++ ivette/src/frama-c/server.ts | 200 ++++-------------------- 4 files changed, 367 insertions(+), 171 deletions(-) create mode 100644 ivette/src/frama-c/client.ts create mode 100644 ivette/src/frama-c/client_zmq.ts diff --git a/ivette/package.json b/ivette/package.json index 4accc25c115..79696ec9657 100644 --- a/ivette/package.json +++ b/ivette/package.json @@ -77,7 +77,9 @@ "react-virtualized": "^9.21.2", "react-window": "^1.8.6", "source-map-support": "^0.5.16", - "tippy.js": "5.2.1", + "tippy.js": "5.2.1" + }, + "optionalDependencies": { "zeromq": "^6.0.0-beta.5" } } diff --git a/ivette/src/frama-c/client.ts b/ivette/src/frama-c/client.ts new file mode 100644 index 00000000000..526c942b6b8 --- /dev/null +++ b/ivette/src/frama-c/client.ts @@ -0,0 +1,75 @@ +/* ************************************************************************ */ +/* */ +/* This file is part of Frama-C. */ +/* */ +/* Copyright (C) 2007-2021 */ +/* CEA (Commissariat à l'énergie atomique et aux énergies */ +/* alternatives) */ +/* */ +/* you can redistribute it and/or modify it under the terms of the GNU */ +/* Lesser General Public License as published by the Free Software */ +/* Foundation, version 2.1. */ +/* */ +/* It is distributed in the hope that it will be useful, */ +/* but WITHOUT ANY WARRANTY; without even the implied warranty of */ +/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */ +/* GNU Lesser General Public License for more details. */ +/* */ +/* See the GNU Lesser General Public License version 2.1 */ +/* for more details (enclosed in the file licenses/LGPLv2.1). */ +/* */ +/* ************************************************************************ */ + +import { json } from 'dome/data/json'; + +// -------------------------------------------------------------------------- +// --- Frama-C Server Access (Client side) +// -------------------------------------------------------------------------- + +export interface Client { + + /** Connection */ + connect(addr: string): void; + + /** Disconnection */ + disconnect(): void; + + /** Send Request */ + send(kind: string, id: string, request: string, data: any): void; + + /** Signal ON */ + sigOn(id: string): void; + + /** Signal ON */ + sigOff(id: string): void; + + /** Kill Request */ + kill(id: string): void; + + /** Polling */ + poll(): void; + + /** Shutdown the server */ + shutdown(): void; + + /** Request data callback */ + onData(callback: (id: string, data: json) => void): void; + + /** Rejected request callback */ + onRejected(callback: (id: string, msg: string) => void): void; + + /** Killed request callback */ + onKilled(callback: (id: string) => void): void; + + /** Signal callback */ + onSignal(callback: (id: string) => void): void; + + /** Error callback */ + onError(callback: (msg: string) => void): void; + + /** Idle callback */ + onIdle(callback: () => void): void; + +} + +// -------------------------------------------------------------------------- diff --git a/ivette/src/frama-c/client_zmq.ts b/ivette/src/frama-c/client_zmq.ts new file mode 100644 index 00000000000..e3432ca39a5 --- /dev/null +++ b/ivette/src/frama-c/client_zmq.ts @@ -0,0 +1,259 @@ +/* ************************************************************************ */ +/* */ +/* This file is part of Frama-C. */ +/* */ +/* Copyright (C) 2007-2021 */ +/* CEA (Commissariat à l'énergie atomique et aux énergies */ +/* alternatives) */ +/* */ +/* you can redistribute it and/or modify it under the terms of the GNU */ +/* Lesser General Public License as published by the Free Software */ +/* Foundation, version 2.1. */ +/* */ +/* It is distributed in the hope that it will be useful, */ +/* but WITHOUT ANY WARRANTY; without even the implied warranty of */ +/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */ +/* GNU Lesser General Public License for more details. */ +/* */ +/* See the GNU Lesser General Public License version 2.1 */ +/* for more details (enclosed in the file licenses/LGPLv2.1). */ +/* */ +/* ************************************************************************ */ + +import Emitter from 'events'; +import { Request as ZmqRequest } from 'zeromq'; +import { json } from 'dome/data/json'; +import { Client } from './client'; + +const pollingTimeout = 50; + +// -------------------------------------------------------------------------- +// --- Frama-C Server API +// -------------------------------------------------------------------------- + +class ZmqClient implements Client { + + constructor() { } + + events = new Emitter(); + + queueCmd: string[] = []; + queueId: string[] = []; + zmqSocket: ZmqRequest | undefined; + zmqIsBusy = false; + + /** Connection */ + connect(sockaddr: string): void { + if (!this.zmqSocket) { + this.zmqSocket.close(); + } + this.zmqSocket = new ZmqRequest(); + this.zmqIsBusy = false; + this.zmqSocket.connect(sockaddr); + } + + disconnect(): void { + this.zmqIsBusy = false; + this.queueCmd = []; + this.queueId = []; + if (this.zmqSocket) { + this.zmqSocket.close(); + this.zmqSocket = undefined; + } + } + + /** Send Request */ + send(kind: string, id: string, request: string, data: any): void { + this.queueCmd.push(kind, id, request, data); + this.queueId.push(id); + this._flush(); + } + + /** Signal ON */ + sigOn(id: string): void { this.queueCmd.push('SIGON', id); this._flush(); } + + /** Signal ON */ + sigOff(id: string): void { this.queueCmd.push('SIGOFF', id); this._flush(); } + + /** Kill Request */ + kill(id: string): void { + if (this.zmqSocket) { + this.queueCmd.push('KILL', id); + this._flush(); + } + } + + /** Polling */ + poll(): void { } + + /** Shutdown the server */ + shutdown(): void { + this._reset(); + this._flush(); + this.queueCmd.push('SHUTDOWN'); + } + + /** Request data callback */ + onData(callback: (id: string, data: json) => void): void { + this.events.on('DATA', callback); + } + + /** Rejected request callback */ + onRejected(callback: (id: string, err: string) => void): void { + this.events.on('REJECT', callback); + } + + /** Request error callback */ + onError(callback: (msg: string) => void): void { + this.events.on('ERROR', callback); + } + + /** Killed request callback */ + onKilled(callback: (id: string) => void): void { + this.events.on('KILL', callback); + } + + /** Signal callback */ + onSignal(callback: (id: string) => void): void { + this.events.on('SIGNAL', callback); + } + + /** Idle callback */ + onIdle(callback: () => void): void { + this.events.on('CALLBACK', callback); + } + + // -------------------------------------------------------------------------- + // --- Low-Level Management + // -------------------------------------------------------------------------- + + pollingTimer: NodeJS.Timeout | undefined; + flushingTimer: NodeJS.Immediate | undefined; + + _reset() { + if (this.flushingTimer) { + clearImmediate(this.flushingTimer); + this.flushingTimer = undefined; + } + if (this.pollingTimer) { + clearTimeout(this.pollingTimer); + this.pollingTimer = undefined; + } + } + + _flush() { + if (!this.flushingTimer) { + this.flushingTimer = setImmediate(() => { + this.flushingTimer = undefined; + this._send(); + }); + } + } + + _poll() { + if (!this.pollingTimer) { + this.pollingTimer = setTimeout(() => { + this.pollingTimer = undefined; + this._send(); + }, pollingTimeout); + } + } + + async _send() { + // when busy, will be eventually re-triggered + if (!this.zmqIsBusy) { + const cmds = this.queueCmd; + if (!cmds.length) { + this.queueCmd.push('POLL'); + this.events.emit('IDLE'); + } + this.zmqIsBusy = true; + const ids = this.queueId; + this.queueCmd = []; + this.queueId = []; + try { + await this.zmqSocket?.send(cmds); + const resp = await this.zmqSocket?.receive(); + this._receive(resp); + } catch (error) { + this._error(`Error in send/receive on ZMQ socket. ${error.toString()}`); + const err = 'Canceled request'; + ids.forEach((rid) => this._reject(rid, err)); + } + this.zmqIsBusy = false; + this.events.emit('IDLE'); + } + } + + _data(id: string, data: any) { + this.events.emit('DATA', id, data); + } + + _reject(id: string, error: string) { + this.events.emit('REJECT', id, error); + } + + _signal(id: string) { + this.events.emit('SIGNAL', id); + } + + _error(err: any) { + this.events.emit('ERROR', err); + } + + _receive(resp: any) { + try { + let rid; + let data; + let err; + let cmd; + const shift = () => resp.shift().toString(); + let unknownResponse = false; + while (resp.length && !unknownResponse) { + cmd = shift(); + switch (cmd) { + case 'NONE': + break; + case 'DATA': + rid = shift(); + data = shift(); + this._data(rid, data); + break; + case 'KILLED': + rid = shift(); + this._reject(rid, 'Killed'); + break; + case 'ERROR': + rid = shift(); + err = shift(); + this._reject(rid, err); + break; + case 'REJECTED': + rid = shift(); + this._reject(rid, 'Rejected'); + break; + case 'SIGNAL': + rid = shift(); + this._signal(rid); + break; + case 'WRONG': + err = shift(); + this._error(`ZMQ Protocol Error: ${err}`); + break; + default: + this._error(`Unknown Response: ${cmd}`); + unknownResponse = true; + break; + } + } + } finally { + if (this.queueCmd.length) this._flush(); + else this._poll(); + } + } + +} + +export const client: Client = new ZmqClient(); + +// -------------------------------------------------------------------------- diff --git a/ivette/src/frama-c/server.ts b/ivette/src/frama-c/server.ts index d13a5ab4b3f..feebcd4a9d9 100644 --- a/ivette/src/frama-c/server.ts +++ b/ivette/src/frama-c/server.ts @@ -21,7 +21,7 @@ /* ************************************************************************ */ // -------------------------------------------------------------------------- -// --- Frama-C Server +// --- Connection to Frama-C Server // -------------------------------------------------------------------------- /** @@ -36,8 +36,8 @@ import * as Dome from 'dome'; import * as System from 'dome/system'; import * as Json from 'dome/data/json'; import { RichTextBuffer } from 'dome/text/buffers'; -import { Request as ZmqRequest } from 'zeromq'; import { ChildProcess } from 'child_process'; +import { client } from './client_zmq'; // -------------------------------------------------------------------------- // --- Pretty Printing (Browser Console) @@ -150,19 +150,6 @@ type RejectPromise = (error: Error) => void; /** Pending promise callbacks (pairs of (resolve, reject)). */ let pending: IndexedPair<ResolvePromise, RejectPromise> = {}; -/** Queue of server commands to be sent. */ -let queueCmd: string[] = []; - -/** Waiting request ids to be sent. */ -let queueId: string[] = []; - -/** Polling timeout and timer. */ -const pollingTimeout = 50; -let pollingTimer: NodeJS.Timeout | undefined; - -/** Flushing timer. */ -let flushingTimer: NodeJS.Immediate | undefined; - /** Server process. */ let process: ChildProcess | undefined; @@ -170,11 +157,6 @@ let process: ChildProcess | undefined; const killingTimeout = 300; let killingTimer: NodeJS.Timeout | undefined; -/** ZMQ (REQ) socket. */ -let zmqSocket: ZmqRequest | undefined; -/** Flag on whether ZMQ socket is busy. */ -let zmqIsBusy = false; - // -------------------------------------------------------------------------- // --- Server Console // -------------------------------------------------------------------------- @@ -513,9 +495,7 @@ async function _launch() { } }); // Connect to Server - zmqSocket = new ZmqRequest(); - zmqIsBusy = false; - zmqSocket.connect(sockaddr); + client.connect(sockaddr); } // -------------------------------------------------------------------------- @@ -526,28 +506,20 @@ function _reset() { D.log('Reset to initial configuration'); rqCount = 0; - queueCmd = []; - queueId = []; _.forEach(pending, ([, reject]) => reject(new Error('Server reset'))); pending = {}; - if (flushingTimer) { - clearImmediate(flushingTimer); - flushingTimer = undefined; - } - if (pollingTimer) { - clearTimeout(pollingTimer); - pollingTimer = undefined; - } + if (killingTimer) { clearTimeout(killingTimer); killingTimer = undefined; } + } function _kill() { D.log('Hard kill'); - _reset(); + client.disconnect(); if (process) { process.kill(); } @@ -555,10 +527,8 @@ function _kill() { async function _shutdown() { D.log('Shutdown'); - _reset(); - queueCmd.push('SHUTDOWN'); - _flush(); + client.shutdown(); const killingPromise = new Promise((resolve) => { if (!killingTimer) { if (process) { @@ -575,11 +545,7 @@ async function _shutdown() { function _exit(error?: Error) { _reset(); - if (zmqSocket) { - zmqSocket.close(); - zmqSocket = undefined; - } - zmqIsBusy = false; + client.disconnect(); process = undefined; if (status.stage === Stage.RESTARTING) { setImmediate(start); @@ -595,12 +561,12 @@ function _exit(error?: Error) { // -------------------------------------------------------------------------- class SignalHandler { - id: any; + id: string; event: Dome.Event; active: boolean; listen: boolean; - constructor(id: any) { + constructor(id: string) { this.id = id; this.event = new SIGNAL(id); this.active = false; @@ -635,8 +601,7 @@ class SignalHandler { sigon() { if (this.active && !this.listen) { this.listen = true; - queueCmd.push('SIGON', this.id); - _flush(); + client.sigOn(this.id); } } @@ -645,8 +610,7 @@ class SignalHandler { if (!this.active && this.listen) { if (isRunning()) { this.listen = false; - queueCmd.push('SIGOFF', this.id); - _flush(); + client.sigOff(this.id); } } } @@ -660,7 +624,7 @@ class SignalHandler { const signals: Map<string, SignalHandler> = new Map(); -function _signal(id: any) { +function _signal(id: string): SignalHandler { let s = signals.get(id); if (!s) { s = new SignalHandler(id); @@ -669,6 +633,10 @@ function _signal(id: any) { return s; } +client.onSignal((id: string) => { + _signal(id).event.emit(); +}) + // --- External API /** @@ -776,7 +744,6 @@ export function send<In, Out>( if (!request.name) return Promise.reject(new Error('Undefined request')); const rid = `RQ.${rqCount}`; rqCount += 1; - const data = JSON.stringify(param); const promise: Killable<Out> = new Promise<Out>((resolve, reject) => { const decodedResolve = (js: Json.json) => { const result = Json.jTry(request.output)(js); @@ -785,139 +752,32 @@ export function send<In, Out>( pending[rid] = [decodedResolve, reject]; }); promise.kill = () => { - if (zmqSocket && pending[rid]) { - queueCmd.push('KILL', rid); - _flush(); - } + if (pending[rid]) client.kill(rid); }; - queueCmd.push(request.kind, rid, request.name, data); - queueId.push(rid); - _flush(); + client.send(request.kind, rid, request.name, param); return promise; } -function _resolve(id: string | number, data: string) { +client.onData((id: string, data: Json.json) => { const [resolve] = pending[id]; if (resolve) { delete pending[id]; - resolve(JSON.parse(data)); + resolve(data); } -} +}); -function _reject(id: string | number, error: Error) { +client.onRejected((id: string, error: string) => { const [, reject] = pending[id]; if (reject) { delete pending[id]; - reject(error); - } -} - -function _cancel(ids: any[]) { - ids.forEach((rid) => _reject(rid, new Error('Canceled request'))); -} - -function _waiting() { - return _.find(pending, () => true) !== undefined; -} - -// -------------------------------------------------------------------------- -// --- Server Command Queue -// -------------------------------------------------------------------------- - -function _flush() { - if (!flushingTimer) { - flushingTimer = setImmediate(() => { - flushingTimer = undefined; - _send(); - }); + reject(new Error(error)); } -} - -function _poll() { - if (!pollingTimer) { - const delay = (config && config.polling) || pollingTimeout; - pollingTimer = setTimeout(() => { - pollingTimer = undefined; - _send(); - }, delay); - } -} - -async function _send() { - // when busy, will be eventually re-triggered - if (!zmqIsBusy) { - const cmds = queueCmd; - if (!cmds.length) { - cmds.push('POLL'); - if (!_waiting()) - rqCount = 0; // No pending command nor pending response - } - zmqIsBusy = true; - const ids = queueId; - queueCmd = []; - queueId = []; - try { - await zmqSocket?.send(cmds); - const resp = await zmqSocket?.receive(); - _receive(resp); - } catch (error) { - D.error(`Error in send/receive on ZMQ socket. ${error.toString()}`); - _cancel(ids); - } - zmqIsBusy = false; - STATUS.emit(status); - } -} +}); -function _receive(resp: any) { - try { - let rid; - let data; - let err; - let cmd; - const shift = () => resp.shift().toString(); - let unknownResponse = false; - while (resp.length && !unknownResponse) { - cmd = shift(); - switch (cmd) { - case 'NONE': - break; - case 'DATA': - rid = shift(); - data = shift(); - _resolve(rid, data); - break; - case 'KILLED': - rid = shift(); - _reject(rid, new Error('Killed')); - break; - case 'ERROR': - rid = shift(); - err = shift(); - _reject(rid, err); - break; - case 'REJECTED': - rid = shift(); - _reject(rid, new Error('Rejected')); - break; - case 'SIGNAL': - rid = shift(); - (new SIGNAL(rid)).emit(); - break; - case 'WRONG': - err = shift(); - D.error(`ZMQ Protocol Error: ${err}`); - break; - default: - D.error(`Unknown Response: ${cmd}`); - unknownResponse = true; - break; - } - } - } finally { - if (queueCmd.length) _flush(); - else _poll(); - } -} +client.onIdle(() => { + const waiting = _.find(pending, () => true) !== undefined; + if (!waiting) + rqCount = 0; // No pending command nor pending response +}); // -------------------------------------------------------------------------- -- GitLab