ЦИТК-901 (Добавление поддержки протоколов MQTT и KAFKA) #2
315
core/in_queue.js
315
core/in_queue.js
@ -21,6 +21,8 @@ const {
|
|||||||
buildOptionsXML,
|
buildOptionsXML,
|
||||||
parseOptionsXML,
|
parseOptionsXML,
|
||||||
deepMerge,
|
deepMerge,
|
||||||
|
deepCopyObject,
|
||||||
|
isUndefined,
|
||||||
getKafkaConnectionSettings,
|
getKafkaConnectionSettings,
|
||||||
getMQTTConnectionSettings,
|
getMQTTConnectionSettings,
|
||||||
getURLProtocol
|
getURLProtocol
|
||||||
@ -381,17 +383,17 @@ class InQueue extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Обработка сообщения kafka
|
//Обработка MQ сообщения
|
||||||
async processKafkaMessage({ message, service, fn }) {
|
async processMQMessage({ message, service, fn, sProtocol }) {
|
||||||
//Буфер для сообщения очереди
|
//Буфер для сообщения очереди
|
||||||
let q = null;
|
let q = null;
|
||||||
try {
|
try {
|
||||||
//Тело сообщения и ответ на него
|
//Префикс протокола
|
||||||
|
let sProtocolPrefix = sProtocol === objServiceSchema.SPROTOCOL_KAFKA ? "Kafka" : "MQTT";
|
||||||
|
//Тело сообщения
|
||||||
let blMsg = null;
|
let blMsg = null;
|
||||||
let blResp = null;
|
//Параметры сообщения
|
||||||
//Параметры сообщения и ответа на него
|
|
||||||
let options = {};
|
let options = {};
|
||||||
let optionsResp = {};
|
|
||||||
//Флаг прекращения обработки сообщения
|
//Флаг прекращения обработки сообщения
|
||||||
let bStopPropagation = false;
|
let bStopPropagation = false;
|
||||||
//Получим тело сообщения
|
//Получим тело сообщения
|
||||||
@ -399,7 +401,7 @@ class InQueue extends EventEmitter {
|
|||||||
//Определимся с параметрами сообщения полученными от внешней системы
|
//Определимся с параметрами сообщения полученными от внешней системы
|
||||||
options = {
|
options = {
|
||||||
method: fn.sFnPrmsType,
|
method: fn.sFnPrmsType,
|
||||||
headers: _.cloneDeep(message.headers)
|
headers: message.headers
|
||||||
};
|
};
|
||||||
//Кладём сообщение в очередь
|
//Кладём сообщение в очередь
|
||||||
q = await this.dbConn.putQueue({
|
q = await this.dbConn.putQueue({
|
||||||
@ -409,14 +411,65 @@ class InQueue extends EventEmitter {
|
|||||||
});
|
});
|
||||||
//Скажем что пришло новое входящее сообщение
|
//Скажем что пришло новое входящее сообщение
|
||||||
await this.logger.info(
|
await this.logger.info(
|
||||||
`Новое входящее Kafka-сообщение для функции ${fn.sCode} (${buildURL({
|
`Новое входящее ${sProtocolPrefix}-сообщение для функции ${fn.sCode} (${buildURL({
|
||||||
sSrvRoot: service.sSrvRoot,
|
sSrvRoot: service.sSrvRoot,
|
||||||
sFnURL: fn.sFnURL
|
sFnURL: fn.sFnURL
|
||||||
})})`,
|
})})`,
|
||||||
{ nQueueId: q.nId }
|
{ nQueueId: q.nId }
|
||||||
);
|
);
|
||||||
|
//Выполняем обработчик "До" (если он есть)
|
||||||
|
if (fn.sAppSrvBefore) {
|
||||||
|
//Выставим статус сообщению очереди - исполняется сервером приложений
|
||||||
|
q = await this.dbConn.setQueueState({
|
||||||
|
nQueueId: q.nId,
|
||||||
|
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
|
||||||
|
});
|
||||||
|
//Выполняем
|
||||||
|
const fnBefore = getAppSrvFunction(fn.sAppSrvBefore);
|
||||||
|
let resBefore = null;
|
||||||
|
try {
|
||||||
|
let resBeforePrms = { service: service, function: fn };
|
||||||
|
resBeforePrms.queue = deepCopyObject(q);
|
||||||
|
resBeforePrms.queue.blMsg = blMsg;
|
||||||
|
resBeforePrms.options = deepCopyObject(options);
|
||||||
|
resBeforePrms.dbConn = this.dbConn;
|
||||||
|
resBeforePrms.notifier = this.notifier;
|
||||||
|
resBefore = await fnBefore(resBeforePrms);
|
||||||
|
} catch (e) {
|
||||||
|
throw new ServerError(SERR_APP_SERVER_BEFORE, e.message);
|
||||||
|
}
|
||||||
|
//Проверяем структуру ответа функции предобработки
|
||||||
|
if (resBefore) {
|
||||||
|
let sCheckResult = validateObject(
|
||||||
|
resBefore,
|
||||||
|
objInQueueSchema.InQueueProcessorFnBefore,
|
||||||
|
"Результат функции предобработки входящего сообщения"
|
||||||
|
);
|
||||||
|
//Если структура ответа в норме
|
||||||
|
if (!sCheckResult) {
|
||||||
|
//Выставим статус сообщению очереди - исполнено сервером приложений
|
||||||
|
q = await this.dbConn.setQueueState({
|
||||||
|
nQueueId: q.nId,
|
||||||
|
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
|
||||||
|
});
|
||||||
|
//Фиксируем результат исполнения "До" - обработанный запрос внешней системы
|
||||||
|
if (!isUndefined(resBefore.blMsg)) {
|
||||||
|
blMsg = resBefore.blMsg;
|
||||||
|
q = await this.dbConn.setQueueMsg({
|
||||||
|
nQueueId: q.nId,
|
||||||
|
blMsg
|
||||||
|
});
|
||||||
|
}
|
||||||
|
//Фиксируем результат исполнения "До" - флаг прекращения дальнейшей обработки сообщения - если он поднят, то дальше обработку сообщения прекращаем
|
||||||
|
if (!isUndefined(resBefore.bStopPropagation) && resBefore.bStopPropagation === true) bStopPropagation = true;
|
||||||
|
} else {
|
||||||
|
//Или расскажем об ошибке
|
||||||
|
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
//Вызываем обработчик со стороны БД (если он есть)
|
//Вызываем обработчик со стороны БД (если он есть)
|
||||||
if (fn.sPrcResp) {
|
if (bStopPropagation === false && fn.sPrcResp) {
|
||||||
//Фиксируем начало исполнения сервером БД - в статусе сообщения
|
//Фиксируем начало исполнения сервером БД - в статусе сообщения
|
||||||
q = await this.dbConn.setQueueState({
|
q = await this.dbConn.setQueueState({
|
||||||
nQueueId: q.nId,
|
nQueueId: q.nId,
|
||||||
@ -426,233 +479,11 @@ class InQueue extends EventEmitter {
|
|||||||
let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId });
|
let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId });
|
||||||
//Если результат - ошибка пробрасываем её
|
//Если результат - ошибка пробрасываем её
|
||||||
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
|
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
|
||||||
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
|
|
||||||
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
|
|
||||||
//Выставим статус сообщению очереди - исполнено обработчиком БД
|
//Выставим статус сообщению очереди - исполнено обработчиком БД
|
||||||
q = await this.dbConn.setQueueState({
|
q = await this.dbConn.setQueueState({
|
||||||
nQueueId: q.nId,
|
nQueueId: q.nId,
|
||||||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
|
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
|
||||||
});
|
});
|
||||||
//Считаем ответ полученный от системы
|
|
||||||
let qData = await this.dbConn.getQueueResp({ nQueueId: q.nId });
|
|
||||||
blResp = qData.blResp;
|
|
||||||
}
|
|
||||||
//Выполняем обработчик "После" (если он есть)
|
|
||||||
if (bStopPropagation === false && fn.sAppSrvAfter) {
|
|
||||||
//Выставим статус сообщению очереди - исполняется сервером приложений
|
|
||||||
q = await this.dbConn.setQueueState({
|
|
||||||
nQueueId: q.nId,
|
|
||||||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
|
|
||||||
});
|
|
||||||
//Выполняем
|
|
||||||
const fnAfter = getAppSrvFunction(fn.sAppSrvAfter);
|
|
||||||
let resAfter = null;
|
|
||||||
try {
|
|
||||||
let resAfterPrms = { res: { body: blMsg }, service: service, function: fn };
|
|
||||||
resAfterPrms.queue = _.cloneDeep(q);
|
|
||||||
resAfterPrms.queue.blMsg = blMsg;
|
|
||||||
resAfterPrms.queue.blResp = blResp;
|
|
||||||
resAfterPrms.options = _.cloneDeep(options);
|
|
||||||
resAfterPrms.optionsResp = _.cloneDeep(optionsResp);
|
|
||||||
resAfterPrms.dbConn = this.dbConn;
|
|
||||||
resAfterPrms.notifier = this.notifier;
|
|
||||||
resAfter = await fnAfter(resAfterPrms);
|
|
||||||
} catch (e) {
|
|
||||||
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
|
|
||||||
}
|
|
||||||
//Проверяем структуру ответа функции предобработки
|
|
||||||
if (resAfter) {
|
|
||||||
let sCheckResult = validateObject(
|
|
||||||
resAfter,
|
|
||||||
objInQueueSchema.InQueueProcessorFnAfter,
|
|
||||||
"Результат функции постобработки входящего сообщения"
|
|
||||||
);
|
|
||||||
//Если структура ответа в норме
|
|
||||||
if (!sCheckResult) {
|
|
||||||
//Выставим статус сообщению очереди - исполнено сервером приложений
|
|
||||||
q = await this.dbConn.setQueueState({
|
|
||||||
nQueueId: q.nId,
|
|
||||||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
|
|
||||||
});
|
|
||||||
//Фиксируем результат исполнения "После" - ответ системы
|
|
||||||
if (!_.isUndefined(resAfter.blResp)) {
|
|
||||||
blResp = resAfter.blResp;
|
|
||||||
q = await this.dbConn.setQueueResp({
|
|
||||||
nQueueId: q.nId,
|
|
||||||
blResp,
|
|
||||||
nIsOriginal: NIS_ORIGINAL_NO
|
|
||||||
});
|
|
||||||
}
|
|
||||||
//Фиксируем результат исполнения "После" - параметры ответа на запрос
|
|
||||||
if (!_.isUndefined(resAfter.optionsResp)) {
|
|
||||||
optionsResp = deepMerge(optionsResp, resAfter.optionsResp);
|
|
||||||
let sOptionsResp = buildOptionsXML({ options: optionsResp });
|
|
||||||
q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp });
|
|
||||||
}
|
|
||||||
//Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем
|
|
||||||
if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
|
|
||||||
} else {
|
|
||||||
//Или расскажем об ошибке
|
|
||||||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//Фиксируем успех обработки - в протоколе работы сервиса
|
|
||||||
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
|
|
||||||
//Фиксируем успех обработки - в статусе сообщения
|
|
||||||
q = await this.dbConn.setQueueState({
|
|
||||||
nQueueId: q.nId,
|
|
||||||
nIncExecCnt: NINC_EXEC_CNT_YES,
|
|
||||||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
|
|
||||||
});
|
|
||||||
} catch (e) {
|
|
||||||
//Тема и текст уведомления об ошибке
|
|
||||||
let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${fn.sCode}" сервиса "${service.sCode}"`;
|
|
||||||
let sMessage = makeErrorText(e);
|
|
||||||
//Если сообщение очереди успели создать
|
|
||||||
if (q) {
|
|
||||||
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
|
|
||||||
q = await this.dbConn.setQueueState({
|
|
||||||
nQueueId: q.nId,
|
|
||||||
sExecMsg: sMessage,
|
|
||||||
nIncExecCnt: NINC_EXEC_CNT_YES,
|
|
||||||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
|
|
||||||
});
|
|
||||||
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
|
|
||||||
await this.logger.error(`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${sMessage}`, { nQueueId: q.nId });
|
|
||||||
//Добавим чуть больше информации в тему сообщения
|
|
||||||
sSubject = `Ошибка обработки входящего сообщения ${q.nId} сервером приложений для функции "${fn.sCode}" сервиса "${service.sCode}"`;
|
|
||||||
} else {
|
|
||||||
//Ограничимся общей ошибкой
|
|
||||||
await this.logger.error(sMessage, {
|
|
||||||
nServiceId: service.nId,
|
|
||||||
nServiceFnId: fn.nId
|
|
||||||
});
|
|
||||||
}
|
|
||||||
//Если для функции-обработчика указан признак необходимости оповещения об ошибках
|
|
||||||
if (fn.nErrNtfSign == objServiceFnSchema.NERR_NTF_SIGN_YES) {
|
|
||||||
//Отправим уведомление об ошибке отработки в почту
|
|
||||||
await this.notifier.addMessage({
|
|
||||||
sTo: fn.sErrNtfMail,
|
|
||||||
sSubject,
|
|
||||||
sMessage
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//Обработка сообщения
|
|
||||||
async processMQTTMessage({ message, service, fn }) {
|
|
||||||
//Буфер для сообщения очереди
|
|
||||||
let q = null;
|
|
||||||
try {
|
|
||||||
//Тело сообщения и ответ на него
|
|
||||||
let blMsg = null;
|
|
||||||
let blResp = null;
|
|
||||||
//Параметры сообщения и ответа на него
|
|
||||||
let options = {};
|
|
||||||
let optionsResp = {};
|
|
||||||
//Получим тело сообщения
|
|
||||||
blMsg = message ? message : null;
|
|
||||||
//Определимся с параметрами сообщения полученными от внешней системы
|
|
||||||
options = {
|
|
||||||
method: fn.sFnPrmsType
|
|
||||||
};
|
|
||||||
//Кладём сообщение в очередь
|
|
||||||
q = await this.dbConn.putQueue({
|
|
||||||
nServiceFnId: fn.nId,
|
|
||||||
sOptions: buildOptionsXML({ options }),
|
|
||||||
blMsg
|
|
||||||
});
|
|
||||||
//Скажем что пришло новое входящее сообщение
|
|
||||||
await this.logger.info(
|
|
||||||
`Новое входящее MQTT-сообщение для функции ${fn.sCode} (${buildURL({
|
|
||||||
sSrvRoot: service.sSrvRoot,
|
|
||||||
sFnURL: fn.sFnURL
|
|
||||||
})})`,
|
|
||||||
{ nQueueId: q.nId }
|
|
||||||
);
|
|
||||||
//Вызываем обработчик со стороны БД (если он есть)
|
|
||||||
if (fn.sPrcResp) {
|
|
||||||
//Фиксируем начало исполнения сервером БД - в статусе сообщения
|
|
||||||
q = await this.dbConn.setQueueState({
|
|
||||||
nQueueId: q.nId,
|
|
||||||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
|
|
||||||
});
|
|
||||||
//Вызов обработчика БД
|
|
||||||
let prcRes = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId });
|
|
||||||
//Если результат - ошибка пробрасываем её
|
|
||||||
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_ERR) throw new ServerError(SERR_DB_SERVER, prcRes.sMsg);
|
|
||||||
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
|
|
||||||
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
|
|
||||||
//Выставим статус сообщению очереди - исполнено обработчиком БД
|
|
||||||
q = await this.dbConn.setQueueState({
|
|
||||||
nQueueId: q.nId,
|
|
||||||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
|
|
||||||
});
|
|
||||||
//Считаем ответ полученный от системы
|
|
||||||
let qData = await this.dbConn.getQueueResp({ nQueueId: q.nId });
|
|
||||||
blResp = qData.blResp;
|
|
||||||
}
|
|
||||||
//Выполняем обработчик "После" (если он есть)
|
|
||||||
if (fn.sAppSrvAfter) {
|
|
||||||
//Выставим статус сообщению очереди - исполняется сервером приложений
|
|
||||||
q = await this.dbConn.setQueueState({
|
|
||||||
nQueueId: q.nId,
|
|
||||||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
|
|
||||||
});
|
|
||||||
//Выполняем
|
|
||||||
const fnAfter = getAppSrvFunction(fn.sAppSrvAfter);
|
|
||||||
let resAfter = null;
|
|
||||||
try {
|
|
||||||
let resAfterPrms = { res: { body: message }, service: service, function: fn };
|
|
||||||
resAfterPrms.queue = _.cloneDeep(q);
|
|
||||||
resAfterPrms.queue.blMsg = blMsg;
|
|
||||||
resAfterPrms.queue.blResp = blResp;
|
|
||||||
resAfterPrms.options = _.cloneDeep(options);
|
|
||||||
resAfterPrms.optionsResp = _.cloneDeep(optionsResp);
|
|
||||||
resAfterPrms.dbConn = this.dbConn;
|
|
||||||
resAfterPrms.notifier = this.notifier;
|
|
||||||
resAfter = await fnAfter(resAfterPrms);
|
|
||||||
} catch (e) {
|
|
||||||
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
|
|
||||||
}
|
|
||||||
//Проверяем структуру ответа функции предобработки
|
|
||||||
if (resAfter) {
|
|
||||||
let sCheckResult = validateObject(
|
|
||||||
resAfter,
|
|
||||||
objInQueueSchema.InQueueProcessorFnAfter,
|
|
||||||
"Результат функции постобработки входящего сообщения"
|
|
||||||
);
|
|
||||||
//Если структура ответа в норме
|
|
||||||
if (!sCheckResult) {
|
|
||||||
//Выставим статус сообщению очереди - исполнено сервером приложений
|
|
||||||
q = await this.dbConn.setQueueState({
|
|
||||||
nQueueId: q.nId,
|
|
||||||
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
|
|
||||||
});
|
|
||||||
//Фиксируем результат исполнения "После" - ответ системы
|
|
||||||
if (!_.isUndefined(resAfter.blResp)) {
|
|
||||||
blResp = resAfter.blResp;
|
|
||||||
q = await this.dbConn.setQueueResp({
|
|
||||||
nQueueId: q.nId,
|
|
||||||
blResp,
|
|
||||||
nIsOriginal: NIS_ORIGINAL_NO
|
|
||||||
});
|
|
||||||
}
|
|
||||||
//Фиксируем результат исполнения "После" - параметры ответа на запрос
|
|
||||||
if (!_.isUndefined(resAfter.optionsResp)) {
|
|
||||||
optionsResp = deepMerge(optionsResp, resAfter.optionsResp);
|
|
||||||
let sOptionsResp = buildOptionsXML({ options: optionsResp });
|
|
||||||
q = await this.dbConn.setQueueOptionsResp({ nQueueId: q.nId, sOptionsResp });
|
|
||||||
}
|
|
||||||
//Если пришел флаг ошибочной аутентификации и он положительный - то это ошибка, дальше ничего не делаем
|
|
||||||
if (!_.isUndefined(resAfter.bUnAuth)) if (resAfter.bUnAuth === true) throw new ServerError(SERR_UNAUTH, "Нет аутентификации");
|
|
||||||
} else {
|
|
||||||
//Или расскажем об ошибке
|
|
||||||
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
//Фиксируем успех обработки - в протоколе работы сервиса
|
//Фиксируем успех обработки - в протоколе работы сервиса
|
||||||
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
|
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
|
||||||
@ -794,7 +625,7 @@ class InQueue extends EventEmitter {
|
|||||||
let connectionKafka = await subscribeKafka({
|
let connectionKafka = await subscribeKafka({
|
||||||
settings: connectionSettings,
|
settings: connectionSettings,
|
||||||
service: srv,
|
service: srv,
|
||||||
processKafkaMessage: prms => this.processKafkaMessage(prms),
|
processMessage: prms => this.processMQMessage({ ...prms, sProtocol: objServiceSchema.SPROTOCOL_KAFKA }),
|
||||||
logger: this.logger
|
logger: this.logger
|
||||||
});
|
});
|
||||||
//Если подключение было создано
|
//Если подключение было создано
|
||||||
@ -831,7 +662,7 @@ class InQueue extends EventEmitter {
|
|||||||
let connectionMQTT = await subscribeMQTT({
|
let connectionMQTT = await subscribeMQTT({
|
||||||
settings: connectionSettings,
|
settings: connectionSettings,
|
||||||
service: srv,
|
service: srv,
|
||||||
processMQTTMessage: prms => this.processMQTTMessage(prms),
|
processMessage: prms => this.processMQMessage({ ...prms, sProtocol: objServiceSchema.SPROTOCOL_MQTT }),
|
||||||
logger: this.logger
|
logger: this.logger
|
||||||
});
|
});
|
||||||
//Если подключение было создано
|
//Если подключение было создано
|
||||||
@ -873,29 +704,29 @@ class InQueue extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Закрытие подключений, если они есть
|
//Закрытие подключений
|
||||||
stopConnections() {
|
stopConnections() {
|
||||||
//Если у нас есть соединения с MQTT
|
//Если у нас есть соединения с MQTT
|
||||||
if (this.mqttConnections.length !== 0) {
|
if (this.mqttConnections.length !== 0) {
|
||||||
//Закрываем их
|
//Закрываем их
|
||||||
_.forEach(this.mqttConnections, async connection => {
|
for (let connection of this.mqttConnections) {
|
||||||
try {
|
try {
|
||||||
await connection.end();
|
connection.end();
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
await this.logger.error(`Ошибка завершения MQTT подключения: ${makeErrorText(e)}`);
|
this.logger.error(`Ошибка завершения MQTT подключения: ${makeErrorText(e)}`);
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
//Если у нас есть соединения с Kafka
|
//Если у нас есть соединения с Kafka
|
||||||
if (this.kafkaConnections.length !== 0) {
|
if (this.kafkaConnections.length !== 0) {
|
||||||
//Закрываем их
|
//Закрываем их
|
||||||
_.forEach(this.kafkaConnections, async connection => {
|
for (let connection of this.kafkaConnections) {
|
||||||
try {
|
try {
|
||||||
await connection.disconnect();
|
connection.disconnect();
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
await this.logger.error(`Ошибка завершения Kafka подключения: ${makeErrorText(e)}`);
|
this.logger.error(`Ошибка завершения Kafka подключения: ${makeErrorText(e)}`);
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ const publishKafka = async ({ settings, url, auth, topic, message }) => {
|
|||||||
};
|
};
|
||||||
|
|
||||||
//Получение MQTT сообщений
|
//Получение MQTT сообщений
|
||||||
const subscribeKafka = async ({ settings, service, processKafkaMessage, logger }) => {
|
const subscribeKafka = async ({ settings, service, processMessage, logger }) => {
|
||||||
try {
|
try {
|
||||||
//Признак необходимости вывода сообщения о потере соединения
|
//Признак необходимости вывода сообщения о потере соединения
|
||||||
let bLogLostConnection = true;
|
let bLogLostConnection = true;
|
||||||
@ -82,7 +82,7 @@ const subscribeKafka = async ({ settings, service, processKafkaMessage, logger }
|
|||||||
eachMessage: async ({ topic, message }) => {
|
eachMessage: async ({ topic, message }) => {
|
||||||
try {
|
try {
|
||||||
//Вызываем обработчик
|
//Вызываем обработчик
|
||||||
processKafkaMessage({
|
processMessage({
|
||||||
message,
|
message,
|
||||||
service,
|
service,
|
||||||
fn: service.functions.find(fn => {
|
fn: service.functions.find(fn => {
|
||||||
|
@ -34,7 +34,7 @@ const publishMQTT = async ({ settings, url, auth, topic, message }) => {
|
|||||||
};
|
};
|
||||||
|
|
||||||
//Получение MQTT сообщений
|
//Получение MQTT сообщений
|
||||||
const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger }) => {
|
const subscribeMQTT = async ({ settings, service, processMessage, logger }) => {
|
||||||
try {
|
try {
|
||||||
//Инициализируем строку подключения
|
//Инициализируем строку подключения
|
||||||
let sBroker = service.sSrvRoot;
|
let sBroker = service.sSrvRoot;
|
||||||
@ -54,8 +54,8 @@ const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger })
|
|||||||
//Прослушиваем сообщения
|
//Прослушиваем сообщения
|
||||||
client.on("message", (topic, message) => {
|
client.on("message", (topic, message) => {
|
||||||
//Обрабатываем сообщение
|
//Обрабатываем сообщение
|
||||||
processMQTTMessage({
|
processMessage({
|
||||||
message,
|
message: { value: message, headers: {} },
|
||||||
service,
|
service,
|
||||||
fn: service.functions.find(fn => {
|
fn: service.functions.find(fn => {
|
||||||
return fn.sFnURL === topic;
|
return fn.sFnURL === topic;
|
||||||
@ -68,7 +68,7 @@ const subscribeMQTT = async ({ settings, service, processMQTTMessage, logger })
|
|||||||
logger.error(`Соединение с MQTT потеряно (${sBroker})`);
|
logger.error(`Соединение с MQTT потеряно (${sBroker})`);
|
||||||
});
|
});
|
||||||
//Прослушиваем восстановление соединения
|
//Прослушиваем восстановление соединения
|
||||||
client.on("reconnect", () => {
|
client.on("connect", () => {
|
||||||
//Сообщим о восстановлении соединения
|
//Сообщим о восстановлении соединения
|
||||||
logger.info(`Соединение с MQTT восстановлено (${sBroker})`);
|
logger.info(`Соединение с MQTT восстановлено (${sBroker})`);
|
||||||
});
|
});
|
||||||
|
@ -343,7 +343,6 @@ const appProcess = async prms => {
|
|||||||
topic: options.topic,
|
topic: options.topic,
|
||||||
message: options.body
|
message: options.body
|
||||||
});
|
});
|
||||||
console.log(serverResp);
|
|
||||||
break;
|
break;
|
||||||
//mqtt и mqtts
|
//mqtt и mqtts
|
||||||
case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
|
case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
|
||||||
@ -622,11 +621,18 @@ const processTask = async prms => {
|
|||||||
if (res instanceof ServerError) {
|
if (res instanceof ServerError) {
|
||||||
throw res;
|
throw res;
|
||||||
} else {
|
} else {
|
||||||
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД
|
//Если это не Kafka/MQTT сообщение - обрабатываем сервером БД
|
||||||
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
|
if (
|
||||||
res = await dbProcess({ queue: res, function: prms.task.function });
|
![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
|
||||||
//Если результат обработки ошибка - пробрасываем её дальше
|
getURLProtocol(prms.task.service.sSrvRoot)
|
||||||
if (res instanceof ServerError) throw res;
|
)
|
||||||
|
) {
|
||||||
|
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД
|
||||||
|
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
|
||||||
|
res = await dbProcess({ queue: res, function: prms.task.function });
|
||||||
|
//Если результат обработки ошибка - пробрасываем её дальше
|
||||||
|
if (res instanceof ServerError) throw res;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -327,6 +327,12 @@ const deepMerge = (...args) => {
|
|||||||
return res;
|
return res;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
//Глубокое копирование объекта
|
||||||
|
const deepCopyObject = obj => JSON.parse(JSON.stringify(obj));
|
||||||
|
|
||||||
|
//Проверка на undefined
|
||||||
|
const isUndefined = value => value === undefined;
|
||||||
|
|
||||||
//Считывание параметров подключения для сервиса обмена (при service === "" считывание подключения "По умолчанию", settingsArray - массив объектов [{sService: "", ...},...])
|
//Считывание параметров подключения для сервиса обмена (при service === "" считывание подключения "По умолчанию", settingsArray - массив объектов [{sService: "", ...},...])
|
||||||
const getConnectionSettings = (service, settingsArray) => {
|
const getConnectionSettings = (service, settingsArray) => {
|
||||||
//Считываем параметры и возвращаем
|
//Считываем параметры и возвращаем
|
||||||
@ -401,6 +407,8 @@ exports.parseOptionsXML = parseOptionsXML;
|
|||||||
exports.buildOptionsXML = buildOptionsXML;
|
exports.buildOptionsXML = buildOptionsXML;
|
||||||
exports.getNowString = getNowString;
|
exports.getNowString = getNowString;
|
||||||
exports.deepMerge = deepMerge;
|
exports.deepMerge = deepMerge;
|
||||||
|
exports.deepCopyObject = deepCopyObject;
|
||||||
|
exports.isUndefined = isUndefined;
|
||||||
exports.getKafkaConnectionSettings = getKafkaConnectionSettings;
|
exports.getKafkaConnectionSettings = getKafkaConnectionSettings;
|
||||||
exports.getMQTTConnectionSettings = getMQTTConnectionSettings;
|
exports.getMQTTConnectionSettings = getMQTTConnectionSettings;
|
||||||
exports.getKafkaBroker = getKafkaBroker;
|
exports.getKafkaBroker = getKafkaBroker;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user