diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 3f7b0a6..958ed26 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -621,17 +621,25 @@ const processTask = async prms => { if (res instanceof ServerError) { throw res; } else { - //Если это не Kafka/MQTT сообщение - обрабатываем сервером БД - if ( - ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( - getURLProtocol(prms.task.service.sSrvRoot) - ) - ) { - //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда по необходимости запустим обработку сервером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + //Если это не MQTT/MQTTS/Kafka - запустим обработку сервером БД, иначе установим статус успешного выполнения + if ( + ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( + getURLProtocol(prms.task.service.sSrvRoot) + ) + ) { res = await dbProcess({ queue: res, function: prms.task.function }); //Если результат обработки ошибка - пробрасываем её дальше if (res instanceof ServerError) throw res; + } else { + //Финализируем обработку + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: null, + nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); } } }