Multi-Hoster-Upload/lib/upload-manager.js
Administrator 25b2afbf11 feat: add queue system, per-hoster settings, retry logic, and full UI overhaul
- Add FIFO semaphore for per-hoster concurrency control
- Add token-bucket speed limiter with abort signal support
- Rewrite upload-manager with retry loop, speed monitoring, and rich progress events
- Add per-hoster settings: retries, max speed, parallel count, restart below speed, time interval, max size
- Add context menu with shutdown-after-finish (sleep/shutdown/restart), always-on-top
- Add z-o-o-m-style queue table with 8 columns, status-colored rows, progress bars
- Add debounced queue rendering with scroll position preservation
- Add statusbar with global speed, total bytes, elapsed time
- Fix speedMonitor interval leak on error and scoping bug
- Fix throttle not respecting abort signal during cancellation
- Fix combined signal listener cleanup
- Bump version to 1.1.0

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 05:57:00 +01:00

391 lines
12 KiB
JavaScript

const { EventEmitter } = require('events');
const path = require('path');
const fs = require('fs');
const crypto = require('crypto');
const { uploadFile } = require('./hosters');
const VidmolyUploader = require('./vidmoly-upload');
const Semaphore = require('./semaphore');
const Throttle = require('./throttle');
/**
 * Fallback per-hoster settings; any key present in the user-supplied
 * settings for a hoster overrides the value here.
 *
 * Frozen because the object is shared by every hoster lookup and is only
 * ever meant to be copied (via spread), never mutated.
 */
const DEFAULT_SETTINGS = Object.freeze({
  // Extra attempts after the first one fails (total attempts = retries + 1).
  retries: 3,
  // Upload speed cap in KB/s handed to the throttle; 0 disables throttling.
  maxSpeedKbs: 0,
  // Number of slots of the per-hoster semaphore (concurrent uploads per hoster).
  parallelCount: 2,
  // Abort and retry an attempt whose speed stays below this many KB/s; 0 disables the monitor.
  restartBelowKbs: 0,
  // Seconds to wait after obtaining a slot before the upload starts; 0 disables the delay.
  timeIntervalSec: 0,
  // Files larger than this many MB are skipped; 0 means no size limit.
  maxSizeMb: 0
});
/**
 * Coordinates batches of file uploads across hosters.
 *
 * Every task ({ file, hoster, apiKey, username, password }) runs as its own
 * job. A job waits for a slot of its hoster's semaphore, is optionally
 * throttled, is retried on failure and can be restarted when its measured
 * speed stays below the configured floor.
 *
 * Emitted events:
 *  - `progress`   per-job updates; `status` is one of skipped, queued,
 *                 getting-server, uploading, retrying, done, error
 *  - `stats`      aggregated figures, once per second while a batch runs
 *  - `batch-done` summary object after every job of the batch has settled
 */
class UploadManager extends EventEmitter {
  /**
   * @param {Object<string, object>} [hosterSettings] Per-hoster overrides of
   *   DEFAULT_SETTINGS, keyed by hoster name.
   */
  constructor(hosterSettings) {
    super();
    this.hosterSettings = hosterSettings || {};
    this.semaphores = {};
    this.abortController = new AbortController();
    this.running = false;
    this.statsInterval = null;
    this.startTime = 0;
    this.activeJobs = new Map(); // uploadId -> { speedKbs, bytesUploaded }
    this.sessionBytes = 0;
  }

