Multi-Hoster-Upload/lib/upload-manager.js
Administrator 655fb6230b feat(rotation): fast-fail on account-specific errors + open-log-folder button + sync rot-log flush
Three related improvements that landed together while wiring up the
rotation log infrastructure:

  - Fast-fail classifier: errors that clearly indicate the account
    itself is the problem (rate limit, quota, banned/suspended, auth
    failure, 401/403, 'Kein Upload-Server' from delivery-node etc.)
    now skip the remaining retries and go straight to rotation. No
    more waiting 5 × 3s between retries just to end up rotating
    anyway. Emits a 'fast-fail' rot-log event so the shortcut is
    visible.

  - Settings: 'Öffnen' button next to the log-file-path input reveals
    the active log file (or its directory if nothing's written yet)
    in the OS file manager, so users don't have to remember paths.

  - rotLog() writes the rotation log synchronously. Only a handful
    of events fire per batch; the 500ms flush batching was saving
    nothing and made the file look empty when users checked right
    after an event. (The main debug log still uses the batched async
    path — that one is high-volume.)
2026-04-19 23:04:20 +02:00

858 lines
31 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const { EventEmitter } = require('events');
const path = require('path');
const fs = require('fs');
const crypto = require('crypto');
const { uploadFile } = require('./hosters');
const VidmolyUploader = require('./vidmoly-upload');
const VoeUploader = require('./voe-upload');
const DoodstreamUploader = require('./doodstream-upload');
const ClouddropUploader = require('./clouddrop-upload');
const Semaphore = require('./semaphore');
const Throttle = require('./throttle');
const DEFAULT_SETTINGS = {
retries: 3,
maxSpeedKbs: 0,
parallelCount: 2,
restartBelowKbs: 0,
timeIntervalSec: 0,
maxSizeMb: 0
};
class UploadManager extends EventEmitter {
constructor(hosterSettings, globalSettings) {
super();
this.hosterSettings = hosterSettings || {};
this.globalSettings = globalSettings || {};
this.semaphores = {};
this.globalSemaphore = null;
this.abortController = new AbortController();
this.running = false;
this.stopAfterActive = false;
this.statsInterval = null;
this.startTime = 0;
this.activeJobs = new Map(); // uploadId -> { jobId, speedKbs, bytesUploaded }
this.jobAbortControllers = new Map(); // jobId -> AbortController
this.cancelledJobIds = new Set();
this.sessionBytes = 0;
this.lastStartTime = {}; // hoster -> timestamp of last upload start
this.intervalLocks = {}; // hoster -> Promise chain for serialized interval waits
this.globalThrottle = null;
this._failedAccounts = new Map(); // hoster -> Set of failed accountIds
this._accountOverrides = new Map(); // hoster -> fallback account object
}
switchAccount(hoster, fallbackAccount) {
const prev = this._accountOverrides.get(hoster);
this._accountOverrides.set(hoster, fallbackAccount);
this._rotLog('switchAccount', {
hoster,
prevOverrideId: prev ? prev.id : null,
toAccountId: fallbackAccount ? fallbackAccount.id : null
});
}
_rotLog(event, data) {
this.emit('rot-log', { ts: Date.now(), event, ...data });
}
// Error classes that mean "this account is the problem, retrying on it won't
// help" — we skip the remaining retries and go straight to the fallback
// account. Keeps single runs fast when an account is rate-limited, banned,
// or out of quota. Transient network issues still go through the normal
// retry loop on the same account.
_shouldSkipRetryOnAccountError(err) {
if (!err || !err.message) return false;
const m = String(err.message);
const PATTERNS = [
/Kein Upload-Server/i,
/No upload server/i,
/kein server/i,
/quota/i,
/limit (reached|exceeded|überschritten)/i,
/rate[- ]?limit/i,
/too many requests/i,
/\b(401|403)\b/,
/Falscher (User|Username|Passwort)/i,
/Incorrect (Login|Password)/i,
/invalid (credentials|api[- ]?key|token|session)/i,
/(account|user) (banned|suspended|disabled|gesperrt)/i,
/not authorized/i,
/forbidden/i,
/session (expired|abgelaufen)/i
];
return PATTERNS.some(p => p.test(m));
}
updateSettings(hosterSettings, globalSettings) {
this.hosterSettings = hosterSettings || this.hosterSettings;
this.globalSettings = globalSettings || this.globalSettings;
// Live-update semaphores for running uploads
for (const [hoster, sem] of Object.entries(this.semaphores)) {
const settings = this._getSettings(hoster);
sem.updateLimit(settings.parallelCount);
}
// Update global throttle if speed limit changed
const newKbs = (this.globalSettings.globalMaxSpeedKbs || 0);
if (newKbs > 0) {
if (this.globalThrottle) {
this.globalThrottle.updateRate(newKbs * 1024);
} else {
this.globalThrottle = new Throttle(newKbs * 1024);
}
} else {
this.globalThrottle = null;
}
// Update global semaphore live
const globalLimit = this._getGlobalParallelLimit();
if (globalLimit > 0 && this.globalSemaphore) {
this.globalSemaphore.updateLimit(globalLimit);
}
}
_getSettings(hoster) {
const settings = { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
const globalLimit = this._getGlobalParallelLimit();
if (this.globalSettings.scaleParallelUploads && globalLimit > 0) {
settings.parallelCount = Math.min(settings.parallelCount || 1, globalLimit);
}
return settings;
}
_getGlobalParallelLimit() {
const raw = Number(this.globalSettings.parallelUploadCount || 0);
if (!Number.isFinite(raw) || raw <= 0) return 0;
return Math.max(1, Math.min(100, Math.round(raw)));
}
_getGlobalSemaphore() {
const limit = this._getGlobalParallelLimit();
if (limit <= 0) return null;
if (!this.globalSemaphore) {
this.globalSemaphore = new Semaphore(limit);
} else {
this.globalSemaphore.updateLimit(limit);
}
return this.globalSemaphore;
}
_getGlobalThrottle() {
const kbs = Number(this.globalSettings.globalMaxSpeedKbs || 0);
if (!Number.isFinite(kbs) || kbs <= 0) return null;
if (!this.globalThrottle) {
this.globalThrottle = new Throttle(kbs * 1024);
} else {
this.globalThrottle.updateRate(kbs * 1024);
}
return this.globalThrottle;
}
_getSemaphore(hoster) {
if (!this.semaphores[hoster]) {
const settings = this._getSettings(hoster);
this.semaphores[hoster] = new Semaphore(settings.parallelCount);
} else {
this.semaphores[hoster].updateLimit(this._getSettings(hoster).parallelCount);
}
return this.semaphores[hoster];
}
async startBatch(tasks) {
this.running = true;
this.stopAfterActive = false;
this.abortController = new AbortController();
this.startTime = Date.now();
this.sessionBytes = 0;
this.activeJobs.clear();
this.jobAbortControllers.clear();
this.cancelledJobIds.clear();
this.semaphores = {};
this.globalSemaphore = null;
this.globalThrottle = null;
this.lastStartTime = {};
// Reset account-rotation state each batch. Otherwise a previously failed
// account (e.g. rate-limited during the last batch) stays permanently
// blacklisted until the app restarts — every upload would silently skip
// straight to the fallback even after the original recovered.
this._failedAccounts.clear();
this._accountOverrides.clear();
this._rotLog('batch-start', { taskCount: tasks.length });
const { signal } = this.abortController;
const batchId = `batch-${Date.now()}`;
const results = new Map(); // filePath -> { name, size, results: [] }
this._batchResults = results;
this._additionalPromises = []; // Track jobs added mid-batch via addJobs()
for (const task of tasks) {
const fileName = path.basename(task.file);
if (!results.has(task.file)) {
let size = 0;
try { size = fs.statSync(task.file).size; } catch {}
results.set(task.file, { name: fileName, size, results: [] });
}
}
this._startStatsTimer();
const promises = tasks.map((task) => this._runJob(task, results, signal));
await Promise.allSettled(promises);
// Wait for any jobs added mid-batch via addJobs()
while (this._additionalPromises.length > 0) {
const batch = this._additionalPromises.splice(0);
await Promise.allSettled(batch);
}
this._stopStatsTimer();
this.running = false;
const files = Array.from(results.values());
const total = tasks.length;
const succeeded = files.reduce((count, file) => count + file.results.filter((result) => result.status === 'done').length, 0);
const summary = {
id: batchId,
timestamp: new Date().toISOString(),
total,
succeeded,
failed: total - succeeded,
files
};
this.emit('batch-done', summary);
}
async _runJob(task, results, batchSignal) {
const settings = this._getSettings(task.hoster);
const hosterSemaphore = this._getSemaphore(task.hoster);
const globalSemaphore = this._getGlobalSemaphore();
const uploadId = crypto.randomBytes(8).toString('hex');
const jobId = task.jobId || uploadId;
const fileName = path.basename(task.file);
let fileSize = 0;
let fileNotFound = false;
try { fileSize = fs.statSync(task.file).size; } catch { fileNotFound = true; }
const maxAttempts = Math.max(1, (settings.retries || 0) + 1);
const jobAbortController = new AbortController();
const { signal, cleanup: cleanupSignals } = this._combineSignals(batchSignal, jobAbortController.signal);
this.jobAbortControllers.set(jobId, jobAbortController);
let hosterSlotAcquired = false;
let globalSlotAcquired = false;
let finalResultRecorded = false;
let lastError = null;
const recordFinalResult = (status, payload = {}) => {
if (finalResultRecorded) return;
finalResultRecorded = true;
const result = {
hoster: task.hoster,
status,
error: payload.error || null,
download_url: payload.result ? payload.result.download_url || null : null,
embed_url: payload.result ? payload.result.embed_url || null : null,
file_code: payload.result ? payload.result.file_code || null : null
};
results.get(task.file).results.push(result);
};
const emitFinalStatus = (status, payload = {}) => {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status,
progress: status === 'done' ? 1 : 0,
bytesUploaded: status === 'done' ? fileSize : 0,
bytesTotal: fileSize,
speedKbs: payload.speedKbs || 0,
elapsed: payload.elapsed || 0,
remaining: 0,
error: payload.error || null,
result: payload.result || null,
attempt: payload.attempt || maxAttempts,
maxAttempts
});
};
try {
if (fileNotFound) {
const error = 'Datei nicht gefunden';
emitFinalStatus('skipped', { error, attempt: 0 });
recordFinalResult('error', { error });
return;
}
if (fileSize <= 0) {
const error = 'Datei ist leer (0 Bytes)';
emitFinalStatus('skipped', { error, attempt: 0 });
recordFinalResult('error', { error });
return;
}
if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
const error = `Datei zu groß (Max: ${settings.maxSizeMb} MB)`;
emitFinalStatus('skipped', { error, attempt: 0 });
recordFinalResult('error', { error });
return;
}
// If this account already failed in this batch, switch to fallback immediately
// instead of wasting retries on a known-bad account
if (task.accountId && this._failedAccounts.has(task.hoster + ':' + task.accountId)) {
const override = this._accountOverrides.get(task.hoster);
if (override && !this._failedAccounts.has(task.hoster + ':' + override.id)) {
this._rotLog('pre-job-swap', {
hoster: task.hoster, fileName, fromAccountId: task.accountId, toAccountId: override.id
});
task.accountId = override.id;
task.username = override.username;
task.password = override.password;
task.apiKey = override.apiKey;
} else {
this._rotLog('pre-job-swap-blocked', {
hoster: task.hoster, fileName, accountId: task.accountId,
hasOverride: !!override,
overrideAlsoFailed: override ? this._failedAccounts.has(task.hoster + ':' + override.id) : false
});
}
}
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'queued',
progress: 0,
bytesUploaded: 0,
bytesTotal: fileSize,
speedKbs: 0,
elapsed: 0,
remaining: 0,
error: null,
result: null,
attempt: 0,
maxAttempts
});
// Acquire hoster semaphore first so jobs waiting for a hoster slot
// don't waste global slots (prevents underutilization)
await hosterSemaphore.acquire(signal);
hosterSlotAcquired = true;
if (globalSemaphore) {
await globalSemaphore.acquire(signal);
globalSlotAcquired = true;
}
if (settings.timeIntervalSec > 0) {
await this._waitForInterval(task.hoster, settings.timeIntervalSec * 1000, signal);
}
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
if (signal.aborted || this.stopAfterActive) break;
if (attempt > 1) {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'retrying',
progress: 0,
bytesUploaded: 0,
bytesTotal: fileSize,
speedKbs: 0,
elapsed: 0,
remaining: 0,
error: lastError ? lastError.message : null,
result: null,
attempt,
maxAttempts
});
await this._sleep(3000, signal);
}
const jobStart = Date.now();
let lastBytes = 0;
let lastSpeedTime = jobStart;
let currentSpeedKbs = 0;
let lowSpeedSince = 0;
let speedAbort = null;
let speedMonitor = null;
let uploadSignalBundle = { signal, cleanup() {} };
try {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'getting-server',
progress: 0,
bytesUploaded: 0,
bytesTotal: fileSize,
speedKbs: 0,
elapsed: 0,
remaining: 0,
error: null,
result: null,
attempt,
maxAttempts
});
const hosterThrottle = settings.maxSpeedKbs > 0
? new Throttle(settings.maxSpeedKbs * 1024)
: null;
const globalThrottle = this._getGlobalThrottle();
const throttle = hosterThrottle && globalThrottle
? { consume: async (bytes, sig) => { await hosterThrottle.consume(bytes, sig); await globalThrottle.consume(bytes, sig); } }
: hosterThrottle || globalThrottle;
if (settings.restartBelowKbs > 0) {
speedAbort = new AbortController();
uploadSignalBundle = this._combineManySignals([signal, speedAbort.signal]);
speedMonitor = setInterval(() => {
if (currentSpeedKbs > 0 && currentSpeedKbs < settings.restartBelowKbs) {
if (!lowSpeedSince) lowSpeedSince = Date.now();
if (Date.now() - lowSpeedSince > 6000) {
speedAbort.abort();
}
} else {
lowSpeedSince = 0;
}
}, 2000);
}
// Mutate this single object on each progress callback instead of
// allocating a fresh one — callback fires on every stream chunk
// (hundreds/sec per active job).
const activeEntry = { jobId, speedKbs: 0, bytesUploaded: 0 };
this.activeJobs.set(uploadId, activeEntry);
let lastEmitTime = 0;
const PROGRESS_EMIT_INTERVAL = 250; // ms throttle UI updates
const progressCb = (bytesUploaded, bytesTotal) => {
const now = Date.now();
const elapsed = Math.round((now - jobStart) / 1000);
const timeDelta = (now - lastSpeedTime) / 1000;
if (timeDelta >= 1) {
const bytesDelta = bytesUploaded - lastBytes;
currentSpeedKbs = Math.round(bytesDelta / timeDelta / 1024);
lastBytes = bytesUploaded;
lastSpeedTime = now;
}
activeEntry.speedKbs = currentSpeedKbs;
activeEntry.bytesUploaded = bytesUploaded;
// Throttle progress emissions to reduce IPC + rendering overhead
if (now - lastEmitTime < PROGRESS_EMIT_INTERVAL) return;
lastEmitTime = now;
const remaining = currentSpeedKbs > 0
? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024))
: 0;
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'uploading',
progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
bytesUploaded,
bytesTotal,
speedKbs: currentSpeedKbs,
elapsed,
remaining,
error: null,
result: null,
attempt,
maxAttempts
});
};
const result = await this._executeUpload(task, progressCb, uploadSignalBundle.signal, throttle);
const elapsed = Math.round((Date.now() - jobStart) / 1000);
this.sessionBytes += fileSize;
this.activeJobs.delete(uploadId);
emitFinalStatus('done', {
result,
speedKbs: currentSpeedKbs,
elapsed,
attempt
});
recordFinalResult('done', { result });
return;
} catch (err) {
this.activeJobs.delete(uploadId);
const isSpeedRestart = speedAbort && speedAbort.signal.aborted && !signal.aborted;
if (signal.aborted) {
lastError = new Error('Abgebrochen');
break;
}
if (this.stopAfterActive) {
lastError = new Error('Angehalten');
break;
}
if (isSpeedRestart && attempt < maxAttempts) {
lastError = new Error('Geschwindigkeit zu niedrig - Neustart');
await this._sleep(3000, signal);
continue;
}
lastError = err;
// Account-specific errors — don't waste retries on the same account,
// jump straight to rotation.
if (this._shouldSkipRetryOnAccountError(err)) {
this._rotLog('fast-fail', {
hoster: task.hoster, fileName, accountId: task.accountId,
attempt, error: err && err.message ? err.message : String(err)
});
break;
}
if (attempt >= maxAttempts) break;
// Wait 3 seconds before retry
await this._sleep(3000, signal);
} finally {
if (speedMonitor) clearInterval(speedMonitor);
uploadSignalBundle.cleanup();
}
}
const wasStopped = this.stopAfterActive && !signal.aborted;
const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
if (wasStopped || wasAborted) {
const error = wasStopped ? 'Warteschlange angehalten' : 'Abgebrochen';
emitFinalStatus('aborted', { error });
recordFinalResult('aborted', { error });
return;
}
// Account rotation: mark the current account failed, wait for main to
// resolve the next fallback, then retry. Loop so A → B → C → ... works
// for hosters with 3+ accounts (the old code only did one level: A → B
// and stopped, even if C would have worked).
this._rotLog('retries-exhausted', {
hoster: task.hoster, fileName, accountId: task.accountId,
lastError: lastError ? lastError.message : null
});
while (task.accountId && !this._failedAccounts.has(task.hoster + ':' + task.accountId)) {
if (signal.aborted || this.stopAfterActive) break;
this._failedAccounts.set(task.hoster + ':' + task.accountId, true);
this._rotLog('mark-failed', {
hoster: task.hoster, fileName, accountId: task.accountId,
lastError: lastError ? lastError.message : null
});
this.emit('account-failed', { hoster: task.hoster, accountId: task.accountId });
await this._sleep(800, signal);
const override = this._accountOverrides.get(task.hoster);
if (!override) {
this._rotLog('rotation-end', {
hoster: task.hoster, fileName, reason: 'no-override-set',
lastFailedAccountId: task.accountId
});
break;
}
if (this._failedAccounts.has(task.hoster + ':' + override.id)) {
this._rotLog('rotation-end', {
hoster: task.hoster, fileName, reason: 'override-already-failed',
overrideId: override.id, lastFailedAccountId: task.accountId
});
break;
}
// Switch to fallback account and retry this file
this._rotLog('rotate', {
hoster: task.hoster, fileName,
fromAccountId: task.accountId, toAccountId: override.id
});
task.accountId = override.id;
task.username = override.username;
task.password = override.password;
task.apiKey = override.apiKey;
this._emitProgress(uploadId, fileName, task.hoster, {
jobId, status: 'retrying', progress: 0, bytesUploaded: 0, bytesTotal: fileSize,
speedKbs: 0, elapsed: 0, remaining: 0,
error: 'Account-Wechsel zu Fallback', result: null, attempt: 1, maxAttempts
});
// Retry loop with the new account. On exhausted failure, the while
// loop iterates: marks this account failed too, asks main for the next
// fallback, and so on.
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
if (signal.aborted || this.stopAfterActive) break;
if (attempt > 1) {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId, status: 'retrying', progress: 0, bytesUploaded: 0, bytesTotal: fileSize,
speedKbs: 0, elapsed: 0, remaining: 0,
error: lastError ? lastError.message : '', result: null, attempt, maxAttempts
});
await this._sleep(3000, signal);
}
try {
const jobStart = Date.now();
let lastBytes = 0;
let lastSpeedTime = jobStart;
let currentSpeedKbs = 0;
const activeEntry = { jobId, speedKbs: 0, bytesUploaded: 0 };
this.activeJobs.set(uploadId, activeEntry);
const progressCb = (bytesUploaded, bytesTotal) => {
const now = Date.now();
const timeDelta = (now - lastSpeedTime) / 1000;
if (timeDelta >= 1) {
currentSpeedKbs = Math.round((bytesUploaded - lastBytes) / timeDelta / 1024);
lastBytes = bytesUploaded;
lastSpeedTime = now;
}
activeEntry.speedKbs = currentSpeedKbs;
activeEntry.bytesUploaded = bytesUploaded;
const elapsed = Math.round((now - jobStart) / 1000);
const remaining = currentSpeedKbs > 0 ? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024)) : 0;
this._emitProgress(uploadId, fileName, task.hoster, {
jobId, status: 'uploading',
progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
bytesUploaded, bytesTotal, speedKbs: currentSpeedKbs,
elapsed, remaining, error: null, result: null, attempt, maxAttempts
});
};
const hosterThrottle = settings.maxSpeedKbs > 0 ? new Throttle(settings.maxSpeedKbs * 1024) : null;
const globalThrottle = this._getGlobalThrottle();
const throttle = hosterThrottle && globalThrottle
? { consume: async (bytes, sig) => { await hosterThrottle.consume(bytes, sig); await globalThrottle.consume(bytes, sig); } }
: hosterThrottle || globalThrottle;
const result = await this._executeUpload(task, progressCb, signal, throttle);
this.activeJobs.delete(uploadId);
this.sessionBytes += fileSize;
emitFinalStatus('done', { result, speedKbs: currentSpeedKbs, elapsed: Math.round((Date.now() - jobStart) / 1000), attempt });
recordFinalResult('done', { result });
return;
} catch (err) {
this.activeJobs.delete(uploadId);
lastError = err;
if (signal.aborted || this.stopAfterActive) break;
if (attempt >= maxAttempts) break;
}
}
}
const error = lastError && lastError.message ? lastError.message : 'Unbekannter Fehler';
this._rotLog('final-error', {
hoster: task.hoster, fileName, lastFailedAccountId: task.accountId, error
});
emitFinalStatus('error', { error });
recordFinalResult('error', { error });
} catch (err) {
const wasStopped = this.stopAfterActive && !signal.aborted;
const error = wasStopped
? 'Warteschlange angehalten'
: (signal.aborted || this.cancelledJobIds.has(jobId) ? 'Abgebrochen' : (err && err.message ? err.message : 'Unbekannter Fehler'));
const status = signal.aborted || this.cancelledJobIds.has(jobId) || wasStopped ? 'aborted' : 'error';
emitFinalStatus(status, { error });
recordFinalResult(status === 'error' ? 'error' : 'aborted', { error });
} finally {
this.activeJobs.delete(uploadId);
this.jobAbortControllers.delete(jobId);
cleanupSignals();
// Release in reverse order of acquire (global first, then hoster)
if (globalSlotAcquired && globalSemaphore) globalSemaphore.release();
if (hosterSlotAcquired) hosterSemaphore.release();
}
}
async _executeUpload(task, progressCb, signal, throttle) {
if (task.hoster === 'vidmoly.me' && task.username) {
const vidmoly = new VidmolyUploader();
await vidmoly.login(task.username, task.password);
return vidmoly.upload(task.file, progressCb, signal, throttle);
} else if (task.hoster === 'voe.sx' && task.username) {
const voe = new VoeUploader();
await voe.login(task.username, task.password);
return voe.upload(task.file, progressCb, signal, throttle);
} else if (task.hoster === 'doodstream.com' && task.username) {
const dood = new DoodstreamUploader();
await dood.login(task.username, task.password);
return dood.upload(task.file, progressCb, signal, throttle);
} else if (task.hoster === 'clouddrop.cc') {
const clouddrop = new ClouddropUploader(task.apiKey);
return clouddrop.upload(task.file, progressCb, signal, throttle);
} else {
return uploadFile(task.hoster, task.file, task.apiKey, progressCb, signal, throttle);
}
}
_emitProgress(uploadId, fileName, hoster, data) {
this.emit('progress', { uploadId, fileName, hoster, ...data });
}
_startStatsTimer() {
if (this.statsInterval) clearInterval(this.statsInterval);
this.statsInterval = setInterval(() => {
// Single pass over active jobs instead of two.
let globalSpeedKbs = 0;
let activeCount = 0;
let inProgressBytes = 0;
for (const job of this.activeJobs.values()) {
globalSpeedKbs += job.speedKbs || 0;
inProgressBytes += job.bytesUploaded || 0;
activeCount++;
}
const elapsed = Math.round((Date.now() - this.startTime) / 1000);
this.emit('stats', {
state: this.running ? (this.stopAfterActive ? 'stopping' : 'uploading') : 'idle',
globalSpeedKbs,
totalBytes: this.sessionBytes + inProgressBytes,
elapsed,
activeJobs: activeCount,
pendingJobs: Object.values(this.semaphores).reduce((sum, semaphore) => sum + semaphore.pending, 0)
});
}, 1000);
}
_stopStatsTimer() {
if (this.statsInterval) {
clearInterval(this.statsInterval);
this.statsInterval = null;
}
}
_combineSignals(signal1, signal2) {
const controller = new AbortController();
if (signal1.aborted || signal2.aborted) {
controller.abort();
return { signal: controller.signal, cleanup() {} };
}
const onAbort = () => {
controller.abort();
cleanup();
};
const cleanup = () => {
signal1.removeEventListener('abort', onAbort);
signal2.removeEventListener('abort', onAbort);
};
signal1.addEventListener('abort', onAbort, { once: true });
signal2.addEventListener('abort', onAbort, { once: true });
return { signal: controller.signal, cleanup };
}
_combineManySignals(signals) {
const liveSignals = signals.filter(Boolean);
const controller = new AbortController();
if (liveSignals.some((signal) => signal.aborted)) {
controller.abort();
return { signal: controller.signal, cleanup() {} };
}
const listeners = liveSignals.map((signal) => {
const handler = () => {
controller.abort();
cleanup();
};
signal.addEventListener('abort', handler, { once: true });
return { signal, handler };
});
const cleanup = () => {
for (const entry of listeners) {
entry.signal.removeEventListener('abort', entry.handler);
}
};
return { signal: controller.signal, cleanup };
}
_sleep(ms, signal) {
return new Promise((resolve, reject) => {
const onAbort = () => {
clearTimeout(timer);
reject(new Error('Aborted'));
};
const timer = setTimeout(() => {
if (signal) signal.removeEventListener('abort', onAbort);
resolve();
}, ms);
if (signal) {
if (signal.aborted) {
clearTimeout(timer);
reject(new Error('Aborted'));
return;
}
signal.addEventListener('abort', onAbort, { once: true });
}
});
}
_waitForInterval(hoster, intervalMs, signal) {
// Serialize interval waits per hoster so concurrent jobs queue up properly
const prev = this.intervalLocks[hoster] || Promise.resolve();
const next = prev.then(async () => {
const now = Date.now();
const last = this.lastStartTime[hoster] || 0;
const elapsed = now - last;
if (elapsed < intervalMs) {
await this._sleep(intervalMs - elapsed, signal);
}
this.lastStartTime[hoster] = Date.now();
});
this.intervalLocks[hoster] = next.catch(() => {});
return next;
}
addJobs(tasks) {
if (!this.running || !tasks || tasks.length === 0) {
return { added: 0, alreadyInBatchJobIds: [] };
}
const { signal } = this.abortController;
const results = this._batchResults || new Map();
const addResult = { added: 0, alreadyInBatchJobIds: [] };
for (const task of tasks) {
// Skip if this job is already being processed (prevent duplicates)
if (task.jobId && this.jobAbortControllers.has(task.jobId)) {
addResult.alreadyInBatchJobIds.push(task.jobId);
continue;
}
const fileName = path.basename(task.file);
if (!results.has(task.file)) {
let size = 0;
try { size = fs.statSync(task.file).size; } catch {}
results.set(task.file, { name: fileName, size, results: [] });
}
this._additionalPromises.push(this._runJob(task, results, signal));
addResult.added++;
}
return addResult;
}
cancelJobs(jobIds) {
for (const jobId of jobIds || []) {
if (!jobId) continue;
this.cancelledJobIds.add(jobId);
const controller = this.jobAbortControllers.get(jobId);
if (controller && !controller.signal.aborted) {
controller.abort();
}
}
}
finishAfterActive() {
this.stopAfterActive = true;
}
cancel() {
if (!this.running) return;
this.abortController.abort();
this.stopAfterActive = false;
this.running = false;
for (const controller of this.jobAbortControllers.values()) {
if (!controller.signal.aborted) controller.abort();
}
this._stopStatsTimer();
}
}
module.exports = UploadManager;