Compare commits

...

12 Commits

Author SHA1 Message Date
Mikhail Chechnev
62786dcc5a Добавлен параметр "promiseCancellable" для "wrapPromiseTimeout" - обеспечение корректной обработки промисов, не имеющих методов "isPending" и "cancel" 2025-12-01 19:23:46 +03:00
Mikhail Chechnev
9cd779dd64 ЦИТК-255 - исправлена ошибка обработки таймаута для сообщений, отработанных с флагом "bStopPropagation = true"
Сервер ошибочно считал такие сообщения "нарушением таймаута" и писал в журнал несуществующие ошибки, в то время как сообщение нормально отрабатывалось.
2025-12-01 19:22:00 +03:00
Mikhail Chechnev
80f50d43a3 ЦИТК-999 - ДИАДОК - исправлена проблема конвертации ИНН в число с последующим исчезновением лидирующего нуля 2025-11-26 12:50:32 +03:00
Mikhail Chechnev
54988da32c В "package.json" удалены ссылки на GitHub (заменены на git.citpb.ru) 2025-11-26 12:48:48 +03:00
Mikhail Chechnev
f2bb9e1dee Релиз 2025.11.06 2025-11-12 12:11:40 +03:00
Mikhail Chechnev
a4d26956ab П8-61674 - исправлена ошибка сохранения ID контрагента при роуминге 2025-11-10 16:17:58 +03:00
Mikhail Chechnev
3721c8b4be Коннектор "Kafka" - исправлены опечатки в консольной выдаче 2025-11-07 18:24:22 +03:00
Mim
e559b534bf ЦИТК-255 - Добавление параметра времени ожидания отработки входящего сообщения
Reviewed-on: CITKParus/P8-ExchangeService#11
2025-11-05 12:18:11 +03:00
Mikhail Chechnev
5e767ca3eb Релиз 2025.10.03 2025-10-06 18:39:09 +03:00
Mikhail Chechnev
b94afb06f9 Релиз 2025.08.28 2025-09-01 14:35:35 +03:00
Mikhail Chechnev
b1bc06edc9 Релиз 2025.07.29 2025-08-04 14:56:27 +03:00
Mikhail Chechnev
61dd9e8f1a ЦИТК-977 - поддержка обмена "Универсальными сообщениями" 2025-08-04 14:35:11 +03:00
6 changed files with 60 additions and 38 deletions

View File

