Critical fixes: - Post-processor: remove double attempts increment (onProgress + onArchiveFailure both counted) - Post-processor: fix slot leak when signal aborted after acquireSlot - Scheduler: reset global watchdog high-water mark after stall event (prevents permanent misfires) - Pipeline/DM: fix isPathInsideDir path traversal (add trailing separator check) - Retry-manager: check per-kind exhaustion before shelve threshold (prevents bypass) - Retry-manager: add MAX_SHELVE_COUNT=5 cap to prevent infinite shelve cycling Important fixes: - Scheduler: clear retryDelays and providerCooldowns on start() - Scheduler: skip already-aborting slots in stall detection - Download-manager: fix cleanupAfterExtraction using extractDir instead of outputDir for link removal - Download-manager: add "extracting" to package normalizeSessionStatuses - Download-manager: clear activeTasks map on stop() - Download-manager: remove useless cachedDirectUrls re-insertion after success - Stream-writer: remove duplicate truncation code in error path - Stream-writer: skip alignedFlush in finally when bodyError already set (avoids 5min drain wait) - Stream-writer: re-read elapsed after speed limiter sleep for accurate window reset - Error-classifier: add HTTP 401 (Forbidden) and 410 (NotFound) classification Tests updated to match new shelve/kind-exhaustion priority and 401 classification. All 216 tests pass, build verified. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
499 lines
14 KiB
TypeScript
499 lines
14 KiB
TypeScript
/**
|
|
* scheduler.ts — Queue management, slot allocation, and stall detection.
|
|
*
|
|
* The scheduler runs a loop that fills download slots up to maxParallel,
|
|
* monitors heartbeats for stall detection, and provides a global watchdog.
|
|
*/
|
|
|
|
import { EventEmitter } from "node:events";
|
|
import type { DownloadItem, PackageEntry, PackagePriority, SessionState } from "../../shared/types";
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Types
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Tunables for the scheduler loop.
 */
export interface SchedulerConfig {
  /** Upper bound on concurrently active slots; the scheduler treats values below 1 as 1. */
  maxParallel: number;

  /** Per-slot heartbeat silence (ms) after which "stall-detected" is emitted; `<= 0` disables the check. */
  stallTimeoutMs: number;

  /** Window (ms) without aggregate byte progress after which "global-stall" is emitted; `<= 0` disables it. */
  globalStallWatchdogMs: number;
}
|
|
|
|
/**
 * Bookkeeping record for one in-flight download.
 */
export interface ActiveSlot {
  /** Item being downloaded; also the key under which the slot is stored. */
  itemId: string;

  /** Package the item belongs to. */
  packageId: string;

  /** Controller whose `abort()` is called when the slot is aborted. */
  abortController: AbortController;

  /** Why the slot was aborted; stays "none" until an abort is requested. */
  abortReason:
    | "stop"
    | "cancel"
    | "reconnect"
    | "package_toggle"
    | "stall"
    | "shutdown"
    | "reset"
    | "none";

  /** Starts out `true` when a slot is claimed. NOTE(review): not read in this file — presumably consumed by the download pipeline; confirm. */
  resumable: boolean;

  /** Epoch ms of the most recent heartbeat (or of slot creation). */
  lastHeartbeatAt: number;

  /** Downloaded byte count reported with the most recent heartbeat. */
  bytesAtHeartbeat: number;

  /** When `true`, the slot is excluded from stall detection and the global watchdog. */
  blockedOnDiskWrite: boolean;

  /** Starts at 0. NOTE(review): presumably the epoch ms at which the disk wait began — not read in this file; confirm. */
  blockedOnDiskSince: number;
}
|
|
|
|
/**
 * Identifies the item the scheduler wants started next; handed to the
 * `startItem` callback of `Scheduler.start`.
 */
export interface SlotRequest {
  /** Item to start. */
  itemId: string;

  /** Package that owns the item. */
  packageId: string;
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Scheduler
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Queue scheduler: fills download slots up to `maxParallel`, tracks
 * heartbeats for stall detection and runs a global progress watchdog.
 *
 * Events emitted:
 *  - "run-complete"   — no active slots and nothing queued or delayed.
 *  - "stall-detected" — `{ itemId, idleMs }` for a slot whose heartbeat is overdue.
 *  - "global-stall"   — `{ itemIds }` when the aggregate byte count stopped changing.
 */
export class Scheduler extends EventEmitter {
  /** Bumped by every start()/stop(); a loop exits once its captured value is outdated. */
  private generation = 0;
  private running = false;
  private paused = false;
  private config: SchedulerConfig;

