/* Сервис интеграции ПП Парус 8 с WEB API Дополнительный модуль: Интеграция с ГАР (GAR) */ //------------------------------ // Подключаем внешние библиотеки //------------------------------ const fs = require("fs"); //Работа с файлами const { pipeline } = require("stream"); //Работа с потоками const { promisify } = require("util"); //Вспомогательные инструменты const xml2js = require("xml2js"); //Конвертация XML в JSON и JSON в XML const confServ = require("../config"); //Настройки сервера приложений const conf = require("./gar_config"); //Параметры расширения "Интеграция с ГАР" const StreamZip = require("./gar_utils/node_modules/node-stream-zip"); //Работа с ZIP-архивами const fetch = require("./gar_utils/node_modules/node-fetch"); //Работа с запросами const { WorkersPool } = require("./gar_utils/workers_pool"); //Пул обработчиков const { logInf, makeTaskMessage, logWrn, logErr, stringToDate, dateToISOString } = require("./gar_utils/utils"); //Вспомогательные функции //-------------------------- // Глобальные идентификаторы //-------------------------- //Название модудля для протокола const MODULE = `GAR`; //Параметры пула обработчиков const workersPoolOptions = { workerPath: "./modules/gar_utils/import.js", limit: conf.common.nThreads, timeout: 0, drainTimeout: 60000 }; //Пул обработчиков let WP = null; //Очередь на парсинг let PARSE_QUEUE = []; //Обрабатываемые элементы архива let ENTRIES = []; //Всего элементов в архиве let ENTRIES_COUNT = 0; //Всего файлов в архиве let FILES_COUNT = 0; //Общий объем файлов в архиве let TOTAL_SIZE = 0; //Объем успешно обработанных файлов let PROCESSED_SIZE = 0; //Количество успешно обработанных файлов let PROCESSED_COUNT = 0; //Объем файлов обработанных с ошибками let ERROR_SIZE = 0; //Количество файлов обработанных с ошибками let ERROR_COUNT = 0; //Начало let START_TIME = null; //Окончание let END_TIME = null; //Протокол выполнения let LOAD_LOG = null; //Флаг распакованности архива let ZIP_UNPACKED = false; //------------ // Тело модуля //------------ //Выдача общей статистики const printCommonStats = () => { logWrn(`Всего элементов: ${ENTRIES_COUNT}, файлов для обработки: ${FILES_COUNT}`, MODULE, LOAD_LOG); logWrn(`Объем файлов для обработки: ${TOTAL_SIZE} байт`, MODULE, LOAD_LOG); }; //Выдача статистики импорта const printImportStats = () => { logWrn(`Количество необработанных файлов: ${FILES_COUNT - ERROR_COUNT - PROCESSED_COUNT}`, MODULE, LOAD_LOG); logWrn(`Объем необработанных файлов: ${TOTAL_SIZE - ERROR_SIZE - PROCESSED_SIZE} байт`, MODULE, LOAD_LOG); logWrn(`Количество файлов обработанных с ошибками: ${ERROR_COUNT}`, MODULE, LOAD_LOG); logWrn(`Объем файлов обработанных с ошибками: ${ERROR_SIZE} байт`, MODULE, LOAD_LOG); logWrn(`Количество файлов успешно обработанных: ${PROCESSED_COUNT}`, MODULE, LOAD_LOG); logWrn(`Объем файлов успешно обработанных: ${PROCESSED_SIZE} байт`, MODULE, LOAD_LOG); logWrn(`Начало: ${START_TIME}`, MODULE, LOAD_LOG); logWrn(`Окончание: ${END_TIME}`, MODULE, LOAD_LOG); logWrn(`Длительность: ${(END_TIME.getTime() - START_TIME.getTime()) / 1000} секунд`, MODULE, LOAD_LOG); }; //Подчистка временного файла const removeTempFile = fileFullName => { logInf(`Удаляю временный "${fileFullName}"...`, MODULE, LOAD_LOG); fs.rm(fileFullName, { maxRetries: 5, retryDelay: 1000 }, err => { if (err) logErr(`Ошибка удаления временного файла "${fileFullName}": ${err.message}`, MODULE, LOAD_LOG); else logInf(`Удалено "${fileFullName}".`, MODULE, LOAD_LOG); }); }; //Проверка необходимости загрузки элемента const needLoad = ({ processedCount, entry, processLimit, processFilter }) => (processLimit === 0 || processedCount <= processLimit) && !entry.isDirectory && entry.name.toLowerCase().endsWith("xml") && (processFilter === null || (processFilter != null && entry.name.match(processFilter))); //Обработка очереди на распаковку const processParseQueue = async () => { //Если в очереди еще есть необработанные элементы if (PARSE_QUEUE.length > 0) { //Получим данные элемента очереди const { entry, fileFullName, fileName, garVersionInfo } = PARSE_QUEUE.shift(); //Если обработчик запущен if (WP.started) { //Отправим задачу на выполнение try { await WP.sendTask(makeTaskMessage({ payload: { garVersionInfo, fileFullName, fileName } }), (e, p) => { //Удалим временный файл removeTempFile(fileFullName); //Если ошибка if (e) { //Размер файлов, обработанных с ошибками ERROR_SIZE += entry.size; //Количество ошибок ERROR_COUNT++; //Сообщение об ошибке let msg = `При обработке "${entry.name}": ${e.message}`; logErr(msg, MODULE, LOAD_LOG); } else { //Размер успешно обработанных файлов PROCESSED_SIZE += entry.size; //Количество успешно обработанных файлов PROCESSED_COUNT++; logWrn(`Обработано успешно "${entry.name}".`, MODULE, LOAD_LOG); } logWrn( `Всего обработано: ${PROCESSED_SIZE + ERROR_SIZE} байт, ${Math.round(((PROCESSED_SIZE + ERROR_SIZE) / TOTAL_SIZE) * 100)}%`, MODULE, LOAD_LOG ); }); } catch (e) { //Удалим временный файл logErr(`При размещении задачи для "${entry.name}": ${e.message}`, MODULE, LOAD_LOG); removeTempFile(fileFullName); } } else { //Пул фоновых обработчиков остановлен (могла прийти команда принудительного выключения) logErr(`При размещении задачи для "${entry.name}": пул уже остановлен. Прекращаю работу.`, MODULE, LOAD_LOG); removeTempFile(fileFullName); } } if (PARSE_QUEUE.length > 0 || !ZIP_UNPACKED) setTimeout(processParseQueue, 0); }; //Конвертация в XML const toXML = obj => { const builder = new xml2js.Builder(); return builder.buildObject(obj); }; //Обработчик после получения обновлений ГАР const afterLoad = async prms => { if (!conf.common.sDownloadsDir) throw new Error(`Не указан путь для размещения загруженных файлов.`); if (!conf.common.sTmpDir) throw new Error(`Не указан путь для размещения временных файлов.`); if (!conf.common.sLogDir) throw new Error(`Не указан путь для размещения файлов протоколирования.`); //Информация о загружаемых данных const LOAD_INFO = { REGIONS: prms.options.sRegions, GARDATELAST: stringToDate(prms.options.dGarDateLast), HOUSESLOADED: Number(prms.options.nHousesLoaded), STEADSLOADED: Number(prms.options.nSteadsLoaded) }; //Если указаны загружаемые регионы и дата последней загруженной версии ГАР if (LOAD_INFO.REGIONS && LOAD_INFO.GARDATELAST) { //Идентификаторы загружаемых процессов let loadIdents = []; //Идентификатор протоколирования const logIdent = Date.now(); //Открываю лог выполнения LOAD_LOG = fs.createWriteStream(`${conf.common.sLogDir}/gar_load_${logIdent}.log`); LOAD_LOG.on("error", e => {}); LOAD_LOG.on("close", () => {}); logInf("Протокол выполнения загрузки ГАР открыт.", MODULE, LOAD_LOG); //Информация о версиях ГАР const requestRespJson = JSON.parse(prms.queue.blResp.toString()); //Обработаем полученную информацию о версиях ГАР logInf(`Обрабатываю полученую информацию о версиях ГАР...`, MODULE, LOAD_LOG); //Версии ГАР для загрузки let garVersions = []; //Регионы const regions = LOAD_INFO.REGIONS.split(";"); //Последняя загруженная версия ГАР const garDateLast = LOAD_INFO.GARDATELAST; //Признак загрузки домов const housesLoaded = LOAD_INFO.HOUSESLOADED ? LOAD_INFO.HOUSESLOADED : 0; //Признак загрузки участков const steadsLoaded = LOAD_INFO.STEADSLOADED ? LOAD_INFO.STEADSLOADED : 0; //Если не указана последняя загруженная версия ГАР if (!garDateLast) throw new Error(`Не указана последняя загруженная версия ГАР, обновление недоступно.`); //Обойдем элементы ответа for (let respElement of requestRespJson) { //Дата версии ГАР const garVersionDate = stringToDate(respElement.Date); //Ссылка на данные обновления const garXmlDeltaUrl = respElement.GarXMLDeltaURL; //Если указана дата и ссылка на обновление if (garVersionDate && garXmlDeltaUrl) { //Если версия вышла позже последней загруженной if (garDateLast < garVersionDate) { //Сохраним версию ГАР garVersions.push({ versionDate: dateToISOString(garVersionDate), xmlDeltaUrl: garXmlDeltaUrl }); } } else { throw new Error(`Не удалось корректно определить информацию о версиях ГАР.`); } } logInf(`Полученая информация о версиях ГАР обработана.`, MODULE, LOAD_LOG); //Если не указаны необходимые для загрузки версии ГАР if (!garVersions || garVersions.length == 0) throw new Error( `Не удалось определить необходимые для загрузки версии ГАР, вышедшие после ${garDateLast.toISOString().substring(0, 10)}.` ); //Обработаем версии ГАР logInf(`Обрабатываю версии ГАР...`, MODULE, LOAD_LOG); //Отсортируем версии ГАР по возрастанию garVersions.sort((a, b) => { if (a.versionDate > b.versionDate) return 1; if (a.versionDate === b.versionDate) return 0; if (a.versionDate < b.versionDate) return -1; }); //Пул обработчиков WP = new WorkersPool(workersPoolOptions); //Запуск фоновых процессов logInf(`Стартую обработчики...`, MODULE, LOAD_LOG); await WP.start({ dbBuferSize: conf.dbConnect.nBufferSize, fileChunkSize: conf.common.nFileChunkSize, loadLog: LOAD_LOG, dbConn: { sUser: confServ.dbConnect.sUser, sPassword: confServ.dbConnect.sPassword, sConnectString: confServ.dbConnect.sConnectString, sSchema: confServ.dbConnect.sSchema } }); logInf(`Обработчики запущены.`, MODULE, LOAD_LOG); //Обрабатываемая версия ГАР let garVersion = garVersions[0]; // Обработаем версию ГАР logInf(`Обрабатываю версию ГАР "${garVersion.versionDate}"...`, MODULE, LOAD_LOG); //Флаг необходимости загрузки файла let downloadFlag = true; //Полный путь к загрузке (временная переменная) let fileFullNameTmp = `${conf.common.sDownloadsDir}/${garVersion.versionDate}.zip`; //Если файл был загружен ранее if (fs.existsSync(fileFullNameTmp)) { logInf(`Файл "${fileFullNameTmp}" уже существует.`, MODULE, LOAD_LOG); //Если разрешено использование существующего файла if (conf.common.bDownloadsUseExists) downloadFlag = false; else fileFullNameTmp = `${conf.common.sDownloadsDir}/${garVersion.versionDate}_${logIdent}.zip`; } //Полный путь к загрузке const fileFullName = fileFullNameTmp; //Если необходимо загрузить файл if (downloadFlag) { //Загружаем файл try { logInf(`Загружаю файл по ссылке "${garVersion.xmlDeltaUrl}" в каталог "${conf.common.sDownloadsDir}"...`, MODULE, LOAD_LOG); const streamPipeline = promisify(pipeline); const fileData = await fetch(garVersion.xmlDeltaUrl, { redirect: "follow", follow: 20 }); if (!fileData.ok) throw new Error(`Не удалось загрузить файл по ссылке "${garVersion.xmlDeltaUrl}": ${fileData.statusText}.`); await streamPipeline(fileData.body, fs.createWriteStream(fileFullName)); logInf(`Файл "${fileFullName}" загружен.`, MODULE, LOAD_LOG); } catch (e) { const errorMessage = `Ошибка загрузки файла по ссылке "${garVersion.xmlDeltaUrl}": ${e.message}.`; logErr(errorMessage, MODULE, LOAD_LOG); throw new Error(errorMessage); } } //Обнулим переменные ENTRIES = []; TOTAL_SIZE = 0; FILES_COUNT = 0; PARSE_QUEUE = []; ENTRIES_COUNT = 0; PROCESSED_SIZE = 0; PROCESSED_COUNT = 0; ERROR_SIZE = 0; ERROR_COUNT = 0; START_TIME = null; END_TIME = null; ZIP_UNPACKED = false; //Анализ архива logInf(`Читаю архив...`, MODULE, LOAD_LOG); const zip = new StreamZip.async({ file: fileFullName }); const entries = await zip.entries(); //Обойдем файлы архива for (const entry of Object.values(entries)) { //Количество файлов архива ENTRIES_COUNT++; //Путь к фалу в архиве const path = entry.name.split("/"); //Если подходящий путь к файлу if ([1, 2].includes(path.length)) { //Регион const region = path.length == 2 ? path[0] : ""; //Если указан регион и он входит в состав регионов, которые необходимо загрузить и файл попадает под условия загрузки if ( (!region || !regions || (region && regions && regions.includes(region))) && needLoad({ processedCount: FILES_COUNT, entry, processLimit: conf.common.nLoadFilesLimit, processFilter: conf.common.sLoadFilesMask }) && (housesLoaded == 1 || ((!housesLoaded || housesLoaded != 1) && !path[path.length - 1].startsWith(`AS_HOUSES`))) && (steadsLoaded == 1 || ((!steadsLoaded || steadsLoaded != 1) && !path[path.length - 1].startsWith(`AS_STEADS`))) ) { //Количество, подошедших под условия загрузки, файлов FILES_COUNT++; //Общий размер файлов, подошедших под условия загрузки TOTAL_SIZE += entry.size; //Запомним файл ENTRIES.push(entry); } } } //Отсортируем файлы в порядке возрастания по размеру файла ENTRIES.sort((a, b) => (a.size > b.size ? 1 : a.size < b.size ? -1 : 0)); printCommonStats(); logInf(`Архив прочитан.`, MODULE, LOAD_LOG); //Обработка очереди на парсинг setTimeout(processParseQueue, 0); //Время начала обработки архива START_TIME = new Date(); //Идентификатор процесса const ident = Date.now(); //Директория для размещения временных файлов архива const garVersionDir = `${conf.common.sTmpDir}/${garVersion.versionDate}`; //Если не существует директории для размещения временных файлов if (!fs.existsSync(garVersionDir)) { //Создадим директорию try { fs.mkdirSync(garVersionDir); } catch (e) { throw new Error(`Не удалось создать директорию "${garVersionDir}": ${e.message}`); } } //Обойдем файлы архива for (const entry of ENTRIES) { //Путь к файлу архива const path = entry.name.split("/"); //Имя файла const unzipFileName = path[path.length - 1]; //Регион const region = path.length == 2 ? path[0] : ""; //Полный путь к файлу const unzipFileFullName = `${garVersionDir}/${region ? `${region}/` : ""}${unzipFileName}`; //Если указан регион if (region) if (!fs.existsSync(`${garVersionDir}/${region}`)) //Если еще не существует диретории для региона //Создадим директорию для региона try { fs.mkdirSync(`${garVersionDir}/${region}`); } catch (e) { throw new Error(`Не удалось создать директорию "${garVersionDir}/${region}": ${e.message}`); } //Если файл еще не существует if (!fs.existsSync(unzipFileFullName)) { //Распакуем файл logInf(`Распаковываю "${entry.name}" (${entry.size} байт) в "${unzipFileFullName}"...`, MODULE, LOAD_LOG); await zip.extract(entry.name, unzipFileFullName); logInf(`Распаковано "${entry.name}" в "${unzipFileFullName}".`, MODULE, LOAD_LOG); } else { logInf(`Файл "${entry.name}" уже распакован в директорию "${garVersionDir}".`, MODULE, LOAD_LOG); } //Отдаём его в обработку фоновому процессу PARSE_QUEUE.push({ entry, fileName: unzipFileName, fileFullName: unzipFileFullName, garVersionInfo: { ident, region, versionDate: garVersion.versionDate } }); } //Закрываем архив logInf("Закрываю архив...", MODULE, LOAD_LOG); await zip.close(); logInf("Архив закрыт.", MODULE, LOAD_LOG); //Флаг закрытия архива ZIP_UNPACKED = true; //Ожидаем, пока всё отработает logInf("Жду завершения фоновой обработки...", MODULE, LOAD_LOG); while (PARSE_QUEUE.length > 0 || WP.available != conf.common.nThreads) await new Promise(resolve => setTimeout(resolve, 1000)); logInf("Фоновая обработка завершена.", MODULE, LOAD_LOG); //Очистка директорий для размещения временных файлов logInf(`Очищаю директорию "${garVersionDir}" для размещения временных файлов...`, MODULE, LOAD_LOG); fs.rmSync(garVersionDir, { recursive: true }); logInf(`Каталог "${garVersionDir}" для размещения временных файлов очищена.`, MODULE, LOAD_LOG); //Если необходимо удалить загруженные файлы if (conf.common.bDownloadsDelete) { logInf(`Удаляю загруженный файл "${fileFullName}"...`, MODULE, LOAD_LOG); fs.unlinkSync(fileFullName); logInf(`Загруженный файл "${fileFullName}" удален.`, MODULE, LOAD_LOG); } //Время завершения выполнения загрузки END_TIME = new Date(); printCommonStats(); printImportStats(); //Если обработка прошла успешно if (ERROR_COUNT == 0) { //Запомним обработанную версию loadIdents.push({ GAR_VERSION: { IDENT: ident, VERSION_DATE: garVersion.versionDate, REGIONS: LOAD_INFO.REGIONS, HOUSES_LOADED: housesLoaded, STEADS_LOADED: steadsLoaded } }); logInf(`Версия ГАР "${garVersion.versionDate}" обработана.`, MODULE, LOAD_LOG); } else { loadIdents = null; logErr(`Версия ГАР "${garVersion.versionDate}" обработана с ошибками.`, MODULE, LOAD_LOG); } //Выключаю пул обработчиков logInf("Останавливаю обработчики...", MODULE, LOAD_LOG); await WP.stop(LOAD_LOG); WP = null; logInf("Обработчики остановлены.", MODULE, LOAD_LOG); logInf(`Версии ГАР обработаны.`, MODULE, LOAD_LOG); //Закрываю протокол выполнения logInf("Закрываю протокол выполнения загрузки ГАР...", MODULE, LOAD_LOG); if (LOAD_LOG) LOAD_LOG.destroy(); if (!loadIdents) throw new Error(`Не удалось загрузить данные обновления ГАР.`); //Вернем результат return { blResp: Buffer.from(toXML(loadIdents[0])) }; } else { throw new Error(`Не указан регион и/или дата для загрузки обновлений ГАР.`); } }; //----------------- // Интерфейс модуля //----------------- exports.afterLoad = afterLoad;