Очередь исходящих, запуск и останов сервера приложений

This commit is contained in:
Mikhail Chechnev 2018-11-19 21:25:40 +03:00
parent edda9c0e3b
commit f354847515
7 changed files with 237 additions and 22 deletions

View File

@ -14,7 +14,7 @@ let dbConnect = {
//Пароль пользователя БД
sPassword: "parus",
//Строка подключения к БД
sConnectString: "DEMOP_CITKSERV_WAN",
sConnectString: "DEMOP_CITKSERV",
//Наименование модуля (для сессии БД)
sSessionModuleName: "PARUS$ExchangeServer",
//Подключаемый модуль обслуживания БД (низкоуровневые функции работы с СУБД)

View File

@ -17,8 +17,3 @@ exports.SERR_MODULES_BAD_INTERFACE = "ERR_MODULES_BAD_INTERFACE"; //Ошибоч
//Типовые коды ошибок работы с объектами
exports.SERR_OBJECT_BAD_INTERFACE = "ERR_OBJECT_BAD_INTERFACE"; //Ошибочный интерфейс объекта
//Типовые коды ошибок работы с БД
exports.SERR_DB_CONNECT = "ERR_DB_CONNECT"; //Ошибка подключения к БД
exports.SERR_DB_DISCONNECT = "ERR_DB_DISCONNECT"; //Ошибка отключения от БД
exports.SERR_DB_EXECUTE = "ERR_DB_EXECUTE"; //Ошибка исполнения функции в БД

View File

@ -8,6 +8,7 @@
//----------------------
const _ = require("lodash"); //Работа с массивами и объектами
const EventEmitter = require("events"); //Обработчик пользовательских событий
const glConst = require("../core/constants.js"); //Глобальные константы
const { ServerError } = require("../core/server_errors.js"); //Типовая ошибка
const { checkModuleInterface, makeModuleFullPath, checkObject } = require("../core/utils.js"); //Вспомогательные функции
@ -21,13 +22,24 @@ const NLOG_STATE_INF = 0; //Информация
const NLOG_STATE_WRN = 1; //Предупреждение
const NLOG_STATE_ERR = 2; //Ошибка
//Типовые коды ошибок работы с БД
const SERR_DB_CONNECT = "ERR_DB_CONNECT"; //Ошибка подключения к БД
const SERR_DB_DISCONNECT = "ERR_DB_DISCONNECT"; //Ошибка отключения от БД
const SERR_DB_EXECUTE = "ERR_DB_EXECUTE"; //Ошибка исполнения функции в БД
//События модуля
const SEVT_DB_CONNECTOR_CONNECTED = "DB_CONNECTOR_CONNECTED"; //Подключено к БД
const SEVT_DB_CONNECTOR_DISCONNECTED = "DB_CONNECTOR_DISCONNECTED"; //Отключено от БД
//------------
// Тело модуля
//------------
class DBConnector {
class DBConnector extends EventEmitter {
//Конструктор
constructor(prms) {
//создадим экземпляр родительского класса
super();
//Проверяем структуру переданного объекта для подключения
let sCheckResult = checkObject(prms, {
fields: [
@ -88,9 +100,10 @@ class DBConnector {
try {
this.connection = await this.connector.connect(this.connectSettings);
this.bConnected = true;
this.emit(SEVT_DB_CONNECTOR_CONNECTED, this.connection);
return this.connection;
} catch (e) {
throw new ServerError(glConst.SERR_DB_CONNECT, e.message);
throw new ServerError(SERR_DB_CONNECT, e.message);
}
}
//Отключиться от БД
@ -100,9 +113,10 @@ class DBConnector {
await this.connector.disconnect({ connection: this.connection });
this.connection = {};
this.bConnected = false;
this.emit(SEVT_DB_CONNECTOR_DISCONNECTED, this.connection);
return;
} catch (e) {
throw new ServerError(glConst.SERR_DB_DISCONNECT, e.message);
throw new ServerError(SERR_DB_DISCONNECT, e.message);
}
}
}
@ -126,10 +140,10 @@ class DBConnector {
let res = await Promise.all(srvsFuncs);
return res;
} catch (e) {
throw new ServerError(glConst.SERR_DB_EXECUTE, e.message);
throw new ServerError(SERR_DB_EXECUTE, e.message);
}
} else {
throw new ServerError(glConst.SERR_DB_EXECUTE, "Нет подключения к БД");
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
}
}
//Запись в журнал работы
@ -153,7 +167,7 @@ class DBConnector {
let res = await this.connector.log(logData);
return res;
} catch (e) {
throw new ServerError(glConst.SERR_DB_EXECUTE, e.message);
throw new ServerError(SERR_DB_EXECUTE, e.message);
}
} else {
throw new ServerError(
@ -162,7 +176,7 @@ class DBConnector {
);
}
} else {
throw new ServerError(glConst.SERR_DB_EXECUTE, "Нет подключения к БД");
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
}
}
//Запись информации в журнал работы
@ -175,7 +189,7 @@ class DBConnector {
let res = await this.putLog(logData);
return res;
} catch (e) {
throw new ServerError(glConst.SERR_DB_EXECUTE, e.message);
throw new ServerError(SERR_DB_EXECUTE, e.message);
}
}
//Запись предупреждения в журнал работы
@ -188,7 +202,7 @@ class DBConnector {
let res = await this.putLog(logData);
return res;
} catch (e) {
throw new ServerError(glConst.SERR_DB_EXECUTE, e.message);
throw new ServerError(SERR_DB_EXECUTE, e.message);
}
}
//Запись ошибки в журнал работы
@ -201,7 +215,7 @@ class DBConnector {
let res = await this.putLog(logData);
return res;
} catch (e) {
throw new ServerError(glConst.SERR_DB_EXECUTE, e.message);
throw new ServerError(SERR_DB_EXECUTE, e.message);
}
}
//Считать очередную порцию исходящих сообщений
@ -220,7 +234,7 @@ class DBConnector {
});
return res;
} catch (e) {
throw new ServerError(glConst.SERR_DB_EXECUTE, e.message);
throw new ServerError(SERR_DB_EXECUTE, e.message);
}
} else {
throw new ServerError(
@ -229,7 +243,7 @@ class DBConnector {
);
}
} else {
throw new ServerError(glConst.SERR_DB_EXECUTE, "Нет подключения к БД");
throw new ServerError(SERR_DB_EXECUTE, "Нет подключения к БД");
}
}
}
@ -241,4 +255,9 @@ class DBConnector {
exports.NLOG_STATE_INF = NLOG_STATE_INF;
exports.NLOG_STATE_WRN = NLOG_STATE_WRN;
exports.NLOG_STATE_ERR = NLOG_STATE_ERR;
exports.SERR_DB_CONNECT = SERR_DB_CONNECT;
exports.SERR_DB_DISCONNECT = SERR_DB_DISCONNECT;
exports.SERR_DB_EXECUTE = SERR_DB_EXECUTE;
exports.SEVT_DB_CONNECTOR_CONNECTED = SEVT_DB_CONNECTOR_CONNECTED;
exports.SEVT_DB_CONNECTOR_DISCONNECTED = SEVT_DB_CONNECTOR_DISCONNECTED;
exports.DBConnector = DBConnector;

View File

@ -51,11 +51,14 @@ class Logger {
this.bLogDB = true;
}
}
//Удаление объекта для протоколирования в БД
removeDBConnector() {
this.dbConnector = "";
this.bLogDB = false;
}
//Протоколирование в БД
async logToDB(loggerMessage) {
//Если надо протоколировать и есть чем
console.log(this.dbConnector.bConnected);
console.log("DB LOGGER IN");
if (this.bLogDB && this.dbConnector && this.dbConnector.bConnected) {
//Если протоколируем стандартное сообщение
if (loggerMessage instanceof LoggerMessage) {
@ -85,7 +88,6 @@ class Logger {
await this.dbConnector.putLogInf(loggerMessage);
}
}
console.log("DB LOGGER OUT");
}
//Протоколирование
async log(loggerMessage) {

110
core/out_queue.js Normal file
View File

@ -0,0 +1,110 @@
/*
Сервис интеграции ПП Парус 8 с WEB API
Модуль ядра: отработка очереди исходящих сообщений
*/
//------------------------------
// Подключение внешних библиотек
//------------------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const EventEmitter = require("events"); //Обработчик пользовательских событий
const { checkObject } = require("../core/utils.js"); //Вспомогательные функции
//--------------------------
// Глобальные идентификаторы
//--------------------------
//Типовые события
const SEVT_OUT_QUEUE_NEW = "OUT_QUEUE_NEW"; //Новое сообщение в очереди
//------------
// Тело модуля
//------------
//Класс очереди сообщений
class OutQueue extends EventEmitter {
//конструктор класса
constructor(prms, dbConn, logger) {
//Создадим экземпляр родительского класса
super();
//Проверяем структуру переданного объекта с параметрами очереди
let sCheckResult = checkObject(prms, {
fields: [{ sName: "nPortionSize", bRequired: true }, { sName: "nCheckTimeout", bRequired: true }]
});
//Если структура объекта в норме
if (!sCheckResult) {
//Хранилище очереди сообщений
this.queue = [];
//Признак функционирования обработчика
this.bWorking = false;
//Параметры очереди
_.extend(this, prms);
//Запомним подключение к БД
this.dbConn = dbConn;
//Запомним логгер
this.logger = logger;
} else {
throw new ServerError(
glConst.SERR_OBJECT_BAD_INTERFACE,
"Объект имеет недопустимый интерфейс: " + sCheckResult
);
}
}
//Добавление нового исходящего сообщения в очередь для отработки
addMessage(message) {
//Cоздадим новый элемент очереди
let tmp = {};
_.extend(tmp, message);
//добавим его в очередь
this.queue.push(tmp);
}
//Уведомление о получении нового сообщения
notifyNewOutMessage(message) {
//оповестим подписчиков о появлении нового отчета
this.emit(SEVT_OUT_QUEUE_NEW, message);
}
//Перезапуск опроса очереди исходящих сообщений
restartDetectingLoop() {
if (this.bWorking)
setTimeout(() => {
this.outDetectingLoop();
}, this.nCheckTimeout);
}
//Опрос очереди исходящих сообщений
async outDetectingLoop() {
//Сходим на сервер за очередным исходящим сообщением
try {
let outMsgs = await this.dbConn.getOutgoing({ nPortionSize: this.nPortionSize });
if (Array.isArray(outMsgs) && outMsgs.length > 0) {
await this.logger.info("Новое исходящее сообщение: " + outMsgs.toString());
} else {
await this.logger.info("Нет новых сообщений");
}
this.restartDetectingLoop();
} catch (e) {
await this.logger.error("При получении исходящего сообщения: " + e.sCode + ": " + e.sMessage);
this.restartDetectingLoop();
}
}
//Запуск обработки очереди печати
async startProcessing() {
await this.logger.info("Запуск обработчика очереди исходящих сообщений...");
this.bWorking = true;
this.outDetectingLoop();
await this.logger.info("Обработчик очереди исходящих сообщений запущен");
}
//Остановка обработки очереди печати
async stopProcessing() {
await this.logger.info("Останов обработчика очереди исходящих сообщений...");
this.bWorking = false;
await this.logger.info("Обработчик очереди исходящих сообщений остановлен");
}
}
//-----------------
// Интерфейс модуля
//-----------------
exports.SEVT_OUT_QUEUE_NEW = SEVT_OUT_QUEUE_NEW;
exports.OutQueue = OutQueue;

View File

@ -2,3 +2,91 @@
Сервис интеграции ПП Парус 8 с WEB API
Точка входа в сервер приложений
*/
//----------------------
// Подключение библиотек
//----------------------
require("module-alias/register");
const cfg = require("./config.js");
const lg = require("./core/logger.js");
const db = require("./core/db_connector.js");
const oq = require("./core/out_queue.js");
//--------------------------
// Глобальные идентификаторы
//--------------------------
let dbConn = new db.DBConnector(cfg.dbConnect); //Взаимодействие с БД
let logger = new lg.Logger(); //Протоколирование работы
let outQ = new oq.OutQueue(cfg.outgoing, dbConn, logger); //Отслеживание очереди исходящих
//----------------------------------------
// Управление процессом сервера приложений
//----------------------------------------
//При подключении к БД
const onDBConnected = async connection => {
logger.setDBConnector(dbConn);
await logger.info("Сервер приложений подключен к БД");
};
//При отключении от БД
const onDBDisconnected = async () => {
logger.removeDBConnector();
await logger.info("Сервер приложений отключен от БД");
};
//Запуск сервера
const run = async () => {
await logger.info("Запуск сервера приложений...");
dbConn.on(db.SEVT_DB_CONNECTOR_CONNECTED, onDBConnected);
dbConn.on(db.SEVT_DB_CONNECTOR_DISCONNECTED, onDBDisconnected);
await logger.info("Подключение сервера приложений к БД...");
try {
await dbConn.connect();
} catch (e) {
await logger.error("Ошибка подключения к БД: " + e.sCODE + ": " + e.sMessage);
stop();
return;
}
await outQ.startProcessing();
await logger.info("Сервер приложений запущен");
};
//Останов сервера
const stop = async () => {
await logger.warn("Останов сервера приложений...");
outQ.stopProcessing();
if (dbConn.bConnected) {
await logger.warn("Отключение сервера приложений от БД...");
try {
await dbConn.disconnect();
process.exit(0);
} catch (e) {
await logger.error("Ошибка отключения от БД: " + e.sCODE + ": " + e.sMessage);
process.exit(1);
}
} else {
process.exit(0);
}
};
//Обработка события "выход" жизненного цикла процесса
process.on("exit", code => {
//Сообщим о завершении процесса
logger.warn("Сервер приложений остановлен (код: " + code + ") ");
});
//Перехват CTRL + C (останова процесса)
process.on("SIGINT", () => {
//Инициируем выход из процесса
stop();
});
//------------
// Точка входа
//------------
//Старутем
run();

View File

@ -25,9 +25,10 @@ const tests = async () => {
try {
//await l.warn("CONNECTING...");
await a.connect();
for (i = 0; i <= 1000; i++) await l.info(i);
//await l.info("CONNECTED!");
//await l.warn("READING SERVICES...");
let srv = await a.getServices();
//let srv = await a.getServices();
//await l.info(srv);
console.log("1");
await l.warn("DISCONNECTING...");