Multi-Hoster-Upload/lib/upload-manager.js
Administrator cd07f52916 🐛 fix: distinguish 'file not found' from 'file empty' error message
Previously, both missing files (fs.statSync throws) and 0-byte files
produced the same error "Datei ist leer (0 Bytes)". Now:
- Missing files: "Datei nicht gefunden"
- Empty files: "Datei ist leer (0 Bytes)"

Also adds 3 edge case tests (throttle consume(0), unlimited rate,
semaphore release-without-acquire). All 66 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 15:20:37 +01:00

709 lines
25 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const { EventEmitter } = require('events');
const path = require('path');
const fs = require('fs');
const crypto = require('crypto');
const { uploadFile } = require('./hosters');
const VidmolyUploader = require('./vidmoly-upload');
const VoeUploader = require('./voe-upload');
const DoodstreamUploader = require('./doodstream-upload');
const Semaphore = require('./semaphore');
const Throttle = require('./throttle');
/**
 * Per-hoster defaults, merged underneath the user's hoster settings by
 * UploadManager#_getSettings. Frozen so that an accidental write cannot
 * silently change the defaults for every hoster; consumers only ever
 * spread-copy this object.
 */
const DEFAULT_SETTINGS = Object.freeze({
  // Extra attempts after the first one (max attempts = retries + 1).
  retries: 3,
  // Per-hoster speed cap in KiB/s; 0 means no per-hoster throttle is created.
  maxSpeedKbs: 0,
  // Limit passed to the per-hoster semaphore (concurrent uploads per hoster).
  parallelCount: 2,
  // Abort and restart an attempt that stays below this speed (KiB/s); 0 disables the monitor.
  restartBelowKbs: 0,
  // Minimum seconds between two upload starts on the same hoster; 0 disables the wait.
  timeIntervalSec: 0,
  // Files larger than this many MiB are skipped; 0 disables the size check.
  maxSizeMb: 0,
});
/**
 * Runs batches of file uploads against one or more hosters.
 *
 * Concurrency is bounded by one semaphore per hoster plus an optional global
 * semaphore; bandwidth is bounded by an optional per-hoster throttle plus an
 * optional global throttle. Each job is retried up to `retries + 1` times and
 * can fall back to a second account once.
 *
 * Events emitted:
 *  - 'progress'       { uploadId, fileName, hoster, jobId, status, progress,
 *                       bytesUploaded, bytesTotal, speedKbs, elapsed, remaining,
 *                       error, result, attempt, maxAttempts }
 *  - 'stats'          once per second while a batch runs
 *  - 'batch-done'     summary object when every job of a batch has settled
 *  - 'account-failed' { hoster, accountId } when all attempts with an account failed
 */
class UploadManager extends EventEmitter {
  /**
   * @param {Object} [hosterSettings] Map of hoster name -> settings overriding DEFAULT_SETTINGS.
   * @param {Object} [globalSettings] Reads `parallelUploadCount`, `scaleParallelUploads`
   *   and `globalMaxSpeedKbs`.
   */
  constructor(hosterSettings, globalSettings) {
    super();
    this.hosterSettings = hosterSettings || {};
    this.globalSettings = globalSettings || {};
    this.semaphores = {}; // hoster -> Semaphore
    this.globalSemaphore = null;
    this.abortController = new AbortController();
    this.running = false;
    this.stopAfterActive = false;
    this.statsInterval = null;
    this.startTime = 0;
    this.activeJobs = new Map(); // uploadId -> { jobId, speedKbs, bytesUploaded }
    this.jobAbortControllers = new Map(); // jobId -> AbortController
    this.cancelledJobIds = new Set();
    this.sessionBytes = 0;
    this.lastStartTime = {}; // hoster -> timestamp of last upload start
    this.intervalLocks = {}; // hoster -> Promise chain for serialized interval waits
    this.globalThrottle = null;
    // `${hoster}:${accountId}` -> true for accounts whose attempts were all used up.
    // Not cleared anywhere in this class, so it persists across batches.
    this._failedAccounts = new Map();
    this._accountOverrides = new Map(); // hoster -> fallback account object
  }

