diff --git a/config.js b/config.js index fa73f2f..edea45f 100644 --- a/config.js +++ b/config.js @@ -77,6 +77,8 @@ let kafkaConnection = { sClientIdSender: "Parus", //ID клиента-получателя sClientIdRecipient: "Parus", + //Группа получателя + sGroupId: "Parus", //Время ожидания успешного подключения (мс) nConnectionTimeout: 5000, //Необходимость попытки переподключения при потере соединения diff --git a/core/kafka_connector.js b/core/kafka_connector.js index 5ee2841..be09621 100644 --- a/core/kafka_connector.js +++ b/core/kafka_connector.js @@ -71,7 +71,7 @@ const subscribeKafka = async ({ connectionPrms, service, processKafkaMessage, lo }); //Инициализируем получателя - let consumer = client.consumer({ groupId: "ParusWebApi" }); + let consumer = client.consumer({ groupId: connectionPrms.sGroupId }); //Устанавливаем прослушивание await consumer.connect(); diff --git a/core/mqtt_connector.js b/core/mqtt_connector.js index 90c929a..f63f226 100644 --- a/core/mqtt_connector.js +++ b/core/mqtt_connector.js @@ -50,12 +50,9 @@ const subscribeMQTT = async ({ connectionPrms, service, processMQTTMessage, logg }); //Обходим функции сервиса - _.forEach( - _.filter(service.functions, fn => !fn.sFnURL.startsWith("@")), - fn => { - client.subscribe(fn.sFnURL); - } - ); + _.forEach(service.functions, fn => { + client.subscribe(fn.sFnURL); + }); //Прослушиваем сообщения client.on("message", (topic, message) => {