const { EventEmitter } = require('events');
const path = require('path');
const fs = require('fs');
const crypto = require('crypto');
const { uploadFile } = require('./hosters');
const VidmolyUploader = require('./vidmoly-upload');
const VoeUploader = require('./voe-upload');
const DoodstreamUploader = require('./doodstream-upload');
const Semaphore = require('./semaphore');
const Throttle = require('./throttle');

// Per-hoster defaults; entries of `hosterSettings[hoster]` override these key by key.
// A value of 0 disables the corresponding limit (see the `> 0` checks in `_runJob`).
const DEFAULT_SETTINGS = {
  retries: 3,
  maxSpeedKbs: 0,
  parallelCount: 2,
  restartBelowKbs: 0,
  timeIntervalSec: 0,
  maxSizeMb: 0
};

/**
 * Runs a batch of upload tasks with per-hoster and optional global concurrency
 * limits, retries, optional throttling and cancellation.
 *
 * Emitted events:
 *  - 'progress'   : per-upload status updates (see `_emitProgress`)
 *  - 'stats'      : once per second while a batch is running (see `_startStatsTimer`)
 *  - 'batch-done' : summary object once every job of a batch has settled
 */
class UploadManager extends EventEmitter {
  /**
   * @param {Object} [hosterSettings] - map of hoster name -> settings overriding DEFAULT_SETTINGS
   * @param {Object} [globalSettings] - reads `parallelUploadCount`, `scaleParallelUploads`
   *   and `globalMaxSpeedKbs`
   */
  constructor(hosterSettings, globalSettings) {
    super();
    this.hosterSettings = hosterSettings || {};
    this.globalSettings = globalSettings || {};
    this.semaphores = {};
    this.globalSemaphore = null;
    this.abortController = new AbortController();
    this.running = false;
    this.stopAfterActive = false;
    this.statsInterval = null;
    this.startTime = 0;
    this.activeJobs = new Map(); // uploadId -> { jobId, speedKbs, bytesUploaded }
    this.jobAbortControllers = new Map(); // jobId -> AbortController
    this.cancelledJobIds = new Set();
    this.sessionBytes = 0;
    this.lastStartTime = {}; // hoster -> timestamp of last (reserved) upload start
    this.globalThrottle = null;
  }

