ЦИТК-441 - новое расширение "Рассылка E-Mail"

This commit is contained in:
Mikhail Chechnev 2022-07-05 14:59:01 +03:00
parent 0146702e1c
commit 56e3cff024
6 changed files with 359 additions and 207 deletions

View File

@ -13,15 +13,7 @@ const express = require("express"); //WEB-сервер Express
const cors = require("cors"); //Управление заголовками безопасности для WEB-сервера Express
const bodyParser = require("body-parser"); //Модуль для Express (разбор тела входящего запроса)
const { ServerError } = require("./server_errors"); //Типовая ошибка
const {
makeErrorText,
validateObject,
buildURL,
getAppSrvFunction,
buildOptionsXML,
parseOptionsXML,
deepMerge
} = require("./utils"); //Вспомогательные функции
const { makeErrorText, validateObject, buildURL, getAppSrvFunction, buildOptionsXML, parseOptionsXML, deepMerge } = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES, NIS_ORIGINAL_NO, NIS_ORIGINAL_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений
const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса
@ -96,11 +88,7 @@ class InQueue extends EventEmitter {
//Обработка сообщения
async processMessage(prms) {
//Проверяем структуру переданного объекта для обработки
let sCheckResult = validateObject(
prms,
prmsInQueueSchema.processMessage,
"Параметры функции обработки входящего сообщения"
);
let sCheckResult = validateObject(prms, prmsInQueueSchema.processMessage, "Параметры функции обработки входящего сообщения");
//Если структура объекта в норме
if (!sCheckResult) {
//Буфер для сообщения очереди
@ -135,9 +123,10 @@ class InQueue extends EventEmitter {
});
//Скажем что пришло новое входящее сообщение
await this.logger.info(
`Новое входящее сообщение от ${prms.req.connection.address().address} для функции ${
prms.function.sCode
} (${buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL })})`,
`Новое входящее сообщение от ${prms.req.connection.address().address} для функции ${prms.function.sCode} (${buildURL({
sSrvRoot: prms.service.sSrvRoot,
sFnURL: prms.function.sFnURL
})})`,
{ nQueueId: q.nId }
);
//Выполняем обработчик "До" (если он есть)
@ -200,13 +189,11 @@ class InQueue extends EventEmitter {
let sOptionsResp = buildOptionsXML({ options: optionsResp });
q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp });
}
//Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем
if (!_.isUndefined(resBefore.bUnAuth))
if (resBefore.bUnAuth === true)
throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
//Если пришел флаг прекращения дальнейшей обработки сообщения - то дальше его обработку прекращаем
if (!_.isUndefined(resBefore.bStopPropagation))
if (resBefore.bStopPropagation === true) bStopPropagation = true;
//Фиксируем результат исполнения "До" - флаг ошибочной аутентификации - если он поднят, то это ошибка, дальше ничего не делаем
if (!_.isUndefined(resBefore.bUnAuth) && resBefore.bUnAuth === true)
throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
//Фиксируем результат исполнения "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем
if (!_.isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true;
} else {
//Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
@ -223,8 +210,7 @@ class InQueue extends EventEmitter {
//Вызов обработчика БД
let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId });
//Если результат - ошибка пробрасываем её
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR)
throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH)
throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
@ -339,10 +325,7 @@ class InQueue extends EventEmitter {
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await this.logger.error(
`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`,
{ nQueueId: q.nId }
);
await this.logger.error(`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`, { nQueueId: q.nId });
//Добавим чуть больше информации в тему сообщения
sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${prms.function.sCode}" сервиса "${prms.service.sCode}"`;
} else {
@ -371,11 +354,7 @@ class InQueue extends EventEmitter {
//Запуск обработки очереди входящих сообщений
startProcessing(prms) {
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsInQueueSchema.startProcessing,
"Параметры функции запуска обработки очереди входящих сообщений"
);
let sCheckResult = validateObject(prms, prmsInQueueSchema.startProcessing, "Параметры функции запуска обработки очереди входящих сообщений");
//Если структура объекта в норме
if (!sCheckResult) {
//Выставляем флаг работы
@ -431,18 +410,15 @@ class InQueue extends EventEmitter {
}
);
//...и собственный обработчик ошибок
this.webApp.use(
buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }),
async (err, req, res, next) => {
//Протоколируем в журнал работы сервера
await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), {
nServiceId: srvs.nId,
nServiceFnId: fn.nId
});
//Отправим ошибку клиенту
res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)));
}
);
this.webApp.use(buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), async (err, req, res, next) => {
//Протоколируем в журнал работы сервера
await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), {
nServiceId: srvs.nId,
nServiceFnId: fn.nId
});
//Отправим ошибку клиенту
res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)));
});
}
);
});

