Внедрение автономного обработчика исходящих сообщений очереди

This commit is contained in:
Mikhail Chechnev 2018-12-07 13:55:25 +03:00
parent f6bd95f702
commit b09ca9e7d4
8 changed files with 659 additions and 1239 deletions

View File

@ -9,7 +9,7 @@
const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД
const oq = require("./out_queue2"); //Прослушивание очереди исходящих сообщений
const oq = require("./out_queue"); //Прослушивание очереди исходящих сообщений
const { ServerError } = require("./server_errors"); //Типовая ошибка
const { validateObject } = require("./utils"); //Вспомогательные функции
const { SERR_COMMON, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы

View File

@ -11,9 +11,9 @@ const _ = require("lodash"); //Работа с массивами и колле
const EventEmitter = require("events"); //Обработчик пользовательских событий
const ChildProcess = require("child_process"); //Работа с дочерними процессами
const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_UNEXPECTED, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const { validateObject } = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const { SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса
@ -60,6 +60,8 @@ class OutQueue extends EventEmitter {
this.dbConn = prms.dbConn;
//Запомним логгер
this.logger = prms.logger;
//Список обрабатываемых в текущий момент сообщений очереди
this.inProgress = [];
//Привяжем методы к указателю на себя для использования в обработчиках событий
this.outDetectingLoop = this.outDetectingLoop.bind(this);
} else {
@ -76,119 +78,106 @@ class OutQueue extends EventEmitter {
//оповестим подписчиков о появлении нового отчета
this.emit(SEVT_OUT_QUEUE_STOPPED);
}
//Установка финальных статусов сообщения в БД
async finalise(prms) {
//Проверяем структуру переданного объекта для старта
//Добавление идентификатора позиции очереди в список обрабатываемых
addInProgress(prms) {
//Проверяем структуру переданного объекта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.finalise,
"Параметры функции установки финальных статусов сообщения в БД"
prmsOutQueueSchema.addInProgress,
"Параметры функции добавления идентификатора позиции очереди в список обрабатываемых"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Если больше нет попыток исполнения и сообщение не в статусе успешной обработки сервером БД
if (
prms.queue.nExecState != objQueueSchema.NQUEUE_EXEC_STATE_DB_OK &&
prms.queue.nExecCnt >= prms.queue.nRetryAttempts
) {
//То считаем, что оно выполнено с ошибками и больше пытаться не надо
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: prms.queue.sExecMsg,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
} else {
//Если сообщение успешно исполнено сервером БД - то значит оно успешно исполнено вообще
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB_OK) {
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
} else {
//Если сообщение в статусе исполнения сервером приложений (чего здесь быть не может) - это ошибка
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP) {
//То выставим ему ошибку исполнения сервером приложений
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: prms.queue.sExecMsg,
nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR
});
} else {
//Если сообщение в статусе исполнения сервером БД (чего здесь быть не может) - это ошибка
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB) {
//То выставим ему ошибку исполнения сервером БД
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: prms.queue.sExecMsg,
nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR
});
}
}
}
//Проверим, что такого идентификатора ещё нет в списке обрабатываемых
const i = this.inProgress.indexOf(prms.nQueueId);
//Если нет - добавим
if (i === -1) this.inProgress.push(prms.nQueueId);
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Удаление идентификатора позиции очереди из списка обрабатываемых
rmInProgress(prms) {
//Проверяем структуру переданного объекта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.rmInProgress,
"Параметры функции удаления идентификатора позиции очереди из списка обрабатываемых"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Если удаляемый идентификатор есть в списке
const i = this.inProgress.indexOf(prms.nQueueId);
//Удалим его
if (i > -1) {
this.inProgress.splice(i, 1);
}
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Запуск обработки сообщения сервером БД
async dbProcess(prms) {
//Проверяем структуру переданного объекта для старта
//Проверка наличия идентификатора позиции очереди в списке обрабатываемых
isInProgress(prms) {
//Проверяем структуру переданного объекта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.dbProcess,
"Параметры функции запуска обработки ообщения сервером БД"
prmsOutQueueSchema.isInProgress,
"Параметры функции проверки наличия идентификатора позиции очереди в списке обрабатываемых"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Буфер для текущего состояния сообщения
let curQueue = null;
//Обрабатываем
try {
//Фиксируем начало исполнения сервером БД - в статусе сообщения
curQueue = await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
});
//Вызов обработчика БД
curQueue = await this.dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId });
//Фиксируем успешное исполнение сервером БД - в статусе сообщения
curQueue = await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
});
//Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса
await this.logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, {
nQueueId: prms.queue.nId
});
} catch (e) {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку обработки сервером БД - в статусе сообщения
curQueue = await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR
});
//Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса
await this.logger.error(
`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${sErr}`,
{ nQueueId: prms.queue.nId }
);
}
//Вернём текущее состоянии сообщения очереди
return curQueue;
//Проверим наличие идентификатора в списке
return !(this.inProgress.indexOf(prms.nQueueId) === -1);
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Старт обработчика
startQueueProcessor(prms) {
//Проверяем структуру переданного объекта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.startQueueProcessor,
"Параметры функции запуска обработчика сообщения очереди"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Добавляем идентификатор позиции очереди в список обрабатываемых
this.addInProgress({ nQueueId: prms.nQueueId });
//Отдаём команду дочернему процессу обработчика на старт исполнения
prms.proc.send({
nQueueId: prms.nQueueId,
connectSettings: this.dbConn.connectSettings
});
//Уменьшаем количество доступных обработчиков
this.nWorkersLeft--;
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Останов обработчика
stopQueueProcessor(prms) {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.stopQueueProcessor,
"Параметры функции останова обработчика сообщения очереди"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Удаляем идентификатор позиции очереди из списка обрабатываемых
this.rmInProgress({ nQueueId: prms.nQueueId });
//Завершаем дочерний процесс обработчика
prms.proc.kill();
//Увеличиваем количество доступных обработчиков
this.nWorkersLeft++;
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Запуск обработки очередного сообщения
async processMessage(prms) {
//Проверяем структуру переданного объекта для старта
processMessage(prms) {
//Проверяем структуру переданного объекта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.processMessage,
@ -204,30 +193,6 @@ class OutQueue extends EventEmitter {
const proc = ChildProcess.fork("core/out_queue_processor", { silent: false });
//Текущее состояние сообщения
let curQueue = null;
//Скажем что начали обработку
await self.logger.info(
`Обрабатываю исходящее сообщение: ${prms.queue.nId}, ${prms.queue.sInDate}, ${
prms.queue.sServiceFnCode
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId }
);
//Установим его статус в БД - обрабатывается сервером приложений (только для новых или повторно обрабатываемых сервером приложений)
if (
prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE ||
prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR
) {
curQueue = await self.dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
}
//Установим его статус в БД - обрабатывается в БД (только если сюда пришло сообщение на повторную обработку сервером БД)
if (prms.queue.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR) {
curQueue = await self.dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
});
}
//Перехват сообщений обработчика
proc.on("message", async result => {
//Проверяем структуру полученного сообщения
@ -238,168 +203,81 @@ class OutQueue extends EventEmitter {
);
//Если структура сообщения в норме
if (!sCheckResult) {
//Движение события по статусам в зависимости от того в каком состоянии его вернул обработчик
try {
//Работаем от статуса сообщения полученного от обработчика
switch (result.nExecState) {
//Ошибка обработки
case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: {
//Установим ошибочный статус в БД для сообщений и увеличим счетчик попыток отправки
curQueue = await self.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: result.sExecMsg,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: result.nExecState
});
//Фиксируем ошибку в протоколе работы сервиса
await self.logger.error(
`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${
result.sExecMsg
}`,
{ nQueueId: prms.queue.nId }
);
break;
}
//Успех обработки
case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: {
//Если состояние менялось (а не просто повторная отработка)
if (result.nExecState != prms.queue.nExecState) {
//Пишем в базу успех отработки сервером приложений - результаты обработки
curQueue = await self.dbConn.setQueueAppSrvResult({
nQueueId: prms.queue.nId,
blMsg: result.blMsg ? new Buffer(result.blMsg) : null,
blResp: result.blResp ? new Buffer(result.blResp) : null
});
//Пишем в базу успех отработки сервером приложений - статус сообщения
curQueue = await self.dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: result.nExecState
});
//Пишем в базу успех отработки сервером приложений - запись в протокол работы сервера приложений
await self.logger.info(
`Исходящее сообщение ${
prms.queue.nId
} успешно отработано сервером приложений`,
{ nQueueId: prms.queue.nId }
);
}
//Запускаем обработку сервером БД
curQueue = await self.dbProcess(prms);
break;
}
//Обработчик ничего не делал
default: {
//Обработчик ничего не делал, но если сообщение сообщение в статусе - ошибка обработки сервером БД или обрабатывается сервером БД, то запустим обработчик БД
if (result.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR) {
//Запускаем обработчик сервера БД
curQueue = await self.dbProcess(prms);
} else {
//Во всех остальных случаях - ничего не делаем вообще
curQueue = prms.queue;
}
break;
}
}
} catch (e) {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку обработки сервером приложений - статус сообщения
curQueue = await self.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES
if (result.sResult == "ERR") {
//Фиксируем ошибку обработки - протокол работы сервиса
await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sMsg}`, {
nQueueId: prms.queue.nId
});
//Фиксируем ошибку обработки - статус сообщения
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: result.sMsg,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
? prms.queue.nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений
await self.logger.error(sErr, { nQueueId: prms.queue.nId });
}
} else {
//Пришел неожиданный ответ обработчика - статус сообщения
curQueue = await self.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sCheckResult,
nIncExecCnt: NINC_EXEC_CNT_YES
});
//Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений
//Пришел неожиданный ответ обработчика - запись в протокол работы сервиса
await self.logger.error(
`Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`,
{ nQueueId: prms.queue.nId }
);
}
//Выставляем финальные статусы
try {
await self.finalise({ queue: curQueue });
} catch (e) {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Установим его статус в БД - ошибка установки финального статуса
await self.dbConn.setQueueState({
//Фиксируем ошибку обработки - статус сообщения
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений
await self.logger.error(`Фатальная ошибка обработчика сообщения ${prms.queue.nId}: ${sErr}`, {
nQueueId: prms.queue.nId
sExecMsg: `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
? prms.queue.nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
}
//Останавливаем обработчик и инкрементируем флаг их доступного количества
proc.kill();
this.nWorkersLeft++;
try {
this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
} catch (e) {
//Отразим в протоколе ошибку останова
await self.logger.error(
`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`,
{ nQueueId: prms.queue.nId }
);
}
});
//Перехват ошибок обработчика
proc.on("error", async e => {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Установим его статус в БД - ошибка обработки сервером приложений
let curQueue = await self.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES
});
//Так же фиксируем ошибку в протоколе работы
await self.logger.error(`Ошибка обработки исходящего сообщения сервером приложений: ${sErr}`, {
//Фиксируем ошибку в протоколе работы
await self.logger.error(`Ошибка обработки исходящего сообщения: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
//Выставляем финальные статусы
try {
await self.finalise({ queue: curQueue });
} catch (e) {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Установим его статус в БД - ошибка установки финального статуса
await self.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений
await self.logger.error(`Фатальная ошибка обработчика сообщения ${prms.queue.nId}: ${sErr}`, {
nQueueId: prms.queue.nId
});
}
//Фиксируем ошибку обработки - статус сообщения
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
? prms.queue.nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Останавливаем обработчик и инкрементируем флаг их доступного количества
proc.kill();
this.nWorkersLeft++;
try {
this.stopQueueProcessor({ nQueueId: prms.queue.nId, proc });
} catch (e) {
//Отразим в протоколе ошибку останова
await self.logger.error(
`Ошибка останова обработчика исходящего сообщения: ${makeErrorText(e)}`,
{ nQueueId: prms.queue.nId }
);
}
});
//Перехват останова обработчика
proc.on("exit", code => {});
//Запускаем обработчик
proc.send({
nQueueId: prms.queue.nId,
nExecState: prms.queue.nExecState,
blMsg: prms.queue.blMsg,
blResp: prms.queue.blResp,
service: _.find(this.services, { nId: prms.queue.nServiceId }),
function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, {
nId: prms.queue.nServiceFnId
})
});
//Уменьшаем количество доступных обработчиков
this.nWorkersLeft--;
this.startQueueProcessor({ nQueueId: prms.queue.nId, proc });
}
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
@ -427,38 +305,22 @@ class OutQueue extends EventEmitter {
//Обходим их
for (let i = 0; i < outMsgs.length; i++) {
//И запускаем обработчики
try {
await this.processMessage({ queue: outMsgs[i] });
} catch (e) {
//Какие непредвиденные ошибки при обработке текущего сообщения - подготовим текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку обработки сервером приложений - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток)
let curQueue = await this.dbConn.setQueueState({
nQueueId: outMsgs[i].nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES
});
//Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений
await this.logger.error(sErr, { nQueueId: outMsgs[i].nId });
//Выставляем финальные статусы
if (!this.isInProgress({ nQueueId: outMsgs[i].nId })) {
try {
await this.finalise({ queue: curQueue });
this.processMessage({ queue: outMsgs[i] });
} catch (e) {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Установим его статус в БД - ошибка установки финального статуса
await self.dbConn.setQueueState({
//Фиксируем ошибку обработки сервером приложений - статус сообщения
await this.dbConn.setQueueState({
nQueueId: outMsgs[i].nId,
sExecMsg: sErr,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
outMsgs[i].nExecCnt + 1 < outMsgs[i].nRetryAttempts
? outMsgs[i].nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений
await self.logger.error(
`Фатальная ошибка обработчика сообщения ${outMsgs[i].nId}: ${sErr}`,
{ nQueueId: outMsgs[i].nId }
);
//Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений
await this.logger.error(makeErrorText(e), { nQueueId: outMsgs[i].nId });
}
}
}
@ -466,11 +328,8 @@ class OutQueue extends EventEmitter {
//Запустили отработку всех считанных - перезапускаем цикл опроса исходящих сообщений
await this.restartDetectingLoop();
} catch (e) {
//Какие непредвиденные ошибки при получении списка сообщений - подготовим текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку в протоколе работы сервера приложений
await this.logger.error(sErr);
await this.logger.error(makeErrorText(e));
await this.restartDetectingLoop();
}
} else {

View File

@ -1,291 +0,0 @@
/*
Сервис интеграции ПП Парус 8 с WEB API
Модуль ядра: отработка очереди исходящих сообщений
*/
//------------------------------
// Подключение внешних библиотек
//------------------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const EventEmitter = require("events"); //Обработчик пользовательских событий
const ChildProcess = require("child_process"); //Работа с дочерними процессами
const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_UNEXPECTED, SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const { validateObject } = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса
//--------------------------
// Глобальные идентификаторы
//--------------------------
//Типовые события
const SEVT_OUT_QUEUE_STARTED = "OUT_QUEUE_STARTED"; //Обработчик очереди запущен
const SEVT_OUT_QUEUE_STOPPED = "OUT_QUEUE_STOPPED"; //Обработчик очереди остановлен
//Время отложенного старта опроса очереди (мс)
const NDETECTING_LOOP_DELAY = 3000;
//Интервал проверки завершения обработчиков (мс)
const NWORKERS_WAIT_INTERVAL = 1000;
//------------
// Тело модуля
//------------
//Класс очереди сообщений
class OutQueue extends EventEmitter {
//Конструктор класса
constructor(prms) {
//Создадим экземпляр родительского класса
super();
//Проверяем структуру переданного объекта для подключения
let sCheckResult = validateObject(prms, prmsOutQueueSchema.OutQueue, "Параметры конструктора класса OutQueue");
//Если структура объекта в норме
if (!sCheckResult) {
//Список обслуживаемых сервисов
this.services = null;
//Признак функционирования обработчика
this.bWorking = false;
//Параметры очереди
this.outGoing = _.cloneDeep(prms.outGoing);
//Количество доступных обработчиков
this.nWorkersLeft = this.outGoing.nMaxWorkers;
//Идентификатор таймера проверки очереди
this.nDetectingLoopTimeOut = null;
//Запомним подключение к БД
this.dbConn = prms.dbConn;
//Запомним логгер
this.logger = prms.logger;
//Список обрабатываемых в текущий момент сообщений очереди
this.inProgress = [];
//Привяжем методы к указателю на себя для использования в обработчиках событий
this.outDetectingLoop = this.outDetectingLoop.bind(this);
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Уведомление об остановке обработчика очереди
notifyStarted() {
//оповестим подписчиков о появлении нового отчета
this.emit(SEVT_OUT_QUEUE_STARTED);
}
//Уведомление об остановке обработчика очереди
notifyStopped() {
//оповестим подписчиков о появлении нового отчета
this.emit(SEVT_OUT_QUEUE_STOPPED);
}
//Добавление идентификатора позиции очереди в список обрабатываемых
addInProgress(nId) {
const i = this.inProgress.indexOf(nId);
if (i === -1) this.inProgress.push(nId);
}
//Удаление идентификатора позиции очереди из списка обрабатываемых
rmInProgress(nId) {
const i = this.inProgress.indexOf(nId);
if (i > -1) {
this.inProgress.splice(i, 1);
}
}
//Проверка наличия идентификатора позиции очереди в списке обрабатываемых
isInProgress(nId) {
return !(this.inProgress.indexOf(nId) === -1);
}
//Запуск обработки очередного сообщения
processMessage(prms) {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.processMessage,
"Параметры функции запуска обработки очередного сообщения"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Проверим, что есть доступные обработчики
if (this.nWorkersLeft > 0) {
//Переопределим себя для обращения внутри обработчиков событий
const self = this;
//Создаём новый обработчик сообщений
const proc = ChildProcess.fork("core/out_queue_processor2", { silent: false });
//Текущее состояние сообщения
let curQueue = null;
//Перехват сообщений обработчика
proc.on("message", async result => {
//Проверяем структуру полученного сообщения
/*
let sCheckResult = validateObject(
result,
objOutQueueProcessorSchema.OutQueueProcessorTaskResult,
"Ответ обработчика очереди исходящих сообщений"
);
//Если структура сообщения в норме
if (!sCheckResult) {
*/
/*
} else {
//Пришел неожиданный ответ обработчика - запись в протокол работы сервера приложений
await self.logger.error(
`Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`,
{ nQueueId: prms.queue.nId }
);
}
*/
if (result.sExecResult == "ERR") {
await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sExecMsg}`, {
nQueueId: prms.queue.nId
});
//Фиксируем ошибку обработки - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток)
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: result.sExecMsg,
nIncExecCnt: NINC_EXEC_CNT_YES
});
}
//Останавливаем обработчик и инкрементируем флаг их доступного количества
this.rmInProgress(prms.queue.nId);
proc.kill();
this.nWorkersLeft++;
});
//Перехват ошибок обработчика
proc.on("error", async e => {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку в протоколе работы
await self.logger.error(`Ошибка обработки исходящего сообщения: ${sErr}`, {
nQueueId: prms.queue.nId
});
//Фиксируем ошибку обработки - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток)
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES
});
//Останавливаем обработчик и инкрементируем флаг их доступного количества
this.rmInProgress(prms.queue.nId);
proc.kill();
this.nWorkersLeft++;
});
//Перехват останова обработчика
proc.on("exit", code => {});
//Запускаем обработчик
this.addInProgress(prms.queue.nId);
proc.send({
nQueueId: prms.queue.nId,
connectSettings: self.dbConn.connectSettings
});
//Уменьшаем количество доступных обработчиков
this.nWorkersLeft--;
}
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Перезапуск опроса очереди исходящих сообщений
async restartDetectingLoop() {
//Включаем опрос очереди только если установлен флаг работы
if (this.bWorking) {
this.nDetectingLoopTimeOut = await setTimeout(async () => {
await this.outDetectingLoop();
}, this.outGoing.nCheckTimeout);
}
}
//Опрос очереди исходящих сообщений
async outDetectingLoop() {
//Если есть свободные обработчики
if (this.nWorkersLeft > 0) {
//Сходим на сервер за очередным исходящим сообщением
try {
//Заберем столько сообщений, сколько можем обработать одновременно
let outMsgs = await this.dbConn.getOutgoing({ nPortionSize: this.nWorkersLeft });
//Если есть сообщения
if (Array.isArray(outMsgs) && outMsgs.length > 0) {
//Обходим их
for (let i = 0; i < outMsgs.length; i++) {
//И запускаем обработчики
if (!this.isInProgress(outMsgs[i].nId)) {
try {
this.processMessage({ queue: outMsgs[i] });
} catch (e) {
//Какие непредвиденные ошибки при обработке текущего сообщения - подготовим текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку обработки сервером приложений - статус сообщения (сам статус - не меняем, здесь только фатальные ошибки, но делаем инкремент количества попыток)
await this.dbConn.setQueueState({
nQueueId: outMsgs[i].nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES
});
//Фиксируем ошибку обработки сервером приложений - запись в протокол работы сервера приложений
await this.logger.error(sErr, { nQueueId: outMsgs[i].nId });
}
}
}
}
//Запустили отработку всех считанных - перезапускаем цикл опроса исходящих сообщений
await this.restartDetectingLoop();
} catch (e) {
//Какие непредвиденные ошибки при получении списка сообщений - подготовим текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку в протоколе работы сервера приложений
await this.logger.error(sErr);
await this.restartDetectingLoop();
}
} else {
//Нет свободных обработчиков - ждём и перезапускаем цикл опроса
await this.restartDetectingLoop();
}
}
//Запуск обработки очереди исходящих сообщений
startProcessing(prms) {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsOutQueueSchema.startProcessing,
"Параметры функции запуска обработки очереди исходящих сообщений"
);
//Если структура объекта в норме
if (!sCheckResult) {
//Выставляем флаг работы
this.bWorking = true;
//запоминаем список обслуживаемых сервисов
this.services = prms.services;
//Начинаем слушать очередь исходящих
setTimeout(this.outDetectingLoop, NDETECTING_LOOP_DELAY);
//И оповещаем всех что запустились
this.notifyStarted();
} else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Остановка обработки очереди исходящих сообщений
stopProcessing() {
//Выставляем флаг неработы
this.bWorking = false;
//Останавливаем опрос очереди
if (this.nDetectingLoopTimeOut) {
clearTimeout(this.nDetectingLoopTimeOut);
this.nDetectingLoopTimeOut = null;
}
//Ждем завершения работы всех обработчиков
let i = setInterval(() => {
if (!this.bWorking && this.nWorkersLeft == this.outGoing.nMaxWorkers) {
clearInterval(i);
this.notifyStopped();
}
}, NWORKERS_WAIT_INTERVAL);
}
}
//-----------------
// Интерфейс модуля
//-----------------
exports.SEVT_OUT_QUEUE_STARTED = SEVT_OUT_QUEUE_STARTED;
exports.SEVT_OUT_QUEUE_STOPPED = SEVT_OUT_QUEUE_STOPPED;
exports.OutQueue = OutQueue;

View File

@ -7,8 +7,10 @@
// Подключение библиотек
//----------------------
const _ = require("lodash"); //Работа с массивами и коллекциями
const { makeModuleFullPath, validateObject } = require("./utils"); //Вспомогательные функции
require("module-alias/register"); //Поддержка псевонимов при подключении модулей
const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД
const { makeErrorText, makeModuleFullPath, validateObject } = require("./utils"); //Вспомогательные функции
const { ServerError } = require("./server_errors"); //Типовая ошибка
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений
const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля
@ -19,8 +21,14 @@ const {
SERR_OBJECT_BAD_INTERFACE,
SERR_MODULES_NO_MODULE_SPECIFIED
} = require("./constants"); //Глобальные константы
//!!!!!!!!!!!!!!!!!!!!!!! УБРАТЬ!!!!!!!!!!!!!!!!!!!!!!
const fs = require("fs");
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
//--------------------------
// Глобальные идентификаторы
//--------------------------
let dbConn = null; //Подключение к БД
let logger = null; //Протоколирование работы
//------------
// Тело модуля
@ -28,84 +36,162 @@ const fs = require("fs");
//Отправка родительскому процессу ошибки обработки сообщения сервером приложений
const sendErrorResult = prms => {
//Проверяем параметры
//Проверяем структуру переданного сообщения
let sCheckResult = validateObject(
prms,
prmsOutQueueProcessorSchema.sendErrorResult,
"Параметры функции отправки ошибки обработки"
"Параметры функции отправки родительскому процессу ошибки обработки сообщения"
);
//Если параметры в норме
//Если структура объекта в норме
if (!sCheckResult) {
//Отправляем родительскому процессу ошибку
process.send({
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR,
sExecMsg: prms.sMessage,
blMsg: null,
blResp: null
sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR,
sMsg: prms.sMessage
});
} else {
//Отправляем родительскому процессу сведения об ошибочных параметрах
process.send({
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR,
sExecMsg: sCheckResult,
blMsg: null,
blResp: null
sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR,
sMsg: sCheckResult
});
}
};
//Отправка родительскому процессу успеха обработки сообщения сервером приложений
const sendOKResult = prms => {
//Проверяем параметры
const sendOKResult = () => {
process.send({
sResult: objOutQueueProcessorSchema.STASK_RESULT_OK,
sMsg: null
});
};
//Запуск обработки сообщения сервером приложений
const appProcess = async prms => {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsOutQueueProcessorSchema.sendOKResult,
"Параметры функции отправки ответа об успехе обработки"
prmsOutQueueProcessorSchema.appProcess,
"Параметры функции запуска обработки ообщения сервером приложений"
);
//Если параметры в норме
//Если структура объекта в норме
if (!sCheckResult) {
//Отправляем родительскому процессу успех
process.send({
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK,
sExecMsg: null,
blMsg: prms.blMsg,
blResp: prms.blResp
});
//Обработанное сообщение
let newQueue = null;
//Обрабатываем
try {
//Фиксируем начало исполнения сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
//Скажем что начали обработку
await logger.info(
`Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${
prms.queue.sServiceFnCode
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId }
);
if (prms.queue.blMsg) {
let sMsg = prms.queue.blMsg.toString() + " MODIFICATION FOR " + prms.queue.nId;
//Фиксируем успех исполнения
newQueue = await dbConn.setQueueAppSrvResult({
nQueueId: prms.queue.nId,
blMsg: new Buffer(sMsg),
blResp: new Buffer("REPLAY ON " + prms.queue.nId)
});
//Фиксируем успешное исполнение сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
});
//Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, {
nQueueId: prms.queue.nId
});
} else {
throw new ServerError(
SERR_UNEXPECTED,
`Ошибка отработки сообщения ${prms.queue.nId}: нет данных для обработки`
);
}
} catch (e) {
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await logger.error(
`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${makeErrorText(e)}`,
{ nQueueId: prms.queue.nId }
);
}
//Возвращаем результат
return newQueue;
} else {
//Отправляем родительскому процессу сведения об ошибочных параметрах
process.send({
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR,
sExecMsg: sCheckResult,
blMsg: null,
blResp: null
});
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
};
//Отправка родительскому процессу сообщения без обработки
const sendUnChange = prms => {
//Проверяем параметры
//Запуск обработки сообщения сервером БД
const dbProcess = async prms => {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsOutQueueProcessorSchema.sendUnChange,
"Параметры функции отправки сообщения без обработки"
prmsOutQueueProcessorSchema.dbProcess,
"Параметры функции запуска обработки ообщения сервером БД"
);
//Если параметры в норме
//Если структура объекта в норме
if (!sCheckResult) {
process.send({
nExecState: prms.task.nExecState,
sExecMsg: null,
blMsg: prms.task.blMsg ? new Buffer(prms.task.blMsg) : null,
blResp: prms.task.blResp ? new Buffer(prms.task.blResp) : null
});
//Обрабатываем
try {
//Фиксируем начало исполнения сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
});
//Скажем что начали обработку
await logger.info(
`Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${
prms.queue.sServiceFnCode
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId }
);
//Вызов обработчика БД
await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId });
//Фиксируем успешное исполнение сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
//Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, {
nQueueId: prms.queue.nId
});
} catch (e) {
//Фиксируем ошибку обработки сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса
await logger.error(
`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${makeErrorText(e)}`,
{ nQueueId: prms.queue.nId }
);
}
} else {
//Отправляем родительскому процессу сведения об ошибочных параметрах
process.send({
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR,
sExecMsg: sCheckResult,
blMsg: null,
blResp: null
});
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
};
@ -119,38 +205,248 @@ const processTask = async prms => {
);
//Если параметры в норме
if (!sCheckResult) {
//Обработке подлежат только необработанные сервером приложений сообщения
if (
prms.task.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE ||
prms.task.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR
) {
setTimeout(() => {
//if (prms.task.nQueueId == 2) {
// sendErrorResult({ sMessage: "Ошибка отработки сообщения " + prms.task.nQueueId });
//} else {
if (prms.task.blMsg) {
let b = new Buffer(prms.task.blMsg);
fs.writeFile("c:/repos/temp/" + prms.task.nQueueId, b, err => {
if (err) {
sendErrorResult({ sMessage: err.message });
} else {
let sMsg = b.toString() + " MODIFICATION FOR " + prms.task.nQueueId;
sendOKResult({
blMsg: new Buffer(sMsg),
blResp: new Buffer("REPLAY ON " + prms.task.nQueueId)
});
let q = null;
try {
//Создаём подключение к БД
dbConn = new db.DBConnector({ connectSettings: prms.task.connectSettings });
//Создаём логгер для протоколирования работы
logger = new lg.Logger();
//Подключим логгер к БД (и отключим когда надо)
dbConn.on(db.SEVT_DB_CONNECTOR_CONNECTED, connection => {
logger.setDBConnector(dbConn, true);
});
dbConn.on(db.SEVT_DB_CONNECTOR_DISCONNECTED, () => {
logger.removeDBConnector();
});
//Подключаемся к БД
await dbConn.connect();
//Считываем запись очереди
q = await dbConn.getQueue({ nQueueId: prms.task.nQueueId });
//Далее работаем от статуса считанной записи
switch (q.nExecState) {
//Поставлено в очередь
case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: {
//Запускаем обработку сервером приложений
try {
let res = await appProcess({ queue: q });
//И если она успешно завершилась - обработку сервером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
try {
await dbProcess({ queue: res });
} catch (e) {
//Фиксируем ошибку обработки сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: res.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
res.nExecCnt + 1 < res.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса
await logger.error(
`Ошибка обработки исходящего сообщения ${res.nId} сервером БД: ${makeErrorText(e)}`,
{ nQueueId: res.nId }
);
}
}
});
} else {
sendErrorResult({
sMessage: "Ошибка отработки сообщения " + prms.task.nQueueId + ": нет данных для обработки"
});
} catch (e) {
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
q.nExecCnt + 1 < q.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await logger.error(
`Ошибка обработки исходящего сообщения ${q.nId} сервером приложений: ${makeErrorText(e)}`,
{ nQueueId: q.nId }
);
}
break;
}
//}
}, 3000);
} else {
//Остальные возвращаем без изменения и отработки, с сохранением статусов и сообщений
sendUnChange(prms);
//Обрабатывается сервером приложений
case objQueueSchema.NQUEUE_EXEC_STATE_APP: {
//Предупредим о неверном статусе сообщения (такие сюда попадать не должны)
await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, {
nQueueId: q.nId
});
break;
}
//Ошибка обработки сервером приложений
case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: {
//Если ещё есть попытки отработки
if (q.nExecCnt < q.nRetryAttempts) {
//Снова запускаем обработку сервером приложений
try {
let res = await appProcess({ queue: q });
//И если она успешно завершилась - обработку сервоером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
try {
await dbProcess({ queue: res });
} catch (e) {
//Фиксируем ошибку обработки сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: res.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
res.nExecCnt + 1 < res.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса
await logger.error(
`Ошибка обработки исходящего сообщения ${res.nId} сервером БД: ${makeErrorText(
e
)}`,
{ nQueueId: res.nId }
);
}
}
} catch (e) {
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
q.nExecCnt + 1 < q.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await logger.error(
`Ошибка обработки исходящего сообщения ${q.nId} сервером приложений: ${makeErrorText(
e
)}`,
{ nQueueId: q.nId }
);
}
} else {
//Попыток нет - финализируем обработку
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: q.sExecMsg,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
}
break;
}
//Успешно обработано сервером приложений
case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: {
//Запускаем обработку в БД
try {
await dbProcess({ queue: q });
} catch (e) {
//Фиксируем ошибку обработки сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
q.nExecCnt + 1 < q.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса
await logger.error(
`Ошибка обработки исходящего сообщения ${q.nId} сервером БД: ${makeErrorText(e)}`,
{ nQueueId: q.nId }
);
}
break;
}
//Обрабатывается в БД
case objQueueSchema.NQUEUE_EXEC_STATE_DB: {
//Предупредим о неверном статусе сообщения (такие сюда попадать не должны)
await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, {
nQueueId: q.nId
});
break;
}
//Ошибка обработки в БД
case objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR: {
//Если ещё есть попытки отработки
if (q.nExecCnt < q.nRetryAttempts) {
//Снова запускаем обработку сервером БД
try {
await dbProcess({ queue: q });
} catch (e) {
//Фиксируем ошибку обработки сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
q.nExecCnt + 1 < q.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса
await logger.error(
`Ошибка обработки исходящего сообщения ${q.nId} сервером БД: ${makeErrorText(e)}`,
{ nQueueId: q.nId }
);
}
} else {
//Попыток нет - финализируем обработку
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: q.sExecMsg,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
}
break;
}
//Успешно обработано в БД
case objQueueSchema.NQUEUE_EXEC_STATE_DB_OK: {
//Финализируем
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: null,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
break;
}
//Обработано с ошибками
case objQueueSchema.NQUEUE_EXEC_STATE_ERR: {
//Предупредим о неверном статусе сообщения (такие сюда попадать не должны)
await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, {
nQueueId: q.nId
});
break;
}
//Обработано успешно
case objQueueSchema.NQUEUE_EXEC_STATE_OK: {
//Предупредим о неверном статусе сообщения (такие сюда попадать не должны)
await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, {
nQueueId: q.nId
});
break;
}
default: {
//Ничего не делаем
break;
}
}
//Отключаемся от БД
if (dbConn) await dbConn.disconnect();
//Отправляем успех
sendOKResult();
} catch (e) {
//Отключаемся от БД
if (dbConn) await dbConn.disconnect();
//Отправляем ошибку
sendErrorResult({ sMessage: makeErrorText(e) });
}
} else {
sendErrorResult({ sMessage: sCheckResult });
@ -173,7 +469,7 @@ process.on("SIGTERM", () => {});
//Перехват ошибок
process.on("uncaughtException", e => {
//Отправляем ошибку родительскому процессу
sendErrorResult({ sMessage: e.message });
sendErrorResult({ sMessage: makeErrorText(e) });
});
//Приём сообщений

View File

@ -1,381 +0,0 @@
/*
Сервис интеграции ПП Парус 8 с WEB API
Модуль ядра: обработчик исходящего сообщения
*/
//----------------------
// Подключение библиотек
//----------------------
require("module-alias/register"); //Поддержка псевонимов при подключении модулей
const _ = require("lodash"); //Работа с массивами и коллекциями
const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД
const { makeModuleFullPath, validateObject } = require("./utils"); //Вспомогательные функции
const { ServerError } = require("./server_errors"); //Типовая ошибка
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений
const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
const {
SERR_UNEXPECTED,
SERR_MODULES_BAD_INTERFACE,
SERR_OBJECT_BAD_INTERFACE,
SERR_MODULES_NO_MODULE_SPECIFIED
} = require("./constants"); //Глобальные константы
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
//----------
// Константы
//----------
//--------------------------
// Глобальные идентификаторы
//--------------------------
let dbConn = null; //Подключение к БД
let logger = null; //Протоколирование работы
//------------
// Тело модуля
//------------
//Отправка родительскому процессу ошибки обработки сообщения сервером приложений
const sendErrorResult = sMessage => {
process.send({
sExecResult: "ERR",
sExecMsg: sMessage
});
};
//Отправка родительскому процессу успеха обработки сообщения сервером приложений
const sendOKResult = () => {
process.send({
sExecResult: "OK",
sExecMsg: null
});
};
//Запись в файл !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! УБРАТЬ!!!!!!!!!!!!!!!!!
const writeToFile = queue => {
return new Promise((resolve, reject) => {
const fs = require("fs");
fs.writeFile("c:/repos/temp/" + queue.nId, queue.blMsg, err => {
if (err) {
reject(new ServerError(SERR_UNEXPECTED, `Ошибка отработки сообщения ${prms.queue.nId}`));
} else {
resolve();
}
});
});
};
//Запуск обработки сообщения сервером приложений
const appProcess = async prms => {
//Обработанное сообщение
let newQueue = null;
//Обрабатываем
try {
//Фиксируем начало исполнения сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
//Скажем что начали обработку
await logger.info(
`Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${
prms.queue.sServiceFnCode
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId }
);
if (prms.queue.blMsg) {
await writeToFile(prms.queue);
let sMsg = prms.queue.blMsg.toString() + " MODIFICATION FOR " + prms.queue.nId;
//Фиксируем успех исполнения
newQueue = await dbConn.setQueueAppSrvResult({
nQueueId: prms.queue.nId,
blMsg: new Buffer(sMsg),
blResp: new Buffer("REPLAY ON " + prms.queue.nId)
});
//Фиксируем успешное исполнение сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
});
//Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, {
nQueueId: prms.queue.nId
});
} else {
throw new ServerError(
SERR_UNEXPECTED,
`Ошибка отработки сообщения ${prms.queue.nId}: нет данных для обработки`
);
}
} catch (e) {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${sErr}`, {
nQueueId: prms.queue.nId
});
}
//Возвращаем результат
return newQueue;
};
//Запуск обработки сообщения сервером БД
const dbProcess = async prms => {
//Проверяем структуру переданного объекта для старта
//let sCheckResult = validateObject(
// prms,
// prmsOutQueueSchema.dbProcess,
// "Параметры функции запуска обработки ообщения сервером БД"
//);
//Если структура объекта в норме
//if (!sCheckResult) {
//Обрабатываем
try {
//Фиксируем начало исполнения сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
});
//Скажем что начали обработку
await logger.info(
`Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${
prms.queue.sServiceFnCode
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId }
);
//Вызов обработчика БД
await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId });
//Фиксируем успешное исполнение сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
//Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, {
nQueueId: prms.queue.nId
});
} catch (e) {
//Сформируем текст ошибки
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
//Фиксируем ошибку обработки сервером БД - в статусе сообщения
await dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: sErr,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса
await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${sErr}`, {
nQueueId: prms.queue.nId
});
}
//} else {
// throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
//}
};
//Протоколирование предупреждения о ненадлежащем статусе сообщения
const warnBadStateForProcess = async prms => {
//Предупредим о неверном статусе сообщения (такие сюда попадать не должны)
await logger.warn(`Cообщение ${prms.queue.nId} в статусе ${prms.queue.sExecState} попало в очередь обработчика`, {
nQueueId: prms.queue.nId
});
};
//Обработка задачи
const processTask = async prms => {
//Проверяем параметры
/*
let sCheckResult = validateObject(
prms,
prmsOutQueueProcessorSchema.processTask,
"Параметры функции обработки задачи"
);
*/
//Если параметры в норме
//if (!sCheckResult) {
let q = null;
try {
//Создаём подключение к БД
dbConn = new db.DBConnector({ connectSettings: prms.task.connectSettings });
//Создаём логгер для протоколирования работы
logger = new lg.Logger();
//Подключим логгер к БД (и отключим когда надо)
dbConn.on(db.SEVT_DB_CONNECTOR_CONNECTED, connection => {
logger.setDBConnector(dbConn, true);
});
dbConn.on(db.SEVT_DB_CONNECTOR_DISCONNECTED, () => {
logger.removeDBConnector();
});
//Подключаемся к БД
await dbConn.connect();
//Считываем запись очереди
q = await dbConn.getQueue({ nQueueId: prms.task.nQueueId });
//Далее работаем от статуса считанной записи
switch (q.nExecState) {
//Поставлено в очередь
case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: {
//Запускаем обработку сервером приложений
let res = await appProcess({ queue: q });
//И если она успешно завершилась - обработку сервоером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) await dbProcess({ queue: res });
break;
}
//Обрабатывается сервером приложений
case objQueueSchema.NQUEUE_EXEC_STATE_APP: {
//Ничего не делаем, но предупредим что так быть не должно
await warnBadStateForProcess({ queue: q });
break;
}
//Ошибка обработки сервером приложений
case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: {
//Если ещё есть попытки отработки
if (q.nExecCnt < q.nRetryAttempts) {
//Снова запускаем обработку сервером приложений
let res = await appProcess({ queue: q });
//И если она успешно завершилась - обработку сервоером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) await dbProcess({ queue: res });
} else {
//Попыток нет - финализируем обработку
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: q.sExecMsg,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
}
break;
}
//Успешно обработано сервером приложений
case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK: {
//Запускаем обработку в БД
await dbProcess({ queue: q });
break;
}
//Обрабатывается в БД
case objQueueSchema.NQUEUE_EXEC_STATE_DB: {
//Ничего не делаем, но предупредим что так быть не должно
await warnBadStateForProcess({ queue: q });
break;
}
//Ошибка обработки в БД
case objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR: {
//Если ещё есть попытки отработки
if (q.nExecCnt < q.nRetryAttempts) {
//Снова запускаем обработку сервером БД
await dbProcess({ queue: q });
} else {
//Попыток нет - финализируем обработку
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: q.sExecMsg,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
}
break;
}
//Успешно обработано в БД
case objQueueSchema.NQUEUE_EXEC_STATE_DB_OK: {
//Финализируем
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: null,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
break;
}
//Обработано с ошибками
case objQueueSchema.NQUEUE_EXEC_STATE_ERR: {
//Ничего не делаем, но предупредим что так быть не должно
await warnBadStateForProcess({ queue: q });
break;
}
//Обработано успешно
case objQueueSchema.NQUEUE_EXEC_STATE_OK: {
//Ничего не делаем, но предупредим что так быть не должно
await warnBadStateForProcess({ queue: q });
break;
}
default: {
//Ничего не делаем
break;
}
}
//Отключаемся от БД
if (dbConn) await dbConn.disconnect();
//Отправляем успех
sendOKResult();
} catch (e) {
//Отключаемся от БД
if (dbConn) await dbConn.disconnect();
//Отправляем ошибку
let sErr = `${SERR_UNEXPECTED}: ${e.message}`;
if (e instanceof ServerError) sErr = `${e.sCode}: ${e.sMessage}`;
sendErrorResult(sErr);
}
//Отправим родителю информацию о том, что закончили обработку
//} else {
// sendErrorResult({ sMessage: sCheckResult });
//}
};
//---------------------------------
// Управление процессом обработчика
//---------------------------------
//Перехват CTRL + C (останов процесса)
process.on("SIGINT", () => {});
//Перехват CTRL + \ (останов процесса)
process.on("SIGQUIT", () => {});
//Перехват мягкого останова процесса
process.on("SIGTERM", () => {});
//Перехват ошибок
process.on("uncaughtException", e => {
//Отправляем ошибку родительскому процессу
sendErrorResult(e.message);
});
//Приём сообщений
process.on("message", task => {
//Проверяем структуру переданного сообщения
/*
let sCheckResult = validateObject(
task,
objOutQueueProcessorSchema.OutQueueProcessorTask,
"Задача обработчика очереди исходящих сообщений"
);
*/
//Если структура объекта в норме
//if (!sCheckResult) {
//Запускаем обработку
processTask({ task });
//} else {
// throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
//}
});

View File

@ -8,50 +8,24 @@
//----------------------
const Schema = require("validate"); //Схемы валидации
const { Service } = require("./obj_service"); //Схема валидации сервиса
const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса
const {
NQUEUE_EXEC_STATE_INQUEUE,
NQUEUE_EXEC_STATE_APP,
NQUEUE_EXEC_STATE_APP_OK,
NQUEUE_EXEC_STATE_APP_ERR,
NQUEUE_EXEC_STATE_DB,
NQUEUE_EXEC_STATE_DB_OK,
NQUEUE_EXEC_STATE_DB_ERR,
NQUEUE_EXEC_STATE_OK,
NQUEUE_EXEC_STATE_ERR
} = require("./obj_queue"); //Схема валидации сообщения очереди обмена
const { dbConnect } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений
//------------
// Тело модуля
//------------
//----------
// Константы
//----------
//Валидация данных сообщения очереди
const validateBuffer = val => {
//Либо null
if (val === null) {
return true;
} else {
//Либо данные для формирования Buffer
const s = new Schema({
type: {
type: String,
required: true
},
data: {
type: Array,
required: true
}
});
const errs = s.validate(val, { strip: false });
return errs.length == 0;
}
};
//Состояния обработки сообщений очереди обмена
const STASK_RESULT_OK = "OK"; //Обработано успешно
const STASK_RESULT_ERR = "ERR"; //Обработано с ошибками
//------------------
// Интерфейс модуля
//------------------
//Константы
exports.STASK_RESULT_OK = STASK_RESULT_OK;
exports.STASK_RESULT_ERR = STASK_RESULT_ERR;
//Схема валидации задачи обработчику очереди исходящих сообщений
exports.OutQueueProcessorTask = new Schema({
//Идентификатор записи журнала обмена для обработки
@ -64,121 +38,39 @@ exports.OutQueueProcessorTask = new Schema({
required: path => `Не указан идентификатор записи журнала обмена для обработки (${path})`
}
},
//Состояние обработки сообщения очереди обмена
nExecState: {
type: Number,
enum: [
NQUEUE_EXEC_STATE_INQUEUE,
NQUEUE_EXEC_STATE_APP,
NQUEUE_EXEC_STATE_APP_OK,
NQUEUE_EXEC_STATE_APP_ERR,
NQUEUE_EXEC_STATE_DB,
NQUEUE_EXEC_STATE_DB_OK,
NQUEUE_EXEC_STATE_DB_ERR,
NQUEUE_EXEC_STATE_OK,
NQUEUE_EXEC_STATE_ERR
],
//Параметры подключения к БД
connectSettings: {
schema: dbConnect,
required: true,
message: {
type: path =>
`Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
enum: path => `Значение состояния обработки сообщения очереди обмена (${path}) не поддерживается`,
required: path => `Не указано состояние обработки сообщения очереди обмена (${path})`
}
},
//Данные сообщения очереди обмена
blMsg: {
use: { validateBuffer },
required: true,
message: {
validateBuffer: path =>
`Данные записи журнала обмена для обработки (${path}) имеют некорректный тип данных (ожидалось - null или {type: String, data: Array})`,
required: path => `Не указаны данные сообщения очереди обмена (${path})`
}
},
//Данные ответа на сообщение очереди обмена
blResp: {
use: { validateBuffer },
required: true,
message: {
validateBuffer: path =>
`Данные ответа (${path}) имеют некорректный тип данных (ожидалось - null или {type: String, data: Array})`,
required: path => `Не указаны данные ответа (${path})`
}
},
//Cервис-обработчик
service: {
schema: Service,
required: true,
message: {
required: path => `Не указан сервис для обработки сообщения очереди (${path})`
}
},
//Функция сервиса-обработчика
function: {
schema: ServiceFunction,
required: true,
message: {
required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})`
required: path => `Не указаны параметры подключения к БД (${path})`
}
}
}).validator({
required: val => typeof val != "undefined"
});
//Схема валидации ответа обработчика очереди исходящих сообщений
exports.OutQueueProcessorTaskResult = new Schema({
//Состояние обработки сообщения очереди обмена
nExecState: {
type: Number,
enum: [
NQUEUE_EXEC_STATE_INQUEUE,
NQUEUE_EXEC_STATE_APP,
NQUEUE_EXEC_STATE_APP_OK,
NQUEUE_EXEC_STATE_APP_ERR,
NQUEUE_EXEC_STATE_DB,
NQUEUE_EXEC_STATE_DB_OK,
NQUEUE_EXEC_STATE_DB_ERR,
NQUEUE_EXEC_STATE_OK,
NQUEUE_EXEC_STATE_ERR
],
sResult: {
type: String,
enum: [STASK_RESULT_OK, STASK_RESULT_ERR],
required: true,
message: {
type: path =>
`Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - Number)`,
`Состояние обработки сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
enum: path => `Значение состояния обработки сообщения очереди обмена (${path}) не поддерживается`,
required: path => `Не указано состояние обработки сообщения очереди обмена (${path})`
}
},
//Информация от обработчика сообщения очереди обмена
sExecMsg: {
sMsg: {
type: String,
required: false,
required: true,
message: {
type: path =>
`Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указана информация от обработчика сообщения очереди обмена (${path})`
}
},
//Данные сообщения очереди обмена
blMsg: {
use: { validateBuffer },
required: true,
message: {
validateBuffer: path =>
`Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или {type: String, data: Array})`,
required: path => `Не указаны данные сообщения очереди обмена (${path})`
}
},
//Данные ответа сообщения очереди обмена
blResp: {
use: { validateBuffer },
required: true,
message: {
validateBuffer: path =>
`Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или {type: String, data: Array})`,
required: path => `Не указаны данные ответа сообщения очереди обмена (${path})`
}
}
}).validator({
required: val => typeof val != "undefined"

View File

@ -14,6 +14,14 @@ const { Queue } = require("./obj_queue"); //Схема валидации соо
const { DBConnector } = require("../core/db_connector"); //Класс взаимодействия в БД
const { Logger } = require("../core/logger"); //Класс для протоколирования работы
//-------------
// Тело модуля
//-------------
const validateChildProcess = val => {
return val["constructor"]["name"] === "ChildProcess";
};
//------------------
// Интерфейс модуля
//------------------
@ -25,7 +33,7 @@ exports.OutQueue = new Schema({
schema: outGoing,
required: true,
message: {
required: "Не указаны параметры обработки очереди исходящих сообщений (outGoing)"
required: path => `Не указаны параметры обработки очереди исходящих сообщений (${path})`
}
},
//Объект для взаимодействия с БД
@ -33,8 +41,9 @@ exports.OutQueue = new Schema({
type: DBConnector,
required: true,
message: {
type: "Объект для взаимодействия с БД (dbConn) имеет некорректный тип данных (ожидалось - DBConnector)",
required: "Не указан объект для взаимодействия с БД (dbConn)"
type: path =>
`Объект для взаимодействия с БД (${path}) имеет некорректный тип данных (ожидалось - DBConnector)`,
required: path => `Не указан объект для взаимодействия с БД (${path})`
}
},
//Объект для протоколирования работы
@ -42,32 +51,94 @@ exports.OutQueue = new Schema({
type: Logger,
required: true,
message: {
type: "Объект для протоколирования работы (logger) имеет некорректный тип данных (ожидалось - Logger)",
required: "Не указаны объект для протоколирования работы (logger)"
type: path =>
`Объект для протоколирования работы (${path}) имеет некорректный тип данных (ожидалось - Logger)`,
required: path => `Не указаны объект для протоколирования работы (${path})`
}
}
});
//Схема валидации параметров функции установки финальных статусов сообщения в БД
exports.finalise = new Schema({
//Обрабатываемое исходящее сообщение
queue: {
schema: Queue,
//Схема валидации параметров функции добавления идентификатора сообщения очереди в список обрабатываемых
exports.addInProgress = new Schema({
//Идентификатор сообщения
nQueueId: {
type: Number,
required: true,
message: {
required: "Не указано обрабатываемое исходящее сообщение (queue)"
type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор сообщения (${path})`
}
}
});
//Схема валидации параметров функции запуска обработчика БД
exports.dbProcess = new Schema({
//Обрабатываемое исходящее сообщение
queue: {
schema: Queue,
//Схема валидации параметров функции удаления идентификатора сообщения очереди из списка обрабатываемых
exports.rmInProgress = new Schema({
//Идентификатор сообщения
nQueueId: {
type: Number,
required: true,
message: {
required: "Не указано обрабатываемое исходящее сообщение (queue)"
type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор сообщения (${path})`
}
}
});
//Схема валидации параметров функции проверки наличия идентификатора сообщения очереди в списке обрабатываемых
exports.isInProgress = new Schema({
//Идентификатор сообщения
nQueueId: {
type: Number,
required: true,
message: {
type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор сообщения (${path})`
}
}
});
//Схема валидации параметров функции запуска обработчика сообщения очереди
exports.startQueueProcessor = new Schema({
//Идентификатор сообщения
nQueueId: {
type: Number,
required: true,
message: {
type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор сообщения (${path})`
}
},
//Процесс обработчика
proc: {
use: { validateChildProcess },
required: true,
message: {
validateChildProcess: path =>
`Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`,
required: path => `Не указан процесс обработчика (${path})`
}
}
});
//Схема валидации параметров функции останова обработчика сообщения очереди
exports.stopQueueProcessor = new Schema({
//Идентификатор сообщения
nQueueId: {
type: Number,
required: true,
message: {
type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`,
required: path => `Не указан идентификатор сообщения (${path})`
}
},
//Процесс обработчика
proc: {
use: { validateChildProcess },
required: true,
message: {
validateChildProcess: path =>
`Процесс обработчика (${path}) имеет некорректный тип данных (ожидалось - ChildProcess)`,
required: path => `Не указан процесс обработчика (${path})`
}
}
});
@ -79,7 +150,7 @@ exports.processMessage = new Schema({
schema: Queue,
required: true,
message: {
required: "Не указано обрабатываемое исходящее сообщение (queue)"
required: path => `Не указано обрабатываемое исходящее сообщение (${path})`
}
}
});

View File

@ -8,23 +8,9 @@
//----------------------
const Schema = require("validate"); //Схемы валидации
const { Queue } = require("./obj_queue"); //Схема валидации позиции очереди
const { OutQueueProcessorTask } = require("./obj_out_queue_processor"); //Схемы валидации объектов обработчика исходящих сообщений
//------------
// Тело модуля
//------------
//Валидация данных сообщения очереди
const validateBuffer = val => {
//Либо null
if (val === null) {
return true;
} else {
//Либо Buffer
return val instanceof Buffer;
}
};
//------------------
// Интерфейс модуля
//------------------
@ -42,38 +28,26 @@ exports.sendErrorResult = new Schema({
}
});
//Схема валидации параметров функции отправки успеха обработки
exports.sendOKResult = new Schema({
//Данные сообщения очереди обмена
blMsg: {
use: { validateBuffer },
//Схема валидации параметров функции обработчки сообщения сервером приложений
exports.appProcess = new Schema({
//Обрабатываемое сообщение очереди
queue: {
schema: Queue,
required: true,
message: {
validateBuffer: path =>
`Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`,
required: path => `Не указаны данные сообщения очереди обмена (${path})`
}
},
//Данные ответа сообщения очереди обмена
blResp: {
use: { validateBuffer },
required: true,
message: {
validateBuffer: path =>
`Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`,
required: path => `Не указаны данные ответа сообщения очереди обмена (${path})`
required: path => `Не указано обрабатываемое сообщение очреди (${path})`
}
}
}).validator({ required: val => typeof val != "undefined" });
});
//Параметры функции отправки сообщения родителю без обработки
exports.sendUnChange = new Schema({
//Задача обработки
task: {
schema: OutQueueProcessorTask,
//Схема валидации параметров функции обработчки сообщения сервером БД
exports.dbProcess = new Schema({
//Обрабатываемое сообщение очереди
queue: {
schema: Queue,
required: true,
message: {
required: path => `Не указана задача для обработки (${path})`
required: path => `Не указано обрабатываемое сообщение очреди (${path})`
}
}
});