Previously, clicking 'Ausgewählte starten' on 'Wartet' jobs during an active upload just showed a toast. But the jobs might NOT actually be in the batch (skipped during task building). Now: ALL selected queued/error/aborted jobs are sent to addJobsToBatch. The upload-manager has duplicate protection (checks jobAbortControllers) so jobs already in the batch are skipped. Jobs NOT in the batch get added and start uploading immediately. Toast now shows exact counts: "X hinzugefügt, Y waren schon im Batch" Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
748 lines
26 KiB
JavaScript
748 lines
26 KiB
JavaScript
const { EventEmitter } = require('events');
|
||
const path = require('path');
|
||
const fs = require('fs');
|
||
const crypto = require('crypto');
|
||
const { uploadFile } = require('./hosters');
|
||
const VidmolyUploader = require('./vidmoly-upload');
|
||
const VoeUploader = require('./voe-upload');
|
||
const DoodstreamUploader = require('./doodstream-upload');
|
||
const Semaphore = require('./semaphore');
|
||
const Throttle = require('./throttle');
|
||
|
||
/**
 * Per-hoster defaults; UploadManager#_getSettings spreads these underneath the
 * hoster-specific settings. Frozen so the shared defaults cannot be modified
 * by accident. For every numeric limit below, 0 means "disabled".
 */
const DEFAULT_SETTINGS = Object.freeze({
  // Extra attempts after the first one (max attempts = retries + 1).
  retries: 3,
  // Per-hoster speed cap in KiB/s (multiplied by 1024 before it reaches Throttle).
  maxSpeedKbs: 0,
  // Limit handed to the per-hoster Semaphore.
  parallelCount: 2,
  // Restart an upload whose measured speed stays below this many KiB/s.
  restartBelowKbs: 0,
  // Minimum spacing, in seconds, between upload starts on the same hoster.
  timeIntervalSec: 0,
  // Files larger than this many MiB are skipped.
  maxSizeMb: 0
});
|
||
|
||
/**
 * Runs batches of file uploads against one or more hosters.
 *
 * Parallelism is bounded by one Semaphore per hoster plus an optional global
 * Semaphore; bandwidth by an optional per-hoster and an optional global
 * Throttle. Every job gets its own AbortController so single jobs can be
 * cancelled without stopping the batch.
 *
 * Events emitted:
 *   'progress'       { uploadId, fileName, hoster, jobId, status, ... } per job update
 *   'stats'          aggregated numbers, once per second while a batch runs
 *   'batch-done'     summary object when startBatch() has finished
 *   'account-failed' { hoster, accountId } after all attempts with an account failed
 */
class UploadManager extends EventEmitter {
  /**
   * @param {Object} [hosterSettings] hoster name -> overrides for DEFAULT_SETTINGS
   * @param {Object} [globalSettings] reads parallelUploadCount,
   *   scaleParallelUploads and globalMaxSpeedKbs
   */
  constructor(hosterSettings, globalSettings) {
    super();
    this.hosterSettings = hosterSettings || {};
    this.globalSettings = globalSettings || {};
    this.semaphores = {}; // hoster -> Semaphore
    this.globalSemaphore = null;
    this.abortController = new AbortController(); // aborts the whole batch
    this.running = false;
    this.stopAfterActive = false;
    this.statsInterval = null;
    this.startTime = 0;
    this.activeJobs = new Map(); // uploadId -> { jobId, speedKbs, bytesUploaded }
    this.jobAbortControllers = new Map(); // jobId -> AbortController
    this.cancelledJobIds = new Set();
    this.sessionBytes = 0;
    this.lastStartTime = {}; // hoster -> timestamp of last upload start
    this.intervalLocks = {}; // hoster -> Promise chain for serialized interval waits
    this.globalThrottle = null;
    this._failedAccounts = new Map(); // "hoster:accountId" -> true once that account failed
    this._accountOverrides = new Map(); // hoster -> fallback account object
    this._batchResults = null; // file path -> { name, size, results: [] } of the current batch
    this._additionalPromises = []; // jobs added mid-batch via addJobs()
  }