  /**
   * Registers the account that jobs of `hoster` should fall back to. Expected
   * to be called in response to an 'account-failed' event.
   *
   * @param {string} hoster
   * @param {{id: *, username?: string, password?: string, apiKey?: string}} fallbackAccount
   */
  switchAccount(hoster, fallbackAccount) {
    this._accountOverrides.set(hoster, fallbackAccount);
  }

  /**
   * Replaces the settings and applies them to the semaphores and the global
   * throttle that already exist, so a running batch picks up new limits.
   * A falsy argument keeps the corresponding current settings.
   *
   * @param {Object} [hosterSettings]
   * @param {Object} [globalSettings]
   */
  updateSettings(hosterSettings, globalSettings) {
    this.hosterSettings = hosterSettings || this.hosterSettings;
    this.globalSettings = globalSettings || this.globalSettings;

    // Live-update semaphores for running uploads.
    for (const [hoster, semaphore] of Object.entries(this.semaphores)) {
      semaphore.updateLimit(this._getSettings(hoster).parallelCount);
    }

    // _getGlobalThrottle() creates or re-rates the throttle when a positive,
    // finite limit is configured and returns null otherwise, so the limit is
    // parsed in exactly one place.
    if (!this._getGlobalThrottle()) {
      this.globalThrottle = null;
    }

    // Update the global semaphore live; it is only created lazily by a job.
    const globalLimit = this._getGlobalParallelLimit();
    if (globalLimit > 0 && this.globalSemaphore) {
      this.globalSemaphore.updateLimit(globalLimit);
    }
  }

