760 lines
47 KiB
JavaScript
760 lines
47 KiB
JavaScript
/*
|
||
Сервис интеграции ПП Парус 8 с WEB API
|
||
Модуль ядра: обработчик исходящего сообщения
|
||
*/
|
||
|
||
//----------------------
|
||
// Подключение библиотек
|
||
//----------------------
|
||
|
||
require("module-alias/register"); //Поддержка псевонимов при подключении модулей
|
||
const _ = require("lodash"); //Работа с массивами и объектами
|
||
const rqp = require("request-promise"); //Работа с HTTP/HTTPS запросами
|
||
const lg = require("./logger"); //Протоколирование работы
|
||
const db = require("./db_connector"); //Взаимодействие с БД
|
||
const {
|
||
makeErrorText,
|
||
validateObject,
|
||
getAppSrvFunction,
|
||
buildURL,
|
||
parseOptionsXML,
|
||
buildOptionsXML,
|
||
deepMerge,
|
||
getKafkaConnectionSettings,
|
||
getMQTTConnectionSettings,
|
||
getKafkaBroker,
|
||
getKafkaAuth,
|
||
getURLProtocol,
|
||
wrapPromiseTimeout
|
||
} = require("./utils"); //Вспомогательные функции
|
||
const { ServerError } = require("./server_errors"); //Типовая ошибка
|
||
const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений
|
||
const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля
|
||
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
|
||
const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса
|
||
const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса
|
||
const {
|
||
SERR_UNEXPECTED,
|
||
SERR_OBJECT_BAD_INTERFACE,
|
||
SERR_APP_SERVER_BEFORE,
|
||
SERR_APP_SERVER_AFTER,
|
||
SERR_DB_SERVER,
|
||
SERR_WEB_SERVER,
|
||
SERR_UNAUTH
|
||
} = require("./constants"); //Глобальные константы
|
||
const { NINC_EXEC_CNT_YES, NINC_EXEC_CNT_NO, NIS_ORIGINAL_NO, NIS_ORIGINAL_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
|
||
const { publishMQTT } = require("./mqtt_connector"); //Работа с MQTT/MQTTS запросами
|
||
const { publishKafka } = require("./kafka_connector"); //Работа с Kafka запросами
|
||
|
||
//--------------------------
|
||
// Глобальные идентификаторы
|
||
//--------------------------
|
||
|
||
let dbConn = null; //Подключение к БД
|
||
let logger = null; //Протоколирование работы
|
||
|
||
//------------
|
||
// Тело модуля
|
||
//------------
|
||
|
||
//Отправка родительскому процессу ошибки обработки сообщения сервером приложений
|
||
const sendErrorResult = prms => {
|
||
//Проверяем структуру переданного сообщения
|
||
let sCheckResult = validateObject(
|
||
prms,
|
||
prmsOutQueueProcessorSchema.sendErrorResult,
|
||
"Параметры функции отправки родительскому процессу ошибки обработки сообщения"
|
||
);
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
process.send({
|
||
sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR,
|
||
sMsg: prms.sMessage
|
||
});
|
||
} else {
|
||
process.send({
|
||
sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR,
|
||
sMsg: sCheckResult
|
||
});
|
||
}
|
||
};
|
||
|
||
//Отправка родительскому процессу успеха обработки сообщения сервером приложений
|
||
const sendOKResult = () => {
|
||
process.send({
|
||
sResult: objOutQueueProcessorSchema.STASK_RESULT_OK,
|
||
sMsg: null
|
||
});
|
||
};
|
||
|
||
//Отправка родительскому процессу успеха обработки сообщения сервером приложений
|
||
const sendUnAuthResult = () => {
|
||
process.send({
|
||
sResult: objOutQueueProcessorSchema.STASK_RESULT_UNAUTH,
|
||
sMsg: null
|
||
});
|
||
};
|
||
|
||
//Запуск обработки сообщения сервером приложений
|
||
const appProcess = async prms => {
|
||
//Результат обработки - объект Queue (обработанное сообщение) или ServerError (ошибка обработки)
|
||
let res = null;
|
||
//Проверяем структуру переданного объекта для старта
|
||
let sCheckResult = validateObject(
|
||
prms,
|
||
prmsOutQueueProcessorSchema.appProcess,
|
||
"Параметры функции запуска обработки ообщения сервером приложений"
|
||
);
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Запоминаем текущий статус сообщения
|
||
let nOldExecState = prms.queue.nExecState;
|
||
//Обрабатываем
|
||
try {
|
||
//Считываем статус аутентификации сервиса
|
||
let isServiceAuth = await dbConn.isServiceAuth({ nServiceId: prms.service.nId });
|
||
//Проверяем аутентификацию
|
||
if (
|
||
prms.function.nAuthOnly == objServiceFnSchema.NAUTH_ONLY_NO ||
|
||
(prms.function.nAuthOnly == objServiceFnSchema.NAUTH_ONLY_YES && isServiceAuth == objServiceSchema.NIS_AUTH_YES)
|
||
) {
|
||
//Фиксируем начало исполнения сервером приложений - в статусе сообщения
|
||
res = await dbConn.setQueueState({
|
||
nQueueId: prms.queue.nId,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
|
||
});
|
||
//Фиксируем начало исполнения сервером приложений - в протоколе работы сервиса
|
||
await logger.info(
|
||
`Обрабатываю исходящее сообщение сервером приложений: ${prms.queue.nId}, ${prms.queue.sInDate}, ${prms.queue.sServiceFnCode}, ${
|
||
prms.queue.sExecState
|
||
}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
|
||
{ nQueueId: prms.queue.nId }
|
||
);
|
||
//Считаем тело сообщения
|
||
let qData = await dbConn.getQueueMsg({ nQueueId: prms.queue.nId });
|
||
//Считаем контекст сервиса
|
||
let serviceCtx = await dbConn.getServiceContext({ nServiceId: prms.service.nId });
|
||
//Флаг установленности контекста для функции начала сеанса
|
||
let bCtxIsSet = false;
|
||
//Кладём данные тела в объект сообщения и инициализируем поле для ответа
|
||
_.extend(prms.queue, { blMsg: qData.blMsg, blResp: null });
|
||
//Кладём данные контекста в сервис
|
||
_.extend(prms.service, serviceCtx);
|
||
//Собираем параметры для передачи серверу
|
||
let options = { method: prms.function.sFnPrmsType, encoding: null };
|
||
//Инициализируем параметры ответа сервера
|
||
let optionsResp = {};
|
||
//Флаг прекращения обработки сообщения
|
||
let bStopPropagation = false;
|
||
//Флаг выполнения обработчика "До"
|
||
let bExecuteBefore = true;
|
||
//Флаг выполнения обработчика "После"
|
||
let bExecuteAfter = true;
|
||
//Считываем протокол работы
|
||
let sProtocol = getURLProtocol(prms.service.sSrvRoot);
|
||
//Исходя из протокола собираем параметры
|
||
switch (true) {
|
||
//Kafka
|
||
case sProtocol === objServiceSchema.SPROTOCOL_KAFKA:
|
||
options.url = getKafkaBroker(prms.service.sSrvRoot);
|
||
options.body = prms.queue.blMsg;
|
||
options.topic = prms.function.sFnURL;
|
||
options.settings = getKafkaConnectionSettings(prms.service.sCode, prms.kafka);
|
||
options.auth = getKafkaAuth(prms.service.sSrvUser, prms.service.sSrvPass, options.settings);
|
||
//Если параметры подключения не считаны
|
||
if (!options.settings) {
|
||
//Расскажем об ошибке считывания
|
||
throw new ServerError(
|
||
SERR_UNEXPECTED,
|
||
`Ошибка получения настроек подключения к Kafka для сервиса "${prms.service.sCode}". Необходимо проверить соответствующий параметр ("kafka") файла конфигурации сервиса приложений ("config.js").`
|
||
);
|
||
}
|
||
//Указываем, что выполнение обработчика "После" невозможно
|
||
bExecuteAfter = false;
|
||
break;
|
||
//MQTT/MQTTS
|
||
case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
|
||
options.url = prms.service.sSrvRoot;
|
||
options.body = prms.queue.blMsg;
|
||
options.topic = prms.function.sFnURL;
|
||
options.auth = { user: prms.service.sSrvUser, pass: prms.service.sSrvPass };
|
||
options.settings = getMQTTConnectionSettings(prms.service.sCode, prms.mqtt);
|
||
//Если параметры подключения не считаны
|
||
if (!options.settings) {
|
||
//Расскажем об ошибке считывания
|
||
throw new ServerError(
|
||
SERR_UNEXPECTED,
|
||
`Ошибка получения настроек подключения к MQTT для сервиса "${prms.service.sCode}". Необходимо проверить соответствующий параметр ("mqtt") файла конфигурации сервиса приложений ("config.js").`
|
||
);
|
||
}
|
||
//Указываем, что выполнение обработчика "После" невозможно
|
||
bExecuteAfter = false;
|
||
break;
|
||
//HTTP/HTTPS
|
||
default:
|
||
//Определимся с URL и телом сообщения в зависимости от способа передачи параметров (для POST, PATCH и PUT - данные в теле, для остальных - в URI)
|
||
if (
|
||
[
|
||
objServiceFnSchema.NFN_PRMS_TYPE_POST,
|
||
objServiceFnSchema.NFN_PRMS_TYPE_PATCH,
|
||
objServiceFnSchema.NFN_PRMS_TYPE_PUT
|
||
].includes(prms.function.nFnPrmsType)
|
||
) {
|
||
options.url = buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL });
|
||
options.body = prms.queue.blMsg;
|
||
options.headers = { "content-type": "application/octet-stream" };
|
||
} else {
|
||
options.url = buildURL({
|
||
sSrvRoot: prms.service.sSrvRoot,
|
||
sFnURL: prms.function.sFnURL,
|
||
sQuery: prms.queue.blMsg === null ? "" : prms.queue.blMsg.toString()
|
||
});
|
||
}
|
||
break;
|
||
}
|
||
// Если у сервиса указан прокси, либо у приложения установлен глобальный прокси
|
||
if (prms.service.sProxyURL || prms.sProxy) {
|
||
// Добавляем прокси с приоритетом сервиса
|
||
options.proxy = prms.service.sProxyURL ?? prms.sProxy;
|
||
}
|
||
//Дополним получившиеся параметры переданными в сообщении
|
||
if (prms.queue.sOptions) {
|
||
try {
|
||
let optionsTmp = await parseOptionsXML({ sOptions: prms.queue.sOptions });
|
||
options = deepMerge(options, optionsTmp);
|
||
//При конвертации XML -> JSON пустые тэги приходят как "", а в encoding нужен или null, или правильная кодировка
|
||
if (options.encoding === "") options.encoding = null;
|
||
} catch (e) {
|
||
await logger.warn(
|
||
`Указанные для сообщения параметры имеют некорректный формат - использую параметры по умолчанию. Ошибка парсера: ${makeErrorText(
|
||
e
|
||
)}`,
|
||
{ nQueueId: prms.queue.nId }
|
||
);
|
||
}
|
||
}
|
||
//Выполняем обработчик "До" (если он есть)
|
||
if (prms.function.sAppSrvBefore && bExecuteBefore) {
|
||
const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore);
|
||
let resBefore = null;
|
||
try {
|
||
let resBeforePrms = _.cloneDeep(prms);
|
||
resBeforePrms.options = _.cloneDeep(options);
|
||
resBeforePrms.dbConn = dbConn;
|
||
resBefore = await fnBefore(resBeforePrms);
|
||
} catch (e) {
|
||
throw new ServerError(SERR_APP_SERVER_BEFORE, e.message);
|
||
}
|
||
//Проверяем структуру ответа функции предобработки
|
||
if (resBefore) {
|
||
let sCheckResult = validateObject(
|
||
resBefore,
|
||
objOutQueueProcessorSchema.OutQueueProcessorFnBefore,
|
||
"Результат функции предобработки исходящего сообщения"
|
||
);
|
||
//Если структура ответа в норме
|
||
if (!sCheckResult) {
|
||
//Применим ответ "До" - обработанное сообщение очереди
|
||
if (!_.isUndefined(resBefore.blMsg)) {
|
||
prms.queue.blMsg = resBefore.blMsg;
|
||
await dbConn.setQueueMsg({
|
||
nQueueId: prms.queue.nId,
|
||
blMsg: prms.queue.blMsg
|
||
});
|
||
if (
|
||
[
|
||
objServiceFnSchema.NFN_PRMS_TYPE_POST,
|
||
objServiceFnSchema.NFN_PRMS_TYPE_PATCH,
|
||
objServiceFnSchema.NFN_PRMS_TYPE_PUT
|
||
].includes(prms.function.nFnPrmsType) ||
|
||
[objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
|
||
sProtocol
|
||
)
|
||
) {
|
||
options.body = prms.queue.blMsg;
|
||
} else {
|
||
options.url = buildURL({
|
||
sSrvRoot: prms.service.sSrvRoot,
|
||
sFnURL: prms.function.sFnURL,
|
||
sQuery: prms.queue.blMsg === null ? "" : prms.queue.blMsg.toString()
|
||
});
|
||
}
|
||
}
|
||
//Применим ответ "До" - параметры отправки сообщения удаленному серверу
|
||
if (!_.isUndefined(resBefore.options)) options = deepMerge(options, resBefore.options);
|
||
//Применим ответ "До" - флаг отсуствия аутентификации
|
||
if (!_.isUndefined(resBefore.bUnAuth))
|
||
if (resBefore.bUnAuth === true) {
|
||
throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
|
||
}
|
||
//Применим ответ "До" - контекст работы сервиса
|
||
if (!_.isUndefined(resBefore.sCtx))
|
||
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN) {
|
||
prms.service.sCtx = resBefore.sCtx;
|
||
prms.service.dCtxExp = resBefore.dCtxExp;
|
||
await dbConn.setServiceContext({
|
||
nServiceId: prms.service.nId,
|
||
sCtx: prms.service.sCtx,
|
||
dCtxExp: prms.service.dCtxExp
|
||
});
|
||
bCtxIsSet = true;
|
||
}
|
||
//Применим ответ "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем
|
||
if (!_.isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true;
|
||
} else {
|
||
//Или расскажем об ошибке в структуре ответа
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
}
|
||
//Если флаг прекращения обработки сообщения не установлен
|
||
if (bStopPropagation === false) {
|
||
//Фиксируем отправку сообщения в протоколе работы сервиса
|
||
await logger.info(`Отправляю исходящее сообщение ${prms.queue.nId} на URL: ${options.url}`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
//Отправляем сообщение удалённому серверу
|
||
try {
|
||
//Сохраняем параметры с которыми уходило сообщение
|
||
try {
|
||
let tmpOptions = _.cloneDeep(options);
|
||
//Исключим из параметров заведомо бинарные поля (их сохранение не предусмотрено)
|
||
delete tmpOptions.body;
|
||
delete tmpOptions.cert;
|
||
delete tmpOptions.key;
|
||
delete tmpOptions.auth;
|
||
//Конвертируем в XML
|
||
let sOptions = buildOptionsXML({ options: tmpOptions });
|
||
//Сохраняемв БД
|
||
await dbConn.setQueueOptions({ nQueueId: prms.queue.nId, sOptions });
|
||
} catch (e) {
|
||
await logger.warn(`Не удалось сохранить параметры отправки сообщения: ${makeErrorText(e)}`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
}
|
||
//Инициализируем ответ от сервера
|
||
let serverResp = null;
|
||
//Выполняем отправку исходя из протокола
|
||
switch (true) {
|
||
//Kafka
|
||
case sProtocol === objServiceSchema.SPROTOCOL_KAFKA:
|
||
serverResp = await publishKafka({
|
||
settings: options.settings,
|
||
url: options.url,
|
||
auth: options.auth,
|
||
topic: options.topic,
|
||
message: options.body
|
||
});
|
||
break;
|
||
//MQTT/MQTTS
|
||
case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
|
||
serverResp = await publishMQTT({
|
||
settings: options.settings,
|
||
url: options.url,
|
||
auth: options.auth,
|
||
topic: options.topic,
|
||
message: options.body
|
||
});
|
||
break;
|
||
//HTTP/HTTPS
|
||
default:
|
||
//Установим флаг возврата полного ответа (и тела и заголовков)
|
||
options.resolveWithFullResponse = true;
|
||
//Установим таймаут подключения (если задан в настройках функции и ещё не задан в параметрах сообщения)
|
||
if (prms.function.nTimeoutConn && !options.timeout) options.timeout = prms.function.nTimeoutConn;
|
||
//Отправляем запрос
|
||
serverResp = prms.function.nTimeoutAsynch
|
||
? await wrapPromiseTimeout(prms.function.nTimeoutAsynch, rqp(options))
|
||
: await rqp(options);
|
||
break;
|
||
}
|
||
//Сохраняем полученный ответ
|
||
prms.queue.blResp = Buffer.from(serverResp.body || "");
|
||
await dbConn.setQueueResp({
|
||
nQueueId: prms.queue.nId,
|
||
blResp: prms.queue.blResp,
|
||
nIsOriginal: NIS_ORIGINAL_YES
|
||
});
|
||
//Сохраняем заголовки ответа и HTTP-статус
|
||
optionsResp.headers = _.cloneDeep(serverResp.headers);
|
||
optionsResp.statusCode = serverResp.statusCode;
|
||
try {
|
||
let sOptionsResp = buildOptionsXML({ options: optionsResp });
|
||
await dbConn.setQueueOptionsResp({ nQueueId: prms.queue.nId, sOptionsResp });
|
||
} catch (e) {
|
||
await logger.warn(`Не удалось сохранить заголовок ответа удалённого сервера: ${makeErrorText(e)}`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
}
|
||
} catch (e) {
|
||
//Прекращаем исполнение если были ошибки
|
||
let sError = "Неожиданная ошибка удалённого сервиса";
|
||
if (e.error) {
|
||
let sSubError = e.error.code || e.error;
|
||
sError = `Ошибка передачи данных: ${sSubError}`;
|
||
}
|
||
if (e.response) sError = `${e.response.statusCode} - ${e.response.statusMessage}`;
|
||
throw new ServerError(SERR_WEB_SERVER, sError);
|
||
}
|
||
//Выполняем обработчик "После" (если он есть)
|
||
if (prms.function.sAppSrvAfter && bExecuteAfter) {
|
||
const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter);
|
||
let resAfter = null;
|
||
try {
|
||
let resAfterPrms = _.cloneDeep(prms);
|
||
resAfterPrms.options = _.cloneDeep(options);
|
||
resAfterPrms.optionsResp = _.cloneDeep(optionsResp);
|
||
resAfter = await fnAfter(resAfterPrms);
|
||
} catch (e) {
|
||
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
|
||
}
|
||
//Проверяем структуру ответа функции постобработки
|
||
if (resAfter) {
|
||
let sCheckResult = validateObject(
|
||
resAfter,
|
||
objOutQueueProcessorSchema.OutQueueProcessorFnAfter,
|
||
"Результат функции постобработки исходящего сообщения"
|
||
);
|
||
//Если структура ответа в норме
|
||
if (!sCheckResult) {
|
||
//Применим ответ "После" - обработанный ответ удаленного сервиса
|
||
if (!_.isUndefined(resAfter.blResp)) {
|
||
prms.queue.blResp = resAfter.blResp;
|
||
await dbConn.setQueueResp({
|
||
nQueueId: prms.queue.nId,
|
||
blResp: prms.queue.blResp,
|
||
nIsOriginal: NIS_ORIGINAL_NO
|
||
});
|
||
}
|
||
//Применим ответ "После" - флаг утентификации сервиса
|
||
if (!_.isUndefined(resAfter.bUnAuth))
|
||
if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
|
||
//Применим ответ "После" - контекст работы сервиса
|
||
if (!_.isUndefined(resAfter.sCtx))
|
||
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN) {
|
||
prms.service.sCtx = resAfter.sCtx;
|
||
prms.service.dCtxExp = resAfter.dCtxExp;
|
||
await dbConn.setServiceContext({
|
||
nServiceId: prms.service.nId,
|
||
sCtx: prms.service.sCtx,
|
||
dCtxExp: prms.service.dCtxExp
|
||
});
|
||
bCtxIsSet = true;
|
||
}
|
||
} else {
|
||
//Или расскажем об ошибке в структуре ответа
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
}
|
||
}
|
||
//Если это функция начала сеанса, и нет обработчика на стороне БД и контекст не был установлен до сих пор - то положим в него то, что нам ответил сервер
|
||
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGIN && !prms.function.sPrcResp && !bCtxIsSet) {
|
||
await dbConn.setServiceContext({ nServiceId: prms.service.nId, sCtx: serverResp });
|
||
}
|
||
//Если это функция окончания сеанса, и нет обработчика на стороне БД - то сбросим контекст здесь
|
||
if (prms.function.nFnType == objServiceFnSchema.NFN_TYPE_LOGOUT && !prms.function.sPrcResp) {
|
||
await dbConn.clearServiceContext({ nServiceId: prms.service.nId });
|
||
}
|
||
}
|
||
//Фиксируем успешное исполнение сервером приложений - в статусе сообщения
|
||
res = await dbConn.setQueueState({
|
||
nQueueId: prms.queue.nId,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
|
||
});
|
||
//Фиксируем успешное исполнение сервером приложений - в протоколе работы сервиса
|
||
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
} else {
|
||
//Нет атуентификации
|
||
throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
|
||
}
|
||
} catch (e) {
|
||
//Если была ошибка аутентификации - возвращаем старый статус не меняя количества попыток
|
||
if (e instanceof ServerError && e.sCode == SERR_UNAUTH) {
|
||
await dbConn.setQueueState({
|
||
nQueueId: prms.queue.nId,
|
||
sExecMsg: makeErrorText(e),
|
||
nExecState: nOldExecState,
|
||
nResetData: objQueueSchema.NQUEUE_RESET_DATA_YES
|
||
});
|
||
res = e;
|
||
} else {
|
||
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
|
||
res = await dbConn.setQueueState({
|
||
nQueueId: prms.queue.nId,
|
||
sExecMsg: makeErrorText(e),
|
||
nResetData:
|
||
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
|
||
? objQueueSchema.NQUEUE_RESET_DATA_YES
|
||
: objQueueSchema.NQUEUE_RESET_DATA_NO,
|
||
nIncExecCnt: NINC_EXEC_CNT_YES,
|
||
nExecState:
|
||
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
|
||
? objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR
|
||
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
|
||
});
|
||
}
|
||
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
|
||
await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером приложений: ${makeErrorText(e)}`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
}
|
||
} else {
|
||
//Фатальная ошибка обработки - некорректный объект параметров
|
||
res = new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
//Возвращаем результат
|
||
return res;
|
||
};
|
||
|
||
//Запуск обработки сообщения сервером БД
|
||
const dbProcess = async prms => {
|
||
//Результат обработки - объект Queue (обработанное сообщение) или ServerError (ошибка обработки)
|
||
let res = null;
|
||
//Проверяем структуру переданного объекта для старта
|
||
let sCheckResult = validateObject(prms, prmsOutQueueProcessorSchema.dbProcess, "Параметры функции запуска обработки ообщения сервером БД");
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Обрабатываем
|
||
try {
|
||
//Фиксируем начало исполнения сервером БД - в статусе сообщения
|
||
res = await dbConn.setQueueState({
|
||
nQueueId: prms.queue.nId,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
|
||
});
|
||
//Фиксируем начало исполнения сервером БД - в протоколе работы сервиса
|
||
await logger.info(
|
||
`Обрабатываю исходящее сообщение сервером БД: ${prms.queue.nId}, ${prms.queue.sInDate}, ${prms.queue.sServiceFnCode}, ${
|
||
prms.queue.sExecState
|
||
}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
|
||
{ nQueueId: prms.queue.nId }
|
||
);
|
||
//Если обработчик со стороны БД указан
|
||
if (prms.function.sPrcResp) {
|
||
//Вызываем его
|
||
let prcRes = await dbConn.execQueueDBPrc({ nQueueId: prms.queue.nId });
|
||
//Если результат - ошибка пробрасываем её
|
||
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
|
||
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
|
||
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
|
||
}
|
||
//Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса
|
||
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
//Фиксируем успешное исполнение (полное - дальше обработки нет) - в статусе сообщения
|
||
res = await dbConn.setQueueState({
|
||
nQueueId: prms.queue.nId,
|
||
nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
|
||
});
|
||
} catch (e) {
|
||
//Если была ошибка аутентификации - возвращаем на повторную обработку сервером приложений
|
||
if (e instanceof ServerError && e.sCode == SERR_UNAUTH) {
|
||
await dbConn.setQueueState({
|
||
nQueueId: prms.queue.nId,
|
||
sExecMsg: makeErrorText(e),
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE,
|
||
nResetData: objQueueSchema.NQUEUE_RESET_DATA_YES
|
||
});
|
||
res = e;
|
||
} else {
|
||
//Фиксируем ошибку обработки сервером БД - в статусе сообщения
|
||
res = await dbConn.setQueueState({
|
||
nQueueId: prms.queue.nId,
|
||
sExecMsg: makeErrorText(e),
|
||
nIncExecCnt: NINC_EXEC_CNT_YES,
|
||
nExecState:
|
||
prms.queue.nExecCnt + 1 < prms.queue.nRetryAttempts
|
||
? objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR
|
||
: objQueueSchema.NQUEUE_EXEC_STATE_ERR
|
||
});
|
||
}
|
||
//Фиксируем ошибку обработки сервером БД - в протоколе работы сервиса
|
||
await logger.error(`Ошибка обработки исходящего сообщения ${prms.queue.nId} сервером БД: ${makeErrorText(e)}`, {
|
||
nQueueId: prms.queue.nId
|
||
});
|
||
}
|
||
} else {
|
||
//Фатальная ошибка обработки - некорректный объект параметров
|
||
res = new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
//Возвращаем результат
|
||
return res;
|
||
};
|
||
|
||
//Обработка задачи
|
||
const processTask = async prms => {
|
||
//Проверяем параметры
|
||
let sCheckResult = validateObject(prms, prmsOutQueueProcessorSchema.processTask, "Параметры функции обработки задачи");
|
||
//Если параметры в норме
|
||
if (!sCheckResult) {
|
||
let q = null;
|
||
try {
|
||
//Создаём подключение к БД
|
||
dbConn = new db.DBConnector({ connectSettings: prms.task.connectSettings });
|
||
//Создаём логгер для протоколирования работы
|
||
logger = new lg.Logger();
|
||
//Подключим логгер к БД (и отключим когда надо)
|
||
dbConn.on(db.SEVT_DB_CONNECTOR_CONNECTED, connection => {
|
||
logger.setDBConnector(dbConn, true);
|
||
});
|
||
dbConn.on(db.SEVT_DB_CONNECTOR_DISCONNECTED, () => {
|
||
logger.removeDBConnector();
|
||
});
|
||
//Подключаемся к БД
|
||
await dbConn.connect({ bServerStart: false });
|
||
//Считываем запись очереди
|
||
q = await dbConn.getQueue({ nQueueId: prms.task.nQueueId });
|
||
//Далее работаем от статуса считанной записи
|
||
switch (q.nExecState) {
|
||
//Статусы "Поставлено в очередь" или "Ошибка обработки сервером приложений"
|
||
case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE:
|
||
case objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR: {
|
||
//Если ещё не обрабатывали или есть ещё попытки отработки
|
||
if (q.nExecCnt == 0 || q.nExecCnt < q.nRetryAttempts) {
|
||
//Запускаем обработку сервером приложений
|
||
let res = await appProcess({
|
||
queue: q,
|
||
service: prms.task.service,
|
||
function: prms.task.function,
|
||
sProxy: prms.task.sProxy,
|
||
kafka: prms.task.kafka,
|
||
mqtt: prms.task.mqtt
|
||
});
|
||
//Если результат обработки ошибка - пробрасываем её дальше
|
||
if (res instanceof ServerError) {
|
||
throw res;
|
||
} else {
|
||
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда по необходимости запустим обработку сервером БД
|
||
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
|
||
//Если это не MQTT/MQTTS/Kafka - запустим обработку сервером БД, иначе установим статус успешного выполнения
|
||
if (
|
||
![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
|
||
getURLProtocol(prms.task.service.sSrvRoot)
|
||
)
|
||
) {
|
||
res = await dbProcess({ queue: res, function: prms.task.function });
|
||
//Если результат обработки ошибка - пробрасываем её дальше
|
||
if (res instanceof ServerError) throw res;
|
||
} else {
|
||
//Финализируем обработку
|
||
await dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
sExecMsg: null,
|
||
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
|
||
});
|
||
}
|
||
}
|
||
}
|
||
} else {
|
||
//Попыток нет - финализируем обработку
|
||
await dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
sExecMsg: q.sExecMsg,
|
||
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
|
||
});
|
||
}
|
||
break;
|
||
}
|
||
//Статусы "Успешно обработано сервером приложений" и "Ошибка обработки в БД"
|
||
case objQueueSchema.NQUEUE_EXEC_STATE_APP_OK:
|
||
case objQueueSchema.NQUEUE_EXEC_STATE_DB_ERR: {
|
||
//Если ещё есть попытки отработки
|
||
if (q.nExecCnt == 0 || q.nExecCnt < q.nRetryAttempts) {
|
||
//Снова запускаем обработку сервером БД
|
||
let res = await dbProcess({ queue: q, function: prms.task.function });
|
||
//Если результат обработки ошибка - пробрасываем её дальше
|
||
if (res instanceof ServerError) throw res;
|
||
} else {
|
||
//Попыток нет - финализируем обработку
|
||
await dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
sExecMsg: q.sExecMsg,
|
||
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
|
||
});
|
||
}
|
||
break;
|
||
}
|
||
//Успешно обработано в БД
|
||
case objQueueSchema.NQUEUE_EXEC_STATE_DB_OK: {
|
||
//Финализируем
|
||
await dbConn.setQueueState({
|
||
nQueueId: q.nId,
|
||
sExecMsg: null,
|
||
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
|
||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
|
||
});
|
||
break;
|
||
}
|
||
//Статусы "Обрабатывается сервером приложений", "Обрабатывается в БД", "Обработано с ошибками", "Обработано успешно"
|
||
case objQueueSchema.NQUEUE_EXEC_STATE_APP:
|
||
case objQueueSchema.NQUEUE_EXEC_STATE_DB:
|
||
case objQueueSchema.NQUEUE_EXEC_STATE_ERR:
|
||
case objQueueSchema.NQUEUE_EXEC_STATE_OK: {
|
||
//Предупредим о неверном статусе сообщения (такие сюда попадать не должны)
|
||
await logger.warn(`Cообщение ${q.nId} в статусе ${q.sExecState} попало в очередь обработчика`, {
|
||
nQueueId: q.nId
|
||
});
|
||
break;
|
||
}
|
||
//Неипонятный статус
|
||
default: {
|
||
//Ничего не делаем
|
||
break;
|
||
}
|
||
}
|
||
//Отключаемся от БД
|
||
if (dbConn) await dbConn.disconnect({ bServerClose: false });
|
||
//Отправляем успех
|
||
sendOKResult();
|
||
} catch (e) {
|
||
//Отключаемся от БД
|
||
if (dbConn) await dbConn.disconnect({ bServerClose: false });
|
||
//Отправляем ошибку
|
||
if (e instanceof ServerError && e.sCode == SERR_UNAUTH) sendUnAuthResult();
|
||
else sendErrorResult({ sMessage: makeErrorText(e) });
|
||
}
|
||
} else {
|
||
sendErrorResult({ sMessage: sCheckResult });
|
||
}
|
||
};
|
||
|
||
//---------------------------------
|
||
// Управление процессом обработчика
|
||
//---------------------------------
|
||
|
||
//Перехват CTRL + C (останов процесса)
|
||
process.on("SIGINT", () => {});
|
||
|
||
//Перехват CTRL + \ (останов процесса)
|
||
process.on("SIGQUIT", () => {});
|
||
|
||
//Перехват мягкого останова процесса
|
||
process.on("SIGTERM", () => {
|
||
process.exit(0);
|
||
});
|
||
|
||
//Перехват ошибок
|
||
process.on("uncaughtException", e => {
|
||
//Отправляем ошибку родительскому процессу
|
||
sendErrorResult({ sMessage: makeErrorText(e) });
|
||
});
|
||
|
||
//Приём сообщений
|
||
process.on("message", task => {
|
||
//Проверяем структуру переданного сообщения
|
||
let sCheckResult = validateObject(task, objOutQueueProcessorSchema.OutQueueProcessorTask, "Задача обработчика очереди исходящих сообщений");
|
||
//Если структура объекта в норме
|
||
if (!sCheckResult) {
|
||
//Запускаем обработку
|
||
processTask({ task });
|
||
} else {
|
||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||
}
|
||
});
|