From 0a99d3c584c486d002b288f7dacb6b5e6c5b551f Mon Sep 17 00:00:00 2001 From: Sucukdeluxe Date: Fri, 27 Feb 2026 14:55:31 +0100 Subject: [PATCH] Fix stuck queue scheduling and auto-recover stalled streams v1.3.6 --- package.json | 2 +- src/main/download-manager.ts | 79 +++++++++++- tests/download-manager.test.ts | 214 +++++++++++++++++++++++++++++++++ 3 files changed, 289 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index a901917..97b6b02 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "real-debrid-downloader", - "version": "1.3.5", + "version": "1.3.6", "description": "Real-Debrid Downloader Desktop (Electron + React + TypeScript)", "main": "build/main/main/main.js", "author": "Sucukdeluxe", diff --git a/src/main/download-manager.ts b/src/main/download-manager.ts index f579036..63d3ba1 100644 --- a/src/main/download-manager.ts +++ b/src/main/download-manager.ts @@ -17,12 +17,22 @@ type ActiveTask = { itemId: string; packageId: string; abortController: AbortController; - abortReason: "stop" | "cancel" | "reconnect" | "package_toggle" | "none"; + abortReason: "stop" | "cancel" | "reconnect" | "package_toggle" | "stall" | "none"; resumable: boolean; speedEvents: Array<{ at: number; bytes: number }>; nonResumableCounted: boolean; }; +const DEFAULT_DOWNLOAD_STALL_TIMEOUT_MS = 120000; + +function getDownloadStallTimeoutMs(): number { + const fromEnv = Number(process.env.RD_STALL_TIMEOUT_MS ?? NaN); + if (Number.isFinite(fromEnv) && fromEnv >= 2000 && fromEnv <= 600000) { + return Math.floor(fromEnv); + } + return DEFAULT_DOWNLOAD_STALL_TIMEOUT_MS; +} + type DownloadManagerOptions = { megaWebUnrestrict?: MegaWebUnrestrictor; }; @@ -953,7 +963,7 @@ export class DownloadManager extends EventEmitter { this.startItem(next.packageId, next.itemId); } - if (this.activeTasks.size === 0 && !this.hasQueuedItems()) { + if (this.activeTasks.size === 0 && !this.hasQueuedItems() && this.packagePostProcessTasks.size === 0) { this.finishRun(); break; } @@ -1086,6 +1096,7 @@ export class DownloadManager extends EventEmitter { } let freshRetryUsed = false; + let stallRetries = 0; while (true) { try { const unrestricted = await this.debridService.unrestrictLink(item.url); @@ -1160,8 +1171,11 @@ export class DownloadManager extends EventEmitter { pkg.updatedAt = nowMs(); this.recordRunOutcome(item.id, "completed"); - await this.runPackagePostProcessing(pkg.id); - this.applyCompletedCleanupPolicy(pkg.id, item.id); + void this.runPackagePostProcessing(pkg.id).finally(() => { + this.applyCompletedCleanupPolicy(pkg.id, item.id); + this.persistSoon(); + this.emitState(); + }); this.persistSoon(); this.emitState(); return; @@ -1192,6 +1206,26 @@ export class DownloadManager extends EventEmitter { item.status = "queued"; item.speedBps = 0; item.fullStatus = "Paket gestoppt"; + } else if (reason === "stall") { + stallRetries += 1; + if (stallRetries <= 2) { + item.status = "queued"; + item.speedBps = 0; + item.fullStatus = `Keine Daten empfangen, Retry ${stallRetries}/2`; + item.lastError = ""; + item.attempts = 0; + item.updatedAt = nowMs(); + active.abortController = new AbortController(); + active.abortReason = "none"; + this.persistSoon(); + this.emitState(); + await sleep(350 * stallRetries); + continue; + } + item.status = "failed"; + item.lastError = "Download hing wiederholt"; + item.fullStatus = `Fehler: ${item.lastError}`; + this.recordRunOutcome(item.id, "failed"); } else { const errorText = compactErrorText(error); const shouldFreshRetry = !freshRetryUsed && isFetchFailure(errorText); @@ -1366,8 +1400,43 @@ export class DownloadManager extends EventEmitter { throw new Error("Leerer Response-Body"); } const reader = body.getReader(); + const stallTimeoutMs = getDownloadStallTimeoutMs(); + const readWithTimeout = async (): Promise> => { + if (stallTimeoutMs <= 0) { + return reader.read(); + } + return new Promise>((resolve, reject) => { + let settled = false; + const timer = setTimeout(() => { + if (settled) { + return; + } + settled = true; + active.abortReason = "stall"; + active.abortController.abort("stall"); + reject(new Error("stall_timeout")); + }, stallTimeoutMs); + + reader.read().then((result) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + resolve(result); + }).catch((error) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + reject(error); + }); + }); + }; + while (true) { - const { done, value } = await reader.read(); + const { done, value } = await readWithTimeout(); if (done) { break; } diff --git a/tests/download-manager.test.ts b/tests/download-manager.test.ts index e1df7c7..e4f9088 100644 --- a/tests/download-manager.test.ts +++ b/tests/download-manager.test.ts @@ -204,6 +204,220 @@ describe("download manager", () => { } }); + it("continues downloading while package post-processing is pending", async () => { + const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-")); + tempDirs.push(root); + const binary = Buffer.alloc(80 * 1024, 7); + + const server = http.createServer((req, res) => { + const route = req.url || ""; + if (route !== "/first" && route !== "/second") { + res.statusCode = 404; + res.end("not-found"); + return; + } + res.statusCode = 200; + res.setHeader("Accept-Ranges", "bytes"); + res.setHeader("Content-Length", String(binary.length)); + res.end(binary); + }); + + server.listen(0, "127.0.0.1"); + await once(server, "listening"); + + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("server address unavailable"); + } + const firstUrl = `http://127.0.0.1:${address.port}/first`; + const secondUrl = `http://127.0.0.1:${address.port}/second`; + + globalThis.fetch = async (input: RequestInfo | URL, init?: RequestInit): Promise => { + const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; + if (url.includes("/unrestrict/link")) { + const body = init?.body; + const bodyText = body instanceof URLSearchParams ? body.toString() : String(body || ""); + const originalLink = new URLSearchParams(bodyText).get("link") || ""; + const directUrl = originalLink.includes("second") ? secondUrl : firstUrl; + const filename = originalLink.includes("second") ? "second.bin" : "first.bin"; + return new Response( + JSON.stringify({ + download: directUrl, + filename, + filesize: binary.length + }), + { + status: 200, + headers: { "Content-Type": "application/json" } + } + ); + } + return originalFetch(input, init); + }; + + let releaseBlockedPostProcess: ((value?: void | PromiseLike) => void) | undefined; + try { + const manager = new DownloadManager( + { + ...defaultSettings(), + token: "rd-token", + outputDir: path.join(root, "downloads"), + extractDir: path.join(root, "extract"), + autoExtract: false, + maxParallel: 1 + }, + emptySession(), + createStoragePaths(path.join(root, "state")) + ); + + const blocker = new Promise((resolve) => { + releaseBlockedPostProcess = resolve; + }); + (manager as unknown as { packagePostProcessQueue: Promise }).packagePostProcessQueue = blocker; + + manager.addPackages([ + { name: "first", links: ["https://dummy/first"] }, + { name: "second", links: ["https://dummy/second"] } + ]); + + const initial = manager.getSnapshot(); + const firstPackage = initial.session.packageOrder[0]; + const secondPackage = initial.session.packageOrder[1]; + const firstItem = initial.session.packages[firstPackage]?.itemIds[0] || ""; + const secondItem = initial.session.packages[secondPackage]?.itemIds[0] || ""; + + manager.start(); + + await waitFor(() => manager.getSnapshot().session.items[firstItem]?.status === "completed", 12000); + await waitFor(() => { + const state = manager.getSnapshot().session.items[secondItem]?.status; + return state === "validating" || state === "downloading" || state === "integrity_check" || state === "completed"; + }, 6000); + + if (releaseBlockedPostProcess) { + releaseBlockedPostProcess(); + } + await waitFor(() => !manager.getSnapshot().session.running, 25000); + + const done = manager.getSnapshot(); + expect(done.session.items[firstItem]?.status).toBe("completed"); + expect(done.session.items[secondItem]?.status).toBe("completed"); + } finally { + if (releaseBlockedPostProcess) { + releaseBlockedPostProcess(); + } + server.close(); + await once(server, "close"); + } + }); + + it("recovers from stalled download streams without manual pause/resume", async () => { + const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-")); + tempDirs.push(root); + const binary = Buffer.alloc(300 * 1024, 17); + const previousStallTimeout = process.env.RD_STALL_TIMEOUT_MS; + process.env.RD_STALL_TIMEOUT_MS = "2500"; + let directCalls = 0; + + const server = http.createServer((req, res) => { + if ((req.url || "") !== "/stall") { + res.statusCode = 404; + res.end("not-found"); + return; + } + + directCalls += 1; + const range = String(req.headers.range || ""); + const match = range.match(/bytes=(\d+)-/i); + const start = match ? Number(match[1]) : 0; + + if (directCalls === 1 && start === 0) { + const firstChunk = Math.floor(binary.length / 3); + res.statusCode = 200; + res.setHeader("Accept-Ranges", "bytes"); + res.setHeader("Content-Length", String(binary.length)); + res.write(binary.subarray(0, firstChunk)); + setTimeout(() => { + if (!res.writableEnded && !res.destroyed) { + res.end(binary.subarray(firstChunk)); + } + }, 5000); + return; + } + + const chunk = binary.subarray(start); + if (start > 0) { + res.statusCode = 206; + res.setHeader("Content-Range", `bytes ${start}-${binary.length - 1}/${binary.length}`); + } else { + res.statusCode = 200; + } + res.setHeader("Accept-Ranges", "bytes"); + res.setHeader("Content-Length", String(chunk.length)); + res.end(chunk); + }); + + server.listen(0, "127.0.0.1"); + await once(server, "listening"); + + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("server address unavailable"); + } + const directUrl = `http://127.0.0.1:${address.port}/stall`; + + globalThis.fetch = async (input: RequestInfo | URL, init?: RequestInit): Promise => { + const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; + if (url.includes("/unrestrict/link")) { + return new Response( + JSON.stringify({ + download: directUrl, + filename: "stall.bin", + filesize: binary.length + }), + { + status: 200, + headers: { "Content-Type": "application/json" } + } + ); + } + return originalFetch(input, init); + }; + + try { + const manager = new DownloadManager( + { + ...defaultSettings(), + token: "rd-token", + outputDir: path.join(root, "downloads"), + extractDir: path.join(root, "extract"), + autoExtract: false, + autoReconnect: false + }, + emptySession(), + createStoragePaths(path.join(root, "state")) + ); + + manager.addPackages([{ name: "stall", links: ["https://dummy/stall"] }]); + manager.start(); + await waitFor(() => !manager.getSnapshot().session.running, 25000); + + const item = Object.values(manager.getSnapshot().session.items)[0]; + expect(item?.status).toBe("completed"); + expect(directCalls).toBeGreaterThan(1); + expect(fs.existsSync(item.targetPath)).toBe(true); + expect(fs.statSync(item.targetPath).size).toBe(binary.length); + } finally { + if (previousStallTimeout === undefined) { + delete process.env.RD_STALL_TIMEOUT_MS; + } else { + process.env.RD_STALL_TIMEOUT_MS = previousStallTimeout; + } + server.close(); + await once(server, "close"); + } + }, 35000); + it("uses content-disposition filename when provider filename is opaque", async () => { const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-")); tempDirs.push(root);