  /**
   * Registers the fallback account to use for a hoster once its current
   * account has failed. Expected to be called in response to 'account-failed'.
   *
   * @param {string} hoster
   * @param {{id: *, username: string, password: string, apiKey: string}} fallbackAccount
   */
  switchAccount(hoster, fallbackAccount) {
    this._accountOverrides.set(hoster, fallbackAccount);
  }

  /**
   * Replaces the settings and pushes the new limits into the semaphores and
   * the global throttle that already exist. A falsy argument keeps the
   * corresponding current settings object.
   *
   * @param {Object} [hosterSettings]
   * @param {Object} [globalSettings]
   */
  updateSettings(hosterSettings, globalSettings) {
    this.hosterSettings = hosterSettings || this.hosterSettings;
    this.globalSettings = globalSettings || this.globalSettings;

    // Live-update semaphores for running uploads.
    for (const [hoster, semaphore] of Object.entries(this.semaphores)) {
      semaphore.updateLimit(this._getSettings(hoster).parallelCount);
    }

    // Update (or create / drop) the global throttle if the speed limit changed.
    const newKbs = this.globalSettings.globalMaxSpeedKbs || 0;
    if (newKbs > 0) {
      if (this.globalThrottle) {
        this.globalThrottle.updateRate(newKbs * 1024);
      } else {
        this.globalThrottle = new Throttle(newKbs * 1024);
      }
    } else {
      // NOTE(review): uploads already running keep their reference to the old
      // throttle object; only attempts started afterwards run unthrottled.
      this.globalThrottle = null;
    }

    // Update the global semaphore live (it is only created on demand).
    const globalLimit = this._getGlobalParallelLimit();
    if (globalLimit > 0 && this.globalSemaphore) {
      this.globalSemaphore.updateLimit(globalLimit);
    }
  }

