From 9f7c7d458992e9638394c36c3312170055347ab6 Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Fri, 14 Jun 2019 21:48:51 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A0=D0=B0=D0=B1=D0=BE=D1=82=D0=B0=20=D1=81?= =?UTF-8?q?=20=D0=BF=D0=B0=D1=80=D0=B0=D0=BC=D0=B5=D1=82=D1=80=D0=B0=D0=BC?= =?UTF-8?q?=D0=B8=20=D0=BE=D1=82=D0=BF=D1=80=D0=B0=D0=B2=D0=BA=D0=B8=20?= =?UTF-8?q?=D0=B8=20HTP-=D0=B7=D0=B0=D0=B3=D0=BE=D0=BB=D0=BE=D0=B2=D0=BA?= =?UTF-8?q?=D0=B0=D0=BC=D0=B8=20=D0=B4=D0=BB=D1=8F=20=D0=B8=D1=81=D1=85?= =?UTF-8?q?=D0=BE=D0=B4=D1=8F=D1=89=D0=B8=D1=85=20(=D1=83=D0=BF=D1=80?= =?UTF-8?q?=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=BD=D0=B0=20PL/?= =?UTF-8?q?SQL)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/out_queue_processor.js | 51 +++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 9c694b6..6fbc886 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -12,7 +12,14 @@ const _ = require("lodash"); //Работа с массивами и объек const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами const lg = require("./logger"); //Протоколирование работы const db = require("./db_connector"); //Взаимодействие с БД -const { makeErrorText, validateObject, getAppSrvFunction, buildURL } = require("./utils"); //Вспомогательные функции +const { + makeErrorText, + validateObject, + getAppSrvFunction, + buildURL, + parseOptionsXML, + buildOptionsXML +} = require("./utils"); //Вспомогательные функции const { ServerError } = require("./server_errors"); //Типовая ошибка const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля @@ -131,6 +138,8 @@ const appProcess = async prms => { _.extend(prms.service, serviceCtx); //Собираем параметры для передачи серверу let options = { method: prms.function.sFnPrmsType, encoding: null }; + //Инициализируем параметры ответа сервера + let optionsResp = {}; //Определимся с URL и телом сообщения в зависимости от способа передачи параметров if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { options.url = buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL }); @@ -142,12 +151,27 @@ const appProcess = async prms => { sQuery: prms.queue.blMsg === null ? "" : prms.queue.blMsg.toString() }); } + //Дополним получившиеся параметры переданными в сообщении + if (prms.queue.sOptions) { + try { + let optionsTmp = await parseOptionsXML({ sOptions: prms.queue.sOptions }); + _.extend(options, optionsTmp); + } catch (e) { + await logger.warn( + `Указанные для сообщения параметры имеют некорректный формат - использую параметры по умолчанию. Ошибка парсера: ${makeErrorText( + e + )}`, + { nQueueId: prms.queue.nId } + ); + } + } //Выполняем обработчик "До" (если он есть) if (prms.function.sAppSrvBefore) { const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); let resBefore = null; try { let resBeforePrms = _.cloneDeep(prms); + resBeforePrms.options = _.cloneDeep(options); resBefore = await fnBefore(resBeforePrms); } catch (e) { throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); @@ -209,15 +233,36 @@ const appProcess = async prms => { }); //Отправляем сообщение удалённому серверу try { + //Сохраняем параметры с которыми уходило сообщение + try { + let sOptions = buildOptionsXML({ options }); + await dbConn.setQueueOptions({ nQueueId: prms.queue.nId, sOptions }); + } catch (e) { + await logger.warn(`Не удалось сохранить параметры отправки сообщения: ${makeErrorText(e)}`, { + nQueueId: prms.queue.nId + }); + } //Ждем ответ от удалённого сервера + options.resolveWithFullResponse = true; let serverResp = await rqp(options); //Сохраняем полученный ответ - prms.queue.blResp = new Buffer(serverResp || ""); + prms.queue.blResp = new Buffer(serverResp.body || ""); await dbConn.setQueueResp({ nQueueId: prms.queue.nId, blResp: prms.queue.blResp, nIsOriginal: NIS_ORIGINAL_YES }); + //Сохраняем заголовки ответа + optionsResp.headers = _.cloneDeep(serverResp.headers); + try { + let sOptionsResp = buildOptionsXML({ options: optionsResp }); + await dbConn.setQueueOptionsResp({ nQueueId: prms.queue.nId, sOptionsResp }); + } catch (e) { + await logger.warn( + `Не удалось сохранить заголовок ответа удалённого сервера: ${makeErrorText(e)}`, + { nQueueId: prms.queue.nId } + ); + } } catch (e) { //Прекращаем исполнение если были ошибки let sError = "Неожиданная ошибка удалённого сервиса"; @@ -234,6 +279,8 @@ const appProcess = async prms => { let resAfter = null; try { let resAfterPrms = _.cloneDeep(prms); + resAfterPrms.options = _.cloneDeep(options); + resAfterPrms.optionsResp = _.cloneDeep(optionsResp); resAfter = await fnAfter(resAfterPrms); } catch (e) { throw new ServerError(SERR_APP_SERVER_AFTER, e.message);