beta-real-debrid-downloader/src/main/download/post-processor.ts
Sucukdeluxe d0885ba552 Fix 16 bugs found by code review across all download modules
Critical fixes:
- Post-processor: remove double attempts increment (onProgress + onArchiveFailure both counted)
- Post-processor: fix slot leak when signal aborted after acquireSlot
- Scheduler: reset global watchdog high-water mark after stall event (prevents permanent misfires)
- Pipeline/DM: fix isPathInsideDir path traversal (add trailing separator check)
- Retry-manager: check per-kind exhaustion before shelve threshold (prevents bypass)
- Retry-manager: add MAX_SHELVE_COUNT=5 cap to prevent infinite shelve cycling

Important fixes:
- Scheduler: clear retryDelays and providerCooldowns on start()
- Scheduler: skip already-aborting slots in stall detection
- Download-manager: fix cleanupAfterExtraction using extractDir instead of outputDir for link removal
- Download-manager: add "extracting" to package normalizeSessionStatuses
- Download-manager: clear activeTasks map on stop()
- Download-manager: remove useless cachedDirectUrls re-insertion after success
- Stream-writer: remove duplicate truncation code in error path
- Stream-writer: skip alignedFlush in finally when bodyError already set (avoids 5min drain wait)
- Stream-writer: re-read elapsed after speed limiter sleep for accurate window reset
- Error-classifier: add HTTP 401 (Forbidden) and 410 (NotFound) classification

Tests updated to match new shelve/kind-exhaustion priority and 401 classification.
All 216 tests pass, build verified.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 18:33:06 +01:00

411 lines
13 KiB
TypeScript

