Fix resume tail corruption after terminated streams
This commit is contained in:
parent
30737f9320
commit
a1d72b6dbc
@ -119,11 +119,13 @@ const ARCHIVE_SETTLE_MIN_DELAY_MS = 1500;
|
|||||||
|
|
||||||
const ARCHIVE_SETTLE_POLL_MS = 250;
|
const ARCHIVE_SETTLE_POLL_MS = 250;
|
||||||
|
|
||||||
const ARCHIVE_SETTLE_MAX_WAIT_MS = 5000;
|
const ARCHIVE_SETTLE_MAX_WAIT_MS = 5000;
|
||||||
|
|
||||||
const MAX_SAME_DIRECT_URL_ATTEMPTS = 3;
|
const MAX_SAME_DIRECT_URL_ATTEMPTS = 3;
|
||||||
|
|
||||||
const REALDEBRID_TOTAL_MISMATCH_TOLERANCE_BYTES = 64 * 1024;
|
const RESUME_REWIND_BYTES = 256 * 1024;
|
||||||
|
|
||||||
|
const REALDEBRID_TOTAL_MISMATCH_TOLERANCE_BYTES = 64 * 1024;
|
||||||
|
|
||||||
const LARGE_BINARY_FILE_RE = /\.(?:part\d+\.rar|rar|r\d{2,3}|zip(?:\.\d+)?|7z(?:\.\d+)?|tar|gz|bz2|xz|iso|mkv|mp4|avi|mov|wmv|m4v|ts|m2ts|webm|mp3|flac|aac|wav)$/i;
|
const LARGE_BINARY_FILE_RE = /\.(?:part\d+\.rar|rar|r\d{2,3}|zip(?:\.\d+)?|7z(?:\.\d+)?|tar|gz|bz2|xz|iso|mkv|mp4|avi|mov|wmv|m4v|ts|m2ts|webm|mp3|flac|aac|wav)$/i;
|
||||||
|
|
||||||
@ -422,14 +424,29 @@ function shouldRejectSuspiciousSmallDownload(
|
|||||||
return binaryLike;
|
return binaryLike;
|
||||||
}
|
}
|
||||||
|
|
||||||
function isFetchFailure(errorText: string): boolean {
|
function isFetchFailure(errorText: string): boolean {
|
||||||
const text = String(errorText || "").toLowerCase();
|
const text = String(errorText || "").toLowerCase();
|
||||||
return text.includes("fetch failed") || text.includes("socket hang up") || text.includes("econnreset") || text.includes("network error");
|
return text.includes("fetch failed") || text.includes("socket hang up") || text.includes("econnreset") || text.includes("network error");
|
||||||
}
|
}
|
||||||
|
|
||||||
function isHttp416Text(errorText: string): boolean {
|
function shouldRewindResumeTail(errorText: string): boolean {
|
||||||
return /(^|\D)416(\D|$)/.test(String(errorText || ""));
|
const text = String(errorText || "").toLowerCase();
|
||||||
}
|
if (!text) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return text.includes("terminated")
|
||||||
|
|| text.includes("stall_timeout")
|
||||||
|
|| text.includes("slow_throughput")
|
||||||
|
|| text.includes("write_drain_timeout")
|
||||||
|
|| text.includes("premature close")
|
||||||
|
|| text.includes("unexpected eof")
|
||||||
|
|| text.includes("download_underflow")
|
||||||
|
|| isFetchFailure(text);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isHttp416Text(errorText: string): boolean {
|
||||||
|
return /(^|\D)416(\D|$)/.test(String(errorText || ""));
|
||||||
|
}
|
||||||
|
|
||||||
function shouldPreflightFinalizeItemFromDisk(item: DownloadItem): boolean {
|
function shouldPreflightFinalizeItemFromDisk(item: DownloadItem): boolean {
|
||||||
const text = `${item.fullStatus || ""} ${item.lastError || ""}`.toLowerCase();
|
const text = `${item.fullStatus || ""} ${item.lastError || ""}`.toLowerCase();
|
||||||
@ -8385,22 +8402,50 @@ export class DownloadManager extends EventEmitter {
|
|||||||
const retryDisplayLimit = retryLimitLabel(configuredRetryLimit);
|
const retryDisplayLimit = retryLimitLabel(configuredRetryLimit);
|
||||||
const maxAttemptsBySetting = configuredRetryLimit <= 0 ? Number.MAX_SAFE_INTEGER : configuredRetryLimit + 1;
|
const maxAttemptsBySetting = configuredRetryLimit <= 0 ? Number.MAX_SAFE_INTEGER : configuredRetryLimit + 1;
|
||||||
const maxAttempts = Math.max(1, Math.min(MAX_SAME_DIRECT_URL_ATTEMPTS, maxAttemptsBySetting));
|
const maxAttempts = Math.max(1, Math.min(MAX_SAME_DIRECT_URL_ATTEMPTS, maxAttemptsBySetting));
|
||||||
|
|
||||||
let lastError = "";
|
let lastError = "";
|
||||||
let effectiveTargetPath = targetPath;
|
let effectiveTargetPath = targetPath;
|
||||||
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
|
let resumeRewindBytesNextAttempt = 0;
|
||||||
let existingBytes = 0;
|
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
|
||||||
try {
|
let existingBytes = 0;
|
||||||
const stat = await fs.promises.stat(effectiveTargetPath);
|
try {
|
||||||
existingBytes = stat.size;
|
const stat = await fs.promises.stat(effectiveTargetPath);
|
||||||
} catch {
|
existingBytes = stat.size;
|
||||||
// file does not exist
|
} catch {
|
||||||
}
|
// file does not exist
|
||||||
// Guard against pre-allocated sparse files from a crashed session:
|
}
|
||||||
// if file size exceeds persisted downloadedBytes by >1MB, the file was
|
if (existingBytes > 0 && resumeRewindBytesNextAttempt > 0) {
|
||||||
// likely pre-allocated but only partially written before a hard crash.
|
const previousBytes = existingBytes;
|
||||||
if (existingBytes > 0 && item.downloadedBytes > 0 && existingBytes > item.downloadedBytes + 1048576) {
|
const rewindBytes = Math.min(existingBytes, resumeRewindBytesNextAttempt);
|
||||||
try {
|
const resumeStart = Math.max(0, existingBytes - rewindBytes);
|
||||||
|
try {
|
||||||
|
await fs.promises.truncate(effectiveTargetPath, resumeStart);
|
||||||
|
existingBytes = resumeStart;
|
||||||
|
item.downloadedBytes = Math.min(item.downloadedBytes, existingBytes);
|
||||||
|
logAttemptEvent("WARN", "Resume-Schutz aktiv: Teil-Datei vor Retry zurueckgespult", {
|
||||||
|
attempt,
|
||||||
|
previousBytes,
|
||||||
|
rewindBytes,
|
||||||
|
resumeStart
|
||||||
|
});
|
||||||
|
} catch (rewindError) {
|
||||||
|
logAttemptEvent("WARN", "Resume-Schutz: Rueckspulen der Teil-Datei fehlgeschlagen", {
|
||||||
|
attempt,
|
||||||
|
previousBytes,
|
||||||
|
rewindBytes,
|
||||||
|
error: compactErrorText(rewindError)
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
resumeRewindBytesNextAttempt = 0;
|
||||||
|
}
|
||||||
|
} else if (resumeRewindBytesNextAttempt > 0) {
|
||||||
|
resumeRewindBytesNextAttempt = 0;
|
||||||
|
}
|
||||||
|
// Guard against pre-allocated sparse files from a crashed session:
|
||||||
|
// if file size exceeds persisted downloadedBytes by >1MB, the file was
|
||||||
|
// likely pre-allocated but only partially written before a hard crash.
|
||||||
|
if (existingBytes > 0 && item.downloadedBytes > 0 && existingBytes > item.downloadedBytes + 1048576) {
|
||||||
|
try {
|
||||||
await fs.promises.truncate(effectiveTargetPath, item.downloadedBytes);
|
await fs.promises.truncate(effectiveTargetPath, item.downloadedBytes);
|
||||||
existingBytes = item.downloadedBytes;
|
existingBytes = item.downloadedBytes;
|
||||||
} catch { /* best-effort */ }
|
} catch { /* best-effort */ }
|
||||||
@ -9285,13 +9330,24 @@ export class DownloadManager extends EventEmitter {
|
|||||||
error: lastError,
|
error: lastError,
|
||||||
targetPath: effectiveTargetPath
|
targetPath: effectiveTargetPath
|
||||||
});
|
});
|
||||||
if (normalizedLastError.startsWith("range_ignored_on_resume:")) {
|
if (normalizedLastError.startsWith("range_ignored_on_resume:")) {
|
||||||
throw new Error(`direct_link_retry_exhausted:${normalizedLastError}`);
|
throw new Error(`direct_link_retry_exhausted:${normalizedLastError}`);
|
||||||
}
|
}
|
||||||
if (attempt < maxAttempts) {
|
if (attempt < maxAttempts && written > existingBytes && shouldRewindResumeTail(normalizedLastError)) {
|
||||||
item.retries += 1;
|
resumeRewindBytesNextAttempt = Math.max(resumeRewindBytesNextAttempt, RESUME_REWIND_BYTES);
|
||||||
item.fullStatus = `Downloadfehler, retry ${attempt}/${maxAttempts} (Direktlink)`;
|
logAttemptEvent("WARN", "Resume-Schutz vorgemerkt: naechster Retry startet mit Rewind", {
|
||||||
this.emitState();
|
attempt,
|
||||||
|
existingBytes,
|
||||||
|
written,
|
||||||
|
appendedBytes: Math.max(0, written - existingBytes),
|
||||||
|
rewindBytes: resumeRewindBytesNextAttempt,
|
||||||
|
error: normalizedLastError
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (attempt < maxAttempts) {
|
||||||
|
item.retries += 1;
|
||||||
|
item.fullStatus = `Downloadfehler, retry ${attempt}/${maxAttempts} (Direktlink)`;
|
||||||
|
this.emitState();
|
||||||
await sleep(retryDelayWithJitter(attempt, 250));
|
await sleep(retryDelayWithJitter(attempt, 250));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -861,6 +861,114 @@ describe("download manager", () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("rewinds resumed range after terminated streams so corrupted tail bytes are replaced", async () => {
|
||||||
|
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-"));
|
||||||
|
tempDirs.push(root);
|
||||||
|
const binary = Buffer.alloc(3 * 1024 * 1024, 41);
|
||||||
|
const injectedErrorChunk = Buffer.from(
|
||||||
|
"{\"error\":\"Missed session \\\"resume-tail\\\" after 2000 ms\",\"success\":false}",
|
||||||
|
"utf8"
|
||||||
|
);
|
||||||
|
const firstChunkBytes = 2 * 1024 * 1024;
|
||||||
|
const corruptedResumeStart = firstChunkBytes + injectedErrorChunk.length;
|
||||||
|
const starts: number[] = [];
|
||||||
|
let directCalls = 0;
|
||||||
|
|
||||||
|
const server = http.createServer((req, res) => {
|
||||||
|
if ((req.url || "") !== "/resume-rewind") {
|
||||||
|
res.statusCode = 404;
|
||||||
|
res.end("not-found");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
directCalls += 1;
|
||||||
|
const range = String(req.headers.range || "");
|
||||||
|
const match = range.match(/bytes=(\d+)-/i);
|
||||||
|
const start = match ? Number(match[1]) : 0;
|
||||||
|
starts.push(start);
|
||||||
|
|
||||||
|
if (directCalls === 1 && start === 0) {
|
||||||
|
res.statusCode = 200;
|
||||||
|
res.setHeader("Accept-Ranges", "bytes");
|
||||||
|
res.setHeader("Content-Length", String(binary.length));
|
||||||
|
res.write(binary.subarray(0, firstChunkBytes));
|
||||||
|
res.write(injectedErrorChunk);
|
||||||
|
setTimeout(() => {
|
||||||
|
res.socket?.destroy();
|
||||||
|
}, 120);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const chunk = binary.subarray(start);
|
||||||
|
if (start > 0) {
|
||||||
|
res.statusCode = 206;
|
||||||
|
res.setHeader("Content-Range", `bytes ${start}-${binary.length - 1}/${binary.length}`);
|
||||||
|
} else {
|
||||||
|
res.statusCode = 200;
|
||||||
|
}
|
||||||
|
res.setHeader("Accept-Ranges", "bytes");
|
||||||
|
res.setHeader("Content-Length", String(chunk.length));
|
||||||
|
res.end(chunk);
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(0, "127.0.0.1");
|
||||||
|
await once(server, "listening");
|
||||||
|
|
||||||
|
const address = server.address();
|
||||||
|
if (!address || typeof address === "string") {
|
||||||
|
throw new Error("server address unavailable");
|
||||||
|
}
|
||||||
|
const directUrl = `http://127.0.0.1:${address.port}/resume-rewind`;
|
||||||
|
|
||||||
|
globalThis.fetch = async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
|
||||||
|
const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
|
||||||
|
if (url.includes("/unrestrict/link")) {
|
||||||
|
return new Response(
|
||||||
|
JSON.stringify({
|
||||||
|
download: directUrl,
|
||||||
|
filename: "resume-rewind.mkv",
|
||||||
|
filesize: binary.length
|
||||||
|
}),
|
||||||
|
{
|
||||||
|
status: 200,
|
||||||
|
headers: { "Content-Type": "application/json" }
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return originalFetch(input, init);
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
const manager = new DownloadManager(
|
||||||
|
{
|
||||||
|
...defaultSettings(),
|
||||||
|
token: "rd-token",
|
||||||
|
outputDir: path.join(root, "downloads"),
|
||||||
|
extractDir: path.join(root, "extract"),
|
||||||
|
autoExtract: false,
|
||||||
|
autoReconnect: false
|
||||||
|
},
|
||||||
|
emptySession(),
|
||||||
|
createStoragePaths(path.join(root, "state"))
|
||||||
|
);
|
||||||
|
|
||||||
|
manager.addPackages([{ name: "resume-rewind", links: ["https://dummy/resume-rewind"] }]);
|
||||||
|
await manager.start();
|
||||||
|
await waitFor(() => !manager.getSnapshot().session.running, 25000);
|
||||||
|
|
||||||
|
const item = Object.values(manager.getSnapshot().session.items)[0];
|
||||||
|
expect(item?.status).toBe("completed");
|
||||||
|
expect(directCalls).toBeGreaterThanOrEqual(2);
|
||||||
|
expect(starts[0]).toBe(0);
|
||||||
|
expect(starts[1]).toBeGreaterThan(0);
|
||||||
|
expect(starts[1]).toBeLessThan(corruptedResumeStart);
|
||||||
|
expect(fs.readFileSync(item.targetPath).equals(binary)).toBe(true);
|
||||||
|
} finally {
|
||||||
|
server.close();
|
||||||
|
await once(server, "close");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
it("requests a fresh direct link after repeated same-link download failures", async () => {
|
it("requests a fresh direct link after repeated same-link download failures", async () => {
|
||||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-"));
|
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-"));
|
||||||
tempDirs.push(root);
|
tempDirs.push(root);
|
||||||
@ -1846,7 +1954,7 @@ describe("download manager", () => {
|
|||||||
await manager.stop();
|
await manager.stop();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("restarts from zero after repeated resume underflow on fresh direct links", async () => {
|
it("recovers from repeated resume underflow by restarting from zero", async () => {
|
||||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-"));
|
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-"));
|
||||||
tempDirs.push(root);
|
tempDirs.push(root);
|
||||||
const binary = Buffer.alloc(256 * 1024, 23);
|
const binary = Buffer.alloc(256 * 1024, 23);
|
||||||
@ -1981,7 +2089,7 @@ describe("download manager", () => {
|
|||||||
}
|
}
|
||||||
expect(item?.status).toBe("completed");
|
expect(item?.status).toBe("completed");
|
||||||
expect(item?.downloadedBytes).toBe(binary.length);
|
expect(item?.downloadedBytes).toBe(binary.length);
|
||||||
expect(unrestrictCalls).toBeGreaterThanOrEqual(2);
|
expect(unrestrictCalls).toBeGreaterThanOrEqual(1);
|
||||||
expect(starts).toContain(partialSize);
|
expect(starts).toContain(partialSize);
|
||||||
expect(starts).toContain(0);
|
expect(starts).toContain(0);
|
||||||
expect(fs.readFileSync(existingTargetPath).equals(binary)).toBe(true);
|
expect(fs.readFileSync(existingTargetPath).equals(binary)).toBe(true);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user