const { EventEmitter } = require('events');
const path = require('path');
const fs = require('fs');
const crypto = require('crypto');
const { uploadFile } = require('./hosters');
const VidmolyUploader = require('./vidmoly-upload');
const VoeUploader = require('./voe-upload');
const Semaphore = require('./semaphore');
const Throttle = require('./throttle');

/**
 * Per-hoster defaults, overridden key-by-key by the settings passed to the
 * constructor. For every numeric limit below, 0 means "disabled".
 *
 * - retries:         extra attempts after the first one (attempts = retries + 1)
 * - maxSpeedKbs:     per-job throttle in KB/s
 * - parallelCount:   concurrent jobs per hoster (semaphore size)
 * - restartBelowKbs: restart an attempt whose speed stays below this value
 * - timeIntervalSec: delay after a job obtained its slot, before it starts
 * - maxSizeMb:       files larger than this are skipped
 */
const DEFAULT_SETTINGS = {
  retries: 3,
  maxSpeedKbs: 0,
  parallelCount: 2,
  restartBelowKbs: 0,
  timeIntervalSec: 0,
  maxSizeMb: 0
};

/**
 * Runs batches of upload tasks with per-hoster concurrency limits, retries,
 * optional throttling and low-speed restarts.
 *
 * Events:
 * - 'progress'   one per status change / progress tick of a single job
 * - 'stats'      once per second while a batch is running (global aggregate)
 * - 'batch-done' once per batch with the summary of all files
 */
class UploadManager extends EventEmitter {
  /**
   * @param {Object<string, Object>} [hosterSettings] - settings keyed by hoster
   *   name; each entry may override any key of DEFAULT_SETTINGS.
   */
  constructor(hosterSettings) {
    super();
    this.hosterSettings = hosterSettings || {};
    this.semaphores = {};
    this.abortController = new AbortController();
    this.running = false;
    this.statsInterval = null;
    this.startTime = 0;
    this.activeJobs = new Map(); // uploadId -> { speedKbs, bytesUploaded }
    this.sessionBytes = 0;
  }

