Add parallel extraction (2 concurrent) and better status labels

- Replace serial packagePostProcessQueue with semaphore (max 2 concurrent)
- Hybrid-extract: items waiting for parts show "Entpacken - Warten auf Parts"
- Failed hybrid extraction shows "Entpacken - Error" instead of "Fertig"

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Sucukdeluxe 2026-03-03 18:15:57 +01:00
parent 8cc1f788ad
commit 9b460758f9

View File

@ -773,6 +773,10 @@ export class DownloadManager extends EventEmitter {
private packagePostProcessQueue: Promise<void> = Promise.resolve(); private packagePostProcessQueue: Promise<void> = Promise.resolve();
private packagePostProcessActive = 0;
private packagePostProcessWaiters: Array<() => void> = [];
private packagePostProcessTasks = new Map<string, Promise<void>>(); private packagePostProcessTasks = new Map<string, Promise<void>>();
private packagePostProcessAbortControllers = new Map<string, AbortController>(); private packagePostProcessAbortControllers = new Map<string, AbortController>();
@ -1170,6 +1174,9 @@ export class DownloadManager extends EventEmitter {
this.packagePostProcessAbortControllers.clear(); this.packagePostProcessAbortControllers.clear();
this.hybridExtractRequeue.clear(); this.hybridExtractRequeue.clear();
this.packagePostProcessQueue = Promise.resolve(); this.packagePostProcessQueue = Promise.resolve();
this.packagePostProcessActive = 0;
for (const waiter of this.packagePostProcessWaiters) { waiter(); }
this.packagePostProcessWaiters = [];
this.summary = null; this.summary = null;
this.nonResumableActive = 0; this.nonResumableActive = 0;
this.retryAfterByItem.clear(); this.retryAfterByItem.clear();
@ -2989,6 +2996,26 @@ export class DownloadManager extends EventEmitter {
} }
} }
private async acquirePostProcessSlot(): Promise<void> {
const maxConcurrent = 2;
if (this.packagePostProcessActive < maxConcurrent) {
this.packagePostProcessActive += 1;
return;
}
await new Promise<void>((resolve) => {
this.packagePostProcessWaiters.push(resolve);
});
this.packagePostProcessActive += 1;
}
private releasePostProcessSlot(): void {
this.packagePostProcessActive -= 1;
const next = this.packagePostProcessWaiters.shift();
if (next) {
next();
}
}
private runPackagePostProcessing(packageId: string): Promise<void> { private runPackagePostProcessing(packageId: string): Promise<void> {
const existing = this.packagePostProcessTasks.get(packageId); const existing = this.packagePostProcessTasks.get(packageId);
if (existing) { if (existing) {
@ -2999,15 +3026,14 @@ export class DownloadManager extends EventEmitter {
const abortController = new AbortController(); const abortController = new AbortController();
this.packagePostProcessAbortControllers.set(packageId, abortController); this.packagePostProcessAbortControllers.set(packageId, abortController);
const task = this.packagePostProcessQueue const task = (async () => {
.catch(() => undefined) await this.acquirePostProcessSlot();
.then(async () => { try {
await this.handlePackagePostProcessing(packageId, abortController.signal); await this.handlePackagePostProcessing(packageId, abortController.signal);
}) } catch (error) {
.catch((error) => {
logger.warn(`Post-Processing für Paket fehlgeschlagen: ${compactErrorText(error)}`); logger.warn(`Post-Processing für Paket fehlgeschlagen: ${compactErrorText(error)}`);
}) } finally {
.finally(() => { this.releasePostProcessSlot();
this.packagePostProcessTasks.delete(packageId); this.packagePostProcessTasks.delete(packageId);
this.packagePostProcessAbortControllers.delete(packageId); this.packagePostProcessAbortControllers.delete(packageId);
this.persistSoon(); this.persistSoon();
@ -3017,10 +3043,10 @@ export class DownloadManager extends EventEmitter {
logger.warn(`runPackagePostProcessing Fehler (hybridRequeue): ${compactErrorText(err)}`) logger.warn(`runPackagePostProcessing Fehler (hybridRequeue): ${compactErrorText(err)}`)
); );
} }
}); }
})();
this.packagePostProcessTasks.set(packageId, task); this.packagePostProcessTasks.set(packageId, task);
this.packagePostProcessQueue = task;
return task; return task;
} }
@ -5229,12 +5255,18 @@ export class DownloadManager extends EventEmitter {
this.emitState(); this.emitState();
}; };
// Mark items not yet being extracted as pending // Mark hybrid items as pending, others as waiting for parts
for (const entry of hybridItems) { const hybridItemIds = new Set(hybridItems.map((item) => item.id));
if (!isExtractedLabel(entry.fullStatus)) { for (const entry of completedItems) {
entry.fullStatus = "Entpacken - Ausstehend"; if (isExtractedLabel(entry.fullStatus)) {
entry.updatedAt = nowMs(); continue;
} }
if (hybridItemIds.has(entry.id)) {
entry.fullStatus = "Entpacken - Ausstehend";
} else {
entry.fullStatus = "Entpacken - Warten auf Parts";
}
entry.updatedAt = nowMs();
} }
this.emitState(); this.emitState();
@ -5302,7 +5334,7 @@ export class DownloadManager extends EventEmitter {
if (result.extracted > 0 && result.failed === 0) { if (result.extracted > 0 && result.failed === 0) {
entry.fullStatus = "Entpackt - Done"; entry.fullStatus = "Entpackt - Done";
} else { } else {
entry.fullStatus = `Fertig (${humanSize(entry.downloadedBytes)})`; entry.fullStatus = "Entpacken - Error";
} }
entry.updatedAt = updatedAt; entry.updatedAt = updatedAt;
} }