forked from CITKParus/P8-ExchangeService
88 lines
3.3 KiB
JavaScript
88 lines
3.3 KiB
JavaScript
/*
|
||
Сервис интеграции ПП Парус 8 с WEB API
|
||
Модуль ядра: обработчик mqtt сообщений
|
||
*/
|
||
|
||
//----------------------
|
||
// Подключение библиотек
|
||
//----------------------
|
||
|
||
const { makeErrorText } = require("./utils"); //Вспомогательные функции
|
||
const mqtt = require("mqtt"); //Работа с MQTT
|
||
|
||
//------------
|
||
// Тело модуля
|
||
//------------
|
||
|
||
//Отправка MQTT сообщения
|
||
const publishMQTT = async ({ settings, url, auth, topic, message }) => {
|
||
//Инициализируем подключение
|
||
const client = await mqtt.connectAsync(url, {
|
||
clientId: settings.sClientIdSender,
|
||
clean: true,
|
||
connectTimeout: settings.nConnectTimeout,
|
||
username: auth.user,
|
||
password: auth.pass,
|
||
reconnectPeriod: settings.nReconnectPeriod
|
||
});
|
||
//Отправляем сообщение
|
||
await client.publishAsync(topic, message);
|
||
//Закрываем подключение
|
||
await client.endAsync();
|
||
//Возвращаем статус успешной отправки
|
||
return { statusCode: 200 };
|
||
};
|
||
|
||
//Получение MQTT сообщений
|
||
const subscribeMQTT = async ({ settings, service, processMessage, logger }) => {
|
||
try {
|
||
//Инициализируем строку подключения
|
||
let sBroker = service.sSrvRoot;
|
||
//Инициализируем подключение
|
||
const client = await mqtt.connectAsync(sBroker, {
|
||
clientId: settings.sClientIdRecipient,
|
||
clean: true,
|
||
connectTimeout: settings.nConnectTimeout,
|
||
username: service.sSrvUser,
|
||
password: service.sSrvPass,
|
||
reconnectPeriod: settings.nReconnectPeriod
|
||
});
|
||
//Обходим функции сервиса
|
||
service.functions.forEach(fn => {
|
||
client.subscribe(fn.sFnURL);
|
||
});
|
||
//Прослушиваем сообщения
|
||
client.on("message", (topic, message) => {
|
||
//Обрабатываем сообщение
|
||
processMessage({
|
||
message: { value: message, headers: {} },
|
||
service,
|
||
fn: service.functions.find(fn => {
|
||
return fn.sFnURL === topic;
|
||
})
|
||
});
|
||
});
|
||
//Прослушиваем отключение от сервера
|
||
client.on("offline", () => {
|
||
//Выводим ошибку
|
||
logger.error(`Соединение с MQTT потеряно (${sBroker})`);
|
||
});
|
||
//Прослушиваем восстановление соединения
|
||
client.on("connect", () => {
|
||
//Сообщим о восстановлении соединения
|
||
logger.info(`Соединение с MQTT восстановлено (${sBroker})`);
|
||
});
|
||
//Возвращаем подключение
|
||
return client;
|
||
} catch (e) {
|
||
logger.error(`Ошибка запуска обработчика очереди исходящих сообщений MQTT: ${makeErrorText(e)}`);
|
||
}
|
||
};
|
||
|
||
//-----------------
|
||
// Интерфейс модуля
|
||
//-----------------
|
||
|
||
exports.publishMQTT = publishMQTT;
|
||
exports.subscribeMQTT = subscribeMQTT;
|