diff --git a/core/in_queue.js b/core/in_queue.js index ce20127..eb63e79 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -21,6 +21,8 @@ const { buildOptionsXML, parseOptionsXML, deepMerge, + deepCopyObject, + isUndefined, getKafkaConnectionSettings, getMQTTConnectionSettings, getURLProtocol @@ -381,17 +383,17 @@ class InQueue extends EventEmitter { } } - //Обработка сообщения kafka - async processKafkaMessage({ message, service, fn }) { + //Обработка MQ сообщения + async processMQMessage({ message, service, fn, sProtocol }) { //Буфер для сообщения очереди let q = null; try { - //Тело сообщения и ответ на него + //Префикс протокола + let sProtocolPrefix = sProtocol === objServiceSchema.SPROTOCOL_KAFKA ? "Kafka" : "MQTT"; + //Тело сообщения let blMsg = null; - let blResp = null; - //Параметры сообщения и ответа на него + //Параметры сообщения let options = {}; - let optionsResp = {}; //Флаг прекращения обработки сообщения let bStopPropagation = false; //Получим тело сообщения @@ -399,7 +401,7 @@ class InQueue extends EventEmitter { //Определимся с параметрами сообщения полученными от внешней системы options = { method: fn.sFnPrmsType, - headers: _.cloneDeep(message.headers) + headers: message.headers }; //Кладём сообщение в очередь q = await this.dbConn.putQueue({ @@ -409,14 +411,65 @@ class InQueue extends EventEmitter { }); //Скажем что пришло новое входящее сообщение await this.logger.info( - `Новое входящее Kafka-сообщение для функции ${fn.sCode} (${buildURL({ + `Новое входящее ${sProtocolPrefix}-сообщение для функции ${fn.sCode} (${buildURL({ sSrvRoot: service.sSrvRoot, sFnURL: fn.sFnURL })})`, { nQueueId: q.nId } ); + //Выполняем обработчик "До" (если он есть) + if (fn.sAppSrvBefore) { + //Выставим статус сообщению очереди - исполняется сервером приложений + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP + }); + //Выполняем + const fnBefore = getAppSrvFunction(fn.sAppSrvBefore); + let resBefore = null; + try { + let resBeforePrms = { service: service, function: fn }; + resBeforePrms.queue = deepCopyObject(q); + resBeforePrms.queue.blMsg = blMsg; + resBeforePrms.options = deepCopyObject(options); + resBeforePrms.dbConn = this.dbConn; + resBeforePrms.notifier = this.notifier; + resBefore = await fnBefore(resBeforePrms); + } catch (e) { + throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); + } + //Проверяем структуру ответа функции предобработки + if (resBefore) { + let sCheckResult = validateObject( + resBefore, + objInQueueSchema.InQueueProcessorFnBefore, + "Результат функции предобработки входящего сообщения" + ); + //Если структура ответа в норме + if (!sCheckResult) { + //Выставим статус сообщению очереди - исполнено сервером приложений + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK + }); + //Фиксируем результат исполнения "До" - обработанный запрос внешней системы + if (!isUndefined(resBefore.blMsg)) { + blMsg = resBefore.blMsg; + q = await this.dbConn.setQueueMsg({ + nQueueId: q.nId, + blMsg + }); + } + //Фиксируем результат исполнения "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем + if (!isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true; + } else { + //Или расскажем об ошибке + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + } //Вызываем обработчик со стороны БД (если он есть) - if (fn.sPrcResp) { + if (bStopPropagation === false && fn.sPrcResp) { //Фиксируем начало исполнения сервером БД - в статусе сообщения q = await this.dbConn.setQueueState({ nQueueId: q.nId, @@ -426,233 +479,11 @@ class InQueue extends EventEmitter { let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId }); //Если результат - ошибка пробрасываем её if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg); - //Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом - if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации"); //Выставим статус сообщению очереди - исполнено обработчиком БД q = await this.dbConn.setQueueState({ nQueueId: q.nId, nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK }); - //Считаем ответ полученный от системы - let qData = await this.dbConn.getQueueResp({ nQueueId: q.nId }); - blResp = qData.blResp; - } - //Выполняем обработчик "После" (если он есть) - if (bStopPropagation === false && fn.sAppSrvAfter) { - //Выставим статус сообщению очереди - исполняется сервером приложений - q = await this.dbConn.setQueueState({ - nQueueId: q.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP - }); - //Выполняем - const fnAfter = getAppSrvFunction(fn.sAppSrvAfter); - let resAfter = null; - try { - let resAfterPrms = { res: { body: blMsg }, service: service, function: fn }; - resAfterPrms.queue = _.cloneDeep(q); - resAfterPrms.queue.blMsg = blMsg; - resAfterPrms.queue.blResp = blResp; - resAfterPrms.options = _.cloneDeep(options); - resAfterPrms.optionsResp = _.cloneDeep(optionsResp); - resAfterPrms.dbConn = this.dbConn; - resAfterPrms.notifier = this.notifier; - resAfter = await fnAfter(resAfterPrms); - } catch (e) { - throw new ServerError(SERR_APP_SERVER_AFTER, e.message); - } - //Проверяем структуру ответа функции предобработки - if (resAfter) { - let sCheckResult = validateObject( - resAfter, - objInQueueSchema.InQueueProcessorFnAfter, - "Результат функции постобработки входящего сообщения" - ); - //Если структура ответа в норме - if (!sCheckResult) { - //Выставим статус сообщению очереди - исполнено сервером приложений - q = await this.dbConn.setQueueState({ - nQueueId: q.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK - }); - //Фиксируем результат исполнения "После" - ответ системы - if (!_.isUndefined(resAfter.blResp)) { - blResp = resAfter.blResp; - q = await this.dbConn.setQueueResp({ - nQueueId: q.nId, - blResp, - nIsOriginal: NIS_ORIGINAL_NO - }); - } - //Фиксируем результат исполнения "После" - параметры ответа на запрос - if (!_.isUndefined(resAfter.optionsResp)) { - optionsResp = deepMerge(optionsResp, resAfter.optionsResp); - let sOptionsResp = buildOptionsXML({ options: optionsResp }); - q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp }); - } - //Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем - if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); - } else { - //Или расскажем об ошибке - throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); - } - } - } - //Фиксируем успех обработки - в протоколе работы сервиса - await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); - //Фиксируем успех обработки - в статусе сообщения - q = await this.dbConn.setQueueState({ - nQueueId: q.nId, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK - }); - } catch (e) { - //Тема и текст уведомления об ошибке - let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${fn.sCode}" сервиса "${service.sCode}"`; - let sMessage = makeErrorText(e); - //Если сообщение очереди успели создать - if (q) { - //Фиксируем ошибку обработки сервером приложений - в статусе сообщения - q = await this.dbConn.setQueueState({ - nQueueId: q.nId, - sExecMsg: sMessage, - nIncExecCnt: NINC_EXEC_CNT_YES, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR - }); - //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса - await this.logger.error(`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`, { nQueueId: q.nId }); - //Добавим чуть больше информации в тему сообщения - sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${fn.sCode}" сервиса "${service.sCode}"`; - } else { - //Ограничимся общей ошибкой - await this.logger.error(sMessage, { - nServiceId: service.nId, - nServiceFnId: fn.nId - }); - } - //Если для функции-обработчика указан признак необходимости оповещения об ошибках - if (fn.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) { - //Отправим уведомление об ошибке отработки в почту - await this.notifier.addMessage({ - sTo: fn.sErrNtfMail, - sSubject, - sMessage - }); - } - } - } - - //Обработка сообщения - async processMQTTMessage({ message, service, fn }) { - //Буфер для сообщения очереди - let q = null; - try { - //Тело сообщения и ответ на него - let blMsg = null; - let blResp = null; - //Параметры сообщения и ответа на него - let options = {}; - let optionsResp = {}; - //Получим тело сообщения - blMsg = message ? message : null; - //Определимся с параметрами сообщения полученными от внешней системы - options = { - method: fn.sFnPrmsType - }; - //Кладём сообщение в очередь - q = await this.dbConn.putQueue({ - nServiceFnId: fn.nId, - sOptions: buildOptionsXML({ options }), - blMsg - }); - //Скажем что пришло новое входящее сообщение - await this.logger.info( - `Новое входящее MQTT-сообщение для функции ${fn.sCode} (${buildURL({ - sSrvRoot: service.sSrvRoot, - sFnURL: fn.sFnURL - })})`, - { nQueueId: q.nId } - ); - //Вызываем обработчик со стороны БД (если он есть) - if (fn.sPrcResp) { - //Фиксируем начало исполнения сервером БД - в статусе сообщения - q = await this.dbConn.setQueueState({ - nQueueId: q.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB - }); - //Вызов обработчика БД - let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId }); - //Если результат - ошибка пробрасываем её - if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg); - //Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом - if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации"); - //Выставим статус сообщению очереди - исполнено обработчиком БД - q = await this.dbConn.setQueueState({ - nQueueId: q.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK - }); - //Считаем ответ полученный от системы - let qData = await this.dbConn.getQueueResp({ nQueueId: q.nId }); - blResp = qData.blResp; - } - //Выполняем обработчик "После" (если он есть) - if (fn.sAppSrvAfter) { - //Выставим статус сообщению очереди - исполняется сервером приложений - q = await this.dbConn.setQueueState({ - nQueueId: q.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP - }); - //Выполняем - const fnAfter = getAppSrvFunction(fn.sAppSrvAfter); - let resAfter = null; - try { - let resAfterPrms = { res: { body: message }, service: service, function: fn }; - resAfterPrms.queue = _.cloneDeep(q); - resAfterPrms.queue.blMsg = blMsg; - resAfterPrms.queue.blResp = blResp; - resAfterPrms.options = _.cloneDeep(options); - resAfterPrms.optionsResp = _.cloneDeep(optionsResp); - resAfterPrms.dbConn = this.dbConn; - resAfterPrms.notifier = this.notifier; - resAfter = await fnAfter(resAfterPrms); - } catch (e) { - throw new ServerError(SERR_APP_SERVER_AFTER, e.message); - } - //Проверяем структуру ответа функции предобработки - if (resAfter) { - let sCheckResult = validateObject( - resAfter, - objInQueueSchema.InQueueProcessorFnAfter, - "Результат функции постобработки входящего сообщения" - ); - //Если структура ответа в норме - if (!sCheckResult) { - //Выставим статус сообщению очереди - исполнено сервером приложений - q = await this.dbConn.setQueueState({ - nQueueId: q.nId, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK - }); - //Фиксируем результат исполнения "После" - ответ системы - if (!_.isUndefined(resAfter.blResp)) { - blResp = resAfter.blResp; - q = await this.dbConn.setQueueResp({ - nQueueId: q.nId, - blResp, - nIsOriginal: NIS_ORIGINAL_NO - }); - } - //Фиксируем результат исполнения "После" - параметры ответа на запрос - if (!_.isUndefined(resAfter.optionsResp)) { - optionsResp = deepMerge(optionsResp, resAfter.optionsResp); - let sOptionsResp = buildOptionsXML({ options: optionsResp }); - q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp }); - } - //Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем - if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации"); - } else { - //Или расскажем об ошибке - throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); - } - } } //Фиксируем успех обработки - в протоколе работы сервиса await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); @@ -794,7 +625,7 @@ class InQueue extends EventEmitter { let connectionKafka = await subscribeKafka({ settings: connectionSettings, service: srv, - processKafkaMessage: prms => this.processKafkaMessage(prms), + processMessage: prms => this.processMQMessage({ ...prms, sProtocol: objServiceSchema.SPROTOCOL_KAFKA }), logger: this.logger }); //Если подключение было создано @@ -831,7 +662,7 @@ class InQueue extends EventEmitter { let connectionMQTT = await subscribeMQTT({ settings: connectionSettings, service: srv, - processMQTTMessage: prms => this.processMQTTMessage(prms), + processMessage: prms => this.processMQMessage({ ...prms, sProtocol: objServiceSchema.SPROTOCOL_MQTT }), logger: this.logger }); //Если подключение было создано @@ -873,29 +704,29 @@ class InQueue extends EventEmitter { } } - //Закрытие подключений, если они есть + //Закрытие подключений stopConnections() { //Если у нас есть соединения с MQTT if (this.mqttConnections.length !== 0) { //Закрываем их - _.forEach(this.mqttConnections, async connection => { + for (let connection of this.mqttConnections) { try { - await connection.end(); + connection.end(); } catch (e) { - await this.logger.error(`Ошибка завершения MQTT подключения: ${makeErrorText(e)}`); + this.logger.error(`Ошибка завершения MQTT подключения: ${makeErrorText(e)}`); } - }); + } } //Если у нас есть соединения с Kafka if (this.kafkaConnections.length !== 0) { //Закрываем их - _.forEach(this.kafkaConnections, async connection => { + for (let connection of this.kafkaConnections) { try { - await connection.disconnect(); + connection.disconnect(); } catch (e) { - await this.logger.error(`Ошибка завершения Kafka подключения: ${makeErrorText(e)}`); + this.logger.error(`Ошибка завершения Kafka подключения: ${makeErrorText(e)}`); } - }); + } } } diff --git a/core/kafka_connector.js b/core/kafka_connector.js index 5504fa6..90dcba6 100644 --- a/core/kafka_connector.js +++ b/core/kafka_connector.js @@ -37,7 +37,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => { }; //Получение MQTT сообщений -const subscribeKafka = async ({ settings, service, processKafkaMessage, logger }) => { +const subscribeKafka = async ({ settings, service, processMessage, logger }) => { try { //Признак необходимости вывода сообщения о потере соединения let bLogLostConnection = true; @@ -82,7 +82,7 @@ const subscribeKafka = async ({ settings, service, processKafkaMessage, logger } eachMessage: async ({ topic, message }) => { try { //Вызываем обработчик - processKafkaMessage({ + processMessage({ message, service, fn: service.functions.find(fn => { diff --git a/core/mqtt_connector.js b/core/mqtt_connector.js index 17915f5..e0af9e1 100644 --- a/core/mqtt_connector.js +++ b/core/mqtt_connector.js @@ -34,7 +34,7 @@ const publishMQTT = async ({ settings, url, auth, topic, message }) => { }; //Получение MQTT сообщений -const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger }) => { +const subscribeMQTT = async ({ settings, service, processMessage, logger }) => { try { //Инициализируем строку подключения let sBroker = service.sSrvRoot; @@ -54,8 +54,8 @@ const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger }) //Прослушиваем сообщения client.on("message", (topic, message) => { //Обрабатываем сообщение - processMQTTMessage({ - message, + processMessage({ + message: { value: message, headers: {} }, service, fn: service.functions.find(fn => { return fn.sFnURL === topic; @@ -68,7 +68,7 @@ const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger }) logger.error(`Соединение с MQTT потеряно (${sBroker})`); }); //Прослушиваем восстановление соединения - client.on("reconnect", () => { + client.on("connect", () => { //Сообщим о восстановлении соединения logger.info(`Соединение с MQTT восстановлено (${sBroker})`); }); diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 1383fb1..666f67e 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -343,7 +343,6 @@ const appProcess = async prms => { topic: options.topic, message: options.body }); - console.log(serverResp); break; //mqtt и mqtts case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol): @@ -622,11 +621,18 @@ const processTask = async prms => { if (res instanceof ServerError) { throw res; } else { - //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { - res = await dbProcess({ queue: res, function: prms.task.function }); - //Если результат обработки ошибка - пробрасываем её дальше - if (res instanceof ServerError) throw res; + //Если это не Kafka/MQTT сообщение - обрабатываем сервером БД + if ( + ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( + getURLProtocol(prms.task.service.sSrvRoot) + ) + ) { + //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + res = await dbProcess({ queue: res, function: prms.task.function }); + //Если результат обработки ошибка - пробрасываем её дальше + if (res instanceof ServerError) throw res; + } } } } else { diff --git a/core/utils.js b/core/utils.js index e4973d4..f2d39df 100644 --- a/core/utils.js +++ b/core/utils.js @@ -327,6 +327,12 @@ const deepMerge = (...args) => { return res; }; +//Глубокое копирование объекта +const deepCopyObject = obj => JSON.parse(JSON.stringify(obj)); + +//Проверка на undefined +const isUndefined = value => value === undefined; + //Считывание параметров подключения для сервиса обмена (при service === "" считывание подключения "По умолчанию", settingsArray - массив объектов [{sService: "", ...},...]) const getConnectionSettings = (service, settingsArray) => { //Считываем параметры и возвращаем @@ -401,6 +407,8 @@ exports.parseOptionsXML = parseOptionsXML; exports.buildOptionsXML = buildOptionsXML; exports.getNowString = getNowString; exports.deepMerge = deepMerge; +exports.deepCopyObject = deepCopyObject; +exports.isUndefined = isUndefined; exports.getKafkaConnectionSettings = getKafkaConnectionSettings; exports.getMQTTConnectionSettings = getMQTTConnectionSettings; exports.getKafkaBroker = getKafkaBroker;