diff --git a/core/out_queue.js b/core/out_queue.js index 18f9154..910e7d8 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -68,14 +68,14 @@ class OutQueue extends EventEmitter { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } - //Уведомление об остановке обработчика очереди + //Уведомление о запуске обработчика очереди notifyStarted() { - //оповестим подписчиков о появлении нового отчета + //Оповестим подписчиков о запуске this.emit(SEVT_OUT_QUEUE_STARTED); } //Уведомление об остановке обработчика очереди notifyStopped() { - //оповестим подписчиков о появлении нового отчета + //Оповестим подписчиков об останове this.emit(SEVT_OUT_QUEUE_STOPPED); } //Добавление идентификатора позиции очереди в список обрабатываемых diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 7b2c5f9..34abea6 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -8,6 +8,8 @@ //---------------------- require("module-alias/register"); //Поддержка псевонимов при подключении модулей +const _ = require("lodash"); //Работа с массивами и объектами +const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами const lg = require("./logger"); //Протоколирование работы const db = require("./db_connector"); //Взаимодействие с БД const { makeErrorText, validateObject, getAppSrvFunction } = require("./utils"); //Вспомогательные функции @@ -85,24 +87,35 @@ const appProcess = async prms => { }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, { nQueueId: prms.queue.nId } ); - //Выполняем обработчик "До" + //Собираем параметры для передачи серверу + let options = { + url: `${prms.service.sSrvRoot}/${prms.function.sFnURL}`, + method: prms.service.sFnPrmsType, + body: prms.queue.sMsg + }; + //Выполняем обработчик "До" (если он есть) if (prms.function.sAppSrvBefore) { const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); - await fnBefore(prms); + let resBefore = await fnBefore(prms); //!!!!!!!!!!!!!!!! КОНТРОЛЬ ФОРМАТА РЕЗУЛЬТАТА + options = _.cloneDeep(resBefore.options); } //Отправляем сообщение удалённому серверу + let serverResp = await rqp(options); + _.extend(prms, { serverResp }); //Выполняем обработчик "После" + let resAfter = null; if (prms.function.sAppSrvAfter) { const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); - await fnAfter(prms); + resAfter = await fnAfter(prms); //!!!!!!!!!!!!!!!! КОНТРОЛЬ ФОРМАТА РЕЗУЛЬТАТА + prms.queue.blResp = resAfter.blResp; + } else { + prms.queue.blResp = new Buffer(serverResp.toString()); } - let sMsg = - (prms.queue.blMsg ? prms.queue.blMsg.toString() : "null") + " MODIFICATION FOR " + prms.queue.nId; //Фиксируем успех исполнения newQueue = await dbConn.setQueueAppSrvResult({ nQueueId: prms.queue.nId, - blMsg: new Buffer(sMsg), - blResp: new Buffer("REPLAY ON " + prms.queue.nId) + blMsg: prms.queue.blMsg, + blResp: prms.queue.blResp }); //Фиксируем успешное исполнение сервером приложений - в статусе сообщения newQueue = await dbConn.setQueueState({