  /**
   * Effective settings for a hoster: DEFAULT_SETTINGS overlaid with the
   * hoster's own settings. When `scaleParallelUploads` is on and a global
   * limit exists, `parallelCount` is capped at that global limit.
   *
   * @param {string} hoster
   * @returns {Object} A fresh settings object.
   */
  _getSettings(hoster) {
    const settings = { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
    const globalLimit = this._getGlobalParallelLimit();
    if (this.globalSettings.scaleParallelUploads && globalLimit > 0) {
      settings.parallelCount = Math.min(settings.parallelCount || 1, globalLimit);
    }
    return settings;
  }

  /**
   * @returns {number} `parallelUploadCount` rounded and clamped to 1..100,
   *   or 0 when it is missing, non-numeric or not positive (no global limit).
   */
  _getGlobalParallelLimit() {
    const raw = Number(this.globalSettings.parallelUploadCount || 0);
    if (!Number.isFinite(raw) || raw <= 0) return 0;
    return Math.max(1, Math.min(100, Math.round(raw)));
  }

  /**
   * Lazily creates the global semaphore, or refreshes its limit.
   *
   * @returns {Semaphore|null} null when no global limit is configured.
   */
  _getGlobalSemaphore() {
    const limit = this._getGlobalParallelLimit();
    if (limit <= 0) return null;
    if (this.globalSemaphore) {
      this.globalSemaphore.updateLimit(limit);
    } else {
      this.globalSemaphore = new Semaphore(limit);
    }
    return this.globalSemaphore;
  }

  /**
   * Lazily creates the global throttle, or refreshes its rate (bytes/s).
   *
   * @returns {Throttle|null} null when `globalMaxSpeedKbs` is not a positive finite number.
   */
  _getGlobalThrottle() {
    const kbs = Number(this.globalSettings.globalMaxSpeedKbs || 0);
    if (!Number.isFinite(kbs) || kbs <= 0) return null;
    if (this.globalThrottle) {
      this.globalThrottle.updateRate(kbs * 1024);
    } else {
      this.globalThrottle = new Throttle(kbs * 1024);
    }
    return this.globalThrottle;
  }

  /**
   * Lazily creates the semaphore of a hoster, or refreshes its limit.
   *
   * @param {string} hoster
   * @returns {Semaphore}
   */
  _getSemaphore(hoster) {
    const { parallelCount } = this._getSettings(hoster);
    const existing = this.semaphores[hoster];
    if (existing) {
      existing.updateLimit(parallelCount);
    } else {
      this.semaphores[hoster] = new Semaphore(parallelCount);
    }
    return this.semaphores[hoster];
  }

  /**
   * Builds the throttle handed to an uploader: the per-hoster throttle, the
   * global throttle, both chained, or null when neither limit is configured.
   *
   * @param {Object} settings Effective hoster settings.
   * @returns {{consume: Function}|null}
   */
  _composeThrottle(settings) {
    const hosterThrottle = settings.maxSpeedKbs > 0
      ? new Throttle(settings.maxSpeedKbs * 1024)
      : null;
    const globalThrottle = this._getGlobalThrottle();
    if (hosterThrottle && globalThrottle) {
      return {
        consume: async (bytes, sig) => {
          await hosterThrottle.consume(bytes, sig);
          await globalThrottle.consume(bytes, sig);
        },
      };
    }
    return hosterThrottle || globalThrottle;
  }

  /**
   * Key under which an account is remembered in `_failedAccounts`.
   *
   * @param {string} hoster
   * @param {*} accountId
   * @returns {string}
   */
  _accountKey(hoster, accountId) {
    return `${hoster}:${accountId}`;
  }

  /**
   * Creates the progress callback for one upload attempt. The callback
   * recomputes the speed at most once per second, mirrors the numbers into
   * `activeJobs` on every call and emits an 'uploading' progress event at most
   * once per `emitIntervalMs` to reduce IPC and rendering overhead.
   *
   * @param {Object} options
   * @param {string} options.uploadId
   * @param {string} options.jobId
   * @param {string} options.fileName
   * @param {string} options.hoster
   * @param {number} options.attempt
   * @param {number} options.maxAttempts
   * @param {number} [options.emitIntervalMs=250] 0 emits on every call.
   * @returns {{startedAt: number, onProgress: Function, speedKbs: Function}}
   */
  _createProgressTracker({ uploadId, jobId, fileName, hoster, attempt, maxAttempts, emitIntervalMs = 250 }) {
    const startedAt = Date.now();
    let lastBytes = 0;
    let lastSpeedTime = startedAt;
    let lastEmitTime = 0;
    let currentSpeedKbs = 0;

    const onProgress = (bytesUploaded, bytesTotal) => {
      const now = Date.now();
      const timeDelta = (now - lastSpeedTime) / 1000;
      if (timeDelta >= 1) {
        currentSpeedKbs = Math.round((bytesUploaded - lastBytes) / timeDelta / 1024);
        lastBytes = bytesUploaded;
        lastSpeedTime = now;
      }
      this.activeJobs.set(uploadId, { jobId, speedKbs: currentSpeedKbs, bytesUploaded });

      if (emitIntervalMs > 0 && now - lastEmitTime < emitIntervalMs) return;
      lastEmitTime = now;

      const elapsed = Math.round((now - startedAt) / 1000);
      const remaining = currentSpeedKbs > 0
        ? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024))
        : 0;
      this._emitProgress(uploadId, fileName, hoster, {
        jobId,
        status: 'uploading',
        progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
        bytesUploaded,
        bytesTotal,
        speedKbs: currentSpeedKbs,
        elapsed,
        remaining,
        error: null,
        result: null,
        attempt,
        maxAttempts,
      });
    };

