diff --git a/core/in_queue.js b/core/in_queue.js index 99bc02a..ea34152 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -78,7 +78,7 @@ class InQueue extends EventEmitter { } //Обработка сообщения async processMessage(prms) { - //Проверяем структуру переданного объекта для старта + //Проверяем структуру переданного объекта для обработки let sCheckResult = validateObject( prms, prmsInQueueSchema.processMessage, @@ -89,9 +89,10 @@ class InQueue extends EventEmitter { //Буфер для сообщения очереди let q = null; try { - //Определимся с телом сообщения + //Тело сообщения и ответ на него let blMsg = null; - //Для POST сообщений - это тело запроса + let blResp = null; + //Определимся с телом сообщения - для POST сообщений - это тело запроса if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null; } else { @@ -122,6 +123,8 @@ class InQueue extends EventEmitter { let resBefore = null; try { prms.queue = q; + prms.queue.blMsg = blMsg; + prms.queue.blResp = blResp; resBefore = await fnBefore(prms); } catch (e) { throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); @@ -142,10 +145,17 @@ class InQueue extends EventEmitter { }); //Фиксируем успех исполнения if (resBefore.blMsg) { - q = await this.dbConn.setQueueAppSrvResult({ + blMsg = resBefore.blMsg; + q = await this.dbConn.setQueueMsg({ nQueueId: q.nId, - blMsg: resBefore.blMsg, - blResp: null + blMsg + }); + } + if (resBefore.blResp) { + blResp = resBefore.blResp; + q = await this.dbConn.setQueueResp({ + nQueueId: q.nId, + blResp }); } } else { @@ -168,6 +178,9 @@ class InQueue extends EventEmitter { nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK }); + //Считаем ответ полученный от системы + let qData = await this.dbConn.getQueueResp({ nQueueId: prms.queue.nId }); + blResp = qData.blResp; } //Выполняем обработчик "После" (если он есть) if (prms.function.sAppSrvAfter) { @@ -181,6 +194,8 @@ class InQueue extends EventEmitter { let resAfter = null; try { prms.queue = q; + prms.queue.blMsg = blMsg; + prms.queue.blResp = blResp; resAfter = await fnAfter(prms); } catch (e) { throw new ServerError(SERR_APP_SERVER_AFTER, e.message); @@ -200,11 +215,13 @@ class InQueue extends EventEmitter { nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK }); //Фиксируем успех исполнения - q = await this.dbConn.setQueueAppSrvResult({ - nQueueId: q.nId, - blMsg: q.blMsg, - blResp: resAfter.blResp - }); + if (resAfter.blResp) { + blResp = resAfter.blResp; + q = await this.dbConn.setQueueResp({ + nQueueId: q.nId, + blResp + }); + } } else { //Или расскажем об ошибке throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); @@ -212,7 +229,7 @@ class InQueue extends EventEmitter { } } //Всё успешно - отдаём результат клиенту - prms.res.status(200).send(q.blResp); + prms.res.status(200).send(blResp); //Фиксируем успех обработки - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index f64cfc1..7ff4b21 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -12,11 +12,12 @@ const _ = require("lodash"); //Работа с массивами и объек const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами const lg = require("./logger"); //Протоколирование работы const db = require("./db_connector"); //Взаимодействие с БД -const { makeErrorText, validateObject, getAppSrvFunction } = require("./utils"); //Вспомогательные функции +const { makeErrorText, validateObject, getAppSrvFunction, buildURL } = require("./utils"); //Вспомогательные функции const { ServerError } = require("./server_errors"); //Типовая ошибка const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди +const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса const { SERR_OBJECT_BAD_INTERFACE, SERR_APP_SERVER_BEFORE, SERR_APP_SERVER_AFTER } = require("./constants"); //Глобальные константы const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД @@ -87,12 +88,23 @@ const appProcess = async prms => { }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, { nQueueId: prms.queue.nId } ); + //Считаем тело сообщения + let qData = await dbConn.getQueueMsg({ nQueueId: prms.queue.nId }); + //Кладём данные тела в объект сообщения и инициализируем поле для ответа + _.extend(prms.queue, { blMsg: qData.blMsg, blResp: null }); //Собираем параметры для передачи серверу - let options = { - url: `${prms.service.sSrvRoot}/${prms.function.sFnURL}`, - method: prms.service.sFnPrmsType, - body: prms.queue.sMsg - }; + let options = { method: prms.service.sFnPrmsType }; + //Определимся с URL и телом сообщения в зависимости от способа передачи параметров + if (prms.service.sFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { + options.url = buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL }); + options.body = prms.queue.blMsg; + } else { + options.url = buildURL({ + sSrvRoot: prms.service.sSrvRoot, + sFnURL: prms.function.sFnURL, + sQuery: prms.queue.blMsg.toString() + }); + } //Выполняем обработчик "До" (если он есть) if (prms.function.sAppSrvBefore) { const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore);