ЦИТК-901 (Добавление поддержки протоколов MQTT и KAFKA)

This commit is contained in:
Dollerino 2024-09-26 14:25:51 +03:00
parent 6ad825c2b6
commit 418c52172f
5 changed files with 95 additions and 250 deletions

View File

@ -21,6 +21,8 @@ const {
buildOptionsXML, buildOptionsXML,
parseOptionsXML, parseOptionsXML,
deepMerge, deepMerge,
deepCopyObject,
isUndefined,
getKafkaConnectionSettings, getKafkaConnectionSettings,
getMQTTConnectionSettings, getMQTTConnectionSettings,
getURLProtocol getURLProtocol
@ -381,17 +383,17 @@ class InQueue extends EventEmitter {
} }
} }
//Обработка сообщения kafka //Обработка MQ сообщения
async processKafkaMessage({ message, service, fn }) { async processMQMessage({ message, service, fn, sProtocol }) {
//Буфер для сообщения очереди //Буфер для сообщения очереди
let q = null; let q = null;
try { try {
//Тело сообщения и ответ на него //Префикс протокола
let sProtocolPrefix = sProtocol === objServiceSchema.SPROTOCOL_KAFKA ? "Kafka" : "MQTT";
//Тело сообщения
let blMsg = null; let blMsg = null;
let blResp = null; //Параметры сообщения
//Параметры сообщения и ответа на него
let options = {}; let options = {};
let optionsResp = {};
//Флаг прекращения обработки сообщения //Флаг прекращения обработки сообщения
let bStopPropagation = false; let bStopPropagation = false;
//Получим тело сообщения //Получим тело сообщения
@ -399,7 +401,7 @@ class InQueue extends EventEmitter {
//Определимся с параметрами сообщения полученными от внешней системы //Определимся с параметрами сообщения полученными от внешней системы
options = { options = {
method: fn.sFnPrmsType, method: fn.sFnPrmsType,
headers: _.cloneDeep(message.headers) headers: message.headers
}; };
//Кладём сообщение в очередь //Кладём сообщение в очередь
q = await this.dbConn.putQueue({ q = await this.dbConn.putQueue({
@ -409,14 +411,65 @@ class InQueue extends EventEmitter {
}); });
//Скажем что пришло новое входящее сообщение //Скажем что пришло новое входящее сообщение
await this.logger.info( await this.logger.info(
`Новое входящее Kafka-сообщение для функции ${fn.sCode} (${buildURL({ `Новое входящее ${sProtocolPrefix}-сообщение для функции ${fn.sCode} (${buildURL({
sSrvRoot: service.sSrvRoot, sSrvRoot: service.sSrvRoot,
sFnURL: fn.sFnURL sFnURL: fn.sFnURL
})})`, })})`,
{ nQueueId: q.nId } { nQueueId: q.nId }
); );
//Выполняем обработчик "До" (если он есть)
if (fn.sAppSrvBefore) {
//Выставим статус сообщению очереди - исполняется сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
//Выполняем
const fnBefore = getAppSrvFunction(fn.sAppSrvBefore);
let resBefore = null;
try {
let resBeforePrms = { service: service, function: fn };
resBeforePrms.queue = deepCopyObject(q);
resBeforePrms.queue.blMsg = blMsg;
resBeforePrms.options = deepCopyObject(options);
resBeforePrms.dbConn = this.dbConn;
resBeforePrms.notifier = this.notifier;
resBefore = await fnBefore(resBeforePrms);
} catch (e) {
throw new ServerError(SERR_APP_SERVER_BEFORE, e.message);
}
//Проверяем структуру ответа функции предобработки
if (resBefore) {
let sCheckResult = validateObject(
resBefore,
objInQueueSchema.InQueueProcessorFnBefore,
"Результат функции предобработки входящего сообщения"
);
//Если структура ответа в норме
if (!sCheckResult) {
//Выставим статус сообщению очереди - исполнено сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
});
//Фиксируем результат исполнения "До" - обработанный запрос внешней системы
if (!isUndefined(resBefore.blMsg)) {
blMsg = resBefore.blMsg;
q = await this.dbConn.setQueueMsg({
nQueueId: q.nId,
blMsg
});
}
//Фиксируем результат исполнения "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем
if (!isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true;
} else {
//Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
}
//Вызываем обработчик со стороны БД (если он есть) //Вызываем обработчик со стороны БД (если он есть)
if (fn.sPrcResp) { if (bStopPropagation === false && fn.sPrcResp) {
//Фиксируем начало исполнения сервером БД - в статусе сообщения //Фиксируем начало исполнения сервером БД - в статусе сообщения
q = await this.dbConn.setQueueState({ q = await this.dbConn.setQueueState({
nQueueId: q.nId, nQueueId: q.nId,
@ -426,233 +479,11 @@ class InQueue extends EventEmitter {
let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId }); let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId });
//Если результат - ошибка пробрасываем её //Если результат - ошибка пробрасываем её
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg); if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
//Выставим статус сообщению очереди - исполнено обработчиком БД //Выставим статус сообщению очереди - исполнено обработчиком БД
q = await this.dbConn.setQueueState({ q = await this.dbConn.setQueueState({
nQueueId: q.nId, nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
}); });
//Считаем ответ полученный от системы
let qData = await this.dbConn.getQueueResp({ nQueueId: q.nId });
blResp = qData.blResp;
}
//Выполняем обработчик "После" (если он есть)
if (bStopPropagation === false && fn.sAppSrvAfter) {
//Выставим статус сообщению очереди - исполняется сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
//Выполняем
const fnAfter = getAppSrvFunction(fn.sAppSrvAfter);
let resAfter = null;
try {
let resAfterPrms = { res: { body: blMsg }, service: service, function: fn };
resAfterPrms.queue = _.cloneDeep(q);
resAfterPrms.queue.blMsg = blMsg;
resAfterPrms.queue.blResp = blResp;
resAfterPrms.options = _.cloneDeep(options);
resAfterPrms.optionsResp = _.cloneDeep(optionsResp);
resAfterPrms.dbConn = this.dbConn;
resAfterPrms.notifier = this.notifier;
resAfter = await fnAfter(resAfterPrms);
} catch (e) {
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
}
//Проверяем структуру ответа функции предобработки
if (resAfter) {
let sCheckResult = validateObject(
resAfter,
objInQueueSchema.InQueueProcessorFnAfter,
"Результат функции постобработки входящего сообщения"
);
//Если структура ответа в норме
if (!sCheckResult) {
//Выставим статус сообщению очереди - исполнено сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
});
//Фиксируем результат исполнения "После" - ответ системы
if (!_.isUndefined(resAfter.blResp)) {
blResp = resAfter.blResp;
q = await this.dbConn.setQueueResp({
nQueueId: q.nId,
blResp,
nIsOriginal: NIS_ORIGINAL_NO
});
}
//Фиксируем результат исполнения "После" - параметры ответа на запрос
if (!_.isUndefined(resAfter.optionsResp)) {
optionsResp = deepMerge(optionsResp, resAfter.optionsResp);
let sOptionsResp = buildOptionsXML({ options: optionsResp });
q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp });
}
//Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем
if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
} else {
//Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
}
//Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
//Фиксируем успех обработки - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
} catch (e) {
//Тема и текст уведомления об ошибке
let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${fn.sCode}" сервиса "${service.sCode}"`;
let sMessage = makeErrorText(e);
//Если сообщение очереди успели создать
if (q) {
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: sMessage,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await this.logger.error(`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`, { nQueueId: q.nId });
//Добавим чуть больше информации в тему сообщения
sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${fn.sCode}" сервиса "${service.sCode}"`;
} else {
//Ограничимся общей ошибкой
await this.logger.error(sMessage, {
nServiceId: service.nId,
nServiceFnId: fn.nId
});
}
//Если для функции-обработчика указан признак необходимости оповещения об ошибках
if (fn.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) {
//Отправим уведомление об ошибке отработки в почту
await this.notifier.addMessage({
sTo: fn.sErrNtfMail,
sSubject,
sMessage
});
}
}
}
//Обработка сообщения
async processMQTTMessage({ message, service, fn }) {
//Буфер для сообщения очереди
let q = null;
try {
//Тело сообщения и ответ на него
let blMsg = null;
let blResp = null;
//Параметры сообщения и ответа на него
let options = {};
let optionsResp = {};
//Получим тело сообщения
blMsg = message ? message : null;
//Определимся с параметрами сообщения полученными от внешней системы
options = {
method: fn.sFnPrmsType
};
//Кладём сообщение в очередь
q = await this.dbConn.putQueue({
nServiceFnId: fn.nId,
sOptions: buildOptionsXML({ options }),
blMsg
});
//Скажем что пришло новое входящее сообщение
await this.logger.info(
`Новое входящее MQTT-сообщение для функции ${fn.sCode} (${buildURL({
sSrvRoot: service.sSrvRoot,
sFnURL: fn.sFnURL
})})`,
{ nQueueId: q.nId }
);
//Вызываем обработчик со стороны БД (если он есть)
if (fn.sPrcResp) {
//Фиксируем начало исполнения сервером БД - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
});
//Вызов обработчика БД
let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId });
//Если результат - ошибка пробрасываем её
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
//Выставим статус сообщению очереди - исполнено обработчиком БД
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
});
//Считаем ответ полученный от системы
let qData = await this.dbConn.getQueueResp({ nQueueId: q.nId });
blResp = qData.blResp;
}
//Выполняем обработчик "После" (если он есть)
if (fn.sAppSrvAfter) {
//Выставим статус сообщению очереди - исполняется сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
//Выполняем
const fnAfter = getAppSrvFunction(fn.sAppSrvAfter);
let resAfter = null;
try {
let resAfterPrms = { res: { body: message }, service: service, function: fn };
resAfterPrms.queue = _.cloneDeep(q);
resAfterPrms.queue.blMsg = blMsg;
resAfterPrms.queue.blResp = blResp;
resAfterPrms.options = _.cloneDeep(options);
resAfterPrms.optionsResp = _.cloneDeep(optionsResp);
resAfterPrms.dbConn = this.dbConn;
resAfterPrms.notifier = this.notifier;
resAfter = await fnAfter(resAfterPrms);
} catch (e) {
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
}
//Проверяем структуру ответа функции предобработки
if (resAfter) {
let sCheckResult = validateObject(
resAfter,
objInQueueSchema.InQueueProcessorFnAfter,
"Результат функции постобработки входящего сообщения"
);
//Если структура ответа в норме
if (!sCheckResult) {
//Выставим статус сообщению очереди - исполнено сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
});
//Фиксируем результат исполнения "После" - ответ системы
if (!_.isUndefined(resAfter.blResp)) {
blResp = resAfter.blResp;
q = await this.dbConn.setQueueResp({
nQueueId: q.nId,
blResp,
nIsOriginal: NIS_ORIGINAL_NO
});
}
//Фиксируем результат исполнения "После" - параметры ответа на запрос
if (!_.isUndefined(resAfter.optionsResp)) {
optionsResp = deepMerge(optionsResp, resAfter.optionsResp);
let sOptionsResp = buildOptionsXML({ options: optionsResp });
q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp });
}
//Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем
if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
} else {
//Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
} }
//Фиксируем успех обработки - в протоколе работы сервиса //Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
@ -794,7 +625,7 @@ class InQueue extends EventEmitter {
let connectionKafka = await subscribeKafka({ let connectionKafka = await subscribeKafka({
settings: connectionSettings, settings: connectionSettings,
service: srv, service: srv,
processKafkaMessage: prms => this.processKafkaMessage(prms), processMessage: prms => this.processMQMessage({ ...prms, sProtocol: objServiceSchema.SPROTOCOL_KAFKA }),
logger: this.logger logger: this.logger
}); });
//Если подключение было создано //Если подключение было создано
@ -831,7 +662,7 @@ class InQueue extends EventEmitter {
let connectionMQTT = await subscribeMQTT({ let connectionMQTT = await subscribeMQTT({
settings: connectionSettings, settings: connectionSettings,
service: srv, service: srv,
processMQTTMessage: prms => this.processMQTTMessage(prms), processMessage: prms => this.processMQMessage({ ...prms, sProtocol: objServiceSchema.SPROTOCOL_MQTT }),
logger: this.logger logger: this.logger
}); });
//Если подключение было создано //Если подключение было создано
@ -873,29 +704,29 @@ class InQueue extends EventEmitter {
} }
} }
//Закрытие подключений, если они есть //Закрытие подключений
stopConnections() { async stopConnections() {
//Если у нас есть соединения с MQTT //Если у нас есть соединения с MQTT
if (this.mqttConnections.length !== 0) { if (this.mqttConnections.length !== 0) {
//Закрываем их //Закрываем их
_.forEach(this.mqttConnections, async connection => { for (let connection of this.mqttConnections) {
try { try {
await connection.end(); await connection.end();
} catch (e) { } catch (e) {
await this.logger.error(`Ошибка завершения MQTT подключения: ${makeErrorText(e)}`); await this.logger.error(`Ошибка завершения MQTT подключения: ${makeErrorText(e)}`);
} }
}); }
} }
//Если у нас есть соединения с Kafka //Если у нас есть соединения с Kafka
if (this.kafkaConnections.length !== 0) { if (this.kafkaConnections.length !== 0) {
//Закрываем их //Закрываем их
_.forEach(this.kafkaConnections, async connection => { for (let connection of this.kafkaConnections) {
try { try {
await connection.disconnect(); await connection.disconnect();
} catch (e) { } catch (e) {
await this.logger.error(`Ошибка завершения Kafka подключения: ${makeErrorText(e)}`); await this.logger.error(`Ошибка завершения Kafka подключения: ${makeErrorText(e)}`);
} }
}); }
} }
} }