@ -14,7 +14,7 @@ let common = {
//Версия сервера приложений //Версия сервера приложений
sVersion: "8.5.6.1", sVersion: "8.5.6.1",
//Релиз сервера приложений //Релиз сервера приложений
sRelease: "2025.07.01", sRelease: "2025.11.06",
//Таймаут останова сервера (мс) //Таймаут останова сервера (мс)
nTerminateTimeout: 60000, nTerminateTimeout: 60000,
//Контролировать версию Системы //Контролировать версию Системы

View File

@ -152,6 +152,8 @@ class InQueue extends EventEmitter {
sOptions: buildOptionsXML({ options }), sOptions: buildOptionsXML({ options }),
blMsg blMsg
}); });
//Запомним идентификатор записи очереди в запросе
prms.req.nQId = q.nId;
//Скажем что пришло новое входящее сообщение //Скажем что пришло новое входящее сообщение
await this.logger.info( await this.logger.info(
`Новое входящее сообщение от ${prms.req.connection.address().address} для функции ${prms.function.sCode} (${buildURL({ `Новое входящее сообщение от ${prms.req.connection.address().address} для функции ${prms.function.sCode} (${buildURL({
@ -329,13 +331,19 @@ class InQueue extends EventEmitter {
} }
} }
} }
//Если мы еще не отдали ответ от сервера //Всё успешно - отдаём результат клиенту, если ещё не отдали
if (!prms.res.writableFinished) { if (bStopPropagation === false && !prms.res.writableFinished) {
//Всё успешно - отдаём результат клиенту if (optionsResp.headers) prms.res.set(optionsResp.headers);
if (bStopPropagation === false) { prms.res.status(optionsResp.statusCode || 200).send(blResp);
if (optionsResp.headers) prms.res.set(optionsResp.headers); }
prms.res.status(optionsResp.statusCode || 200).send(blResp); //Если отправка ответа была прервана по таймауту
} if (prms.req.bIsTimedOut === true) {
//Вернем ошибку обработчика с информацией об этом
throw new ServerError(
SERR_WEB_SERVER,
"Истекло время ожидания обработки входящего запроса. Канал закрыт. Клиенту был отправлен ответ с ошибкой истечения таймаута (504)."
);
} else {
//Фиксируем успех обработки - в протоколе работы сервиса //Фиксируем успех обработки - в протоколе работы сервиса
await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId }); await this.logger.info(`Входящее сообщение ${q.nId} успешно отработано`, { nQueueId: q.nId });
//Фиксируем успех обработки - в статусе сообщения //Фиксируем успех обработки - в статусе сообщения
@ -344,12 +352,6 @@ class InQueue extends EventEmitter {
nIncExecCnt: NINC_EXEC_CNT_YES, nIncExecCnt: NINC_EXEC_CNT_YES,
nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK nExecState: objQueueSchema.NQUEUE_EXEC_STATE_OK
}); });
} else {
//Или расскажем об ошибке
throw new ServerError(
SERR_WEB_SERVER,
"Истекло время ожидания обработки входящего запроса. Канал закрыт. Клиенту был отправлен ответ."
);
} }
} catch (e) { } catch (e) {
//Тема и текст уведомления об ошибке //Тема и текст уведомления об ошибке
@ -555,21 +557,24 @@ class InQueue extends EventEmitter {
if (req.headers["content-type"] === "false") req.headers["content-type"] = "application/octet-stream"; if (req.headers["content-type"] === "false") req.headers["content-type"] = "application/octet-stream";
next(); next();
}); });
//Если требуется установить таймаут на обработку сообщений //Конфигурируем сервер - устанавливаем таймаут обработки сообщений
if (this.inComing.nTimeout !== 0) { this.webApp.use((req, res, next) => {
//Конфигурируем сервер - устанавливаем таймаут обработки сообщений //Поднимем флаг истечения таймаута обработки
this.webApp.use((req, res, next) => { req.bIsTimedOut = false;
//Если требуется установить таймаут на обработку сообщений
if (this.inComing.nTimeout !== 0)
//Устанавливаем таймаут на ответ от сервера //Устанавливаем таймаут на ответ от сервера
res.setTimeout(this.inComing.nTimeout, () => { res.setTimeout(this.inComing.nTimeout, () => {
//Поднимем флаг исчетечение таймаута обработки
req.bIsTimedOut = true;
//Формируем ошибку //Формируем ошибку
let err = new Error("Истекло время ожидания формирования ответа для завершения текущего запроса."); let err = new Error("Истекло время ожидания формирования ответа для завершения текущего запроса.");
err.status = 504; err.status = 504;
//Отправляем ошибку //Отправляем ошибку
next(err); next(err);
}); });
next(); next();
}); });
}
//Конфигурируем сервер - обработка тела сообщения //Конфигурируем сервер - обработка тела сообщения
this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" })); this.webApp.use(bodyParser.raw({ limit: `${this.inComing.nMsgMaxSize}mb`, type: "*/*" }));
//Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений //Конфигурируем сервер - обходим все сервисы, работающие на приём сообщений
@ -624,7 +629,8 @@ class InQueue extends EventEmitter {
//Протоколируем в журнал работы сервера //Протоколируем в журнал работы сервера
await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), { await this.logger.error(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)), {
nServiceId: srvs.nId, nServiceId: srvs.nId,
nServiceFnId: fn.nId nServiceFnId: fn.nId,
nQueueId: req.nQId || null
}); });
//Отправим ошибку клиенту //Отправим ошибку клиенту
res.status(err.status || 500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message))); res.status(err.status || 500).send(makeErrorText(new ServerError(SERR_WEB_SERVER, err.message)));

View File

@ -90,7 +90,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
}) })
}); });
} catch (e) { } catch (e) {
await logger.error(`Ошибка обработки исходящего сообщения Kafka: ${makeErrorText(e)}`); await logger.error(`Ошибка обработки входящего сообщения Kafka: ${makeErrorText(e)}`);
} }
} }
}); });
@ -107,7 +107,7 @@ const subscribeKafka = async ({ settings, service, processMessage, logger }) =>
//Возвращаем соединение //Возвращаем соединение
return consumer; return consumer;
} catch (e) { } catch (e) {
await logger.error(`Ошибка запуска обработчика очереди исходящих сообщений Kafka: ${makeErrorText(e)}`); await logger.error(`Ошибка запуска обработчика очереди входящих сообщений Kafka: ${makeErrorText(e)}`);
} }
}; };

View File

@ -404,7 +404,7 @@ const getURLProtocol = sURL => {
}; };
//Обёртывание промиса в таймаут исполнения //Обёртывание промиса в таймаут исполнения
const wrapPromiseTimeout = (timeout, promise) => { const wrapPromiseTimeout = (timeout, promise, promiseCancellable = true) => {
if (!timeout) return promise; if (!timeout) return promise;
let timeoutPid; let timeoutPid;
const timeoutPromise = new Promise((resolve, reject) => { const timeoutPromise = new Promise((resolve, reject) => {
@ -414,7 +414,7 @@ const wrapPromiseTimeout = (timeout, promise) => {
timeoutPid = setTimeout(() => reject(e), timeout); timeoutPid = setTimeout(() => reject(e), timeout);
}); });
return Promise.race([promise, timeoutPromise]).finally(() => { return Promise.race([promise, timeoutPromise]).finally(() => {
if (promise.promise().isPending()) promise.cancel(); if (promiseCancellable && promise.promise().isPending()) promise.cancel();
if (timeoutPid) clearTimeout(timeoutPid); if (timeoutPid) clearTimeout(timeoutPid);
}); });
}; };

View File

@ -27,7 +27,8 @@ const tag = [
"Resolutions", "Resolutions",
"XmlSignatureRejections", "XmlSignatureRejections",
"RecipientTitles", "RecipientTitles",
"Requests" "Requests",
"UniversalMessages"
]; ];
//------------ //------------
@ -144,6 +145,11 @@ const getOrganizations = organizations => {
//Найдем активную организацию не в роуминге //Найдем активную организацию не в роуминге
organization.Organizations[0] = organizations.Organizations.find(org => (org.IsRoaming === isRoaming) && (org.IsActive === isActive)); organization.Organizations[0] = organizations.Organizations.find(org => (org.IsRoaming === isRoaming) && (org.IsActive === isActive));
//Если не удалось получить организацию не в роуминге //Если не удалось получить организацию не в роуминге
if (!organization.Organizations[0]) {
//Найдем активную организацию
organization.Organizations[0] = organizations.Organizations.find(org => (org.IsActive === isActive));
};
//Если не удалось получить активную организацию
if (!organization.Organizations[0]) { if (!organization.Organizations[0]) {
//Если нет организации не в роуминге и найдено более одной организации //Если нет организации не в роуминге и найдено более одной организации
if (organizations.Organizations.length > 1) { if (organizations.Organizations.length > 1) {
@ -290,6 +296,8 @@ const beforeMessagePost = async prms => {
} }
//Если не достали из контекста токен доступа - значит нет аутентификации на сервере //Если не достали из контекста токен доступа - значит нет аутентификации на сервере
if (!sToken) return { bUnAuth: true }; if (!sToken) return { bUnAuth: true };
//Получим параметры запроса
const optionsData = await toJSON(prms.queue.sOptions);
//Конвертируем XML из "Парус 8" в JSON //Конвертируем XML из "Парус 8" в JSON
let obj = await toJSON(prms.queue.blMsg.toString()); let obj = await toJSON(prms.queue.blMsg.toString());
//Формируем запрос для получения FromBoxId //Формируем запрос для получения FromBoxId
@ -307,7 +315,7 @@ const beforeMessagePost = async prms => {
//Получим идентификатор организации по ИНН/КПП поставщика документа //Получим идентификатор организации по ИНН/КПП поставщика документа
for (let i in serverResp.Organizations) { for (let i in serverResp.Organizations) {
//Если найдена подходящая организация - запомним идентификатор и выходим из цикла //Если найдена подходящая организация - запомним идентификатор и выходим из цикла
if (serverResp.Organizations[i].Inn == prms.options.inn_pr && serverResp.Organizations[i].Kpp == prms.options.kpp_pr) { if (serverResp.Organizations[i].Inn == optionsData.inn_pr && serverResp.Organizations[i].Kpp == optionsData.kpp_pr) {
//Сохраняем полученный ответ //Сохраняем полученный ответ
obj.FromBoxId = serverResp.Organizations[i].Boxes[0].BoxId; obj.FromBoxId = serverResp.Organizations[i].Boxes[0].BoxId;
break; break;
@ -315,13 +323,13 @@ const beforeMessagePost = async prms => {
} }
//Не удалось получить ящик отправителя //Не удалось получить ящик отправителя
if (!obj.FromBoxId) { if (!obj.FromBoxId) {
throw new Error(`Не удалось получить ящик текущей организации с ИНН: ${prms.options.inn_pr} и КПП: ${prms.options.kpp_pr}`); throw new Error(`Не удалось получить ящик текущей организации с ИНН: ${optionsData.inn_pr} и КПП: ${optionsData.kpp_pr}`);
} }
} catch (e) { } catch (e) {
throw Error(`Ошибка при получении ящика текущей организации: ${e.message}`); throw Error(`Ошибка при получении ящика текущей организации: ${e.message}`);
} }
//Получим ящик получателя //Получим ящик получателя
organization = await getOrganizationBoxId(prms.service.sSrvRoot, buildHeaders(sAPIClientId, sToken), prms.options.inn_cs, prms.options.kpp_cs); organization = await getOrganizationBoxId(prms.service.sSrvRoot, buildHeaders(sAPIClientId, sToken), optionsData.inn_cs, optionsData.kpp_cs);
obj.ToBoxId = organization.BoxId; obj.ToBoxId = organization.BoxId;
//Если не заполнен идентификатор подразделения и при получении ящика удалось его подобрать //Если не заполнен идентификатор подразделения и при получении ящика удалось его подобрать
if ((!obj.ToDepartmentId) && (organization.DepartmentId)) { if ((!obj.ToDepartmentId) && (organization.DepartmentId)) {
@ -448,6 +456,8 @@ const beforeEvent = async prms => {
} }
//Если не достали из контекста токен доступа - значит нет аутентификации на сервере //Если не достали из контекста токен доступа - значит нет аутентификации на сервере
if (!sToken) return { bUnAuth: true }; if (!sToken) return { bUnAuth: true };
//Получим параметры запроса
const optionsData = await toJSON(prms.queue.sOptions);
//Формируем запрос для получения BoxId //Формируем запрос для получения BoxId
let rqpOptions = { let rqpOptions = {
uri: buildMyOrganizationURL(prms.service.sSrvRoot), uri: buildMyOrganizationURL(prms.service.sSrvRoot),
@ -460,7 +470,7 @@ const beforeEvent = async prms => {
//Получим идентификатор организации по ИНН/КПП контрагента организации //Получим идентификатор организации по ИНН/КПП контрагента организации
for (let i in serverResp.Organizations) { for (let i in serverResp.Organizations) {
//Если найдена подходящая организация - запомним идентификатор и выходим из цикла //Если найдена подходящая организация - запомним идентификатор и выходим из цикла
if (serverResp.Organizations[i].Inn == prms.options.inn && serverResp.Organizations[i].Kpp == prms.options.kpp) { if (serverResp.Organizations[i].Inn == optionsData.inn && serverResp.Organizations[i].Kpp == optionsData.kpp) {
//Сохраняем полученный ответ //Сохраняем полученный ответ
sBoxId = serverResp.Organizations[i].Boxes[0].BoxId; sBoxId = serverResp.Organizations[i].Boxes[0].BoxId;
//Если задано подразделение //Если задано подразделение
@ -487,7 +497,7 @@ const beforeEvent = async prms => {
} }
//Не удалось получить ящик текущей организации //Не удалось получить ящик текущей организации
if (!sBoxId) { if (!sBoxId) {
throw new Error(`Не удалось получить ящик текущей организации с ИНН: ${prms.options.inn} и КПП: ${prms.options.kpp}`); throw new Error(`Не удалось получить ящик текущей организации с ИНН: ${optionsData.inn} и КПП: ${optionsData.kpp}`);
} }
} catch (e) { } catch (e) {
throw Error(`Ошибка при получении ящика текущей организации: ${e.message}`); throw Error(`Ошибка при получении ящика текущей организации: ${e.message}`);
@ -683,12 +693,16 @@ const beforeDocLoad = async prms => {
entId = "documentId="; entId = "documentId=";
msgId = "letterId="; msgId = "letterId=";
break; break;
//Загрузка Универсального сообщения
case 6:
entId = "attachmentId=";
break;
default: default:
} }
surl = `${surl}?${msgId}${prms.options.smsgid}&${entId}${prms.options.sentid}`; surl = `${surl}?${msgId}${prms.options.smsgid}&${entId}${prms.options.sentid}`;
let obj; let obj;
let rblMsg; let rblMsg;
if (prms.queue.blMsg && prms.options.type != 5) { if (prms.queue.blMsg && (prms.options.type != 5) && (prms.options.type != 6)) {
//Конвертируем XML из "Парус 8" в понятный "ДИАДОК" JSON //Конвертируем XML из "Парус 8" в понятный "ДИАДОК" JSON
obj = await toJSON(prms.queue.blMsg.toString()); obj = await toJSON(prms.queue.blMsg.toString());
rblMsg = Buffer.from(JSON.stringify(obj)); rblMsg = Buffer.from(JSON.stringify(obj));
@ -823,6 +837,8 @@ const beforeDepartmentIdGet = async prms => {
const afterDepartmentIdGet = async prms => { const afterDepartmentIdGet = async prms => {
let resu = null; let resu = null;
let organization = {}; let organization = {};
//Получим параметры запроса
const optionsData = await toJSON(prms.queue.sOptions);
//Действие выполнено успешно //Действие выполнено успешно
if (prms.optionsResp.statusCode == 200) { if (prms.optionsResp.statusCode == 200) {
try { try {
@ -830,7 +846,7 @@ const afterDepartmentIdGet = async prms => {
//Получим организацию не в роуминге (или единственную организацию в роуминге) //Получим организацию не в роуминге (или единственную организацию в роуминге)
organization = getOrganizations(JSON.parse(prms.queue.blResp.toString())); organization = getOrganizations(JSON.parse(prms.queue.blResp.toString()));
if (!organization) { if (!organization) {
throw Error(`Не удалось получить ящик для контрагента с ИНН: ${prms.options.nINN} и КПП: ${prms.options.nKPP}`); throw Error(`Не удалось получить ящик для контрагента с ИНН: ${optionsData.nINN} и КПП: ${optionsData.nKPP}`);
} }
} catch (e) { } catch (e) {
//Получим ключ разработчика //Получим ключ разработчика
@ -838,7 +854,7 @@ const afterDepartmentIdGet = async prms => {
//Считаем токен доступа из контекста сервиса //Считаем токен доступа из контекста сервиса
let sToken = prms.service.sCtx; let sToken = prms.service.sCtx;
//Получим головную организацию по ИНН/КПП //Получим головную организацию по ИНН/КПП
organization = await getOrganization(prms.service.sSrvRoot, buildHeaders(sAPIClientId, sToken), prms.options.nINN, prms.options.nKPP); organization = await getOrganization(prms.service.sSrvRoot, buildHeaders(sAPIClientId, sToken), optionsData.nINN, optionsData.nKPP);
}; };
//Преобразуем JSON ответ сервиса "ДИАДОК" в XML, понятный "Парус 8" //Преобразуем JSON ответ сервиса "ДИАДОК" в XML, понятный "Парус 8"
resu = toXML({ root: organization }); resu = toXML({ root: organization });

View File

@ -9,14 +9,14 @@
}, },
"repository": { "repository": {
"type": "git", "type": "git",
"url": "git+https://github.com/CITKParus/ExchangeService.git" "url": "git+https://git.citpb.ru/CITKParus/P8-ExchangeService.git"
}, },
"author": "CITK Parus", "author": "CITK Parus",
"license": "ISC", "license": "ISC",
"bugs": { "bugs": {
"url": "https://github.com/CITKParus/ExchangeService/issues" "url": "https://git.citpb.ru/CITKParus/P8-ExchangeService/issues"
}, },
"homepage": "https://github.com/CITKParus/ExchangeService#readme", "homepage": "https://git.citpb.ru/CITKParus/P8-ExchangeService/",
"dependencies": { "dependencies": {
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"cors": "^2.8.5", "cors": "^2.8.5",