ЦИТК-901 (Добавление поддержки протоколов MQTT и KAFKA) #3

Merged
Mim merged 4 commits from Dollerok/P8-ExchangeService:master into master 2024-09-27 17:15:50 +03:00

View File

@ -621,17 +621,25 @@ const processTask = async prms => {
if (res instanceof ServerError) { if (res instanceof ServerError) {
throw res; throw res;
} else { } else {
//Если это не Kafka/MQTT сообщение - обрабатываем сервером БД //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда по необходимости запустим обработку сервером БД
if ( if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( //Если это не MQTT/MQTTS/Kafka - запустим обработку сервером БД, иначе установим статус успешного выполнения
getURLProtocol(prms.task.service.sSrvRoot) if (
) ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
) { getURLProtocol(prms.task.service.sSrvRoot)
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД )
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { ) {
res = await dbProcess({ queue: res, function: prms.task.function }); res = await dbProcess({ queue: res, function: prms.task.function });
//Если результат обработки ошибка - пробрасываем её дальше //Если результат обработки ошибка - пробрасываем её дальше
if (res instanceof ServerError) throw res; if (res instanceof ServerError) throw res;
} else {
//Финализируем обработку
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: null,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
} }
} }
} }