From 8f0e9c9ea2920d3b3cd57e54081256c2417553d0 Mon Sep 17 00:00:00 2001 From: Dollerino Date: Fri, 27 Sep 2024 15:43:11 +0300 Subject: [PATCH 1/3] Update out_queue_processor.js --- core/out_queue_processor.js | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 3f7b0a6..958ed26 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -621,17 +621,25 @@ const processTask = async prms => { if (res instanceof ServerError) { throw res; } else { - //Если это не Kafka/MQTT сообщение - обрабатываем сервером БД - if ( - ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( - getURLProtocol(prms.task.service.sSrvRoot) - ) - ) { - //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда по необходимости запустим обработку сервером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + //Если это не MQTT/MQTTS/Kafka - запустим обработку сервером БД, иначе установим статус успешного выполнения + if ( + ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( + getURLProtocol(prms.task.service.sSrvRoot) + ) + ) { res = await dbProcess({ queue: res, function: prms.task.function }); //Если результат обработки ошибка - пробрасываем её дальше if (res instanceof ServerError) throw res; + } else { + //Финализируем обработку + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: null, + nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); } } } -- 2.34.1 From 0db9729f6139f423e550c2463102c2f185b0c3fa Mon Sep 17 00:00:00 2001 From: Dollerino Date: Fri, 27 Sep 2024 15:46:06 +0300 Subject: [PATCH 2/3] Update out_queue_processor.js --- core/out_queue_processor.js | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 958ed26..3f7b0a6 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -621,25 +621,17 @@ const processTask = async prms => { if (res instanceof ServerError) { throw res; } else { - //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда по необходимости запустим обработку сервером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { - //Если это не MQTT/MQTTS/Kafka - запустим обработку сервером БД, иначе установим статус успешного выполнения - if ( - ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( - getURLProtocol(prms.task.service.sSrvRoot) - ) - ) { + //Если это не Kafka/MQTT сообщение - обрабатываем сервером БД + if ( + ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( + getURLProtocol(prms.task.service.sSrvRoot) + ) + ) { + //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { res = await dbProcess({ queue: res, function: prms.task.function }); //Если результат обработки ошибка - пробрасываем её дальше if (res instanceof ServerError) throw res; - } else { - //Финализируем обработку - await dbConn.setQueueState({ - nQueueId: q.nId, - sExecMsg: null, - nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, - nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK - }); } } } -- 2.34.1 From 54138ac4ec351622c93076f0189e235ce21c92ae Mon Sep 17 00:00:00 2001 From: Dollerino Date: Fri, 27 Sep 2024 15:46:33 +0300 Subject: [PATCH 3/3] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-901=20(=D0=94?= =?UTF-8?q?=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=BF?= =?UTF-8?q?=D0=BE=D0=B4=D0=B4=D0=B5=D1=80=D0=B6=D0=BA=D0=B8=20=D0=BF=D1=80?= =?UTF-8?q?=D0=BE=D1=82=D0=BE=D0=BA=D0=BE=D0=BB=D0=BE=D0=B2=20MQTT=20?= =?UTF-8?q?=D0=B8=20KAFKA)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/out_queue_processor.js | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 3f7b0a6..958ed26 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -621,17 +621,25 @@ const processTask = async prms => { if (res instanceof ServerError) { throw res; } else { - //Если это не Kafka/MQTT сообщение - обрабатываем сервером БД - if ( - ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( - getURLProtocol(prms.task.service.sSrvRoot) - ) - ) { - //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда запустим обработку сервером БД - if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + //Нет ошибки, посмотрим что прилетело сообщение в успешном статусе и тогда по необходимости запустим обработку сервером БД + if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { + //Если это не MQTT/MQTTS/Kafka - запустим обработку сервером БД, иначе установим статус успешного выполнения + if ( + ![objServiceSchema.SPROTOCOL_KAFKA, objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes( + getURLProtocol(prms.task.service.sSrvRoot) + ) + ) { res = await dbProcess({ queue: res, function: prms.task.function }); //Если результат обработки ошибка - пробрасываем её дальше if (res instanceof ServerError) throw res; + } else { + //Финализируем обработку + await dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: null, + nIncExecCnt: q.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); } } } -- 2.34.1