diff --git a/core/in_queue.js b/core/in_queue.js index a24bfff..8acc774 100644 --- a/core/in_queue.js +++ b/core/in_queue.js @@ -10,10 +10,13 @@ const _ = require("lodash"); //Работа с массивами и коллекциями const EventEmitter = require("events"); //Обработчик пользовательских событий const express = require("express"); //WEB-сервер Express +const bodyParser = require("body-parser"); //Модуль для Express (разбор тела входящего запроса) const { ServerError } = require("./server_errors"); //Типовая ошибка -const { SERR_OBJECT_BAD_INTERFACE } = require("./constants"); //Общесистемные константы -const { makeErrorText, validateObject } = require("./utils"); //Вспомогательные функции +const { SERR_OBJECT_BAD_INTERFACE, SERR_WEB_SERVER } = require("./constants"); //Общесистемные константы +const { makeErrorText, validateObject, buildURL } = require("./utils"); //Вспомогательные функции const { NINC_EXEC_CNT_YES } = require("../models/prms_db_connector"); //Схемы валидации параметров функций модуля взаимодействия с БД +const objServiceSchema = require("../models/obj_service"); //Схемы валидации сервиса +const objServiceFnSchema = require("../models/obj_service_function"); //Схемы валидации функции сервиса const objQueueSchema = require("../models/obj_queue"); //Схемы валидации сообщения очереди const prmsInQueueSchema = require("../models/prms_in_queue"); //Схемы валидации параметров функций класса @@ -60,13 +63,55 @@ class InQueue extends EventEmitter { //Уведомление о запуске обработчика очереди notifyStarted() { //Оповестим подписчиков о запуске - this.emit(SEVT_IN_QUEUE_STARTED); + this.emit(SEVT_IN_QUEUE_STARTED, this.inComing.nPort); } //Уведомление об остановке обработчика очереди notifyStopped() { //Оповестим подписчиков об останове this.emit(SEVT_IN_QUEUE_STOPPED); } + //Обработка сообщения + async processMessage(prms) { + //Проверяем структуру переданного объекта для старта + let sCheckResult = validateObject( + prms, + prmsInQueueSchema.processMessage, + "Параметры функции обработки входящего сообщения" + ); + //Если структура объекта в норме + if (!sCheckResult) { + //Определимся с телом сообщения + let blMsg = null; + //Для POST сообщений - это тело запроса + if (prms.function.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST) { + blMsg = prms.req.body && !_.isEmpty(prms.req.body) ? prms.req.body : null; + } else { + //Для GET - параметры запроса + if (!_.isEmpty(prms.req.query)) blMsg = new Buffer(JSON.stringify(prms.req.query)); + } + //Кладём сообщение в очередь + let q = await this.dbConn.putQueue({ + nServiceFnId: prms.function.nId, + blMsg + }); + //Скажем что пришло новое входящее сообщение + await this.logger.info( + `Новое входящее сообщение от ${prms.req.connection.address().address} для фукнции ${ + prms.function.sCode + } (${buildURL({ sSrvRoot: prms.service.sSrvRoot, sFnURL: prms.function.sFnURL })})`, + { nQueueId: q.nId } + ); + prms.res + .status(200) + .send( + `

Сервер приложений ПП Пурс 8

Функция сервиса: ${ + prms.service.sName + }/${prms.function.sCode}

` + ); + } else { + throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); + } + } //Запуск обработки очереди входящих сообщений startProcessing(prms) { //Проверяем структуру переданного объекта для старта @@ -81,15 +126,74 @@ class InQueue extends EventEmitter { this.bWorking = true; //запоминаем список обслуживаемых сервисов this.services = prms.services; - //Запускаем сервер - this.webApp.use("*", (req, res) => { - res.status(200).send("

Сервер приложений ПП Пурс 8

"); + //Конфигурируем сервер - обработка тела сообщения + this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" })); + //Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений + _.forEach(_.filter(this.services, { nSrvType: objServiceSchema.NSRV_TYPE_RECIVE }), srvs => { + //Для любых запросов к корневому адресу сервиса - ответ о том, что это за сервис, и что он работает + this.webApp.all(srvs.sSrvRoot, (req, res) => { + res.status(200).send( + `

Сервер приложений ПП Пурс 8

Сервис: ${ + srvs.sName + }

