Полный цикл обработки исходящего сообщения - обработчик ДО + отправка на сервер + обработчик ПОСЛЕ

This commit is contained in:
Mikhail Chechnev 2018-12-10 00:06:11 +03:00
parent b85007e130
commit 7411cf61a0
2 changed files with 23 additions and 10 deletions

View File

@ -68,14 +68,14 @@ class OutQueue extends EventEmitter {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
} }
} }
//Уведомление об остановке обработчика очереди //Уведомление о запуске обработчика очереди
notifyStarted() { notifyStarted() {
//оповестим подписчиков о появлении нового отчета //Оповестим подписчиков о запуске
this.emit(SEVT_OUT_QUEUE_STARTED); this.emit(SEVT_OUT_QUEUE_STARTED);
} }
//Уведомление об остановке обработчика очереди //Уведомление об остановке обработчика очереди
notifyStopped() { notifyStopped() {
//оповестим подписчиков о появлении нового отчета //Оповестим подписчиков об останове
this.emit(SEVT_OUT_QUEUE_STOPPED); this.emit(SEVT_OUT_QUEUE_STOPPED);
} }
//Добавление идентификатора позиции очереди в список обрабатываемых //Добавление идентификатора позиции очереди в список обрабатываемых

View File

@ -8,6 +8,8 @@
//---------------------- //----------------------
require("module-alias/register"); //Поддержка псевонимов при подключении модулей require("module-alias/register"); //Поддержка псевонимов при подключении модулей
const _ = require("lodash"); //Работа с массивами и объектами
const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами
const lg = require("./logger"); //Протоколирование работы const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД const db = require("./db_connector"); //Взаимодействие с БД
const { makeErrorText, validateObject, getAppSrvFunction } = require("./utils"); //Вспомогательные функции const { makeErrorText, validateObject, getAppSrvFunction } = require("./utils"); //Вспомогательные функции
@ -85,24 +87,35 @@ const appProcess = async prms => {
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId } { nQueueId: prms.queue.nId }
); );
//Выполняем обработчик "До" //Собираем параметры для передачи серверу
let options = {
url: `${prms.service.sSrvRoot}/${prms.function.sFnURL}`,
method: prms.service.sFnPrmsType,
body: prms.queue.sMsg
};
//Выполняем обработчик "До" (если он есть)
if (prms.function.sAppSrvBefore) { if (prms.function.sAppSrvBefore) {
const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore);
await fnBefore(prms); let resBefore = await fnBefore(prms); //!!!!!!!!!!!!!!!! КОНТРОЛЬ ФОРМАТА РЕЗУЛЬТАТА
options = _.cloneDeep(resBefore.options);
} }
//Отправляем сообщение удалённому серверу //Отправляем сообщение удалённому серверу
let serverResp = await rqp(options);
_.extend(prms, { serverResp });
//Выполняем обработчик "После" //Выполняем обработчик "После"
let resAfter = null;
if (prms.function.sAppSrvAfter) { if (prms.function.sAppSrvAfter) {
const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter);
await fnAfter(prms); resAfter = await fnAfter(prms); //!!!!!!!!!!!!!!!! КОНТРОЛЬ ФОРМАТА РЕЗУЛЬТАТА
prms.queue.blResp = resAfter.blResp;
} else {
prms.queue.blResp = new Buffer(serverResp.toString());
} }
let sMsg =
(prms.queue.blMsg ? prms.queue.blMsg.toString() : "null") + " MODIFICATION FOR " + prms.queue.nId;
//Фиксируем успех исполнения //Фиксируем успех исполнения
newQueue = await dbConn.setQueueAppSrvResult({ newQueue = await dbConn.setQueueAppSrvResult({
nQueueId: prms.queue.nId, nQueueId: prms.queue.nId,
blMsg: new Buffer(sMsg), blMsg: prms.queue.blMsg,
blResp: new Buffer("REPLAY ON " + prms.queue.nId) blResp: prms.queue.blResp
}); });
//Фиксируем успешное исполнение сервером приложений - в статусе сообщения //Фиксируем успешное исполнение сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({ newQueue = await dbConn.setQueueState({