ЦИТК-685, ЦИТК-37, ЦИТК-759

ЦИТК-685 - Контроль соответствия версий сервера приложений и системы
ЦИТК-37 - Добавлена возможность использования прокси при отправке
ЦИТК-759 - Исправление ошибки считывания очереди обмена при сброшенной галке "Сохранять успешное выполнение в очереди"
This commit is contained in:
Mikhail Chechnev 2023-11-03 18:57:04 +03:00 committed by GitHub
commit 66cc13d4fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 148 additions and 18 deletions

View File

@ -14,7 +14,9 @@ let common = {
//Релиз сервера приложений
sRelease: "2023.10.10",
//Таймаут останова сервера (мс)
nTerminateTimeout: 60000
nTerminateTimeout: 60000,
//Контролировать версию Системы
bControlSystemVersion: false
};
//Параметры подключения к БД
@ -46,7 +48,9 @@ let outGoing = {
//Максимальный размер пула подключений к БД для обработчика исходящих сообщений
nPoolMax: 4,
//Шаг инкремента подключений к БД в пуле обработчика исходящих сообщений
nPoolIncrement: 0
nPoolIncrement: 0,
//Глобальный адрес прокси-сервера
sProxy: null
};
//Параметры обработки очереди входящих сообщений

View File

@ -14,7 +14,9 @@ let common = {
//Релиз сервера приложений
sRelease: "2023.10.10",
//Таймаут останова сервера (мс)
nTerminateTimeout: 60000
nTerminateTimeout: 60000,
//Контролировать версию Системы
bControlSystemVersion: false
};
//Параметры подключения к БД

View File