  // Active downloads
  private slots = new Map<string, ActiveSlot>();

  // Retry delays
  private retryDelays = new Map<string, number>(); // itemId → readyAtEpochMs

  // Provider cooldowns
  private providerCooldowns = new Map<string, { cooldownUntil: number; failureCount: number }>();

  // Reconnect state
  private reconnectUntil = 0;

  // Global watchdog state
  private lastGlobalProgressBytes = 0;
  private lastGlobalProgressAt = 0;

  // Scoped run (only these packages)
  private scopedPackageIds = new Set<string>();

  /**
   * @param config Initial configuration; copied, so later changes to the
   *               caller's object have no effect.
   */
  constructor(config: SchedulerConfig) {
    super();
    this.config = { ...config };
  }

  // -----------------------------------------------------------------------
  // Public API
  // -----------------------------------------------------------------------

  /**
   * Update config at runtime (e.g. when user changes maxParallel).
   *
   * Keys whose value is `undefined` are ignored so an explicit
   * `{ maxParallel: undefined }` cannot wipe a valid setting (which would
   * turn the slot limit into NaN and stop all slot allocation).
   */
  updateConfig(partial: Partial<SchedulerConfig>): void {
    const defined = Object.fromEntries(
      Object.entries(partial).filter(([, value]) => value !== undefined),
    );
    Object.assign(this.config, defined);
  }

  /**
   * Start the scheduler loop. Resolves when the run completes, is stopped,
   * or is superseded by a newer start().
   *
   * Starting clears all retry delays and provider cooldowns and un-pauses
   * the scheduler.
   *
   * @param session   Live session state
   * @param startItem Callback to start a download for a slot request. It is
   *                  expected to synchronously call claimSlot() or move the
   *                  item out of the "queued"/"reconnect_wait" status; an
   *                  item that is still eligible right after being
   *                  dispatched ends the fill pass for the current tick.
   * @param scopedIds Optional: only run these package IDs
   */
  async start(
    session: SessionState,
    startItem: (slot: SlotRequest) => void,
    scopedIds?: string[],
  ): Promise<void> {
    this.generation++;
    this.running = true;
    this.paused = false;
    this.scopedPackageIds = new Set(scopedIds ?? []);
    this.lastGlobalProgressBytes = 0;
    this.lastGlobalProgressAt = Date.now();
    this.retryDelays.clear();
    this.providerCooldowns.clear();

    const myGeneration = this.generation;
    const loopIntervalMs = 120;
    let lastHeartbeatCheckAt = Date.now();
    let lastSoftResetAt = Date.now();

    try {
      while (this.running && this.generation === myGeneration) {
        const now = Date.now();

        // Paused — just idle
        if (this.paused) {
          await sleep(loopIntervalMs);
          continue;
        }

        // Reconnect wait
        if (this.reconnectUntil > now) {
          await sleep(220);
          continue;
        }

        // Fill slots. Each item is dispatched at most once per pass: if the
        // callback neither claimed the slot nor changed the item's status,
        // findNextItem would return the same item forever and block the
        // event loop.
        const maxParallel = Math.max(1, this.config.maxParallel);
        const dispatched = new Set<string>();
        while (this.slots.size < maxParallel) {
          const next = this.findNextItem(session, now);
          if (!next || dispatched.has(next.itemId)) break;
          dispatched.add(next.itemId);
          startItem(next);
        }

        // Heartbeat / stall check (every 2s)
        if (now - lastHeartbeatCheckAt >= 2000) {
          this.checkStalls(now);
          lastHeartbeatCheckAt = now;
        }

        // Global stall watchdog
        this.runGlobalWatchdog(now);

        // Soft-reset stale retry delays (every 10 min)
        if (now - lastSoftResetAt >= 600_000) {
          this.cleanupStaleRetryDelays(now);
          lastSoftResetAt = now;
        }

        // Check if run is complete
        if (this.slots.size === 0) {
          const hasQueued = this.hasQueuedItems(session, now);
          const hasDelayed = this.hasDelayedItems(session, now);
          if (!hasQueued && !hasDelayed) {
            this.emit("run-complete");
            break;
          }
        }

        await sleep(this.slots.size >= maxParallel ? 170 : loopIntervalMs);
      }
    } finally {
      // Only the loop that still owns the current generation may clear the
      // flag. A superseded loop (stop() followed by start(), or a restart
      // triggered from a "run-complete" listener) must not switch off the
      // run that replaced it.
      if (this.generation === myGeneration) {
        this.running = false;
      }
    }
  }

