From 54138ac4ec351622c93076f0189e235ce21c92ae Mon Sep 17 00:00:00 2001 From: Dollerino Date: Fri, 27 Sep 2024 15:46:33 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-901=20(=D0=94=D0=BE?= =?UTF-8?q?=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=BF=D0=BE?= =?UTF-8?q?=D0=B4=D0=B4=D0=B5=D1=80=D0=B6=D0=BA=D0=B8=20=D0=BF=D1=80=D0=BE?= =?UTF-8?q?=D1=82=D0=BE=D0=BA=D0=BE=D0=BB=D0=BE=D0=B2=20MQTT=20=D0=B8=20KA?= =?UTF-8?q?FKA)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/out_queue_processor.js | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 3f7b0a6..958ed26 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -621,17 +621,25 @@ const processTask = async prms => { if (res instanceof ServerError) { throw res; } else { - //Если это не Kafka/MQTT сообщение - обрабатываем сервером БД - if ( - ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( - getURLProtocol(prms.task.service.sSrvRoot) - ) - ) { - //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда по необходимости запустим обработку сервером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + //Если это не MQTT/MQTTS/Kafka - запустим обработку сервером БД, иначе установим статус успешного выполнения + if ( + ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( + getURLProtocol(prms.task.service.sSrvRoot) + ) + ) { res = await dbProcess({ queue: res, function: prms.task.function }); //Если результат обработки ошибка - пробрасываем её дальше if (res instanceof ServerError) throw res; + } else { + //Финализируем обработку + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: null, + nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); } } }