    return { startedAt, onProgress, speedKbs: () => currentSpeedKbs };
  }

  /**
   * Runs all tasks of a batch and emits 'batch-done' with a summary once every
   * job has settled. Resets the per-batch state (abort controller, semaphores,
   * throttle, counters) before starting.
   *
   * NOTE(review): there is no guard against calling this while a batch is
   * already running; doing so replaces the state of the running batch.
   *
   * @param {Array<{file: string, hoster: string, jobId?: string, accountId?: *,
   *   username?: string, password?: string, apiKey?: string}>} tasks
   * @returns {Promise<void>}
   */
  async startBatch(tasks) {
    this.running = true;
    this.stopAfterActive = false;
    this.abortController = new AbortController();
    this.startTime = Date.now();
    this.sessionBytes = 0;
    this.activeJobs.clear();
    this.jobAbortControllers.clear();
    this.cancelledJobIds.clear();
    this.semaphores = {};
    this.globalSemaphore = null;
    this.globalThrottle = null;
    this.lastStartTime = {};

    const { signal } = this.abortController;
    const batchId = `batch-${Date.now()}`;

    // filePath -> { name, size, results: [] }; one entry per distinct file.
    const results = new Map();
    for (const task of tasks) {
      const fileName = path.basename(task.file);
      if (results.has(task.file)) continue;
      let size = 0;
      try {
        size = fs.statSync(task.file).size;
      } catch {
        // Best effort: the job itself reports a missing file; the summary keeps size 0.
      }
      results.set(task.file, { name: fileName, size, results: [] });
    }

    this._startStatsTimer();
    await Promise.allSettled(tasks.map((task) => this._runJob(task, results, signal)));
    this._stopStatsTimer();
    this.running = false;

    const files = Array.from(results.values());
    const total = tasks.length;
    const succeeded = files.reduce(
      (count, file) => count + file.results.filter((result) => result.status === 'done').length,
      0
    );
    this.emit('batch-done', {
      id: batchId,
      timestamp: new Date().toISOString(),
      total,
      succeeded,
      // Everything that is not 'done' counts here, including aborted jobs.
      failed: total - succeeded,
      files,
    });
  }

  /**
   * Runs one upload job: validates the file, waits for its concurrency slots,
   * performs the attempts, optionally retries with a fallback account, and
   * records exactly one final result in `results`.
   *
   * @param {Object} task See startBatch. Mutated when a fallback account is applied.
   * @param {Map<string, {name: string, size: number, results: Array}>} results
   *   Must already contain an entry for `task.file`.
   * @param {AbortSignal} batchSignal Aborted when the whole batch is cancelled.
   * @returns {Promise<void>}
   */
  async _runJob(task, results, batchSignal) {
    const settings = this._getSettings(task.hoster);
    const hosterSemaphore = this._getSemaphore(task.hoster);
    const globalSemaphore = this._getGlobalSemaphore();
    const uploadId = crypto.randomBytes(8).toString('hex');
    const jobId = task.jobId || uploadId;
    const fileName = path.basename(task.file);

    let fileSize = 0;
    let fileNotFound = false;
    try {
      fileSize = fs.statSync(task.file).size;
    } catch {
      fileNotFound = true;
    }

    const maxAttempts = Math.max(1, (settings.retries || 0) + 1);
    const jobAbortController = new AbortController();
    const { signal, cleanup: cleanupSignals } = this._combineSignals(batchSignal, jobAbortController.signal);
    this.jobAbortControllers.set(jobId, jobAbortController);

    let hosterSlotAcquired = false;
    let globalSlotAcquired = false;
    let finalResultRecorded = false;
    let lastError = null;

    // Appends the job's outcome to the batch summary; only the first call counts.
    const recordFinalResult = (status, payload = {}) => {
      if (finalResultRecorded) return;
      finalResultRecorded = true;
      const uploaded = payload.result;
      results.get(task.file).results.push({
        hoster: task.hoster,
        status,
        error: payload.error || null,
        download_url: uploaded ? uploaded.download_url || null : null,
        embed_url: uploaded ? uploaded.embed_url || null : null,
        file_code: uploaded ? uploaded.file_code || null : null,
      });
    };

    // Emits the terminal progress event ('done', 'skipped', 'aborted' or 'error').
    const emitFinalStatus = (status, payload = {}) => {
      const isDone = status === 'done';
      this._emitProgress(uploadId, fileName, task.hoster, {
        jobId,
        status,
        progress: isDone ? 1 : 0,
        bytesUploaded: isDone ? fileSize : 0,
        bytesTotal: fileSize,
        speedKbs: payload.speedKbs || 0,
        elapsed: payload.elapsed || 0,
        remaining: 0,
        error: payload.error || null,
        result: payload.result || null,
        attempt: payload.attempt || maxAttempts,
        maxAttempts,
      });
    };

    // Emits a non-terminal state in which nothing has been transferred yet.
    const emitIdleStatus = (status, attempt, error) => {
      this._emitProgress(uploadId, fileName, task.hoster, {
        jobId,
        status,
        progress: 0,
        bytesUploaded: 0,
        bytesTotal: fileSize,
        speedKbs: 0,
        elapsed: 0,
        remaining: 0,
        error,
        result: null,
        attempt,
        maxAttempts,
      });
    };

    // A skipped file is shown as 'skipped' in the UI but counted as an error in the summary.
    const skipJob = (error) => {
      emitFinalStatus('skipped', { error, attempt: 0 });
      recordFinalResult('error', { error });
    };

    // Reports the job as aborted when it was cancelled or the queue was stopped.
    // Returns true when it did, so the caller can return immediately.
    const reportInterruption = () => {
      const wasStopped = this.stopAfterActive && !signal.aborted;
      const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
      if (!wasStopped && !wasAborted) return false;
      const error = wasStopped ? 'Warteschlange angehalten' : 'Abgebrochen';
      emitFinalStatus('aborted', { error });
      recordFinalResult('aborted', { error });
      return true;
    };

    try {
      if (fileNotFound) {
        skipJob('Datei nicht gefunden');
        return;
      }
      if (fileSize <= 0) {
        skipJob('Datei ist leer (0 Bytes)');
        return;
      }
      if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
        skipJob(`Datei zu groß (Max: ${settings.maxSizeMb} MB)`);
        return;
      }

      emitIdleStatus('queued', 0, null);

      // Acquire the hoster semaphore first so jobs waiting for a hoster slot
      // don't waste global slots (prevents underutilization).
      await hosterSemaphore.acquire(signal);
      hosterSlotAcquired = true;
      if (globalSemaphore) {
        await globalSemaphore.acquire(signal);
        globalSlotAcquired = true;
      }
      if (settings.timeIntervalSec > 0) {
        await this._waitForInterval(task.hoster, settings.timeIntervalSec * 1000, signal);
      }

      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        if (signal.aborted || this.stopAfterActive) break;
        if (attempt > 1) {
          emitIdleStatus('retrying', attempt, lastError ? lastError.message : null);
          await this._sleep(3000, signal);
        }

        const tracker = this._createProgressTracker({
          uploadId, jobId, fileName, hoster: task.hoster, attempt, maxAttempts,
        });
        let lowSpeedSince = 0;
        let speedAbort = null;
        let speedMonitor = null;
        let uploadSignalBundle = { signal, cleanup() {} };

        try {
          emitIdleStatus('getting-server', attempt, null);
          const throttle = this._composeThrottle(settings);

          if (settings.restartBelowKbs > 0) {
            // Abort only this attempt when the measured speed stays below the
            // threshold for more than 6 s (checked every 2 s). A speed of 0 is
            // treated as "not measured yet" and does not trigger a restart.
            speedAbort = new AbortController();
            uploadSignalBundle = this._combineManySignals([signal, speedAbort.signal]);
            speedMonitor = setInterval(() => {
              const speedKbs = tracker.speedKbs();
              if (speedKbs > 0 && speedKbs < settings.restartBelowKbs) {
                if (!lowSpeedSince) lowSpeedSince = Date.now();
                if (Date.now() - lowSpeedSince > 6000) {
                  speedAbort.abort();
                }
              } else {
                lowSpeedSince = 0;
              }
            }, 2000);
          }

          this.activeJobs.set(uploadId, { jobId, speedKbs: 0, bytesUploaded: 0 });
          const result = await this._executeUpload(task, tracker.onProgress, uploadSignalBundle.signal, throttle);
          const elapsed = Math.round((Date.now() - tracker.startedAt) / 1000);
          this.sessionBytes += fileSize;
          this.activeJobs.delete(uploadId);
          emitFinalStatus('done', { result, speedKbs: tracker.speedKbs(), elapsed, attempt });
          recordFinalResult('done', { result });
          return;
        } catch (err) {
          this.activeJobs.delete(uploadId);
          const isSpeedRestart = speedAbort && speedAbort.signal.aborted && !signal.aborted;
          if (signal.aborted) {
            lastError = new Error('Abgebrochen');
            break;
          }
          if (this.stopAfterActive) {
            lastError = new Error('Angehalten');
            break;
          }
          lastError = isSpeedRestart && attempt < maxAttempts
            ? new Error('Geschwindigkeit zu niedrig - Neustart')
            : err;
          if (attempt >= maxAttempts) break;
          // NOTE(review): together with the 3 s wait at the top of the next
          // iteration this yields about 6 s between two attempts.
          await this._sleep(3000, signal);
        } finally {
          if (speedMonitor) clearInterval(speedMonitor);
          uploadSignalBundle.cleanup();
        }
      }

      if (reportInterruption()) return;

      // Account fallback: if this account hasn't failed before, try switching.
      const failedKey = this._accountKey(task.hoster, task.accountId);
      if (task.accountId && !this._failedAccounts.has(failedKey)) {
        this._failedAccounts.set(failedKey, true);
        this.emit('account-failed', { hoster: task.hoster, accountId: task.accountId });
        // Wait briefly for switchAccount() to be called by a listener.
        await this._sleep(800, signal);

        const override = this._accountOverrides.get(task.hoster);
        if (override && !this._failedAccounts.has(this._accountKey(task.hoster, override.id))) {
          // Switch to the fallback account and retry this file.
          task.accountId = override.id;
          task.username = override.username;
          task.password = override.password;
          task.apiKey = override.apiKey;
          emitIdleStatus('retrying', 1, 'Account-Wechsel zu Fallback');

          for (let attempt = 1; attempt <= maxAttempts; attempt++) {
            if (signal.aborted || this.stopAfterActive) break;
            if (attempt > 1) {
              emitIdleStatus('retrying', attempt, lastError ? lastError.message : '');
              await this._sleep(3000, signal);
            }
            try {
              const tracker = this._createProgressTracker({
                uploadId, jobId, fileName, hoster: task.hoster, attempt, maxAttempts,
              });
              this.activeJobs.set(uploadId, { jobId, speedKbs: 0, bytesUploaded: 0 });
              const throttle = this._composeThrottle(settings);
              const result = await this._executeUpload(task, tracker.onProgress, signal, throttle);
              this.activeJobs.delete(uploadId);
              this.sessionBytes += fileSize;
              emitFinalStatus('done', {
                result,
                speedKbs: tracker.speedKbs(),
                elapsed: Math.round((Date.now() - tracker.startedAt) / 1000),
                attempt,
              });
              recordFinalResult('done', { result });
              return;
            } catch (err) {
              this.activeJobs.delete(uploadId);
              lastError = err;
              if (signal.aborted || this.stopAfterActive) break;
              if (attempt >= maxAttempts) break;
            }
          }

          // A cancel or stop during the fallback attempts is an abort, not an
          // upload error; report it the same way as after the primary attempts.
          if (reportInterruption()) return;
        }
      }

      const error = lastError && lastError.message ? lastError.message : 'Unbekannter Fehler';
      emitFinalStatus('error', { error });
      recordFinalResult('error', { error });
    } catch (err) {
      // Reached when a wait (semaphore, interval, sleep) rejects or anything
      // above throws unexpectedly.
      const wasStopped = this.stopAfterActive && !signal.aborted;
      const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
      let error;
      if (wasStopped) {
        error = 'Warteschlange angehalten';
      } else if (wasAborted) {
        error = 'Abgebrochen';
      } else {
        error = err && err.message ? err.message : 'Unbekannter Fehler';
      }
      const status = wasStopped || wasAborted ? 'aborted' : 'error';
      emitFinalStatus(status, { error });
      recordFinalResult(status, { error });
    } finally {
      this.activeJobs.delete(uploadId);
      this.jobAbortControllers.delete(jobId);
      cleanupSignals();
      // Release in reverse order of acquire (global first, then hoster).
      if (globalSlotAcquired && globalSemaphore) globalSemaphore.release();
      if (hosterSlotAcquired) hosterSemaphore.release();
    }
  }

  /**
   * Returns a fresh account-based uploader for hosters that have one.
   *
   * @param {string} hoster
   * @returns {Object|null} null for hosters that are uploaded via `uploadFile`.
   */
  _createAccountUploader(hoster) {
    switch (hoster) {
      case 'vidmoly.me':
        return new VidmolyUploader();
      case 'voe.sx':
        return new VoeUploader();
      case 'doodstream.com':
        return new DoodstreamUploader();
      default:
        return null;
    }
  }

  /**
   * Performs a single upload attempt. Tasks with a username on a hoster that
   * has an account-based uploader log in first; everything else goes through
   * the generic `uploadFile` with the task's API key.
   *
   * @param {Object} task
   * @param {Function} progressCb Called as (bytesUploaded, bytesTotal).
   * @param {AbortSignal} signal
   * @param {{consume: Function}|null} throttle
   * @returns {Promise<Object>} The uploader's result object.
   */
  async _executeUpload(task, progressCb, signal, throttle) {
    const uploader = task.username ? this._createAccountUploader(task.hoster) : null;
    if (!uploader) {
      return uploadFile(task.hoster, task.file, task.apiKey, progressCb, signal, throttle);
    }
    await uploader.login(task.username, task.password);
    return uploader.upload(task.file, progressCb, signal, throttle);
  }

  /**
   * Emits a 'progress' event with the identifying fields merged into `data`.
   */
  _emitProgress(uploadId, fileName, hoster, data) {
    this.emit('progress', { uploadId, fileName, hoster, ...data });
  }

  /**
   * (Re)starts the timer that emits a 'stats' event every second with the
   * summed speed, transferred bytes and job counts.
   */
  _startStatsTimer() {
    if (this.statsInterval) clearInterval(this.statsInterval);
    this.statsInterval = setInterval(() => {
      let globalSpeedKbs = 0;
      let inProgressBytes = 0;
      for (const job of this.activeJobs.values()) {
        globalSpeedKbs += job.speedKbs || 0;
        inProgressBytes += job.bytesUploaded || 0;
      }
      let state = 'idle';
      if (this.running) {
        state = this.stopAfterActive ? 'stopping' : 'uploading';
      }
      this.emit('stats', {
        state,
        globalSpeedKbs,
        totalBytes: this.sessionBytes + inProgressBytes,
        elapsed: Math.round((Date.now() - this.startTime) / 1000),
        activeJobs: this.activeJobs.size,
        pendingJobs: Object.values(this.semaphores).reduce((sum, semaphore) => sum + semaphore.pending, 0),
      });
    }, 1000);
  }

  /** Stops the stats timer if it is running. */
  _stopStatsTimer() {
    if (this.statsInterval) {
      clearInterval(this.statsInterval);
      this.statsInterval = null;
    }
  }

  /**
   * Combines two abort signals into one that aborts as soon as either does.
   *
   * @param {AbortSignal} signal1
   * @param {AbortSignal} signal2
   * @returns {{signal: AbortSignal, cleanup: Function}} `cleanup` removes the
   *   listeners installed on the source signals.
   */
  _combineSignals(signal1, signal2) {
    return this._combineManySignals([signal1, signal2]);
  }

  /**
   * Combines any number of abort signals (falsy entries are ignored) into one
   * that aborts as soon as any of them does.
   *
   * @param {Array<AbortSignal|null|undefined>} signals
   * @returns {{signal: AbortSignal, cleanup: Function}}
   */
  _combineManySignals(signals) {
    const liveSignals = signals.filter(Boolean);
    const controller = new AbortController();
    if (liveSignals.some((source) => source.aborted)) {
      controller.abort();
      return { signal: controller.signal, cleanup() {} };
    }

    const cleanup = () => {
      for (const source of liveSignals) {
        source.removeEventListener('abort', onAbort);
      }
    };
    const onAbort = () => {
      controller.abort();
      cleanup();
    };
    for (const source of liveSignals) {
      source.addEventListener('abort', onAbort, { once: true });
    }
    return { signal: controller.signal, cleanup };
  }

  /**
   * Resolves after `ms` milliseconds; rejects with Error('Aborted') when
   * `signal` is already aborted or aborts during the wait.
   *
   * @param {number} ms
   * @param {AbortSignal} [signal]
   * @returns {Promise<void>}
   */
  _sleep(ms, signal) {
    return new Promise((resolve, reject) => {
      if (signal && signal.aborted) {
        reject(new Error('Aborted'));
        return;
      }
      let timer = null;
      const onAbort = () => {
        clearTimeout(timer);
        reject(new Error('Aborted'));
      };
      timer = setTimeout(() => {
        if (signal) signal.removeEventListener('abort', onAbort);
        resolve();
      }, ms);
      if (signal) signal.addEventListener('abort', onAbort, { once: true });
    });
  }

  /**
   * Waits until at least `intervalMs` has passed since the last upload start
   * on `hoster`, then records the new start time. Waits are chained per hoster
   * so concurrent jobs queue up instead of all starting at once.
   *
   * @param {string} hoster
   * @param {number} intervalMs
   * @param {AbortSignal} signal
   * @returns {Promise<void>} Rejects when the wait is aborted.
   */
  _waitForInterval(hoster, intervalMs, signal) {
    const previous = this.intervalLocks[hoster] || Promise.resolve();
    const next = previous.then(async () => {
      const sinceLastStart = Date.now() - (this.lastStartTime[hoster] || 0);
      if (sinceLastStart < intervalMs) {
        await this._sleep(intervalMs - sinceLastStart, signal);
      }
      this.lastStartTime[hoster] = Date.now();
    });
    // The stored chain swallows rejections so one aborted job does not make
    // every later waiter on this hoster reject as well.
    this.intervalLocks[hoster] = next.catch(() => {});
    return next;
  }

  /**
   * Cancels individual jobs by id. The ids are also remembered so that a job
   * which has not registered its controller yet is still reported as aborted.
   *
   * @param {Array<string>} [jobIds]
   */
  cancelJobs(jobIds) {
    for (const jobId of jobIds || []) {
      if (!jobId) continue;
      this.cancelledJobIds.add(jobId);
      const controller = this.jobAbortControllers.get(jobId);
      if (controller && !controller.signal.aborted) {
        controller.abort();
      }
    }
  }

  /**
   * Lets the attempts that are currently uploading finish but stops jobs from
   * starting further attempts.
   */
  finishAfterActive() {
    this.stopAfterActive = true;
  }

  /** Aborts the running batch and every job in it. No-op when idle. */
  cancel() {
    if (!this.running) return;
    this.abortController.abort();
    this.stopAfterActive = false;
    this.running = false;
    for (const controller of this.jobAbortControllers.values()) {
      if (!controller.signal.aborted) controller.abort();
    }
    this._stopStatsTimer();
  }
}
// The class itself is the module's sole export (CommonJS).
module.exports = UploadManager;