Parallel archive extraction within packages
maxParallelExtract now controls how many archives extract simultaneously within a single package (e.g. 4 episodes at once). Packages still extract sequentially (one package at a time) to focus I/O. Progress handler updated to track multiple active archives independently. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d9fe98231f
commit
5dabee332e
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "real-debrid-downloader",
|
||||
"version": "1.5.70",
|
||||
"version": "1.5.71",
|
||||
"description": "Real-Debrid Downloader Desktop (Electron + React + TypeScript)",
|
||||
"main": "build/main/main/main.js",
|
||||
"author": "Sucukdeluxe",
|
||||
|
||||
@ -5516,32 +5516,8 @@ export class DownloadManager extends EventEmitter {
|
||||
const resolveArchiveItems = (archiveName: string): DownloadItem[] =>
|
||||
resolveArchiveItemsFromList(archiveName, completedItems);
|
||||
|
||||
// Only update items of the currently extracting archive, not all items
|
||||
let currentArchiveItems: DownloadItem[] = [];
|
||||
const updateExtractingStatus = (text: string): void => {
|
||||
const normalized = String(text || "");
|
||||
if (lastExtractStatusText === normalized) {
|
||||
return;
|
||||
}
|
||||
lastExtractStatusText = normalized;
|
||||
const updatedAt = nowMs();
|
||||
for (const entry of currentArchiveItems) {
|
||||
if (isExtractedLabel(entry.fullStatus)) {
|
||||
continue;
|
||||
}
|
||||
if (entry.fullStatus === normalized) {
|
||||
continue;
|
||||
}
|
||||
entry.fullStatus = normalized;
|
||||
entry.updatedAt = updatedAt;
|
||||
}
|
||||
};
|
||||
|
||||
let lastExtractStatusText = "";
|
||||
let lastExtractEmitAt = 0;
|
||||
let lastExtractArchiveName = "";
|
||||
const emitExtractStatus = (text: string, force = false): void => {
|
||||
updateExtractingStatus(text);
|
||||
const emitExtractStatus = (_text: string, force = false): void => {
|
||||
const now = nowMs();
|
||||
if (!force && now - lastExtractEmitAt < EXTRACT_PROGRESS_EMIT_INTERVAL_MS) {
|
||||
return;
|
||||
@ -5586,6 +5562,9 @@ export class DownloadManager extends EventEmitter {
|
||||
}
|
||||
}, extractTimeoutMs);
|
||||
try {
|
||||
// Track multiple active archives for parallel extraction
|
||||
const activeArchiveItemsMap = new Map<string, DownloadItem[]>();
|
||||
|
||||
const result = await extractPackageArchives({
|
||||
packageDir: pkg.outputDir,
|
||||
targetDir: pkg.extractDir,
|
||||
@ -5596,33 +5575,69 @@ export class DownloadManager extends EventEmitter {
|
||||
passwordList: this.settings.archivePasswordList,
|
||||
signal: extractAbortController.signal,
|
||||
packageId,
|
||||
maxParallel: this.settings.maxParallelExtract || 2,
|
||||
onProgress: (progress) => {
|
||||
// When a new archive starts, mark the previous archive's items as done
|
||||
if (progress.archiveName && progress.archiveName !== lastExtractArchiveName) {
|
||||
if (lastExtractArchiveName && currentArchiveItems.length > 0) {
|
||||
if (progress.phase === "done") {
|
||||
// Mark all remaining active archives as done
|
||||
for (const [, items] of activeArchiveItemsMap) {
|
||||
const doneAt = nowMs();
|
||||
for (const entry of currentArchiveItems) {
|
||||
for (const entry of items) {
|
||||
if (!isExtractedLabel(entry.fullStatus)) {
|
||||
entry.fullStatus = "Entpackt - Done";
|
||||
entry.updatedAt = doneAt;
|
||||
}
|
||||
}
|
||||
}
|
||||
lastExtractArchiveName = progress.archiveName;
|
||||
currentArchiveItems = resolveArchiveItems(progress.archiveName);
|
||||
activeArchiveItemsMap.clear();
|
||||
emitExtractStatus("Entpacken 100%", true);
|
||||
return;
|
||||
}
|
||||
const label = progress.phase === "done"
|
||||
? "Entpacken 100%"
|
||||
: (() => {
|
||||
|
||||
if (progress.archiveName) {
|
||||
// Resolve items for this archive if not yet tracked
|
||||
if (!activeArchiveItemsMap.has(progress.archiveName)) {
|
||||
activeArchiveItemsMap.set(progress.archiveName, resolveArchiveItems(progress.archiveName));
|
||||
}
|
||||
const archiveItems = activeArchiveItemsMap.get(progress.archiveName)!;
|
||||
|
||||
// If archive is at 100%, mark its items as done and remove from active
|
||||
if (Number(progress.archivePercent ?? 0) >= 100) {
|
||||
const doneAt = nowMs();
|
||||
for (const entry of archiveItems) {
|
||||
if (!isExtractedLabel(entry.fullStatus)) {
|
||||
entry.fullStatus = "Entpackt - Done";
|
||||
entry.updatedAt = doneAt;
|
||||
}
|
||||
}
|
||||
activeArchiveItemsMap.delete(progress.archiveName);
|
||||
} else {
|
||||
// Update this archive's items with current progress
|
||||
const archive = progress.archiveName ? ` · ${progress.archiveName}` : "";
|
||||
const elapsed = progress.elapsedMs && progress.elapsedMs >= 1000
|
||||
? ` · ${Math.floor(progress.elapsedMs / 1000)}s`
|
||||
: "";
|
||||
const activeArchive = Number(progress.archivePercent ?? 0) > 0 ? 1 : 0;
|
||||
const currentDisplay = Math.max(0, Math.min(progress.total, progress.current + activeArchive));
|
||||
return `Entpacken ${progress.percent}% (${currentDisplay}/${progress.total})${archive}${elapsed}`;
|
||||
})();
|
||||
emitExtractStatus(label);
|
||||
const label = `Entpacken ${progress.percent}% (${currentDisplay}/${progress.total})${archive}${elapsed}`;
|
||||
const updatedAt = nowMs();
|
||||
for (const entry of archiveItems) {
|
||||
if (!isExtractedLabel(entry.fullStatus) && entry.fullStatus !== label) {
|
||||
entry.fullStatus = label;
|
||||
entry.updatedAt = updatedAt;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Emit overall status (throttled)
|
||||
const archive = progress.archiveName ? ` · ${progress.archiveName}` : "";
|
||||
const elapsed = progress.elapsedMs && progress.elapsedMs >= 1000
|
||||
? ` · ${Math.floor(progress.elapsedMs / 1000)}s`
|
||||
: "";
|
||||
const activeArchive = Number(progress.archivePercent ?? 0) > 0 ? 1 : 0;
|
||||
const currentDisplay = Math.max(0, Math.min(progress.total, progress.current + activeArchive));
|
||||
const overallLabel = `Entpacken ${progress.percent}% (${currentDisplay}/${progress.total})${archive}${elapsed}`;
|
||||
emitExtractStatus(overallLabel);
|
||||
}
|
||||
});
|
||||
logger.info(`Post-Processing Entpacken Ende: pkg=${pkg.name}, extracted=${result.extracted}, failed=${result.failed}, lastError=${result.lastError || ""}`);
|
||||
|
||||
@ -87,6 +87,7 @@ export interface ExtractOptions {
|
||||
skipPostCleanup?: boolean;
|
||||
packageId?: string;
|
||||
hybridMode?: boolean;
|
||||
maxParallel?: number;
|
||||
}
|
||||
|
||||
export interface ExtractProgressUpdate {
|
||||
@ -1905,8 +1906,11 @@ export async function extractPackageArchives(options: ExtractOptions): Promise<{
|
||||
|
||||
emitProgress(extracted, "", "extracting");
|
||||
|
||||
for (const archivePath of pendingCandidates) {
|
||||
if (options.signal?.aborted) {
|
||||
const maxParallel = Math.max(1, options.maxParallel || 1);
|
||||
let noExtractorEncountered = false;
|
||||
|
||||
const extractSingleArchive = async (archivePath: string): Promise<void> => {
|
||||
if (options.signal?.aborted || noExtractorEncountered) {
|
||||
throw new Error("aborted:extract");
|
||||
}
|
||||
const archiveName = path.basename(archivePath);
|
||||
@ -1930,7 +1934,8 @@ export async function extractPackageArchives(options: ExtractOptions): Promise<{
|
||||
const sig = await detectArchiveSignature(archivePath);
|
||||
if (!sig) {
|
||||
logger.info(`Generische Split-Datei übersprungen (keine Archiv-Signatur): ${archiveName}`);
|
||||
continue;
|
||||
clearInterval(pulseTimer);
|
||||
return;
|
||||
}
|
||||
logger.info(`Generische Split-Datei verifiziert (Signatur: ${sig}): ${archiveName}`);
|
||||
}
|
||||
@ -1994,23 +1999,60 @@ export async function extractPackageArchives(options: ExtractOptions): Promise<{
|
||||
failed += 1;
|
||||
const errorText = String(error);
|
||||
if (isExtractAbortError(errorText)) {
|
||||
throw new Error("aborted:extract");
|
||||
throw error;
|
||||
}
|
||||
lastError = errorText;
|
||||
const errorCategory = classifyExtractionError(errorText);
|
||||
logger.error(`Entpack-Fehler ${path.basename(archivePath)} [${errorCategory}]: ${errorText}`);
|
||||
emitProgress(extracted + failed, archiveName, "extracting", archivePercent, Date.now() - archiveStartedAt);
|
||||
if (isNoExtractorError(errorText)) {
|
||||
const remaining = candidates.length - (extracted + failed);
|
||||
if (remaining > 0) {
|
||||
failed += remaining;
|
||||
emitProgress(candidates.length, archiveName, "extracting", 0, Date.now() - archiveStartedAt);
|
||||
}
|
||||
break;
|
||||
noExtractorEncountered = true;
|
||||
}
|
||||
} finally {
|
||||
clearInterval(pulseTimer);
|
||||
}
|
||||
};
|
||||
|
||||
if (maxParallel <= 1) {
|
||||
for (const archivePath of pendingCandidates) {
|
||||
if (options.signal?.aborted || noExtractorEncountered) break;
|
||||
await extractSingleArchive(archivePath);
|
||||
}
|
||||
} else {
|
||||
// Parallel extraction pool: N workers pull from a shared queue
|
||||
const queue = [...pendingCandidates];
|
||||
let nextIdx = 0;
|
||||
let abortError: Error | null = null;
|
||||
|
||||
const worker = async (): Promise<void> => {
|
||||
while (nextIdx < queue.length && !abortError && !noExtractorEncountered) {
|
||||
if (options.signal?.aborted) break;
|
||||
const idx = nextIdx;
|
||||
nextIdx += 1;
|
||||
try {
|
||||
await extractSingleArchive(queue[idx]);
|
||||
} catch (error) {
|
||||
if (isExtractAbortError(String(error))) {
|
||||
abortError = error instanceof Error ? error : new Error(String(error));
|
||||
break;
|
||||
}
|
||||
// Non-abort errors are already handled inside extractSingleArchive
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const workerCount = Math.min(maxParallel, pendingCandidates.length);
|
||||
logger.info(`Parallele Extraktion: ${workerCount} gleichzeitige Worker für ${pendingCandidates.length} Archive`);
|
||||
await Promise.all(Array.from({ length: workerCount }, () => worker()));
|
||||
|
||||
if (abortError) throw new Error("aborted:extract");
|
||||
if (noExtractorEncountered) {
|
||||
const remaining = candidates.length - (extracted + failed);
|
||||
if (remaining > 0) {
|
||||
failed += remaining;
|
||||
emitProgress(candidates.length, "", "extracting", 0, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Nested extraction: extract archives found inside the output (1 level) ──
|
||||
|
||||
Loading…
Reference in New Issue
Block a user