  /**
   * Effective settings for a hoster: DEFAULT_SETTINGS overlaid with the
   * hoster's own settings. With globalSettings.scaleParallelUploads enabled,
   * parallelCount is additionally capped at the global parallel limit.
   *
   * @param {string} hoster
   * @returns {Object} a fresh settings object
   */
  _getSettings(hoster) {
    const settings = { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
    const globalLimit = this._getGlobalParallelLimit();
    if (this.globalSettings.scaleParallelUploads && globalLimit > 0) {
      settings.parallelCount = Math.min(settings.parallelCount || 1, globalLimit);
    }
    return settings;
  }

  /**
   * @returns {number} globalSettings.parallelUploadCount rounded and clamped
   *   to 1..100, or 0 when it is unset, non-numeric or not positive
   */
  _getGlobalParallelLimit() {
    const raw = Number(this.globalSettings.parallelUploadCount || 0);
    if (!Number.isFinite(raw) || raw <= 0) return 0;
    return Math.max(1, Math.min(100, Math.round(raw)));
  }

  /**
   * Lazily creates the global semaphore, or refreshes its limit.
   *
   * @returns {?Semaphore} null when no global parallel limit is configured
   */
  _getGlobalSemaphore() {
    const limit = this._getGlobalParallelLimit();
    if (limit <= 0) return null;
    if (this.globalSemaphore) {
      this.globalSemaphore.updateLimit(limit);
    } else {
      this.globalSemaphore = new Semaphore(limit);
    }
    return this.globalSemaphore;
  }

  /**
   * Lazily creates the global throttle, or refreshes its rate.
   *
   * @returns {?Throttle} null when globalMaxSpeedKbs is unset or not positive
   */
  _getGlobalThrottle() {
    const kbs = Number(this.globalSettings.globalMaxSpeedKbs || 0);
    if (!Number.isFinite(kbs) || kbs <= 0) return null;
    if (this.globalThrottle) {
      this.globalThrottle.updateRate(kbs * 1024);
    } else {
      this.globalThrottle = new Throttle(kbs * 1024);
    }
    return this.globalThrottle;
  }

  /**
   * Lazily creates the semaphore of a hoster, or refreshes its limit.
   *
   * @param {string} hoster
   * @returns {Semaphore}
   */
  _getSemaphore(hoster) {
    const { parallelCount } = this._getSettings(hoster);
    if (this.semaphores[hoster]) {
      this.semaphores[hoster].updateLimit(parallelCount);
    } else {
      this.semaphores[hoster] = new Semaphore(parallelCount);
    }
    return this.semaphores[hoster];
  }

  /**
   * Combines the per-hoster speed cap with the global throttle.
   *
   * @param {Object} settings effective hoster settings
   * @returns {?{consume: Function}} object whose consume(bytes, signal) waits
   *   on every active limiter, or null when nothing is limited
   */
  _buildThrottle(settings) {
    const hosterThrottle = settings.maxSpeedKbs > 0
      ? new Throttle(settings.maxSpeedKbs * 1024)
      : null;
    const globalThrottle = this._getGlobalThrottle();
    if (hosterThrottle && globalThrottle) {
      return {
        consume: async (bytes, sig) => {
          await hosterThrottle.consume(bytes, sig);
          await globalThrottle.consume(bytes, sig);
        }
      };
    }
    return hosterThrottle || globalThrottle;
  }

  /** Key under which a failed account is remembered in _failedAccounts. */
  _accountKey(hoster, accountId) {
    return `${hoster}:${accountId}`;
  }

  /**
   * Rewrites the credentials on `task` to the hoster's registered fallback
   * account, provided there is one and it has not failed itself.
   *
   * @param {Object} task mutated in place
   * @returns {boolean} true when the task was switched
   */
  _applyFallbackAccount(task) {
    const override = this._accountOverrides.get(task.hoster);
    if (!override || this._failedAccounts.has(this._accountKey(task.hoster, override.id))) {
      return false;
    }
    task.accountId = override.id;
    task.username = override.username;
    task.password = override.password;
    task.apiKey = override.apiKey;
    return true;
  }

  /**
   * Runs a batch to completion (including jobs added through addJobs() while
   * it is running) and then emits 'batch-done' with a summary.
   *
   * Resets all per-batch state first. The failed-account and account-override
   * maps are deliberately left as they are.
   *
   * @param {Array<Object>} tasks each has at least `file` and `hoster`;
   *   optional jobId, accountId, username, password, apiKey
   * @returns {Promise<void>}
   */
  async startBatch(tasks) {
    this.running = true;
    this.stopAfterActive = false;
    this.abortController = new AbortController();
    this.startTime = Date.now();
    this.sessionBytes = 0;
    this.activeJobs.clear();
    this.jobAbortControllers.clear();
    this.cancelledJobIds.clear();
    this.semaphores = {};
    this.globalSemaphore = null;
    this.globalThrottle = null;
    this.lastStartTime = {};

    const { signal } = this.abortController;
    const batchId = `batch-${Date.now()}`;
    const results = new Map(); // filePath -> { name, size, results: [] }
    this._batchResults = results;
    this._additionalPromises = []; // Track jobs added mid-batch via addJobs()

    for (const task of tasks) {
      if (results.has(task.file)) continue;
      let size = 0;
      // A missing file keeps size 0 here; _runJob reports it as skipped.
      try { size = fs.statSync(task.file).size; } catch {}
      results.set(task.file, { name: path.basename(task.file), size, results: [] });
    }

    this._startStatsTimer();

    await Promise.allSettled(tasks.map((task) => this._runJob(task, results, signal)));

    // Wait for any jobs added mid-batch via addJobs(); new ones may still
    // arrive while we wait, hence the loop. They are counted so the summary
    // covers them as well.
    let addedCount = 0;
    while (this._additionalPromises.length > 0) {
      const pending = this._additionalPromises.splice(0);
      addedCount += pending.length;
      await Promise.allSettled(pending);
    }

    this._stopStatsTimer();
    this.running = false;

    const files = Array.from(results.values());
    const total = tasks.length + addedCount;
    const succeeded = files.reduce(
      (count, file) => count + file.results.filter((result) => result.status === 'done').length,
      0
    );

    this.emit('batch-done', {
      id: batchId,
      timestamp: new Date().toISOString(),
      total,
      succeeded,
      failed: total - succeeded,
      files
    });
  }

  /**
   * Runs one upload job: validates the file, waits for its slots, uploads
   * with retries (and once more with a fallback account if one is provided),
   * emits 'progress' events throughout and records exactly one final result
   * in `results`. Never rejects for upload failures; they end up as a final
   * 'error' or 'aborted' status.
   *
   * @param {Object} task see startBatch(); credentials may be rewritten in place
   * @param {Map<string, {results: Array}>} results must hold an entry for task.file
   * @param {AbortSignal} batchSignal aborts every job of the batch
   * @returns {Promise<void>}
   */
  async _runJob(task, results, batchSignal) {
    const settings = this._getSettings(task.hoster);
    const hosterSemaphore = this._getSemaphore(task.hoster);
    const globalSemaphore = this._getGlobalSemaphore();
    const uploadId = crypto.randomBytes(8).toString('hex');
    const jobId = task.jobId || uploadId;
    const fileName = path.basename(task.file);

    let fileSize = 0;
    let fileNotFound = false;
    try {
      fileSize = fs.statSync(task.file).size;
    } catch {
      fileNotFound = true;
    }

    const maxAttempts = Math.max(1, (settings.retries || 0) + 1);
    const jobAbortController = new AbortController();
    const { signal, cleanup: cleanupSignals } = this._combineSignals(batchSignal, jobAbortController.signal);
    this.jobAbortControllers.set(jobId, jobAbortController);

    let hosterSlotAcquired = false;
    let globalSlotAcquired = false;
    let finalResultRecorded = false;
    let lastError = null;

    // Emits one 'progress' event; `fields` overrides the idle defaults.
    const emitStatus = (status, fields = {}) => {
      this._emitProgress(uploadId, fileName, task.hoster, {
        jobId,
        status,
        progress: 0,
        bytesUploaded: 0,
        bytesTotal: fileSize,
        speedKbs: 0,
        elapsed: 0,
        remaining: 0,
        error: null,
        result: null,
        attempt: 0,
        maxAttempts,
        ...fields
      });
    };

    // Stores the job outcome for the batch summary; only the first call counts.
    const recordFinalResult = (status, payload = {}) => {
      if (finalResultRecorded) return;
      finalResultRecorded = true;
      const uploaded = payload.result;
      results.get(task.file).results.push({
        hoster: task.hoster,
        status,
        error: payload.error || null,
        download_url: uploaded ? uploaded.download_url || null : null,
        embed_url: uploaded ? uploaded.embed_url || null : null,
        file_code: uploaded ? uploaded.file_code || null : null
      });
    };

    // Emits the closing 'progress' event of the job.
    const emitFinalStatus = (status, payload = {}) => {
      const succeeded = status === 'done';
      emitStatus(status, {
        progress: succeeded ? 1 : 0,
        bytesUploaded: succeeded ? fileSize : 0,
        speedKbs: payload.speedKbs || 0,
        elapsed: payload.elapsed || 0,
        error: payload.error || null,
        result: payload.result || null,
        attempt: payload.attempt || maxAttempts
      });
    };

    // Files that cannot be uploaded show as 'skipped' but count as errors.
    const skipFile = (error) => {
      emitFinalStatus('skipped', { error, attempt: 0 });
      recordFinalResult('error', { error });
    };

    // Closes the job as 'aborted' if it was cancelled or the queue was
    // stopped; returns whether it did so.
    const finishIfInterrupted = () => {
      const wasStopped = this.stopAfterActive && !signal.aborted;
      const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
      if (!wasStopped && !wasAborted) return false;
      const error = wasStopped ? 'Warteschlange angehalten' : 'Abgebrochen';
      emitFinalStatus('aborted', { error });
      recordFinalResult('aborted', { error });
      return true;
    };

    // Speed measurement and 'uploading' events for a single attempt. Speed is
    // re-sampled at most once per second; events are emitted at most every
    // `emitIntervalMs` (0 = on every callback).
    const createProgressTracker = (attempt, emitIntervalMs) => {
      const tracker = { startedAt: Date.now(), speedKbs: 0, onProgress: null };
      let lastBytes = 0;
      let lastSpeedTime = tracker.startedAt;
      let lastEmitTime = 0;

      tracker.onProgress = (bytesUploaded, bytesTotal) => {
        const now = Date.now();
        const timeDelta = (now - lastSpeedTime) / 1000;
        if (timeDelta >= 1) {
          tracker.speedKbs = Math.round((bytesUploaded - lastBytes) / timeDelta / 1024);
          lastBytes = bytesUploaded;
          lastSpeedTime = now;
        }

        this.activeJobs.set(uploadId, { jobId, speedKbs: tracker.speedKbs, bytesUploaded });

        // Throttle progress emissions to reduce IPC + rendering overhead.
        if (now - lastEmitTime < emitIntervalMs) return;
        lastEmitTime = now;

        emitStatus('uploading', {
          progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
          bytesUploaded,
          bytesTotal,
          speedKbs: tracker.speedKbs,
          elapsed: Math.round((now - tracker.startedAt) / 1000),
          remaining: tracker.speedKbs > 0
            ? Math.round((bytesTotal - bytesUploaded) / (tracker.speedKbs * 1024))
            : 0,
          attempt
        });
      };
      return tracker;
    };

    try {
      if (fileNotFound) {
        skipFile('Datei nicht gefunden');
        return;
      }
      if (fileSize <= 0) {
        skipFile('Datei ist leer (0 Bytes)');
        return;
      }
      if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
        skipFile(`Datei zu groß (Max: ${settings.maxSizeMb} MB)`);
        return;
      }

      // If this account is already known to have failed, switch to the
      // fallback immediately instead of wasting retries on a known-bad account.
      if (task.accountId && this._failedAccounts.has(this._accountKey(task.hoster, task.accountId))) {
        this._applyFallbackAccount(task);
      }

      emitStatus('queued');

      // Acquire hoster semaphore first so jobs waiting for a hoster slot
      // don't waste global slots (prevents underutilization).
      await hosterSemaphore.acquire(signal);
      hosterSlotAcquired = true;

      if (globalSemaphore) {
        await globalSemaphore.acquire(signal);
        globalSlotAcquired = true;
      }

      if (settings.timeIntervalSec > 0) {
        await this._waitForInterval(task.hoster, settings.timeIntervalSec * 1000, signal);
      }

      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        if (signal.aborted || this.stopAfterActive) break;

        if (attempt > 1) {
          emitStatus('retrying', { error: lastError ? lastError.message : null, attempt });
          // Together with the pause in the catch block below this puts about
          // six seconds between two attempts.
          await this._sleep(3000, signal);
        }

        const tracker = createProgressTracker(attempt, 250);
        let lowSpeedSince = 0;
        let speedAbort = null;
        let speedMonitor = null;
        let uploadSignalBundle = { signal, cleanup() {} };

        try {
          emitStatus('getting-server', { attempt });

          const throttle = this._buildThrottle(settings);

          if (settings.restartBelowKbs > 0) {
            // Abort just this attempt once the speed has stayed below the
            // threshold for more than six seconds (sampled every two seconds).
            speedAbort = new AbortController();
            uploadSignalBundle = this._combineManySignals([signal, speedAbort.signal]);
            speedMonitor = setInterval(() => {
              if (tracker.speedKbs > 0 && tracker.speedKbs < settings.restartBelowKbs) {
                if (!lowSpeedSince) lowSpeedSince = Date.now();
                if (Date.now() - lowSpeedSince > 6000) speedAbort.abort();
              } else {
                lowSpeedSince = 0;
              }
            }, 2000);
          }

          this.activeJobs.set(uploadId, { jobId, speedKbs: 0, bytesUploaded: 0 });

          const result = await this._executeUpload(task, tracker.onProgress, uploadSignalBundle.signal, throttle);

          const elapsed = Math.round((Date.now() - tracker.startedAt) / 1000);
          this.sessionBytes += fileSize;
          this.activeJobs.delete(uploadId);

          emitFinalStatus('done', { result, speedKbs: tracker.speedKbs, elapsed, attempt });
          recordFinalResult('done', { result });
          return;
        } catch (err) {
          this.activeJobs.delete(uploadId);

          const isSpeedRestart = speedAbort && speedAbort.signal.aborted && !signal.aborted;
          if (signal.aborted) {
            lastError = new Error('Abgebrochen');
            break;
          }
          if (this.stopAfterActive) {
            lastError = new Error('Angehalten');
            break;
          }
          if (isSpeedRestart && attempt < maxAttempts) {
            lastError = new Error('Geschwindigkeit zu niedrig - Neustart');
            await this._sleep(3000, signal);
            continue;
          }

          lastError = err;
          if (attempt >= maxAttempts) break;
          // Wait 3 seconds before retry.
          await this._sleep(3000, signal);
        } finally {
          if (speedMonitor) clearInterval(speedMonitor);
          uploadSignalBundle.cleanup();
        }
      }

      if (finishIfInterrupted()) return;

      // Account fallback: if this account hasn't failed before, mark it as
      // failed, announce it, and retry once with the fallback account.
      const failedKey = this._accountKey(task.hoster, task.accountId);
      if (task.accountId && !this._failedAccounts.has(failedKey)) {
        this._failedAccounts.set(failedKey, true);
        this.emit('account-failed', { hoster: task.hoster, accountId: task.accountId });
        // Wait briefly for switchAccount() to be called from main process.
        await this._sleep(800, signal);

        if (this._applyFallbackAccount(task)) {
          emitStatus('retrying', { error: 'Account-Wechsel zu Fallback', attempt: 1 });

          // Re-run the retry loop with the new account. Unlike the first
          // loop, it has no low-speed restart and no pause after a failure.
          for (let attempt = 1; attempt <= maxAttempts; attempt++) {
            if (signal.aborted || this.stopAfterActive) break;

            if (attempt > 1) {
              emitStatus('retrying', { error: lastError ? lastError.message : '', attempt });
              await this._sleep(3000, signal);
            }

            try {
              const tracker = createProgressTracker(attempt, 0);
              this.activeJobs.set(uploadId, { jobId, speedKbs: 0, bytesUploaded: 0 });

              const throttle = this._buildThrottle(settings);
              const result = await this._executeUpload(task, tracker.onProgress, signal, throttle);

              this.activeJobs.delete(uploadId);
              this.sessionBytes += fileSize;
              emitFinalStatus('done', {
                result,
                speedKbs: tracker.speedKbs,
                elapsed: Math.round((Date.now() - tracker.startedAt) / 1000),
                attempt
              });
              recordFinalResult('done', { result });
              return;
            } catch (err) {
              this.activeJobs.delete(uploadId);
              lastError = err;
              if (signal.aborted || this.stopAfterActive) break;
              if (attempt >= maxAttempts) break;
            }
          }

          // A cancel or stop during the fallback attempts is an abort, not an
          // upload error, exactly as after the first loop.
          if (finishIfInterrupted()) return;
        }
      }

      const error = lastError && lastError.message ? lastError.message : 'Unbekannter Fehler';
      emitFinalStatus('error', { error });
      recordFinalResult('error', { error });
    } catch (err) {
      // Reached when waiting (semaphore, interval, sleep) was interrupted or
      // something unexpected threw.
      const wasStopped = this.stopAfterActive && !signal.aborted;
      const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
      let error;
      if (wasStopped) {
        error = 'Warteschlange angehalten';
      } else if (wasAborted) {
        error = 'Abgebrochen';
      } else {
        error = err && err.message ? err.message : 'Unbekannter Fehler';
      }
      const status = wasAborted || wasStopped ? 'aborted' : 'error';
      emitFinalStatus(status, { error });
      recordFinalResult(status, { error });
    } finally {
      this.activeJobs.delete(uploadId);
      this.jobAbortControllers.delete(jobId);
      cleanupSignals();
      // Release in reverse order of acquire (global first, then hoster).
      if (globalSlotAcquired && globalSemaphore) globalSemaphore.release();
      if (hosterSlotAcquired) hosterSemaphore.release();
    }
  }

  /**
   * Performs the actual transfer. vidmoly.me, voe.sx and doodstream.com use
   * their login-based uploader when the task carries a username; everything
   * else goes through the generic API-key upload.
   *
   * @param {Object} task
   * @param {function(number, number)} progressCb (bytesUploaded, bytesTotal)
   * @param {AbortSignal} signal
   * @param {?{consume: Function}} throttle
   * @returns {Promise<Object>} uploader result (download_url, embed_url and
   *   file_code are read from it by _runJob)
   */
  async _executeUpload(task, progressCb, signal, throttle) {
    if (task.hoster === 'vidmoly.me' && task.username) {
      const vidmoly = new VidmolyUploader();
      await vidmoly.login(task.username, task.password);
      return vidmoly.upload(task.file, progressCb, signal, throttle);
    }
    if (task.hoster === 'voe.sx' && task.username) {
      const voe = new VoeUploader();
      await voe.login(task.username, task.password);
      return voe.upload(task.file, progressCb, signal, throttle);
    }
    if (task.hoster === 'doodstream.com' && task.username) {
      const dood = new DoodstreamUploader();
      await dood.login(task.username, task.password);
      return dood.upload(task.file, progressCb, signal, throttle);
    }
    return uploadFile(task.hoster, task.file, task.apiKey, progressCb, signal, throttle);
  }

  /** Emits a 'progress' event carrying the job identity plus `data`. */
  _emitProgress(uploadId, fileName, hoster, data) {
    this.emit('progress', { uploadId, fileName, hoster, ...data });
  }

  /** (Re)starts the one-second timer that emits aggregated 'stats' events. */
  _startStatsTimer() {
    if (this.statsInterval) clearInterval(this.statsInterval);
    this.statsInterval = setInterval(() => {
      let globalSpeedKbs = 0;
      let inProgressBytes = 0;
      for (const job of this.activeJobs.values()) {
        globalSpeedKbs += job.speedKbs || 0;
        inProgressBytes += job.bytesUploaded || 0;
      }

      let state = 'idle';
      if (this.running) state = this.stopAfterActive ? 'stopping' : 'uploading';

      this.emit('stats', {
        state,
        globalSpeedKbs,
        totalBytes: this.sessionBytes + inProgressBytes,
        elapsed: Math.round((Date.now() - this.startTime) / 1000),
        activeJobs: this.activeJobs.size,
        pendingJobs: Object.values(this.semaphores).reduce((sum, semaphore) => sum + semaphore.pending, 0)
      });
    }, 1000);
  }

  /** Stops the stats timer if it is running. */
  _stopStatsTimer() {
    if (!this.statsInterval) return;
    clearInterval(this.statsInterval);
    this.statsInterval = null;
  }

  /**
   * Derives a signal that aborts as soon as either input signal aborts.
   *
   * @param {AbortSignal} signal1
   * @param {AbortSignal} signal2
   * @returns {{signal: AbortSignal, cleanup: function()}} cleanup() detaches
   *   the listeners from the input signals
   */
  _combineSignals(signal1, signal2) {
    return this._combineManySignals([signal1, signal2]);
  }

  /**
   * Derives a signal that aborts as soon as any of the given signals aborts.
   * Falsy entries are ignored; a signal that is already aborted yields an
   * already aborted result.
   *
   * @param {Array<?AbortSignal>} signals
   * @returns {{signal: AbortSignal, cleanup: function()}} cleanup() detaches
   *   the listeners from the input signals
   */
  _combineManySignals(signals) {
    const liveSignals = signals.filter(Boolean);
    const controller = new AbortController();

    if (liveSignals.some((candidate) => candidate.aborted)) {
      controller.abort();
      return { signal: controller.signal, cleanup() {} };
    }

    const listeners = [];
    const cleanup = () => {
      for (const { source, handler } of listeners) {
        source.removeEventListener('abort', handler);
      }
    };
    for (const source of liveSignals) {
      const handler = () => {
        controller.abort();
        cleanup();
      };
      source.addEventListener('abort', handler, { once: true });
      listeners.push({ source, handler });
    }

    return { signal: controller.signal, cleanup };
  }

  /**
   * Resolves after `ms` milliseconds.
   *
   * @param {number} ms
   * @param {AbortSignal} [signal] rejects with Error('Aborted') when it aborts
   *   first or is already aborted
   * @returns {Promise<void>}
   */
  _sleep(ms, signal) {
    return new Promise((resolve, reject) => {
      if (signal && signal.aborted) {
        reject(new Error('Aborted'));
        return;
      }

      let timer = null;
      const onAbort = () => {
        clearTimeout(timer);
        reject(new Error('Aborted'));
      };

      timer = setTimeout(() => {
        if (signal) signal.removeEventListener('abort', onAbort);
        resolve();
      }, ms);

      if (signal) signal.addEventListener('abort', onAbort, { once: true });
    });
  }

  /**
   * Waits until at least `intervalMs` have passed since the previous upload
   * start on `hoster`, then records the new start time. Waits are chained per
   * hoster so concurrent jobs queue up one after another.
   *
   * @param {string} hoster
   * @param {number} intervalMs
   * @param {AbortSignal} signal
   * @returns {Promise<void>} rejects when the wait is aborted
   */
  _waitForInterval(hoster, intervalMs, signal) {
    const previous = this.intervalLocks[hoster] || Promise.resolve();
    const next = previous.then(async () => {
      const sinceLastStart = Date.now() - (this.lastStartTime[hoster] || 0);
      if (sinceLastStart < intervalMs) {
        await this._sleep(intervalMs - sinceLastStart, signal);
      }
      this.lastStartTime[hoster] = Date.now();
    });
    // An aborted wait must not block the jobs queued behind it.
    this.intervalLocks[hoster] = next.catch(() => {});
    return next;
  }

  /**
   * Adds jobs to the running batch; they start competing for slots right
   * away. Tasks whose jobId is currently being processed are skipped, so it
   * is safe to pass jobs that may already be part of the batch.
   *
   * @param {Array<Object>} tasks same shape as for startBatch()
   * @returns {number} how many tasks were actually added (0 when no batch is
   *   running)
   */
  addJobs(tasks) {
    if (!this.running || !tasks || tasks.length === 0) return 0;

    const { signal } = this.abortController;
    const results = this._batchResults || new Map();
    let added = 0;

    for (const task of tasks) {
      // Skip if this job is already being processed (prevent duplicates).
      if (task.jobId && this.jobAbortControllers.has(task.jobId)) continue;

      // Re-adding a job supersedes an earlier cancellation of the same id;
      // otherwise a later failure would be reported as "aborted".
      if (task.jobId) this.cancelledJobIds.delete(task.jobId);

      if (!results.has(task.file)) {
        let size = 0;
        // A missing file keeps size 0 here; _runJob reports it as skipped.
        try { size = fs.statSync(task.file).size; } catch {}
        results.set(task.file, { name: path.basename(task.file), size, results: [] });
      }

      this._additionalPromises.push(this._runJob(task, results, signal));
      added++;
    }

    return added;
  }

  /**
   * Cancels individual jobs. Ids are remembered even when the job is not
   * currently registered.
   *
   * @param {Array<string>} [jobIds] falsy entries are ignored
   */
  cancelJobs(jobIds) {
    for (const jobId of jobIds || []) {
      if (!jobId) continue;
      this.cancelledJobIds.add(jobId);
      const controller = this.jobAbortControllers.get(jobId);
      if (controller && !controller.signal.aborted) controller.abort();
    }
  }

  /**
   * Stops the queue gracefully: jobs check this flag before each attempt and
   * after a failed one; an upload that is in flight is not interrupted.
   */
  finishAfterActive() {
    this.stopAfterActive = true;
  }

  /** Aborts the whole batch and every registered job immediately. */
  cancel() {
    if (!this.running) return;
    this.abortController.abort();
    this.stopAfterActive = false;
    this.running = false;
    for (const controller of this.jobAbortControllers.values()) {
      if (!controller.signal.aborted) controller.abort();
    }
    this._stopStatsTimer();
  }
}
|
||
|
||
module.exports = UploadManager;
|