  /**
   * Stop the scheduler loop (bumps generation to exit).
   * Active slots are left untouched; use abortAll() to cancel them.
   */
  stop(): void {
    this.generation++;
    this.running = false;
  }

  /**
   * Pause/unpause slot allocation.
   * While paused the loop only idles: no slots are filled and neither stall
   * check runs.
   */
  setPaused(paused: boolean): void {
    this.paused = paused;
  }

  /** Whether the scheduler is currently paused. */
  get isPaused(): boolean {
    return this.paused;
  }

  /** Whether a scheduler loop is currently considered active. */
  get isRunning(): boolean {
    return this.running;
  }

  // -----------------------------------------------------------------------
  // Slot management
  // -----------------------------------------------------------------------

  /**
   * Register an item as actively downloading.
   * An existing slot for the same item is replaced.
   *
   * @returns The freshly created slot record (heartbeat set to "now", 0 bytes).
   */
  claimSlot(itemId: string, packageId: string, abortController: AbortController): ActiveSlot {
    const slot: ActiveSlot = {
      itemId,
      packageId,
      abortController,
      abortReason: "none",
      resumable: true,
      lastHeartbeatAt: Date.now(),
      bytesAtHeartbeat: 0,
      blockedOnDiskWrite: false,
      blockedOnDiskSince: 0,
    };
    this.slots.set(itemId, slot);
    return slot;
  }

  /**
   * Release a slot (download finished/failed/cancelled).
   * Unknown item IDs are ignored.
   */
  releaseSlot(itemId: string): void {
    this.slots.delete(itemId);
  }

  /**
   * Get active slot for an item, or `undefined` when it has none.
   */
  getSlot(itemId: string): ActiveSlot | undefined {
    return this.slots.get(itemId);
  }

  /**
   * Get all active slots.
   * This is the scheduler's live map, not a copy.
   */
  getActiveSlots(): Map<string, ActiveSlot> {
    return this.slots;
  }

  /** Number of currently claimed slots. */
  get activeCount(): number {
    return this.slots.size;
  }

  /** Whether fewer than `maxParallel` (minimum 1) slots are claimed. */
  hasCapacity(): boolean {
    return this.slots.size < Math.max(1, this.config.maxParallel);
  }

  // -----------------------------------------------------------------------
  // Heartbeat
  // -----------------------------------------------------------------------

  /**
   * Record a heartbeat from an active download.
   * Ignored when the item has no slot.
   *
   * @param downloadedBytes Byte count reported by the download; summed across
   *                        slots by the global watchdog.
   */
  heartbeat(itemId: string, downloadedBytes: number): void {
    const slot = this.slots.get(itemId);
    if (!slot) return;
    slot.lastHeartbeatAt = Date.now();
    slot.bytesAtHeartbeat = downloadedBytes;
  }

  // -----------------------------------------------------------------------
  // Retry scheduling
  // -----------------------------------------------------------------------

  /**
   * Schedule a retry delay for an item. Negative delays are treated as 0.
   */
  scheduleRetry(itemId: string, delayMs: number): void {
    this.retryDelays.set(itemId, Date.now() + Math.max(0, delayMs));
  }

  /**
   * Check if an item is still delayed.
   *
   * @param now Reference time in epoch ms; defaults to the current time.
   */
  isDelayed(itemId: string, now?: number): boolean {
    const readyAt = this.retryDelays.get(itemId);
    if (!readyAt) return false;
    return readyAt > (now ?? Date.now());
  }

  /**
   * Clear retry delay for an item.
   */
  clearRetryDelay(itemId: string): void {
    this.retryDelays.delete(itemId);
  }

