From 56e3cff0241e9ea8fc73eed500115d7a5afade6a Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Tue, 5 Jul 2022 14:59:01 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-441=20-=20=D0=BD=D0=BE?= =?UTF-8?q?=D0=B2=D0=BE=D0=B5=20=D1=80=D0=B0=D1=81=D1=88=D0=B8=D1=80=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=20"=D0=A0=D0=B0=D1=81=D1=81=D1=8B=D0=BB?= =?UTF-8?q?=D0=BA=D0=B0=20E-Mail"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/in_queue.js | 70 +++----- core/out_queue_processor.js | 279 ++++++++++++++---------------- core/utils.js | 4 +- models/obj_out_queue_processor.js | 10 ++ models/prms_utils.js | 28 ++- modules/send_mail.js | 175 +++++++++++++++++++ 6 files changed, 359 insertions(+), 207 deletions(-) create mode 100644 modules/send_mail.js diff --git a/core/in_queue.js b/core/in_queue.js index 9a268f9..6454c60 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -13,15 +13,7 @@ const express = require("express"); //WEB-сервер Express const cors = require("cors"); //Управление заголовками безопасности для WEB-сервера Express const bodyParser = require("body-parser"); //Модуль для Express (разбор тела входящего запроса) const { ServerError } = require("./server_errors"); //Типовая ошибка -const { - makeErrorText, - validateObject, - buildURL, - getAppSrvFunction, - buildOptionsXML, - parseOptionsXML, - deepMerge -} = require("./utils"); //Вспомогательные функции +const { makeErrorText, validateObject, buildURL, getAppSrvFunction, buildOptionsXML, parseOptionsXML, deepMerge } = require("./utils"); //Вспомогательные функции const { NINC_EXEC_CNT_YES, NIS_ORIGINAL_NO, NIS_ORIGINAL_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса @@ -96,11 +88,7 @@ class InQueue extends EventEmitter { //Обработка сообщения async processMessage(prms) { //Проверяем структуру переданного объекта для обработки - let sCheckResult = validateObject( - prms, - prmsInQueueSchema.processMessage, - "Параметры функции обработки входящего сообщения" - ); + let sCheckResult = validateObject(prms, prmsInQueueSchema.processMessage, "Параметры функции обработки входящего сообщения"); //Если структура объекта в норме if (!sCheckResult) { //Буфер для сообщения очереди @@ -135,9 +123,10 @@ class InQueue extends EventEmitter { }); //Скажем что пришло новое входящее сообщение await this.logger.info( - `Новое входящее сообщение от ${prms.req.connection.address().address} для функции ${ - prms.function.sCode - } (${buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL })})`, + `Новое входящее сообщение от ${prms.req.connection.address().address} для функции ${prms.function.sCode} (${buildURL({ + sSrvRoot: prms.service.sSrvRoot, + sFnURL: prms.function.sFnURL + })})`, { nQueueId: q.nId } ); //Выполняем обработчик "До" (если он есть) @@ -200,13 +189,11 @@ class InQueue extends EventEmitter { let sOptionsResp = buildOptionsXML({ options: optionsResp }); q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp }); } - //Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем - if (!_.isUndefined(resBefore.bUnAuth)) - if (resBefore.bUnAuth === true) - throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); - //Если пришел флаг прекращения дальнейшей обработки сообщения - то дальше его обработку прекращаем - if (!_.isUndefined(resBefore.bStopPropagation)) - if (resBefore.bStopPropagation === true) bStopPropagation = true; + //Фиксируем результат исполнения "До" - флаг ошибочной аутентификации - если он поднят, то это ошибка, дальше ничего не делаем + if (!_.isUndefined(resBefore.bUnAuth) && resBefore.bUnAuth === true) + throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); + //Фиксируем результат исполнения "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем + if (!_.isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true; } else { //Или расскажем об ошибке throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); @@ -223,8 +210,7 @@ class InQueue extends EventEmitter { //Вызов обработчика БД let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId }); //Если результат - ошибка пробрасываем её - if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) - throw new ServerError(SERR_DB_SERVER, prcRes.sMsg); + if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg); //Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации"); @@ -339,10 +325,7 @@ class InQueue extends EventEmitter { nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR }); //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса - await this.logger.error( - `Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`, - { nQueueId: q.nId } - ); + await this.logger.error(`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`, { nQueueId: q.nId }); //Добавим чуть больше информации в тему сообщения sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${prms.function.sCode}" сервиса "${prms.service.sCode}"`; } else { @@ -371,11 +354,7 @@ class InQueue extends EventEmitter { //Запуск обработки очереди входящих сообщений startProcessing(prms) { //Проверяем структуру переданного объекта для старта - let sCheckResult = validateObject( - prms, - prmsInQueueSchema.startProcessing, - "Параметры функции запуска обработки очереди входящих сообщений" - ); + let sCheckResult = validateObject(prms, prmsInQueueSchema.startProcessing, "Параметры функции запуска обработки очереди входящих сообщений"); //Если структура объекта в норме if (!sCheckResult) { //Выставляем флаг работы @@ -431,18 +410,15 @@ class InQueue extends EventEmitter { } ); //...и собственный обработчик ошибок - this.webApp.use( - buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), - async (err, req, res, next) => { - //Протоколируем в журнал работы сервера - await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), { - nServiceId: srvs.nId, - nServiceFnId: fn.nId - }); - //Отправим ошибку клиенту - res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); - } - ); + this.webApp.use(buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (err, req, res, next) => { + //Протоколируем в журнал работы сервера + await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), { + nServiceId: srvs.nId, + nServiceFnId: fn.nId + }); + //Отправим ошибку клиенту + res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); + }); } ); }); diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 416c483..b1fada3 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -12,15 +12,7 @@ const _ = require("lodash"); //Работа с массивами и объек const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами const lg = require("./logger"); //Протоколирование работы const db = require("./db_connector"); //Взаимодействие с БД -const { - makeErrorText, - validateObject, - getAppSrvFunction, - buildURL, - parseOptionsXML, - buildOptionsXML, - deepMerge -} = require("./utils"); //Вспомогательные функции +const { makeErrorText, validateObject, getAppSrvFunction, buildURL, parseOptionsXML, buildOptionsXML, deepMerge } = require("./utils"); //Вспомогательные функции const { ServerError } = require("./server_errors"); //Типовая ошибка const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля @@ -35,12 +27,7 @@ const { SERR_WEB_SERVER, SERR_UNAUTH } = require("./constants"); //Глобальные константы -const { - NINC_EXEC_CNT_YES, - NINC_EXEC_CNT_NO, - NIS_ORIGINAL_NO, - NIS_ORIGINAL_YES -} = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД +const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO, NIS_ORIGINAL_NO, NIS_ORIGINAL_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД //-------------------------- // Глобальные идентификаторы @@ -112,8 +99,7 @@ const appProcess = async prms => { //Проверяем аутентификацию if ( prms.function.nAuthOnly == objServiceFnSchema.NAUTH_ONLY_NO || - (prms.function.nAuthOnly == objServiceFnSchema.NAUTH_ONLY_YES && - isServiceAuth == objServiceSchema.NIS_AUTH_YES) + (prms.function.nAuthOnly == objServiceFnSchema.NAUTH_ONLY_YES && isServiceAuth == objServiceSchema.NIS_AUTH_YES) ) { //Фиксируем начало исполнения сервером приложений - в статусе сообщения res = await dbConn.setQueueState({ @@ -122,9 +108,9 @@ const appProcess = async prms => { }); //Фиксируем начало исполнения сервером приложений - в протоколе работы сервиса await logger.info( - `Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ - prms.queue.sServiceFnCode - }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, + `Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${prms.queue.sServiceFnCode}, ${ + prms.queue.sExecState + }, попытка исполнения - ${prms.queue.nExecCnt + 1}`, { nQueueId: prms.queue.nId } ); //Считаем тело сообщения @@ -141,6 +127,8 @@ const appProcess = async prms => { let options = { method: prms.function.sFnPrmsType, encoding: null }; //Инициализируем параметры ответа сервера let optionsResp = {}; + //Флаг прекращения обработки сообщения + let bStopPropagation = false; //Определимся с URL и телом сообщения в зависимости от способа передачи параметров if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { options.url = buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL }); @@ -176,6 +164,7 @@ const appProcess = async prms => { try { let resBeforePrms = _.cloneDeep(prms); resBeforePrms.options = _.cloneDeep(options); + resBeforePrms.dbConn = dbConn; resBefore = await fnBefore(resBeforePrms); } catch (e) { throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); @@ -225,123 +214,123 @@ const appProcess = async prms => { }); bCtxIsSet = true; } + //Применим ответ "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем + if (!_.isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true; } else { //Или расскажем об ошибке в структуре ответа throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } } - //Фиксируем отправку сообщения в протоколе работы сервиса - await logger.info(`Отправляю исходящее сообщение ${prms.queue.nId} на URL: ${options.url}`, { - nQueueId: prms.queue.nId - }); - //Отправляем сообщение удалённому серверу - try { - //Сохраняем параметры с которыми уходило сообщение - try { - let tmpOptions = _.cloneDeep(options); - delete tmpOptions.body; - let sOptions = buildOptionsXML({ options: tmpOptions }); - await dbConn.setQueueOptions({ nQueueId: prms.queue.nId, sOptions }); - } catch (e) { - await logger.warn(`Не удалось сохранить параметры отправки сообщения: ${makeErrorText(e)}`, { - nQueueId: prms.queue.nId - }); - } - //Ждем ответ от удалённого сервера - options.resolveWithFullResponse = true; - let serverResp = await rqp(options); - //Сохраняем полученный ответ - prms.queue.blResp = Buffer.from(serverResp.body || ""); - await dbConn.setQueueResp({ - nQueueId: prms.queue.nId, - blResp: prms.queue.blResp, - nIsOriginal: NIS_ORIGINAL_YES + //Если флаг прекращения обработки сообщения не установлен + if (bStopPropagation === false) { + //Фиксируем отправку сообщения в протоколе работы сервиса + await logger.info(`Отправляю исходящее сообщение ${prms.queue.nId} на URL: ${options.url}`, { + nQueueId: prms.queue.nId }); - //Сохраняем заголовки ответа и HTTP-статус - optionsResp.headers = _.cloneDeep(serverResp.headers); - optionsResp.statusCode = serverResp.statusCode; + //Отправляем сообщение удалённому серверу try { - let sOptionsResp = buildOptionsXML({ options: optionsResp }); - await dbConn.setQueueOptionsResp({ nQueueId: prms.queue.nId, sOptionsResp }); + //Сохраняем параметры с которыми уходило сообщение + try { + let tmpOptions = _.cloneDeep(options); + delete tmpOptions.body; + let sOptions = buildOptionsXML({ options: tmpOptions }); + await dbConn.setQueueOptions({ nQueueId: prms.queue.nId, sOptions }); + } catch (e) { + await logger.warn(`Не удалось сохранить параметры отправки сообщения: ${makeErrorText(e)}`, { + nQueueId: prms.queue.nId + }); + } + //Ждем ответ от удалённого сервера + options.resolveWithFullResponse = true; + let serverResp = await rqp(options); + //Сохраняем полученный ответ + prms.queue.blResp = Buffer.from(serverResp.body || ""); + await dbConn.setQueueResp({ + nQueueId: prms.queue.nId, + blResp: prms.queue.blResp, + nIsOriginal: NIS_ORIGINAL_YES + }); + //Сохраняем заголовки ответа и HTTP-статус + optionsResp.headers = _.cloneDeep(serverResp.headers); + optionsResp.statusCode = serverResp.statusCode; + try { + let sOptionsResp = buildOptionsXML({ options: optionsResp }); + await dbConn.setQueueOptionsResp({ nQueueId: prms.queue.nId, sOptionsResp }); + } catch (e) { + await logger.warn(`Не удалось сохранить заголовок ответа удалённого сервера: ${makeErrorText(e)}`, { + nQueueId: prms.queue.nId + }); + } } catch (e) { - await logger.warn( - `Не удалось сохранить заголовок ответа удалённого сервера: ${makeErrorText(e)}`, - { nQueueId: prms.queue.nId } - ); + //Прекращаем исполнение если были ошибки + let sError = "Неожиданная ошибка удалённого сервиса"; + if (e.error) { + let sSubError = e.error.code || e.error; + sError = `Ошибка передачи данных: ${sSubError}`; + } + if (e.response) sError = `${e.response.statusCode} - ${e.response.statusMessage}`; + throw new ServerError(SERR_WEB_SERVER, sError); } - } catch (e) { - //Прекращаем исполнение если были ошибки - let sError = "Неожиданная ошибка удалённого сервиса"; - if (e.error) { - let sSubError = e.error.code || e.error; - sError = `Ошибка передачи данных: ${sSubError}`; - } - if (e.response) sError = `${e.response.statusCode} - ${e.response.statusMessage}`; - throw new ServerError(SERR_WEB_SERVER, sError); - } - //Выполняем обработчик "После" (если он есть) - if (prms.function.sAppSrvAfter) { - const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); - let resAfter = null; - try { - let resAfterPrms = _.cloneDeep(prms); - resAfterPrms.options = _.cloneDeep(options); - resAfterPrms.optionsResp = _.cloneDeep(optionsResp); - resAfter = await fnAfter(resAfterPrms); - } catch (e) { - throw new ServerError(SERR_APP_SERVER_AFTER, e.message); - } - //Проверяем структуру ответа функции постобработки - if (resAfter) { - let sCheckResult = validateObject( - resAfter, - objOutQueueProcessorSchema.OutQueueProcessorFnAfter, - "Результат функции постобработки исходящего сообщения" - ); - //Если структура ответа в норме - if (!sCheckResult) { - //Применим ответ "После" - обработанный ответ удаленного сервиса - if (!_.isUndefined(resAfter.blResp)) { - prms.queue.blResp = resAfter.blResp; - await dbConn.setQueueResp({ - nQueueId: prms.queue.nId, - blResp: prms.queue.blResp, - nIsOriginal: NIS_ORIGINAL_NO - }); - } - //Применим ответ "После" - флаг утентификации сервиса - if (!_.isUndefined(resAfter.bUnAuth)) - if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); - //Применим ответ "После" - контекст работы сервиса - if (!_.isUndefined(resAfter.sCtx)) - if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN) { - prms.service.sCtx = resAfter.sCtx; - prms.service.dCtxExp = resAfter.dCtxExp; - await dbConn.setServiceContext({ - nServiceId: prms.service.nId, - sCtx: prms.service.sCtx, - dCtxExp: prms.service.dCtxExp + //Выполняем обработчик "После" (если он есть) + if (prms.function.sAppSrvAfter) { + const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); + let resAfter = null; + try { + let resAfterPrms = _.cloneDeep(prms); + resAfterPrms.options = _.cloneDeep(options); + resAfterPrms.optionsResp = _.cloneDeep(optionsResp); + resAfter = await fnAfter(resAfterPrms); + } catch (e) { + throw new ServerError(SERR_APP_SERVER_AFTER, e.message); + } + //Проверяем структуру ответа функции постобработки + if (resAfter) { + let sCheckResult = validateObject( + resAfter, + objOutQueueProcessorSchema.OutQueueProcessorFnAfter, + "Результат функции постобработки исходящего сообщения" + ); + //Если структура ответа в норме + if (!sCheckResult) { + //Применим ответ "После" - обработанный ответ удаленного сервиса + if (!_.isUndefined(resAfter.blResp)) { + prms.queue.blResp = resAfter.blResp; + await dbConn.setQueueResp({ + nQueueId: prms.queue.nId, + blResp: prms.queue.blResp, + nIsOriginal: NIS_ORIGINAL_NO }); - bCtxIsSet = true; } - } else { - //Или расскажем об ошибке в структуре ответа - throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + //Применим ответ "После" - флаг утентификации сервиса + if (!_.isUndefined(resAfter.bUnAuth)) + if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); + //Применим ответ "После" - контекст работы сервиса + if (!_.isUndefined(resAfter.sCtx)) + if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN) { + prms.service.sCtx = resAfter.sCtx; + prms.service.dCtxExp = resAfter.dCtxExp; + await dbConn.setServiceContext({ + nServiceId: prms.service.nId, + sCtx: prms.service.sCtx, + dCtxExp: prms.service.dCtxExp + }); + bCtxIsSet = true; + } + } else { + //Или расскажем об ошибке в структуре ответа + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } } } - } - //Если это функция начала сеанса, и нет обработчика на стороне БД и контекст не был установлен до сих пор - то положим в него то, что нам ответил сервер - if ( - prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN && - !prms.function.sPrcResp && - !bCtxIsSet - ) { - await dbConn.setServiceContext({ nServiceId: prms.service.nId, sCtx: serverResp }); - } - //Если это функция окончания сеанса, и нет обработчика на стороне БД - то сбросим контекст здесь - if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGOUT && !prms.function.sPrcResp) { - await dbConn.clearServiceContext({ nServiceId: prms.service.nId }); + //Если это функция начала сеанса, и нет обработчика на стороне БД и контекст не был установлен до сих пор - то положим в него то, что нам ответил сервер + if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN && !prms.function.sPrcResp && !bCtxIsSet) { + await dbConn.setServiceContext({ nServiceId: prms.service.nId, sCtx: serverResp }); + } + //Если это функция окончания сеанса, и нет обработчика на стороне БД - то сбросим контекст здесь + if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGOUT && !prms.function.sPrcResp) { + await dbConn.clearServiceContext({ nServiceId: prms.service.nId }); + } } //Фиксируем успешное исполнение сервером приложений - в статусе сообщения res = await dbConn.setQueueState({ @@ -383,10 +372,9 @@ const appProcess = async prms => { }); } //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса - await logger.error( - `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${makeErrorText(e)}`, - { nQueueId: prms.queue.nId } - ); + await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${makeErrorText(e)}`, { + nQueueId: prms.queue.nId + }); } } else { //Фатальная ошибка обработки - некорректный объект параметров @@ -401,11 +389,7 @@ const dbProcess = async prms => { //Результат обработки - объект Queue (обработанное сообщение) или ServerError (ошибка обработки) let res = null; //Проверяем структуру переданного объекта для старта - let sCheckResult = validateObject( - prms, - prmsOutQueueProcessorSchema.dbProcess, - "Параметры функции запуска обработки ообщения сервером БД" - ); + let sCheckResult = validateObject(prms, prmsOutQueueProcessorSchema.dbProcess, "Параметры функции запуска обработки ообщения сервером БД"); //Если структура объекта в норме if (!sCheckResult) { //Обрабатываем @@ -417,9 +401,9 @@ const dbProcess = async prms => { }); //Фиксируем начало исполнения сервером БД - в протоколе работы сервиса await logger.info( - `Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${ - prms.queue.sServiceFnCode - }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, + `Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${prms.queue.sServiceFnCode}, ${ + prms.queue.sExecState + }, попытка исполнения - ${prms.queue.nExecCnt + 1}`, { nQueueId: prms.queue.nId } ); //Если обработчик со стороны БД указан @@ -427,11 +411,9 @@ const dbProcess = async prms => { //Вызываем его let prcRes = await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId }); //Если результат - ошибка пробрасываем её - if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) - throw new ServerError(SERR_DB_SERVER, prcRes.sMsg); + if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg); //Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом - if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) - throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации"); + if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации"); } //Фиксируем успешное исполнение (полное - дальше обработки нет) - в статусе сообщения res = await dbConn.setQueueState({ @@ -466,10 +448,9 @@ const dbProcess = async prms => { }); } //Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса - await logger.error( - `Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${makeErrorText(e)}`, - { nQueueId: prms.queue.nId } - ); + await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${makeErrorText(e)}`, { + nQueueId: prms.queue.nId + }); } } else { //Фатальная ошибка обработки - некорректный объект параметров @@ -482,11 +463,7 @@ const dbProcess = async prms => { //Обработка задачи const processTask = async prms => { //Проверяем параметры - let sCheckResult = validateObject( - prms, - prmsOutQueueProcessorSchema.processTask, - "Параметры функции обработки задачи" - ); + let sCheckResult = validateObject(prms, prmsOutQueueProcessorSchema.processTask, "Параметры функции обработки задачи"); //Если параметры в норме if (!sCheckResult) { let q = null; @@ -629,11 +606,7 @@ process.on("uncaughtException", e => { //Приём сообщений process.on("message", task => { //Проверяем структуру переданного сообщения - let sCheckResult = validateObject( - task, - objOutQueueProcessorSchema.OutQueueProcessorTask, - "Задача обработчика очереди исходящих сообщений" - ); + let sCheckResult = validateObject(task, objOutQueueProcessorSchema.OutQueueProcessorTask, "Задача обработчика очереди исходящих сообщений"); //Если структура объекта в норме if (!sCheckResult) { //Запускаем обработку diff --git a/core/utils.js b/core/utils.js index 4ec5f9e..e8d5b9b 100644 --- a/core/utils.js +++ b/core/utils.js @@ -197,7 +197,9 @@ const sendMail = prms => { from: prms.mail.sFrom, to: prms.sTo, subject: prms.sSubject, - text: prms.sMessage + text: prms.sMessage, + html: prms.sHTML, + attachments: prms.attachments }; //Отправляем сообщение transporter.sendMail(mailOptions, (error, info) => { diff --git a/models/obj_out_queue_processor.js b/models/obj_out_queue_processor.js index cd36149..bcabc16 100644 --- a/models/obj_out_queue_processor.js +++ b/models/obj_out_queue_processor.js @@ -145,6 +145,16 @@ exports.OutQueueProcessorFnBefore = new Schema({ type: path => `Дата истечения контекста (${path}) имеет некорректный тип данных (ожидалось - Date)`, required: path => `Не указана дата истечения контекста (${path})` } + }, + //Флаг прекращения дальнейшей обработки сообщения + bStopPropagation: { + type: Boolean, + required: false, + message: { + type: path => + `Флаг прекращения дальнейшей обработки сообщения (${path}) имеет некорректный тип данных (ожидалось - Boolean)`, + required: path => `Не указан флаг прекращения дальнейшей обработки сообщения (${path})` + } } }); diff --git a/models/prms_utils.js b/models/prms_utils.js index fadf9ef..5b603aa 100644 --- a/models/prms_utils.js +++ b/models/prms_utils.js @@ -40,8 +40,7 @@ exports.sendMail = new Schema({ required: true, use: { validateTo }, message: { - type: path => - `Список адресов E-Mail для отправки уведомления (${path}) имеет некорректный тип данных (ожидалось - String)`, + type: path => `Список адресов E-Mail для отправки уведомления (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указан cписок адресов E-Mail для отправки уведомления (${path})`, validateTo: path => `Неверный формат списка адресов E-Mail для отправки уведомления (${path}), для указания нескольких адресов следует использовать запятую в качестве разделителя (без пробелов)` @@ -50,7 +49,7 @@ exports.sendMail = new Schema({ //Заголовок сообщения sSubject: { type: String, - required: true, + required: false, message: { type: path => `Заголовок сообщения (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указан заголовок сообщения (${path})` @@ -59,11 +58,29 @@ exports.sendMail = new Schema({ //Текст уведомления sMessage: { type: String, - required: true, + required: false, message: { type: path => `Текст уведомления (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указан текст уведомления (${path})` } + }, + //HTML текст сообщения + sHTML: { + type: String, + required: false, + message: { + type: path => `HTML текст сообщения (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан HTML текст сообщения (${path})` + } + }, + //Вложения сообщения + attachments: { + type: Array, + required: false, + message: { + type: path => `Список вложений сообщения (${path}) имеет некорректный тип данных (ожидалось - Array)`, + required: path => `Не указан список вложений сообщения (${path})` + } } }); @@ -141,8 +158,7 @@ exports.buildOptionsXML = new Schema({ type: Object, required: true, message: { - type: path => - `Объект параметров сообщения/ответа (${path}) имеет некорректный тип данных (ожидалось - Object)`, + type: path => `Объект параметров сообщения/ответа (${path}) имеет некорректный тип данных (ожидалось - Object)`, required: path => `Не указан объект параметров сообщения/ответа (${path})` } } diff --git a/modules/send_mail.js b/modules/send_mail.js new file mode 100644 index 0000000..047e76e --- /dev/null +++ b/modules/send_mail.js @@ -0,0 +1,175 @@ +/* + Сервис интеграции ПП Парус 8 с WEB API + Дополнительный модуль: Рассылка E-Mail (MAIL) +*/ + +//------------------------------ +// Подключение внешних библиотек +//------------------------------ + +const xml2js = require("xml2js"); //Конвертация XML в JSON и JSON в XML +const cfg = require("./../config"); //Настройки сервера приложений +const { makeErrorText, sendMail } = require("./../core/utils"); //Вспомогательные функции +const oracledb = require("oracledb"); //Работа с СУБД Oracle + +//--------------------- +// Глобальные константы +//--------------------- + +//Статусы отправки +const NSTATUS_ERR = 2; +const NSTATUS_DONE = 3; + +//------------ +// Тело модуля +//------------ + +//Чтение данных из курсора +const readCursorData = cursor => { + return new Promise((resolve, reject) => { + let queryStream = cursor.toQueryStream(); + let rows = []; + queryStream.on("data", row => { + rows.push(row); + }); + queryStream.on("error", err => { + reject(new Error(err.message)); + }); + queryStream.on("close", () => { + resolve(rows); + }); + }); +}; + +//Установка статуса отправки +const setSendMsg = async prms => { + let pooledConnection; + try { + pooledConnection = await prms.connection.getConnection(); + await pooledConnection.execute( + "begin PKG_EXS_EXT_MAIL.EXSEXTMAIL_SET_STATUS(NRN => :NRN, SERR_TEXT => :SERR_TEXT, NSTATUS => :NSTATUS); end;", + { NRN: prms.nRn, SERR_TEXT: prms.sErrMsg, NSTATUS: prms.nStatus }, + { autoCommit: true } + ); + } catch (e) { + throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } + } +}; + +//Считывание записей прикладываемых документов +const getMailAttach = async prms => { + let pooledConnection; + try { + pooledConnection = await prms.connection.getConnection(); + let res = await pooledConnection.execute( + "begin PKG_EXS_EXT_MAIL.GET_ATTACH(NIDENT => :NIDENT, RCDOCUMENTS => :RCDOCUMENTS); end;", + { + NIDENT: prms.nIdent, + RCDOCUMENTS: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT } + }, + { outFormat: oracledb.OBJECT, autoCommit: true } + ); + let rows = await readCursorData(res.outBinds.RCDOCUMENTS); + let rowsRes = []; + //Если результат запроса не пустой + if (rows.length !== 0) { + //Переводим BLOB в BUFFER и формируем формат аттача + for (let i = 0; i < rows.length; i++) { + let rowContent = await rows[i].BDATA.getData(); + rowsRes.push({ + filename: rows[i].FILENAME, + content: rowContent + }); + } + } + return rowsRes; + } catch (e) { + throw new Error(e.message); + } finally { + if (pooledConnection) { + try { + await pooledConnection.close(); + } catch (e) { + throw new Error(e.message); + } + } + } +}; + +//Разбор XML +const parseXML = xmlDoc => { + return new Promise((resolve, reject) => { + xml2js.parseString(xmlDoc, { explicitArray: false, mergeAttrs: true }, (err, result) => { + if (err) reject(err); + else resolve(result); + }); + }); +}; + +//Обработчик "До" для исходящего сообщения +const before = async prms => { + //Инициализируем переменные + let res = "OK"; + let parseRes = null; + //Разбираем параметры отправки + try { + //Формируем объект на основании XML + parseRes = await parseXML(prms.queue.blMsg.toString()); + //Если есть присоединенные файлы - добавляем их + if (parseRes.mail.ident) { + parseRes.mail.attachments = await getMailAttach({ connection: prms.dbConn.connection, nIdent: parseRes.mail.ident }); + } + //Если указан текст в обычном формате + if (parseRes.mail.text) { + parseRes.mail.text = Buffer.from(parseRes.mail.text, "base64").toString("utf-8"); + } + //Если указан текст в формате HTML + if (parseRes.mail.html) { + parseRes.mail.html = Buffer.from(parseRes.mail.html, "base64").toString("utf-8"); + } + } catch (e) { + parseRes = prms.queue.blMsg.toString(); + res = `Ошибка разбора параметров отправки: ${makeErrorText(e)}`; + } + if (res === "OK") { + try { + await sendMail({ + mail: cfg.mail, + sTo: parseRes.mail.to, + sSubject: parseRes.mail.title, + sMessage: parseRes.mail.text, + sHTML: parseRes.mail.html, + attachments: parseRes.mail.attachments + }); + } catch (e) { + res = `Ошибка отправки E-Mail сообщения: ${makeErrorText(e)}`; + } + } + //Если имеется рег. номер записи очереди отправки E-mail - обновляем информацию о текущем сообщении + if (parseRes.mail.nExsextmailId) { + if (res === "OK") { + await setSendMsg({ connection: prms.dbConn.connection, nRn: parseRes.mail.nExsextmailId, sErrMsg: "", nStatus: NSTATUS_DONE }); + } else { + await setSendMsg({ connection: prms.dbConn.connection, nRn: parseRes.mail.nExsextmailId, sErrMsg: res, nStatus: NSTATUS_ERR }); + } + } + //Возвращаем результат и флаг того, что дальше отрабатывать это сообщение не надо + return { + blMsg: Buffer.from(JSON.stringify({ message: parseRes, state: res })), + bStopPropagation: true + }; +}; + +//----------------- +// Интерфейс модуля +//----------------- + +exports.before = before;