diff --git a/src/main/download-manager.ts b/src/main/download-manager.ts index bd52e5a..040c051 100644 --- a/src/main/download-manager.ts +++ b/src/main/download-manager.ts @@ -1685,17 +1685,18 @@ export class DownloadManager extends EventEmitter { private itemContributedBytes = new Map(); - /** Per-package serialization for autoRenameExtractedVideoFiles. The hybrid- - * extract path fires a fire-and-forget rename after every successful - * archive iteration, and the deferred post-process path runs another - * rename when post-processing finishes. For a multi-archive package - * (e.g. 25 episodes) these can overlap, with two concurrent scans seeing - * the same files, racing to rename, and producing "Ziel existiert" / - * ENOENT noise plus occasionally missed renames. We chain subsequent - * invocations onto the running promise so the second scan re-scans - * AFTER the first finishes — picking up any newly-arrived files while - * guaranteeing no two scans operate on the same fileset simultaneously. */ - private autoRenameInFlight = new Map>(); + /** Per-package serialization for ALL post-process file operations that + * touch files inside the package's extractDir — currently autoRename and + * collectMkvFilesToLibrary. Previously only autoRename was serialized + * (autoRenameInFlight), but the hybrid-extract pipe (fire-and-forget) and + * the deferred-post-process pipe (top-level awaited) could interleave so + * that pipe A's mkvMove ran while pipe B's rename was still scanning the + * same dir → ENOENT and files moved to the library under their old + * obfuscated names. By chaining both rename AND mkvMove onto the same + * per-package promise we guarantee that at most one file-mutating + * operation runs per package at any time, regardless of which pipe + * triggered it. */ + private packageFileOpChain = new Map>(); private runItemIds = new Set(); @@ -3669,34 +3670,35 @@ export class DownloadManager extends EventEmitter { return next; } + /** Serialize a file-mutating post-process operation per package. Both + * autoRename and collectMkvFilesToLibrary touch files in extractDir; + * running them concurrently (across the hybrid-extract and deferred- + * post-process pipes) corrupts state. This helper chains every call + * onto the previous one so at most one such op runs per package. */ + private chainPackageFileOp(pkgId: string, fn: () => Promise): Promise { + const previous = this.packageFileOpChain.get(pkgId); + const result = (previous ?? Promise.resolve()).catch(() => undefined).then(fn); + this.packageFileOpChain.set(pkgId, result); + return result.finally(() => { + // Only clear the slot if no newer chained call replaced us — keeps + // the chain intact when several callers queue up at once. + if (this.packageFileOpChain.get(pkgId) === result) { + this.packageFileOpChain.delete(pkgId); + } + }); + } + private async autoRenameExtractedVideoFiles( extractDir: string, pkg?: PackageEntry, shouldAbort?: () => boolean ): Promise { - // Serialize per-package: chain onto any in-flight scan for the same - // package so two scans never read the same fileset in parallel. Without - // this, hybrid-extract's per-archive trigger + the deferred post-process - // trigger frequently overlap and cause "Ziel existiert" / ENOENT - // log noise (and occasionally a missed rename when the second scan's - // chosen target file disappears between scan and rename). if (!pkg) { return this.autoRenameExtractedVideoFilesImpl(extractDir, undefined, shouldAbort); } - const previous = this.autoRenameInFlight.get(pkg.id); - const next = (previous ?? Promise.resolve(0)).catch(() => 0).then(() => + return this.chainPackageFileOp(pkg.id, () => this.autoRenameExtractedVideoFilesImpl(extractDir, pkg, shouldAbort) ); - this.autoRenameInFlight.set(pkg.id, next); - try { - return await next; - } finally { - // Only clear the slot if no newer chained call took our place. This - // keeps the chain intact when several callers queue up at once. - if (this.autoRenameInFlight.get(pkg.id) === next) { - this.autoRenameInFlight.delete(pkg.id); - } - } } private async autoRenameExtractedVideoFilesImpl( @@ -10952,6 +10954,10 @@ export class DownloadManager extends EventEmitter { if (result.extracted > 0) { // Fire-and-forget: rename then collect MKVs in background so the // slot is not blocked and the next archive set can start immediately. + // Both operations route through chainPackageFileOp so they cannot + // race with the deferred-post-process pipe's rename / mkvMove for + // the same package — without that, hybrid mkvMove could move a + // file while deferred rename was still scanning it (ENOENT). void (async () => { try { await this.autoRenameExtractedVideoFiles(pkg.extractDir, pkg); @@ -10959,7 +10965,7 @@ export class DownloadManager extends EventEmitter { logger.warn(`Hybrid Auto-Rename Fehler: pkg=${pkg.name}, reason=${compactErrorText(err)}`); } try { - await this.collectMkvFilesToLibrary(packageId, pkg); + await this.chainPackageFileOp(pkg.id, () => this.collectMkvFilesToLibrary(packageId, pkg)); } catch (err) { logger.warn(`Hybrid MKV-Collection Fehler: pkg=${pkg.name}, reason=${compactErrorText(err)}`); } @@ -11745,7 +11751,9 @@ export class DownloadManager extends EventEmitter { throwIfAborted(); pkg.postProcessLabel = "Verschiebe Videos..."; this.emitState(); - await this.collectMkvFilesToLibrary(packageId, pkg, shouldAbort); + // Route through chainPackageFileOp so this serializes against any + // hybrid-pipe rename or mkvMove still pending for the same package. + await this.chainPackageFileOp(pkg.id, () => this.collectMkvFilesToLibrary(packageId, pkg, shouldAbort)); } throwIfAborted(); diff --git a/tests/download-manager.test.ts b/tests/download-manager.test.ts index 6d10110..9fd771d 100644 --- a/tests/download-manager.test.ts +++ b/tests/download-manager.test.ts @@ -10455,4 +10455,67 @@ describe("download manager", () => { expect(files).not.toContain(ep.file); } }); + + it("serializes rename and mkvMove across hybrid + deferred pipes (no ENOENT from cross-pipe race)", async () => { + const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-crosspipe-")); + tempDirs.push(root); + const stateDir = path.join(root, "state"); + fs.mkdirSync(stateDir, { recursive: true }); + + const manager = new DownloadManager( + { ...defaultSettings(), token: "rd-token", outputDir: path.join(root, "out"), extractDir: path.join(root, "extract") }, + emptySession(), + createStoragePaths(stateDir) + ); + + // Both rename and mkvMove route through the SAME chain, so any pair of + // invocations for the same package must run strictly sequentially — + // even when they come from different call sites (hybrid + deferred). + const pkgId = "crosspipe-pkg-1"; + let concurrent = 0; + let maxConcurrent = 0; + const op = async (ms: number): Promise => { + concurrent += 1; + maxConcurrent = Math.max(maxConcurrent, concurrent); + await new Promise((r) => setTimeout(r, ms)); + concurrent -= 1; + return `done-${ms}`; + }; + + const [r1, r2, r3, r4] = await Promise.all([ + (manager as any).chainPackageFileOp(pkgId, () => op(40)), + (manager as any).chainPackageFileOp(pkgId, () => op(20)), + (manager as any).chainPackageFileOp(pkgId, () => op(30)), + (manager as any).chainPackageFileOp(pkgId, () => op(10)) + ]); + + expect(r1).toBe("done-40"); + expect(r2).toBe("done-20"); + expect(r3).toBe("done-30"); + expect(r4).toBe("done-10"); + // Crucial: never more than 1 operation in flight at a time. + expect(maxConcurrent).toBe(1); + // Chain slot cleared after the last op completed. + expect((manager as any).packageFileOpChain.has(pkgId)).toBe(false); + }); + + it("chainPackageFileOp recovers from a failed op so subsequent ops still run", async () => { + const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-chain-recover-")); + tempDirs.push(root); + const manager = new DownloadManager( + { ...defaultSettings(), token: "rd-token", outputDir: path.join(root, "out"), extractDir: path.join(root, "extract") }, + emptySession(), + createStoragePaths(path.join(root, "state")) + ); + const pkgId = "recover-pkg"; + const [failed, nextResult] = await Promise.allSettled([ + (manager as any).chainPackageFileOp(pkgId, async () => { throw new Error("boom"); }), + (manager as any).chainPackageFileOp(pkgId, async () => "ok") + ]); + expect(failed.status).toBe("rejected"); + expect(nextResult.status).toBe("fulfilled"); + if (nextResult.status === "fulfilled") { + expect(nextResult.value).toBe("ok"); + } + }); });