diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 958ed26..3f7b0a6 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -621,25 +621,17 @@ const processTask = async prms => { if (res instanceof ServerError) { throw res; } else { - //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда по необходимости запустим обработку сервером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { - //Если это не MQTT/MQTTS/Kafka - запустим обработку сервером БД, иначе установим статус успешного выполнения - if ( - ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( - getURLProtocol(prms.task.service.sSrvRoot) - ) - ) { + //Если это не Kafka/MQTT сообщение - обрабатываем сервером БД + if ( + ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( + getURLProtocol(prms.task.service.sSrvRoot) + ) + ) { + //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { res = await dbProcess({ queue: res, function: prms.task.function }); //Если результат обработки ошибка - пробрасываем её дальше if (res instanceof ServerError) throw res; - } else { - //Финализируем обработку - await dbConn.setQueueState({ - nQueueId: q.nId, - sExecMsg: null, - nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK - }); } } }