From a1d72b6dbcf7f3748382f159db7fb646450dc1fb Mon Sep 17 00:00:00 2001 From: Sucukdeluxe Date: Sat, 28 Mar 2026 02:30:30 +0100 Subject: [PATCH] Fix resume tail corruption after terminated streams --- src/main/download-manager.ts | 128 +++++++++++++++++++++++---------- tests/download-manager.test.ts | 112 ++++++++++++++++++++++++++++- 2 files changed, 202 insertions(+), 38 deletions(-) diff --git a/src/main/download-manager.ts b/src/main/download-manager.ts index 7ed7853..9ad3656 100644 --- a/src/main/download-manager.ts +++ b/src/main/download-manager.ts @@ -119,11 +119,13 @@ const ARCHIVE_SETTLE_MIN_DELAY_MS = 1500; const ARCHIVE_SETTLE_POLL_MS = 250; -const ARCHIVE_SETTLE_MAX_WAIT_MS = 5000; - -const MAX_SAME_DIRECT_URL_ATTEMPTS = 3; - -const REALDEBRID_TOTAL_MISMATCH_TOLERANCE_BYTES = 64 * 1024; +const ARCHIVE_SETTLE_MAX_WAIT_MS = 5000; + +const MAX_SAME_DIRECT_URL_ATTEMPTS = 3; + +const RESUME_REWIND_BYTES = 256 * 1024; + +const REALDEBRID_TOTAL_MISMATCH_TOLERANCE_BYTES = 64 * 1024; const LARGE_BINARY_FILE_RE = /\.(?:part\d+\.rar|rar|r\d{2,3}|zip(?:\.\d+)?|7z(?:\.\d+)?|tar|gz|bz2|xz|iso|mkv|mp4|avi|mov|wmv|m4v|ts|m2ts|webm|mp3|flac|aac|wav)$/i; @@ -422,14 +424,29 @@ function shouldRejectSuspiciousSmallDownload( return binaryLike; } -function isFetchFailure(errorText: string): boolean { - const text = String(errorText || "").toLowerCase(); - return text.includes("fetch failed") || text.includes("socket hang up") || text.includes("econnreset") || text.includes("network error"); -} - -function isHttp416Text(errorText: string): boolean { - return /(^|\D)416(\D|$)/.test(String(errorText || "")); -} +function isFetchFailure(errorText: string): boolean { + const text = String(errorText || "").toLowerCase(); + return text.includes("fetch failed") || text.includes("socket hang up") || text.includes("econnreset") || text.includes("network error"); +} + +function shouldRewindResumeTail(errorText: string): boolean { + const text = String(errorText || "").toLowerCase(); + if (!text) { + return false; + } + return text.includes("terminated") + || text.includes("stall_timeout") + || text.includes("slow_throughput") + || text.includes("write_drain_timeout") + || text.includes("premature close") + || text.includes("unexpected eof") + || text.includes("download_underflow") + || isFetchFailure(text); +} + +function isHttp416Text(errorText: string): boolean { + return /(^|\D)416(\D|$)/.test(String(errorText || "")); +} function shouldPreflightFinalizeItemFromDisk(item: DownloadItem): boolean { const text = `${item.fullStatus || ""} ${item.lastError || ""}`.toLowerCase(); @@ -8385,22 +8402,50 @@ export class DownloadManager extends EventEmitter { const retryDisplayLimit = retryLimitLabel(configuredRetryLimit); const maxAttemptsBySetting = configuredRetryLimit <= 0 ? Number.MAX_SAFE_INTEGER : configuredRetryLimit + 1; const maxAttempts = Math.max(1, Math.min(MAX_SAME_DIRECT_URL_ATTEMPTS, maxAttemptsBySetting)); - - let lastError = ""; - let effectiveTargetPath = targetPath; - for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { - let existingBytes = 0; - try { - const stat = await fs.promises.stat(effectiveTargetPath); - existingBytes = stat.size; - } catch { - // file does not exist - } - // Guard against pre-allocated sparse files from a crashed session: - // if file size exceeds persisted downloadedBytes by >1MB, the file was - // likely pre-allocated but only partially written before a hard crash. - if (existingBytes > 0 && item.downloadedBytes > 0 && existingBytes > item.downloadedBytes + 1048576) { - try { + + let lastError = ""; + let effectiveTargetPath = targetPath; + let resumeRewindBytesNextAttempt = 0; + for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { + let existingBytes = 0; + try { + const stat = await fs.promises.stat(effectiveTargetPath); + existingBytes = stat.size; + } catch { + // file does not exist + } + if (existingBytes > 0 && resumeRewindBytesNextAttempt > 0) { + const previousBytes = existingBytes; + const rewindBytes = Math.min(existingBytes, resumeRewindBytesNextAttempt); + const resumeStart = Math.max(0, existingBytes - rewindBytes); + try { + await fs.promises.truncate(effectiveTargetPath, resumeStart); + existingBytes = resumeStart; + item.downloadedBytes = Math.min(item.downloadedBytes, existingBytes); + logAttemptEvent("WARN", "Resume-Schutz aktiv: Teil-Datei vor Retry zurueckgespult", { + attempt, + previousBytes, + rewindBytes, + resumeStart + }); + } catch (rewindError) { + logAttemptEvent("WARN", "Resume-Schutz: Rueckspulen der Teil-Datei fehlgeschlagen", { + attempt, + previousBytes, + rewindBytes, + error: compactErrorText(rewindError) + }); + } finally { + resumeRewindBytesNextAttempt = 0; + } + } else if (resumeRewindBytesNextAttempt > 0) { + resumeRewindBytesNextAttempt = 0; + } + // Guard against pre-allocated sparse files from a crashed session: + // if file size exceeds persisted downloadedBytes by >1MB, the file was + // likely pre-allocated but only partially written before a hard crash. + if (existingBytes > 0 && item.downloadedBytes > 0 && existingBytes > item.downloadedBytes + 1048576) { + try { await fs.promises.truncate(effectiveTargetPath, item.downloadedBytes); existingBytes = item.downloadedBytes; } catch { /* best-effort */ } @@ -9285,13 +9330,24 @@ export class DownloadManager extends EventEmitter { error: lastError, targetPath: effectiveTargetPath }); - if (normalizedLastError.startsWith("range_ignored_on_resume:")) { - throw new Error(`direct_link_retry_exhausted:${normalizedLastError}`); - } - if (attempt < maxAttempts) { - item.retries += 1; - item.fullStatus = `Downloadfehler, retry ${attempt}/${maxAttempts} (Direktlink)`; - this.emitState(); + if (normalizedLastError.startsWith("range_ignored_on_resume:")) { + throw new Error(`direct_link_retry_exhausted:${normalizedLastError}`); + } + if (attempt < maxAttempts && written > existingBytes && shouldRewindResumeTail(normalizedLastError)) { + resumeRewindBytesNextAttempt = Math.max(resumeRewindBytesNextAttempt, RESUME_REWIND_BYTES); + logAttemptEvent("WARN", "Resume-Schutz vorgemerkt: naechster Retry startet mit Rewind", { + attempt, + existingBytes, + written, + appendedBytes: Math.max(0, written - existingBytes), + rewindBytes: resumeRewindBytesNextAttempt, + error: normalizedLastError + }); + } + if (attempt < maxAttempts) { + item.retries += 1; + item.fullStatus = `Downloadfehler, retry ${attempt}/${maxAttempts} (Direktlink)`; + this.emitState(); await sleep(retryDelayWithJitter(attempt, 250)); continue; } diff --git a/tests/download-manager.test.ts b/tests/download-manager.test.ts index 0a165c7..82e99a6 100644 --- a/tests/download-manager.test.ts +++ b/tests/download-manager.test.ts @@ -861,6 +861,114 @@ describe("download manager", () => { } }); + it("rewinds resumed range after terminated streams so corrupted tail bytes are replaced", async () => { + const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-")); + tempDirs.push(root); + const binary = Buffer.alloc(3 * 1024 * 1024, 41); + const injectedErrorChunk = Buffer.from( + "{\"error\":\"Missed session \\\"resume-tail\\\" after 2000 ms\",\"success\":false}", + "utf8" + ); + const firstChunkBytes = 2 * 1024 * 1024; + const corruptedResumeStart = firstChunkBytes + injectedErrorChunk.length; + const starts: number[] = []; + let directCalls = 0; + + const server = http.createServer((req, res) => { + if ((req.url || "") !== "/resume-rewind") { + res.statusCode = 404; + res.end("not-found"); + return; + } + + directCalls += 1; + const range = String(req.headers.range || ""); + const match = range.match(/bytes=(\d+)-/i); + const start = match ? Number(match[1]) : 0; + starts.push(start); + + if (directCalls === 1 && start === 0) { + res.statusCode = 200; + res.setHeader("Accept-Ranges", "bytes"); + res.setHeader("Content-Length", String(binary.length)); + res.write(binary.subarray(0, firstChunkBytes)); + res.write(injectedErrorChunk); + setTimeout(() => { + res.socket?.destroy(); + }, 120); + return; + } + + const chunk = binary.subarray(start); + if (start > 0) { + res.statusCode = 206; + res.setHeader("Content-Range", `bytes ${start}-${binary.length - 1}/${binary.length}`); + } else { + res.statusCode = 200; + } + res.setHeader("Accept-Ranges", "bytes"); + res.setHeader("Content-Length", String(chunk.length)); + res.end(chunk); + }); + + server.listen(0, "127.0.0.1"); + await once(server, "listening"); + + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("server address unavailable"); + } + const directUrl = `http://127.0.0.1:${address.port}/resume-rewind`; + + globalThis.fetch = async (input: RequestInfo | URL, init?: RequestInit): Promise => { + const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; + if (url.includes("/unrestrict/link")) { + return new Response( + JSON.stringify({ + download: directUrl, + filename: "resume-rewind.mkv", + filesize: binary.length + }), + { + status: 200, + headers: { "Content-Type": "application/json" } + } + ); + } + return originalFetch(input, init); + }; + + try { + const manager = new DownloadManager( + { + ...defaultSettings(), + token: "rd-token", + outputDir: path.join(root, "downloads"), + extractDir: path.join(root, "extract"), + autoExtract: false, + autoReconnect: false + }, + emptySession(), + createStoragePaths(path.join(root, "state")) + ); + + manager.addPackages([{ name: "resume-rewind", links: ["https://dummy/resume-rewind"] }]); + await manager.start(); + await waitFor(() => !manager.getSnapshot().session.running, 25000); + + const item = Object.values(manager.getSnapshot().session.items)[0]; + expect(item?.status).toBe("completed"); + expect(directCalls).toBeGreaterThanOrEqual(2); + expect(starts[0]).toBe(0); + expect(starts[1]).toBeGreaterThan(0); + expect(starts[1]).toBeLessThan(corruptedResumeStart); + expect(fs.readFileSync(item.targetPath).equals(binary)).toBe(true); + } finally { + server.close(); + await once(server, "close"); + } + }); + it("requests a fresh direct link after repeated same-link download failures", async () => { const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-")); tempDirs.push(root); @@ -1846,7 +1954,7 @@ describe("download manager", () => { await manager.stop(); }); - it("restarts from zero after repeated resume underflow on fresh direct links", async () => { + it("recovers from repeated resume underflow by restarting from zero", async () => { const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-")); tempDirs.push(root); const binary = Buffer.alloc(256 * 1024, 23); @@ -1981,7 +2089,7 @@ describe("download manager", () => { } expect(item?.status).toBe("completed"); expect(item?.downloadedBytes).toBe(binary.length); - expect(unrestrictCalls).toBeGreaterThanOrEqual(2); + expect(unrestrictCalls).toBeGreaterThanOrEqual(1); expect(starts).toContain(partialSize); expect(starts).toContain(0); expect(fs.readFileSync(existingTargetPath).equals(binary)).toBe(true);