ЦИТК-901 (Добавление поддержки протоколов MQTT и KAFKA) #3
@ -621,17 +621,25 @@ const processTask = async prms => {
|
|||||||
if (res instanceof ServerError) {
|
if (res instanceof ServerError) {
|
||||||
throw res;
|
throw res;
|
||||||
} else {
|
} else {
|
||||||
//Если это не Kafka/MQTT сообщение - обрабатываем сервером БД
|
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда по необходимости запустим обработку сервером БД
|
||||||
if (
|
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
|
||||||
![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
|
//Если это не MQTT/MQTTS/Kafka - запустим обработку сервером БД, иначе установим статус успешного выполнения
|
||||||
getURLProtocol(prms.task.service.sSrvRoot)
|
if (
|
||||||
)
|
![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(
|
||||||
) {
|
getURLProtocol(prms.task.service.sSrvRoot)
|
||||||
//Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД
|
)
|
||||||
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
|
) {
|
||||||
res = await dbProcess({ queue: res, function: prms.task.function });
|
res = await dbProcess({ queue: res, function: prms.task.function });
|
||||||
//Если результат обработки ошибка - пробрасываем её дальше
|
//Если результат обработки ошибка - пробрасываем её дальше
|
||||||
if (res instanceof ServerError) throw res;
|
if (res instanceof ServerError) throw res;
|
||||||
|
} else {
|
||||||
|
//Финализируем обработку
|
||||||
|
await dbConn.setQueueState({
|
||||||
|
nQueueId: q.nId,
|
||||||
|
sExecMsg: null,
|
||||||
|
nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
|
||||||
|
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user