` + ); + }); + //Для всех функций сервиса... + _.forEach(srvs.functions, fn => { + //...собственный обработчик, в зависимости от указанного способа передачи параметров + this.webApp[fn.nFnPrmsType == objServiceFnSchema.NFN_PRMS_TYPE_POST ? "post" : "get"]( + buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), + async (req, res) => { + try { + //Вызываем обработчик + await this.processMessage({ req, res, service: srvs, function: fn }); + } catch (e) { + //Протоколируем в журнал работы сервера + await this.logger.error(makeErrorText(e), { + nServiceId: srvs.nId, + nServiceFnId: fn.nId + }); + //Отправим ошибку клиенту + res.status(500).send(makeErrorText(e)); + } + } + ); + //...и собственный обработчик ошибок + this.webApp.use( + buildURL({ sSrvRoot: srvs.sSrvRoot, sFnURL: fn.sFnURL }), + async (err, req, res, next) => { + //Протоколируем в журнал работы сервера + await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), { + nServiceId: srvs.nId, + nServiceFnId: fn.nId + }); + //Отправим ошибку клиенту + res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); + } + ); + }); }); - //!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! КОНТРОЛЬ ЗАПУСКА!!!!!!!!!!!!!! + //Запросы на адреса, не входящие в состав объявленных сервисов - 404 NOT FOUND + this.webApp.use("*", (req, res) => { + res.status(404).send( + "

Сервер приложений ПП Пурс 8

Запрошенный адрес не найден

" + ); + }); + //Ошибки, не отработанные индивидуальными обработчиками - 500 SERVER ERROR + this.webApp.use(async (err, req, res, next) => { + //Протоколируем в журнал работы сервера + await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); + //Отправим ошибку клиенту + res.status(500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); + }); + //Запускаем сервер this.srv = this.webApp.listen(this.inComing.nPort, () => { //И оповещаем всех что запустились this.notifyStarted(); }); + this.srv.on("error", e => { + throw new ServerError(e.code, `Фатальная ошибка обработчика очереди входящих сообщений: ${e.message}`); + }); } else { throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); } @@ -99,7 +203,6 @@ class InQueue extends EventEmitter { //Выставляем флаг неработы this.bWorking = false; //Останавливаем WEB-сервер (если создавался) - //!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! КОНТРОЛЬ ОСТАНОВА!!!!!!!!!!!!!! if (this.srv) { this.srv.close(() => { //Оповещаем всхес, что остановились diff --git a/models/prms_in_queue.js b/models/prms_in_queue.js index 1f4c820..365f000 100644 --- a/models/prms_in_queue.js +++ b/models/prms_in_queue.js @@ -8,10 +8,13 @@ //---------------------- const Schema = require("validate"); //Схемы валидации +const { IncomingMessage, ServerResponse } = require("http"); //Работа с HTTP протоколом const { inComing } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений const { defServices } = require("./obj_services"); //Схема валидации списка сервисов const { DBConnector } = require("../core/db_connector"); //Класс взаимодействия в БД const { Logger } = require("../core/logger"); //Класс для протоколирования работы +const { Service } = require("./obj_service"); //Схема валидации сервиса +const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса //------------------ // Интерфейс модуля @@ -49,6 +52,46 @@ exports.InQueue = new Schema({ } }); +//Схема валидации параметров функции обработки входящего сообщения +exports.processMessage = new Schema({ + //Входящее сообщение + req: { + type: IncomingMessage, + required: true, + message: { + type: path => + `Входящее сообщение (${path}) имеет некорректный тип данных (ожидалось - IncomingMessage, см. документацию к Node.JS HTTP - https://nodejs.org/dist/latest-v10.x/docs/api/http.html#http_class_http_incomingmessage)`, + required: path => `Не указано входящее сообщение (${path})` + } + }, + //Ответ на входящее сообщение + res: { + type: ServerResponse, + required: true, + message: { + type: path => + `Ответ на входящие сообщение (${path}) имеет некорректный тип данных (ожидалось - ServerResponse, см. документацию к Node.JS HTTP - https://nodejs.org/dist/latest-v10.x/docs/api/http.html#http_class_http_serverresponse)`, + required: path => `Не указан ответ на входящее сообщение (${path})` + } + }, + //Cервис-обработчик + service: { + schema: Service, + required: true, + message: { + required: path => `Не указан сервис для обработки входящего сообщения (${path})` + } + }, + //Функция сервиса-обработчика + function: { + schema: ServiceFunction, + required: true, + message: { + required: path => `Не указана функция сервиса для обработки входящего сообщения (${path})` + } + } +}); + //Схема валидации параметров функции запуска обслуживания очереди exports.startProcessing = new Schema({ //Список обслуживаемых сервисов