  /**
   * Returns the effective settings for a hoster (defaults + overrides).
   * @param {string} hoster
   * @returns {object} A fresh object; safe for the caller to keep.
   */
  _getSettings(hoster) {
    return { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
  }

  /**
   * Returns the semaphore limiting concurrent uploads for a hoster, creating
   * it on first use with the hoster's `parallelCount`.
   * @param {string} hoster
   * @returns {Semaphore}
   */
  _getSemaphore(hoster) {
    if (!this.semaphores[hoster]) {
      const settings = this._getSettings(hoster);
      this.semaphores[hoster] = new Semaphore(settings.parallelCount);
    }
    return this.semaphores[hoster];
  }

  /**
   * Returns the size of a file in bytes, or 0 when it cannot be stat'ed
   * (best effort: a missing file is reported later by the upload itself).
   * @param {string} file
   * @returns {number}
   */
  _fileSize(file) {
    try {
      return fs.statSync(file).size;
    } catch {
      return 0;
    }
  }

  /**
   * Runs all tasks and emits `batch-done` with a summary once every job has
   * settled (successfully, with an error, or cancelled).
   * @param {Array<object>} tasks
   * @returns {Promise<void>}
   */
  async startBatch(tasks) {
    this.running = true;
    this.abortController = new AbortController();
    this.startTime = Date.now();
    this.sessionBytes = 0;
    this.activeJobs.clear();

    const controller = this.abortController;
    const { signal } = controller;
    const batchId = `batch-${Date.now()}`;

    // One result record per distinct file; each job appends its hoster result.
    const results = new Map(); // filePath -> { name, size, results: [] }
    for (const task of tasks) {
      if (!results.has(task.file)) {
        results.set(task.file, {
          name: path.basename(task.file),
          size: this._fileSize(task.file),
          results: []
        });
      }
    }

    this._startStatsTimer();

    // All jobs start immediately; the per-hoster semaphore does the gating.
    await Promise.allSettled(tasks.map((task) => this._runJob(task, results, signal)));

    // Only tear down shared state if no newer batch has replaced this one.
    if (this.abortController === controller) {
      this._stopStatsTimer();
      this.running = false;
    }

    const files = Array.from(results.values());
    const total = tasks.length;
    const succeeded = files.reduce(
      (n, f) => n + f.results.filter((r) => r.status === 'done').length,
      0
    );
    this.emit('batch-done', {
      id: batchId,
      timestamp: new Date().toISOString(),
      total,
      succeeded,
      failed: total - succeeded,
      files
    });
  }

  /**
   * Emits a `progress` event for a job that is not transferring data; all
   * counters are zero unless overridden through `fields`.
   * @param {object} job { uploadId, fileName, hoster, fileSize, maxAttempts }
   * @param {string} status
   * @param {object} [fields] Overrides for individual event fields.
   */
  _emitStatus(job, status, fields = {}) {
    this._emitProgress(job.uploadId, job.fileName, job.hoster, {
      status, progress: 0,
      bytesUploaded: 0, bytesTotal: job.fileSize,
      speedKbs: 0, elapsed: 0, remaining: 0,
      error: null, result: null, attempt: 0, maxAttempts: job.maxAttempts,
      ...fields
    });
  }

  /**
   * Appends a failed hoster result to the file's record in the batch results.
   * @param {Map} results
   * @param {object} task
   * @param {string} errMsg
   */
  _recordFailure(results, task, errMsg) {
    results.get(task.file).results.push({
      hoster: task.hoster, status: 'error', error: errMsg,
      download_url: null, embed_url: null, file_code: null
    });
  }

  /**
   * Runs one task to completion: size filter, queueing, optional start delay
   * and the retry loop. Always records exactly one result for the task.
   * @param {object} task
   * @param {Map} results Batch result map created by startBatch.
   * @param {AbortSignal} signal Batch-wide cancellation signal.
   * @returns {Promise<void>}
   */
  async _runJob(task, results, signal) {
    const retryDelayMs = 2500;
    const settings = this._getSettings(task.hoster);
    const semaphore = this._getSemaphore(task.hoster);
    const fileSize = this._fileSize(task.file);
    // Coerce to a number first: a string such as "3" would otherwise
    // concatenate to "31" attempts.
    const maxAttempts = Math.max(1, Math.floor(Number(settings.retries) || 0) + 1);
    const job = {
      uploadId: crypto.randomBytes(8).toString('hex'),
      fileName: path.basename(task.file),
      hoster: task.hoster,
      fileSize,
      maxAttempts
    };

    // File size filter
    if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
      const errMsg = `Datei zu gross (Max: ${settings.maxSizeMb} MB)`;
      this._emitStatus(job, 'skipped', { error: errMsg });
      this._recordFailure(results, task, errMsg);
      return;
    }

    this._emitStatus(job, 'queued');

    await semaphore.acquire();
    try {
      // Time interval delay between jobs (the slot stays occupied meanwhile).
      if (!signal.aborted && settings.timeIntervalSec > 0) {
        await this._sleep(settings.timeIntervalSec * 1000, signal).catch(() => {});
      }

      let lastError = null;
      for (let attempt = 1; attempt <= maxAttempts && !signal.aborted; attempt++) {
        if (attempt > 1) {
          this._emitStatus(job, 'retrying', {
            error: lastError ? lastError.message : null,
            attempt
          });
          await this._sleep(retryDelayMs, signal).catch(() => {});
          if (signal.aborted) break;
        }

        let outcome;
        try {
          outcome = await this._attemptUpload(task, settings, signal, job, attempt);
        } catch (err) {
          lastError = err;
          continue;
        }

        // Success. Reported outside the try above so that a throwing
        // `progress` listener can never trigger a second upload.
        this.sessionBytes += fileSize;
        this._emitStatus(job, 'done', {
          progress: 1,
          bytesUploaded: fileSize,
          speedKbs: outcome.speedKbs,
          elapsed: outcome.elapsed,
          result: outcome.result,
          attempt
        });
        results.get(task.file).results.push({
          hoster: task.hoster, status: 'done', ...outcome.result
        });
        return;
      }

      // Cancelled (also while still queued) or all attempts exhausted.
      const errorMsg = signal.aborted
        ? 'Abgebrochen'
        : (lastError ? lastError.message : 'Unbekannter Fehler');
      this._emitStatus(job, 'error', { error: errorMsg, attempt: maxAttempts });
      this._recordFailure(results, task, errorMsg);
    } finally {
      // Single release point for every exit path after acquire().
      semaphore.release();
    }
  }

  /**
   * Performs a single upload attempt, including speed tracking and the
   * optional low-speed watchdog.
   * @param {object} task
   * @param {object} settings Effective hoster settings.
   * @param {AbortSignal} signal Batch-wide cancellation signal.
   * @param {object} job { uploadId, fileName, hoster, fileSize, maxAttempts }
   * @param {number} attempt 1-based attempt number.
   * @returns {Promise<{result: object, elapsed: number, speedKbs: number}>}
   * @throws {Error} The uploader's error, or a "speed too low" error when the
   *   watchdog aborted the attempt and another attempt is still available.
   */
  async _attemptUpload(task, settings, signal, job, attempt) {
    const monitorIntervalMs = 2000;
    const lowSpeedGraceMs = 6000;
    const { uploadId, fileName, hoster, maxAttempts } = job;
    const jobStart = Date.now();
    let lastBytes = 0;
    let lastSpeedTime = jobStart;
    let currentSpeedKbs = 0;
    let lowSpeedSince = 0;
    let speedMonitor = null;
    let speedAbort = null;
    let link = null;

    // Register active job for global stats
    this.activeJobs.set(uploadId, { speedKbs: 0, bytesUploaded: 0 });

    try {
      this._emitStatus(job, 'getting-server', { attempt });

      const throttle = settings.maxSpeedKbs > 0
        ? new Throttle(settings.maxSpeedKbs * 1024)
        : null;

      let jobSignal = signal;
      if (settings.restartBelowKbs > 0) {
        speedAbort = new AbortController();
        link = this._linkSignals(signal, speedAbort.signal);
        jobSignal = link.signal;

        // Watchdog: abort the attempt once the speed has been below the floor
        // for longer than the grace period.
        // NOTE: it reads the last sample taken by the progress callback, so an
        // upload that stops reporting progress entirely keeps its last value.
        speedMonitor = setInterval(() => {
          const tooSlow = currentSpeedKbs > 0 && currentSpeedKbs < settings.restartBelowKbs;
          if (!tooSlow) {
            lowSpeedSince = 0;
            return;
          }
          if (!lowSpeedSince) lowSpeedSince = Date.now();
          if (Date.now() - lowSpeedSince > lowSpeedGraceMs) {
            clearInterval(speedMonitor);
            speedAbort.abort();
          }
        }, monitorIntervalMs);
      }

      // Progress callback with speed tracking (speed re-sampled every ~1s)
      const progressCb = (bytesUploaded, bytesTotal) => {
        const now = Date.now();
        const sinceSample = (now - lastSpeedTime) / 1000;
        if (sinceSample >= 1) {
          currentSpeedKbs = Math.round((bytesUploaded - lastBytes) / sinceSample / 1024);
          lastBytes = bytesUploaded;
          lastSpeedTime = now;
        }
        const remaining = currentSpeedKbs > 0
          ? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024))
          : 0;
        this.activeJobs.set(uploadId, { speedKbs: currentSpeedKbs, bytesUploaded });
        this._emitProgress(uploadId, fileName, hoster, {
          status: 'uploading',
          progress: bytesTotal > 0 ? bytesUploaded / bytesTotal : 0,
          bytesUploaded, bytesTotal,
          speedKbs: currentSpeedKbs,
          elapsed: Math.round((now - jobStart) / 1000),
          remaining,
          error: null, result: null, attempt, maxAttempts
        });
      };

      let result;
      if (task.hoster === 'vidmoly.me' && task.username) {
        const vidmoly = new VidmolyUploader();
        await vidmoly.login(task.username, task.password);
        result = await vidmoly.upload(task.file, progressCb, jobSignal, throttle);
      } else {
        result = await uploadFile(task.hoster, task.file, task.apiKey, progressCb, jobSignal, throttle);
      }

      return {
        result,
        elapsed: Math.round((Date.now() - jobStart) / 1000),
        speedKbs: currentSpeedKbs
      };
    } catch (err) {
      // Read the watchdog state BEFORE aborting below; otherwise every
      // failure would look like a speed restart and hide the real error.
      const wasSpeedRestart = Boolean(speedAbort && speedAbort.signal.aborted);
      // Abort the per-attempt signal so anything still bound to it stops.
      if (speedAbort) speedAbort.abort();
      if (wasSpeedRestart && !signal.aborted && attempt < maxAttempts) {
        throw new Error('Geschwindigkeit zu niedrig - Neustart');
      }
      throw err;
    } finally {
      if (speedMonitor) clearInterval(speedMonitor);
      // Detach from the long-lived batch signal on success as well.
      if (link) link.dispose();
      this.activeJobs.delete(uploadId);
    }
  }

  /**
   * Emits a `progress` event carrying the job identity plus `data`.
   */
  _emitProgress(uploadId, fileName, hoster, data) {
    this.emit('progress', { uploadId, fileName, hoster, ...data });
  }

  /**
   * Starts the once-per-second `stats` emitter. Any timer that is still
   * running is stopped first so intervals never stack.
   */
  _startStatsTimer() {
    this._stopStatsTimer();
    this.statsInterval = setInterval(() => {
      let globalSpeedKbs = 0;
      let inProgressBytes = 0;
      for (const job of this.activeJobs.values()) {
        globalSpeedKbs += job.speedKbs || 0;
        inProgressBytes += job.bytesUploaded || 0;
      }
      this.emit('stats', {
        state: this.running ? 'uploading' : 'idle',
        globalSpeedKbs,
        totalBytes: this.sessionBytes + inProgressBytes,
        elapsed: Math.round((Date.now() - this.startTime) / 1000),
        activeJobs: this.activeJobs.size,
        pendingJobs: Object.values(this.semaphores).reduce((sum, s) => sum + s.pending, 0)
      });
    }, 1000);
  }

  /** Stops the `stats` emitter if it is running. */
  _stopStatsTimer() {
    if (this.statsInterval) {
      clearInterval(this.statsInterval);
      this.statsInterval = null;
    }
  }

  /**
   * Creates a signal that aborts as soon as either input signal aborts.
   * @param {AbortSignal} signal1
   * @param {AbortSignal} signal2
   * @returns {{signal: AbortSignal, dispose: Function}} `dispose` detaches the
   *   listeners from both inputs; call it when the combined signal is no
   *   longer needed. It is safe to call more than once.
   */
  _linkSignals(signal1, signal2) {
    const controller = new AbortController();
    if (signal1.aborted || signal2.aborted) {
      controller.abort();
      return { signal: controller.signal, dispose: () => {} };
    }
    const onAbort = () => {
      dispose();
      controller.abort();
    };
    const dispose = () => {
      signal1.removeEventListener('abort', onAbort);
      signal2.removeEventListener('abort', onAbort);
    };
    signal1.addEventListener('abort', onAbort, { once: true });
    signal2.addEventListener('abort', onAbort, { once: true });
    return { signal: controller.signal, dispose };
  }

  /**
   * Returns a signal that aborts when either input aborts. Listeners are
   * only detached on abort; prefer _linkSignals when the inputs outlive the
   * combined signal.
   * @param {AbortSignal} signal1
   * @param {AbortSignal} signal2
   * @returns {AbortSignal}
   */
  _combineSignals(signal1, signal2) {
    return this._linkSignals(signal1, signal2).signal;
  }

  /**
   * Resolves after `ms` milliseconds, or rejects with Error('Aborted') as
   * soon as `signal` aborts. The abort listener is removed when the timer
   * fires so repeated sleeps do not pile up listeners on the signal.
   * @param {number} ms
   * @param {AbortSignal} [signal]
   * @returns {Promise<void>}
   */
  _sleep(ms, signal) {
    return new Promise((resolve, reject) => {
      if (signal && signal.aborted) {
        reject(new Error('Aborted'));
        return;
      }
      let timer = null;
      const onAbort = () => {
        clearTimeout(timer);
        reject(new Error('Aborted'));
      };
      timer = setTimeout(() => {
        if (signal) signal.removeEventListener('abort', onAbort);
        resolve();
      }, ms);
      if (signal) signal.addEventListener('abort', onAbort, { once: true });
    });
  }

  /**
   * Cancels the running batch: aborts the batch signal, stops the stats
   * timer and clears the live job table. No-op when nothing is running.
   */
  cancel() {
    if (this.running) {
      this.abortController.abort();
      this.running = false;
      this._stopStatsTimer();
      this.activeJobs.clear();
    }
  }
}
// Expose the UploadManager class as this module's single CommonJS export.
module.exports = UploadManager;