Временное решение с передачей контекста аутентификации сервиса между исходящими сообщениями (НЕ ЗАПУСКАТЬ В ПАРАЛЛЕЛЬНУЮ ОБРАБОТКУ ПОКА НЕ БУДЕТ РЕАЛИЗОВАНА НОРМАЛЬНАЯ СХЕМА)

This commit is contained in:
Mikhail Chechnev 2018-12-25 17:54:52 +03:00
parent ef4b472ba9
commit c9f7eff30f
5 changed files with 79 additions and 17 deletions

View File

@ -24,7 +24,7 @@ let dbConnect = {
//Параметры обработки очереди исходящих сообщений //Параметры обработки очереди исходящих сообщений
let outGoing = { let outGoing = {
//Количество одновременно обрабатываемых исходящих сообщений //Количество одновременно обрабатываемых исходящих сообщений
nMaxWorkers: 20, nMaxWorkers: 1,
//Интервал проверки наличия исходящих сообщений (мс) //Интервал проверки наличия исходящих сообщений (мс)
nCheckTimeout: 1 nCheckTimeout: 1
}; };

View File

@ -209,7 +209,7 @@ class OutQueue extends EventEmitter {
); );
//Если структура сообщения в норме //Если структура сообщения в норме
if (!sCheckResult) { if (!sCheckResult) {
//Анализируем результат обработки //Анализируем результат обработки - если ошибка - фиксируем
if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_ERR) { if (result.sResult == objOutQueueProcessorSchema.STASK_RESULT_ERR) {
//Фиксируем ошибку обработки - протокол работы сервиса //Фиксируем ошибку обработки - протокол работы сервиса
await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sMsg}`, { await self.logger.error(`Ошибка обработки исходящего сообщения: ${result.sMsg}`, {
@ -228,8 +228,13 @@ class OutQueue extends EventEmitter {
? prms.queue.nExecState ? prms.queue.nExecState
: objQueueSchema.NQUEUE_EXEC_STATE_ERR : objQueueSchema.NQUEUE_EXEC_STATE_ERR
}); });
} else {
//Ошибки нет, но если есть контекст для сервиса - сохраним его для дальнейшего использования
if (!_.isUndefined(result.context)) {
let tmpSrv = _.find(this.services, { nId: prms.queue.nServiceId });
tmpSrv.context = _.cloneDeep(result.context);
}
} }
//Если есть контекст для сервиса - сохраним его для дальнейшего использования
} else { } else {
//Пришел неожиданный ответ обработчика - запись в протокол работы сервиса //Пришел неожиданный ответ обработчика - запись в протокол работы сервиса
await self.logger.error( await self.logger.error(

View File

@ -44,22 +44,36 @@ const sendErrorResult = prms => {
if (!sCheckResult) { if (!sCheckResult) {
process.send({ process.send({
sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR, sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR,
sMsg: prms.sMessage sMsg: prms.sMessage,
context: null
}); });
} else { } else {
process.send({ process.send({
sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR, sResult: objOutQueueProcessorSchema.STASK_RESULT_ERR,
sMsg: sCheckResult sMsg: sCheckResult,
context: null
}); });
} }
}; };
//Отправка родительскому процессу успеха обработки сообщения сервером приложений //Отправка родительскому процессу успеха обработки сообщения сервером приложений
const sendOKResult = () => { const sendOKResult = prms => {
process.send({ //Проверяем структуру переданного сообщения
sResult: objOutQueueProcessorSchema.STASK_RESULT_OK, let sCheckResult = validateObject(
sMsg: null prms,
}); prmsOutQueueProcessorSchema.sendOKResult,
"Параметры функции отправки родительскому процессу успеха обработки сообщения"
);
//Если структура объекта в норме
if (!sCheckResult) {
process.send({
sResult: objOutQueueProcessorSchema.STASK_RESULT_OK,
sMsg: null,
context: prms.context
});
} else {
sendErrorResult({ sMessage: sCheckResult });
}
}; };
//Запуск обработки сообщения сервером приложений //Запуск обработки сообщения сервером приложений
@ -125,8 +139,9 @@ const appProcess = async prms => {
//Если структура ответа в норме //Если структура ответа в норме
if (!sCheckResult) { if (!sCheckResult) {
//Применим её //Применим её
options = _.cloneDeep(resBefore.options); if (!_.isUndefined(resBefore.options)) options = _.cloneDeep(resBefore.options);
if (resBefore.blMsg) prms.queue.blMsg = resBefore.blMsg; if (!_.isUndefined(resBefore.blMsg)) prms.queue.blMsg = resBefore.blMsg;
if (!_.isUndefined(resBefore.context)) prms.service.context = _.cloneDeep(resBefore.context);
} else { } else {
//Или расскажем об ошибке //Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
@ -134,7 +149,8 @@ const appProcess = async prms => {
} }
} }
//Отправляем сообщение удалённому серверу //Отправляем сообщение удалённому серверу
let serverResp = await rqp(options); //let serverResp = await rqp(options);
serverResp = { state: "OK" };
_.extend(prms, { serverResp }); _.extend(prms, { serverResp });
//Выполняем обработчик "После" (если он есть) //Выполняем обработчик "После" (если он есть)
if (prms.function.sAppSrvAfter) { if (prms.function.sAppSrvAfter) {
@ -156,7 +172,8 @@ const appProcess = async prms => {
//Если структура ответа в норме //Если структура ответа в норме
if (!sCheckResult) { if (!sCheckResult) {
//Применим её //Применим её
prms.queue.blResp = resAfter.blResp; if (!_.isUndefined(resAfter.blResp)) prms.queue.blResp = resAfter.blResp;
if (!_.isUndefined(resAfter.context)) prms.service.context = _.cloneDeep(resAfter.context);
} else { } else {
//Или расскажем об ошибке //Или расскажем об ошибке
throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult); throw new ServerError(SERR_OBJECT_BAD_INTERFACE, sCheckResult);
@ -518,7 +535,7 @@ const processTask = async prms => {
//Отключаемся от БД //Отключаемся от БД
if (dbConn) await dbConn.disconnect(); if (dbConn) await dbConn.disconnect();
//Отправляем успех //Отправляем успех
sendOKResult(); sendOKResult({ context: prms.task.service.context });
} catch (e) { } catch (e) {
//Отключаемся от БД //Отключаемся от БД
if (dbConn) await dbConn.disconnect(); if (dbConn) await dbConn.disconnect();

View File

@ -89,6 +89,15 @@ exports.OutQueueProcessorTaskResult = new Schema({
`Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`, `Информация от обработчика сообщения очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указана информация от обработчика сообщения очереди обмена (${path})` required: path => `Не указана информация от обработчика сообщения очереди обмена (${path})`
} }
},
//Контекст работы сервиса
context: {
type: Object,
required: true,
message: {
type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`,
required: path => `Не указан контекст работы сервиса (${path})`
}
} }
}).validator({ }).validator({
required: val => typeof val != "undefined" required: val => typeof val != "undefined"
@ -99,7 +108,7 @@ exports.OutQueueProcessorFnBefore = new Schema({
//Параметры запроса удалённому сервису //Параметры запроса удалённому сервису
options: { options: {
type: Object, type: Object,
required: true, required: false,
message: { message: {
type: path => type: path =>
`Параметры запроса удалённому сервису (${path}) имеют некорректный тип данных (ожидалось - Object, см. документацию к REQUEST - https://github.com/request/request)`, `Параметры запроса удалённому сервису (${path}) имеют некорректный тип данных (ожидалось - Object, см. документацию к REQUEST - https://github.com/request/request)`,
@ -115,6 +124,15 @@ exports.OutQueueProcessorFnBefore = new Schema({
`Обработанное сообщение очереди (${path}) имеет некорректный тип данных (ожидалось - Buffer)`, `Обработанное сообщение очереди (${path}) имеет некорректный тип данных (ожидалось - Buffer)`,
required: path => `Не указано обработанное сообщение очереди (${path})` required: path => `Не указано обработанное сообщение очереди (${path})`
} }
},
//Контекст работы сервиса
context: {
type: Object,
required: false,
message: {
type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`,
required: path => `Не указан контекст работы сервиса (${path})`
}
} }
}); });
@ -123,11 +141,20 @@ exports.OutQueueProcessorFnAfter = new Schema({
//Результат обработки ответа удалённого сервиса //Результат обработки ответа удалённого сервиса
blResp: { blResp: {
type: Buffer, type: Buffer,
required: true, required: false,
message: { message: {
type: path => type: path =>
`Результат обработки ответа удалённого сервиса (${path}) имеет некорректный тип данных (ожидалось - Buffer)`, `Результат обработки ответа удалённого сервиса (${path}) имеет некорректный тип данных (ожидалось - Buffer)`,
required: path => `Не указан результат обработки ответа удалённого сервиса (${path})` required: path => `Не указан результат обработки ответа удалённого сервиса (${path})`
} }
},
//Контекст работы сервиса
context: {
type: Object,
required: false,
message: {
type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`,
required: path => `Не указан контекст работы сервиса (${path})`
}
} }
}); });

View File

@ -30,6 +30,19 @@ exports.sendErrorResult = new Schema({
} }
}); });
//Схема валидации параметров функции отправки успеха обработки
exports.sendOKResult = new Schema({
//Контекст работы сервиса
context: {
type: Object,
required: true,
message: {
type: path => `Контекст работы сервиса (${path}) имеет некорректный тип данных (ожидалось - Object)`,
required: path => `Не указан контекст работы сервиса (${path})`
}
}
});
//Схема валидации параметров функции обработчки сообщения сервером приложений //Схема валидации параметров функции обработчки сообщения сервером приложений
exports.appProcess = new Schema({ exports.appProcess = new Schema({
//Обрабатываемое сообщение очереди //Обрабатываемое сообщение очереди