const { EventEmitter } = require('events');
const path = require('path');
const fs = require('fs');
const crypto = require('crypto');
const { uploadFile } = require('./hosters');
const VidmolyUploader = require('./vidmoly-upload');
const VoeUploader = require('./voe-upload');
const DoodstreamUploader = require('./doodstream-upload');
const ClouddropUploader = require('./clouddrop-upload');
const Semaphore = require('./semaphore');
const Throttle = require('./throttle');

// Per-hoster defaults; values from `hosterSettings[hoster]` override these.
const DEFAULT_SETTINGS = {
  retries: 3,
  maxSpeedKbs: 0,
  parallelCount: 2,
  restartBelowKbs: 0,
  timeIntervalSec: 0,
  maxSizeMb: 0
};

// Pause before each retry on the same account.
const RETRY_DELAY_MS = 3000;
// Pause after emitting 'account-failed' so a listener can call switchAccount()
// before the override is looked up.
const ROTATION_SETTLE_MS = 800;
// Minimum spacing between 'uploading' progress events per job.
const PROGRESS_EMIT_INTERVAL_MS = 250;
// How often the low-speed watchdog samples the current speed.
const SPEED_CHECK_INTERVAL_MS = 2000;
// How long the speed must stay below `restartBelowKbs` before the attempt is aborted.
const LOW_SPEED_GRACE_MS = 6000;

// Messages that identify a file-specific rejection (see _isFileRejectedError).
const FILE_REJECTED_PATTERN = /(Not video file format|Duplicate|Datei zu (klein|gross|groß)|File too (small|large)|Invalid file|Unsupported format)/i;

// Messages that identify a transient network failure (see _isTransientNetworkError).
const TRANSIENT_NETWORK_PATTERNS = [
  /ENOTFOUND/i,
  /ECONNRESET/i,
  /ECONNREFUSED/i,
  /ETIMEDOUT/i,
  /EAI_AGAIN/i,
  /EHOSTUNREACH/i,
  /ENETUNREACH/i,
  /EPIPE/i,
  /socket hang up/i,
  /network (error|failure|problem)/i,
  /dns (lookup|error|failed)/i,
  /getaddrinfo/i,
  /fetch failed/i,
  /\bconnect (ETIMEDOUT|ECONN)/i
];

// Messages that identify an account-level failure (see _shouldSkipRetryOnAccountError).
const ACCOUNT_ERROR_PATTERNS = [
  /Kein Upload-Server/i,
  /No upload server/i,
  /kein server/i,
  /quota/i,
  /limit (reached|exceeded|überschritten)/i,
  /rate[- ]?limit/i,
  /too many requests/i,
  /\b(401|403|429)\b/,
  /Falscher (User|Username|Passwort)/i,
  /Incorrect (Login|Password)/i,
  /invalid (credentials|api[- ]?key|token|session)/i,
  /(account|user) (banned|suspended|disabled|gesperrt)/i,
  /not authorized/i,
  /forbidden/i,
  /session (expired|abgelaufen)/i,
  // Session/CSRF hints — the account's server session went stale, which
  // no amount of retrying will fix. Re-login happens on the next account.
  /CSRF[- ]?Token nicht gefunden/i,
  /CSRF[- ]?token not found/i,
  /Bist du eingeloggt/i,
  /not logged in/i,
  // Storage exhaustion — account is full. Rotate instead of hammering it.
  /not enough (disk )?(space|storage)/i,
  /insufficient (disk )?space/i,
  /disk (space )?full/i,
  /storage (exhausted|full|voll|limit)/i,
  /account (full|voll)/i
];

/**
 * Runs batches of upload jobs with per-hoster and global concurrency limits,
 * speed throttling, retries and account rotation.
 *
 * Emitted events:
 *  - 'progress'       per-job status updates (see _emitProgress)
 *  - 'stats'          once per second while a batch runs (see _startStatsTimer)
 *  - 'batch-done'     summary object when startBatch() finishes
 *  - 'account-failed' { hoster, accountId } when an account is marked failed
 *  - 'rot-log'        diagnostic trace of account-rotation decisions
 */
class UploadManager extends EventEmitter {
  /**
   * @param {Object} [hosterSettings] - map of hoster name to settings overriding DEFAULT_SETTINGS
   * @param {Object} [globalSettings] - reads parallelUploadCount, scaleParallelUploads, globalMaxSpeedKbs
   */
  constructor(hosterSettings, globalSettings) {
    super();
    this.hosterSettings = hosterSettings || {};
    this.globalSettings = globalSettings || {};
    this.semaphores = {};
    this.globalSemaphore = null;
    this.abortController = new AbortController();
    this.running = false;
    this.stopAfterActive = false;
    this.statsInterval = null;
    this.startTime = 0;
    this.activeJobs = new Map(); // uploadId -> { jobId, speedKbs, bytesUploaded }
    this.jobAbortControllers = new Map(); // jobId -> AbortController
    this.cancelledJobIds = new Set();
    this.sessionBytes = 0;
    this.lastStartTime = {}; // hoster -> timestamp of last upload start
    this.intervalLocks = {}; // hoster -> Promise chain for serialized interval waits
    this.globalThrottle = null;
    this._failedAccounts = new Map(); // "hoster:accountId" -> true
    this._accountOverrides = new Map(); // hoster -> fallback account object
    this._batchResults = null; // filePath -> { name, size, results: [] } of the current batch
    this._additionalPromises = []; // jobs added mid-batch via addJobs()
    this._batchJobCount = 0; // number of jobs started in the current batch
  }

