Multi-Hoster-Upload/lib/upload-manager.js
Administrator 126b1e569a feat(account-rotation): dedicated logging + live toast notifications
To trace whether the fallback chain actually engages during real uploads,
every rotation decision now emits a structured 'rot-log' event from the
upload-manager. main.js persists each event to a new account-rotation.log
(same directory as fileuploader.log; falls back to Desktop then userData)
and also mirrors it into the main debug log with a [ROT] prefix for
single-file grepping.

Logged events:
  - batch-start (clears _failedAccounts / _accountOverrides)
  - pre-job-swap / pre-job-swap-blocked (job picks override before first try)
  - retries-exhausted / mark-failed (enters rotation loop)
  - rotate (switched to new account, retry starting)
  - rotation-end (no override / override already failed)
  - final-error (all accounts exhausted)
  - switchAccount (main resolved the next fallback)

The renderer shows a toast on 'rotate', 'rotation-end' and 'final-error'
so fallback behavior is visible live instead of buried in logs.
2026-04-19 22:57:19 +02:00

821 lines
30 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const { EventEmitter } = require('events');
const path = require('path');
const fs = require('fs');
const crypto = require('crypto');
const { uploadFile } = require('./hosters');
const VidmolyUploader = require('./vidmoly-upload');
const VoeUploader = require('./voe-upload');
const DoodstreamUploader = require('./doodstream-upload');
const ClouddropUploader = require('./clouddrop-upload');
const Semaphore = require('./semaphore');
const Throttle = require('./throttle');
const DEFAULT_SETTINGS = {
retries: 3,
maxSpeedKbs: 0,
parallelCount: 2,
restartBelowKbs: 0,
timeIntervalSec: 0,
maxSizeMb: 0
};
class UploadManager extends EventEmitter {
constructor(hosterSettings, globalSettings) {
super();
this.hosterSettings = hosterSettings || {};
this.globalSettings = globalSettings || {};
this.semaphores = {};
this.globalSemaphore = null;
this.abortController = new AbortController();
this.running = false;
this.stopAfterActive = false;
this.statsInterval = null;
this.startTime = 0;
this.activeJobs = new Map(); // uploadId -> { jobId, speedKbs, bytesUploaded }
this.jobAbortControllers = new Map(); // jobId -> AbortController
this.cancelledJobIds = new Set();
this.sessionBytes = 0;
this.lastStartTime = {}; // hoster -> timestamp of last upload start
this.intervalLocks = {}; // hoster -> Promise chain for serialized interval waits
this.globalThrottle = null;
this._failedAccounts = new Map(); // hoster -> Set of failed accountIds
this._accountOverrides = new Map(); // hoster -> fallback account object
}
switchAccount(hoster, fallbackAccount) {
const prev = this._accountOverrides.get(hoster);
this._accountOverrides.set(hoster, fallbackAccount);
this._rotLog('switchAccount', {
hoster,
prevOverrideId: prev ? prev.id : null,
toAccountId: fallbackAccount ? fallbackAccount.id : null
});
}
_rotLog(event, data) {
this.emit('rot-log', { ts: Date.now(), event, ...data });
}
updateSettings(hosterSettings, globalSettings) {
this.hosterSettings = hosterSettings || this.hosterSettings;
this.globalSettings = globalSettings || this.globalSettings;
// Live-update semaphores for running uploads
for (const [hoster, sem] of Object.entries(this.semaphores)) {
const settings = this._getSettings(hoster);
sem.updateLimit(settings.parallelCount);
}
// Update global throttle if speed limit changed
const newKbs = (this.globalSettings.globalMaxSpeedKbs || 0);
if (newKbs > 0) {
if (this.globalThrottle) {
this.globalThrottle.updateRate(newKbs * 1024);
} else {
this.globalThrottle = new Throttle(newKbs * 1024);
}
} else {
this.globalThrottle = null;
}
// Update global semaphore live
const globalLimit = this._getGlobalParallelLimit();
if (globalLimit > 0 && this.globalSemaphore) {
this.globalSemaphore.updateLimit(globalLimit);
}
}
_getSettings(hoster) {
const settings = { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
const globalLimit = this._getGlobalParallelLimit();
if (this.globalSettings.scaleParallelUploads && globalLimit > 0) {
settings.parallelCount = Math.min(settings.parallelCount || 1, globalLimit);
}
return settings;
}
_getGlobalParallelLimit() {
const raw = Number(this.globalSettings.parallelUploadCount || 0);
if (!Number.isFinite(raw) || raw <= 0) return 0;
return Math.max(1, Math.min(100, Math.round(raw)));
}
_getGlobalSemaphore() {
const limit = this._getGlobalParallelLimit();
if (limit <= 0) return null;
if (!this.globalSemaphore) {
this.globalSemaphore = new Semaphore(limit);
} else {
this.globalSemaphore.updateLimit(limit);
}
return this.globalSemaphore;
}
_getGlobalThrottle() {
const kbs = Number(this.globalSettings.globalMaxSpeedKbs || 0);
if (!Number.isFinite(kbs) || kbs <= 0) return null;
if (!this.globalThrottle) {
this.globalThrottle = new Throttle(kbs * 1024);
} else {
this.globalThrottle.updateRate(kbs * 1024);
}
return this.globalThrottle;
}
_getSemaphore(hoster) {
if (!this.semaphores[hoster]) {
const settings = this._getSettings(hoster);
this.semaphores[hoster] = new Semaphore(settings.parallelCount);
} else {
this.semaphores[hoster].updateLimit(this._getSettings(hoster).parallelCount);
}
return this.semaphores[hoster];
}
async startBatch(tasks) {
this.running = true;
this.stopAfterActive = false;
this.abortController = new AbortController();
this.startTime = Date.now();
this.sessionBytes = 0;
this.activeJobs.clear();
this.jobAbortControllers.clear();
this.cancelledJobIds.clear();
this.semaphores = {};
this.globalSemaphore = null;
this.globalThrottle = null;
this.lastStartTime = {};
// Reset account-rotation state each batch. Otherwise a previously failed
// account (e.g. rate-limited during the last batch) stays permanently
// blacklisted until the app restarts — every upload would silently skip
// straight to the fallback even after the original recovered.
this._failedAccounts.clear();
this._accountOverrides.clear();
this._rotLog('batch-start', { taskCount: tasks.length });
const { signal } = this.abortController;
const batchId = `batch-${Date.now()}`;
const results = new Map(); // filePath -> { name, size, results: [] }
this._batchResults = results;
this._additionalPromises = []; // Track jobs added mid-batch via addJobs()
for (const task of tasks) {
const fileName = path.basename(task.file);
if (!results.has(task.file)) {
let size = 0;
try { size = fs.statSync(task.file).size; } catch {}
results.set(task.file, { name: fileName, size, results: [] });
}
}
this._startStatsTimer();
const promises = tasks.map((task) => this._runJob(task, results, signal));
await Promise.allSettled(promises);
// Wait for any jobs added mid-batch via addJobs()
while (this._additionalPromises.length > 0) {
const batch = this._additionalPromises.splice(0);
await Promise.allSettled(batch);
}
this._stopStatsTimer();
this.running = false;
const files = Array.from(results.values());
const total = tasks.length;
const succeeded = files.reduce((count, file) => count + file.results.filter((result) => result.status === 'done').length, 0);
const summary = {
id: batchId,
timestamp: new Date().toISOString(),
total,
succeeded,
failed: total - succeeded,
files
};
this.emit('batch-done', summary);
}
async _runJob(task, results, batchSignal) {
const settings = this._getSettings(task.hoster);
const hosterSemaphore = this._getSemaphore(task.hoster);
const globalSemaphore = this._getGlobalSemaphore();
const uploadId = crypto.randomBytes(8).toString('hex');
const jobId = task.jobId || uploadId;
const fileName = path.basename(task.file);
let fileSize = 0;
let fileNotFound = false;
try { fileSize = fs.statSync(task.file).size; } catch { fileNotFound = true; }
const maxAttempts = Math.max(1, (settings.retries || 0) + 1);
const jobAbortController = new AbortController();
const { signal, cleanup: cleanupSignals } = this._combineSignals(batchSignal, jobAbortController.signal);
this.jobAbortControllers.set(jobId, jobAbortController);
let hosterSlotAcquired = false;
let globalSlotAcquired = false;
let finalResultRecorded = false;
let lastError = null;
const recordFinalResult = (status, payload = {}) => {
if (finalResultRecorded) return;
finalResultRecorded = true;
const result = {
hoster: task.hoster,
status,
error: payload.error || null,
download_url: payload.result ? payload.result.download_url || null : null,
embed_url: payload.result ? payload.result.embed_url || null : null,
file_code: payload.result ? payload.result.file_code || null : null
};
results.get(task.file).results.push(result);
};
const emitFinalStatus = (status, payload = {}) => {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status,
progress: status === 'done' ? 1 : 0,
bytesUploaded: status === 'done' ? fileSize : 0,
bytesTotal: fileSize,
speedKbs: payload.speedKbs || 0,
elapsed: payload.elapsed || 0,
remaining: 0,
error: payload.error || null,
result: payload.result || null,
attempt: payload.attempt || maxAttempts,
maxAttempts
});
};
try {
if (fileNotFound) {
const error = 'Datei nicht gefunden';
emitFinalStatus('skipped', { error, attempt: 0 });
recordFinalResult('error', { error });
return;
}
if (fileSize <= 0) {
const error = 'Datei ist leer (0 Bytes)';
emitFinalStatus('skipped', { error, attempt: 0 });
recordFinalResult('error', { error });
return;
}
if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
const error = `Datei zu groß (Max: ${settings.maxSizeMb} MB)`;
emitFinalStatus('skipped', { error, attempt: 0 });
recordFinalResult('error', { error });
return;
}
// If this account already failed in this batch, switch to fallback immediately
// instead of wasting retries on a known-bad account
if (task.accountId && this._failedAccounts.has(task.hoster + ':' + task.accountId)) {
const override = this._accountOverrides.get(task.hoster);
if (override && !this._failedAccounts.has(task.hoster + ':' + override.id)) {
this._rotLog('pre-job-swap', {
hoster: task.hoster, fileName, fromAccountId: task.accountId, toAccountId: override.id
});
task.accountId = override.id;
task.username = override.username;
task.password = override.password;
task.apiKey = override.apiKey;
} else {
this._rotLog('pre-job-swap-blocked', {
hoster: task.hoster, fileName, accountId: task.accountId,
hasOverride: !!override,
overrideAlsoFailed: override ? this._failedAccounts.has(task.hoster + ':' + override.id) : false
});
}
}
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'queued',
progress: 0,
bytesUploaded: 0,
bytesTotal: fileSize,
speedKbs: 0,
elapsed: 0,
remaining: 0,
error: null,
result: null,
attempt: 0,
maxAttempts
});
// Acquire hoster semaphore first so jobs waiting for a hoster slot
// don't waste global slots (prevents underutilization)
await hosterSemaphore.acquire(signal);
hosterSlotAcquired = true;
if (globalSemaphore) {
await globalSemaphore.acquire(signal);
globalSlotAcquired = true;
}
if (settings.timeIntervalSec > 0) {
await this._waitForInterval(task.hoster, settings.timeIntervalSec * 1000, signal);
}
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
if (signal.aborted || this.stopAfterActive) break;
if (attempt > 1) {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'retrying',
progress: 0,
bytesUploaded: 0,
bytesTotal: fileSize,
speedKbs: 0,
elapsed: 0,
remaining: 0,
error: lastError ? lastError.message : null,
result: null,
attempt,
maxAttempts
});
await this._sleep(3000, signal);
}
const jobStart = Date.now();
let lastBytes = 0;
let lastSpeedTime = jobStart;
let currentSpeedKbs = 0;
let lowSpeedSince = 0;
let speedAbort = null;
let speedMonitor = null;
let uploadSignalBundle = { signal, cleanup() {} };
try {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'getting-server',
progress: 0,
bytesUploaded: 0,
bytesTotal: fileSize,
speedKbs: 0,
elapsed: 0,
remaining: 0,
error: null,
result: null,
attempt,
maxAttempts
});
const hosterThrottle = settings.maxSpeedKbs > 0
? new Throttle(settings.maxSpeedKbs * 1024)
: null;
const globalThrottle = this._getGlobalThrottle();
const throttle = hosterThrottle && globalThrottle
? { consume: async (bytes, sig) => { await hosterThrottle.consume(bytes, sig); await globalThrottle.consume(bytes, sig); } }
: hosterThrottle || globalThrottle;
if (settings.restartBelowKbs > 0) {
speedAbort = new AbortController();
uploadSignalBundle = this._combineManySignals([signal, speedAbort.signal]);
speedMonitor = setInterval(() => {
if (currentSpeedKbs > 0 && currentSpeedKbs < settings.restartBelowKbs) {
if (!lowSpeedSince) lowSpeedSince = Date.now();
if (Date.now() - lowSpeedSince > 6000) {
speedAbort.abort();
}
} else {
lowSpeedSince = 0;
}
}, 2000);
}
// Mutate this single object on each progress callback instead of
// allocating a fresh one — callback fires on every stream chunk
// (hundreds/sec per active job).
const activeEntry = { jobId, speedKbs: 0, bytesUploaded: 0 };
this.activeJobs.set(uploadId, activeEntry);
let lastEmitTime = 0;
const PROGRESS_EMIT_INTERVAL = 250; // ms throttle UI updates
const progressCb = (bytesUploaded, bytesTotal) => {
const now = Date.now();
const elapsed = Math.round((now - jobStart) / 1000);
const timeDelta = (now - lastSpeedTime) / 1000;
if (timeDelta >= 1) {
const bytesDelta = bytesUploaded - lastBytes;
currentSpeedKbs = Math.round(bytesDelta / timeDelta / 1024);
lastBytes = bytesUploaded;
lastSpeedTime = now;
}
activeEntry.speedKbs = currentSpeedKbs;
activeEntry.bytesUploaded = bytesUploaded;
// Throttle progress emissions to reduce IPC + rendering overhead
if (now - lastEmitTime < PROGRESS_EMIT_INTERVAL) return;
lastEmitTime = now;
const remaining = currentSpeedKbs > 0
? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024))
: 0;
this._emitProgress(uploadId, fileName, task.hoster, {
jobId,
status: 'uploading',
progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
bytesUploaded,
bytesTotal,
speedKbs: currentSpeedKbs,
elapsed,
remaining,
error: null,
result: null,
attempt,
maxAttempts
});
};
const result = await this._executeUpload(task, progressCb, uploadSignalBundle.signal, throttle);
const elapsed = Math.round((Date.now() - jobStart) / 1000);
this.sessionBytes += fileSize;
this.activeJobs.delete(uploadId);
emitFinalStatus('done', {
result,
speedKbs: currentSpeedKbs,
elapsed,
attempt
});
recordFinalResult('done', { result });
return;
} catch (err) {
this.activeJobs.delete(uploadId);
const isSpeedRestart = speedAbort && speedAbort.signal.aborted && !signal.aborted;
if (signal.aborted) {
lastError = new Error('Abgebrochen');
break;
}
if (this.stopAfterActive) {
lastError = new Error('Angehalten');
break;
}
if (isSpeedRestart && attempt < maxAttempts) {
lastError = new Error('Geschwindigkeit zu niedrig - Neustart');
await this._sleep(3000, signal);
continue;
}
lastError = err;
if (attempt >= maxAttempts) break;
// Wait 3 seconds before retry
await this._sleep(3000, signal);
} finally {
if (speedMonitor) clearInterval(speedMonitor);
uploadSignalBundle.cleanup();
}
}
const wasStopped = this.stopAfterActive && !signal.aborted;
const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
if (wasStopped || wasAborted) {
const error = wasStopped ? 'Warteschlange angehalten' : 'Abgebrochen';
emitFinalStatus('aborted', { error });
recordFinalResult('aborted', { error });
return;
}
// Account rotation: mark the current account failed, wait for main to
// resolve the next fallback, then retry. Loop so A → B → C → ... works
// for hosters with 3+ accounts (the old code only did one level: A → B
// and stopped, even if C would have worked).
this._rotLog('retries-exhausted', {
hoster: task.hoster, fileName, accountId: task.accountId,
lastError: lastError ? lastError.message : null
});
while (task.accountId && !this._failedAccounts.has(task.hoster + ':' + task.accountId)) {
if (signal.aborted || this.stopAfterActive) break;
this._failedAccounts.set(task.hoster + ':' + task.accountId, true);
this._rotLog('mark-failed', {
hoster: task.hoster, fileName, accountId: task.accountId,
lastError: lastError ? lastError.message : null
});
this.emit('account-failed', { hoster: task.hoster, accountId: task.accountId });
await this._sleep(800, signal);
const override = this._accountOverrides.get(task.hoster);
if (!override) {
this._rotLog('rotation-end', {
hoster: task.hoster, fileName, reason: 'no-override-set',
lastFailedAccountId: task.accountId
});
break;
}
if (this._failedAccounts.has(task.hoster + ':' + override.id)) {
this._rotLog('rotation-end', {
hoster: task.hoster, fileName, reason: 'override-already-failed',
overrideId: override.id, lastFailedAccountId: task.accountId
});
break;
}
// Switch to fallback account and retry this file
this._rotLog('rotate', {
hoster: task.hoster, fileName,
fromAccountId: task.accountId, toAccountId: override.id
});
task.accountId = override.id;
task.username = override.username;
task.password = override.password;
task.apiKey = override.apiKey;
this._emitProgress(uploadId, fileName, task.hoster, {
jobId, status: 'retrying', progress: 0, bytesUploaded: 0, bytesTotal: fileSize,
speedKbs: 0, elapsed: 0, remaining: 0,
error: 'Account-Wechsel zu Fallback', result: null, attempt: 1, maxAttempts
});
// Retry loop with the new account. On exhausted failure, the while
// loop iterates: marks this account failed too, asks main for the next
// fallback, and so on.
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
if (signal.aborted || this.stopAfterActive) break;
if (attempt > 1) {
this._emitProgress(uploadId, fileName, task.hoster, {
jobId, status: 'retrying', progress: 0, bytesUploaded: 0, bytesTotal: fileSize,
speedKbs: 0, elapsed: 0, remaining: 0,
error: lastError ? lastError.message : '', result: null, attempt, maxAttempts
});
await this._sleep(3000, signal);
}
try {
const jobStart = Date.now();
let lastBytes = 0;
let lastSpeedTime = jobStart;
let currentSpeedKbs = 0;
const activeEntry = { jobId, speedKbs: 0, bytesUploaded: 0 };
this.activeJobs.set(uploadId, activeEntry);
const progressCb = (bytesUploaded, bytesTotal) => {
const now = Date.now();
const timeDelta = (now - lastSpeedTime) / 1000;
if (timeDelta >= 1) {
currentSpeedKbs = Math.round((bytesUploaded - lastBytes) / timeDelta / 1024);
lastBytes = bytesUploaded;
lastSpeedTime = now;
}
activeEntry.speedKbs = currentSpeedKbs;
activeEntry.bytesUploaded = bytesUploaded;
const elapsed = Math.round((now - jobStart) / 1000);
const remaining = currentSpeedKbs > 0 ? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024)) : 0;
this._emitProgress(uploadId, fileName, task.hoster, {
jobId, status: 'uploading',
progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
bytesUploaded, bytesTotal, speedKbs: currentSpeedKbs,
elapsed, remaining, error: null, result: null, attempt, maxAttempts
});
};
const hosterThrottle = settings.maxSpeedKbs > 0 ? new Throttle(settings.maxSpeedKbs * 1024) : null;
const globalThrottle = this._getGlobalThrottle();
const throttle = hosterThrottle && globalThrottle
? { consume: async (bytes, sig) => { await hosterThrottle.consume(bytes, sig); await globalThrottle.consume(bytes, sig); } }
: hosterThrottle || globalThrottle;
const result = await this._executeUpload(task, progressCb, signal, throttle);
this.activeJobs.delete(uploadId);
this.sessionBytes += fileSize;
emitFinalStatus('done', { result, speedKbs: currentSpeedKbs, elapsed: Math.round((Date.now() - jobStart) / 1000), attempt });
recordFinalResult('done', { result });
return;
} catch (err) {
this.activeJobs.delete(uploadId);
lastError = err;
if (signal.aborted || this.stopAfterActive) break;
if (attempt >= maxAttempts) break;
}
}
}
const error = lastError && lastError.message ? lastError.message : 'Unbekannter Fehler';
this._rotLog('final-error', {
hoster: task.hoster, fileName, lastFailedAccountId: task.accountId, error
});
emitFinalStatus('error', { error });
recordFinalResult('error', { error });
} catch (err) {
const wasStopped = this.stopAfterActive && !signal.aborted;
const error = wasStopped
? 'Warteschlange angehalten'
: (signal.aborted || this.cancelledJobIds.has(jobId) ? 'Abgebrochen' : (err && err.message ? err.message : 'Unbekannter Fehler'));
const status = signal.aborted || this.cancelledJobIds.has(jobId) || wasStopped ? 'aborted' : 'error';
emitFinalStatus(status, { error });
recordFinalResult(status === 'error' ? 'error' : 'aborted', { error });
} finally {
this.activeJobs.delete(uploadId);
this.jobAbortControllers.delete(jobId);
cleanupSignals();
// Release in reverse order of acquire (global first, then hoster)
if (globalSlotAcquired && globalSemaphore) globalSemaphore.release();
if (hosterSlotAcquired) hosterSemaphore.release();
}
}
async _executeUpload(task, progressCb, signal, throttle) {
if (task.hoster === 'vidmoly.me' && task.username) {
const vidmoly = new VidmolyUploader();
await vidmoly.login(task.username, task.password);
return vidmoly.upload(task.file, progressCb, signal, throttle);
} else if (task.hoster === 'voe.sx' && task.username) {
const voe = new VoeUploader();
await voe.login(task.username, task.password);
return voe.upload(task.file, progressCb, signal, throttle);
} else if (task.hoster === 'doodstream.com' && task.username) {
const dood = new DoodstreamUploader();
await dood.login(task.username, task.password);
return dood.upload(task.file, progressCb, signal, throttle);
} else if (task.hoster === 'clouddrop.cc') {
const clouddrop = new ClouddropUploader(task.apiKey);
return clouddrop.upload(task.file, progressCb, signal, throttle);
} else {
return uploadFile(task.hoster, task.file, task.apiKey, progressCb, signal, throttle);
}
}
_emitProgress(uploadId, fileName, hoster, data) {
this.emit('progress', { uploadId, fileName, hoster, ...data });
}
_startStatsTimer() {
if (this.statsInterval) clearInterval(this.statsInterval);
this.statsInterval = setInterval(() => {
// Single pass over active jobs instead of two.
let globalSpeedKbs = 0;
let activeCount = 0;
let inProgressBytes = 0;
for (const job of this.activeJobs.values()) {
globalSpeedKbs += job.speedKbs || 0;
inProgressBytes += job.bytesUploaded || 0;
activeCount++;
}
const elapsed = Math.round((Date.now() - this.startTime) / 1000);
this.emit('stats', {
state: this.running ? (this.stopAfterActive ? 'stopping' : 'uploading') : 'idle',
globalSpeedKbs,
totalBytes: this.sessionBytes + inProgressBytes,
elapsed,
activeJobs: activeCount,
pendingJobs: Object.values(this.semaphores).reduce((sum, semaphore) => sum + semaphore.pending, 0)
});
}, 1000);
}
_stopStatsTimer() {
if (this.statsInterval) {
clearInterval(this.statsInterval);
this.statsInterval = null;
}
}
_combineSignals(signal1, signal2) {
const controller = new AbortController();
if (signal1.aborted || signal2.aborted) {
controller.abort();
return { signal: controller.signal, cleanup() {} };
}
const onAbort = () => {
controller.abort();
cleanup();
};
const cleanup = () => {
signal1.removeEventListener('abort', onAbort);
signal2.removeEventListener('abort', onAbort);
};
signal1.addEventListener('abort', onAbort, { once: true });
signal2.addEventListener('abort', onAbort, { once: true });
return { signal: controller.signal, cleanup };
}
_combineManySignals(signals) {
const liveSignals = signals.filter(Boolean);
const controller = new AbortController();
if (liveSignals.some((signal) => signal.aborted)) {
controller.abort();
return { signal: controller.signal, cleanup() {} };
}
const listeners = liveSignals.map((signal) => {
const handler = () => {
controller.abort();
cleanup();
};
signal.addEventListener('abort', handler, { once: true });
return { signal, handler };
});
const cleanup = () => {
for (const entry of listeners) {
entry.signal.removeEventListener('abort', entry.handler);
}
};
return { signal: controller.signal, cleanup };
}
_sleep(ms, signal) {
return new Promise((resolve, reject) => {
const onAbort = () => {
clearTimeout(timer);
reject(new Error('Aborted'));
};
const timer = setTimeout(() => {
if (signal) signal.removeEventListener('abort', onAbort);
resolve();
}, ms);
if (signal) {
if (signal.aborted) {
clearTimeout(timer);
reject(new Error('Aborted'));
return;
}
signal.addEventListener('abort', onAbort, { once: true });
}
});
}
_waitForInterval(hoster, intervalMs, signal) {
// Serialize interval waits per hoster so concurrent jobs queue up properly
const prev = this.intervalLocks[hoster] || Promise.resolve();
const next = prev.then(async () => {
const now = Date.now();
const last = this.lastStartTime[hoster] || 0;
const elapsed = now - last;
if (elapsed < intervalMs) {
await this._sleep(intervalMs - elapsed, signal);
}
this.lastStartTime[hoster] = Date.now();
});
this.intervalLocks[hoster] = next.catch(() => {});
return next;
}
addJobs(tasks) {
if (!this.running || !tasks || tasks.length === 0) {
return { added: 0, alreadyInBatchJobIds: [] };
}
const { signal } = this.abortController;
const results = this._batchResults || new Map();
const addResult = { added: 0, alreadyInBatchJobIds: [] };
for (const task of tasks) {
// Skip if this job is already being processed (prevent duplicates)
if (task.jobId && this.jobAbortControllers.has(task.jobId)) {
addResult.alreadyInBatchJobIds.push(task.jobId);
continue;
}
const fileName = path.basename(task.file);
if (!results.has(task.file)) {
let size = 0;
try { size = fs.statSync(task.file).size; } catch {}
results.set(task.file, { name: fileName, size, results: [] });
}
this._additionalPromises.push(this._runJob(task, results, signal));
addResult.added++;
}
return addResult;
}
cancelJobs(jobIds) {
for (const jobId of jobIds || []) {
if (!jobId) continue;
this.cancelledJobIds.add(jobId);
const controller = this.jobAbortControllers.get(jobId);
if (controller && !controller.signal.aborted) {
controller.abort();
}
}
}
finishAfterActive() {
this.stopAfterActive = true;
}
cancel() {
if (!this.running) return;
this.abortController.abort();
this.stopAfterActive = false;
this.running = false;
for (const controller of this.jobAbortControllers.values()) {
if (!controller.signal.aborted) controller.abort();
}
this._stopStatsTimer();
}
}
module.exports = UploadManager;