View File

@ -12,15 +12,7 @@ const _ = require("lodash"); //Работа с массивами и объек
const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами
const lg = require("./logger"); //Протоколирование работы
const db = require("./db_connector"); //Взаимодействие с БД
const {
makeErrorText,
validateObject,
getAppSrvFunction,
buildURL,
parseOptionsXML,
buildOptionsXML,
deepMerge
} = require("./utils"); //Вспомогательные функции
const { makeErrorText, validateObject, getAppSrvFunction, buildURL, parseOptionsXML, buildOptionsXML, deepMerge } = require("./utils"); //Вспомогательные функции
const { ServerError } = require("./server_errors"); //Типовая ошибка
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений
const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля
@ -35,12 +27,7 @@ const {
SERR_WEB_SERVER,
SERR_UNAUTH
} = require("./constants"); //Глобальные константы
const {
NINC_EXEC_CNT_YES,
NINC_EXEC_CNT_NO,
NIS_ORIGINAL_NO,
NIS_ORIGINAL_YES
} = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO, NIS_ORIGINAL_NO, NIS_ORIGINAL_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
//--------------------------
// Глобальные идентификаторы
@ -112,8 +99,7 @@ const appProcess = async prms => {
//Проверяем аутентификацию
if (
prms.function.nAuthOnly == objServiceFnSchema.NAUTH_ONLY_NO ||
(prms.function.nAuthOnly == objServiceFnSchema.NAUTH_ONLY_YES &&
isServiceAuth == objServiceSchema.NIS_AUTH_YES)
(prms.function.nAuthOnly == objServiceFnSchema.NAUTH_ONLY_YES && isServiceAuth == objServiceSchema.NIS_AUTH_YES)
) {
//Фиксируем начало исполнения сервером приложений - в статусе сообщения
res = await dbConn.setQueueState({
@ -122,9 +108,9 @@ const appProcess = async prms => {
});
//Фиксируем начало исполнения сервером приложений - в протоколе работы сервиса
await logger.info(
`Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${
prms.queue.sServiceFnCode
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
`Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${prms.queue.sServiceFnCode}, ${
prms.queue.sExecState
}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId }
);
//Считаем тело сообщения
@ -141,6 +127,8 @@ const appProcess = async prms => {
let options = { method: prms.function.sFnPrmsType, encoding: null };
//Инициализируем параметры ответа сервера
let optionsResp = {};
//Флаг прекращения обработки сообщения
let bStopPropagation = false;
//Определимся с URL и телом сообщения в зависимости от способа передачи параметров
if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) {
options.url = buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL });
@ -176,6 +164,7 @@ const appProcess = async prms => {
try {
let resBeforePrms = _.cloneDeep(prms);
resBeforePrms.options = _.cloneDeep(options);
resBeforePrms.dbConn = dbConn;
resBefore = await fnBefore(resBeforePrms);
} catch (e) {
throw new ServerError(SERR_APP_SERVER_BEFORE, e.message);
@ -225,123 +214,123 @@ const appProcess = async prms => {
});
bCtxIsSet = true;
}
//Применим ответ "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем
if (!_.isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true;
} else {
//Или расскажем об ошибке в структуре ответа
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
}
//Фиксируем отправку сообщения в протоколе работы сервиса
await logger.info(`Отправляю исходящее сообщение ${prms.queue.nId} на URL: ${options.url}`, {
nQueueId: prms.queue.nId
});
//Отправляем сообщение удалённому серверу
try {
//Сохраняем параметры с которыми уходило сообщение
try {
let tmpOptions = _.cloneDeep(options);
delete tmpOptions.body;
let sOptions = buildOptionsXML({ options: tmpOptions });
await dbConn.setQueueOptions({ nQueueId: prms.queue.nId, sOptions });
} catch (e) {
await logger.warn(`Не удалось сохранить параметры отправки сообщения: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
}
//Ждем ответ от удалённого сервера
options.resolveWithFullResponse = true;
let serverResp = await rqp(options);
//Сохраняем полученный ответ
prms.queue.blResp = Buffer.from(serverResp.body || "");
await dbConn.setQueueResp({
nQueueId: prms.queue.nId,
blResp: prms.queue.blResp,
nIsOriginal: NIS_ORIGINAL_YES
//Если флаг прекращения обработки сообщения не установлен
if (bStopPropagation === false) {
//Фиксируем отправку сообщения в протоколе работы сервиса
await logger.info(`Отправляю исходящее сообщение ${prms.queue.nId} на URL: ${options.url}`, {
nQueueId: prms.queue.nId
});
//Сохраняем заголовки ответа и HTTP-статус
optionsResp.headers = _.cloneDeep(serverResp.headers);
optionsResp.statusCode = serverResp.statusCode;
//Отправляем сообщение удалённому серверу
try {
let sOptionsResp = buildOptionsXML({ options: optionsResp });
await dbConn.setQueueOptionsResp({ nQueueId: prms.queue.nId, sOptionsResp });
//Сохраняем параметры с которыми уходило сообщение
try {
let tmpOptions = _.cloneDeep(options);
delete tmpOptions.body;
let sOptions = buildOptionsXML({ options: tmpOptions });
await dbConn.setQueueOptions({ nQueueId: prms.queue.nId, sOptions });
} catch (e) {
await logger.warn(`Не удалось сохранить параметры отправки сообщения: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
}
//Ждем ответ от удалённого сервера
options.resolveWithFullResponse = true;
let serverResp = await rqp(options);
//Сохраняем полученный ответ
prms.queue.blResp = Buffer.from(serverResp.body || "");
await dbConn.setQueueResp({
nQueueId: prms.queue.nId,
blResp: prms.queue.blResp,
nIsOriginal: NIS_ORIGINAL_YES
});
//Сохраняем заголовки ответа и HTTP-статус
optionsResp.headers = _.cloneDeep(serverResp.headers);
optionsResp.statusCode = serverResp.statusCode;
try {
let sOptionsResp = buildOptionsXML({ options: optionsResp });
await dbConn.setQueueOptionsResp({ nQueueId: prms.queue.nId, sOptionsResp });
} catch (e) {
await logger.warn(`Не удалось сохранить заголовок ответа удалённого сервера: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
}
} catch (e) {
await logger.warn(
`Не удалось сохранить заголовок ответа удалённого сервера: ${makeErrorText(e)}`,
{ nQueueId: prms.queue.nId }
);
//Прекращаем исполнение если были ошибки
let sError = "Неожиданная ошибка удалённого сервиса";
if (e.error) {
let sSubError = e.error.code || e.error;
sError = `Ошибка передачи данных: ${sSubError}`;
}
if (e.response) sError = `${e.response.statusCode} - ${e.response.statusMessage}`;
throw new ServerError(SERR_WEB_SERVER, sError);
}
} catch (e) {
//Прекращаем исполнение если были ошибки
let sError = "Неожиданная ошибка удалённого сервиса";
if (e.error) {
let sSubError = e.error.code || e.error;
sError = `Ошибка передачи данных: ${sSubError}`;
}
if (e.response) sError = `${e.response.statusCode} - ${e.response.statusMessage}`;
throw new ServerError(SERR_WEB_SERVER, sError);
}
//Выполняем обработчик "После" (если он есть)
if (prms.function.sAppSrvAfter) {
const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter);
let resAfter = null;
try {
let resAfterPrms = _.cloneDeep(prms);
resAfterPrms.options = _.cloneDeep(options);
resAfterPrms.optionsResp = _.cloneDeep(optionsResp);
resAfter = await fnAfter(resAfterPrms);
} catch (e) {
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
}
//Проверяем структуру ответа функции постобработки
if (resAfter) {
let sCheckResult = validateObject(
resAfter,
objOutQueueProcessorSchema.OutQueueProcessorFnAfter,
"Результат функции постобработки исходящего сообщения"
);
//Если структура ответа в норме
if (!sCheckResult) {
//Применим ответ "После" - обработанный ответ удаленного сервиса
if (!_.isUndefined(resAfter.blResp)) {
prms.queue.blResp = resAfter.blResp;
await dbConn.setQueueResp({
nQueueId: prms.queue.nId,
blResp: prms.queue.blResp,
nIsOriginal: NIS_ORIGINAL_NO
});
}
//Применим ответ "После" - флаг утентификации сервиса
if (!_.isUndefined(resAfter.bUnAuth))
if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
//Применим ответ "После" - контекст работы сервиса
if (!_.isUndefined(resAfter.sCtx))
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN) {
prms.service.sCtx = resAfter.sCtx;
prms.service.dCtxExp = resAfter.dCtxExp;
await dbConn.setServiceContext({
nServiceId: prms.service.nId,
sCtx: prms.service.sCtx,
dCtxExp: prms.service.dCtxExp
//Выполняем обработчик "После" (если он есть)
if (prms.function.sAppSrvAfter) {
const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter);
let resAfter = null;
try {
let resAfterPrms = _.cloneDeep(prms);
resAfterPrms.options = _.cloneDeep(options);
resAfterPrms.optionsResp = _.cloneDeep(optionsResp);
resAfter = await fnAfter(resAfterPrms);
} catch (e) {
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
}
//Проверяем структуру ответа функции постобработки
if (resAfter) {
let sCheckResult = validateObject(
resAfter,
objOutQueueProcessorSchema.OutQueueProcessorFnAfter,
"Результат функции постобработки исходящего сообщения"
);
//Если структура ответа в норме
if (!sCheckResult) {
//Применим ответ "После" - обработанный ответ удаленного сервиса
if (!_.isUndefined(resAfter.blResp)) {
prms.queue.blResp = resAfter.blResp;
await dbConn.setQueueResp({
nQueueId: prms.queue.nId,
blResp: prms.queue.blResp,
nIsOriginal: NIS_ORIGINAL_NO
});
bCtxIsSet = true;
}
} else {
//Или расскажем об ошибке в структуре ответа
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
//Применим ответ "После" - флаг утентификации сервиса
if (!_.isUndefined(resAfter.bUnAuth))
if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
//Применим ответ "После" - контекст работы сервиса
if (!_.isUndefined(resAfter.sCtx))
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN) {
prms.service.sCtx = resAfter.sCtx;
prms.service.dCtxExp = resAfter.dCtxExp;
await dbConn.setServiceContext({
nServiceId: prms.service.nId,
sCtx: prms.service.sCtx,
dCtxExp: prms.service.dCtxExp
});
bCtxIsSet = true;
}
} else {
//Или расскажем об ошибке в структуре ответа
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
}
}
//Если это функция начала сеанса, и нет обработчика на стороне БД и контекст не был установлен до сих пор - то положим в него то, что нам ответил сервер
if (
prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN &&
!prms.function.sPrcResp &&
!bCtxIsSet
) {
await dbConn.setServiceContext({ nServiceId: prms.service.nId, sCtx: serverResp });
}
//Если это функция окончания сеанса, и нет обработчика на стороне БД - то сбросим контекст здесь
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGOUT && !prms.function.sPrcResp) {
await dbConn.clearServiceContext({ nServiceId: prms.service.nId });
//Если это функция начала сеанса, и нет обработчика на стороне БД и контекст не был установлен до сих пор - то положим в него то, что нам ответил сервер
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN && !prms.function.sPrcResp && !bCtxIsSet) {
await dbConn.setServiceContext({ nServiceId: prms.service.nId, sCtx: serverResp });
}
//Если это функция окончания сеанса, и нет обработчика на стороне БД - то сбросим контекст здесь
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGOUT && !prms.function.sPrcResp) {
await dbConn.clearServiceContext({ nServiceId: prms.service.nId });
}
}
//Фиксируем успешное исполнение сервером приложений - в статусе сообщения
res = await dbConn.setQueueState({
@ -383,10 +372,9 @@ const appProcess = async prms => {
});
}
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await logger.error(
`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${makeErrorText(e)}`,
{ nQueueId: prms.queue.nId }
);
await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
}
} else {
//Фатальная ошибка обработки - некорректный объект параметров
@ -401,11 +389,7 @@ const dbProcess = async prms => {
//Результат обработки - объект Queue (обработанное сообщение) или ServerError (ошибка обработки)
let res = null;
//Проверяем структуру переданного объекта для старта
let sCheckResult = validateObject(
prms,
prmsOutQueueProcessorSchema.dbProcess,
"Параметры функции запуска обработки ообщения сервером БД"
);
let sCheckResult = validateObject(prms, prmsOutQueueProcessorSchema.dbProcess, "Параметры функции запуска обработки ообщения сервером БД");
//Если структура объекта в норме
if (!sCheckResult) {
//Обрабатываем
@ -417,9 +401,9 @@ const dbProcess = async prms => {
});
//Фиксируем начало исполнения сервером БД - в протоколе работы сервиса
await logger.info(
`Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${
prms.queue.sServiceFnCode
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
`Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${prms.queue.sServiceFnCode}, ${
prms.queue.sExecState
}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId }
);
//Если обработчик со стороны БД указан
@ -427,11 +411,9 @@ const dbProcess = async prms => {
//Вызываем его
let prcRes = await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId });
//Если результат - ошибка пробрасываем её
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR)
throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH)
throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
}
//Фиксируем успешное исполнение (полное - дальше обработки нет) - в статусе сообщения
res = await dbConn.setQueueState({
@ -466,10 +448,9 @@ const dbProcess = async prms => {
});
}
//Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса
await logger.error(
`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${makeErrorText(e)}`,
{ nQueueId: prms.queue.nId }
);
await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${makeErrorText(e)}`, {
nQueueId: prms.queue.nId
});
}
} else {
//Фатальная ошибка обработки - некорректный объект параметров
@ -482,11 +463,7 @@ const dbProcess = async prms => {
//Обработка задачи
const processTask = async prms => {
//Проверяем параметры
let sCheckResult = validateObject(
prms,
prmsOutQueueProcessorSchema.processTask,
"Параметры функции обработки задачи"
);
let sCheckResult = validateObject(prms, prmsOutQueueProcessorSchema.processTask, "Параметры функции обработки задачи");
//Если параметры в норме
if (!sCheckResult) {
let q = null;
@ -629,11 +606,7 @@ process.on("uncaughtException", e => {
//Приём сообщений
process.on("message", task => {
//Проверяем структуру переданного сообщения
let sCheckResult = validateObject(
task,
objOutQueueProcessorSchema.OutQueueProcessorTask,
"Задача обработчика очереди исходящих сообщений"
);
let sCheckResult = validateObject(task, objOutQueueProcessorSchema.OutQueueProcessorTask, "Задача обработчика очереди исходящих сообщений");
//Если структура объекта в норме
if (!sCheckResult) {
//Запускаем обработку

View File

@ -197,7 +197,9 @@ const sendMail = prms => {
from: prms.mail.sFrom,
to: prms.sTo,
subject: prms.sSubject,
text: prms.sMessage
text: prms.sMessage,
html: prms.sHTML,
attachments: prms.attachments
};
//Отправляем сообщение
transporter.sendMail(mailOptions, (error, info) => {

View File

@ -145,6 +145,16 @@ exports.OutQueueProcessorFnBefore = new Schema({
type: path => `Дата истечения контекста (${path}) имеет некорректный тип данных (ожидалось - Date)`,
required: path => `Не указана дата истечения контекста (${path})`
}
},
//Флаг прекращения дальнейшей обработки сообщения
bStopPropagation: {
type: Boolean,
required: false,
message: {
type: path =>
`Флаг прекращения дальнейшей обработки сообщения (${path}) имеет некорректный тип данных (ожидалось - Boolean)`,
required: path => `Не указан флаг прекращения дальнейшей обработки сообщения (${path})`
}
}
});

View File

@ -40,8 +40,7 @@ exports.sendMail = new Schema({
required: true,
use: { validateTo },
message: {
type: path =>
`Список адресов E-Mail для отправки уведомления (${path}) имеет некорректный тип данных (ожидалось - String)`,
type: path => `Список адресов E-Mail для отправки уведомления (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан cписок адресов E-Mail для отправки уведомления (${path})`,
validateTo: path =>
`Неверный формат списка адресов E-Mail для отправки уведомления (${path}), для указания нескольких адресов следует использовать запятую в качестве разделителя (без пробелов)`
@ -50,7 +49,7 @@ exports.sendMail = new Schema({
//Заголовок сообщения
sSubject: {
type: String,
required: true,
required: false,
message: {
type: path => `Заголовок сообщения (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан заголовок сообщения (${path})`
@ -59,11 +58,29 @@ exports.sendMail = new Schema({
//Текст уведомления
sMessage: {
type: String,
required: true,
required: false,
message: {
type: path => `Текст уведомления (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан текст уведомления (${path})`
}
},
//HTML текст сообщения
sHTML: {
type: String,
required: false,
message: {
type: path => `HTML текст сообщения (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан HTML текст сообщения (${path})`
}
},
//Вложения сообщения
attachments: {
type: Array,
required: false,
message: {
type: path => `Список вложений сообщения (${path}) имеет некорректный тип данных (ожидалось - Array)`,
required: path => `Не указан список вложений сообщения (${path})`
}
}
});
@ -141,8 +158,7 @@ exports.buildOptionsXML = new Schema({
type: Object,
required: true,
message: {
type: path =>
`Объект параметров сообщения/ответа (${path}) имеет некорректный тип данных (ожидалось - Object)`,
type: path => `Объект параметров сообщения/ответа (${path}) имеет некорректный тип данных (ожидалось - Object)`,
required: path => `Не указан объект параметров сообщения/ответа (${path})`
}
}

175
modules/send_mail.js Normal file
View File

@ -0,0 +1,175 @@
/*
Сервис интеграции ПП Парус 8 с WEB API
Дополнительный модуль: Рассылка E-Mail (MAIL)
*/
//------------------------------
// Подключение внешних библиотек
//------------------------------
const xml2js = require("xml2js"); //Конвертация XML в JSON и JSON в XML
const cfg = require("./../config"); //Настройки сервера приложений
const { makeErrorText, sendMail } = require("./../core/utils"); //Вспомогательные функции
const oracledb = require("oracledb"); //Работа с СУБД Oracle
//---------------------
// Глобальные константы
//---------------------
//Статусы отправки
const NSTATUS_ERR = 2;
const NSTATUS_DONE = 3;
//------------
// Тело модуля
//------------
//Чтение данных из курсора
const readCursorData = cursor => {
return new Promise((resolve, reject) => {
let queryStream = cursor.toQueryStream();
let rows = [];
queryStream.on("data", row => {
rows.push(row);
});
queryStream.on("error", err => {
reject(new Error(err.message));
});
queryStream.on("close", () => {
resolve(rows);
});
});
};
//Установка статуса отправки
const setSendMsg = async prms => {
let pooledConnection;
try {
pooledConnection = await prms.connection.getConnection();
await pooledConnection.execute(
"begin PKG_EXS_EXT_MAIL.EXSEXTMAIL_SET_STATUS(NRN => :NRN, SERR_TEXT => :SERR_TEXT, NSTATUS => :NSTATUS); end;",
{ NRN: prms.nRn, SERR_TEXT: prms.sErrMsg, NSTATUS: prms.nStatus },
{ autoCommit: true }
);
} catch (e) {
throw new Error(e.message);
} finally {
if (pooledConnection) {
try {
await pooledConnection.close();
} catch (e) {
throw new Error(e.message);
}
}
}
};
//Считывание записей прикладываемых документов
const getMailAttach = async prms => {
let pooledConnection;
try {
pooledConnection = await prms.connection.getConnection();
let res = await pooledConnection.execute(
"begin PKG_EXS_EXT_MAIL.GET_ATTACH(NIDENT => :NIDENT, RCDOCUMENTS => :RCDOCUMENTS); end;",
{
NIDENT: prms.nIdent,
RCDOCUMENTS: { type: oracledb.CURSOR, dir: oracledb.BIND_OUT }
},
{ outFormat: oracledb.OBJECT, autoCommit: true }
);
let rows = await readCursorData(res.outBinds.RCDOCUMENTS);
let rowsRes = [];
//Если результат запроса не пустой
if (rows.length !== 0) {
//Переводим BLOB в BUFFER и формируем формат аттача
for (let i = 0; i < rows.length; i++) {
let rowContent = await rows[i].BDATA.getData();
rowsRes.push({
filename: rows[i].FILENAME,
content: rowContent
});
}
}
return rowsRes;
} catch (e) {
throw new Error(e.message);
} finally {
if (pooledConnection) {
try {
await pooledConnection.close();
} catch (e) {
throw new Error(e.message);
}
}
}
};
//Разбор XML
const parseXML = xmlDoc => {
return new Promise((resolve, reject) => {
xml2js.parseString(xmlDoc, { explicitArray: false, mergeAttrs: true }, (err, result) => {
if (err) reject(err);
else resolve(result);
});
});
};
//Обработчик "До" для исходящего сообщения
const before = async prms => {
//Инициализируем переменные
let res = "OK";
let parseRes = null;
//Разбираем параметры отправки
try {
//Формируем объект на основании XML
parseRes = await parseXML(prms.queue.blMsg.toString());
//Если есть присоединенные файлы - добавляем их
if (parseRes.mail.ident) {
parseRes.mail.attachments = await getMailAttach({ connection: prms.dbConn.connection, nIdent: parseRes.mail.ident });
}
//Если указан текст в обычном формате
if (parseRes.mail.text) {
parseRes.mail.text = Buffer.from(parseRes.mail.text, "base64").toString("utf-8");
}
//Если указан текст в формате HTML
if (parseRes.mail.html) {
parseRes.mail.html = Buffer.from(parseRes.mail.html, "base64").toString("utf-8");
}
} catch (e) {
parseRes = prms.queue.blMsg.toString();
res = `Ошибка разбора параметров отправки: ${makeErrorText(e)}`;
}
if (res === "OK") {
try {
await sendMail({
mail: cfg.mail,
sTo: parseRes.mail.to,
sSubject: parseRes.mail.title,
sMessage: parseRes.mail.text,
sHTML: parseRes.mail.html,
attachments: parseRes.mail.attachments
});
} catch (e) {
res = `Ошибка отправки E-Mail сообщения: ${makeErrorText(e)}`;
}
}
//Если имеется рег. номер записи очереди отправки E-mail - обновляем информацию о текущем сообщении
if (parseRes.mail.nExsextmailId) {
if (res === "OK") {
await setSendMsg({ connection: prms.dbConn.connection, nRn: parseRes.mail.nExsextmailId, sErrMsg: "", nStatus: NSTATUS_DONE });
} else {
await setSendMsg({ connection: prms.dbConn.connection, nRn: parseRes.mail.nExsextmailId, sErrMsg: res, nStatus: NSTATUS_ERR });
}
}
//Возвращаем результат и флаг того, что дальше отрабатывать это сообщение не надо
return {
blMsg: Buffer.from(JSON.stringify({ message: parseRes, state: res })),
bStopPropagation: true
};
};
//-----------------
// Интерфейс модуля
//-----------------
exports.before = before;