ЦИТК-901 (Добавление поддержки протоколов MQTT и KAFKA)

This commit is contained in:
Dollerino 2024-09-27 15:46:33 +03:00
parent 0db9729f61
commit 54138ac4ec

View File

@ -621,17 +621,25 @@ const processTask = async prms => {
if (res instanceof ServerError) {
throw res;
} else {
//Если это не Kafka/MQTT сообщение - обрабатываем сервером БД
if (
![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
getURLProtocol(prms.task.service.sSrvRoot)
)
) {
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда по необходимости запустим обработку сервером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
//Если это не MQTT/MQTTS/Kafka - запустим обработку сервером БД, иначе установим статус успешного выполнения
if (
![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
getURLProtocol(prms.task.service.sSrvRoot)
)
) {
res = await dbProcess({ queue: res, function: prms.task.function });
//Если результат обработки ошибка - пробрасываем её дальше
if (res instanceof ServerError) throw res;
} else {
//Финализируем обработку
await dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: null,
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
}
}
}