/* Сервис интеграции ПП Парус 8 с WEB API Модуль ядра: обработчик mqtt сообщений */ //---------------------- // Подключение библиотек //---------------------- const { makeErrorText } = require("./utils"); //Вспомогательные функции const mqtt = require("mqtt"); //Работа с MQTT //------------ // Тело модуля //------------ //Отправка MQTT сообщения const publishMQTT = async ({ settings, url, auth, topic, message }) => { //Инициализируем подключение const client = await mqtt.connectAsync(url, { clientId: settings.sClientIdSender, clean: true, connectTimeout: settings.nConnectTimeout, username: auth.user, password: auth.pass, reconnectPeriod: settings.nReconnectPeriod }); //Отправляем сообщение await client.publishAsync(topic, message); //Закрываем подключение await client.endAsync(); //Возвращаем статус успешной отправки return { statusCode: 200 }; }; //Получение MQTT сообщений const subscribeMQTT = async ({ settings, service, processMessage, logger }) => { try { //Инициализируем строку подключения let sBroker = service.sSrvRoot; //Инициализируем подключение const client = await mqtt.connectAsync(sBroker, { clientId: settings.sClientIdRecipient, clean: true, connectTimeout: settings.nConnectTimeout, username: service.sSrvUser, password: service.sSrvPass, reconnectPeriod: settings.nReconnectPeriod }); //Обходим функции сервиса service.functions.forEach(fn => { client.subscribe(fn.sFnURL); }); //Прослушиваем сообщения client.on("message", (topic, message) => { //Обрабатываем сообщение processMessage({ message: { value: message, headers: {} }, service, fn: service.functions.find(fn => { return fn.sFnURL === topic; }) }); }); //Прослушиваем отключение от сервера client.on("offline", () => { //Выводим ошибку logger.error(`Соединение с MQTT потеряно (${sBroker})`); }); //Прослушиваем восстановление соединения client.on("connect", () => { //Сообщим о восстановлении соединения logger.info(`Соединение с MQTT восстановлено (${sBroker})`); }); //Возвращаем подключение return client; } catch (e) { logger.error(`Ошибка запуска обработчика очереди исходящих сообщений MQTT: ${makeErrorText(e)}`); } }; //----------------- // Интерфейс модуля //----------------- exports.publishMQTT = publishMQTT; exports.subscribeMQTT = subscribeMQTT;