Модифицирован алгоритм движения обрабатываемой позиции очереди по статусной модели - стало устойчивей в случае возникновения неожиданных ответов обраотчика (теперь анализируем менялось ли количество попыток исполнения с момента запуска обработчика и перечитываем новое состояние позиции очереди по завершению обработчика)

This commit is contained in:
Mikhail Chechnev 2018-12-24 14:00:24 +03:00
parent 6911886e74
commit ca445a4083

View File

@ -13,7 +13,7 @@ const ChildProcess = require("child_process"); //Работа с дочерни
const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы
const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схемы валидации сообщений обмена с обработчиком сообщения очереди
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
const prmsOutQueueSchema = require("../models/prms_out_queue"); //Схемы валидации параметров функций класса
@ -193,12 +193,14 @@ class OutQueue extends EventEmitter {
if (this.nWorkersLeft > 0) {
//Переопределим себя для обращения внутри обработчиков событий
const self = this;
//Запоминаем текущее количество попыток обработки
const nQueueOldExecCnt = prms.queue.nExecCnt;
//Создаём новый обработчик сообщений
const proc = ChildProcess.fork("core/out_queue_processor", { silent: false });
//Текущее состояние сообщения
let curQueue = null;
//Перехват сообщений обработчика
proc.on("message", async result => {
//Считываем сообщение изменённое обработчиком
prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId });
//Проверяем структуру полученного сообщения
let sCheckResult = validateObject(
result,
@ -207,7 +209,8 @@ class OutQueue extends EventEmitter {
);
//Если структура сообщения в норме
if (!sCheckResult) {
if (result.sResult == "ERR") {
//Анализируем результат обработки
if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_ERR) {
//Фиксируем ошибку обработки - протокол работы сервиса
await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sMsg}`, {
nQueueId: prms.queue.nId
@ -216,13 +219,17 @@ class OutQueue extends EventEmitter {
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: result.sMsg,
nIncExecCnt: NINC_EXEC_CNT_YES,
nIncExecCnt:
nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
(nQueueOldExecCnt == prms.queue.nExecCnt
? prms.queue.nExecCnt + 1
: prms.queue.nExecCnt) < prms.queue.nRetryAttempts
? prms.queue.nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
}
//Если есть контекст для сервиса - сохраним его для дальнейшего использования
} else {
//Пришел неожиданный ответ обработчика - запись в протокол работы сервиса
await self.logger.error(
@ -233,9 +240,11 @@ class OutQueue extends EventEmitter {
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: `Неожиданный ответ обработчика для сообщения ${prms.queue.nId}: ${sCheckResult}`,
nIncExecCnt: NINC_EXEC_CNT_YES,
nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
(nQueueOldExecCnt == prms.queue.nExecCnt
? prms.queue.nExecCnt + 1
: prms.queue.nExecCnt) < prms.queue.nRetryAttempts
? prms.queue.nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
@ -253,6 +262,8 @@ class OutQueue extends EventEmitter {
});
//Перехват ошибок обработчика
proc.on("error", async e => {
//Считываем сообщение изменённое обработчиком
prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId });
//Фиксируем ошибку в протоколе работы
await self.logger.error(`Ошибка обработки исходящего сообщения: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
@ -261,9 +272,10 @@ class OutQueue extends EventEmitter {
await this.dbConn.setQueueState({
nQueueId: prms.queue.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nIncExecCnt: nQueueOldExecCnt == prms.queue.nExecCnt ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState:
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
(nQueueOldExecCnt == prms.queue.nExecCnt ? prms.queue.nExecCnt + 1 : prms.queue.nExecCnt) <
prms.queue.nRetryAttempts
? prms.queue.nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
@ -374,7 +386,7 @@ class OutQueue extends EventEmitter {
}
//Ждем завершения работы всех обработчиков
let i = setInterval(() => {
if (!this.bWorking && this.nWorkersLeft == this.outGoing.nMaxWorkers) {
if (!this.bWorking && this.nWorkersLeft >= this.outGoing.nMaxWorkers) {
clearInterval(i);
this.notifyStopped();
}