Compare commits

..

No commits in common. "86a19155c6b490789f10386389f68d53ec30b0e5" and "ffb12c85503b041855a044c07ba867975de7cf29" have entirely different histories.

2 changed files with 10 additions and 6 deletions

View File

@ -107,7 +107,7 @@ class InQueue extends EventEmitter {
//Оповестим подписчиков об останове
this.emit(SEVT_IN_QUEUE_STOPPED);
}
//Обработка сообщения HTTP/HTTPS
//Обработка сообщения
async processMessage(prms) {
//Проверяем структуру переданного объекта для обработки
let sCheckResult = validateObject(prms, prmsInQueueSchema.processMessage, "Параметры функции обработки входящего сообщения");
@ -382,6 +382,7 @@ class InQueue extends EventEmitter {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Обработка MQ сообщения
async processMQMessage({ message, service, fn, sProtocol }) {
//Буфер для сообщения очереди
@ -527,6 +528,7 @@ class InQueue extends EventEmitter {
}
}
}
//Запуск обработки очереди входящих сообщений
async startProcessing(prms) {
//Проверяем структуру переданного объекта для старта
@ -701,6 +703,7 @@ class InQueue extends EventEmitter {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Закрытие подключений
stopConnections() {
//Если у нас есть соединения с MQTT
@ -726,6 +729,7 @@ class InQueue extends EventEmitter {
}
}
}
//Остановка обработки очереди исходящих сообщений
stopProcessing() {
//Выставляем флаг неработы

View File

@ -171,7 +171,7 @@ const appProcess = async prms => {
//Указываем, что выполнение обработчика "После" невозможно
bExecuteAfter = false;
break;
//MQTT/MQTTS
//mqtt и mqtts
case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
options.url = prms.service.sSrvRoot;
options.body = prms.queue.blMsg;
@ -189,7 +189,7 @@ const appProcess = async prms => {
//Указываем, что выполнение обработчика "После" невозможно
bExecuteAfter = false;
break;
//HTTP/HTTPS
//Другие
default:
//Определимся с URL и телом сообщения в зависимости от способа передачи параметров (для POST, PATCH и PUT - данные в теле, для остальных - в URI)
if (
@ -344,7 +344,7 @@ const appProcess = async prms => {
message: options.body
});
break;
//MQTT/MQTTS
//mqtt и mqtts
case [objServiceSchema.SPROTOCOL_MQTT, objServiceSchema.SPROTOCOL_MQTTS].includes(sProtocol):
serverResp = await publishMQTT({
settings: options.settings,
@ -354,9 +354,9 @@ const appProcess = async prms => {
message: options.body
});
break;
//HTTP/HTTPS
//Другие
default:
//Установим флаг возврата полного ответа (и тела и заголовков)
//Ждем ответ от удалённого сервера
options.resolveWithFullResponse = true;
//Отправляем запрос
serverResp = await rqp(options);