Fix stop→start race conditions causing potential hang
- Add scheduler generation counter to prevent stale scheduler from continuing after stop/start cycle - Guard processItem stop-abort handler: skip status overwrite when a new start() has already re-activated the session - Yield in start() after recoverRetryableItems to let pending abort handlers complete before evaluating item states - Add test: rapid stop → disable provider → start must resolve Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
3032604496
commit
307dcf0815
@ -1060,6 +1060,7 @@ export class DownloadManager extends EventEmitter {
|
||||
private activeTasks = new Map<string, ActiveTask>();
|
||||
|
||||
private scheduleRunning = false;
|
||||
private schedulerGeneration = 0;
|
||||
|
||||
private persistTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
@ -3348,12 +3349,20 @@ export class DownloadManager extends EventEmitter {
|
||||
if (this.session.running) {
|
||||
return;
|
||||
}
|
||||
// Bump scheduler generation so any old scheduler from a previous run exits
|
||||
// instead of continuing with stale state.
|
||||
this.schedulerGeneration += 1;
|
||||
|
||||
// Set running early to prevent concurrent start() calls from passing the guard
|
||||
// while we await recoverRetryableItems below.
|
||||
this.session.running = true;
|
||||
|
||||
const recoveredItems = await this.recoverRetryableItems("start");
|
||||
|
||||
// Yield once more to let any pending abort handlers from the previous stop()
|
||||
// complete — they check this.session.running and skip status overwrite if true.
|
||||
await sleep(0);
|
||||
|
||||
let recoveredStoppedItems = 0;
|
||||
for (const item of Object.values(this.session.items)) {
|
||||
if (item.status !== "cancelled" || item.fullStatus !== "Gestoppt") {
|
||||
@ -3478,6 +3487,7 @@ export class DownloadManager extends EventEmitter {
|
||||
|
||||
public stop(): void {
|
||||
const keepExtraction = this.settings.autoExtractWhenStopped;
|
||||
this.schedulerGeneration += 1;
|
||||
this.session.running = false;
|
||||
this.session.paused = false;
|
||||
this.session.reconnectUntil = 0;
|
||||
@ -5210,9 +5220,10 @@ export class DownloadManager extends EventEmitter {
|
||||
return;
|
||||
}
|
||||
this.scheduleRunning = true;
|
||||
logger.info("Scheduler gestartet");
|
||||
const myGeneration = this.schedulerGeneration;
|
||||
logger.info(`Scheduler gestartet (gen=${myGeneration})`);
|
||||
try {
|
||||
while (this.session.running) {
|
||||
while (this.session.running && this.schedulerGeneration === myGeneration) {
|
||||
const now = nowMs();
|
||||
if (now - this.lastSchedulerHeartbeatAt >= 60000) {
|
||||
this.lastSchedulerHeartbeatAt = now;
|
||||
@ -5265,7 +5276,7 @@ export class DownloadManager extends EventEmitter {
|
||||
}
|
||||
} finally {
|
||||
this.scheduleRunning = false;
|
||||
logger.info("Scheduler beendet");
|
||||
logger.info(`Scheduler beendet (gen=${myGeneration})`);
|
||||
}
|
||||
}
|
||||
|
||||
@ -5887,9 +5898,13 @@ export class DownloadManager extends EventEmitter {
|
||||
this.dropItemContribution(item.id);
|
||||
this.retryStateByItem.delete(item.id);
|
||||
} else if (reason === "stop") {
|
||||
// If a new start() has already re-queued this item, don't overwrite
|
||||
// its status with "cancelled"/"Gestoppt" — the new run owns it now.
|
||||
if (!this.session.running) {
|
||||
item.status = "cancelled";
|
||||
item.fullStatus = "Gestoppt";
|
||||
this.recordRunOutcome(item.id, "cancelled");
|
||||
}
|
||||
if (!active.resumable && claimedTargetPath && !fs.existsSync(claimedTargetPath)) {
|
||||
item.downloadedBytes = 0;
|
||||
item.progressPercent = 0;
|
||||
|
||||
@ -5582,4 +5582,103 @@ describe("download manager", () => {
|
||||
expect(internal.settings.debridLinkApiKeyDailyUsageBytes[firstKey.id]).toBe(1024);
|
||||
expect(internal.settings.debridLinkApiKeyDailyUsageBytes[secondKey.id]).toBe(512);
|
||||
});
|
||||
|
||||
it("does not hang when rapid stop, disable provider, start", async () => {
|
||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-"));
|
||||
tempDirs.push(root);
|
||||
const binary = Buffer.alloc(256 * 1024, 7);
|
||||
|
||||
// Slow server: delivers data in chunks with delay
|
||||
const server = http.createServer((req, res) => {
|
||||
if ((req.url || "") !== "/slow-dl") {
|
||||
res.statusCode = 404;
|
||||
res.end("not-found");
|
||||
return;
|
||||
}
|
||||
res.statusCode = 200;
|
||||
res.setHeader("Accept-Ranges", "bytes");
|
||||
res.setHeader("Content-Length", String(binary.length));
|
||||
// Send first half, then delay
|
||||
res.write(binary.subarray(0, Math.floor(binary.length / 4)));
|
||||
const timer = setTimeout(() => {
|
||||
if (!res.writableEnded && !res.destroyed) {
|
||||
res.end(binary.subarray(Math.floor(binary.length / 4)));
|
||||
}
|
||||
}, 5000);
|
||||
res.on("close", () => clearTimeout(timer));
|
||||
});
|
||||
|
||||
server.listen(0, "127.0.0.1");
|
||||
await once(server, "listening");
|
||||
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("server address unavailable");
|
||||
}
|
||||
const directUrl = `http://127.0.0.1:${address.port}/slow-dl`;
|
||||
|
||||
globalThis.fetch = async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
|
||||
const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
|
||||
if (url.includes("/unrestrict/link")) {
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
download: directUrl,
|
||||
filename: "test-file.bin",
|
||||
filesize: binary.length
|
||||
}),
|
||||
{
|
||||
status: 200,
|
||||
headers: { "Content-Type": "application/json" }
|
||||
}
|
||||
);
|
||||
}
|
||||
return originalFetch(input, init);
|
||||
};
|
||||
|
||||
const settings = {
|
||||
...defaultSettings(),
|
||||
token: "rd-token",
|
||||
outputDir: path.join(root, "downloads"),
|
||||
extractDir: path.join(root, "extract"),
|
||||
autoExtract: false,
|
||||
maxParallel: 1,
|
||||
autoReconnect: false,
|
||||
retryLimit: 1
|
||||
};
|
||||
|
||||
try {
|
||||
const manager = new DownloadManager(
|
||||
settings,
|
||||
emptySession(),
|
||||
createStoragePaths(path.join(root, "state"))
|
||||
);
|
||||
|
||||
manager.addPackages([{ name: "hang-test", links: ["https://dummy/hang-test"] }]);
|
||||
|
||||
// Step 1: Start and wait for download to begin
|
||||
await manager.start();
|
||||
await waitFor(() => {
|
||||
const items = Object.values(manager.getSnapshot().session.items);
|
||||
return items.some((item) => item.status === "downloading");
|
||||
}, 12000);
|
||||
|
||||
// Step 2: Stop — do NOT wait for running=false
|
||||
manager.stop();
|
||||
|
||||
// Step 3: Immediately disable the active provider
|
||||
manager.setSettings({
|
||||
...settings,
|
||||
disabledProviders: ["realdebrid"]
|
||||
});
|
||||
|
||||
// Step 4: Start again immediately — must resolve (not hang)
|
||||
const startPromise = manager.start();
|
||||
const timeout = new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 8000));
|
||||
const result = await Promise.race([startPromise.then(() => "ok" as const), timeout]);
|
||||
expect(result).toBe("ok");
|
||||
} finally {
|
||||
server.close();
|
||||
await once(server, "close");
|
||||
}
|
||||
}, 30000);
|
||||
});
|
||||
|
||||
Loading…
Reference in New Issue
Block a user