  /**
   * Registers `fallbackAccount` as the account to rotate to for `hoster`.
   * @param {string} hoster
   * @param {Object} fallbackAccount - read fields: id, username, password, apiKey
   */
  switchAccount(hoster, fallbackAccount) {
    const prev = this._accountOverrides.get(hoster);
    this._accountOverrides.set(hoster, fallbackAccount);
    this._rotLog('switchAccount', {
      hoster,
      prevOverrideId: prev ? prev.id : null,
      toAccountId: fallbackAccount ? fallbackAccount.id : null
    });
  }

  // Introspection helpers used by main.js to re-resolve fallbacks when the
  // config changes mid-batch (e.g. user adds a new account after their only
  // one ran out of space). Without this, an account that got marked failed
  // before a fallback existed stays stuck until the app restarts.

  /** @returns {string[]} the "hoster:accountId" keys currently marked failed */
  getFailedAccountKeys() {
    return Array.from(this._failedAccounts.keys());
  }

  /** @returns {Object|null} the fallback account registered for `hoster`, if any */
  getOverride(hoster) {
    return this._accountOverrides.get(hoster) || null;
  }

  /** Emits a timestamped 'rot-log' diagnostic event. */
  _rotLog(event, data) {
    this.emit('rot-log', { ts: Date.now(), event, ...data });
  }

  /** Builds the key used in `_failedAccounts` for one account of one hoster. */
  _accountKey(hoster, accountId) {
    return hoster + ':' + accountId;
  }

  /** Copies the credentials of `account` onto `task` (mutates `task`). */
  _applyAccount(task, account) {
    task.accountId = account.id;
    task.username = account.username;
    task.password = account.password;
    task.apiKey = account.apiKey;
  }

  // File-specific rejections from the hoster: the same file will get rejected
  // on any account, so rotation is pointless. Matches the `err.fileRejected`
  // flag set by parsers plus known rejection phrases.
  // NOTE: We deliberately do NOT match the generic "lehnte Datei ab" prefix
  // here — that phrase is used by the Byse parser for both file- AND
  // account-level errors. Account-level ones set err.accountError instead,
  // which takes priority in _shouldSkipRetryOnAccountError.
  _isFileRejectedError(err) {
    if (!err) return false;
    if (err.accountError === true) return false; // explicit account-level wins
    if (err.fileRejected === true) return true;
    if (!err.message) return false;
    return FILE_REJECTED_PATTERN.test(String(err.message));
  }

  // Transient network errors — the account is fine, the network or the
  // hoster's own backend hiccuped. Retrying on the SAME account is the right
  // move; marking it failed would wrongly poison the fallback chain. If all
  // retries on the current account still hit this class of error, we bail
  // out for this file without blacklisting the account, so other jobs in the
  // batch still get a fresh chance on it.
  _isTransientNetworkError(err) {
    if (!err || !err.message) return false;
    const message = String(err.message);
    return TRANSIENT_NETWORK_PATTERNS.some((pattern) => pattern.test(message));
  }

  // Error classes that mean "this account is the problem, retrying on it won't
  // help" — we skip the remaining retries and go straight to the fallback
  // account. Keeps single runs fast when an account is rate-limited, banned,
  // or out of quota.
  _shouldSkipRetryOnAccountError(err) {
    if (!err) return false;
    // Explicit account-level flag from hoster parsers — highest priority.
    if (err.accountError === true) return true;
    if (!err.message) return false;
    const message = String(err.message);
    return ACCOUNT_ERROR_PATTERNS.some((pattern) => pattern.test(message));
  }

  /**
   * Replaces the settings and pushes the new limits into the semaphores and
   * the global throttle that already exist. A falsy argument keeps the
   * corresponding current settings.
   */
  updateSettings(hosterSettings, globalSettings) {
    this.hosterSettings = hosterSettings || this.hosterSettings;
    this.globalSettings = globalSettings || this.globalSettings;

    // Live-update semaphores for running uploads
    for (const [hoster, sem] of Object.entries(this.semaphores)) {
      sem.updateLimit(this._getSettings(hoster).parallelCount);
    }

    // Update (or create) the global throttle; drop it when no limit is set.
    if (!this._getGlobalThrottle()) {
      this.globalThrottle = null;
    }

    // Update global semaphore live
    const globalLimit = this._getGlobalParallelLimit();
    if (globalLimit > 0 && this.globalSemaphore) {
      this.globalSemaphore.updateLimit(globalLimit);
    }
  }

