From 04f218bc84955c9edbb289013027657a20e4148e Mon Sep 17 00:00:00 2001 From: Mikhail Chechnev Date: Wed, 5 Dec 2018 22:26:14 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D1=87=D0=B8=D0=BA=20=D0=B8=D1=81=D1=85=D0=BE=D0=B4=D1=8F=D1=89?= =?UTF-8?q?=D0=B5=D0=B3=D0=BE=20=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D1=8F=20(=D0=BD=D0=B0=D1=87=D0=B0=D0=BB=D0=BE)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/out_queue_processor.js | 168 +++++++++++++++++++++++------ models/prms_out_queue_processor.js | 91 ++++++++++++++++ 2 files changed, 224 insertions(+), 35 deletions(-) create mode 100644 models/prms_out_queue_processor.js diff --git a/core/out_queue_processor.js b/core/out_queue_processor.js index fb3b261..91396e2 100644 --- a/core/out_queue_processor.js +++ b/core/out_queue_processor.js @@ -10,51 +10,151 @@ const _ = require("lodash"); //Работа с массивами и коллекциями const { makeModuleFullPath, validateObject } = require("./utils"); //Вспомогательные функции const { ServerError } = require("./server_errors"); //Типовая ошибка -const { NQUEUE_EXEC_STATE_APP_OK, NQUEUE_EXEC_STATE_APP_ERR } = require("../models/obj_queue"); //Схема валидации сообщения очереди обмена const objOutQueueProcessorSchema = require("../models/obj_out_queue_processor"); //Схема валидации сообщений обмена с бработчиком очереди исходящих сообщений +const prmsOutQueueProcessorSchema = require("../models/prms_out_queue_processor"); //Схема валидации параметров функций модуля +const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const { + SERR_UNEXPECTED, SERR_MODULES_BAD_INTERFACE, SERR_OBJECT_BAD_INTERFACE, SERR_MODULES_NO_MODULE_SPECIFIED } = require("./constants"); //Глобальные константы - -//-------------------------- -// Глобальные идентификаторы -//-------------------------- - -//Сообщени для родительского процесса -let result = { - nExecState: null, - sExecMsg: null, - blResp: null -}; +//!!!!!!!!!!!!!!!!!!!!!!! УБРАТЬ!!!!!!!!!!!!!!!!!!!!!! +const fs = require("fs"); //------------ // Тело модуля //------------ -//Установка состояния ошибки в ответном сообщении -const setErrorResult = e => { - //Выставим код состояния - ошибка обработки сервером приложений - result.nExecState = NQUEUE_EXEC_STATE_APP_ERR; - //Выставим сообщение об ошибке - result.sExecMsg = e.message; +//Отправка родительскому процессу ошибки обработки сообщения сервером приложений +const sendErrorResult = prms => { + //Проверяем параметры + let sCheckResult = validateObject( + prms, + prmsOutQueueProcessorSchema.sendErrorResult, + "Параметры функции отправки ошибки обработки" + ); + //Если параметры в норме + if (!sCheckResult) { + //Отправляем родительскому процессу ошибку + process.send({ + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR, + sExecMsg: prms.sMessage, + blMsg: null, + blResp: null + }); + } else { + //Отправляем родительскому процессу сведения об ошибочных параметрах + process.send({ + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR, + sExecMsg: sCheckResult, + blMsg: null, + blResp: null + }); + } }; -//Установка состояния успеха в ответном сообщении -const setOKResult = () => { - //Выставим код состояния - ошибка обработки сервером приложений - result.nExecState = NQUEUE_EXEC_STATE_APP_OK; - //Выставим сообщение об ошибке - result.sExecMsg = null; +//Отправка родительскому процессу успеха обработки сообщения сервером приложений +const sendOKResult = prms => { + //Проверяем параметры + let sCheckResult = validateObject( + prms, + prmsOutQueueProcessorSchema.sendOKResult, + "Параметры функции отправки ответа об успехе обработки" + ); + //Если параметры в норме + if (!sCheckResult) { + //Отправляем родительскому процессу успех + process.send({ + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK, + sExecMsg: null, + blMsg: prms.blMsg, + blResp: prms.blResp + }); + } else { + //Отправляем родительскому процессу сведения об ошибочных параметрах + process.send({ + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR, + sExecMsg: sCheckResult, + blMsg: null, + blResp: null + }); + } +}; + +//Отправка родительскому процессу сообщения без обработки +const sendUnChange = prms => { + //Проверяем параметры + let sCheckResult = validateObject( + prms, + prmsOutQueueProcessorSchema.sendUnChange, + "Параметры функции отправки сообщения без обработки" + ); + //Если параметры в норме + if (!sCheckResult) { + process.send({ + nExecState: prms.task.nExecState, + sExecMsg: null, + blMsg: prms.task.blMsg ? new Buffer(prms.task.blMsg) : null, + blResp: prms.task.blResp ? new Buffer(prms.task.blResp) : null + }); + } else { + //Отправляем родительскому процессу сведения об ошибочных параметрах + process.send({ + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR, + sExecMsg: sCheckResult, + blMsg: null, + blResp: null + }); + } }; //Обработка задачи -const processTask = async task => { - setTimeout(() => { - setOKResult(); - process.send(result); - }, 3000); +const processTask = async prms => { + //Проверяем параметры + let sCheckResult = validateObject( + prms, + prmsOutQueueProcessorSchema.processTask, + "Параметры функции обработки задачи" + ); + //Если параметры в норме + if (!sCheckResult) { + //Обработке подлежат только необработанные сервером приложений сообщения + if ( + prms.task.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE || + prms.task.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_ERR + ) { + setTimeout(() => { + //if (prms.task.nQueueId == 2) { + // sendErrorResult({ sMessage: "Ошибка отработки сообщения " + prms.task.nQueueId }); + //} else { + if (prms.task.blMsg) { + let b = new Buffer(prms.task.blMsg); + fs.writeFile("c:/repos/temp/" + prms.task.nQueueId, b, err => { + if (err) { + sendErrorResult({ sMessage: err.message }); + } else { + let sMsg = b.toString() + " MODIFICATION FOR " + prms.task.nQueueId; + sendOKResult({ + blMsg: new Buffer(sMsg), + blResp: new Buffer("REPLAY ON " + prms.task.nQueueId) + }); + } + }); + } else { + sendErrorResult({ + sMessage: "Ошибка отработки сообщения " + prms.task.nQueueId + ": нет данных для обработки" + }); + } + //} + }, 3000); + } else { + //Остальные возвращаем без изменения и отработки, с сохранением статусов и сообщений + sendUnChange(prms); + } + } else { + sendErrorResult({ sMessage: sCheckResult }); + } }; //--------------------------------- @@ -62,7 +162,7 @@ const processTask = async task => { //--------------------------------- //Перехват CTRL + C (останов процесса) -process.on("SIGINT", async () => {}); +process.on("SIGINT", () => {}); //Перехват CTRL + \ (останов процесса) process.on("SIGQUIT", () => {}); @@ -72,10 +172,8 @@ process.on("SIGTERM", () => {}); //Перехват ошибок process.on("uncaughtException", e => { - //Выставляем ошибку в сообщении - setErrorResult(e); - //Отправляем ответ родительскому процессу - process.send(result); + //Отправляем ошибку родительскому процессу + sendErrorResult({ sMessage: e.message }); }); //Приём сообщений @@ -89,7 +187,7 @@ process.on("message", task => { //Если структура объекта в норме if (!sCheckResult) { //Запускаем обработку - processTask(task); + processTask({ task }); } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } diff --git a/models/prms_out_queue_processor.js b/models/prms_out_queue_processor.js new file mode 100644 index 0000000..34fbb96 --- /dev/null +++ b/models/prms_out_queue_processor.js @@ -0,0 +1,91 @@ +/* + Сервис интеграции ПП Парус 8 с WEB API + Модели данных: описатели параметров функций модуля обработки исходящих сообщений +*/ + +//---------------------- +// Подключение библиотек +//---------------------- + +const Schema = require("validate"); //Схемы валидации +const { OutQueueProcessorTask } = require("./obj_out_queue_processor"); //Схемы валидации объектов обработчика исходящих сообщений + +//------------ +// Тело модуля +//------------ + +//Валидация данных сообщения очереди +const validateBuffer = val => { + //Либо null + if (val === null) { + return true; + } else { + //Либо Buffer + return val instanceof Buffer; + } +}; + +//------------------ +// Интерфейс модуля +//------------------ + +//Схема валидации параметров функции отправки ошибки обработки +exports.sendErrorResult = new Schema({ + //Сообщение об ошибке + sMessage: { + type: String, + required: true, + message: { + type: path => `Идентификатор сервиса (${path}) имеет некорректный тип данных (ожидалось - String)`, + required: path => `Не указан идентификатор сервиса (${path})` + } + } +}); + +//Схема валидации параметров функции отправки успеха обработки +exports.sendOKResult = new Schema({ + //Данные сообщения очереди обмена + blMsg: { + use: { validateBuffer }, + required: true, + message: { + validateBuffer: path => + `Данные сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`, + required: path => `Не указаны данные сообщения очереди обмена (${path})` + } + }, + //Данные ответа сообщения очереди обмена + blResp: { + use: { validateBuffer }, + required: true, + message: { + validateBuffer: path => + `Данные ответа сообщения очереди обмена (${path}) имеют некорректный тип данных (ожидалось - null или Buffer)`, + required: path => `Не указаны данные ответа сообщения очереди обмена (${path})` + } + } +}).validator({ required: val => typeof val != "undefined" }); + +//Параметры функции отправки сообщения родителю без обработки +exports.sendUnChange = new Schema({ + //Задача обработки + task: { + schema: OutQueueProcessorTask, + required: true, + message: { + required: path => `Не указана задача для обработки (${path})` + } + } +}); + +//Параметры функции обработки сообщения +exports.processTask = new Schema({ + //Задача обработки + task: { + schema: OutQueueProcessorTask, + required: true, + message: { + required: path => `Не указана задача для обработки (${path})` + } + } +});