Fix stuck queue scheduling and auto-recover stalled streams v1.3.6
This commit is contained in:
parent
0de5a59a64
commit
0a99d3c584
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "real-debrid-downloader",
|
"name": "real-debrid-downloader",
|
||||||
"version": "1.3.5",
|
"version": "1.3.6",
|
||||||
"description": "Real-Debrid Downloader Desktop (Electron + React + TypeScript)",
|
"description": "Real-Debrid Downloader Desktop (Electron + React + TypeScript)",
|
||||||
"main": "build/main/main/main.js",
|
"main": "build/main/main/main.js",
|
||||||
"author": "Sucukdeluxe",
|
"author": "Sucukdeluxe",
|
||||||
|
|||||||
@ -17,12 +17,22 @@ type ActiveTask = {
|
|||||||
itemId: string;
|
itemId: string;
|
||||||
packageId: string;
|
packageId: string;
|
||||||
abortController: AbortController;
|
abortController: AbortController;
|
||||||
abortReason: "stop" | "cancel" | "reconnect" | "package_toggle" | "none";
|
abortReason: "stop" | "cancel" | "reconnect" | "package_toggle" | "stall" | "none";
|
||||||
resumable: boolean;
|
resumable: boolean;
|
||||||
speedEvents: Array<{ at: number; bytes: number }>;
|
speedEvents: Array<{ at: number; bytes: number }>;
|
||||||
nonResumableCounted: boolean;
|
nonResumableCounted: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const DEFAULT_DOWNLOAD_STALL_TIMEOUT_MS = 120000;
|
||||||
|
|
||||||
|
function getDownloadStallTimeoutMs(): number {
|
||||||
|
const fromEnv = Number(process.env.RD_STALL_TIMEOUT_MS ?? NaN);
|
||||||
|
if (Number.isFinite(fromEnv) && fromEnv >= 2000 && fromEnv <= 600000) {
|
||||||
|
return Math.floor(fromEnv);
|
||||||
|
}
|
||||||
|
return DEFAULT_DOWNLOAD_STALL_TIMEOUT_MS;
|
||||||
|
}
|
||||||
|
|
||||||
type DownloadManagerOptions = {
|
type DownloadManagerOptions = {
|
||||||
megaWebUnrestrict?: MegaWebUnrestrictor;
|
megaWebUnrestrict?: MegaWebUnrestrictor;
|
||||||
};
|
};
|
||||||
@ -953,7 +963,7 @@ export class DownloadManager extends EventEmitter {
|
|||||||
this.startItem(next.packageId, next.itemId);
|
this.startItem(next.packageId, next.itemId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.activeTasks.size === 0 && !this.hasQueuedItems()) {
|
if (this.activeTasks.size === 0 && !this.hasQueuedItems() && this.packagePostProcessTasks.size === 0) {
|
||||||
this.finishRun();
|
this.finishRun();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1086,6 +1096,7 @@ export class DownloadManager extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let freshRetryUsed = false;
|
let freshRetryUsed = false;
|
||||||
|
let stallRetries = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
const unrestricted = await this.debridService.unrestrictLink(item.url);
|
const unrestricted = await this.debridService.unrestrictLink(item.url);
|
||||||
@ -1160,10 +1171,13 @@ export class DownloadManager extends EventEmitter {
|
|||||||
pkg.updatedAt = nowMs();
|
pkg.updatedAt = nowMs();
|
||||||
this.recordRunOutcome(item.id, "completed");
|
this.recordRunOutcome(item.id, "completed");
|
||||||
|
|
||||||
await this.runPackagePostProcessing(pkg.id);
|
void this.runPackagePostProcessing(pkg.id).finally(() => {
|
||||||
this.applyCompletedCleanupPolicy(pkg.id, item.id);
|
this.applyCompletedCleanupPolicy(pkg.id, item.id);
|
||||||
this.persistSoon();
|
this.persistSoon();
|
||||||
this.emitState();
|
this.emitState();
|
||||||
|
});
|
||||||
|
this.persistSoon();
|
||||||
|
this.emitState();
|
||||||
return;
|
return;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const reason = active.abortReason;
|
const reason = active.abortReason;
|
||||||
@ -1192,6 +1206,26 @@ export class DownloadManager extends EventEmitter {
|
|||||||
item.status = "queued";
|
item.status = "queued";
|
||||||
item.speedBps = 0;
|
item.speedBps = 0;
|
||||||
item.fullStatus = "Paket gestoppt";
|
item.fullStatus = "Paket gestoppt";
|
||||||
|
} else if (reason === "stall") {
|
||||||
|
stallRetries += 1;
|
||||||
|
if (stallRetries <= 2) {
|
||||||
|
item.status = "queued";
|
||||||
|
item.speedBps = 0;
|
||||||
|
item.fullStatus = `Keine Daten empfangen, Retry ${stallRetries}/2`;
|
||||||
|
item.lastError = "";
|
||||||
|
item.attempts = 0;
|
||||||
|
item.updatedAt = nowMs();
|
||||||
|
active.abortController = new AbortController();
|
||||||
|
active.abortReason = "none";
|
||||||
|
this.persistSoon();
|
||||||
|
this.emitState();
|
||||||
|
await sleep(350 * stallRetries);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
item.status = "failed";
|
||||||
|
item.lastError = "Download hing wiederholt";
|
||||||
|
item.fullStatus = `Fehler: ${item.lastError}`;
|
||||||
|
this.recordRunOutcome(item.id, "failed");
|
||||||
} else {
|
} else {
|
||||||
const errorText = compactErrorText(error);
|
const errorText = compactErrorText(error);
|
||||||
const shouldFreshRetry = !freshRetryUsed && isFetchFailure(errorText);
|
const shouldFreshRetry = !freshRetryUsed && isFetchFailure(errorText);
|
||||||
@ -1366,8 +1400,43 @@ export class DownloadManager extends EventEmitter {
|
|||||||
throw new Error("Leerer Response-Body");
|
throw new Error("Leerer Response-Body");
|
||||||
}
|
}
|
||||||
const reader = body.getReader();
|
const reader = body.getReader();
|
||||||
|
const stallTimeoutMs = getDownloadStallTimeoutMs();
|
||||||
|
const readWithTimeout = async (): Promise<ReadableStreamReadResult<Uint8Array>> => {
|
||||||
|
if (stallTimeoutMs <= 0) {
|
||||||
|
return reader.read();
|
||||||
|
}
|
||||||
|
return new Promise<ReadableStreamReadResult<Uint8Array>>((resolve, reject) => {
|
||||||
|
let settled = false;
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
if (settled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
settled = true;
|
||||||
|
active.abortReason = "stall";
|
||||||
|
active.abortController.abort("stall");
|
||||||
|
reject(new Error("stall_timeout"));
|
||||||
|
}, stallTimeoutMs);
|
||||||
|
|
||||||
|
reader.read().then((result) => {
|
||||||
|
if (settled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
settled = true;
|
||||||
|
clearTimeout(timer);
|
||||||
|
resolve(result);
|
||||||
|
}).catch((error) => {
|
||||||
|
if (settled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
settled = true;
|
||||||
|
clearTimeout(timer);
|
||||||
|
reject(error);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
const { done, value } = await reader.read();
|
const { done, value } = await readWithTimeout();
|
||||||
if (done) {
|
if (done) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -204,6 +204,220 @@ describe("download manager", () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("continues downloading while package post-processing is pending", async () => {
|
||||||
|
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-"));
|
||||||
|
tempDirs.push(root);
|
||||||
|
const binary = Buffer.alloc(80 * 1024, 7);
|
||||||
|
|
||||||
|
const server = http.createServer((req, res) => {
|
||||||
|
const route = req.url || "";
|
||||||
|
if (route !== "/first" && route !== "/second") {
|
||||||
|
res.statusCode = 404;
|
||||||
|
res.end("not-found");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
res.statusCode = 200;
|
||||||
|
res.setHeader("Accept-Ranges", "bytes");
|
||||||
|
res.setHeader("Content-Length", String(binary.length));
|
||||||
|
res.end(binary);
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(0, "127.0.0.1");
|
||||||
|
await once(server, "listening");
|
||||||
|
|
||||||
|
const address = server.address();
|
||||||
|
if (!address || typeof address === "string") {
|
||||||
|
throw new Error("server address unavailable");
|
||||||
|
}
|
||||||
|
const firstUrl = `http://127.0.0.1:${address.port}/first`;
|
||||||
|
const secondUrl = `http://127.0.0.1:${address.port}/second`;
|
||||||
|
|
||||||
|
globalThis.fetch = async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
|
||||||
|
const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
|
||||||
|
if (url.includes("/unrestrict/link")) {
|
||||||
|
const body = init?.body;
|
||||||
|
const bodyText = body instanceof URLSearchParams ? body.toString() : String(body || "");
|
||||||
|
const originalLink = new URLSearchParams(bodyText).get("link") || "";
|
||||||
|
const directUrl = originalLink.includes("second") ? secondUrl : firstUrl;
|
||||||
|
const filename = originalLink.includes("second") ? "second.bin" : "first.bin";
|
||||||
|
return new Response(
|
||||||
|
JSON.stringify({
|
||||||
|
download: directUrl,
|
||||||
|
filename,
|
||||||
|
filesize: binary.length
|
||||||
|
}),
|
||||||
|
{
|
||||||
|
status: 200,
|
||||||
|
headers: { "Content-Type": "application/json" }
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return originalFetch(input, init);
|
||||||
|
};
|
||||||
|
|
||||||
|
let releaseBlockedPostProcess: ((value?: void | PromiseLike<void>) => void) | undefined;
|
||||||
|
try {
|
||||||
|
const manager = new DownloadManager(
|
||||||
|
{
|
||||||
|
...defaultSettings(),
|
||||||
|
token: "rd-token",
|
||||||
|
outputDir: path.join(root, "downloads"),
|
||||||
|
extractDir: path.join(root, "extract"),
|
||||||
|
autoExtract: false,
|
||||||
|
maxParallel: 1
|
||||||
|
},
|
||||||
|
emptySession(),
|
||||||
|
createStoragePaths(path.join(root, "state"))
|
||||||
|
);
|
||||||
|
|
||||||
|
const blocker = new Promise<void>((resolve) => {
|
||||||
|
releaseBlockedPostProcess = resolve;
|
||||||
|
});
|
||||||
|
(manager as unknown as { packagePostProcessQueue: Promise<void> }).packagePostProcessQueue = blocker;
|
||||||
|
|
||||||
|
manager.addPackages([
|
||||||
|
{ name: "first", links: ["https://dummy/first"] },
|
||||||
|
{ name: "second", links: ["https://dummy/second"] }
|
||||||
|
]);
|
||||||
|
|
||||||
|
const initial = manager.getSnapshot();
|
||||||
|
const firstPackage = initial.session.packageOrder[0];
|
||||||
|
const secondPackage = initial.session.packageOrder[1];
|
||||||
|
const firstItem = initial.session.packages[firstPackage]?.itemIds[0] || "";
|
||||||
|
const secondItem = initial.session.packages[secondPackage]?.itemIds[0] || "";
|
||||||
|
|
||||||
|
manager.start();
|
||||||
|
|
||||||
|
await waitFor(() => manager.getSnapshot().session.items[firstItem]?.status === "completed", 12000);
|
||||||
|
await waitFor(() => {
|
||||||
|
const state = manager.getSnapshot().session.items[secondItem]?.status;
|
||||||
|
return state === "validating" || state === "downloading" || state === "integrity_check" || state === "completed";
|
||||||
|
}, 6000);
|
||||||
|
|
||||||
|
if (releaseBlockedPostProcess) {
|
||||||
|
releaseBlockedPostProcess();
|
||||||
|
}
|
||||||
|
await waitFor(() => !manager.getSnapshot().session.running, 25000);
|
||||||
|
|
||||||
|
const done = manager.getSnapshot();
|
||||||
|
expect(done.session.items[firstItem]?.status).toBe("completed");
|
||||||
|
expect(done.session.items[secondItem]?.status).toBe("completed");
|
||||||
|
} finally {
|
||||||
|
if (releaseBlockedPostProcess) {
|
||||||
|
releaseBlockedPostProcess();
|
||||||
|
}
|
||||||
|
server.close();
|
||||||
|
await once(server, "close");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("recovers from stalled download streams without manual pause/resume", async () => {
|
||||||
|
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-"));
|
||||||
|
tempDirs.push(root);
|
||||||
|
const binary = Buffer.alloc(300 * 1024, 17);
|
||||||
|
const previousStallTimeout = process.env.RD_STALL_TIMEOUT_MS;
|
||||||
|
process.env.RD_STALL_TIMEOUT_MS = "2500";
|
||||||
|
let directCalls = 0;
|
||||||
|
|
||||||
|
const server = http.createServer((req, res) => {
|
||||||
|
if ((req.url || "") !== "/stall") {
|
||||||
|
res.statusCode = 404;
|
||||||
|
res.end("not-found");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
directCalls += 1;
|
||||||
|
const range = String(req.headers.range || "");
|
||||||
|
const match = range.match(/bytes=(\d+)-/i);
|
||||||
|
const start = match ? Number(match[1]) : 0;
|
||||||
|
|
||||||
|
if (directCalls === 1 && start === 0) {
|
||||||
|
const firstChunk = Math.floor(binary.length / 3);
|
||||||
|
res.statusCode = 200;
|
||||||
|
res.setHeader("Accept-Ranges", "bytes");
|
||||||
|
res.setHeader("Content-Length", String(binary.length));
|
||||||
|
res.write(binary.subarray(0, firstChunk));
|
||||||
|
setTimeout(() => {
|
||||||
|
if (!res.writableEnded && !res.destroyed) {
|
||||||
|
res.end(binary.subarray(firstChunk));
|
||||||
|
}
|
||||||
|
}, 5000);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const chunk = binary.subarray(start);
|
||||||
|
if (start > 0) {
|
||||||
|
res.statusCode = 206;
|
||||||
|
res.setHeader("Content-Range", `bytes ${start}-${binary.length - 1}/${binary.length}`);
|
||||||
|
} else {
|
||||||
|
res.statusCode = 200;
|
||||||
|
}
|
||||||
|
res.setHeader("Accept-Ranges", "bytes");
|
||||||
|
res.setHeader("Content-Length", String(chunk.length));
|
||||||
|
res.end(chunk);
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(0, "127.0.0.1");
|
||||||
|
await once(server, "listening");
|
||||||
|
|
||||||
|
const address = server.address();
|
||||||
|
if (!address || typeof address === "string") {
|
||||||
|
throw new Error("server address unavailable");
|
||||||
|
}
|
||||||
|
const directUrl = `http://127.0.0.1:${address.port}/stall`;
|
||||||
|
|
||||||
|
globalThis.fetch = async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
|
||||||
|
const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
|
||||||
|
if (url.includes("/unrestrict/link")) {
|
||||||
|
return new Response(
|
||||||
|
JSON.stringify({
|
||||||
|
download: directUrl,
|
||||||
|
filename: "stall.bin",
|
||||||
|
filesize: binary.length
|
||||||
|
}),
|
||||||
|
{
|
||||||
|
status: 200,
|
||||||
|
headers: { "Content-Type": "application/json" }
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return originalFetch(input, init);
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
const manager = new DownloadManager(
|
||||||
|
{
|
||||||
|
...defaultSettings(),
|
||||||
|
token: "rd-token",
|
||||||
|
outputDir: path.join(root, "downloads"),
|
||||||
|
extractDir: path.join(root, "extract"),
|
||||||
|
autoExtract: false,
|
||||||
|
autoReconnect: false
|
||||||
|
},
|
||||||
|
emptySession(),
|
||||||
|
createStoragePaths(path.join(root, "state"))
|
||||||
|
);
|
||||||
|
|
||||||
|
manager.addPackages([{ name: "stall", links: ["https://dummy/stall"] }]);
|
||||||
|
manager.start();
|
||||||
|
await waitFor(() => !manager.getSnapshot().session.running, 25000);
|
||||||
|
|
||||||
|
const item = Object.values(manager.getSnapshot().session.items)[0];
|
||||||
|
expect(item?.status).toBe("completed");
|
||||||
|
expect(directCalls).toBeGreaterThan(1);
|
||||||
|
expect(fs.existsSync(item.targetPath)).toBe(true);
|
||||||
|
expect(fs.statSync(item.targetPath).size).toBe(binary.length);
|
||||||
|
} finally {
|
||||||
|
if (previousStallTimeout === undefined) {
|
||||||
|
delete process.env.RD_STALL_TIMEOUT_MS;
|
||||||
|
} else {
|
||||||
|
process.env.RD_STALL_TIMEOUT_MS = previousStallTimeout;
|
||||||
|
}
|
||||||
|
server.close();
|
||||||
|
await once(server, "close");
|
||||||
|
}
|
||||||
|
}, 35000);
|
||||||
|
|
||||||
it("uses content-disposition filename when provider filename is opaque", async () => {
|
it("uses content-disposition filename when provider filename is opaque", async () => {
|
||||||
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-"));
|
const root = fs.mkdtempSync(path.join(os.tmpdir(), "rd-dm-"));
|
||||||
tempDirs.push(root);
|
tempDirs.push(root);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user