/**
* post-processor.ts — Extraction state machine with bounded retries.
*
* Each archive has a clear state (pending → extracting → done/failed).
* No infinite loops: hard cap on retry count per archive.
* Redownload requests are emitted as events, not handled internally.
*/
import { EventEmitter } from "node:events";
import { DownloadError, DownloadErrorKind, classifyExtractionError } from "./error-classifier";
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
export interface ArchiveExtractionState {
archiveName: string;
status: "pending" | "extracting" | "done" | "failed";
attempts: number;
maxAttempts: number;
redownloaded: boolean;
lastError?: string;
lastErrorKind?: DownloadErrorKind;
}
export interface PackagePostProcessState {
packageId: string;
status: "idle" | "waiting" | "extracting" | "done" | "failed" | "aborted";
archives: Map<string, ArchiveExtractionState>;
startedAt: number;
completedAt?: number;
label?: string;
}
export interface PostProcessOptions {
packageDir: string;
extractDir: string;
cleanupMode: "none" | "trash" | "delete";
conflictMode: "overwrite" | "skip" | "rename" | "ask";
removeLinks: boolean;
removeSamples: boolean;
passwordList: string;
hybridMode: boolean;
maxParallelExtract: number;
extractCpuPriority: string;
signal: AbortSignal;
}
export interface ExtractProgressUpdate {
current: number;
total: number;
percent: number;
archiveName: string;
archivePercent?: number;
phase: "extracting" | "done" | "preparing";
archiveDone?: boolean;
archiveSuccess?: boolean;
}
export interface ExtractArchiveFailure {
archiveName: string;
errorText: string;
category: string;
suggestRedownload: boolean;
}
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
const DEFAULT_MAX_EXTRACT_ATTEMPTS = 3;
const SLOT_POLL_INTERVAL_MS = 500;
// ---------------------------------------------------------------------------
// PostProcessor
// ---------------------------------------------------------------------------
export interface PostProcessorEvents {
progress: [{ packageId: string; update: ExtractProgressUpdate }];
"package-done": [{ packageId: string; success: boolean; errors: string[] }];
"archive-redownload": [{ packageId: string; archiveName: string; error: string }];
status: [{ packageId: string; label: string }];
}
export class PostProcessor extends EventEmitter {
private states = new Map<string, PackagePostProcessState>();
private abortControllers = new Map<string, AbortController>();
private activeTasks = new Map<string, Promise<void>>();
private activeSlots = 0;
private maxSlots: number;
private slotWaiters: Array<() => void> = [];
/** Extraction function — injected to avoid circular dependency. */
private extractFn: ((opts: any) => Promise<any>) | null = null;
/** Archive candidate finder. */
private findArchivesFn: ((dir: string) => string[] | Promise<string[]>) | null = null;
constructor(maxParallel: number = 2) {
super();
this.maxSlots = maxParallel;
}
/** Inject the extraction function (from extractor.ts). */
setExtractor(
extractFn: (opts: any) => Promise<any>,
findArchivesFn: (dir: string) => string[] | Promise<string[]>,
): void {
this.extractFn = extractFn;
this.findArchivesFn = findArchivesFn;
}
setMaxParallel(n: number): void {
this.maxSlots = Math.max(1, n);
}
/**
* Queue a package for post-processing.
* If already processing, mark for re-run (hybrid requeue).
*/
queuePackage(packageId: string, options: PostProcessOptions): void {
const existing = this.activeTasks.get(packageId);
if (existing) {
// Mark for requeue — current run will check after finishing
const state = this.states.get(packageId);
if (state) state.status = "waiting";
return;
}
const ac = new AbortController();
this.abortControllers.set(packageId, ac);
const combinedSignal = AbortSignal.any([options.signal, ac.signal]);
const task = this.runPostProcessing(packageId, { ...options, signal: combinedSignal });
this.activeTasks.set(packageId, task);
task.finally(() => {
this.activeTasks.delete(packageId);
this.abortControllers.delete(packageId);
});
}
/**
* Abort processing for a specific package.
*/
abortPackage(packageId: string): void {
const ac = this.abortControllers.get(packageId);
if (ac) ac.abort();
const state = this.states.get(packageId);
if (state) state.status = "aborted";
}
/**
* Abort all active post-processing.
*/
abortAll(): void {
for (const [id, ac] of this.abortControllers) {
ac.abort();
const state = this.states.get(id);
if (state) state.status = "aborted";
}
}
/**
* Retry extraction for a package (user-initiated).
*/
retryPackage(packageId: string, options: PostProcessOptions): void {
// Reset archive states
const state = this.states.get(packageId);
if (state) {
for (const archive of state.archives.values()) {
if (archive.status === "failed") {
archive.status = "pending";
archive.attempts = 0;
}
}
state.status = "idle";
}
this.queuePackage(packageId, options);
}
/**
* Get state for a package.
*/
getState(packageId: string): PackagePostProcessState | undefined {
return this.states.get(packageId);
}
/**
* Check if any processing is active.
*/
isActive(): boolean {
return this.activeTasks.size > 0;
}
/**
* Wait for all active tasks to complete.
*/
async waitAll(): Promise<void> {
await Promise.allSettled([...this.activeTasks.values()]);
}
// -----------------------------------------------------------------------
// Private
// -----------------------------------------------------------------------
private async runPostProcessing(packageId: string, options: PostProcessOptions): Promise<void> {
// Acquire slot
await this.acquireSlot(options.signal);
if (options.signal.aborted) {
this.releaseSlot();
return;
}
const state: PackagePostProcessState = this.states.get(packageId) || {
packageId,
status: "extracting",
archives: new Map(),
startedAt: Date.now(),
};
state.status = "extracting";
state.startedAt = Date.now();
this.states.set(packageId, state);
let round = 0;
const MAX_ROUNDS = 5; // Hard cap on requeue rounds
try {
do {
round++;
if (round > MAX_ROUNDS) {
state.label = `Max. Runden erreicht (${MAX_ROUNDS})`;
break;
}
this.emit("status", { packageId, label: `Entpacken Runde ${round}...` });
try {
await this.runExtractionRound(packageId, options, state);
} catch (error) {
if (options.signal.aborted) break;
const msg = error instanceof Error ? error.message : String(error);
state.label = `Fehler: ${msg}`;
this.emit("status", { packageId, label: state.label });
}
// Check if there are pending archives for another round
const hasPending = [...state.archives.values()].some(a => a.status === "pending");
if (!hasPending) break;
} while (!options.signal.aborted);
// Determine final status
const archives = [...state.archives.values()];
const allDone = archives.every(a => a.status === "done");
const anyFailed = archives.some(a => a.status === "failed");
const errors = archives
.filter(a => a.status === "failed")
.map(a => `${a.archiveName}: ${a.lastError || "Unbekannt"}`);
if (options.signal.aborted) {
state.status = "aborted";
} else if (allDone || archives.length === 0) {
state.status = "done";
} else {
state.status = "failed";
}
state.completedAt = Date.now();
this.emit("package-done", {
packageId,
success: state.status === "done",
errors,
});
} finally {
this.releaseSlot();
}
}
private async runExtractionRound(
packageId: string,
options: PostProcessOptions,
state: PackagePostProcessState,
): Promise<void> {
if (!this.extractFn || !this.findArchivesFn) {
throw new Error("Extractor not configured — call setExtractor()");
}
// Find archives
const archivePaths = await this.findArchivesFn(options.packageDir);
if (archivePaths.length === 0) {
state.label = "Keine Archive gefunden";
return;
}
// Initialize archive states for new archives
for (const archivePath of archivePaths) {
const name = archivePath;
if (!state.archives.has(name)) {
state.archives.set(name, {
archiveName: name,
status: "pending",
attempts: 0,
maxAttempts: DEFAULT_MAX_EXTRACT_ATTEMPTS,
redownloaded: false,
});
}
}
// Only extract pending archives
const pendingArchives = [...state.archives.values()]
.filter(a => a.status === "pending")
.map(a => a.archiveName);
if (pendingArchives.length === 0) return;
// Run extraction
const failures: ExtractArchiveFailure[] = [];
await this.extractFn({
packageDir: options.packageDir,
targetDir: options.extractDir,
cleanupMode: options.cleanupMode,
conflictMode: options.conflictMode,
removeLinks: options.removeLinks,
removeSamples: options.removeSamples,
passwordList: options.passwordList,
signal: options.signal,
hybridMode: options.hybridMode,
maxParallel: options.maxParallelExtract,
extractCpuPriority: options.extractCpuPriority,
packageId,
onlyArchives: new Set(pendingArchives),
onProgress: (update: ExtractProgressUpdate) => {
this.emit("progress", { packageId, update });
// Track individual archive completion
if (update.archiveDone) {
const archiveState = state.archives.get(update.archiveName);
if (archiveState && update.archiveSuccess) {
archiveState.attempts++;
archiveState.status = "done";
}
// If not success, onArchiveFailure will handle it (and increment attempts)
}
},
onArchiveFailure: (failure: ExtractArchiveFailure) => {
failures.push(failure);
const archiveState = state.archives.get(failure.archiveName);
if (!archiveState) return;
const error = classifyExtractionError(failure.errorText, failure.category);
archiveState.lastError = failure.errorText;
archiveState.lastErrorKind = error.kind;
archiveState.attempts++;
// Decide: retry, redownload, or fail permanently
if (archiveState.attempts >= archiveState.maxAttempts) {
// Max attempts reached
if (error.kind === DownloadErrorKind.ArchiveCorrupt && !archiveState.redownloaded && failure.suggestRedownload) {
// Request redownload (max once per archive)
archiveState.redownloaded = true;
archiveState.attempts = 0; // Reset for redownloaded archive
archiveState.status = "pending";
this.emit("archive-redownload", {
packageId,
archiveName: failure.archiveName,
error: failure.errorText,
});
} else {
archiveState.status = "failed";
}
} else {
// Still have attempts left — mark as pending for next round
archiveState.status = "pending";
}
},
});
}
// -----------------------------------------------------------------------
// Slot management
// -----------------------------------------------------------------------
private async acquireSlot(signal: AbortSignal): Promise<void> {
while (this.activeSlots >= this.maxSlots) {
if (signal.aborted) return;
await new Promise<void>(resolve => {
this.slotWaiters.push(resolve);
// Also poll in case signal gets aborted
const timer = setTimeout(() => {
const idx = this.slotWaiters.indexOf(resolve);
if (idx >= 0) this.slotWaiters.splice(idx, 1);
resolve();
}, SLOT_POLL_INTERVAL_MS);
// Clean up timer if resolved normally
const originalResolve = resolve;
// Just let the poll handle it
});
}
this.activeSlots++;
}
private releaseSlot(): void {
this.activeSlots = Math.max(0, this.activeSlots - 1);
const waiter = this.slotWaiters.shift();
if (waiter) waiter();
}
}