@ -93,7 +93,7 @@ class ParusAppServer {
async onOutQStarted() {
//Сообщим, что запустили обработчик
await this.logger.info("Обработчик очереди исходящих сообщений запущен");
//Запускаем бслуживание очереди входящих
//Запускаем обслуживание очереди входящих
await this.logger.info("Запуск обработчика очереди входящих сообщений...");
try {
this.inQ.startProcessing({ services: this.services });
@ -202,6 +202,8 @@ class ParusAppServer {
this.dbConn = new db.DBConnector({
connectSettings: {
...prms.config.dbConnect,
sRelease: prms.config.common.sRelease,
bControlSystemVersion: prms.config.common.bControlSystemVersion,
nPoolMin: prms.config.inComing.nPoolMin,
nPoolMax: prms.config.inComing.nPoolMax,
nPoolIncrement: prms.config.inComing.nPoolIncrement,
@ -215,7 +217,8 @@ class ParusAppServer {
outGoing: prms.config.outGoing,
dbConn: this.dbConn,
logger: this.logger,
notifier: this.notifier
notifier: this.notifier,
sProxy: prms.config.outGoing.sProxy
});
//Создаём обработчик очереди входящих
this.inQ = new iq.InQueue({
@ -229,7 +232,8 @@ class ParusAppServer {
this.srvAvlCtrl = new sac.ServiceAvailableController({
logger: this.logger,
notifier: this.notifier,
dbConn: this.dbConn
dbConn: this.dbConn,
sProxy: prms.config.outGoing.sProxy
});
//Скажем что инициализировали
await this.logger.info("Сервер приложений инициализирован");

View File

@ -312,14 +312,14 @@ class InQueue extends EventEmitter {
if (optionsResp.headers) prms.res.set(optionsResp.headers);
prms.res.status(optionsResp.statusCode || 200).send(blResp);
}
//Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
//Фиксируем успех обработки - в статусе сообщения
q = await this.dbConn.setQueueState({
nQueueId: q.nId,
nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
//Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
} catch (e) {
//Тема и текст уведомления об ошибке
let sSubject = `Ошибка обработки входящего сообщения сервером приложений для функции "${prms.function.sCode}" сервиса "${prms.service.sCode}"`;

View File

@ -64,6 +64,8 @@ class OutQueue extends EventEmitter {
this.logger = prms.logger;
//Запомним уведомитель
this.notifier = prms.notifier;
//Запомним глобальный адрес прокси-сервера
this.sProxy = prms.sProxy;
//Список обрабатываемых в текущий момент сообщений очереди
this.inProgress = [];
//Привяжем методы к указателю на себя для использования в обработчиках событий
@ -160,7 +162,8 @@ class OutQueue extends EventEmitter {
service: _.find(this.services, { nId: prms.queue.nServiceId }),
function: _.find(_.find(this.services, { nId: prms.queue.nServiceId }).functions, {
nId: prms.queue.nServiceFnId
})
}),
sProxy: this.sProxy
});
//Уменьшаем количество доступных обработчиков
this.nWorkersLeft--;
@ -244,8 +247,11 @@ class OutQueue extends EventEmitter {
const proc = ChildProcess.fork("core/out_queue_processor", { silent: false });
//Перехват сообщений обработчика
proc.on("message", async result => {
//Считываем сообщение изменённое обработчиком
prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId });
//Перечитывание не требуется, если выполнено успешно
if (result.sResult !== objOutQueueProcessorSchema.STASK_RESULT_OK) {
//Перечитываем запись очереди с учетом изменения статуса
prms.queue = await self.dbConn.getQueue({ nQueueId: prms.queue.nId });
}
//Проверяем структуру полученного сообщения
let sCheckResult = validateObject(
result,

View File

@ -145,6 +145,11 @@ const appProcess = async prms => {
sQuery: prms.queue.blMsg === null ? "" : prms.queue.blMsg.toString()
});
}
// Если у сервиса указан прокси, либо у приложения установлен глобальный прокси
if (prms.service.sProxyURL || prms.sProxy) {
// Добавляем прокси с приоритетом сервиса
options.proxy = prms.service.sProxyURL ?? prms.sProxy;
}
//Дополним получившиеся параметры переданными в сообщении
if (prms.queue.sOptions) {
try {
@ -430,16 +435,16 @@ const dbProcess = async prms => {
//Если результат - ошибка аутентификации, то и её пробрасываем, но с правильным кодом
if (prcRes.sResult == objQueueSchema.SPRC_RESP_RESULT_UNAUTH) throw new ServerError(SERR_UNAUTH, prcRes.sMsg || "Нет аутентификации");
}
//Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, {
nQueueId: prms.queue.nId
});
//Фиксируем успешное исполнение (полное - дальше обработки нет) - в статусе сообщения
res = await dbConn.setQueueState({
nQueueId: prms.queue.nId,
nIncExecCnt: prms.queue.nExecCnt == 0 ? NINC_EXEC_CNT_YES : NINC_EXEC_CNT_NO,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
});
//Фиксируем успешное исполнение сервером БД - в протоколе работы сервиса
await logger.info(`Исходящее сообщение ${prms.queue.nId} успешно отработано сервером БД`, {
nQueueId: prms.queue.nId
});
} catch (e) {
//Если была ошибка аутентификации - возвращаем на повторную обработку сервером приложений
if (e instanceof ServerError && e.sCode == SERR_UNAUTH) {
@ -509,7 +514,8 @@ const processTask = async prms => {
let res = await appProcess({
queue: q,
service: prms.task.service,
function: prms.task.function
function: prms.task.function,
sProxy: prms.task.sProxy
});
//Если результат обработки ошибка - пробрасываем её дальше
if (res instanceof ServerError) {

View File

@ -67,6 +67,8 @@ class ServiceAvailableController extends EventEmitter {
this.logger = prms.logger;
//Запомним подключение к БД
this.dbConn = prms.dbConn;
//Запомним глобальный адрес прокси-сервера
this.sProxy = prms.sProxy;
//Привяжем методы к указателю на себя для использования в обработчиках событий
this.serviceDetectingLoop = this.serviceDetectingLoop.bind(this);
} else {
@ -110,8 +112,18 @@ class ServiceAvailableController extends EventEmitter {
service.nSrvType == objServiceSchema.NSRV_TYPE_SEND
) {
try {
// Инициализируем параметры запроса
let options = {};
// Устанавливаем параметры запроса
options.url = service.sSrvRoot;
options.timeout = NNETWORK_CHECK_TIMEOUT;
// Если у сервиса указан прокси, либо у приложения установлен глобальный прокси
if (service.sProxyURL || this.sProxy) {
// Добавляем прокси с приоритетом сервиса
options.proxy = service.sProxyURL ?? this.sProxy;
}
//Отправляем проверочный запрос
await rqp({ url: service.sSrvRoot, timeout: NNETWORK_CHECK_TIMEOUT });
await rqp(options);
//Запрос прошел - фиксируем дату доступности и сбрасываем дату недоступности
service.dAvailable = new Date();
service.dUnAvailable = null;

View File

@ -77,6 +77,16 @@ const common = new Schema({
validateTerminateTimeout: path =>
`Таймаут останова сервера (${path}) должен быть целым числом в диапазоне от 1000 до 120000`
}
},
//Контроль версии Системы
bControlSystemVersion: {
type: Boolean,
required: true,
message: {
type: path =>
`Признак контроля версии Системы (${path}) имеет некорректный тип данных (ожидалось - Boolean)`,
required: path => `Не указан признак контроля версии Системы (${path})`
}
}
});
@ -220,6 +230,17 @@ const outGoing = new Schema({
validatePoolIncrementOutGoing: path =>
`Значение шага инкремента подключений к БД в пуле обработчика исходящих сообщений (${path}) должно быть целым числом в диапазоне от 0 до 10`
}
},
//Глобальный адрес прокси-сервера
sProxy: {
type: String,
required: false,
message: {
type: path =>
`Адрес прокси-сервера приложения (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path =>
`Не указан глобальный адрес прокси-сервера (${path})`
}
}
});

View File

@ -65,6 +65,15 @@ exports.OutQueueProcessorTask = new Schema({
message: {
required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})`
}
},
//Глобальный адрес прокси-сервера
sProxy: {
type: String,
required: false,
message: {
type: path => `Глобальный адрес прокси-сервера (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан глобальный адрес прокси-сервера (${path})`
}
}
});

View File

@ -189,6 +189,16 @@ exports.Service = new Schema({
`Неверный формат списка адресов E-Mail для оповещения о простое внешнего сервиса (${path}), для указания нескольких адресов следует использовать запятую в качестве разделителя (без пробелов)`
}
},
//Адрес прокси-сервера
sProxyURL: {
type: String,
required: false,
message: {
type: path =>
`Адрес прокси-сервера в очереди обмена (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан адрес прокси-сервера в очереди обмена (${path})`
}
},
//Список функций сервиса
functions: defServiceFunctions(true, "functions")
});

View File

@ -66,6 +66,15 @@ exports.OutQueue = new Schema({
`Объект для рассылки уведомлений (${path}) имеет некорректный тип данных (ожидалось - Notifier)`,
required: path => `Не указан объект для рассылки уведомлений (${path})`
}
},
//Глобальный адрес прокси-сервера
sProxy: {
type: String,
required: false,
message: {
type: path => `Глобальный адрес прокси-сервера (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан глобальный адрес прокси-сервера (${path})`
}
}
});

View File

@ -37,7 +37,7 @@ exports.appProcess = new Schema({
schema: Queue,
required: true,
message: {
required: path => `Не указано обрабатываемое сообщение очреди (${path})`
required: path => `Не указано обрабатываемое сообщение очереди (${path})`
}
},
//Cервис-обработчик
@ -55,6 +55,15 @@ exports.appProcess = new Schema({
message: {
required: path => `Не указана функция сервиса для обработки сообщения очереди (${path})`
}
},
//Глобальный адрес прокси-сервера
sProxy: {
type: String,
required: false,
message: {
type: path => `Глобальный адрес прокси-сервера (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан глобальный адрес прокси-сервера (${path})`
}
}
});

View File

@ -48,6 +48,16 @@ exports.ServiceAvailableController = new Schema({
`Объект для взаимодействия с БД (${path}) имеет некорректный тип данных (ожидалось - DBConnector)`,
required: path => `Не указан объект для взаимодействия с БД (${path})`
}
},
//Глобальный адрес прокси-сервера
sProxy: {
type: String,
required: false,
message: {
type: path =>
`Глобальный адрес прокси-сервера (${path}) имеет некорректный тип данных (ожидалось - String)`,
required: path => `Не указан глобальный адрес прокси-сервера (${path})`
}
}
});

View File

@ -70,6 +70,31 @@ const checkWorkers = async prms => {
}
};
//Проверка соответствия релизов сервера приложений и системы
const checkRelease = async prms => {
let pooledConnection;
try {
pooledConnection = await prms.connection.getConnection();
let res = await pooledConnection.execute("BEGIN :DB_RELEASE := PKG_EXS.UTL_PRODUCT_RELEASE_GET(); END;", {
DB_RELEASE: { dir: oracledb.BIND_OUT, type: oracledb.DB_TYPE_VARCHAR }
});
let sDB_RELEASE = res.outBinds.DB_RELEASE;
if (sDB_RELEASE !== prms.sRelease) {
throw new Error(`Версия сервера приложений (${prms.sRelease}) не соответствует версии системы (${sDB_RELEASE}).`);
}
} catch (e) {
throw new Error(e.message);
} finally {
if (pooledConnection) {
try {
await pooledConnection.close();
} catch (e) {
throw new Error(e.message);
}
}
}
}
//Подключение к БД
const connect = async prms => {
try {
@ -93,6 +118,9 @@ const connect = async prms => {
);
}
});
if (prms.bControlSystemVersion) {
await checkRelease({ sRelease: prms.sRelease, connection: pool });
}
await checkWorkers({ nMaxWorkers: prms.nMaxWorkers, connection: pool });
return pool;
} catch (e) {