  // -----------------------------------------------------------------------
  // Provider cooldowns
  // -----------------------------------------------------------------------

  /**
   * Apply a cooldown to a provider.
   * Restarts the cooldown window from "now" and increments the provider's
   * failure counter.
   */
  applyProviderCooldown(provider: string, cooldownMs: number): void {
    const entry = this.providerCooldowns.get(provider) ?? { cooldownUntil: 0, failureCount: 0 };
    entry.cooldownUntil = Date.now() + cooldownMs;
    entry.failureCount++;
    this.providerCooldowns.set(provider, entry);
  }

  /**
   * Get remaining cooldown for a provider (ms). 0 = not in cooldown.
   * An expired entry has its failure counter reset to 0 as a side effect.
   */
  getProviderCooldownRemaining(provider: string): number {
    const entry = this.providerCooldowns.get(provider);
    if (!entry) return 0;
    const remaining = entry.cooldownUntil - Date.now();
    if (remaining <= 0) {
      entry.failureCount = 0;
      return 0;
    }
    return remaining;
  }

  /**
   * Clear cooldown for a provider (after success).
   */
  clearProviderCooldown(provider: string): void {
    this.providerCooldowns.delete(provider);
  }

  // -----------------------------------------------------------------------
  // Reconnect
  // -----------------------------------------------------------------------

  /**
   * Enter reconnect wait mode (429/503 backoff).
   * While waiting, the loop neither fills slots nor runs stall checks.
   */
  setReconnectWait(durationMs: number): void {
    this.reconnectUntil = Date.now() + durationMs;
  }

  /**
   * Check if currently in reconnect wait.
   */
  isReconnecting(): boolean {
    return this.reconnectUntil > Date.now();
  }

  /**
   * Get remaining reconnect wait time (ms); never negative.
   */
  getReconnectRemaining(): number {
    return Math.max(0, this.reconnectUntil - Date.now());
  }

  // -----------------------------------------------------------------------
  // Abort helpers
  // -----------------------------------------------------------------------

  /**
   * Abort a specific item's download.
   * Records the reason on the slot before signalling the controller; the
   * slot itself stays claimed until releaseSlot() is called.
   */
  abortItem(itemId: string, reason: ActiveSlot["abortReason"]): void {
    const slot = this.slots.get(itemId);
    if (!slot) return;
    slot.abortReason = reason;
    slot.abortController.abort(reason);
  }

  /**
   * Abort all active downloads with the same reason.
   */
  abortAll(reason: ActiveSlot["abortReason"]): void {
    for (const slot of this.slots.values()) {
      slot.abortReason = reason;
      slot.abortController.abort(reason);
    }
  }

  // -----------------------------------------------------------------------
  // Private: item selection
  // -----------------------------------------------------------------------

  /** True when the package exists, is enabled, not cancelled and inside the run scope. */
  private isPackageEligible(packageId: string, session: SessionState): boolean {
    const pkg = session.packages[packageId];
    if (!pkg || pkg.cancelled || !pkg.enabled) return false;
    return this.scopedPackageIds.size === 0 || this.scopedPackageIds.has(packageId);
  }

  /**
   * Pick the next startable item: highest package priority first, then
   * package order, then item order. Elapsed retry delays encountered on the
   * way are dropped.
   */
  private findNextItem(session: SessionState, now: number): SlotRequest | null {
    const priorities: PackagePriority[] = ["high", "normal", "low"];

    for (const prio of priorities) {
      for (const packageId of session.packageOrder) {
        if (!this.isPackageEligible(packageId, session)) continue;
        const pkg = session.packages[packageId];
        if ((pkg.priority || "normal") !== prio) continue;

        for (const itemId of pkg.itemIds) {
          const item = session.items[itemId];
          if (!item) continue;
          if (item.status !== "queued" && item.status !== "reconnect_wait") continue;
          if (this.slots.has(itemId)) continue;

          // Check retry delay
          const retryAt = this.retryDelays.get(itemId);
          if (retryAt) {
            if (retryAt > now) continue;
            this.retryDelays.delete(itemId);
          }

          return { itemId, packageId };
        }
      }
    }
    return null;
  }

