Cross-Pipe Race-Fix: Rename + MKV-Move teilen jetzt einen per-Package Lock
v1.7.149 hat den Race ZWISCHEN parallelen Auto-Rename-Scans gefixt (autoRenameInFlight). Aber es gab noch einen anderen Race: - Hybrid-Pfad (Z.10952-66) feuert "fire-and-forget" rename->mkvMove - Deferred-Post-Process-Pfad (Z.11672/11748) feuert "awaited" rename + mkvMove Beide Pipes koennen GLEICHZEITIG fuer dasselbe Package laufen. Innerhalb einer Pipe ist rename->mkvMove sequentiell, aber Pipe A's mkvMove kann WAEHREND Pipe B's rename starten (nachdem die Rename-Serialisierung von v1.7.149 Pipe B entsperrt hat). Resultat: Pipe A bewegt File X aus extractDir, Pipe B's rename versucht File X umzubenennen → ENOENT, oder File landet mit altem Hoster-Namen in der Library. Fix: autoRenameInFlight wird zu packageFileOpChain generalisiert. Helper chainPackageFileOp(pkgId, fn) chained beliebige file-mutierende Ops auf das vorherige Promise. autoRenameExtractedVideoFiles benutzt es intern, und beide collectMkvFilesToLibrary-Aufrufstellen werden jetzt explizit durch denselben Chain geroutet. Effekt: pro Package laeuft maximal eine post-process Operation (rename ODER mkvMove) zu jeder Zeit, egal welche Pipe sie triggert. Tests: - "serializes rename and mkvMove across hybrid + deferred pipes": 4 chainPackageFileOp-Calls fuer dasselbe Package, max-concurrent == 1, Reihenfolge erhalten, Slot nach letztem Op geleert. - "chainPackageFileOp recovers from a failed op": Fehler im ersten Op bricht die Chain nicht — nachfolgende Ops laufen normal weiter. 584/584 Tests gruen.
This commit is contained in:
parent
84d02c5f98
commit
36ff1c5a86
@ -1685,17 +1685,18 @@ export class DownloadManager extends EventEmitter {
|
||||
|
||||
private itemContributedBytes = new Map<string, number>();
|
||||
|
||||
/** Per-package serialization for autoRenameExtractedVideoFiles. The hybrid-
|
||||
* extract path fires a fire-and-forget rename after every successful
|
||||
* archive iteration, and the deferred post-process path runs another
|
||||
* rename when post-processing finishes. For a multi-archive package
|
||||
* (e.g. 25 episodes) these can overlap, with two concurrent scans seeing
|
||||
* the same files, racing to rename, and producing "Ziel existiert" /
|
||||
* ENOENT noise plus occasionally missed renames. We chain subsequent
|
||||
* invocations onto the running promise so the second scan re-scans
|
||||
* AFTER the first finishes — picking up any newly-arrived files while
|
||||
* guaranteeing no two scans operate on the same fileset simultaneously. */
|
||||
private autoRenameInFlight = new Map<string, Promise<number>>();
|
||||
/** Per-package serialization for ALL post-process file operations that
|
||||
* touch files inside the package's extractDir — currently autoRename and
|
||||
* collectMkvFilesToLibrary. Previously only autoRename was serialized
|
||||
* (autoRenameInFlight), but the hybrid-extract pipe (fire-and-forget) and
|
||||
* the deferred-post-process pipe (top-level awaited) could interleave so
|
||||
* that pipe A's mkvMove ran while pipe B's rename was still scanning the
|
||||
* same dir → ENOENT and files moved to the library under their old
|
||||
* obfuscated names. By chaining both rename AND mkvMove onto the same
|
||||
* per-package promise we guarantee that at most one file-mutating
|
||||
* operation runs per package at any time, regardless of which pipe
|
||||
* triggered it. */
|
||||
private packageFileOpChain = new Map<string, Promise<unknown>>();
|
||||
|
||||
private runItemIds = new Set<string>();
|
||||
|
||||
@ -3669,34 +3670,35 @@ export class DownloadManager extends EventEmitter {
|
||||
return next;
|
||||
}
|
||||
|
||||
/** Serialize a file-mutating post-process operation per package. Both
|
||||
* autoRename and collectMkvFilesToLibrary touch files in extractDir;
|
||||
* running them concurrently (across the hybrid-extract and deferred-
|
||||
* post-process pipes) corrupts state. This helper chains every call
|
||||
* onto the previous one so at most one such op runs per package. */
|
||||
private chainPackageFileOp<T>(pkgId: string, fn: () => Promise<T>): Promise<T> {
|
||||
const previous = this.packageFileOpChain.get(pkgId);
|
||||
const result = (previous ?? Promise.resolve()).catch(() => undefined).then(fn);
|
||||
this.packageFileOpChain.set(pkgId, result);
|
||||
return result.finally(() => {
|
||||
// Only clear the slot if no newer chained call replaced us — keeps
|
||||
// the chain intact when several callers queue up at once.
|
||||
if (this.packageFileOpChain.get(pkgId) === result) {
|
||||
this.packageFileOpChain.delete(pkgId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async autoRenameExtractedVideoFiles(
|
||||
extractDir: string,
|
||||
pkg?: PackageEntry,
|
||||
shouldAbort?: () => boolean
|
||||
): Promise<number> {
|
||||
// Serialize per-package: chain onto any in-flight scan for the same
|
||||
// package so two scans never read the same fileset in parallel. Without
|
||||
// this, hybrid-extract's per-archive trigger + the deferred post-process
|
||||
// trigger frequently overlap and cause "Ziel existiert" / ENOENT
|
||||
// log noise (and occasionally a missed rename when the second scan's
|
||||
// chosen target file disappears between scan and rename).
|
||||
if (!pkg) {
|
||||
return this.autoRenameExtractedVideoFilesImpl(extractDir, undefined, shouldAbort);
|
||||
}
|
||||
const previous = this.autoRenameInFlight.get(pkg.id);
|
||||
const next = (previous ?? Promise.resolve(0)).catch(() => 0).then(() =>
|
||||
return this.chainPackageFileOp(pkg.id, () =>
|
||||
this.autoRenameExtractedVideoFilesImpl(extractDir, pkg, shouldAbort)
|
||||
);
|
||||
this.autoRenameInFlight.set(pkg.id, next);
|
||||
try {
|
||||
return await next;
|
||||
} finally {
|
||||
// Only clear the slot if no newer chained call took our place. This
|
||||
// keeps the chain intact when several callers queue up at once.
|
||||
if (this.autoRenameInFlight.get(pkg.id) === next) {
|
||||
this.autoRenameInFlight.delete(pkg.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async autoRenameExtractedVideoFilesImpl(
|
||||
@ -10952,6 +10954,10 @@ export class DownloadManager extends EventEmitter {
|
||||
if (result.extracted > 0) {
|
||||
// Fire-and-forget: rename then collect MKVs in background so the
|
||||
// slot is not blocked and the next archive set can start immediately.
|
||||
// Both operations route through chainPackageFileOp so they cannot
|
||||
// race with the deferred-post-process pipe's rename / mkvMove for
|
||||
// the same package — without that, hybrid mkvMove could move a
|
||||
// file while deferred rename was still scanning it (ENOENT).
|
||||
void (async () => {
|
||||
try {
|
||||
await this.autoRenameExtractedVideoFiles(pkg.extractDir, pkg);
|
||||
@ -10959,7 +10965,7 @@ export class DownloadManager extends EventEmitter {
|
||||
logger.warn(`Hybrid Auto-Rename Fehler: pkg=${pkg.name}, reason=${compactErrorText(err)}`);
|
||||
}
|
||||
try {
|
||||
await this.collectMkvFilesToLibrary(packageId, pkg);
|
||||
await this.chainPackageFileOp(pkg.id, () => this.collectMkvFilesToLibrary(packageId, pkg));
|
||||
} catch (err) {
|
||||
logger.warn(`Hybrid MKV-Collection Fehler: pkg=${pkg.name}, reason=${compactErrorText(err)}`);
|
||||
}
|
||||
@ -11745,7 +11751,9 @@ export class DownloadManager extends EventEmitter {
|
||||
throwIfAborted();
|
||||
pkg.postProcessLabel = "Verschiebe Videos...";
|
||||
this.emitState();
|
||||
await this.collectMkvFilesToLibrary(packageId, pkg, shouldAbort);
|
||||
// Route through chainPackageFileOp so this serializes against any
|
||||
// hybrid-pipe rename or mkvMove still pending for the same package.
|
||||
await this.chainPackageFileOp(pkg.id, () => this.collectMkvFilesToLibrary(packageId, pkg, shouldAbort));
|
||||
}
|
||||
|
||||
throwIfAborted();
|
||||
|
||||
@ -10455,4 +10455,67 @@ describe("download manager", () => {
|
||||
expect(files).not.toContain(ep.file);
|
||||
}
|
||||
});
|
||||
|
||||
it("serializes rename and mkvMove across hybrid + deferred pipes (no ENOENT from cross-pipe race)", async () => {
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-crosspipe-"));
|
||||
tempDirs.push(root);
|
||||
const stateDir = path.join(root, "state");
|
||||
fs.mkdirSync(stateDir, { recursive: true });
|
||||
|
||||
const manager = new DownloadManager(
|
||||
{ ...defaultSettings(), token: "rd-token", outputDir: path.join(root, "out"), extractDir: path.join(root, "extract") },
|
||||
emptySession(),
|
||||
createStoragePaths(stateDir)
|
||||
);
|
||||
|
||||
// Both rename and mkvMove route through the SAME chain, so any pair of
|
||||
// invocations for the same package must run strictly sequentially —
|
||||
// even when they come from different call sites (hybrid + deferred).
|
||||
const pkgId = "crosspipe-pkg-1";
|
||||
let concurrent = 0;
|
||||
let maxConcurrent = 0;
|
||||
const op = async (ms: number): Promise<string> => {
|
||||
concurrent += 1;
|
||||
maxConcurrent = Math.max(maxConcurrent, concurrent);
|
||||
await new Promise((r) => setTimeout(r, ms));
|
||||
concurrent -= 1;
|
||||
return `done-${ms}`;
|
||||
};
|
||||
|
||||
const [r1, r2, r3, r4] = await Promise.all([
|
||||
(manager as any).chainPackageFileOp(pkgId, () => op(40)),
|
||||
(manager as any).chainPackageFileOp(pkgId, () => op(20)),
|
||||
(manager as any).chainPackageFileOp(pkgId, () => op(30)),
|
||||
(manager as any).chainPackageFileOp(pkgId, () => op(10))
|
||||
]);
|
||||
|
||||
expect(r1).toBe("done-40");
|
||||
expect(r2).toBe("done-20");
|
||||
expect(r3).toBe("done-30");
|
||||
expect(r4).toBe("done-10");
|
||||
// Crucial: never more than 1 operation in flight at a time.
|
||||
expect(maxConcurrent).toBe(1);
|
||||
// Chain slot cleared after the last op completed.
|
||||
expect((manager as any).packageFileOpChain.has(pkgId)).toBe(false);
|
||||
});
|
||||
|
||||
it("chainPackageFileOp recovers from a failed op so subsequent ops still run", async () => {
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-chain-recover-"));
|
||||
tempDirs.push(root);
|
||||
const manager = new DownloadManager(
|
||||
{ ...defaultSettings(), token: "rd-token", outputDir: path.join(root, "out"), extractDir: path.join(root, "extract") },
|
||||
emptySession(),
|
||||
createStoragePaths(path.join(root, "state"))
|
||||
);
|
||||
const pkgId = "recover-pkg";
|
||||
const [failed, nextResult] = await Promise.allSettled([
|
||||
(manager as any).chainPackageFileOp(pkgId, async () => { throw new Error("boom"); }),
|
||||
(manager as any).chainPackageFileOp(pkgId, async () => "ok")
|
||||
]);
|
||||
expect(failed.status).toBe("rejected");
|
||||
expect(nextResult.status).toBe("fulfilled");
|
||||
if (nextResult.status === "fulfilled") {
|
||||
expect(nextResult.value).toBe("ok");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
Reference in New Issue
Block a user