Multi-Hoster-Upload/lib/upload-manager.js
Administrator 17e9a419b2 fix(rotation): treat byse "disk space" as account-level, not file-rejected
Byse rejects uploads with status like "not enough disk space on your
account" when the account's storage is exhausted. The parser was
flagging every non-OK status as err.fileRejected=true, and the upload-
manager classifier additionally matched the generic "lehnte Datei ab"
prefix as file-rejected. Result: rotation was skipped on a full account
and every subsequent file failed on the same dead account.

- hosters.js: byse parser now distinguishes account-level phrases
  (disk space / storage / quota / insufficient / account full) and sets
  err.accountError=true for those. File-specific failures (Duplicate,
  wrong format, size) keep err.fileRejected=true.
- upload-manager.js: _isFileRejectedError no longer matches the generic
  "lehnte Datei ab" prefix and short-circuits when err.accountError is
  true. _shouldSkipRetryOnAccountError honors the flag and has added
  regex patterns as a safety net.
- Tests: 5 new unit tests covering disk-space/account-level/duplicate
  and the accountError-wins-over-fileRejected precedence.
2026-04-21 16:42:56 +02:00

964 lines
36 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const { EventEmitter } = require('events');
const path = require('path');
const fs = require('fs');
const crypto = require('crypto');
const { uploadFile } = require('./hosters');
const VidmolyUploader = require('./vidmoly-upload');
const VoeUploader = require('./voe-upload');
const DoodstreamUploader = require('./doodstream-upload');
const ClouddropUploader = require('./clouddrop-upload');
const Semaphore = require('./semaphore');
const Throttle = require('./throttle');
const DEFAULT_SETTINGS = {
retries: 3,
maxSpeedKbs: 0,
parallelCount: 2,
restartBelowKbs: 0,
timeIntervalSec: 0,
maxSizeMb: 0
};
class UploadManager extends EventEmitter {
constructor(hosterSettings, globalSettings) {
super();
this.hosterSettings = hosterSettings || {};
this.globalSettings = globalSettings || {};
this.semaphores = {};
this.globalSemaphore = null;
this.abortController = new AbortController();
this.running = false;
this.stopAfterActive = false;
this.statsInterval = null;
this.startTime = 0;
this.activeJobs = new Map(); // uploadId -> { jobId, speedKbs, bytesUploaded }
this.jobAbortControllers = new Map(); // jobId -> AbortController
this.cancelledJobIds = new Set();
this.sessionBytes = 0;
this.lastStartTime = {}; // hoster -> timestamp of last upload start
this.intervalLocks = {}; // hoster -> Promise chain for serialized interval waits
this.globalThrottle = null;
this._failedAccounts = new Map(); // hoster -> Set of failed accountIds
this._accountOverrides = new Map(); // hoster -> fallback account object
}
switchAccount(hoster, fallbackAccount) {
const prev = this._accountOverrides.get(hoster);
this._accountOverrides.set(hoster, fallbackAccount);
this._rotLog('switchAccount', {
hoster,
prevOverrideId: prev ? prev.id : null,
toAccountId: fallbackAccount ? fallbackAccount.id : null
});
}
_rotLog(event, data) {
this.emit('rot-log', { ts: Date.now(), event, ...data });
}
// File-specific rejections from the hoster: the same file will get rejected
// on any account, so rotation is pointless. Matches the `err.fileRejected`
// flag set by parsers plus known rejection phrases.
// NOTE: We deliberately do NOT match the generic "lehnte Datei ab" prefix
// here — that phrase is used by the Byse parser for both file- AND
// account-level errors. Account-level ones set err.accountError instead,
// which takes priority in _shouldSkipRetryOnAccountError.
_isFileRejectedError(err) {
if (!err) return false;
if (err.accountError === true) return false; // explicit account-level wins
if (err.fileRejected === true) return true;
if (!err.message) return false;
const m = String(err.message);
return /(Not video file format|Duplicate|Datei zu (klein|gross|groß)|File too (small|large)|Invalid file|Unsupported format)/i.test(m);
}
// Transient network errors — the account is fine, the network or the
// hoster's own backend hiccuped. Retrying on the SAME account is the right
// move; marking it failed would wrongly poison the fallback chain. If all
// retries on the current account still hit this class of error, we bail
// out for this file without blacklisting the account, so other jobs in the
// batch still get a fresh chance on it.
_isTransientNetworkError(err) {
if (!err || !err.message) return false;
const m = String(err.message);
const TRANSIENT = [
/ENOTFOUND/i,
/ECONNRESET/i,
/ECONNREFUSED/i,
/ETIMEDOUT/i,
/EAI_AGAIN/i,
/EHOSTUNREACH/i,
/ENETUNREACH/i,
/EPIPE/i,
/socket hang up/i,
/network (error|failure|problem)/i,
/dns (lookup|error|failed)/i,
/getaddrinfo/i,
/fetch failed/i,
/\bconnect (ETIMEDOUT|ECONN)/i
];
return TRANSIENT.some(p => p.test(m));
}
// Error classes that mean "this account is the problem, retrying on it won't
// help" — we skip the remaining retries and go straight to the fallback
// account. Keeps single runs fast when an account is rate-limited, banned,
// or out of quota.
_shouldSkipRetryOnAccountError(err) {
if (!err) return false;
// Explicit account-level flag from hoster parsers — highest priority.
if (err.accountError === true) return true;
if (!err.message) return false;
const m = String(err.message);
const PATTERNS = [
/Kein Upload-Server/i,
/No upload server/i,
/kein server/i,
/quota/i,
/limit (reached|exceeded|überschritten)/i,
/rate[- ]?limit/i,
/too many requests/i,
/\b(401|403|429)\b/,
/Falscher (User|Username|Passwort)/i,
/Incorrect (Login|Password)/i,
/invalid (credentials|api[- ]?key|token|session)/i,
/(account|user) (banned|suspended|disabled|gesperrt)/i,
/not authorized/i,
/forbidden/i,
/session (expired|abgelaufen)/i,
// Session/CSRF hints — the account's server session went stale, which
// no amount of retrying will fix. Re-login happens on the next account.
/CSRF[- ]?Token nicht gefunden/i,
/CSRF[- ]?token not found/i,
/Bist du eingeloggt/i,
/not logged in/i,
// Storage exhaustion — account is full. Rotate instead of hammering it.
/not enough (disk )?(space|storage)/i,
/insufficient (disk )?space/i,
/disk (space )?full/i,
/storage (exhausted|full|voll|limit)/i,
/account (full|voll)/i
];
return PATTERNS.some(p => p.test(m));
}
updateSettings(hosterSettings, globalSettings) {
this.hosterSettings = hosterSettings || this.hosterSettings;
this.globalSettings = globalSettings || this.globalSettings;
// Live-update semaphores for running uploads
for (const [hoster, sem] of Object.entries(this.semaphores)) {
const settings = this._getSettings(hoster);
sem.updateLimit(settings.parallelCount);
}
// Update global throttle if speed limit changed
const newKbs = (this.globalSettings.globalMaxSpeedKbs || 0);
if (newKbs > 0) {
if (this.globalThrottle) {
this.globalThrottle.updateRate(newKbs * 1024);
} else {
this.globalThrottle = new Throttle(newKbs * 1024);
}
} else {
this.globalThrottle = null;
}
// Update global semaphore live
const globalLimit = this._getGlobalParallelLimit();
if (globalLimit > 0 && this.globalSemaphore) {
this.globalSemaphore.updateLimit(globalLimit);
}
}
_getSettings(hoster) {
const settings = { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
const globalLimit = this._getGlobalParallelLimit();
if (this.globalSettings.scaleParallelUploads && globalLimit > 0) {
settings.parallelCount = Math.min(settings.parallelCount || 1, globalLimit);
}
return settings;
}
_getGlobalParallelLimit() {
const raw = Number(this.globalSettings.parallelUploadCount || 0);
if (!Number.isFinite(raw) || raw <= 0) return 0;
return Math.max(1, Math.min(100, Math.round(raw)));
}
_getGlobalSemaphore() {
const limit = this._getGlobalParallelLimit();
if (limit <= 0) return null;
if (!this.globalSemaphore) {
this.globalSemaphore = new Semaphore(limit);
} else {
this.globalSemaphore.updateLimit(limit);
}
return this.globalSemaphore;
}
_getGlobalThrottle() {
const kbs = Number(this.globalSettings.globalMaxSpeedKbs || 0);
if (!Number.isFinite(kbs) || kbs <= 0) return null;
if (!this.globalThrottle) {
this.globalThrottle = new Throttle(kbs * 1024);
} else {
this.globalThrottle.updateRate(kbs * 1024);
}
return this.globalThrottle;
}
_getSemaphore(hoster) {
if (!this.semaphores[hoster]) {
const settings = this._getSettings(hoster);
this.semaphores[hoster] = new Semaphore(settings.parallelCount);
} else {
this.semaphores[hoster].updateLimit(this._getSettings(hoster).parallelCount);
}
return this.semaphores[hoster];
}
async startBatch(tasks) {
this.running = true;
this.stopAfterActive = false;
this.abortController = new AbortController();
this.startTime = Date.now();
this.sessionBytes = 0;
this.activeJobs.clear();
this.jobAbortControllers.clear();
this.cancelledJobIds.clear();
this.semaphores = {};
this.globalSemaphore = null;
this.globalThrottle = null;
this.lastStartTime = {};
// Reset account-rotation state each batch. Otherwise a previously failed
// account (e.g. rate-limited during the last batch) stays permanently
// blacklisted until the app restarts — every upload would silently skip
// straight to the fallback even after the original recovered.
this._failedAccounts.clear();
this._accountOverrides.clear();
this._rotLog('batch-start', { taskCount: tasks.length });
const { signal } = this.abortController;
const batchId = `batch-${Date.now()}`;
const results = new Map(); // filePath -> { name, size, results: [] }
this._batchResults = results;
this._additionalPromises = []; // Track jobs added mid-batch via addJobs()
for (const task of tasks) {
const fileName = path.basename(task.file);
if (!results.has(task.file)) {
let size = 0;
try { size = fs.statSync(task.file).size; } catch {}
results.set(task.file, { name: fileName, size, results: [] });
}
}
this._startStatsTimer();
const promises = tasks.map((task) => this._runJob(task, results, signal));
await Promise.allSettled(promises);
// Wait for any jobs added mid-batch via addJobs()
while (this._additionalPromises.length > 0) {
const batch = this._additionalPromises.splice(0);
await Promise.allSettled(batch);
}
this._stopStatsTimer();
this.running = false;
const files = Array.from(results.values());
const total = tasks.length;
const succeeded = files.reduce((count, file) => count + file.results.filter((result) => result.status === 'done').length, 0);
const summary = {
id: batchId,
timestamp: new Date().toISOString(),
total,
succeeded,
failed: total - succeeded,
files
};
this.emit('batch-done', summary);
}
async _runJob(task, results, batchSignal) {
const settings = this._getSettings(task.hoster);
const hosterSemaphore = this._getSemaphore(task.hoster);
const globalSemaphore = this._getGlobalSemaphore();
const uploadId = crypto.randomBytes(8).toString('hex');
const jobId = task.jobId || uploadId;
const fileName = path.basename(task.file);
let fileSize = 0;
let fileNotFound = false;
try { fileSize = fs.statSync(task.file).size; } catch { fileNotFound = true; }
const maxAttempts = Math.max(1, (settings.retries || 0) + 1);
const jobAbortController = new AbortController();
const { signal, cleanup: cleanupSignals } = this._combineSignals(batchSignal, jobAbortController.signal);
this.jobAbortControllers.set(jobId, jobAbortController);
let hosterSlotAcquired = false;
let globalSlotAcquired = false;
let finalResultRecorded = false;
let lastError = null;
const recordFinalResult = (status, payload = {}) => {
if (finalResultRecorded) return;
finalResultRecorded = true;
const result = {
hoster: task.hoster,
status,
error: payload.error || null,
download_url: payload.result ? payload.result.download_url || null : null,
embed_url: payload.result ? payload.result.embed_url || null : null,
file_code: payload.result ? payload.result.file_code || null : null
};
results.get(task.file).results.push(result);
};
const emitFinalStatus = (status, payload = {}) => {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status,
progress: status === 'done' ? 1 : 0,
bytesUploaded: status === 'done' ? fileSize : 0,
bytesTotal: fileSize,
speedKbs: payload.speedKbs || 0,
elapsed: payload.elapsed || 0,
remaining: 0,
error: payload.error || null,
result: payload.result || null,
attempt: payload.attempt || maxAttempts,
maxAttempts
});
};
try {
if (fileNotFound) {
const error = 'Datei nicht gefunden';
emitFinalStatus('skipped', { error, attempt: 0 });
recordFinalResult('error', { error });
return;
}
if (fileSize <= 0) {
const error = 'Datei ist leer (0 Bytes)';
emitFinalStatus('skipped', { error, attempt: 0 });
recordFinalResult('error', { error });
return;
}
if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
const error = `Datei zu groß (Max: ${settings.maxSizeMb} MB)`;
emitFinalStatus('skipped', { error, attempt: 0 });
recordFinalResult('error', { error });
return;
}
// If this account already failed in this batch, switch to fallback immediately
// instead of wasting retries on a known-bad account
if (task.accountId && this._failedAccounts.has(task.hoster + ':' + task.accountId)) {
const override = this._accountOverrides.get(task.hoster);
if (override && !this._failedAccounts.has(task.hoster + ':' + override.id)) {
this._rotLog('pre-job-swap', {
hoster: task.hoster, fileName, fromAccountId: task.accountId, toAccountId: override.id
});
task.accountId = override.id;
task.username = override.username;
task.password = override.password;
task.apiKey = override.apiKey;
} else {
this._rotLog('pre-job-swap-blocked', {
hoster: task.hoster, fileName, accountId: task.accountId,
hasOverride: !!override,
overrideAlsoFailed: override ? this._failedAccounts.has(task.hoster + ':' + override.id) : false
});
}
}
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'queued',
progress: 0,
bytesUploaded: 0,
bytesTotal: fileSize,
speedKbs: 0,
elapsed: 0,
remaining: 0,
error: null,
result: null,
attempt: 0,
maxAttempts
});
// Acquire hoster semaphore first so jobs waiting for a hoster slot
// don't waste global slots (prevents underutilization)
await hosterSemaphore.acquire(signal);
hosterSlotAcquired = true;
if (globalSemaphore) {
await globalSemaphore.acquire(signal);
globalSlotAcquired = true;
}
if (settings.timeIntervalSec > 0) {
await this._waitForInterval(task.hoster, settings.timeIntervalSec * 1000, signal);
}
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
if (signal.aborted || this.stopAfterActive) break;
if (attempt > 1) {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'retrying',
progress: 0,
bytesUploaded: 0,
bytesTotal: fileSize,
speedKbs: 0,
elapsed: 0,
remaining: 0,
error: lastError ? lastError.message : null,
result: null,
attempt,
maxAttempts
});
await this._sleep(3000, signal);
}
const jobStart = Date.now();
let lastBytes = 0;
let lastSpeedTime = jobStart;
let currentSpeedKbs = 0;
let lowSpeedSince = 0;
let speedAbort = null;
let speedMonitor = null;
let uploadSignalBundle = { signal, cleanup() {} };
try {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'getting-server',
progress: 0,
bytesUploaded: 0,
bytesTotal: fileSize,
speedKbs: 0,
elapsed: 0,
remaining: 0,
error: null,
result: null,
attempt,
maxAttempts
});
const hosterThrottle = settings.maxSpeedKbs > 0
? new Throttle(settings.maxSpeedKbs * 1024)
: null;
const globalThrottle = this._getGlobalThrottle();
const throttle = hosterThrottle && globalThrottle
? { consume: async (bytes, sig) => { await hosterThrottle.consume(bytes, sig); await globalThrottle.consume(bytes, sig); } }
: hosterThrottle || globalThrottle;
if (settings.restartBelowKbs > 0) {
speedAbort = new AbortController();
uploadSignalBundle = this._combineManySignals([signal, speedAbort.signal]);
speedMonitor = setInterval(() => {
if (currentSpeedKbs > 0 && currentSpeedKbs < settings.restartBelowKbs) {
if (!lowSpeedSince) lowSpeedSince = Date.now();
if (Date.now() - lowSpeedSince > 6000) {
speedAbort.abort();
}
} else {
lowSpeedSince = 0;
}
}, 2000);
}
// Mutate this single object on each progress callback instead of
// allocating a fresh one — callback fires on every stream chunk
// (hundreds/sec per active job).
const activeEntry = { jobId, speedKbs: 0, bytesUploaded: 0 };
this.activeJobs.set(uploadId, activeEntry);
let lastEmitTime = 0;
const PROGRESS_EMIT_INTERVAL = 250; // ms throttle UI updates
const progressCb = (bytesUploaded, bytesTotal) => {
const now = Date.now();
const elapsed = Math.round((now - jobStart) / 1000);
const timeDelta = (now - lastSpeedTime) / 1000;
if (timeDelta >= 1) {
const bytesDelta = bytesUploaded - lastBytes;
currentSpeedKbs = Math.round(bytesDelta / timeDelta / 1024);
lastBytes = bytesUploaded;
lastSpeedTime = now;
}
activeEntry.speedKbs = currentSpeedKbs;
activeEntry.bytesUploaded = bytesUploaded;
// Throttle progress emissions to reduce IPC + rendering overhead
if (now - lastEmitTime < PROGRESS_EMIT_INTERVAL) return;
lastEmitTime = now;
const remaining = currentSpeedKbs > 0
? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024))
: 0;
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'uploading',
progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
bytesUploaded,
bytesTotal,
speedKbs: currentSpeedKbs,
elapsed,
remaining,
error: null,
result: null,
attempt,
maxAttempts
});
};
const result = await this._executeUpload(task, progressCb, uploadSignalBundle.signal, throttle);
const elapsed = Math.round((Date.now() - jobStart) / 1000);
this.sessionBytes += fileSize;
this.activeJobs.delete(uploadId);
emitFinalStatus('done', {
result,
speedKbs: currentSpeedKbs,
elapsed,
attempt
});
recordFinalResult('done', { result });
return;
} catch (err) {
this.activeJobs.delete(uploadId);
const isSpeedRestart = speedAbort && speedAbort.signal.aborted && !signal.aborted;
if (signal.aborted) {
lastError = new Error('Abgebrochen');
break;
}
if (this.stopAfterActive) {
lastError = new Error('Angehalten');
break;
}
if (isSpeedRestart && attempt < maxAttempts) {
lastError = new Error('Geschwindigkeit zu niedrig - Neustart');
await this._sleep(3000, signal);
continue;
}
lastError = err;
// File-specific rejection — re-uploading won't change the server's
// mind. Break out immediately; the outer file-rejected branch then
// records the final error without burning through 5 × 3s retries.
if (this._isFileRejectedError(err)) break;
// Account-specific errors — don't waste retries on the same account,
// jump straight to rotation.
if (this._shouldSkipRetryOnAccountError(err)) {
this._rotLog('fast-fail', {
hoster: task.hoster, fileName, accountId: task.accountId,
attempt, error: err && err.message ? err.message : String(err)
});
break;
}
if (attempt >= maxAttempts) break;
// Wait 3 seconds before retry
await this._sleep(3000, signal);
} finally {
if (speedMonitor) clearInterval(speedMonitor);
uploadSignalBundle.cleanup();
}
}
const wasStopped = this.stopAfterActive && !signal.aborted;
const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
if (wasStopped || wasAborted) {
const error = wasStopped ? 'Warteschlange angehalten' : 'Abgebrochen';
emitFinalStatus('aborted', { error });
recordFinalResult('aborted', { error });
return;
}
// Account rotation: mark the current account failed (if not already),
// wait for main to resolve the next fallback, then retry. Loops so
// A → B → C → ... works for hosters with 3+ accounts.
//
// CRITICAL: we must ALWAYS check for an existing override, even if this
// account is already in _failedAccounts (e.g. another concurrent job
// already marked it failed). Otherwise the second job falls straight
// through to final-error instead of using the already-resolved fallback.
this._rotLog('retries-exhausted', {
hoster: task.hoster, fileName, accountId: task.accountId,
lastError: lastError ? lastError.message : null
});
// File-specific rejection → same file will get the same verdict on
// every other account, rotation is pointless. Don't blacklist, don't
// retry siblings, just fail this file cleanly.
if (this._isFileRejectedError(lastError)) {
this._rotLog('skip-rotation-file-rejected', {
hoster: task.hoster, fileName, accountId: task.accountId,
lastError: lastError ? lastError.message : null
});
const error = lastError.message || 'Datei abgelehnt';
emitFinalStatus('error', { error });
recordFinalResult('error', { error });
return;
}
// If the reason for failure was a transient network error we do NOT
// blacklist the account. Other jobs on the same account in this batch
// can still try fresh. This file just errors out for now.
if (this._isTransientNetworkError(lastError)) {
this._rotLog('skip-rotation-transient', {
hoster: task.hoster, fileName, accountId: task.accountId,
lastError: lastError ? lastError.message : null
});
const error = lastError.message || 'Netzwerkfehler';
emitFinalStatus('error', { error });
recordFinalResult('error', { error });
return;
}
while (task.accountId) {
if (signal.aborted || this.stopAfterActive) break;
const alreadyMarked = this._failedAccounts.has(task.hoster + ':' + task.accountId);
if (!alreadyMarked) {
this._failedAccounts.set(task.hoster + ':' + task.accountId, true);
this._rotLog('mark-failed', {
hoster: task.hoster, fileName, accountId: task.accountId,
lastError: lastError ? lastError.message : null
});
this.emit('account-failed', { hoster: task.hoster, accountId: task.accountId });
await this._sleep(800, signal);
} else {
this._rotLog('already-marked', {
hoster: task.hoster, fileName, accountId: task.accountId
});
}
const override = this._accountOverrides.get(task.hoster);
if (!override) {
this._rotLog('rotation-end', {
hoster: task.hoster, fileName, reason: 'no-override-set',
lastFailedAccountId: task.accountId
});
break;
}
if (this._failedAccounts.has(task.hoster + ':' + override.id)) {
this._rotLog('rotation-end', {
hoster: task.hoster, fileName, reason: 'override-already-failed',
overrideId: override.id, lastFailedAccountId: task.accountId
});
break;
}
if (override.id === task.accountId) {
this._rotLog('rotation-end', {
hoster: task.hoster, fileName, reason: 'override-same-as-current',
lastFailedAccountId: task.accountId
});
break;
}
// Switch to fallback account and retry this file
this._rotLog('rotate', {
hoster: task.hoster, fileName,
fromAccountId: task.accountId, toAccountId: override.id
});
task.accountId = override.id;
task.username = override.username;
task.password = override.password;
task.apiKey = override.apiKey;
this._emitProgress(uploadId, fileName, task.hoster, {
jobId, status: 'retrying', progress: 0, bytesUploaded: 0, bytesTotal: fileSize,
speedKbs: 0, elapsed: 0, remaining: 0,
error: 'Account-Wechsel zu Fallback', result: null, attempt: 1, maxAttempts
});
// Retry loop with the new account. On exhausted failure, the while
// loop iterates: marks this account failed too, asks main for the next
// fallback, and so on.
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
if (signal.aborted || this.stopAfterActive) break;
if (attempt > 1) {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId, status: 'retrying', progress: 0, bytesUploaded: 0, bytesTotal: fileSize,
speedKbs: 0, elapsed: 0, remaining: 0,
error: lastError ? lastError.message : '', result: null, attempt, maxAttempts
});
await this._sleep(3000, signal);
}
try {
const jobStart = Date.now();
let lastBytes = 0;
let lastSpeedTime = jobStart;
let currentSpeedKbs = 0;
const activeEntry = { jobId, speedKbs: 0, bytesUploaded: 0 };
this.activeJobs.set(uploadId, activeEntry);
const progressCb = (bytesUploaded, bytesTotal) => {
const now = Date.now();
const timeDelta = (now - lastSpeedTime) / 1000;
if (timeDelta >= 1) {
currentSpeedKbs = Math.round((bytesUploaded - lastBytes) / timeDelta / 1024);
lastBytes = bytesUploaded;
lastSpeedTime = now;
}
activeEntry.speedKbs = currentSpeedKbs;
activeEntry.bytesUploaded = bytesUploaded;
const elapsed = Math.round((now - jobStart) / 1000);
const remaining = currentSpeedKbs > 0 ? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024)) : 0;
this._emitProgress(uploadId, fileName, task.hoster, {
jobId, status: 'uploading',
progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
bytesUploaded, bytesTotal, speedKbs: currentSpeedKbs,
elapsed, remaining, error: null, result: null, attempt, maxAttempts
});
};
const hosterThrottle = settings.maxSpeedKbs > 0 ? new Throttle(settings.maxSpeedKbs * 1024) : null;
const globalThrottle = this._getGlobalThrottle();
const throttle = hosterThrottle && globalThrottle
? { consume: async (bytes, sig) => { await hosterThrottle.consume(bytes, sig); await globalThrottle.consume(bytes, sig); } }
: hosterThrottle || globalThrottle;
const result = await this._executeUpload(task, progressCb, signal, throttle);
this.activeJobs.delete(uploadId);
this.sessionBytes += fileSize;
emitFinalStatus('done', { result, speedKbs: currentSpeedKbs, elapsed: Math.round((Date.now() - jobStart) / 1000), attempt });
recordFinalResult('done', { result });
return;
} catch (err) {
this.activeJobs.delete(uploadId);
lastError = err;
if (signal.aborted || this.stopAfterActive) break;
if (attempt >= maxAttempts) break;
}
}
}
const error = lastError && lastError.message ? lastError.message : 'Unbekannter Fehler';
this._rotLog('final-error', {
hoster: task.hoster, fileName, lastFailedAccountId: task.accountId, error
});
emitFinalStatus('error', { error });
recordFinalResult('error', { error });
} catch (err) {
const wasStopped = this.stopAfterActive && !signal.aborted;
const error = wasStopped
? 'Warteschlange angehalten'
: (signal.aborted || this.cancelledJobIds.has(jobId) ? 'Abgebrochen' : (err && err.message ? err.message : 'Unbekannter Fehler'));
const status = signal.aborted || this.cancelledJobIds.has(jobId) || wasStopped ? 'aborted' : 'error';
emitFinalStatus(status, { error });
recordFinalResult(status === 'error' ? 'error' : 'aborted', { error });
} finally {
this.activeJobs.delete(uploadId);
this.jobAbortControllers.delete(jobId);
cleanupSignals();
// Release in reverse order of acquire (global first, then hoster)
if (globalSlotAcquired && globalSemaphore) globalSemaphore.release();
if (hosterSlotAcquired) hosterSemaphore.release();
}
}
async _executeUpload(task, progressCb, signal, throttle) {
if (task.hoster === 'vidmoly.me' && task.username) {
const vidmoly = new VidmolyUploader();
await vidmoly.login(task.username, task.password);
return vidmoly.upload(task.file, progressCb, signal, throttle);
} else if (task.hoster === 'voe.sx' && task.username) {
const voe = new VoeUploader();
await voe.login(task.username, task.password);
return voe.upload(task.file, progressCb, signal, throttle);
} else if (task.hoster === 'doodstream.com' && task.username) {
const dood = new DoodstreamUploader();
await dood.login(task.username, task.password);
return dood.upload(task.file, progressCb, signal, throttle);
} else if (task.hoster === 'clouddrop.cc') {
const clouddrop = new ClouddropUploader(task.apiKey);
return clouddrop.upload(task.file, progressCb, signal, throttle);
} else {
return uploadFile(task.hoster, task.file, task.apiKey, progressCb, signal, throttle);
}
}
_emitProgress(uploadId, fileName, hoster, data) {
this.emit('progress', { uploadId, fileName, hoster, ...data });
}
_startStatsTimer() {
if (this.statsInterval) clearInterval(this.statsInterval);
this.statsInterval = setInterval(() => {
// Single pass over active jobs instead of two.
let globalSpeedKbs = 0;
let activeCount = 0;
let inProgressBytes = 0;
for (const job of this.activeJobs.values()) {
globalSpeedKbs += job.speedKbs || 0;
inProgressBytes += job.bytesUploaded || 0;
activeCount++;
}
const elapsed = Math.round((Date.now() - this.startTime) / 1000);
this.emit('stats', {
state: this.running ? (this.stopAfterActive ? 'stopping' : 'uploading') : 'idle',
globalSpeedKbs,
totalBytes: this.sessionBytes + inProgressBytes,
elapsed,
activeJobs: activeCount,
pendingJobs: Object.values(this.semaphores).reduce((sum, semaphore) => sum + semaphore.pending, 0)
});
}, 1000);
}
_stopStatsTimer() {
if (this.statsInterval) {
clearInterval(this.statsInterval);
this.statsInterval = null;
}
}
_combineSignals(signal1, signal2) {
const controller = new AbortController();
if (signal1.aborted || signal2.aborted) {
controller.abort();
return { signal: controller.signal, cleanup() {} };
}
const onAbort = () => {
controller.abort();
cleanup();
};
const cleanup = () => {
signal1.removeEventListener('abort', onAbort);
signal2.removeEventListener('abort', onAbort);
};
signal1.addEventListener('abort', onAbort, { once: true });
signal2.addEventListener('abort', onAbort, { once: true });
return { signal: controller.signal, cleanup };
}
_combineManySignals(signals) {
const liveSignals = signals.filter(Boolean);
const controller = new AbortController();
if (liveSignals.some((signal) => signal.aborted)) {
controller.abort();
return { signal: controller.signal, cleanup() {} };
}
const listeners = liveSignals.map((signal) => {
const handler = () => {
controller.abort();
cleanup();
};
signal.addEventListener('abort', handler, { once: true });
return { signal, handler };
});
const cleanup = () => {
for (const entry of listeners) {
entry.signal.removeEventListener('abort', entry.handler);
}
};
return { signal: controller.signal, cleanup };
}
_sleep(ms, signal) {
return new Promise((resolve, reject) => {
const onAbort = () => {
clearTimeout(timer);
reject(new Error('Aborted'));
};
const timer = setTimeout(() => {
if (signal) signal.removeEventListener('abort', onAbort);
resolve();
}, ms);
if (signal) {
if (signal.aborted) {
clearTimeout(timer);
reject(new Error('Aborted'));
return;
}
signal.addEventListener('abort', onAbort, { once: true });
}
});
}
_waitForInterval(hoster, intervalMs, signal) {
// Serialize interval waits per hoster so concurrent jobs queue up properly
const prev = this.intervalLocks[hoster] || Promise.resolve();
const next = prev.then(async () => {
const now = Date.now();
const last = this.lastStartTime[hoster] || 0;
const elapsed = now - last;
if (elapsed < intervalMs) {
await this._sleep(intervalMs - elapsed, signal);
}
this.lastStartTime[hoster] = Date.now();
});
this.intervalLocks[hoster] = next.catch(() => {});
return next;
}
addJobs(tasks) {
if (!this.running || !tasks || tasks.length === 0) {
return { added: 0, alreadyInBatchJobIds: [] };
}
const { signal } = this.abortController;
const results = this._batchResults || new Map();
const addResult = { added: 0, alreadyInBatchJobIds: [] };
for (const task of tasks) {
// Skip if this job is already being processed (prevent duplicates)
if (task.jobId && this.jobAbortControllers.has(task.jobId)) {
addResult.alreadyInBatchJobIds.push(task.jobId);
continue;
}
const fileName = path.basename(task.file);
if (!results.has(task.file)) {
let size = 0;
try { size = fs.statSync(task.file).size; } catch {}
results.set(task.file, { name: fileName, size, results: [] });
}
this._additionalPromises.push(this._runJob(task, results, signal));
addResult.added++;
}
return addResult;
}
cancelJobs(jobIds) {
for (const jobId of jobIds || []) {
if (!jobId) continue;
this.cancelledJobIds.add(jobId);
const controller = this.jobAbortControllers.get(jobId);
if (controller && !controller.signal.aborted) {
controller.abort();
}
}
}
finishAfterActive() {
this.stopAfterActive = true;
}
cancel() {
if (!this.running) return;
this.abortController.abort();
this.stopAfterActive = false;
this.running = false;
for (const controller of this.jobAbortControllers.values()) {
if (!controller.signal.aborted) controller.abort();
}
this._stopStatsTimer();
}
}
module.exports = UploadManager;