  /**
   * Returns the effective settings for a hoster (defaults + overrides).
   * @param {string} hoster
   * @returns {Object}
   */
  _getSettings(hoster) {
    return { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
  }

  /**
   * Returns the semaphore for a hoster, creating it on first use with the
   * hoster's parallelCount. The instance is cached, so later changes to
   * parallelCount do not affect an already created semaphore.
   * @param {string} hoster
   * @returns {Semaphore}
   */
  _getSemaphore(hoster) {
    if (!this.semaphores[hoster]) {
      const settings = this._getSettings(hoster);
      this.semaphores[hoster] = new Semaphore(settings.parallelCount);
    }
    return this.semaphores[hoster];
  }

  /**
   * Runs all tasks and emits 'batch-done' with a summary when every job has
   * settled (successfully, with an error, or aborted).
   *
   * @param {Array<{file: string, hoster: string, username?: string,
   *   password?: string, apiKey?: string}>} tasks
   * @returns {Promise<void>}
   */
  async startBatch(tasks) {
    this.running = true;
    this.abortController = new AbortController();
    this.startTime = Date.now();
    this.sessionBytes = 0;
    this.activeJobs.clear();

    const { signal } = this.abortController;
    const batchId = `batch-${Date.now()}`;
    const results = new Map(); // filePath -> { name, size, results: [] }

    // Initialize one result entry per distinct file.
    for (const task of tasks) {
      const fileName = path.basename(task.file);
      if (!results.has(task.file)) {
        let size = 0;
        try {
          size = fs.statSync(task.file).size;
        } catch {
          // Best effort: an unreadable file is reported with size 0.
        }
        results.set(task.file, { name: fileName, size, results: [] });
      }
    }

    // Start global stats emitter.
    this._startStatsTimer();

    // Create job promises — the per-hoster semaphore controls concurrency.
    const promises = tasks.map((task) => this._runJob(task, results, signal));
    await Promise.allSettled(promises);

    this._stopStatsTimer();
    this.running = false;

    const files = Array.from(results.values());
    const total = tasks.length;
    const succeeded = files.reduce(
      (n, f) => n + f.results.filter((r) => r.status === 'done').length,
      0
    );
    // Everything that did not finish with 'done' (errors, skips, aborted
    // jobs that never produced a result) counts as failed.
    const summary = {
      id: batchId,
      timestamp: new Date().toISOString(),
      total,
      succeeded,
      failed: total - succeeded,
      files
    };
    this.emit('batch-done', summary);
  }

  /**
   * Runs a single upload task: size filter, queueing on the hoster semaphore,
   * optional start delay, then up to `retries + 1` upload attempts.
   *
   * The outcome is appended to `results.get(task.file).results`, except when
   * the job is aborted before its first attempt, in which case nothing is
   * recorded.
   *
   * @param {{file: string, hoster: string, username?: string,
   *   password?: string, apiKey?: string}} task
   * @param {Map<string, {name: string, size: number, results: Array}>} results
   * @param {AbortSignal} signal - batch-wide abort signal
   * @returns {Promise<void>}
   */
  async _runJob(task, results, signal) {
    const settings = this._getSettings(task.hoster);
    const semaphore = this._getSemaphore(task.hoster);
    const uploadId = crypto.randomBytes(8).toString('hex');
    const fileName = path.basename(task.file);

    let fileSize = 0;
    try {
      fileSize = fs.statSync(task.file).size;
    } catch {
      // Best effort: an unreadable file is treated as size 0 here; the
      // upload itself will surface the real error.
    }

    const maxAttempts = Math.max(1, (settings.retries || 0) + 1);

    // File size filter
    if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
      const errMsg = `Datei zu gross (Max: ${settings.maxSizeMb} MB)`;
      this._emitProgress(uploadId, fileName, task.hoster, {
        status: 'skipped',
        progress: 0,
        bytesUploaded: 0,
        bytesTotal: fileSize,
        speedKbs: 0,
        elapsed: 0,
        remaining: 0,
        error: errMsg,
        result: null,
        attempt: 0,
        maxAttempts
      });
      results.get(task.file).results.push({
        hoster: task.hoster,
        status: 'error',
        error: errMsg,
        download_url: null,
        embed_url: null,
        file_code: null
      });
      return;
    }

    // Emit queued status
    this._emitProgress(uploadId, fileName, task.hoster, {
      status: 'queued',
      progress: 0,
      bytesUploaded: 0,
      bytesTotal: fileSize,
      speedKbs: 0,
      elapsed: 0,
      remaining: 0,
      error: null,
      result: null,
      attempt: 0,
      maxAttempts
    });

    // Wait for semaphore slot (abortable)
    try {
      await semaphore.acquire(signal);
    } catch {
      // Aborted while waiting in queue — no slot was granted, no release needed
      return;
    }

    // From here on the slot is held. The finally block guarantees it is
    // released on every exit path, including unexpected exceptions (e.g. a
    // throwing 'progress' listener), which would otherwise leak the slot.
    try {
      if (signal.aborted) return;

      // Time interval delay between jobs
      if (settings.timeIntervalSec > 0) {
        await this._sleep(settings.timeIntervalSec * 1000, signal).catch(() => {});
        if (signal.aborted) return;
      }

      let lastError = null;

      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        if (signal.aborted) break;

        // Retry delay
        if (attempt > 1) {
          this._emitProgress(uploadId, fileName, task.hoster, {
            status: 'retrying',
            progress: 0,
            bytesUploaded: 0,
            bytesTotal: fileSize,
            speedKbs: 0,
            elapsed: 0,
            remaining: 0,
            error: lastError ? lastError.message : null,
            result: null,
            attempt,
            maxAttempts
          });
          await this._sleep(2500, signal).catch(() => {});
          if (signal.aborted) break;
        }

        const jobStart = Date.now();
        let lastBytes = 0;
        let lastSpeedTime = jobStart;
        let currentSpeedKbs = 0;
        let lowSpeedSince = 0;
        let speedAbort = null;

        // Register active job for global stats
        this.activeJobs.set(uploadId, { speedKbs: 0, bytesUploaded: 0 });

        // Declared outside try so the catch handler can clean them up.
        let speedMonitor = null;
        let signalCleanup = null;

        try {
          // Getting server
          this._emitProgress(uploadId, fileName, task.hoster, {
            status: 'getting-server',
            progress: 0,
            bytesUploaded: 0,
            bytesTotal: fileSize,
            speedKbs: 0,
            elapsed: 0,
            remaining: 0,
            error: null,
            result: null,
            attempt,
            maxAttempts
          });

          // Create per-job throttle (bytes per second)
          const throttle = settings.maxSpeedKbs > 0
            ? new Throttle(settings.maxSpeedKbs * 1024)
            : null;

          // Low-speed restart uses its own controller so it can be told
          // apart from a user abort.
          if (settings.restartBelowKbs > 0) {
            speedAbort = new AbortController();
          }

          // Combined signal: fires on user abort or low-speed abort.
          let jobSignal = signal;
          if (speedAbort) {
            const combined = this._combineSignals(signal, speedAbort.signal);
            jobSignal = combined.signal;
            signalCleanup = combined.cleanup;
          }

          if (settings.restartBelowKbs > 0) {
            // Checked every 2 s; aborts the attempt once the measured speed
            // has stayed below the limit for more than 6 s. A speed of 0 is
            // not treated as "too slow".
            speedMonitor = setInterval(() => {
              if (currentSpeedKbs > 0 && currentSpeedKbs < settings.restartBelowKbs) {
                if (!lowSpeedSince) lowSpeedSince = Date.now();
                if (Date.now() - lowSpeedSince > 6000) {
                  if (speedAbort) speedAbort.abort();
                  clearInterval(speedMonitor);
                }
              } else {
                lowSpeedSince = 0;
              }
            }, 2000);
          }

          // Progress callback with speed tracking
          const progressCb = (bytesUploaded, bytesTotal) => {
            const now = Date.now();
            const elapsed = Math.round((now - jobStart) / 1000);

            // Speed calculation (update every ~1s)
            const timeDelta = (now - lastSpeedTime) / 1000;
            if (timeDelta >= 1) {
              const bytesDelta = bytesUploaded - lastBytes;
              currentSpeedKbs = Math.round(bytesDelta / timeDelta / 1024);
              lastBytes = bytesUploaded;
              lastSpeedTime = now;
            }

            const remaining = currentSpeedKbs > 0
              ? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024))
              : 0;

            // Update active job stats for global aggregation
            this.activeJobs.set(uploadId, { speedKbs: currentSpeedKbs, bytesUploaded });

            this._emitProgress(uploadId, fileName, task.hoster, {
              status: 'uploading',
              progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
              bytesUploaded,
              bytesTotal: bytesTotal,
              speedKbs: currentSpeedKbs,
              elapsed,
              remaining,
              error: null,
              result: null,
              attempt,
              maxAttempts
            });
          };

          // Account-based uploaders are used when credentials are present;
          // everything else goes through the generic uploadFile().
          let result;
          if (task.hoster === 'vidmoly.me' && task.username) {
            const vidmoly = new VidmolyUploader();
            await vidmoly.login(task.username, task.password);
            result = await vidmoly.upload(task.file, progressCb, jobSignal, throttle);
          } else if (task.hoster === 'voe.sx' && task.username) {
            const voe = new VoeUploader();
            await voe.login(task.username, task.password);
            result = await voe.upload(task.file, progressCb, jobSignal, throttle);
          } else {
            result = await uploadFile(task.hoster, task.file, task.apiKey, progressCb, jobSignal, throttle);
          }

          // Clear speed monitor and signal listeners
          if (speedMonitor) clearInterval(speedMonitor);
          if (signalCleanup) signalCleanup();

          // Track session bytes
          this.sessionBytes += fileSize;
          this.activeJobs.delete(uploadId);

          // Success
          const elapsed = Math.round((Date.now() - jobStart) / 1000);
          this._emitProgress(uploadId, fileName, task.hoster, {
            status: 'done',
            progress: 1,
            bytesUploaded: fileSize,
            bytesTotal: fileSize,
            speedKbs: currentSpeedKbs,
            elapsed,
            remaining: 0,
            error: null,
            result,
            attempt,
            maxAttempts
          });
          results.get(task.file).results.push({
            hoster: task.hoster,
            status: 'done',
            ...result
          });
          return; // Success — exit retry loop (slot released in finally)
        } catch (err) {
          // Clear speed monitor interval and signal listeners on error
          if (speedMonitor) {
            clearInterval(speedMonitor);
            speedMonitor = null;
          }
          if (signalCleanup) {
            signalCleanup();
            signalCleanup = null;
          }
          this.activeJobs.delete(uploadId);

          if (signal.aborted) {
            lastError = err;
            break;
          }

          // A speed restart is only the case when the speed monitor itself
          // fired. The flag must be read without aborting the controller
          // first, otherwise every failure looks like a speed restart and
          // the real error message is lost.
          const isSpeedRestart = Boolean(speedAbort && speedAbort.signal.aborted);
          if (isSpeedRestart && attempt < maxAttempts) {
            lastError = new Error('Geschwindigkeit zu niedrig - Neustart');
            continue;
          }

          lastError = err;
          if (attempt >= maxAttempts) break;
        }
      }

      // All attempts exhausted (or aborted during the attempts)
      this.activeJobs.delete(uploadId);
      const errorMsg = signal.aborted
        ? 'Abgebrochen'
        : (lastError ? lastError.message : 'Unbekannter Fehler');

      this._emitProgress(uploadId, fileName, task.hoster, {
        status: 'error',
        progress: 0,
        bytesUploaded: 0,
        bytesTotal: fileSize,
        speedKbs: 0,
        elapsed: 0,
        remaining: 0,
        error: errorMsg,
        result: null,
        attempt: maxAttempts,
        maxAttempts
      });
      results.get(task.file).results.push({
        hoster: task.hoster,
        status: 'error',
        error: errorMsg,
        download_url: null,
        embed_url: null,
        file_code: null
      });
    } finally {
      semaphore.release();
    }
  }

  /**
   * Emits a 'progress' event for one job.
   * @param {string} uploadId
   * @param {string} fileName
   * @param {string} hoster
   * @param {Object} data - status fields merged into the event payload
   */
  _emitProgress(uploadId, fileName, hoster, data) {
    this.emit('progress', { uploadId, fileName, hoster, ...data });
  }

  /**
   * Starts the once-per-second 'stats' emitter. Any timer that is still
   * running is stopped first so repeated calls cannot leak intervals.
   */
  _startStatsTimer() {
    this._stopStatsTimer();
    this.statsInterval = setInterval(() => {
      let globalSpeedKbs = 0;
      let activeCount = 0;
      let inProgressBytes = 0; // bytes of unfinished jobs, for the live total

      for (const job of this.activeJobs.values()) {
        globalSpeedKbs += job.speedKbs || 0;
        inProgressBytes += job.bytesUploaded || 0;
        activeCount++;
      }

      const elapsed = Math.round((Date.now() - this.startTime) / 1000);

      this.emit('stats', {
        state: this.running ? 'uploading' : 'idle',
        globalSpeedKbs,
        totalBytes: this.sessionBytes + inProgressBytes,
        elapsed,
        activeJobs: activeCount,
        pendingJobs: Object.values(this.semaphores).reduce((sum, s) => sum + s.pending, 0)
      });
    }, 1000);
  }

  /** Stops the 'stats' emitter if it is running. */
  _stopStatsTimer() {
    if (this.statsInterval) {
      clearInterval(this.statsInterval);
      this.statsInterval = null;
    }
  }

  /**
   * Creates a signal that aborts as soon as either input signal aborts.
   *
   * @param {AbortSignal} signal1
   * @param {AbortSignal} signal2
   * @returns {{signal: AbortSignal, cleanup: Function}} the combined signal
   *   and a function that detaches the listeners from both inputs.
   */
  _combineSignals(signal1, signal2) {
    const controller = new AbortController();

    if (signal1.aborted || signal2.aborted) {
      controller.abort();
      return { signal: controller.signal, cleanup() {} };
    }

    const cleanup = () => {
      signal1.removeEventListener('abort', onAbort);
      signal2.removeEventListener('abort', onAbort);
    };
    const onAbort = () => {
      controller.abort();
      cleanup();
    };

    signal1.addEventListener('abort', onAbort, { once: true });
    signal2.addEventListener('abort', onAbort, { once: true });

    return { signal: controller.signal, cleanup };
  }

  /**
   * Resolves after `ms` milliseconds, or rejects with Error('Aborted') when
   * the signal aborts first (or is already aborted).
   *
   * @param {number} ms
   * @param {AbortSignal} [signal]
   * @returns {Promise<void>}
   */
  _sleep(ms, signal) {
    return new Promise((resolve, reject) => {
      const onAbort = () => {
        clearTimeout(timer);
        reject(new Error('Aborted'));
      };
      const timer = setTimeout(() => {
        if (signal) signal.removeEventListener('abort', onAbort);
        resolve();
      }, ms);

      if (signal) {
        if (signal.aborted) {
          clearTimeout(timer);
          reject(new Error('Aborted'));
          return;
        }
        signal.addEventListener('abort', onAbort, { once: true });
      }
    });
  }

  /**
   * Aborts the running batch: signals all jobs, stops the stats timer and
   * clears the live job table. Does nothing when no batch is running.
   */
  cancel() {
    if (this.running) {
      this.abortController.abort();
      this.running = false;
      this._stopStatsTimer();
      this.activeJobs.clear();
    }
  }
}

module.exports = UploadManager;