diff --git a/core/in_queue.js b/core/in_queue.js index 8acc774..d269abe 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -12,13 +12,19 @@ const EventEmitter = require("events"); //Обработчик пользова const express = require("express"); //WEB-сервер Express const bodyParser = require("body-parser"); //Модуль для Express (разбор тела входящего запроса) const { ServerError } = require("./server_errors"); //Типовая ошибка -const { SERR_OBJECT_BAD_INTERFACE, SERR_WEB_SERVER } = require("./constants"); //Общесистемные константы -const { makeErrorText, validateObject, buildURL } = require("./utils"); //Вспомогательные функции +const { makeErrorText, validateObject, buildURL, getAppSrvFunction } = require("./utils"); //Вспомогательные функции const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД +const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const prmsInQueueSchema = require("../models/prms_in_queue"); //Схемы валидации параметров функций класса +const { + SERR_OBJECT_BAD_INTERFACE, + SERR_WEB_SERVER, + SERR_APP_SERVER_BEFORE, + SERR_APP_SERVER_AFTER +} = require("./constants"); //Общесистемные константы //-------------------------- // Глобальные идентификаторы @@ -80,34 +86,162 @@ class InQueue extends EventEmitter { ); //Если структура объекта в норме if (!sCheckResult) { - //Определимся с телом сообщения - let blMsg = null; - //Для POST сообщений - это тело запроса - if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { - blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null; - } else { - //Для GET - параметры запроса - if (!_.isEmpty(prms.req.query)) blMsg = new Buffer(JSON.stringify(prms.req.query)); - } - //Кладём сообщение в очередь - let q = await this.dbConn.putQueue({ - nServiceFnId: prms.function.nId, - blMsg - }); - //Скажем что пришло новое входящее сообщение - await this.logger.info( - `Новое входящее сообщение от ${prms.req.connection.address().address} для фукнции ${ - prms.function.sCode - } (${buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL })})`, - { nQueueId: q.nId } - ); - prms.res - .status(200) - .send( - `

Сервер приложений ПП Пурс 8

Функция сервиса: ${ - prms.service.sName - }/${prms.function.sCode}

` + //Буфер для сообщения очереди + let q = null; + try { + //Определимся с телом сообщения + let blMsg = null; + //Для POST сообщений - это тело запроса + if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { + blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null; + } else { + //Для GET - параметры запроса + if (!_.isEmpty(prms.req.query)) blMsg = new Buffer(JSON.stringify(prms.req.query)); + } + //Кладём сообщение в очередь + q = await this.dbConn.putQueue({ + nServiceFnId: prms.function.nId, + blMsg + }); + //Скажем что пришло новое входящее сообщение + await this.logger.info( + `Новое входящее сообщение от ${prms.req.connection.address().address} для фукнции ${ + prms.function.sCode + } (${buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL })})`, + { nQueueId: q.nId } ); + //Выполняем обработчик "До" (если он есть) + if (prms.function.sAppSrvBefore) { + //Выставим статус сообщению очереди - исполняется сервером приложений + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP + }); + //Выполняем + const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore); + let resBefore = null; + try { + prms.queue = q; + resBefore = await fnBefore(prms); + } catch (e) { + throw new ServerError(SERR_APP_SERVER_BEFORE, e.message); + } + //Проверяем структуру ответа функции предобработки + let sCheckResult = validateObject( + resBefore, + objInQueueSchema.InQueueProcessorFnBefore, + "Результат функции предобработки входящего сообщения" + ); + //Если структура ответа в норме + if (!sCheckResult) { + //Выставим статус сообщению очереди - исполнено сервером приложений + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK + }); + //Фиксируем успех исполнения + if (resBefore.blMsg) { + q = await this.dbConn.setQueueAppSrvResult({ + nQueueId: q.nId, + blMsg: resBefore.blMsg, + blResp: null + }); + } + } else { + //Или расскажем об ошибке + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Вызываем обработчик со стороны БД (если он есть) + if (prms.function.sPrcResp) { + //Фиксируем начало исполнения сервером БД - в статусе сообщения + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB + }); + //Вызов обработчика БД + q = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId }); + //Выставим статус сообщению очереди - исполнено обработчиком БД + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK + }); + } + //Выполняем обработчик "После" (если он есть) + if (prms.function.sAppSrvAfter) { + //Выставим статус сообщению очереди - исполняется сервером приложений + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP + }); + //Выполняем + const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter); + let resAfter = null; + try { + prms.queue = q; + resAfter = await fnAfter(prms); + } catch (e) { + throw new ServerError(SERR_APP_SERVER_AFTER, e.message); + } + //Проверяем структуру ответа функции предобработки + let sCheckResult = validateObject( + resAfter, + objInQueueSchema.InQueueProcessorFnAfter, + "Результат функции постобработки входящего сообщения" + ); + //Если структура ответа в норме + if (!sCheckResult) { + //Выставим статус сообщению очереди - исполнено сервером приложений + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK + }); + //Фиксируем успех исполнения + q = await this.dbConn.setQueueAppSrvResult({ + nQueueId: q.nId, + blMsg: q.blMsg, + blResp: resAfter.blResp + }); + } else { + //Или расскажем об ошибке + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } + //Всё успешно - отдаём результат клиенту + prms.res.status(200).send(q.blResp); + //Фиксируем успех обработки - в статусе сообщения + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK + }); + //Фиксируем успех обработки - в протоколе работы сервиса + await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); + } catch (e) { + //Если сообщение очереди успели создать + if (q) { + //Фиксируем ошибку обработки сервером приложений - в статусе сообщения + q = await this.dbConn.setQueueState({ + nQueueId: q.nId, + sExecMsg: makeErrorText(e), + nIncExecCnt: NINC_EXEC_CNT_YES, + nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR + }); + //Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса + await this.logger.error( + `Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${makeErrorText(e)}`, + { nQueueId: q.nId } + ); + } else { + //Ограничимся общей ошибкой + await this.logger.error(makeErrorText(e), { + nServiceId: prms.service.nId, + nServiceFnId: prms.function.nId + }); + } + //Отправим ошибку клиенту + prms.res.status(500).send(makeErrorText(e)); + } } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } diff --git a/models/obj_in_queue.js b/models/obj_in_queue.js new file mode 100644 index 0000000..3f6e582 --- /dev/null +++ b/models/obj_in_queue.js @@ -0,0 +1,41 @@ +/* + Сервис интеграции ПП Парус 8 с WEB API + Модели данных: описатель сообщений обмена с обработчиком очереди выходящих сообщений +*/ + +//---------------------- +// Подключение библиотек +//---------------------- + +const Schema = require("validate"); //Схемы валидации + +//------------------ +// Интерфейс модуля +//------------------ + +//Схема валидации результата работы функции "предобработки" сообщения очереди сервером приложений +exports.InQueueProcessorFnBefore = new Schema({ + //Обработанный запрос внешней системы + blMsg: { + type: Buffer, + required: false, + message: { + type: path => + `Обработанный запрос внешней системы (${path}) имеет некорректный тип данных (ожидалось - Buffer)`, + required: path => `Не указан Обработанный запрос внешней системы (${path})` + } + } +}); + +//Схема валидации результата работы функции "постобработки" сообщения очереди сервером приложений +exports.InQueueProcessorFnAfter = new Schema({ + //Обработанный ответ системы + blResp: { + type: Buffer, + required: true, + message: { + type: path => `Обработанный ответ системы (${path}) имеет некорректный тип данных (ожидалось - Buffer)`, + required: path => `Не указан обработанный ответ системы (${path})` + } + } +});