ЦИТК-901 (Добавление поддержки протоколов MQTT и KAFKA) #2

Merged
Mim merged 2 commits from Dollerok/P8-ExchangeService:master into master 2024-09-27 11:42:05 +03:00
5 changed files with 99 additions and 254 deletions

View File

@ -21,6 +21,8 @@ const {
buildOptionsXML,
parseOptionsXML,
deepMerge,
deepCopyObject,
isUndefined,
getKafkaConnectionSettings,
getMQTTConnectionSettings,
getURLProtocol
@ -381,17 +383,17 @@ class InQueue extends EventEmitter {
}
}
//Обработка сообщения kafka
async processKafkaMessage({ message, service, fn }) {
//Обработка MQ сообщения
async processMQMessage({ message, service, fn, sProtocol }) {
//Буфер для сообщения очереди
let q = null;
try {
//Тело сообщения и ответ на него
//Префикс протокола
let sProtocolPrefix = sProtocol === objServiceSchema.SPROTOCOL_KAFKA ? "Kafka" : "MQTT";
//Тело сообщения
let blMsg = null;
let blResp = null;
//Параметры сообщения и ответа на него
//Параметры сообщения
let options = {};
let optionsResp = {};
//Флаг прекращения обработки сообщения
let bStopPropagation = false;
//Получим тело сообщения
@ -399,7 +401,7 @@ class InQueue extends EventEmitter {
//Определимся с параметрами сообщения полученными от внешней системы
options = {
method: fn.sFnPrmsType,
headers: _.cloneDeep(message.headers)
headers: message.headers
};
//Кладём сообщение в очередь
q = await this.dbConn.putQueue({
@ -409,63 +411,39 @@ class InQueue extends EventEmitter {
});
//Скажем что пришло новое входящее сообщение
await this.logger.info(
`Новое входящее Kafka-сообщение для функции ${fn.sCode} (${buildURL({
`Новое входящее ${sProtocolPrefix}-сообщение для функции ${fn.sCode} (${buildURL({
sSrvRoot: service.sSrvRoot,
sFnURL: fn.sFnURL
})})`,
{ nQueueId: q.nId }
);
//Вызываем обработчик со стороны БД (если он есть)
if (fn.sPrcResp) {
//Фиксируем начало исполнения сервером БД - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
});
//Вызов обработчика БД
let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId });
//Если результат - ошибка пробрасываем её
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
//Выставим статус сообщению очереди - исполнено обработчиком БД
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
});
//Считаем ответ полученный от системы
let qData = await this.dbConn.getQueueResp({ nQueueId: q.nId });
blResp = qData.blResp;
}
//Выполняем обработчик "После" (если он есть)
if (bStopPropagation === false && fn.sAppSrvAfter) {
//Выполняем обработчик "До" (если он есть)
if (fn.sAppSrvBefore) {
//Выставим статус сообщению очереди - исполняется сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
//Выполняем
const fnAfter = getAppSrvFunction(fn.sAppSrvAfter);
let resAfter = null;
const fnBefore = getAppSrvFunction(fn.sAppSrvBefore);
let resBefore = null;
try {
let resAfterPrms = { res: { body: blMsg }, service: service, function: fn };
resAfterPrms.queue = _.cloneDeep(q);
resAfterPrms.queue.blMsg = blMsg;
resAfterPrms.queue.blResp = blResp;
resAfterPrms.options = _.cloneDeep(options);
resAfterPrms.optionsResp = _.cloneDeep(optionsResp);
resAfterPrms.dbConn = this.dbConn;
resAfterPrms.notifier = this.notifier;
resAfter = await fnAfter(resAfterPrms);
let resBeforePrms = { service: service, function: fn };
resBeforePrms.queue = deepCopyObject(q);
resBeforePrms.queue.blMsg = blMsg;
resBeforePrms.options = deepCopyObject(options);
resBeforePrms.dbConn = this.dbConn;
resBeforePrms.notifier = this.notifier;
resBefore = await fnBefore(resBeforePrms);
} catch (e) {
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
throw new ServerError(SERR_APP_SERVER_BEFORE, e.message);
}
//Проверяем структуру ответа функции предобработки
if (resAfter) {
if (resBefore) {
let sCheckResult = validateObject(
resAfter,
objInQueueSchema.InQueueProcessorFnAfter,
"Результат функции постобработки входящего сообщения"
resBefore,
objInQueueSchema.InQueueProcessorFnBefore,
"Результат функции предобработки входящего сообщения"
);
//Если структура ответа в норме
if (!sCheckResult) {
@ -474,106 +452,24 @@ class InQueue extends EventEmitter {
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
});
//Фиксируем результат исполнения "После" - ответ системы
if (!_.isUndefined(resAfter.blResp)) {
blResp = resAfter.blResp;
q = await this.dbConn.setQueueResp({
//Фиксируем результат исполнения "До" - обработанный запрос внешней системы
if (!isUndefined(resBefore.blMsg)) {
blMsg = resBefore.blMsg;
q = await this.dbConn.setQueueMsg({
nQueueId: q.nId,
blResp,
nIsOriginal: NIS_ORIGINAL_NO
});
}
//Фиксируем результат исполнения "После" - параметры ответа на запрос
if (!_.isUndefined(resAfter.optionsResp)) {
optionsResp = deepMerge(optionsResp, resAfter.optionsResp);
let sOptionsResp = buildOptionsXML({ options: optionsResp });
q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp });
}
//Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем
if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
} else {
//Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
}
//Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
//Фиксируем успех обработки - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
} catch (e) {
//Тема и текст уведомления об ошибке
let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${fn.sCode}" сервиса "${service.sCode}"`;
let sMessage = makeErrorText(e);
//Если сообщение очереди успели создать
if (q) {
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: sMessage,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await this.logger.error(`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`, { nQueueId: q.nId });
//Добавим чуть больше информации в тему сообщения
sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${fn.sCode}" сервиса "${service.sCode}"`;
} else {
//Ограничимся общей ошибкой
await this.logger.error(sMessage, {
nServiceId: service.nId,
nServiceFnId: fn.nId
});
}
//Если для функции-обработчика указан признак необходимости оповещения об ошибках
if (fn.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) {
//Отправим уведомление об ошибке отработки в почту
await this.notifier.addMessage({
sTo: fn.sErrNtfMail,
sSubject,
sMessage
});
}
}
}
//Обработка сообщения
async processMQTTMessage({ message, service, fn }) {
//Буфер для сообщения очереди
let q = null;
try {
//Тело сообщения и ответ на него
let blMsg = null;
let blResp = null;
//Параметры сообщения и ответа на него
let options = {};
let optionsResp = {};
//Получим тело сообщения
blMsg = message ? message : null;
//Определимся с параметрами сообщения полученными от внешней системы
options = {
method: fn.sFnPrmsType
};
//Кладём сообщение в очередь
q = await this.dbConn.putQueue({
nServiceFnId: fn.nId,
sOptions: buildOptionsXML({ options }),
blMsg
});
//Скажем что пришло новое входящее сообщение
await this.logger.info(
`Новое входящее MQTT-сообщение для функции ${fn.sCode} (${buildURL({
sSrvRoot: service.sSrvRoot,
sFnURL: fn.sFnURL
})})`,
{ nQueueId: q.nId }
);
}
//Фиксируем результат исполнения "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем
if (!isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true;
} else {
//Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
}
//Вызываем обработчик со стороны БД (если он есть)
if (fn.sPrcResp) {
if (bStopPropagation === false && fn.sPrcResp) {
//Фиксируем начало исполнения сервером БД - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
@ -583,76 +479,11 @@ class InQueue extends EventEmitter {
let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId });
//Если результат - ошибка пробрасываем её
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
//Выставим статус сообщению очереди - исполнено обработчиком БД
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
});
//Считаем ответ полученный от системы
let qData = await this.dbConn.getQueueResp({ nQueueId: q.nId });
blResp = qData.blResp;
}
//Выполняем обработчик "После" (если он есть)
if (fn.sAppSrvAfter) {
//Выставим статус сообщению очереди - исполняется сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
//Выполняем
const fnAfter = getAppSrvFunction(fn.sAppSrvAfter);
let resAfter = null;
try {
let resAfterPrms = { res: { body: message }, service: service, function: fn };
resAfterPrms.queue = _.cloneDeep(q);
resAfterPrms.queue.blMsg = blMsg;
resAfterPrms.queue.blResp = blResp;
resAfterPrms.options = _.cloneDeep(options);
resAfterPrms.optionsResp = _.cloneDeep(optionsResp);
resAfterPrms.dbConn = this.dbConn;
resAfterPrms.notifier = this.notifier;
resAfter = await fnAfter(resAfterPrms);
} catch (e) {
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
}
//Проверяем структуру ответа функции предобработки
if (resAfter) {
let sCheckResult = validateObject(
resAfter,
objInQueueSchema.InQueueProcessorFnAfter,
"Результат функции постобработки входящего сообщения"
);
//Если структура ответа в норме
if (!sCheckResult) {
//Выставим статус сообщению очереди - исполнено сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
});
//Фиксируем результат исполнения "После" - ответ системы
if (!_.isUndefined(resAfter.blResp)) {
blResp = resAfter.blResp;
q = await this.dbConn.setQueueResp({
nQueueId: q.nId,
blResp,
nIsOriginal: NIS_ORIGINAL_NO
});
}
//Фиксируем результат исполнения "После" - параметры ответа на запрос
if (!_.isUndefined(resAfter.optionsResp)) {
optionsResp = deepMerge(optionsResp, resAfter.optionsResp);
let sOptionsResp = buildOptionsXML({ options: optionsResp });
q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp });
}
//Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем
if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
} else {
//Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
}
//Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
@ -794,7 +625,7 @@ class InQueue extends EventEmitter {
let connectionKafka = await subscribeKafka({
settings: connectionSettings,
service: srv,
processKafkaMessage: prms => this.processKafkaMessage(prms),
processMessage: prms => this.processMQMessage({ ...prms, sProtocol: objServiceSchema.SPROTOCOL_KAFKA }),
logger: this.logger
});
//Если подключение было создано
@ -831,7 +662,7 @@ class InQueue extends EventEmitter {
let connectionMQTT = await subscribeMQTT({
settings: connectionSettings,
service: srv,
processMQTTMessage: prms => this.processMQTTMessage(prms),
processMessage: prms => this.processMQMessage({ ...prms, sProtocol: objServiceSchema.SPROTOCOL_MQTT }),
logger: this.logger
});
//Если подключение было создано
@ -873,29 +704,29 @@ class InQueue extends EventEmitter {
}
}
//Закрытие подключений, если они есть
//Закрытие подключений
stopConnections() {
//Если у нас есть соединения с MQTT
if (this.mqttConnections.length !== 0) {
//Закрываем их
_.forEach(this.mqttConnections, async connection => {
for (let connection of this.mqttConnections) {
try {
await connection.end();
connection.end();
} catch (e) {
await this.logger.error(`Ошибка завершения MQTT подключения: ${makeErrorText(e)}`);
this.logger.error(`Ошибка завершения MQTT подключения: ${makeErrorText(e)}`);
}
}
});
}
//Если у нас есть соединения с Kafka
if (this.kafkaConnections.length !== 0) {
//Закрываем их
_.forEach(this.kafkaConnections, async connection => {
for (let connection of this.kafkaConnections) {
try {
await connection.disconnect();
connection.disconnect();
} catch (e) {
await this.logger.error(`Ошибка завершения Kafka подключения: ${makeErrorText(e)}`);
this.logger.error(`Ошибка завершения Kafka подключения: ${makeErrorText(e)}`);
}
}
});
}
}

View File

@ -37,7 +37,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
};
//Получение MQTT сообщений
const subscribeKafka = async ({ settings, service, processKafkaMessage, logger }) => {
const subscribeKafka = async ({ settings, service, processMessage, logger }) => {
try {
//Признак необходимости вывода сообщения о потере соединения
let bLogLostConnection = true;
@ -82,7 +82,7 @@ const subscribeKafka = async ({ settings, service, processKafkaMessage, logger }
eachMessage: async ({ topic, message }) => {
try {
//Вызываем обработчик
processKafkaMessage({
processMessage({
message,
service,
fn: service.functions.find(fn => {

View File

@ -34,7 +34,7 @@ const publishMQTT = async ({ settings, url, auth, topic, message }) => {
};
//Получение MQTT сообщений
const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger }) => {
const subscribeMQTT = async ({ settings, service, processMessage, logger }) => {
try {
//Инициализируем строку подключения
let sBroker = service.sSrvRoot;
@ -54,8 +54,8 @@ const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger })
//Прослушиваем сообщения
client.on("message", (topic, message) => {
//Обрабатываем сообщение
processMQTTMessage({
message,
processMessage({
message: { value: message, headers: {} },
service,
fn: service.functions.find(fn => {
return fn.sFnURL === topic;
@ -68,7 +68,7 @@ const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger })
logger.error(`Соединение с MQTT потеряно (${sBroker})`);
});
//Прослушиваем восстановление соединения
client.on("reconnect", () => {
client.on("connect", () => {
//Сообщим о восстановлении соединения
logger.info(`Соединение с MQTT восстановлено (${sBroker})`);
});

View File

@ -343,7 +343,6 @@ const appProcess = async prms => {
topic: options.topic,
message: options.body
});
console.log(serverResp);
break;
//mqtt и mqtts
case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
@ -622,6 +621,12 @@ const processTask = async prms => {
if (res instanceof ServerError) {
throw res;
} else {
//Если это не Kafka/MQTT сообщение - обрабатываем сервером БД
if (
![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
getURLProtocol(prms.task.service.sSrvRoot)
)
) {
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
res = await dbProcess({ queue: res, function: prms.task.function });
@ -629,6 +634,7 @@ const processTask = async prms => {
if (res instanceof ServerError) throw res;
}
}
}
} else {
//Попыток нет - финализируем обработку
await dbConn.setQueueState({

View File

@ -327,6 +327,12 @@ const deepMerge = (...args) => {
return res;
};
//Глубокое копирование объекта
const deepCopyObject = obj => JSON.parse(JSON.stringify(obj));
//Проверка на undefined
const isUndefined = value => value === undefined;
//Считывание параметров подключения для сервиса обмена (при service === "" считывание подключения "По умолчанию", settingsArray - массив объектов [{sService: "", ...},...])
const getConnectionSettings = (service, settingsArray) => {
//Считываем параметры и возвращаем
@ -401,6 +407,8 @@ exports.parseOptionsXML = parseOptionsXML;
exports.buildOptionsXML = buildOptionsXML;
exports.getNowString = getNowString;
exports.deepMerge = deepMerge;
exports.deepCopyObject = deepCopyObject;
exports.isUndefined = isUndefined;
exports.getKafkaConnectionSettings = getKafkaConnectionSettings;
exports.getMQTTConnectionSettings = getMQTTConnectionSettings;
exports.getKafkaBroker = getKafkaBroker;