Обработчик входящих сообщений (вызов пред и пост обработки, обработчика со стороны БД)

This commit is contained in:
Mikhail Chechnev 2018-12-14 21:54:06 +03:00
parent 165987ff94
commit 6d4d833be6
2 changed files with 204 additions and 29 deletions

View File

@ -12,13 +12,19 @@ const EventEmitter = require("events"); //Обработчик пользова
const express = require("express"); //WEB-сервер Express const express = require("express"); //WEB-сервер Express
const bodyParser = require("body-parser"); //Модуль для Express (разбор тела входящего запроса) const bodyParser = require("body-parser"); //Модуль для Express (разбор тела входящего запроса)
const { ServerError } = require("./server_errors"); //Типовая ошибка const { ServerError } = require("./server_errors"); //Типовая ошибка
const { SERR_OBJECT_BAD_INTERFACE, SERR_WEB_SERVER } = require("./constants"); //Общесистемные константы const { makeErrorText, validateObject, buildURL, getAppSrvFunction } = require("./utils"); //Вспомогательные функции
const { makeErrorText, validateObject, buildURL } = require("./utils"); //Вспомогательные функции
const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД
const objInQueueSchema = require("../models/obj_in_queue"); //Схема валидации сообщений обмена с бработчиком очереди входящих сообщений
const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса
const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса
const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди
const prmsInQueueSchema = require("../models/prms_in_queue"); //Схемы валидации параметров функций класса const prmsInQueueSchema = require("../models/prms_in_queue"); //Схемы валидации параметров функций класса
const {
SERR_OBJECT_BAD_INTERFACE,
SERR_WEB_SERVER,
SERR_APP_SERVER_BEFORE,
SERR_APP_SERVER_AFTER
} = require("./constants"); //Общесистемные константы
//-------------------------- //--------------------------
// Глобальные идентификаторы // Глобальные идентификаторы
@ -80,34 +86,162 @@ class InQueue extends EventEmitter {
); );
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Определимся с телом сообщения //Буфер для сообщения очереди
let blMsg = null; let q = null;
//Для POST сообщений - это тело запроса try {
if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { //Определимся с телом сообщения
blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null; let blMsg = null;
} else { //Для POST сообщений - это тело запроса
//Для GET - параметры запроса if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) {
if (!_.isEmpty(prms.req.query)) blMsg = new Buffer(JSON.stringify(prms.req.query)); blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null;
} } else {
//Кладём сообщение в очередь //Для GET - параметры запроса
let q = await this.dbConn.putQueue({ if (!_.isEmpty(prms.req.query)) blMsg = new Buffer(JSON.stringify(prms.req.query));
nServiceFnId: prms.function.nId, }
blMsg //Кладём сообщение в очередь
}); q = await this.dbConn.putQueue({
//Скажем что пришло новое входящее сообщение nServiceFnId: prms.function.nId,
await this.logger.info( blMsg
`Новое входящее сообщение от ${prms.req.connection.address().address} для фукнции ${ });
prms.function.sCode //Скажем что пришло новое входящее сообщение
} (${buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL })})`, await this.logger.info(
{ nQueueId: q.nId } `Новое входящее сообщение от ${prms.req.connection.address().address} для фукнции ${
); prms.function.sCode
prms.res } (${buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL })})`,
.status(200) { nQueueId: q.nId }
.send(
`<html><body><center><br><h1>Сервер приложений ПП Пурс 8</h1><h3>Функция сервиса: ${
prms.service.sName
}/${prms.function.sCode}</h3></center></body></html>`
); );
//Выполняем обработчик "До" (если он есть)
if (prms.function.sAppSrvBefore) {
//Выставим статус сообщению очереди - исполняется сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
//Выполняем
const fnBefore = getAppSrvFunction(prms.function.sAppSrvBefore);
let resBefore = null;
try {
prms.queue = q;
resBefore = await fnBefore(prms);
} catch (e) {
throw new ServerError(SERR_APP_SERVER_BEFORE, e.message);
}
//Проверяем структуру ответа функции предобработки
let sCheckResult = validateObject(
resBefore,
objInQueueSchema.InQueueProcessorFnBefore,
"Результат функции предобработки входящего сообщения"
);
//Если структура ответа в норме
if (!sCheckResult) {
//Выставим статус сообщению очереди - исполнено сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
});
//Фиксируем успех исполнения
if (resBefore.blMsg) {
q = await this.dbConn.setQueueAppSrvResult({
nQueueId: q.nId,
blMsg: resBefore.blMsg,
blResp: null
});
}
} else {
//Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Вызываем обработчик со стороны БД (если он есть)
if (prms.function.sPrcResp) {
//Фиксируем начало исполнения сервером БД - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB
});
//Вызов обработчика БД
q = await this.dbConn.execQueueDBPrc({ nQueueId: q.nId });
//Выставим статус сообщению очереди - исполнено обработчиком БД
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_DB_OK
});
}
//Выполняем обработчик "После" (если он есть)
if (prms.function.sAppSrvAfter) {
//Выставим статус сообщению очереди - исполняется сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP
});
//Выполняем
const fnAfter = getAppSrvFunction(prms.function.sAppSrvAfter);
let resAfter = null;
try {
prms.queue = q;
resAfter = await fnAfter(prms);
} catch (e) {
throw new ServerError(SERR_APP_SERVER_AFTER, e.message);
}
//Проверяем структуру ответа функции предобработки
let sCheckResult = validateObject(
resAfter,
objInQueueSchema.InQueueProcessorFnAfter,
"Результат функции постобработки входящего сообщения"
);
//Если структура ответа в норме
if (!sCheckResult) {
//Выставим статус сообщению очереди - исполнено сервером приложений
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_APP_OK
});
//Фиксируем успех исполнения
q = await this.dbConn.setQueueAppSrvResult({
nQueueId: q.nId,
blMsg: q.blMsg,
blResp: resAfter.blResp
});
} else {
//Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
}
}
//Всё успешно - отдаём результат клиенту
prms.res.status(200).send(q.blResp);
//Фиксируем успех обработки - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
//Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
} catch (e) {
//Если сообщение очереди успели создать
if (q) {
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
sExecMsg: makeErrorText(e),
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_ERR
});
//Фиксируем ошибку обработки сервером приложений - в протоколе работы сервиса
await this.logger.error(
`Ошибка обработки входящего сообщения ${q.nId} сервером приложений: ${makeErrorText(e)}`,
{ nQueueId: q.nId }
);
} else {
//Ограничимся общей ошибкой
await this.logger.error(makeErrorText(e), {
nServiceId: prms.service.nId,
nServiceFnId: prms.function.nId
});
}
//Отправим ошибку клиенту
prms.res.status(500).send(makeErrorText(e));
}
} else { } else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
} }

41
models/obj_in_queue.js Normal file
View File

@ -0,0 +1,41 @@
/*
Сервис интеграции ПП Парус 8 с WEB API
Модели данных: описатель сообщений обмена с обработчиком очереди выходящих сообщений
*/
//----------------------
// Подключение библиотек
//----------------------
const Schema = require("validate"); //Схемы валидации
//------------------
// Интерфейс модуля
//------------------
//Схема валидации результата работы функции "предобработки" сообщения очереди сервером приложений
exports.InQueueProcessorFnBefore = new Schema({
//Обработанный запрос внешней системы
blMsg: {
type: Buffer,
required: false,
message: {
type: path =>
`Обработанный запрос внешней системы (${path}) имеет некорректный тип данных (ожидалось - Buffer)`,
required: path => `Не указан Обработанный запрос внешней системы (${path})`
}
}
});
//Схема валидации результата работы функции "постобработки" сообщения очереди сервером приложений
exports.InQueueProcessorFnAfter = new Schema({
//Обработанный ответ системы
blResp: {
type: Buffer,
required: true,
message: {
type: path => `Обработанный ответ системы (${path}) имеет некорректный тип данных (ожидалось - Buffer)`,
required: path => `Не указан обработанный ответ системы (${path})`
}
}
});