From 5dabee332e0f0776093021df5084549605c9a017 Mon Sep 17 00:00:00 2001 From: Sucukdeluxe Date: Tue, 3 Mar 2026 21:43:34 +0100 Subject: [PATCH] Parallel archive extraction within packages maxParallelExtract now controls how many archives extract simultaneously within a single package (e.g. 4 episodes at once). Packages still extract sequentially (one package at a time) to focus I/O. Progress handler updated to track multiple active archives independently. Co-Authored-By: Claude Opus 4.6 --- package.json | 2 +- src/main/download-manager.ts | 89 +++++++++++++++++++++--------------- src/main/extractor.ts | 66 +++++++++++++++++++++----- 3 files changed, 107 insertions(+), 50 deletions(-) diff --git a/package.json b/package.json index 9221c21..d59fe7e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "real-debrid-downloader", - "version": "1.5.70", + "version": "1.5.71", "description": "Real-Debrid Downloader Desktop (Electron + React + TypeScript)", "main": "build/main/main/main.js", "author": "Sucukdeluxe", diff --git a/src/main/download-manager.ts b/src/main/download-manager.ts index d5d4b48..26e25ff 100644 --- a/src/main/download-manager.ts +++ b/src/main/download-manager.ts @@ -5516,32 +5516,8 @@ export class DownloadManager extends EventEmitter { const resolveArchiveItems = (archiveName: string): DownloadItem[] => resolveArchiveItemsFromList(archiveName, completedItems); - // Only update items of the currently extracting archive, not all items - let currentArchiveItems: DownloadItem[] = []; - const updateExtractingStatus = (text: string): void => { - const normalized = String(text || ""); - if (lastExtractStatusText === normalized) { - return; - } - lastExtractStatusText = normalized; - const updatedAt = nowMs(); - for (const entry of currentArchiveItems) { - if (isExtractedLabel(entry.fullStatus)) { - continue; - } - if (entry.fullStatus === normalized) { - continue; - } - entry.fullStatus = normalized; - entry.updatedAt = updatedAt; - } - }; - - let lastExtractStatusText = ""; let lastExtractEmitAt = 0; - let lastExtractArchiveName = ""; - const emitExtractStatus = (text: string, force = false): void => { - updateExtractingStatus(text); + const emitExtractStatus = (_text: string, force = false): void => { const now = nowMs(); if (!force && now - lastExtractEmitAt < EXTRACT_PROGRESS_EMIT_INTERVAL_MS) { return; @@ -5586,6 +5562,9 @@ export class DownloadManager extends EventEmitter { } }, extractTimeoutMs); try { + // Track multiple active archives for parallel extraction + const activeArchiveItemsMap = new Map(); + const result = await extractPackageArchives({ packageDir: pkg.outputDir, targetDir: pkg.extractDir, @@ -5596,33 +5575,69 @@ export class DownloadManager extends EventEmitter { passwordList: this.settings.archivePasswordList, signal: extractAbortController.signal, packageId, + maxParallel: this.settings.maxParallelExtract || 2, onProgress: (progress) => { - // When a new archive starts, mark the previous archive's items as done - if (progress.archiveName && progress.archiveName !== lastExtractArchiveName) { - if (lastExtractArchiveName && currentArchiveItems.length > 0) { + if (progress.phase === "done") { + // Mark all remaining active archives as done + for (const [, items] of activeArchiveItemsMap) { const doneAt = nowMs(); - for (const entry of currentArchiveItems) { + for (const entry of items) { if (!isExtractedLabel(entry.fullStatus)) { entry.fullStatus = "Entpackt - Done"; entry.updatedAt = doneAt; } } } - lastExtractArchiveName = progress.archiveName; - currentArchiveItems = resolveArchiveItems(progress.archiveName); + activeArchiveItemsMap.clear(); + emitExtractStatus("Entpacken 100%", true); + return; } - const label = progress.phase === "done" - ? "Entpacken 100%" - : (() => { + + if (progress.archiveName) { + // Resolve items for this archive if not yet tracked + if (!activeArchiveItemsMap.has(progress.archiveName)) { + activeArchiveItemsMap.set(progress.archiveName, resolveArchiveItems(progress.archiveName)); + } + const archiveItems = activeArchiveItemsMap.get(progress.archiveName)!; + + // If archive is at 100%, mark its items as done and remove from active + if (Number(progress.archivePercent ?? 0) >= 100) { + const doneAt = nowMs(); + for (const entry of archiveItems) { + if (!isExtractedLabel(entry.fullStatus)) { + entry.fullStatus = "Entpackt - Done"; + entry.updatedAt = doneAt; + } + } + activeArchiveItemsMap.delete(progress.archiveName); + } else { + // Update this archive's items with current progress const archive = progress.archiveName ? ` · ${progress.archiveName}` : ""; const elapsed = progress.elapsedMs && progress.elapsedMs >= 1000 ? ` · ${Math.floor(progress.elapsedMs / 1000)}s` : ""; const activeArchive = Number(progress.archivePercent ?? 0) > 0 ? 1 : 0; const currentDisplay = Math.max(0, Math.min(progress.total, progress.current + activeArchive)); - return `Entpacken ${progress.percent}% (${currentDisplay}/${progress.total})${archive}${elapsed}`; - })(); - emitExtractStatus(label); + const label = `Entpacken ${progress.percent}% (${currentDisplay}/${progress.total})${archive}${elapsed}`; + const updatedAt = nowMs(); + for (const entry of archiveItems) { + if (!isExtractedLabel(entry.fullStatus) && entry.fullStatus !== label) { + entry.fullStatus = label; + entry.updatedAt = updatedAt; + } + } + } + } + + // Emit overall status (throttled) + const archive = progress.archiveName ? ` · ${progress.archiveName}` : ""; + const elapsed = progress.elapsedMs && progress.elapsedMs >= 1000 + ? ` · ${Math.floor(progress.elapsedMs / 1000)}s` + : ""; + const activeArchive = Number(progress.archivePercent ?? 0) > 0 ? 1 : 0; + const currentDisplay = Math.max(0, Math.min(progress.total, progress.current + activeArchive)); + const overallLabel = `Entpacken ${progress.percent}% (${currentDisplay}/${progress.total})${archive}${elapsed}`; + emitExtractStatus(overallLabel); } }); logger.info(`Post-Processing Entpacken Ende: pkg=${pkg.name}, extracted=${result.extracted}, failed=${result.failed}, lastError=${result.lastError || ""}`); diff --git a/src/main/extractor.ts b/src/main/extractor.ts index bece033..c2501f9 100644 --- a/src/main/extractor.ts +++ b/src/main/extractor.ts @@ -87,6 +87,7 @@ export interface ExtractOptions { skipPostCleanup?: boolean; packageId?: string; hybridMode?: boolean; + maxParallel?: number; } export interface ExtractProgressUpdate { @@ -1905,12 +1906,15 @@ export async function extractPackageArchives(options: ExtractOptions): Promise<{ emitProgress(extracted, "", "extracting"); - for (const archivePath of pendingCandidates) { - if (options.signal?.aborted) { + const maxParallel = Math.max(1, options.maxParallel || 1); + let noExtractorEncountered = false; + + const extractSingleArchive = async (archivePath: string): Promise => { + if (options.signal?.aborted || noExtractorEncountered) { throw new Error("aborted:extract"); } - const archiveName = path.basename(archivePath); - const archiveResumeKey = archiveNameKey(archiveName); + const archiveName = path.basename(archivePath); + const archiveResumeKey = archiveNameKey(archiveName); const archiveStartedAt = Date.now(); let archivePercent = 0; emitProgress(extracted + failed, archiveName, "extracting", archivePercent, 0); @@ -1930,7 +1934,8 @@ export async function extractPackageArchives(options: ExtractOptions): Promise<{ const sig = await detectArchiveSignature(archivePath); if (!sig) { logger.info(`Generische Split-Datei übersprungen (keine Archiv-Signatur): ${archiveName}`); - continue; + clearInterval(pulseTimer); + return; } logger.info(`Generische Split-Datei verifiziert (Signatur: ${sig}): ${archiveName}`); } @@ -1994,23 +1999,60 @@ export async function extractPackageArchives(options: ExtractOptions): Promise<{ failed += 1; const errorText = String(error); if (isExtractAbortError(errorText)) { - throw new Error("aborted:extract"); + throw error; } lastError = errorText; const errorCategory = classifyExtractionError(errorText); logger.error(`Entpack-Fehler ${path.basename(archivePath)} [${errorCategory}]: ${errorText}`); emitProgress(extracted + failed, archiveName, "extracting", archivePercent, Date.now() - archiveStartedAt); if (isNoExtractorError(errorText)) { - const remaining = candidates.length - (extracted + failed); - if (remaining > 0) { - failed += remaining; - emitProgress(candidates.length, archiveName, "extracting", 0, Date.now() - archiveStartedAt); - } - break; + noExtractorEncountered = true; } } finally { clearInterval(pulseTimer); } + }; + + if (maxParallel <= 1) { + for (const archivePath of pendingCandidates) { + if (options.signal?.aborted || noExtractorEncountered) break; + await extractSingleArchive(archivePath); + } + } else { + // Parallel extraction pool: N workers pull from a shared queue + const queue = [...pendingCandidates]; + let nextIdx = 0; + let abortError: Error | null = null; + + const worker = async (): Promise => { + while (nextIdx < queue.length && !abortError && !noExtractorEncountered) { + if (options.signal?.aborted) break; + const idx = nextIdx; + nextIdx += 1; + try { + await extractSingleArchive(queue[idx]); + } catch (error) { + if (isExtractAbortError(String(error))) { + abortError = error instanceof Error ? error : new Error(String(error)); + break; + } + // Non-abort errors are already handled inside extractSingleArchive + } + } + }; + + const workerCount = Math.min(maxParallel, pendingCandidates.length); + logger.info(`Parallele Extraktion: ${workerCount} gleichzeitige Worker für ${pendingCandidates.length} Archive`); + await Promise.all(Array.from({ length: workerCount }, () => worker())); + + if (abortError) throw new Error("aborted:extract"); + if (noExtractorEncountered) { + const remaining = candidates.length - (extracted + failed); + if (remaining > 0) { + failed += remaining; + emitProgress(candidates.length, "", "extracting", 0, 0); + } + } } // ── Nested extraction: extract archives found inside the output (1 level) ──