  /**
   * Effective settings for a hoster: defaults, overridden by the hoster's own
   * settings. When `scaleParallelUploads` is enabled and a global parallel limit
   * is set, `parallelCount` is raised to at least that global limit.
   *
   * @param {string} hoster
   * @returns {Object} merged settings object (a fresh copy)
   */
  _getSettings(hoster) {
    const settings = { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
    const globalLimit = this._getGlobalParallelLimit();
    if (this.globalSettings.scaleParallelUploads && globalLimit > 0) {
      settings.parallelCount = Math.max(settings.parallelCount || 1, globalLimit);
    }
    return settings;
  }

  /**
   * Normalized global parallel-upload limit.
   *
   * @returns {number} 0 when unset/invalid, otherwise an integer clamped to 1..100
   */
  _getGlobalParallelLimit() {
    const raw = Number(this.globalSettings.parallelUploadCount || 0);
    if (!Number.isFinite(raw) || raw <= 0) return 0;
    return Math.max(1, Math.min(100, Math.round(raw)));
  }

  /**
   * Lazily creates the global semaphore, or updates its limit if it already exists.
   *
   * @returns {Semaphore|null} null when no global limit is configured
   */
  _getGlobalSemaphore() {
    const limit = this._getGlobalParallelLimit();
    if (limit <= 0) return null;
    if (!this.globalSemaphore) {
      this.globalSemaphore = new Semaphore(limit);
    } else {
      this.globalSemaphore.updateLimit(limit);
    }
    return this.globalSemaphore;
  }

  /**
   * Lazily creates the global throttle, or updates its rate if it already exists.
   * The configured KB/s value is passed to Throttle multiplied by 1024.
   *
   * @returns {Throttle|null} null when no global speed limit is configured
   */
  _getGlobalThrottle() {
    const kbs = Number(this.globalSettings.globalMaxSpeedKbs || 0);
    if (!Number.isFinite(kbs) || kbs <= 0) return null;
    if (!this.globalThrottle) {
      this.globalThrottle = new Throttle(kbs * 1024);
    } else {
      this.globalThrottle.updateRate(kbs * 1024);
    }
    return this.globalThrottle;
  }

  /**
   * Lazily creates the semaphore for a hoster, or refreshes its limit from the
   * current settings.
   *
   * @param {string} hoster
   * @returns {Semaphore}
   */
  _getSemaphore(hoster) {
    const { parallelCount } = this._getSettings(hoster);
    if (!this.semaphores[hoster]) {
      this.semaphores[hoster] = new Semaphore(parallelCount);
    } else {
      this.semaphores[hoster].updateLimit(parallelCount);
    }
    return this.semaphores[hoster];
  }

  /**
   * Resets all per-batch state, runs every task through `_runJob` and emits
   * 'batch-done' with a summary once all jobs have settled.
   *
   * @param {Array<Object>} tasks - each task provides `file` and `hoster`; `_runJob`
   *   additionally reads `jobId`, `username`, `password` and `apiKey`
   * @returns {Promise<void>}
   */
  async startBatch(tasks) {
    this.running = true;
    this.stopAfterActive = false;
    this.abortController = new AbortController();
    this.startTime = Date.now();
    this.sessionBytes = 0;
    this.activeJobs.clear();
    this.jobAbortControllers.clear();
    this.cancelledJobIds.clear();
    this.semaphores = {};
    this.globalSemaphore = null;
    this.globalThrottle = null;
    this.lastStartTime = {};

    const { signal } = this.abortController;
    const batchId = `batch-${Date.now()}`;
    const results = new Map(); // filePath -> { name, size, results: [] }

    // try/finally so that a synchronous failure while preparing the batch cannot
    // leave `running` stuck at true or the stats timer alive.
    try {
      for (const task of tasks) {
        if (results.has(task.file)) continue;
        let size = 0;
        try {
          size = fs.statSync(task.file).size;
        } catch {
          // Best effort: an unreadable file is reported with size 0.
        }
        results.set(task.file, { name: path.basename(task.file), size, results: [] });
      }

      this._startStatsTimer();
      const promises = tasks.map((task) => this._runJob(task, results, signal));
      await Promise.allSettled(promises);
    } finally {
      this._stopStatsTimer();
      this.running = false;
    }

    const files = Array.from(results.values());
    const total = tasks.length;
    const succeeded = files.reduce(
      (count, file) => count + file.results.filter((result) => result.status === 'done').length,
      0
    );
    const summary = {
      id: batchId,
      timestamp: new Date().toISOString(),
      total,
      succeeded,
      failed: total - succeeded,
      files
    };
    this.emit('batch-done', summary);
  }

  /**
   * Runs one upload task: waits for the global and hoster slots, honours the
   * hoster's start interval, then attempts the upload up to `retries + 1` times.
   * Exactly one final result is pushed to `results` and one final 'progress'
   * event is emitted per job. Acquired slots are always released.
   *
   * @param {Object} task
   * @param {Map<string, Object>} results - filePath -> { name, size, results: [] }
   * @param {AbortSignal} batchSignal - aborts when the whole batch is cancelled
   * @returns {Promise<void>}
   */
  async _runJob(task, results, batchSignal) {
    const settings = this._getSettings(task.hoster);
    const hosterSemaphore = this._getSemaphore(task.hoster);
    const globalSemaphore = this._getGlobalSemaphore();
    const uploadId = crypto.randomBytes(8).toString('hex');
    const jobId = task.jobId || uploadId;
    const fileName = path.basename(task.file);

    let fileSize = 0;
    try {
      fileSize = fs.statSync(task.file).size;
    } catch {
      // Best effort: size stays 0 when the file cannot be stat'ed.
    }

    const maxAttempts = Math.max(1, (settings.retries || 0) + 1);
    const jobAbortController = new AbortController();
    // `signal` aborts on either a batch-wide cancel or a cancel of this job.
    const { signal, cleanup: cleanupSignals } = this._combineSignals(batchSignal, jobAbortController.signal);
    this.jobAbortControllers.set(jobId, jobAbortController);

    let hosterSlotAcquired = false;
    let globalSlotAcquired = false;
    let finalResultRecorded = false;
    let lastError = null;

    // Pushes the job's outcome into the batch results; only the first call has an effect.
    const recordFinalResult = (status, payload = {}) => {
      if (finalResultRecorded) return;
      finalResultRecorded = true;
      const result = {
        hoster: task.hoster,
        status,
        error: payload.error || null,
        download_url: payload.result ? payload.result.download_url || null : null,
        embed_url: payload.result ? payload.result.embed_url || null : null,
        file_code: payload.result ? payload.result.file_code || null : null
      };
      results.get(task.file).results.push(result);
    };

    // Emits the terminal 'progress' event for this upload.
    const emitFinalStatus = (status, payload = {}) => {
      this._emitProgress(uploadId, fileName, task.hoster, {
        jobId,
        status,
        progress: status === 'done' ? 1 : 0,
        bytesUploaded: status === 'done' ? fileSize : 0,
        bytesTotal: fileSize,
        speedKbs: payload.speedKbs || 0,
        elapsed: payload.elapsed || 0,
        remaining: 0,
        error: payload.error || null,
        result: payload.result || null,
        // `??` rather than `||`: an explicit attempt of 0 (skipped before any
        // attempt) must be reported as 0, not replaced by maxAttempts.
        attempt: payload.attempt ?? maxAttempts,
        maxAttempts
      });
    };

    try {
      if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
        const error = `Datei zu groß (Max: ${settings.maxSizeMb} MB)`;
        emitFinalStatus('skipped', { error, attempt: 0 });
        recordFinalResult('error', { error });
        return;
      }

      this._emitProgress(uploadId, fileName, task.hoster, {
        jobId,
        status: 'queued',
        progress: 0,
        bytesUploaded: 0,
        bytesTotal: fileSize,
        speedKbs: 0,
        elapsed: 0,
        remaining: 0,
        error: null,
        result: null,
        attempt: 0,
        maxAttempts
      });

      // Global slot first, then the hoster slot; both are released in `finally`.
      if (globalSemaphore) {
        await globalSemaphore.acquire(signal);
        globalSlotAcquired = true;
      }
      await hosterSemaphore.acquire(signal);
      hosterSlotAcquired = true;

      if (settings.timeIntervalSec > 0) {
        await this._waitForInterval(task.hoster, settings.timeIntervalSec * 1000, signal);
      }

      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        if (signal.aborted || this.stopAfterActive) break;

        if (attempt > 1) {
          this._emitProgress(uploadId, fileName, task.hoster, {
            jobId,
            status: 'retrying',
            progress: 0,
            bytesUploaded: 0,
            bytesTotal: fileSize,
            speedKbs: 0,
            elapsed: 0,
            remaining: 0,
            error: lastError ? lastError.message : null,
            result: null,
            attempt,
            maxAttempts
          });
          await this._sleep(2500, signal);
        }

        const jobStart = Date.now();
        let lastBytes = 0;
        let lastSpeedTime = jobStart;
        let currentSpeedKbs = 0;
        let lowSpeedSince = 0;
        let speedAbort = null;
        let speedMonitor = null;
        let uploadSignalBundle = { signal, cleanup() {} };

        try {
          this._emitProgress(uploadId, fileName, task.hoster, {
            jobId,
            status: 'getting-server',
            progress: 0,
            bytesUploaded: 0,
            bytesTotal: fileSize,
            speedKbs: 0,
            elapsed: 0,
            remaining: 0,
            error: null,
            result: null,
            attempt,
            maxAttempts
          });

          const hosterThrottle = settings.maxSpeedKbs > 0 ? new Throttle(settings.maxSpeedKbs * 1024) : null;
          const globalThrottle = this._getGlobalThrottle();
          // When both limits apply, every chunk has to pass both throttles in turn.
          const throttle = hosterThrottle && globalThrottle
            ? {
                consume: async (bytes, sig) => {
                  await hosterThrottle.consume(bytes, sig);
                  await globalThrottle.consume(bytes, sig);
                }
              }
            : hosterThrottle || globalThrottle;

          if (settings.restartBelowKbs > 0) {
            speedAbort = new AbortController();
            uploadSignalBundle = this._combineManySignals([signal, speedAbort.signal]);
            // Checked every 2 s; aborts this attempt once the measured speed has
            // stayed below the threshold for more than 6 s.
            // NOTE(review): `currentSpeedKbs` is only refreshed from `progressCb`,
            // so an upload that stops emitting progress entirely keeps its last
            // measured speed here and may never trigger a restart - confirm whether
            // that is intended.
            speedMonitor = setInterval(() => {
              if (currentSpeedKbs > 0 && currentSpeedKbs < settings.restartBelowKbs) {
                if (!lowSpeedSince) lowSpeedSince = Date.now();
                if (Date.now() - lowSpeedSince > 6000) {
                  speedAbort.abort();
                }
              } else {
                lowSpeedSince = 0;
              }
            }, 2000);
          }

          this.activeJobs.set(uploadId, { jobId, speedKbs: 0, bytesUploaded: 0 });

          const progressCb = (bytesUploaded, bytesTotal) => {
            const now = Date.now();
            const elapsed = Math.round((now - jobStart) / 1000);
            const timeDelta = (now - lastSpeedTime) / 1000;
            // Re-sample the speed at most once per second.
            if (timeDelta >= 1) {
              const bytesDelta = bytesUploaded - lastBytes;
              currentSpeedKbs = Math.round(bytesDelta / timeDelta / 1024);
              lastBytes = bytesUploaded;
              lastSpeedTime = now;
            }
            const remaining = currentSpeedKbs > 0
              ? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024))
              : 0;
            this.activeJobs.set(uploadId, { jobId, speedKbs: currentSpeedKbs, bytesUploaded });
            this._emitProgress(uploadId, fileName, task.hoster, {
              jobId,
              status: 'uploading',
              progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
              bytesUploaded,
              bytesTotal,
              speedKbs: currentSpeedKbs,
              elapsed,
              remaining,
              error: null,
              result: null,
              attempt,
              maxAttempts
            });
          };

          // Account-based uploaders are used when credentials are supplied for a
          // known hoster; everything else goes through the generic `uploadFile`.
          let result;
          if (task.hoster === 'vidmoly.me' && task.username) {
            const vidmoly = new VidmolyUploader();
            await vidmoly.login(task.username, task.password);
            result = await vidmoly.upload(task.file, progressCb, uploadSignalBundle.signal, throttle);
          } else if (task.hoster === 'voe.sx' && task.username) {
            const voe = new VoeUploader();
            await voe.login(task.username, task.password);
            result = await voe.upload(task.file, progressCb, uploadSignalBundle.signal, throttle);
          } else if (task.hoster === 'doodstream.com' && task.username) {
            const dood = new DoodstreamUploader();
            await dood.login(task.username, task.password);
            result = await dood.upload(task.file, progressCb, uploadSignalBundle.signal, throttle);
          } else {
            result = await uploadFile(task.hoster, task.file, task.apiKey, progressCb, uploadSignalBundle.signal, throttle);
          }

          const elapsed = Math.round((Date.now() - jobStart) / 1000);
          this.sessionBytes += fileSize;
          this.activeJobs.delete(uploadId);
          emitFinalStatus('done', { result, speedKbs: currentSpeedKbs, elapsed, attempt });
          recordFinalResult('done', { result });
          return;
        } catch (err) {
          this.activeJobs.delete(uploadId);
          // True when only the low-speed monitor (not a cancel) aborted this attempt.
          const isSpeedRestart = speedAbort && speedAbort.signal.aborted && !signal.aborted;
          if (signal.aborted) {
            lastError = new Error('Abgebrochen');
            break;
          }
          if (this.stopAfterActive) {
            lastError = new Error('Angehalten');
            break;
          }
          if (isSpeedRestart && attempt < maxAttempts) {
            lastError = new Error('Geschwindigkeit zu niedrig - Neustart');
            await this._sleep(3000, signal);
            continue;
          }
          lastError = err;
          if (attempt >= maxAttempts) break;
          // Wait 3 seconds before retry
          await this._sleep(3000, signal);
        } finally {
          if (speedMonitor) clearInterval(speedMonitor);
          uploadSignalBundle.cleanup();
        }
      }

      const wasStopped = this.stopAfterActive && !signal.aborted;
      const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
      if (wasStopped || wasAborted) {
        const error = wasStopped ? 'Warteschlange angehalten' : 'Abgebrochen';
        emitFinalStatus('aborted', { error });
        recordFinalResult('aborted', { error });
        return;
      }

      const error = lastError && lastError.message ? lastError.message : 'Unbekannter Fehler';
      emitFinalStatus('error', { error });
      recordFinalResult('error', { error });
    } catch (err) {
      // Reached when waiting for a slot, the start interval or a retry delay is
      // interrupted, or on any other unexpected failure outside an upload attempt.
      const wasStopped = this.stopAfterActive && !signal.aborted;
      const wasCancelled = signal.aborted || this.cancelledJobIds.has(jobId);
      let error;
      if (wasStopped) {
        error = 'Warteschlange angehalten';
      } else if (wasCancelled) {
        error = 'Abgebrochen';
      } else {
        error = err && err.message ? err.message : 'Unbekannter Fehler';
      }
      const status = wasCancelled || wasStopped ? 'aborted' : 'error';
      emitFinalStatus(status, { error });
      recordFinalResult(status, { error });
    } finally {
      this.activeJobs.delete(uploadId);
      this.jobAbortControllers.delete(jobId);
      cleanupSignals();
      if (hosterSlotAcquired) hosterSemaphore.release();
      if (globalSlotAcquired && globalSemaphore) globalSemaphore.release();
    }
  }

  /**
   * Emits a 'progress' event carrying the upload identity merged with `data`.
   *
   * @param {string} uploadId
   * @param {string} fileName
   * @param {string} hoster
   * @param {Object} data - status fields spread into the event payload
   */
  _emitProgress(uploadId, fileName, hoster, data) {
    this.emit('progress', { uploadId, fileName, hoster, ...data });
  }

  /**
   * Starts the once-per-second 'stats' emitter. Any timer that is still running
   * is stopped first so repeated calls cannot leak intervals.
   */
  _startStatsTimer() {
    this._stopStatsTimer();
    this.statsInterval = setInterval(() => {
      let globalSpeedKbs = 0;
      let activeCount = 0;
      let inProgressBytes = 0;
      for (const job of this.activeJobs.values()) {
        globalSpeedKbs += job.speedKbs || 0;
        inProgressBytes += job.bytesUploaded || 0;
        activeCount++;
      }
      const elapsed = Math.round((Date.now() - this.startTime) / 1000);

      // Jobs wait on the global semaphore before they ever reach a hoster
      // semaphore, so both queues have to be counted.
      const hosterPending = Object.values(this.semaphores).reduce(
        (sum, semaphore) => sum + semaphore.pending,
        0
      );
      const globalPending = this.globalSemaphore ? this.globalSemaphore.pending || 0 : 0;

      let state = 'idle';
      if (this.running) {
        state = this.stopAfterActive ? 'stopping' : 'uploading';
      }

      this.emit('stats', {
        state,
        globalSpeedKbs,
        totalBytes: this.sessionBytes + inProgressBytes,
        elapsed,
        activeJobs: activeCount,
        pendingJobs: hosterPending + globalPending
      });
    }, 1000);
  }

  /** Stops the 'stats' emitter if it is running. */
  _stopStatsTimer() {
    if (this.statsInterval) {
      clearInterval(this.statsInterval);
      this.statsInterval = null;
    }
  }

  /**
   * Combines two abort signals into one that aborts as soon as either aborts.
   *
   * @param {AbortSignal} signal1
   * @param {AbortSignal} signal2
   * @returns {{ signal: AbortSignal, cleanup: Function }} `cleanup` detaches the listeners
   */
  _combineSignals(signal1, signal2) {
    return this._combineManySignals([signal1, signal2]);
  }

  /**
   * Combines any number of abort signals (falsy entries are ignored) into one
   * that aborts as soon as any of them aborts.
   *
   * @param {Array<AbortSignal|null|undefined>} signals
   * @returns {{ signal: AbortSignal, cleanup: Function }} `cleanup` detaches the listeners
   */
  _combineManySignals(signals) {
    const liveSignals = signals.filter(Boolean);
    const controller = new AbortController();
    if (liveSignals.some((signal) => signal.aborted)) {
      controller.abort();
      return { signal: controller.signal, cleanup() {} };
    }

    const listeners = [];
    const cleanup = () => {
      for (const entry of listeners) {
        entry.signal.removeEventListener('abort', entry.handler);
      }
    };
    for (const signal of liveSignals) {
      const handler = () => {
        controller.abort();
        cleanup();
      };
      signal.addEventListener('abort', handler, { once: true });
      listeners.push({ signal, handler });
    }
    return { signal: controller.signal, cleanup };
  }

  /**
   * Resolves after `ms` milliseconds, or rejects with Error('Aborted') as soon as
   * `signal` aborts (immediately if it is already aborted).
   *
   * @param {number} ms
   * @param {AbortSignal} [signal]
   * @returns {Promise<void>}
   */
  _sleep(ms, signal) {
    return new Promise((resolve, reject) => {
      if (signal && signal.aborted) {
        reject(new Error('Aborted'));
        return;
      }
      const onAbort = () => {
        clearTimeout(timer);
        reject(new Error('Aborted'));
      };
      const timer = setTimeout(() => {
        if (signal) signal.removeEventListener('abort', onAbort);
        resolve();
      }, ms);
      if (signal) signal.addEventListener('abort', onAbort, { once: true });
    });
  }

  /**
   * Delays until at least `intervalMs` has passed since the previous upload start
   * recorded for this hoster.
   *
   * The start slot is reserved synchronously before awaiting. Otherwise several
   * jobs that obtain hoster slots at the same moment would all read the same
   * `lastStartTime`, sleep for the same duration and start together, defeating
   * the interval. If the wait is aborted the reservation is kept, so later jobs
   * may wait somewhat longer than strictly necessary.
   *
   * @param {string} hoster
   * @param {number} intervalMs
   * @param {AbortSignal} [signal]
   * @returns {Promise<void>} rejects if `signal` aborts while waiting
   */
  async _waitForInterval(hoster, intervalMs, signal) {
    const now = Date.now();
    const last = this.lastStartTime[hoster] || 0;
    const startAt = Math.max(now, last + intervalMs);
    this.lastStartTime[hoster] = startAt;
    if (startAt > now) {
      await this._sleep(startAt - now, signal);
    }
  }

  /**
   * Marks the given jobs as cancelled and aborts those currently registered.
   *
   * @param {Iterable<string>} [jobIds] - falsy ids are skipped
   */
  cancelJobs(jobIds) {
    for (const jobId of jobIds || []) {
      if (!jobId) continue;
      this.cancelledJobIds.add(jobId);
      const controller = this.jobAbortControllers.get(jobId);
      if (controller && !controller.signal.aborted) {
        controller.abort();
      }
    }
  }

  /**
   * Sets the stop flag: jobs that have not begun an attempt finish as 'aborted',
   * and a running attempt that fails is not retried.
   */
  finishAfterActive() {
    this.stopAfterActive = true;
  }

  /** Aborts the whole batch and every registered job; no-op when nothing is running. */
  cancel() {
    if (!this.running) return;
    this.abortController.abort();
    this.stopAfterActive = false;
    this.running = false;
    for (const controller of this.jobAbortControllers.values()) {
      if (!controller.signal.aborted) controller.abort();
    }
    this._stopStatsTimer();
  }
}

module.exports = UploadManager;