Skip to content
Snippets Groups Projects
Commit 02cb6253 authored by Loïc Correnson's avatar Loïc Correnson
Browse files

[ivette/server] make server-zmq separated

parent a67f9127
No related branches found
No related tags found
No related merge requests found
......@@ -77,7 +77,9 @@
"react-virtualized": "^9.21.2",
"react-window": "^1.8.6",
"source-map-support": "^0.5.16",
"tippy.js": "5.2.1",
"tippy.js": "5.2.1"
},
"optionalDependencies": {
"zeromq": "^6.0.0-beta.5"
}
}
/* ************************************************************************ */
/* */
/* This file is part of Frama-C. */
/* */
/* Copyright (C) 2007-2021 */
/* CEA (Commissariat à l'énergie atomique et aux énergies */
/* alternatives) */
/* */
/* you can redistribute it and/or modify it under the terms of the GNU */
/* Lesser General Public License as published by the Free Software */
/* Foundation, version 2.1. */
/* */
/* It is distributed in the hope that it will be useful, */
/* but WITHOUT ANY WARRANTY; without even the implied warranty of */
/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
/* GNU Lesser General Public License for more details. */
/* */
/* See the GNU Lesser General Public License version 2.1 */
/* for more details (enclosed in the file licenses/LGPLv2.1). */
/* */
/* ************************************************************************ */
import { json } from 'dome/data/json';
// --------------------------------------------------------------------------
// --- Frama-C Server Access (Client side)
// --------------------------------------------------------------------------
export interface Client {
/** Connection */
connect(addr: string): void;
/** Disconnection */
disconnect(): void;
/** Send Request */
send(kind: string, id: string, request: string, data: any): void;
/** Signal ON */
sigOn(id: string): void;
/** Signal ON */
sigOff(id: string): void;
/** Kill Request */
kill(id: string): void;
/** Polling */
poll(): void;
/** Shutdown the server */
shutdown(): void;
/** Request data callback */
onData(callback: (id: string, data: json) => void): void;
/** Rejected request callback */
onRejected(callback: (id: string, msg: string) => void): void;
/** Killed request callback */
onKilled(callback: (id: string) => void): void;
/** Signal callback */
onSignal(callback: (id: string) => void): void;
/** Error callback */
onError(callback: (msg: string) => void): void;
/** Idle callback */
onIdle(callback: () => void): void;
}
// --------------------------------------------------------------------------
/* ************************************************************************ */
/* */
/* This file is part of Frama-C. */
/* */
/* Copyright (C) 2007-2021 */
/* CEA (Commissariat à l'énergie atomique et aux énergies */
/* alternatives) */
/* */
/* you can redistribute it and/or modify it under the terms of the GNU */
/* Lesser General Public License as published by the Free Software */
/* Foundation, version 2.1. */
/* */
/* It is distributed in the hope that it will be useful, */
/* but WITHOUT ANY WARRANTY; without even the implied warranty of */
/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
/* GNU Lesser General Public License for more details. */
/* */
/* See the GNU Lesser General Public License version 2.1 */
/* for more details (enclosed in the file licenses/LGPLv2.1). */
/* */
/* ************************************************************************ */
import Emitter from 'events';
import { Request as ZmqRequest } from 'zeromq';
import { json } from 'dome/data/json';
import { Client } from './client';
const pollingTimeout = 50;
// --------------------------------------------------------------------------
// --- Frama-C Server API
// --------------------------------------------------------------------------
class ZmqClient implements Client {
constructor() { }
events = new Emitter();
queueCmd: string[] = [];
queueId: string[] = [];
zmqSocket: ZmqRequest | undefined;
zmqIsBusy = false;
/** Connection */
connect(sockaddr: string): void {
if (!this.zmqSocket) {
this.zmqSocket.close();
}
this.zmqSocket = new ZmqRequest();
this.zmqIsBusy = false;
this.zmqSocket.connect(sockaddr);
}
disconnect(): void {
this.zmqIsBusy = false;
this.queueCmd = [];
this.queueId = [];
if (this.zmqSocket) {
this.zmqSocket.close();
this.zmqSocket = undefined;
}
}
/** Send Request */
send(kind: string, id: string, request: string, data: any): void {
this.queueCmd.push(kind, id, request, data);
this.queueId.push(id);
this._flush();
}
/** Signal ON */
sigOn(id: string): void { this.queueCmd.push('SIGON', id); this._flush(); }
/** Signal ON */
sigOff(id: string): void { this.queueCmd.push('SIGOFF', id); this._flush(); }
/** Kill Request */
kill(id: string): void {
if (this.zmqSocket) {
this.queueCmd.push('KILL', id);
this._flush();
}
}
/** Polling */
poll(): void { }
/** Shutdown the server */
shutdown(): void {
this._reset();
this._flush();
this.queueCmd.push('SHUTDOWN');
}
/** Request data callback */
onData(callback: (id: string, data: json) => void): void {
this.events.on('DATA', callback);
}
/** Rejected request callback */
onRejected(callback: (id: string, err: string) => void): void {
this.events.on('REJECT', callback);
}
/** Request error callback */
onError(callback: (msg: string) => void): void {
this.events.on('ERROR', callback);
}
/** Killed request callback */
onKilled(callback: (id: string) => void): void {
this.events.on('KILL', callback);
}
/** Signal callback */
onSignal(callback: (id: string) => void): void {
this.events.on('SIGNAL', callback);
}
/** Idle callback */
onIdle(callback: () => void): void {
this.events.on('CALLBACK', callback);
}
// --------------------------------------------------------------------------
// --- Low-Level Management
// --------------------------------------------------------------------------
pollingTimer: NodeJS.Timeout | undefined;
flushingTimer: NodeJS.Immediate | undefined;
_reset() {
if (this.flushingTimer) {
clearImmediate(this.flushingTimer);
this.flushingTimer = undefined;
}
if (this.pollingTimer) {
clearTimeout(this.pollingTimer);
this.pollingTimer = undefined;
}
}
_flush() {
if (!this.flushingTimer) {
this.flushingTimer = setImmediate(() => {
this.flushingTimer = undefined;
this._send();
});
}
}
_poll() {
if (!this.pollingTimer) {
this.pollingTimer = setTimeout(() => {
this.pollingTimer = undefined;
this._send();
}, pollingTimeout);
}
}
async _send() {
// when busy, will be eventually re-triggered
if (!this.zmqIsBusy) {
const cmds = this.queueCmd;
if (!cmds.length) {
this.queueCmd.push('POLL');
this.events.emit('IDLE');
}
this.zmqIsBusy = true;
const ids = this.queueId;
this.queueCmd = [];
this.queueId = [];
try {
await this.zmqSocket?.send(cmds);
const resp = await this.zmqSocket?.receive();
this._receive(resp);
} catch (error) {
this._error(`Error in send/receive on ZMQ socket. ${error.toString()}`);
const err = 'Canceled request';
ids.forEach((rid) => this._reject(rid, err));
}
this.zmqIsBusy = false;
this.events.emit('IDLE');
}
}
_data(id: string, data: any) {
this.events.emit('DATA', id, data);
}
_reject(id: string, error: string) {
this.events.emit('REJECT', id, error);
}
_signal(id: string) {
this.events.emit('SIGNAL', id);
}
_error(err: any) {
this.events.emit('ERROR', err);
}
_receive(resp: any) {
try {
let rid;
let data;
let err;
let cmd;
const shift = () => resp.shift().toString();
let unknownResponse = false;
while (resp.length && !unknownResponse) {
cmd = shift();
switch (cmd) {
case 'NONE':
break;
case 'DATA':
rid = shift();
data = shift();
this._data(rid, data);
break;
case 'KILLED':
rid = shift();
this._reject(rid, 'Killed');
break;
case 'ERROR':
rid = shift();
err = shift();
this._reject(rid, err);
break;
case 'REJECTED':
rid = shift();
this._reject(rid, 'Rejected');
break;
case 'SIGNAL':
rid = shift();
this._signal(rid);
break;
case 'WRONG':
err = shift();
this._error(`ZMQ Protocol Error: ${err}`);
break;
default:
this._error(`Unknown Response: ${cmd}`);
unknownResponse = true;
break;
}
}
} finally {
if (this.queueCmd.length) this._flush();
else this._poll();
}
}
}
export const client: Client = new ZmqClient();
// --------------------------------------------------------------------------
......@@ -21,7 +21,7 @@
/* ************************************************************************ */
// --------------------------------------------------------------------------
// --- Frama-C Server
// --- Connection to Frama-C Server
// --------------------------------------------------------------------------
/**
......@@ -36,8 +36,8 @@ import * as Dome from 'dome';
import * as System from 'dome/system';
import * as Json from 'dome/data/json';
import { RichTextBuffer } from 'dome/text/buffers';
import { Request as ZmqRequest } from 'zeromq';
import { ChildProcess } from 'child_process';
import { client } from './client_zmq';
// --------------------------------------------------------------------------
// --- Pretty Printing (Browser Console)
......@@ -150,19 +150,6 @@ type RejectPromise = (error: Error) => void;
/** Pending promise callbacks (pairs of (resolve, reject)). */
let pending: IndexedPair<ResolvePromise, RejectPromise> = {};
/** Queue of server commands to be sent. */
let queueCmd: string[] = [];
/** Waiting request ids to be sent. */
let queueId: string[] = [];
/** Polling timeout and timer. */
const pollingTimeout = 50;
let pollingTimer: NodeJS.Timeout | undefined;
/** Flushing timer. */
let flushingTimer: NodeJS.Immediate | undefined;
/** Server process. */
let process: ChildProcess | undefined;
......@@ -170,11 +157,6 @@ let process: ChildProcess | undefined;
const killingTimeout = 300;
let killingTimer: NodeJS.Timeout | undefined;
/** ZMQ (REQ) socket. */
let zmqSocket: ZmqRequest | undefined;
/** Flag on whether ZMQ socket is busy. */
let zmqIsBusy = false;
// --------------------------------------------------------------------------
// --- Server Console
// --------------------------------------------------------------------------
......@@ -513,9 +495,7 @@ async function _launch() {
}
});
// Connect to Server
zmqSocket = new ZmqRequest();
zmqIsBusy = false;
zmqSocket.connect(sockaddr);
client.connect(sockaddr);
}
// --------------------------------------------------------------------------
......@@ -526,28 +506,20 @@ function _reset() {
D.log('Reset to initial configuration');
rqCount = 0;
queueCmd = [];
queueId = [];
_.forEach(pending, ([, reject]) => reject(new Error('Server reset')));
pending = {};
if (flushingTimer) {
clearImmediate(flushingTimer);
flushingTimer = undefined;
}
if (pollingTimer) {
clearTimeout(pollingTimer);
pollingTimer = undefined;
}
if (killingTimer) {
clearTimeout(killingTimer);
killingTimer = undefined;
}
}
function _kill() {
D.log('Hard kill');
_reset();
client.disconnect();
if (process) {
process.kill();
}
......@@ -555,10 +527,8 @@ function _kill() {
async function _shutdown() {
D.log('Shutdown');
_reset();
queueCmd.push('SHUTDOWN');
_flush();
client.shutdown();
const killingPromise = new Promise((resolve) => {
if (!killingTimer) {
if (process) {
......@@ -575,11 +545,7 @@ async function _shutdown() {
function _exit(error?: Error) {
_reset();
if (zmqSocket) {
zmqSocket.close();
zmqSocket = undefined;
}
zmqIsBusy = false;
client.disconnect();
process = undefined;
if (status.stage === Stage.RESTARTING) {
setImmediate(start);
......@@ -595,12 +561,12 @@ function _exit(error?: Error) {
// --------------------------------------------------------------------------
class SignalHandler {
id: any;
id: string;
event: Dome.Event;
active: boolean;
listen: boolean;
constructor(id: any) {
constructor(id: string) {
this.id = id;
this.event = new SIGNAL(id);
this.active = false;
......@@ -635,8 +601,7 @@ class SignalHandler {
sigon() {
if (this.active && !this.listen) {
this.listen = true;
queueCmd.push('SIGON', this.id);
_flush();
client.sigOn(this.id);
}
}
......@@ -645,8 +610,7 @@ class SignalHandler {
if (!this.active && this.listen) {
if (isRunning()) {
this.listen = false;
queueCmd.push('SIGOFF', this.id);
_flush();
client.sigOff(this.id);
}
}
}
......@@ -660,7 +624,7 @@ class SignalHandler {
const signals: Map<string, SignalHandler> = new Map();
function _signal(id: any) {
function _signal(id: string): SignalHandler {
let s = signals.get(id);
if (!s) {
s = new SignalHandler(id);
......@@ -669,6 +633,10 @@ function _signal(id: any) {
return s;
}
client.onSignal((id: string) => {
_signal(id).event.emit();
})
// --- External API
/**
......@@ -776,7 +744,6 @@ export function send<In, Out>(
if (!request.name) return Promise.reject(new Error('Undefined request'));
const rid = `RQ.${rqCount}`;
rqCount += 1;
const data = JSON.stringify(param);
const promise: Killable<Out> = new Promise<Out>((resolve, reject) => {
const decodedResolve = (js: Json.json) => {
const result = Json.jTry(request.output)(js);
......@@ -785,139 +752,32 @@ export function send<In, Out>(
pending[rid] = [decodedResolve, reject];
});
promise.kill = () => {
if (zmqSocket && pending[rid]) {
queueCmd.push('KILL', rid);
_flush();
}
if (pending[rid]) client.kill(rid);
};
queueCmd.push(request.kind, rid, request.name, data);
queueId.push(rid);
_flush();
client.send(request.kind, rid, request.name, param);
return promise;
}
function _resolve(id: string | number, data: string) {
client.onData((id: string, data: Json.json) => {
const [resolve] = pending[id];
if (resolve) {
delete pending[id];
resolve(JSON.parse(data));
resolve(data);
}
}
});
function _reject(id: string | number, error: Error) {
client.onRejected((id: string, error: string) => {
const [, reject] = pending[id];
if (reject) {
delete pending[id];
reject(error);
}
}
function _cancel(ids: any[]) {
ids.forEach((rid) => _reject(rid, new Error('Canceled request')));
}
function _waiting() {
return _.find(pending, () => true) !== undefined;
}
// --------------------------------------------------------------------------
// --- Server Command Queue
// --------------------------------------------------------------------------
function _flush() {
if (!flushingTimer) {
flushingTimer = setImmediate(() => {
flushingTimer = undefined;
_send();
});
reject(new Error(error));
}
}
function _poll() {
if (!pollingTimer) {
const delay = (config && config.polling) || pollingTimeout;
pollingTimer = setTimeout(() => {
pollingTimer = undefined;
_send();
}, delay);
}
}
async function _send() {
// when busy, will be eventually re-triggered
if (!zmqIsBusy) {
const cmds = queueCmd;
if (!cmds.length) {
cmds.push('POLL');
if (!_waiting())
rqCount = 0; // No pending command nor pending response
}
zmqIsBusy = true;
const ids = queueId;
queueCmd = [];
queueId = [];
try {
await zmqSocket?.send(cmds);
const resp = await zmqSocket?.receive();
_receive(resp);
} catch (error) {
D.error(`Error in send/receive on ZMQ socket. ${error.toString()}`);
_cancel(ids);
}
zmqIsBusy = false;
STATUS.emit(status);
}
}
});
function _receive(resp: any) {
try {
let rid;
let data;
let err;
let cmd;
const shift = () => resp.shift().toString();
let unknownResponse = false;
while (resp.length && !unknownResponse) {
cmd = shift();
switch (cmd) {
case 'NONE':
break;
case 'DATA':
rid = shift();
data = shift();
_resolve(rid, data);
break;
case 'KILLED':
rid = shift();
_reject(rid, new Error('Killed'));
break;
case 'ERROR':
rid = shift();
err = shift();
_reject(rid, err);
break;
case 'REJECTED':
rid = shift();
_reject(rid, new Error('Rejected'));
break;
case 'SIGNAL':
rid = shift();
(new SIGNAL(rid)).emit();
break;
case 'WRONG':
err = shift();
D.error(`ZMQ Protocol Error: ${err}`);
break;
default:
D.error(`Unknown Response: ${cmd}`);
unknownResponse = true;
break;
}
}
} finally {
if (queueCmd.length) _flush();
else _poll();
}
}
client.onIdle(() => {
const waiting = _.find(pending, () => true) !== undefined;
if (!waiting)
rqCount = 0; // No pending command nor pending response
});
// --------------------------------------------------------------------------
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment