/*
 * Change notes:
 * - Add doodstream.com web login (email + password) as an alternative to the API key
 * - Fix doodstream login: use the X-Requested-With header for a JSON response
 * - Add the "Aus der Queue entfernen bei Abschluss" setting
 * - Fix byse.sx download URLs to use the /d/ prefix
 * - Make config writes async to prevent race conditions
 *
 * Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
 *
 * File: 416 lines, 14 KiB, JavaScript
 */
const { EventEmitter } = require('events');
|
|
const path = require('path');
|
|
const fs = require('fs');
|
|
const crypto = require('crypto');
|
|
const { uploadFile } = require('./hosters');
|
|
const VidmolyUploader = require('./vidmoly-upload');
|
|
const VoeUploader = require('./voe-upload');
|
|
const DoodstreamUploader = require('./doodstream-upload');
|
|
const Semaphore = require('./semaphore');
|
|
const Throttle = require('./throttle');
|
|
|
|
/**
 * Per-hoster defaults. UploadManager spreads these first and then the
 * hoster-specific overrides on top, so this object is never mutated; it is
 * frozen to keep an accidental write from leaking into every hoster.
 */
const DEFAULT_SETTINGS = Object.freeze({
  // Extra attempts after the first one (max attempts = retries + 1).
  retries: 3,
  // Per-job bandwidth cap in KB/s; 0 disables throttling.
  maxSpeedKbs: 0,
  // Size of the per-hoster semaphore, i.e. simultaneous uploads per hoster.
  parallelCount: 2,
  // Restart an upload that stays below this speed (KB/s); 0 disables the monitor.
  restartBelowKbs: 0,
  // Delay in seconds applied after a job gets its slot and before it starts; 0 disables.
  timeIntervalSec: 0,
  // Files larger than this many MB are skipped; 0 disables the filter.
  maxSizeMb: 0,
});
|
|
|
|
/**
 * Runs a batch of file uploads with per-hoster concurrency limits, retries,
 * optional throttling and an optional low-speed restart.
 *
 * Events emitted:
 *  - 'progress'   per-job status updates ({ uploadId, fileName, hoster, status, ... })
 *  - 'stats'      aggregated batch statistics, once per second while a batch runs
 *  - 'batch-done' final summary when every job of a batch has settled
 */
class UploadManager extends EventEmitter {
  /**
   * @param {Object<string, object>} [hosterSettings] Overrides keyed by hoster
   *   name; each entry is merged over DEFAULT_SETTINGS.
   */
  constructor(hosterSettings) {
    super();
    this.hosterSettings = hosterSettings || {};
    this.semaphores = {}; // hoster -> Semaphore
    this.abortController = new AbortController();
    this.running = false;
    this.statsInterval = null;
    this.startTime = 0;
    this.activeJobs = new Map(); // uploadId -> { speedKbs, bytesUploaded }
    this.sessionBytes = 0; // bytes of uploads completed in the current batch
  }

  /**
   * Effective settings for a hoster: defaults overlaid with its overrides.
   * @param {string} hoster
   * @returns {object} A fresh object; safe for the caller to keep.
   */
  _getSettings(hoster) {
    return { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
  }

  /**
   * Lazily creates (and then reuses) the concurrency limiter for a hoster.
   * The limit is taken from `parallelCount` at the time of first use.
   * @param {string} hoster
   * @returns {Semaphore}
   */
  _getSemaphore(hoster) {
    if (!this.semaphores[hoster]) {
      const { parallelCount } = this._getSettings(hoster);
      this.semaphores[hoster] = new Semaphore(parallelCount);
    }
    return this.semaphores[hoster];
  }

  /**
   * Runs all tasks and resolves once every job has settled. Emits
   * 'batch-done' with a summary ({ id, timestamp, total, succeeded, failed, files }).
   *
   * @param {Array<{file: string, hoster: string, apiKey?: string, username?: string, password?: string}>} tasks
   * @returns {Promise<void>}
   */
  async startBatch(tasks) {
    this.running = true;
    this.abortController = new AbortController();
    this.startTime = Date.now();
    this.sessionBytes = 0;
    this.activeJobs.clear();
    const { signal } = this.abortController;

    const batchId = `batch-${Date.now()}`;

    // One result bucket per distinct file; each job appends its hoster result.
    const results = new Map(); // filePath -> { name, size, results: [] }
    for (const task of tasks) {
      if (results.has(task.file)) continue;
      let size = 0;
      try {
        size = fs.statSync(task.file).size;
      } catch {
        // stat failed: report size 0 in the summary
      }
      results.set(task.file, { name: path.basename(task.file), size, results: [] });
    }

    this._startStatsTimer();

    // All jobs start immediately; the per-hoster semaphore gates real concurrency.
    await Promise.allSettled(tasks.map((task) => this._runJob(task, results, signal)));

    this._stopStatsTimer();
    this.running = false;

    const files = Array.from(results.values());
    const total = tasks.length;
    const succeeded = files.reduce(
      (count, file) => count + file.results.filter((r) => r.status === 'done').length,
      0
    );

    this.emit('batch-done', {
      id: batchId,
      timestamp: new Date().toISOString(),
      total,
      succeeded,
      failed: total - succeeded,
      files
    });
  }

  /**
   * Uploads one file to one hoster, with retries. Progress is reported via
   * 'progress' events and the final outcome is appended to
   * `results.get(task.file).results`.
   *
   * @param {object} task Upload task (file, hoster and credentials).
   * @param {Map<string, {results: object[]}>} results Result buckets keyed by file path.
   * @param {AbortSignal} signal Batch-wide cancellation signal.
   * @returns {Promise<void>}
   */
  async _runJob(task, results, signal) {
    const settings = this._getSettings(task.hoster);
    const semaphore = this._getSemaphore(task.hoster);
    const uploadId = crypto.randomBytes(8).toString('hex');
    const fileName = path.basename(task.file);

    let fileSize = 0;
    try {
      fileSize = fs.statSync(task.file).size;
    } catch {
      // stat failed: continue with size 0
    }

    const maxAttempts = Math.max(1, (settings.retries || 0) + 1);

    // Emits a progress event; `fields` overrides the idle defaults.
    const report = (status, fields) => this._emitProgress(uploadId, fileName, task.hoster, {
      status, progress: 0,
      bytesUploaded: 0, bytesTotal: fileSize,
      speedKbs: 0, elapsed: 0, remaining: 0,
      error: null, result: null, attempt: 0, maxAttempts,
      ...fields
    });

    // Appends a failed outcome for this hoster to the file's result bucket.
    const recordFailure = (message) => {
      results.get(task.file).results.push({
        hoster: task.hoster, status: 'error', error: message,
        download_url: null, embed_url: null, file_code: null
      });
    };

    // File size filter
    if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
      const errMsg = `Datei zu gross (Max: ${settings.maxSizeMb} MB)`;
      report('skipped', { error: errMsg });
      recordFailure(errMsg);
      return;
    }

    report('queued');

    // Wait for a semaphore slot (abortable)
    try {
      await semaphore.acquire(signal);
    } catch {
      // Aborted while waiting in queue: no slot was granted, nothing to release
      return;
    }

    // From here on the slot is held; `finally` guarantees it is returned on
    // every exit path, including an exception thrown by an event listener.
    try {
      if (signal.aborted) return;

      // Time interval delay between jobs
      if (settings.timeIntervalSec > 0) {
        await this._sleep(settings.timeIntervalSec * 1000, signal).catch(() => {});
        if (signal.aborted) return;
      }

      let lastError = null;

      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        if (signal.aborted) break;

        // Retry delay
        if (attempt > 1) {
          report('retrying', { error: lastError ? lastError.message : null, attempt });
          await this._sleep(2500, signal).catch(() => {});
          if (signal.aborted) break;
        }

        const jobStart = Date.now();
        let lastBytes = 0;
        let lastSpeedTime = jobStart;
        let currentSpeedKbs = 0;
        let lowSpeedSince = 0;

        // Register active job for global stats
        this.activeJobs.set(uploadId, { speedKbs: 0, bytesUploaded: 0 });

        let speedAbort = null;
        let speedMonitor = null;
        let signalCleanup = null;
        let result;

        try {
          report('getting-server', { attempt });

          // Per-job throttle
          const throttle = settings.maxSpeedKbs > 0
            ? new Throttle(settings.maxSpeedKbs * 1024)
            : null;

          // Low-speed restart: a second abort source combined with the batch signal
          let jobSignal = signal;
          if (settings.restartBelowKbs > 0) {
            speedAbort = new AbortController();
            const combined = this._combineSignals(signal, speedAbort.signal);
            jobSignal = combined.signal;
            signalCleanup = combined.cleanup;

            speedMonitor = setInterval(() => {
              const tooSlow = currentSpeedKbs > 0 && currentSpeedKbs < settings.restartBelowKbs;
              if (!tooSlow) {
                lowSpeedSince = 0;
                return;
              }
              if (!lowSpeedSince) lowSpeedSince = Date.now();
              // Only restart after the speed stayed low for more than 6 s
              if (Date.now() - lowSpeedSince > 6000) {
                speedAbort.abort();
                clearInterval(speedMonitor);
              }
            }, 2000);
          }

          // Progress callback with speed tracking
          const progressCb = (bytesUploaded, bytesTotal) => {
            const now = Date.now();
            const elapsed = Math.round((now - jobStart) / 1000);

            // Speed is re-sampled at most once per second
            const timeDelta = (now - lastSpeedTime) / 1000;
            if (timeDelta >= 1) {
              currentSpeedKbs = Math.round((bytesUploaded - lastBytes) / timeDelta / 1024);
              lastBytes = bytesUploaded;
              lastSpeedTime = now;
            }

            // Clamped so an over-reported byte count never yields a negative ETA
            const remaining = currentSpeedKbs > 0
              ? Math.max(0, Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024)))
              : 0;

            // Update active job stats for global aggregation
            this.activeJobs.set(uploadId, { speedKbs: currentSpeedKbs, bytesUploaded });

            report('uploading', {
              progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
              bytesUploaded, bytesTotal,
              speedKbs: currentSpeedKbs, elapsed, remaining,
              attempt
            });
          };

          result = await this._performUpload(task, progressCb, jobSignal, throttle);
        } catch (err) {
          if (signal.aborted) {
            lastError = err;
            break;
          }

          // speedAbort is only ever aborted by the speed monitor, so its flag
          // reliably tells a low-speed restart apart from a genuine failure.
          const isSpeedRestart = speedAbort !== null && speedAbort.signal.aborted;
          lastError = isSpeedRestart && attempt < maxAttempts
            ? new Error('Geschwindigkeit zu niedrig - Neustart')
            : err;
          continue; // next attempt, or loop exit when attempts are exhausted
        } finally {
          if (speedMonitor) clearInterval(speedMonitor);
          if (signalCleanup) signalCleanup();
          this.activeJobs.delete(uploadId);
        }

        // Success. Reported outside the try above so that a throwing listener
        // cannot be mistaken for an upload failure and trigger a re-upload.
        this.sessionBytes += fileSize;
        const elapsed = Math.round((Date.now() - jobStart) / 1000);
        report('done', {
          progress: 1,
          bytesUploaded: fileSize,
          speedKbs: currentSpeedKbs, elapsed,
          result, attempt
        });
        results.get(task.file).results.push({
          hoster: task.hoster, status: 'done', ...result
        });
        return;
      }

      // Cancelled or all attempts exhausted
      const errorMsg = signal.aborted
        ? 'Abgebrochen'
        : (lastError ? lastError.message : 'Unbekannter Fehler');

      report('error', { error: errorMsg, attempt: maxAttempts });
      recordFailure(errorMsg);
    } finally {
      semaphore.release();
    }
  }

  /**
   * Dispatches the actual upload: web login uploaders for the hosters that
   * have one when the task carries a username, the API-key uploader otherwise.
   *
   * @param {object} task
   * @param {(bytesUploaded: number, bytesTotal: number) => void} progressCb
   * @param {AbortSignal} signal
   * @param {Throttle|null} throttle
   * @returns {Promise<object>} Result object of the uploader.
   */
  async _performUpload(task, progressCb, signal, throttle) {
    if (task.username) {
      let uploader = null;
      if (task.hoster === 'vidmoly.me') uploader = new VidmolyUploader();
      else if (task.hoster === 'voe.sx') uploader = new VoeUploader();
      else if (task.hoster === 'doodstream.com') uploader = new DoodstreamUploader();

      if (uploader) {
        await uploader.login(task.username, task.password);
        return uploader.upload(task.file, progressCb, signal, throttle);
      }
    }
    return uploadFile(task.hoster, task.file, task.apiKey, progressCb, signal, throttle);
  }

  /**
   * Emits a 'progress' event for one job.
   * @param {string} uploadId
   * @param {string} fileName
   * @param {string} hoster
   * @param {object} data Status fields merged into the event payload.
   */
  _emitProgress(uploadId, fileName, hoster, data) {
    this.emit('progress', { uploadId, fileName, hoster, ...data });
  }

  /**
   * Starts the once-per-second 'stats' emitter. Any timer that is still
   * running is stopped first so repeated calls cannot leak an interval.
   */
  _startStatsTimer() {
    this._stopStatsTimer();
    this.statsInterval = setInterval(() => {
      let globalSpeedKbs = 0;
      let inProgressBytes = 0;
      let activeCount = 0;
      for (const job of this.activeJobs.values()) {
        globalSpeedKbs += job.speedKbs || 0;
        inProgressBytes += job.bytesUploaded || 0;
        activeCount++;
      }

      const pendingJobs = Object.values(this.semaphores)
        .reduce((sum, semaphore) => sum + semaphore.pending, 0);

      this.emit('stats', {
        state: this.running ? 'uploading' : 'idle',
        globalSpeedKbs,
        // completed uploads plus the live byte counts of running ones
        totalBytes: this.sessionBytes + inProgressBytes,
        elapsed: Math.round((Date.now() - this.startTime) / 1000),
        activeJobs: activeCount,
        pendingJobs
      });
    }, 1000);
  }

  /** Stops the 'stats' emitter if it is running. */
  _stopStatsTimer() {
    if (this.statsInterval) {
      clearInterval(this.statsInterval);
      this.statsInterval = null;
    }
  }

  /**
   * Builds a signal that aborts as soon as either input signal aborts.
   *
   * @param {AbortSignal} signal1
   * @param {AbortSignal} signal2
   * @returns {{signal: AbortSignal, cleanup: () => void}} `cleanup` detaches
   *   the listeners from both inputs; call it once the combined signal is no
   *   longer needed.
   */
  _combineSignals(signal1, signal2) {
    const controller = new AbortController();
    if (signal1.aborted || signal2.aborted) {
      controller.abort();
      return { signal: controller.signal, cleanup() {} };
    }

    const cleanup = () => {
      signal1.removeEventListener('abort', onAbort);
      signal2.removeEventListener('abort', onAbort);
    };
    const onAbort = () => {
      controller.abort();
      cleanup();
    };
    signal1.addEventListener('abort', onAbort, { once: true });
    signal2.addEventListener('abort', onAbort, { once: true });
    return { signal: controller.signal, cleanup };
  }

  /**
   * Abortable delay.
   *
   * @param {number} ms Delay in milliseconds.
   * @param {AbortSignal} [signal] Rejects the promise with Error('Aborted') when it aborts.
   * @returns {Promise<void>}
   */
  _sleep(ms, signal) {
    return new Promise((resolve, reject) => {
      if (signal && signal.aborted) {
        reject(new Error('Aborted'));
        return;
      }

      const onAbort = () => {
        clearTimeout(timer);
        reject(new Error('Aborted'));
      };
      const timer = setTimeout(() => {
        if (signal) signal.removeEventListener('abort', onAbort);
        resolve();
      }, ms);

      if (signal) signal.addEventListener('abort', onAbort, { once: true });
    });
  }

  /**
   * Cancels the running batch: aborts all jobs, stops the stats emitter and
   * clears the live job table. No-op when no batch is running.
   */
  cancel() {
    if (!this.running) return;
    this.abortController.abort();
    this.running = false;
    this._stopStatsTimer();
    this.activeJobs.clear();
  }
}
|
|
|
|
// The UploadManager class is the module's only export (CommonJS).
module.exports = UploadManager;
|