diff --git a/config.js b/config.js index 1ad4d8a..4f12cc8 100644 --- a/config.js +++ b/config.js @@ -24,7 +24,7 @@ let dbConnect = { //Параметры обработки очереди исходящих сообщений let outGoing = { //Количество одновременно обрабатываемых исходящих сообщений - nMaxWorkers: 20, + nMaxWorkers: 1, //Интервал проверки наличия исходящих сообщений (мс) nCheckTimeout: 1 }; diff --git a/core/out_queue.js b/core/out_queue.js index 415bbd5..6c73b99 100644 --- a/core/out_queue.js +++ b/core/out_queue.js @@ -209,7 +209,7 @@ class OutQueue extends EventEmitter { ); //Если структура сообщения в норме if (!sCheckResult) { - //Анализируем результат обработки + //Анализируем результат обработки - если ошибка - фиксируем if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_ERR) { //Фиксируем ошибку обработки - протокол работы сервиса await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sMsg}`, { @@ -228,8 +228,13 @@ class OutQueue extends EventEmitter { ? prms.queue.nExecState : objQueueSchema.NQUEUE_EXEC_STATE_ERR }); + } else { + //Ошибки нет, но если есть контекст для сервиса - сохраним его для дальнейшего использования + if (!_.isUndefined(result.context)) { + let tmpSrv = _.find(this.services, { nId: prms.queue.nServiceId }); + tmpSrv.context = _.cloneDeep(result.context); + } } - //Если есть контекст для сервиса - сохраним его для дальнейшего использования } else { //Пришел неожиданный ответ обработчика - запись в протокол работы сервиса await self.logger.error( diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index a96450c..9f9e377 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -44,22 +44,36 @@ const sendErrorResult = prms => { if (!sCheckResult) { process.send({ sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR, - sMsg: prms.sMessage + sMsg: prms.sMessage, + context: null }); } else { process.send({ sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR, - sMsg: sCheckResult + sMsg: sCheckResult, + context: null }); } }; //Отправка родительскому процессу успеха обработки сообщения сервером приложений -const sendOKResult = () => { - process.send({ - sResult: objOutQueueProcessorSchema.STASK_RESULT_OK, - sMsg: null - }); +const sendOKResult = prms => { + //Проверяем структуру переданного сообщения + let sCheckResult = validateObject( + prms, + prmsOutQueueProcessorSchema.sendOKResult, + "Параметры функции отправки родительскому процессу успеха обработки сообщения" + ); + //Если структура объекта в норме + if (!sCheckResult) { + process.send({ + sResult: objOutQueueProcessorSchema.STASK_RESULT_OK, + sMsg: null, + context: prms.context + }); + } else { + sendErrorResult({ sMessage: sCheckResult }); + } }; //Запуск обработки сообщения сервером приложений @@ -125,8 +139,9 @@ const appProcess = async prms => { //Если структура ответа в норме if (!sCheckResult) { //Применим её - options = _.cloneDeep(resBefore.options); - if (resBefore.blMsg) prms.queue.blMsg = resBefore.blMsg; + if (!_.isUndefined(resBefore.options)) options = _.cloneDeep(resBefore.options); + if (!_.isUndefined(resBefore.blMsg)) prms.queue.blMsg = resBefore.blMsg; + if (!_.isUndefined(resBefore.context)) prms.service.context = _.cloneDeep(resBefore.context); } else { //Или расскажем об ошибке throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); @@ -134,7 +149,8 @@ const appProcess = async prms => { } } //Отправляем сообщение удалённому серверу - let serverResp = await rqp(options); + //let serverResp = await rqp(options); + serverResp = { state: "OK" }; _.extend(prms, { serverResp }); //Выполняем обработчик "После" (если он есть) if (prms.function.sAppSrvAfter) { @@ -156,7 +172,8 @@ const appProcess = async prms => { //Если структура ответа в норме if (!sCheckResult) { //Применим её - prms.queue.blResp = resAfter.blResp; + if (!_.isUndefined(resAfter.blResp)) prms.queue.blResp = resAfter.blResp; + if (!_.isUndefined(resAfter.context)) prms.service.context = _.cloneDeep(resAfter.context); } else { //Или расскажем об ошибке throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); @@ -518,7 +535,7 @@ const processTask = async prms => { //Отключаемся от БД if (dbConn) await dbConn.disconnect(); //Отправляем успех - sendOKResult(); + sendOKResult({ context: prms.task.service.context }); } catch (e) { //Отключаемся от БД if (dbConn) await dbConn.disconnect(); diff --git a/models/obj_out_queue_processor.js b/models/obj_out_queue_processor.js index eda8f03..fba1c8e 100644 --- a/models/obj_out_queue_processor.js +++ b/models/obj_out_queue_processor.js @@ -89,6 +89,15 @@ exports.OutQueueProcessorTaskResult = new Schema({ `Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, required: path => `Не указана информация от обработчика сообщения очереди обмена (${path})` } + }, + //Контекст работы сервиса + context: { + type: Object, + required: true, + message: { + type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`, + required: path => `Не указан контекст работы сервиса (${path})` + } } }).validator({ required: val => typeof val != "undefined" @@ -99,7 +108,7 @@ exports.OutQueueProcessorFnBefore = new Schema({ //Параметры запроса удалённому сервису options: { type: Object, - required: true, + required: false, message: { type: path => `Параметры запроса удалённому сервису (${path}) имеют некорректный тип данных (ожидалось - Object, см. документацию к REQUEST - https://github.com/request/request)`, @@ -115,6 +124,15 @@ exports.OutQueueProcessorFnBefore = new Schema({ `Обработанное сообщение очереди (${path}) имеет некорректный тип данных (ожидалось - Buffer)`, required: path => `Не указано обработанное сообщение очереди (${path})` } + }, + //Контекст работы сервиса + context: { + type: Object, + required: false, + message: { + type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`, + required: path => `Не указан контекст работы сервиса (${path})` + } } }); @@ -123,11 +141,20 @@ exports.OutQueueProcessorFnAfter = new Schema({ //Результат обработки ответа удалённого сервиса blResp: { type: Buffer, - required: true, + required: false, message: { type: path => `Результат обработки ответа удалённого сервиса (${path}) имеет некорректный тип данных (ожидалось - Buffer)`, required: path => `Не указан результат обработки ответа удалённого сервиса (${path})` } + }, + //Контекст работы сервиса + context: { + type: Object, + required: false, + message: { + type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`, + required: path => `Не указан контекст работы сервиса (${path})` + } } }); diff --git a/models/prms_out_queue_processor.js b/models/prms_out_queue_processor.js index 11788be..e8aef4f 100644 --- a/models/prms_out_queue_processor.js +++ b/models/prms_out_queue_processor.js @@ -30,6 +30,19 @@ exports.sendErrorResult = new Schema({ } }); +//Схема валидации параметров функции отправки успеха обработки +exports.sendOKResult = new Schema({ + //Контекст работы сервиса + context: { + type: Object, + required: true, + message: { + type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`, + required: path => `Не указан контекст работы сервиса (${path})` + } + } +}); + //Схема валидации параметров функции обработчки сообщения сервером приложений exports.appProcess = new Schema({ //Обрабатываемое сообщение очереди