  /** Whether any in-scope item is waiting to start and not held back by a retry delay. */
  private hasQueuedItems(session: SessionState, now: number): boolean {
    for (const packageId of session.packageOrder) {
      if (!this.isPackageEligible(packageId, session)) continue;
      const pkg = session.packages[packageId];

      for (const itemId of pkg.itemIds) {
        const item = session.items[itemId];
        if (!item) continue;
        const retryAt = this.retryDelays.get(itemId);
        if (retryAt && retryAt > now) continue;
        if (item.status === "queued" || item.status === "reconnect_wait") return true;
      }
    }
    return false;
  }

  /** Whether any in-scope, still-startable item is waiting for its retry delay to elapse. */
  private hasDelayedItems(session: SessionState, now: number): boolean {
    for (const [itemId, readyAt] of this.retryDelays) {
      if (readyAt <= now) continue;
      const item = session.items[itemId];
      if (!item) continue;
      if (item.status !== "queued" && item.status !== "reconnect_wait") continue;
      if (!this.isPackageEligible(item.packageId, session)) continue;
      return true;
    }
    return false;
  }

  // -----------------------------------------------------------------------
  // Private: stall detection
  // -----------------------------------------------------------------------

  /**
   * Emit "stall-detected" for every slot whose last heartbeat is older than
   * `stallTimeoutMs`. The event repeats on each check until the slot is
   * aborted, released or sends a heartbeat.
   */
  private checkStalls(now: number): void {
    if (this.config.stallTimeoutMs <= 0) return;

    for (const slot of this.slots.values()) {
      if (slot.blockedOnDiskWrite) continue; // Don't count disk waits
      if (slot.abortReason !== "none") continue; // Already aborting
      const idleMs = now - slot.lastHeartbeatAt;
      if (idleMs > this.config.stallTimeoutMs) {
        this.emit("stall-detected", { itemId: slot.itemId, idleMs });
      }
    }
  }

  /**
   * Emit "global-stall" when the summed byte count of all active slots has
   * not changed for longer than `globalStallWatchdogMs`.
   */
  private runGlobalWatchdog(now: number): void {
    if (this.config.globalStallWatchdogMs <= 0) return;

    if (this.slots.size === 0) {
      // Nothing is downloading, so nothing can stall. Restart the window so
      // idle time (e.g. every item waiting on a retry delay) is not charged
      // to the next download that starts.
      this.lastGlobalProgressBytes = 0;
      this.lastGlobalProgressAt = now;
      return;
    }

    // Sum total bytes across all active downloads
    let totalBytes = 0;
    let allDiskBlocked = true;
    for (const slot of this.slots.values()) {
      totalBytes += slot.bytesAtHeartbeat;
      if (!slot.blockedOnDiskWrite) allDiskBlocked = false;
    }

    // If all downloads are disk-blocked, don't trigger watchdog
    if (allDiskBlocked) return;

    // Any change counts as activity. The sum also drops when a slot is
    // released or a download restarts from 0; comparing with ">" against the
    // old high-water mark would hide the progress of the remaining downloads
    // until they exceed it and cause a false stall.
    if (totalBytes !== this.lastGlobalProgressBytes) {
      this.lastGlobalProgressBytes = totalBytes;
      this.lastGlobalProgressAt = now;
      return;
    }

    if (now - this.lastGlobalProgressAt > this.config.globalStallWatchdogMs) {
      const stalledIds = [...this.slots.values()]
        .filter(s => !s.blockedOnDiskWrite)
        .map(s => s.itemId);
      this.emit("global-stall", { itemIds: stalledIds });
      // Start a fresh window so the event is not re-emitted on every tick.
      this.lastGlobalProgressAt = now;
    }
  }

  // -----------------------------------------------------------------------
  // Private: cleanup
  // -----------------------------------------------------------------------

  /** Drop retry delays and provider cooldowns that have already elapsed. */
  private cleanupStaleRetryDelays(now: number): void {
    for (const [itemId, readyAt] of this.retryDelays) {
      if (readyAt <= now) {
        this.retryDelays.delete(itemId);
      }
    }
    // Cleanup stale provider cooldowns
    for (const [provider, entry] of this.providerCooldowns) {
      if (entry.cooldownUntil <= now) {
        this.providerCooldowns.delete(provider);
      }
    }
  }
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Helper
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Resolve after roughly `ms` milliseconds (timer-based, never rejects).
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
|