View File

@ -37,7 +37,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
}; };
//Получение MQTT сообщений //Получение MQTT сообщений
const subscribeKafka = async ({ settings, service, processKafkaMessage, logger }) => { const subscribeKafka = async ({ settings, service, processMessage, logger }) => {
try { try {
//Признак необходимости вывода сообщения о потере соединения //Признак необходимости вывода сообщения о потере соединения
let bLogLostConnection = true; let bLogLostConnection = true;
@ -82,7 +82,7 @@ const subscribeKafka = async ({ settings, service, processKafkaMessage, logger }
eachMessage: async ({ topic, message }) => { eachMessage: async ({ topic, message }) => {
try { try {
//Вызываем обработчик //Вызываем обработчик
processKafkaMessage({ processMessage({
message, message,
service, service,
fn: service.functions.find(fn => { fn: service.functions.find(fn => {

View File

@ -34,7 +34,7 @@ const publishMQTT = async ({ settings, url, auth, topic, message }) => {
}; };
//Получение MQTT сообщений //Получение MQTT сообщений
const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger }) => { const subscribeMQTT = async ({ settings, service, processMessage, logger }) => {
try { try {
//Инициализируем строку подключения //Инициализируем строку подключения
let sBroker = service.sSrvRoot; let sBroker = service.sSrvRoot;
@ -54,8 +54,8 @@ const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger })
//Прослушиваем сообщения //Прослушиваем сообщения
client.on("message", (topic, message) => { client.on("message", (topic, message) => {
//Обрабатываем сообщение //Обрабатываем сообщение
processMQTTMessage({ processMessage({
message, message: { value: message, headers: {} },
service, service,
fn: service.functions.find(fn => { fn: service.functions.find(fn => {
return fn.sFnURL === topic; return fn.sFnURL === topic;

View File

@ -343,7 +343,6 @@ const appProcess = async prms => {
topic: options.topic, topic: options.topic,
message: options.body message: options.body
}); });
console.log(serverResp);
break; break;
//mqtt и mqtts //mqtt и mqtts
case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol): case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
@ -622,11 +621,18 @@ const processTask = async prms => {
if (res instanceof ServerError) { if (res instanceof ServerError) {
throw res; throw res;
} else { } else {
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД //Если это не Kafka/MQTT сообщение - обрабатываем сервером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { if (
res = await dbProcess({ queue: res, function: prms.task.function }); ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
//Если результат обработки ошибка - пробрасываем её дальше getURLProtocol(prms.task.service.sSrvRoot)
if (res instanceof ServerError) throw res; )
) {
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
res = await dbProcess({ queue: res, function: prms.task.function });
//Если результат обработки ошибка - пробрасываем её дальше
if (res instanceof ServerError) throw res;
}
} }
} }
} else { } else {

View File

@ -327,6 +327,12 @@ const deepMerge = (...args) => {
return res; return res;
}; };
//Глубокое копирование объекта
const deepCopyObject = obj => JSON.parse(JSON.stringify(obj));
//Проверка на undefined
const isUndefined = value => value === undefined;
//Считывание параметров подключения для сервиса обмена (при service === "" считывание подключения "По умолчанию", settingsArray - массив объектов [{sService: "", ...},...]) //Считывание параметров подключения для сервиса обмена (при service === "" считывание подключения "По умолчанию", settingsArray - массив объектов [{sService: "", ...},...])
const getConnectionSettings = (service, settingsArray) => { const getConnectionSettings = (service, settingsArray) => {
//Считываем параметры и возвращаем //Считываем параметры и возвращаем
@ -401,6 +407,8 @@ exports.parseOptionsXML = parseOptionsXML;
exports.buildOptionsXML = buildOptionsXML; exports.buildOptionsXML = buildOptionsXML;
exports.getNowString = getNowString; exports.getNowString = getNowString;
exports.deepMerge = deepMerge; exports.deepMerge = deepMerge;
exports.deepCopyObject = deepCopyObject;
exports.isUndefined = isUndefined;
exports.getKafkaConnectionSettings = getKafkaConnectionSettings; exports.getKafkaConnectionSettings = getKafkaConnectionSettings;
exports.getMQTTConnectionSettings = getMQTTConnectionSettings; exports.getMQTTConnectionSettings = getMQTTConnectionSettings;
exports.getKafkaBroker = getKafkaBroker; exports.getKafkaBroker = getKafkaBroker;