86 lines
3.3 KiB
JavaScript
86 lines
3.3 KiB
JavaScript
/*
|
||
Сервис интеграции ПП Парус 8 с WEB API
|
||
Модуль ядра: обработчик mqtt сообщений
|
||
*/
|
||
|
||
//----------------------
|
||
// Подключение библиотек
|
||
//----------------------
|
||
|
||
const _ = require("lodash"); //Работа с массивами и коллекциями
|
||
const { makeErrorText } = require("./utils"); //Вспомогательные функции
|
||
const mqtt = require("mqtt"); //Работа с MQTT
|
||
|
||
//------------
|
||
// Тело модуля
|
||
//------------
|
||
|
||
//Отправка MQTT сообщения
|
||
const publishMQTT = async ({ connectionPrms, url, auth, topic, message }) => {
|
||
//Инициализируем подключение
|
||
const client = await mqtt.connectAsync(url, {
|
||
clientId: connectionPrms.sClientIdSender,
|
||
clean: true,
|
||
connectTimeout: connectionPrms.nConnectTimeout,
|
||
username: auth.user,
|
||
password: auth.pass,
|
||
reconnectPeriod: connectionPrms.nReconnectPeriod
|
||
});
|
||
//Отправляем сообщение
|
||
await client.publishAsync(topic, message);
|
||
//Закрываем подключение
|
||
await client.endAsync();
|
||
//Возвращаем сообщение, которое было отправлено
|
||
return { statusCode: 200 };
|
||
};
|
||
|
||
//Получение MQTT сообщений
|
||
const subscribeMQTT = async ({ connectionPrms, service, processMQTTMessage, logger }) => {
|
||
try {
|
||
//Инициализируем строку подключения
|
||
let sBroker = service.sSrvRoot;
|
||
//Инициализируем подключение
|
||
const client = await mqtt.connectAsync(sBroker, {
|
||
clientId: connectionPrms.sClientIdRecipient,
|
||
clean: true,
|
||
connectTimeout: connectionPrms.nConnectTimeout,
|
||
username: service.sSrvUser,
|
||
password: service.sSrvPass,
|
||
reconnectPeriod: connectionPrms.nReconnectPeriod
|
||
});
|
||
|
||
//Обходим функции сервиса
|
||
_.forEach(service.functions, fn => {
|
||
client.subscribe(fn.sFnURL);
|
||
});
|
||
|
||
//Прослушиваем сообщения
|
||
client.on("message", (topic, message) => {
|
||
//Обрабатываем сообщение
|
||
processMQTTMessage({ message, service, fn: _.find(service.functions, { sFnURL: topic }) });
|
||
});
|
||
//Прослушиваем отключение от сервера
|
||
client.on("offline", () => {
|
||
//Выводим ошибку
|
||
logger.error(`Соединение с MQTT потеряно (${sBroker})`);
|
||
});
|
||
//Прослушиваем восстановление соединения
|
||
client.on("reconnect", () => {
|
||
//Сообщим о восстановлении соединения
|
||
logger.info(`Соединение с MQTT восстановлено (${sBroker})`);
|
||
});
|
||
|
||
//Возвращаем подключение
|
||
return client;
|
||
} catch (e) {
|
||
logger.error(`Ошибка запуска обработчика очереди исходящих сообщений MQTT: ${makeErrorText(e)}`);
|
||
}
|
||
};
|
||
|
||
//-----------------
|
||
// Интерфейс модуля
|
||
//-----------------
|
||
|
||
exports.publishMQTT = publishMQTT;
|
||
exports.subscribeMQTT = subscribeMQTT;
|