Передача описания сервиса и функции обработчику сообщения очереди

This commit is contained in:
Mikhail Chechnev 2018-12-07 18:24:05 +03:00
parent b09ca9e7d4
commit 8fe4f979f9
5 changed files with 75 additions and 34 deletions

View File

@ -143,11 +143,15 @@ class OutQueue extends EventEmitter {
//Если структура объекта в норме //Если структура объекта в норме
if (!sCheckResult) { if (!sCheckResult) {
//Добавляем идентификатор позиции очереди в список обрабатываемых //Добавляем идентификатор позиции очереди в список обрабатываемых
this.addInProgress({ nQueueId: prms.nQueueId }); this.addInProgress({ nQueueId: prms.queue.nId });
//Отдаём команду дочернему процессу обработчика на старт исполнения //Отдаём команду дочернему процессу обработчика на старт исполнения
prms.proc.send({ prms.proc.send({
nQueueId: prms.nQueueId, nQueueId: prms.queue.nId,
connectSettings: this.dbConn.connectSettings connectSettings: this.dbConn.connectSettings,
service: _.find(this.services, { nId: prms.queue.nServiceId }),
function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, {
nId: prms.queue.nServiceFnId
})
}); });
//Уменьшаем количество доступных обработчиков //Уменьшаем количество доступных обработчиков
this.nWorkersLeft--; this.nWorkersLeft--;
@ -277,7 +281,7 @@ class OutQueue extends EventEmitter {
//Перехват останова обработчика //Перехват останова обработчика
proc.on("exit", code => {}); proc.on("exit", code => {});
//Запускаем обработчик //Запускаем обработчик
this.startQueueProcessor({ nQueueId: prms.queue.nId, proc }); this.startQueueProcessor({ queue: prms.queue, proc });
} }
} else { } else {
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);

View File

@ -90,8 +90,8 @@ const appProcess = async prms => {
}, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`, }, ${prms.queue.sExecState}, попытка исполнения - ${prms.queue.nExecCnt + 1}`,
{ nQueueId: prms.queue.nId } { nQueueId: prms.queue.nId }
); );
if (prms.queue.blMsg) { let sMsg =
let sMsg = prms.queue.blMsg.toString() + " MODIFICATION FOR " + prms.queue.nId; (prms.queue.blMsg ? prms.queue.blMsg.toString() : "null") + " MODIFICATION FOR " + prms.queue.nId;
//Фиксируем успех исполнения //Фиксируем успех исполнения
newQueue = await dbConn.setQueueAppSrvResult({ newQueue = await dbConn.setQueueAppSrvResult({
nQueueId: prms.queue.nId, nQueueId: prms.queue.nId,
@ -107,12 +107,6 @@ const appProcess = async prms => {
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, { await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером приложений`, {
nQueueId: prms.queue.nId nQueueId: prms.queue.nId
}); });
} else {
throw new ServerError(
SERR_UNEXPECTED,
`Ошибка отработки сообщения ${prms.queue.nId}: нет данных для обработки`
);
}
} catch (e) { } catch (e) {
//Фиксируем ошибку обработки сервером приложений - в статусе сообщения //Фиксируем ошибку обработки сервером приложений - в статусе сообщения
newQueue = await dbConn.setQueueState({ newQueue = await dbConn.setQueueState({
@ -228,7 +222,11 @@ const processTask = async prms => {
case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: { case objQueueSchema.NQUEUE_EXEC_STATE_INQUEUE: {
//Запускаем обработку сервером приложений //Запускаем обработку сервером приложений
try { try {
let res = await appProcess({ queue: q }); let res = await appProcess({
queue: q,
service: prms.task.service,
function: prms.task.function
});
//И если она успешно завершилась - обработку сервером БД //И если она успешно завершилась - обработку сервером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
try { try {
@ -284,7 +282,11 @@ const processTask = async prms => {
if (q.nExecCnt < q.nRetryAttempts) { if (q.nExecCnt < q.nRetryAttempts) {
//Снова запускаем обработку сервером приложений //Снова запускаем обработку сервером приложений
try { try {
let res = await appProcess({ queue: q }); let res = await appProcess({
queue: q,
service: prms.task.service,
function: prms.task.function
});
//И если она успешно завершилась - обработку сервоером БД //И если она успешно завершилась - обработку сервоером БД
if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) { if (res.nExecState == objQueueSchema.NQUEUE_EXEC_STATE_APP_OK) {
try { try {

View File

@ -9,6 +9,8 @@
const Schema = require("validate"); //Схемы валидации const Schema = require("validate"); //Схемы валидации
const { dbConnect } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений const { dbConnect } = require("./obj_config"); //Схемы валидации конфигурации сервера приложений
const { Service } = require("./obj_service"); //Схема валидации сервиса
const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса
//---------- //----------
// Константы // Константы
@ -45,6 +47,22 @@ exports.OutQueueProcessorTask = new Schema({
message: { message: {
required: path => `Не указаны параметры подключения к БД (${path})` required: path => `Не указаны параметры подключения к БД (${path})`
} }
},
//Cервис-обработчик
service: {
schema: Service,
required: true,
message: {
required: path => `Не указан сервис для обработки сообщения очереди (${path})`
}
},
//Функция сервиса-обработчика
function: {
schema: ServiceFunction,
required: true,
message: {
required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})`
}
} }
}); });

View File

@ -99,13 +99,12 @@ exports.isInProgress = new Schema({
//Схема валидации параметров функции запуска обработчика сообщения очереди //Схема валидации параметров функции запуска обработчика сообщения очереди
exports.startQueueProcessor = new Schema({ exports.startQueueProcessor = new Schema({
//Идентификатор сообщения //Обрабатываемое сообщение очереди
nQueueId: { queue: {
type: Number, schema: Queue,
required: true, required: true,
message: { message: {
type: path => `Идентификатор сообщения (${path}) имеет некорректный тип данных (ожидалось - Number)`, required: path => `Не указано обрабатываемое сообщение очреди (${path})`
required: path => `Не указан идентификатор сообщения (${path})`
} }
}, },
//Процесс обработчика //Процесс обработчика

View File

@ -10,6 +10,8 @@
const Schema = require("validate"); //Схемы валидации const Schema = require("validate"); //Схемы валидации
const { Queue } = require("./obj_queue"); //Схема валидации позиции очереди const { Queue } = require("./obj_queue"); //Схема валидации позиции очереди
const { OutQueueProcessorTask } = require("./obj_out_queue_processor"); //Схемы валидации объектов обработчика исходящих сообщений const { OutQueueProcessorTask } = require("./obj_out_queue_processor"); //Схемы валидации объектов обработчика исходящих сообщений
const { Service } = require("./obj_service"); //Схема валидации сервиса
const { ServiceFunction } = require("./obj_service_function"); //Схема валидации функции сервиса
//------------------ //------------------
// Интерфейс модуля // Интерфейс модуля
@ -37,6 +39,22 @@ exports.appProcess = new Schema({
message: { message: {
required: path => `Не указано обрабатываемое сообщение очреди (${path})` required: path => `Не указано обрабатываемое сообщение очреди (${path})`
} }
},
//Cервис-обработчик
service: {
schema: Service,
required: true,
message: {
required: path => `Не указан сервис для обработки сообщения очереди (${path})`
}
},
//Функция сервиса-обработчика
function: {
schema: ServiceFunction,
required: true,
message: {
required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})`
}
} }
}); });