  /**
   * @returns {Object} DEFAULT_SETTINGS merged with the hoster's overrides; when
   *   `scaleParallelUploads` is set, `parallelCount` is capped at the global limit.
   */
  _getSettings(hoster) {
    const settings = { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
    const globalLimit = this._getGlobalParallelLimit();
    if (this.globalSettings.scaleParallelUploads && globalLimit > 0) {
      settings.parallelCount = Math.min(settings.parallelCount || 1, globalLimit);
    }
    return settings;
  }

  /** @returns {number} the global parallel limit clamped to 1..100, or 0 when unset/invalid */
  _getGlobalParallelLimit() {
    const raw = Number(this.globalSettings.parallelUploadCount || 0);
    if (!Number.isFinite(raw) || raw <= 0) return 0;
    return Math.max(1, Math.min(100, Math.round(raw)));
  }

  /** @returns {Semaphore|null} the shared global semaphore, or null when no global limit is set */
  _getGlobalSemaphore() {
    const limit = this._getGlobalParallelLimit();
    if (limit <= 0) return null;
    if (!this.globalSemaphore) {
      this.globalSemaphore = new Semaphore(limit);
    } else {
      this.globalSemaphore.updateLimit(limit);
    }
    return this.globalSemaphore;
  }

  /** @returns {Throttle|null} the shared global throttle, or null when no global speed limit is set */
  _getGlobalThrottle() {
    const kbs = Number(this.globalSettings.globalMaxSpeedKbs || 0);
    if (!Number.isFinite(kbs) || kbs <= 0) return null;
    if (!this.globalThrottle) {
      this.globalThrottle = new Throttle(kbs * 1024);
    } else {
      this.globalThrottle.updateRate(kbs * 1024);
    }
    return this.globalThrottle;
  }

  /** @returns {Semaphore} the semaphore for `hoster`, created or re-limited from the current settings */
  _getSemaphore(hoster) {
    const { parallelCount } = this._getSettings(hoster);
    if (!this.semaphores[hoster]) {
      this.semaphores[hoster] = new Semaphore(parallelCount);
    } else {
      this.semaphores[hoster].updateLimit(parallelCount);
    }
    return this.semaphores[hoster];
  }

  /**
   * Combines the per-hoster and the global throttle for one upload attempt.
   * @returns {Object|null} an object with `consume(bytes, signal)`, or null when nothing is limited
   */
  _buildThrottle(settings) {
    const hosterThrottle = settings.maxSpeedKbs > 0 ? new Throttle(settings.maxSpeedKbs * 1024) : null;
    const globalThrottle = this._getGlobalThrottle();
    if (hosterThrottle && globalThrottle) {
      return {
        consume: async (bytes, sig) => {
          await hosterThrottle.consume(bytes, sig);
          await globalThrottle.consume(bytes, sig);
        }
      };
    }
    return hosterThrottle || globalThrottle;
  }

  /** Ensures `results` has an entry for `file`; the size is 0 when the file cannot be stat'ed. */
  _registerFile(results, file) {
    if (results.has(file)) return;
    let size = 0;
    try {
      size = fs.statSync(file).size;
    } catch {
      // Best effort: _runJob reports missing files itself.
    }
    results.set(file, { name: path.basename(file), size, results: [] });
  }

  /**
   * Runs all `tasks` (plus any added through addJobs() while running) and
   * emits 'batch-done' with a summary once every job has settled.
   *
   * @param {Object[]} tasks - read fields: file, hoster, jobId, accountId, username, password, apiKey
   * @param {Object} [opts]
   * @param {string[]} [opts.primeFailedAccounts] - "hoster:accountId" keys to pre-mark as failed
   * @param {Array[]} [opts.primeOverrides] - [hoster, account] pairs to pre-register as overrides
   * @returns {Promise<void>}
   */
  async startBatch(tasks, opts = {}) {
    this.running = true;
    this.stopAfterActive = false;
    this.abortController = new AbortController();
    this.startTime = Date.now();
    this.sessionBytes = 0;
    this.activeJobs.clear();
    this.jobAbortControllers.clear();
    this.cancelledJobIds.clear();
    this.semaphores = {};
    this.globalSemaphore = null;
    this.globalThrottle = null;
    this.lastStartTime = {};
    this.intervalLocks = {};

    // Reset account-rotation state each batch — but optionally re-prime from
    // app-session memory so a "Retry failed" right after batch-done doesn't
    // burn 5 retries on the account we already know is dead. Caller (main.js)
    // passes the session-scoped failed/override state.
    this._failedAccounts.clear();
    this._accountOverrides.clear();
    if (Array.isArray(opts.primeFailedAccounts)) {
      for (const key of opts.primeFailedAccounts) this._failedAccounts.set(key, true);
    }
    if (Array.isArray(opts.primeOverrides)) {
      for (const entry of opts.primeOverrides) {
        if (Array.isArray(entry) && entry.length === 2) this._accountOverrides.set(entry[0], entry[1]);
      }
    }
    this._rotLog('batch-start', {
      taskCount: tasks.length,
      primedFailed: this._failedAccounts.size,
      primedOverrides: this._accountOverrides.size
    });

    const { signal } = this.abortController;
    const batchId = `batch-${Date.now()}`;
    const results = new Map(); // filePath -> { name, size, results: [] }
    this._batchResults = results;
    this._additionalPromises = []; // Track jobs added mid-batch via addJobs()
    this._batchJobCount = tasks.length;

    // try/finally so a failure during setup cannot leave the manager stuck in
    // the running state with a live stats timer.
    try {
      for (const task of tasks) this._registerFile(results, task.file);
      this._startStatsTimer();

      await Promise.allSettled(tasks.map((task) => this._runJob(task, results, signal)));

      // Wait for any jobs added mid-batch via addJobs()
      while (this._additionalPromises.length > 0) {
        const batch = this._additionalPromises.splice(0);
        await Promise.allSettled(batch);
      }
    } finally {
      this._stopStatsTimer();
      this.running = false;
    }

    const files = Array.from(results.values());
    // Count every job that was started, including those added mid-batch;
    // `succeeded` is computed over the same set, so `failed` cannot go negative.
    const total = this._batchJobCount;
    const succeeded = files.reduce(
      (count, file) => count + file.results.filter((result) => result.status === 'done').length,
      0
    );
    this.emit('batch-done', {
      id: batchId,
      timestamp: new Date().toISOString(),
      total,
      succeeded,
      failed: total - succeeded,
      files
    });
  }

  /** Emits a 'progress' event that carries no transfer data (queued / retrying / getting-server). */
  _emitIdleProgress(ctx, status, attempt, error) {
    this._emitProgress(ctx.uploadId, ctx.fileName, ctx.task.hoster, {
      jobId: ctx.jobId,
      status,
      progress: 0,
      bytesUploaded: 0,
      bytesTotal: ctx.fileSize,
      speedKbs: 0,
      elapsed: 0,
      remaining: 0,
      error,
      result: null,
      attempt,
      maxAttempts: ctx.maxAttempts
    });
  }

  /**
   * Performs exactly one upload attempt: tracks speed, emits throttled
   * 'uploading' progress, and aborts the attempt when the speed stays below
   * `settings.restartBelowKbs` for longer than the grace period.
   *
   * Never rejects for upload failures; the outcome is returned instead.
   *
   * @param {Object} ctx - { task, settings, uploadId, jobId, fileName, fileSize, maxAttempts, signal }
   * @param {number} attempt - 1-based attempt number, forwarded in progress events
   * @param {Object} [options]
   * @param {boolean} [options.announce=false] - emit a 'getting-server' event first
   * @returns {Promise<Object>} `{ ok: true, result, speedKbs, elapsed }` or
   *   `{ ok: false, error, speedRestart }`
   */
  async _attemptUpload(ctx, attempt, { announce = false } = {}) {
    const { task, settings, uploadId, jobId, fileName, maxAttempts, signal } = ctx;
    const jobStart = Date.now();
    let lastBytes = 0;
    let lastSpeedTime = jobStart;
    let currentSpeedKbs = 0;
    let lowSpeedSince = 0;
    let lastEmitTime = 0;
    let speedAbort = null;
    let speedMonitor = null;
    let signalBundle = { signal, cleanup() {} };

    try {
      if (announce) this._emitIdleProgress(ctx, 'getting-server', attempt, null);

      const throttle = this._buildThrottle(settings);

      if (settings.restartBelowKbs > 0) {
        speedAbort = new AbortController();
        signalBundle = this._combineManySignals([signal, speedAbort.signal]);
        speedMonitor = setInterval(() => {
          if (currentSpeedKbs > 0 && currentSpeedKbs < settings.restartBelowKbs) {
            if (!lowSpeedSince) lowSpeedSince = Date.now();
            if (Date.now() - lowSpeedSince > LOW_SPEED_GRACE_MS) speedAbort.abort();
          } else {
            lowSpeedSince = 0;
          }
        }, SPEED_CHECK_INTERVAL_MS);
      }

      // Mutate this single object on each progress callback instead of
      // allocating a fresh one — callback fires on every stream chunk
      // (hundreds/sec per active job).
      const activeEntry = { jobId, speedKbs: 0, bytesUploaded: 0 };
      this.activeJobs.set(uploadId, activeEntry);

      const progressCb = (bytesUploaded, bytesTotal) => {
        const now = Date.now();
        const timeDelta = (now - lastSpeedTime) / 1000;
        // Re-sample the speed at most once per second.
        if (timeDelta >= 1) {
          currentSpeedKbs = Math.round((bytesUploaded - lastBytes) / timeDelta / 1024);
          lastBytes = bytesUploaded;
          lastSpeedTime = now;
        }
        activeEntry.speedKbs = currentSpeedKbs;
        activeEntry.bytesUploaded = bytesUploaded;

        // Throttle progress emissions to reduce IPC + rendering overhead
        if (now - lastEmitTime < PROGRESS_EMIT_INTERVAL_MS) return;
        lastEmitTime = now;

        const remaining = currentSpeedKbs > 0
          ? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024))
          : 0;
        this._emitProgress(uploadId, fileName, task.hoster, {
          jobId,
          status: 'uploading',
          progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
          bytesUploaded,
          bytesTotal,
          speedKbs: currentSpeedKbs,
          elapsed: Math.round((now - jobStart) / 1000),
          remaining,
          error: null,
          result: null,
          attempt,
          maxAttempts
        });
      };

      const result = await this._executeUpload(task, progressCb, signalBundle.signal, throttle);
      return {
        ok: true,
        result,
        speedKbs: currentSpeedKbs,
        elapsed: Math.round((Date.now() - jobStart) / 1000)
      };
    } catch (err) {
      // A speed restart is an abort that came from the watchdog, not from the job/batch signal.
      const speedRestart = Boolean(speedAbort && speedAbort.signal.aborted && !signal.aborted);
      return { ok: false, error: err, speedRestart };
    } finally {
      this.activeJobs.delete(uploadId);
      if (speedMonitor) clearInterval(speedMonitor);
      signalBundle.cleanup();
    }
  }

  /**
   * Runs one job to completion: validation, slot acquisition, retries on the
   * task's account, then account rotation through registered overrides.
   * Always records exactly one final result in `results` and emits one final
   * 'progress' event (done / skipped / aborted / error).
   *
   * Mutates `task` when it switches to a fallback account.
   *
   * @param {Object} task
   * @param {Map} results - filePath -> { name, size, results: [] }; must contain `task.file`
   * @param {AbortSignal} batchSignal
   * @returns {Promise<void>}
   */
  async _runJob(task, results, batchSignal) {
    const settings = this._getSettings(task.hoster);
    const hosterSemaphore = this._getSemaphore(task.hoster);
    const globalSemaphore = this._getGlobalSemaphore();
    const uploadId = crypto.randomBytes(8).toString('hex');
    const jobId = task.jobId || uploadId;
    const fileName = path.basename(task.file);

    let fileSize = 0;
    let fileNotFound = false;
    try {
      fileSize = fs.statSync(task.file).size;
    } catch {
      fileNotFound = true;
    }

    const maxAttempts = Math.max(1, (settings.retries || 0) + 1);
    const jobAbortController = new AbortController();
    const { signal, cleanup: cleanupSignals } = this._combineSignals(batchSignal, jobAbortController.signal);
    this.jobAbortControllers.set(jobId, jobAbortController);

    const ctx = { task, settings, uploadId, jobId, fileName, fileSize, maxAttempts, signal };

    let hosterSlotAcquired = false;
    let globalSlotAcquired = false;
    let finalResultRecorded = false;
    let lastError = null;

    // Records the job's outcome in the batch results; only the first call counts.
    const recordFinalResult = (status, payload = {}) => {
      if (finalResultRecorded) return;
      finalResultRecorded = true;
      const uploaded = payload.result;
      results.get(task.file).results.push({
        hoster: task.hoster,
        status,
        error: payload.error || null,
        download_url: uploaded ? uploaded.download_url || null : null,
        embed_url: uploaded ? uploaded.embed_url || null : null,
        file_code: uploaded ? uploaded.file_code || null : null
      });
    };

    const emitFinalStatus = (status, payload = {}) => {
      const done = status === 'done';
      this._emitProgress(uploadId, fileName, task.hoster, {
        jobId,
        status,
        progress: done ? 1 : 0,
        bytesUploaded: done ? fileSize : 0,
        bytesTotal: fileSize,
        speedKbs: payload.speedKbs || 0,
        elapsed: payload.elapsed || 0,
        remaining: 0,
        error: payload.error || null,
        result: payload.result || null,
        // An explicit `attempt: 0` (skipped files) must not fall back to maxAttempts.
        attempt: typeof payload.attempt === 'number' ? payload.attempt : maxAttempts,
        maxAttempts
      });
    };

    // Skipped files are shown as 'skipped' but counted as errors in the summary.
    const skip = (error) => {
      emitFinalStatus('skipped', { error, attempt: 0 });
      recordFinalResult('error', { error });
    };

    const finishDone = (outcome, attempt) => {
      this.sessionBytes += fileSize;
      emitFinalStatus('done', {
        result: outcome.result,
        speedKbs: outcome.speedKbs,
        elapsed: outcome.elapsed,
        attempt
      });
      recordFinalResult('done', { result: outcome.result });
    };

    const finishAborted = () => {
      const wasStopped = this.stopAfterActive && !signal.aborted;
      const error = wasStopped ? 'Warteschlange angehalten' : 'Abgebrochen';
      emitFinalStatus('aborted', { error });
      recordFinalResult('aborted', { error });
    };

    const finishError = (error) => {
      emitFinalStatus('error', { error });
      recordFinalResult('error', { error });
    };

    const lastErrorMessage = () => (lastError ? lastError.message : null);

    try {
      if (fileNotFound) {
        skip('Datei nicht gefunden');
        return;
      }
      if (fileSize <= 0) {
        skip('Datei ist leer (0 Bytes)');
        return;
      }
      if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
        skip(`Datei zu groß (Max: ${settings.maxSizeMb} MB)`);
        return;
      }

      this._emitIdleProgress(ctx, 'queued', 0, null);

      // Acquire hoster semaphore first so jobs waiting for a hoster slot
      // don't waste global slots (prevents underutilization)
      await hosterSemaphore.acquire(signal);
      hosterSlotAcquired = true;
      if (globalSemaphore) {
        await globalSemaphore.acquire(signal);
        globalSlotAcquired = true;
      }

      if (settings.timeIntervalSec > 0) {
        await this._waitForInterval(task.hoster, settings.timeIntervalSec * 1000, signal);
      }

      // Pre-job-swap: if this account was marked failed WHILE this task was
      // waiting in the semaphore queue, jump straight to the override instead
      // of burning a guaranteed-to-fail upload attempt. Critical at scale:
      // with 500 queued jobs and 1 parallel slot, without this check every
      // job still hits the original dead account first.
      if (task.accountId && this._failedAccounts.has(this._accountKey(task.hoster, task.accountId))) {
        const override = this._accountOverrides.get(task.hoster);
        const overrideFailed = override
          ? this._failedAccounts.has(this._accountKey(task.hoster, override.id))
          : false;
        if (override && !overrideFailed) {
          this._rotLog('pre-job-swap', {
            hoster: task.hoster,
            fileName,
            fromAccountId: task.accountId,
            toAccountId: override.id
          });
          this._applyAccount(task, override);
        } else {
          this._rotLog('pre-job-swap-blocked', {
            hoster: task.hoster,
            fileName,
            accountId: task.accountId,
            hasOverride: !!override,
            overrideAlsoFailed: overrideFailed
          });
        }
      }

      // --- Attempts on the task's current account --------------------------
      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        if (signal.aborted || this.stopAfterActive) break;

        if (attempt > 1) {
          // Single delay between attempts; rejects (and lands in the outer
          // catch) when the job is aborted while waiting.
          this._emitIdleProgress(ctx, 'retrying', attempt, lastErrorMessage());
          await this._sleep(RETRY_DELAY_MS, signal);
        }

        const outcome = await this._attemptUpload(ctx, attempt, { announce: true });
        if (outcome.ok) {
          finishDone(outcome, attempt);
          return;
        }

        if (signal.aborted) {
          lastError = new Error('Abgebrochen');
          break;
        }
        if (this.stopAfterActive) {
          lastError = new Error('Angehalten');
          break;
        }
        if (outcome.speedRestart && attempt < maxAttempts) {
          lastError = new Error('Geschwindigkeit zu niedrig - Neustart');
          continue;
        }

        lastError = outcome.error;

        // File-specific rejection — re-uploading won't change the server's
        // mind. Break out immediately; the file-rejected branch below then
        // records the final error without burning through the retries.
        if (this._isFileRejectedError(lastError)) break;

        // Account-specific errors — don't waste retries on the same account,
        // jump straight to rotation.
        if (this._shouldSkipRetryOnAccountError(lastError)) {
          this._rotLog('fast-fail', {
            hoster: task.hoster,
            fileName,
            accountId: task.accountId,
            attempt,
            error: lastError && lastError.message ? lastError.message : String(lastError)
          });
          break;
        }
      }

      if ((this.stopAfterActive && !signal.aborted) || signal.aborted || this.cancelledJobIds.has(jobId)) {
        finishAborted();
        return;
      }

      // Account rotation: mark the current account failed (if not already),
      // wait for main to resolve the next fallback, then retry. Loops so
      // A → B → C → ... works for hosters with 3+ accounts.
      //
      // CRITICAL: we must ALWAYS check for an existing override, even if this
      // account is already in _failedAccounts (e.g. another concurrent job
      // already marked it failed). Otherwise the second job falls straight
      // through to final-error instead of using the already-resolved fallback.
      this._rotLog('retries-exhausted', {
        hoster: task.hoster,
        fileName,
        accountId: task.accountId,
        lastError: lastErrorMessage()
      });

      // File-specific rejection → same file will get the same verdict on
      // every other account, rotation is pointless. Don't blacklist, don't
      // retry siblings, just fail this file cleanly.
      if (this._isFileRejectedError(lastError)) {
        this._rotLog('skip-rotation-file-rejected', {
          hoster: task.hoster,
          fileName,
          accountId: task.accountId,
          lastError: lastErrorMessage()
        });
        finishError(lastError.message || 'Datei abgelehnt');
        return;
      }

      // If the reason for failure was a transient network error we do NOT
      // blacklist the account. Other jobs on the same account in this batch
      // can still try fresh. This file just errors out for now.
      if (this._isTransientNetworkError(lastError)) {
        this._rotLog('skip-rotation-transient', {
          hoster: task.hoster,
          fileName,
          accountId: task.accountId,
          lastError: lastErrorMessage()
        });
        finishError(lastError.message || 'Netzwerkfehler');
        return;
      }

      // --- Rotation through fallback accounts ------------------------------
      // Set when rotation stops because of an abort/stop rather than because
      // the accounts ran out, so the job is reported as 'aborted', not 'error'.
      let interrupted = false;

      while (task.accountId) {
        if (signal.aborted || this.stopAfterActive) {
          interrupted = true;
          break;
        }

        const currentKey = this._accountKey(task.hoster, task.accountId);
        if (!this._failedAccounts.has(currentKey)) {
          this._failedAccounts.set(currentKey, true);
          this._rotLog('mark-failed', {
            hoster: task.hoster,
            fileName,
            accountId: task.accountId,
            lastError: lastErrorMessage()
          });
          this.emit('account-failed', { hoster: task.hoster, accountId: task.accountId });
          // Give listeners time to register a fallback via switchAccount().
          await this._sleep(ROTATION_SETTLE_MS, signal);
        } else {
          this._rotLog('already-marked', { hoster: task.hoster, fileName, accountId: task.accountId });
        }

        const override = this._accountOverrides.get(task.hoster);
        if (!override) {
          this._rotLog('rotation-end', {
            hoster: task.hoster,
            fileName,
            reason: 'no-override-set',
            lastFailedAccountId: task.accountId
          });
          break;
        }
        if (this._failedAccounts.has(this._accountKey(task.hoster, override.id))) {
          this._rotLog('rotation-end', {
            hoster: task.hoster,
            fileName,
            reason: 'override-already-failed',
            overrideId: override.id,
            lastFailedAccountId: task.accountId
          });
          break;
        }
        if (override.id === task.accountId) {
          this._rotLog('rotation-end', {
            hoster: task.hoster,
            fileName,
            reason: 'override-same-as-current',
            lastFailedAccountId: task.accountId
          });
          break;
        }

        // Switch to fallback account and retry this file
        this._rotLog('rotate', {
          hoster: task.hoster,
          fileName,
          fromAccountId: task.accountId,
          toAccountId: override.id
        });
        this._applyAccount(task, override);
        this._emitIdleProgress(ctx, 'retrying', 1, 'Account-Wechsel zu Fallback');

        // Retry loop with the new account. On exhausted failure, the while
        // loop iterates: marks this account failed too, asks main for the next
        // fallback, and so on.
        for (let attempt = 1; attempt <= maxAttempts; attempt++) {
          if (signal.aborted || this.stopAfterActive) break;

          if (attempt > 1) {
            this._emitIdleProgress(ctx, 'retrying', attempt, lastError ? lastError.message : '');
            await this._sleep(RETRY_DELAY_MS, signal);
          }

          const outcome = await this._attemptUpload(ctx, attempt);
          if (outcome.ok) {
            finishDone(outcome, attempt);
            return;
          }

          lastError = outcome.error;
          if (signal.aborted || this.stopAfterActive) break;

          if (outcome.speedRestart && attempt < maxAttempts) {
            lastError = new Error('Geschwindigkeit zu niedrig - Neustart');
            continue;
          }

          // Same fast paths as on the primary account.
          if (this._isFileRejectedError(lastError)) break;
          if (this._shouldSkipRetryOnAccountError(lastError)) {
            this._rotLog('fast-fail', {
              hoster: task.hoster,
              fileName,
              accountId: task.accountId,
              attempt,
              error: lastError && lastError.message ? lastError.message : String(lastError)
            });
            break;
          }
        }

        if (signal.aborted || this.stopAfterActive) {
          interrupted = true;
          break;
        }

        // The fallback account is not at fault for file rejections or
        // network hiccups: stop rotating without blacklisting it.
        if (this._isFileRejectedError(lastError)) {
          this._rotLog('skip-rotation-file-rejected', {
            hoster: task.hoster,
            fileName,
            accountId: task.accountId,
            lastError: lastErrorMessage()
          });
          break;
        }
        if (this._isTransientNetworkError(lastError)) {
          this._rotLog('skip-rotation-transient', {
            hoster: task.hoster,
            fileName,
            accountId: task.accountId,
            lastError: lastErrorMessage()
          });
          break;
        }
      }

      if (interrupted) {
        finishAborted();
        return;
      }

      const error = lastError && lastError.message ? lastError.message : 'Unbekannter Fehler';
      this._rotLog('final-error', {
        hoster: task.hoster,
        fileName,
        lastFailedAccountId: task.accountId,
        error
      });
      finishError(error);
    } catch (err) {
      // Reached when an awaited wait (semaphore, interval, sleep) rejects or
      // something unexpected throws.
      const wasStopped = this.stopAfterActive && !signal.aborted;
      const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
      let error;
      if (wasStopped) {
        error = 'Warteschlange angehalten';
      } else if (wasAborted) {
        error = 'Abgebrochen';
      } else {
        error = err && err.message ? err.message : 'Unbekannter Fehler';
      }
      const status = wasAborted || wasStopped ? 'aborted' : 'error';
      emitFinalStatus(status, { error });
      recordFinalResult(status, { error });
    } finally {
      this.activeJobs.delete(uploadId);
      this.jobAbortControllers.delete(jobId);
      cleanupSignals();
      // Release in reverse order of acquire (global first, then hoster)
      if (globalSlotAcquired && globalSemaphore) globalSemaphore.release();
      if (hosterSlotAcquired) hosterSemaphore.release();
    }
  }

  /**
   * Dispatches the upload to the hoster-specific uploader. The login-based
   * uploaders are only used when the task carries a username; everything else
   * goes through the generic `uploadFile`.
   *
   * @returns {Promise<Object>} whatever the selected uploader resolves with
   */
  async _executeUpload(task, progressCb, signal, throttle) {
    if (task.hoster === 'vidmoly.me' && task.username) {
      const vidmoly = new VidmolyUploader();
      await vidmoly.login(task.username, task.password);
      return vidmoly.upload(task.file, progressCb, signal, throttle);
    }
    if (task.hoster === 'voe.sx' && task.username) {
      const voe = new VoeUploader();
      await voe.login(task.username, task.password);
      return voe.upload(task.file, progressCb, signal, throttle);
    }
    if (task.hoster === 'doodstream.com' && task.username) {
      const dood = new DoodstreamUploader();
      await dood.login(task.username, task.password);
      return dood.upload(task.file, progressCb, signal, throttle);
    }
    if (task.hoster === 'clouddrop.cc') {
      const clouddrop = new ClouddropUploader(task.apiKey);
      return clouddrop.upload(task.file, progressCb, signal, throttle);
    }
    return uploadFile(task.hoster, task.file, task.apiKey, progressCb, signal, throttle);
  }

  /** Emits a 'progress' event for one upload. */
  _emitProgress(uploadId, fileName, hoster, data) {
    this.emit('progress', { uploadId, fileName, hoster, ...data });
  }

  /** (Re)starts the once-per-second 'stats' emitter. */
  _startStatsTimer() {
    if (this.statsInterval) clearInterval(this.statsInterval);
    this.statsInterval = setInterval(() => {
      // Single pass over active jobs instead of two.
      let globalSpeedKbs = 0;
      let activeCount = 0;
      let inProgressBytes = 0;
      for (const job of this.activeJobs.values()) {
        globalSpeedKbs += job.speedKbs || 0;
        inProgressBytes += job.bytesUploaded || 0;
        activeCount++;
      }

      let state = 'idle';
      if (this.running) state = this.stopAfterActive ? 'stopping' : 'uploading';

      this.emit('stats', {
        state,
        globalSpeedKbs,
        totalBytes: this.sessionBytes + inProgressBytes,
        elapsed: Math.round((Date.now() - this.startTime) / 1000),
        activeJobs: activeCount,
        pendingJobs: Object.values(this.semaphores).reduce((sum, semaphore) => sum + semaphore.pending, 0)
      });
    }, 1000);
  }

  /** Stops the 'stats' emitter if it is running. */
  _stopStatsTimer() {
    if (this.statsInterval) {
      clearInterval(this.statsInterval);
      this.statsInterval = null;
    }
  }

  /**
   * Two-signal convenience wrapper around _combineManySignals.
   * @returns {{signal: AbortSignal, cleanup: Function}}
   */
  _combineSignals(signal1, signal2) {
    return this._combineManySignals([signal1, signal2]);
  }

  /**
   * Creates a signal that aborts as soon as any of `signals` aborts.
   * Falsy entries are ignored.
   *
   * @param {Array<AbortSignal|null|undefined>} signals
   * @returns {{signal: AbortSignal, cleanup: Function}} `cleanup` detaches the
   *   listeners from the source signals
   */
  _combineManySignals(signals) {
    const liveSignals = signals.filter(Boolean);
    const controller = new AbortController();
    if (liveSignals.some((signal) => signal.aborted)) {
      controller.abort();
      return { signal: controller.signal, cleanup() {} };
    }

    const listeners = [];
    const cleanup = () => {
      for (const entry of listeners) {
        entry.signal.removeEventListener('abort', entry.handler);
      }
    };
    for (const signal of liveSignals) {
      const handler = () => {
        controller.abort();
        cleanup();
      };
      signal.addEventListener('abort', handler, { once: true });
      listeners.push({ signal, handler });
    }
    return { signal: controller.signal, cleanup };
  }

  /**
   * Resolves after `ms` milliseconds; rejects with Error('Aborted') if
   * `signal` is already aborted or aborts while waiting.
   */
  _sleep(ms, signal) {
    return new Promise((resolve, reject) => {
      if (signal && signal.aborted) {
        reject(new Error('Aborted'));
        return;
      }
      const onAbort = () => {
        clearTimeout(timer);
        reject(new Error('Aborted'));
      };
      const timer = setTimeout(() => {
        if (signal) signal.removeEventListener('abort', onAbort);
        resolve();
      }, ms);
      if (signal) signal.addEventListener('abort', onAbort, { once: true });
    });
  }

  /**
   * Waits until at least `intervalMs` has passed since the previous upload
   * start on `hoster`. Rejects if the wait is aborted.
   */
  _waitForInterval(hoster, intervalMs, signal) {
    // Serialize interval waits per hoster so concurrent jobs queue up properly
    const prev = this.intervalLocks[hoster] || Promise.resolve();
    const next = prev.then(async () => {
      const elapsed = Date.now() - (this.lastStartTime[hoster] || 0);
      if (elapsed < intervalMs) {
        await this._sleep(intervalMs - elapsed, signal);
      }
      this.lastStartTime[hoster] = Date.now();
    });
    // The stored chain swallows rejections so one aborted job does not block the next.
    this.intervalLocks[hoster] = next.catch(() => {});
    return next;
  }

  /**
   * Adds jobs to the batch that is currently running.
   *
   * @param {Object[]} tasks
   * @returns {{added: number, alreadyInBatchJobIds: string[]}} nothing is added
   *   when no batch is running
   */
  addJobs(tasks) {
    if (!this.running || !tasks || tasks.length === 0) {
      return { added: 0, alreadyInBatchJobIds: [] };
    }
    const { signal } = this.abortController;
    const results = this._batchResults || new Map();
    const addResult = { added: 0, alreadyInBatchJobIds: [] };

    for (const task of tasks) {
      // Skip if this job is already being processed (prevent duplicates)
      if (task.jobId && this.jobAbortControllers.has(task.jobId)) {
        addResult.alreadyInBatchJobIds.push(task.jobId);
        continue;
      }
      this._registerFile(results, task.file);
      this._additionalPromises.push(this._runJob(task, results, signal));
      this._batchJobCount++;
      addResult.added++;
    }
    return addResult;
  }

  /** Marks the given job ids as cancelled and aborts those that are currently tracked. */
  cancelJobs(jobIds) {
    for (const jobId of jobIds || []) {
      if (!jobId) continue;
      this.cancelledJobIds.add(jobId);
      const controller = this.jobAbortControllers.get(jobId);
      if (controller && !controller.signal.aborted) {
        controller.abort();
      }
    }
  }

  /** Requests a soft stop: jobs check this flag before starting or retrying an attempt. */
  finishAfterActive() {
    this.stopAfterActive = true;
  }

  /** Hard-cancels the running batch by aborting the batch signal and every job signal. */
  cancel() {
    if (!this.running) return;
    this.abortController.abort();
    this.stopAfterActive = false;
    this.running = false;
    for (const controller of this.jobAbortControllers.values()) {
      if (!controller.signal.aborted) controller.abort();
    }
    this._stopStatsTimer();
  }
}

module.exports = UploadManager;