From 67aafc25b7013bca48d29a434c479bbb61e5fabf Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Fri, 27 Sep 2024 13:21:14 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A6=D0=98=D0=A2=D0=9A-901=20-=20=D0=9F=D0=BE?= =?UTF-8?q?=D0=B4=D0=B4=D0=B5=D1=80=D0=B6=D0=BA=D0=B0=20MQTT/Kafka=20-=20?= =?UTF-8?q?=D0=BA=D0=BE=D1=81=D0=BC=D0=B5=D1=82=D0=B8=D1=87=D0=B5=D1=81?= =?UTF-8?q?=D0=BA=D0=B8=D0=B5=20=D0=B8=D1=81=D0=BF=D1=80=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=B8=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/in_queue.js | 6 +----- core/out_queue_processor.js | 10 +++++----- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/core/in_queue.js b/core/in_queue.js index eb63e79..a6ab9c9 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -107,7 +107,7 @@ class InQueue extends EventEmitter { //Оповестим подписчиков об останове this.emit(SEVT_IN_QUEUE_STOPPED); } - //Обработка сообщения + //Обработка сообщения HTTP/HTTPS async processMessage(prms) { //Проверяем структуру переданного объекта для обработки let sCheckResult = validateObject(prms, prmsInQueueSchema.processMessage, "Параметры функции обработки входящего сообщения"); @@ -382,7 +382,6 @@ class InQueue extends EventEmitter { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } - //Обработка MQ сообщения async processMQMessage({ message, service, fn, sProtocol }) { //Буфер для сообщения очереди @@ -528,7 +527,6 @@ class InQueue extends EventEmitter { } } } - //Запуск обработки очереди входящих сообщений async startProcessing(prms) { //Проверяем структуру переданного объекта для старта @@ -703,7 +701,6 @@ class InQueue extends EventEmitter { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } } - //Закрытие подключений stopConnections() { //Если у нас есть соединения с MQTT @@ -729,7 +726,6 @@ class InQueue extends EventEmitter { } } } - //Остановка обработки очереди исходящих сообщений stopProcessing() { //Выставляем флаг неработы diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index 666f67e..3f7b0a6 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -171,7 +171,7 @@ const appProcess = async prms => { //Указываем, что выполнение обработчика "После" невозможно bExecuteAfter = false; break; - //mqtt и mqtts + //MQTT/MQTTS case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol): options.url = prms.service.sSrvRoot; options.body = prms.queue.blMsg; @@ -189,7 +189,7 @@ const appProcess = async prms => { //Указываем, что выполнение обработчика "После" невозможно bExecuteAfter = false; break; - //Другие + //HTTP/HTTPS default: //Определимся с URL и телом сообщения в зависимости от способа передачи параметров (для POST, PATCH и PUT - данные в теле, для остальных - в URI) if ( @@ -344,7 +344,7 @@ const appProcess = async prms => { message: options.body }); break; - //mqtt и mqtts + //MQTT/MQTTS case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol): serverResp = await publishMQTT({ settings: options.settings, @@ -354,9 +354,9 @@ const appProcess = async prms => { message: options.body }); break; - //Другие + //HTTP/HTTPS default: - //Ждем ответ от удалённого сервера + //Установим флаг возврата полного ответа (и тела и заголовков) options.resolveWithFullResponse = true; //Отправляем запрос serverResp = await rqp(options);