Two bugs visible in the user's rotation log:
1. 'error=OK' for byse.sx — the server returned a payload with
msg='OK' and no file_code anywhere we recognized. Our generic
uploadFile threw the bare 'OK' as the error message, which is
useless and misleading. Now when we see an ok-ish msg without
the expected file_code we throw a descriptive error that
includes the first ~400 bytes of the payload so the next time
it happens we can see what's actually being returned (API
changed, new field name, etc.).
2. 'getaddrinfo ENOTFOUND s1055.filemoon' was marking accounts as
permanently failed, blacklisting BOTH byse accounts within the
same batch even though neither was the actual problem — filemoon
(byse's storage backend) briefly had a DNS blip. Added
_isTransientNetworkError() covering DNS/ECONNRESET/ETIMEDOUT/etc.
When all retries on an account exhaust with a transient error,
we now fail just that file and emit 'skip-rotation-transient'
instead of adding the account to _failedAccounts. Other files
in the same batch still get a fresh try on the same account.
922 lines
33 KiB
JavaScript
922 lines
33 KiB
JavaScript
const { EventEmitter } = require('events');
|
||
const path = require('path');
|
||
const fs = require('fs');
|
||
const crypto = require('crypto');
|
||
const { uploadFile } = require('./hosters');
|
||
const VidmolyUploader = require('./vidmoly-upload');
|
||
const VoeUploader = require('./voe-upload');
|
||
const DoodstreamUploader = require('./doodstream-upload');
|
||
const ClouddropUploader = require('./clouddrop-upload');
|
||
const Semaphore = require('./semaphore');
|
||
const Throttle = require('./throttle');
|
||
|
||
// Per-hoster defaults; UploadManager merges a hoster's own settings over these.
// Frozen because the object is shared by every lookup and must never be mutated.
const DEFAULT_SETTINGS = Object.freeze({
  retries: 3, // extra attempts after the first one (maxAttempts = retries + 1)
  maxSpeedKbs: 0, // per-upload speed cap in KiB/s, 0 = unlimited
  parallelCount: 2, // concurrent uploads per hoster
  restartBelowKbs: 0, // restart an upload that stays below this speed, 0 = off
  timeIntervalSec: 0, // minimum spacing between upload starts per hoster, 0 = off
  maxSizeMb: 0, // skip files larger than this, 0 = no limit
});
|
||
|
||
// Pause between two upload attempts on the same account.
const RETRY_DELAY_MS = 3000;
// Pause after announcing a failed account, giving 'account-failed' listeners
// time to register the next fallback through switchAccount().
const ROTATION_SETTLE_MS = 800;
// Minimum spacing between 'uploading' progress events of one job.
const PROGRESS_EMIT_INTERVAL_MS = 250;
// How often the low-speed monitor samples the current speed.
const LOW_SPEED_CHECK_INTERVAL_MS = 2000;
// How long an upload may stay below restartBelowKbs before it is restarted.
const LOW_SPEED_GRACE_MS = 6000;

// Messages that indicate the network (or the hoster's backend) hiccuped while
// the account itself is fine.
const TRANSIENT_NETWORK_PATTERNS = [
  /ENOTFOUND/i,
  /ECONNRESET/i,
  /ECONNREFUSED/i,
  /ETIMEDOUT/i,
  /EAI_AGAIN/i,
  /EHOSTUNREACH/i,
  /ENETUNREACH/i,
  /EPIPE/i,
  /socket hang up/i,
  /network (error|failure|problem)/i,
  /dns (lookup|error|failed)/i,
  /getaddrinfo/i,
  /fetch failed/i,
  /\bconnect (ETIMEDOUT|ECONN)/i,
];

// Messages that indicate the account is the problem (rate limit, quota, bad
// credentials, stale session), so retrying on the same account cannot help.
const ACCOUNT_ERROR_PATTERNS = [
  /Kein Upload-Server/i,
  /No upload server/i,
  /kein server/i,
  /quota/i,
  /limit (reached|exceeded|überschritten)/i,
  /rate[- ]?limit/i,
  /too many requests/i,
  /\b(401|403|429)\b/,
  /Falscher (User|Username|Passwort)/i,
  /Incorrect (Login|Password)/i,
  /invalid (credentials|api[- ]?key|token|session)/i,
  /(account|user) (banned|suspended|disabled|gesperrt)/i,
  /not authorized/i,
  /forbidden/i,
  /session (expired|abgelaufen)/i,
  // Session/CSRF hints: the account's server session went stale.
  /CSRF[- ]?Token nicht gefunden/i,
  /CSRF[- ]?token not found/i,
  /Bist du eingeloggt/i,
  /not logged in/i,
];

/**
 * Runs batches of file uploads with per-hoster and global concurrency limits,
 * speed throttling, retries and fallback-account rotation.
 *
 * Events emitted:
 *  - 'progress'       per-job status updates (see _emitProgress)
 *  - 'stats'          once per second while a batch runs
 *  - 'batch-done'     summary object when startBatch() finishes
 *  - 'account-failed' { hoster, accountId } when an account is blacklisted;
 *                     listeners are expected to answer with switchAccount()
 *  - 'rot-log'        diagnostic trace of account-rotation decisions
 */
class UploadManager extends EventEmitter {
  /**
   * @param {Object} [hosterSettings] Per-hoster overrides of DEFAULT_SETTINGS, keyed by hoster name.
   * @param {Object} [globalSettings] Global options; this class reads parallelUploadCount,
   *   scaleParallelUploads and globalMaxSpeedKbs.
   */
  constructor(hosterSettings, globalSettings) {
    super();
    this.hosterSettings = hosterSettings || {};
    this.globalSettings = globalSettings || {};
    this.semaphores = {}; // hoster -> Semaphore
    this.globalSemaphore = null;
    this.abortController = new AbortController();
    this.running = false;
    this.stopAfterActive = false;
    this.statsInterval = null;
    this.startTime = 0;
    this.activeJobs = new Map(); // uploadId -> { jobId, speedKbs, bytesUploaded }
    this.jobAbortControllers = new Map(); // jobId -> AbortController
    this.cancelledJobIds = new Set();
    this.sessionBytes = 0;
    this.lastStartTime = {}; // hoster -> timestamp of last upload start
    this.intervalLocks = {}; // hoster -> Promise chain for serialized interval waits
    this.globalThrottle = null;
    this._failedAccounts = new Map(); // "hoster:accountId" -> true
    this._accountOverrides = new Map(); // hoster -> fallback account object
    this._batchResults = null; // file path -> { name, size, results: [] } of the running batch
    this._additionalPromises = []; // jobs added mid-batch via addJobs()
  }

  /**
   * Registers the fallback account that jobs of `hoster` rotate to once their
   * current account has been marked failed.
   *
   * @param {string} hoster
   * @param {Object|null} fallbackAccount Object with id, username, password, apiKey.
   */
  switchAccount(hoster, fallbackAccount) {
    const prev = this._accountOverrides.get(hoster);
    this._accountOverrides.set(hoster, fallbackAccount);
    this._rotLog('switchAccount', {
      hoster,
      prevOverrideId: prev ? prev.id : null,
      toAccountId: fallbackAccount ? fallbackAccount.id : null,
    });
  }

  /** Emits a timestamped 'rot-log' diagnostic event. */
  _rotLog(event, data) {
    this.emit('rot-log', { ts: Date.now(), event, ...data });
  }

  /** Key used in _failedAccounts for one account of one hoster. */
  _accountKey(hoster, accountId) {
    return `${hoster}:${accountId}`;
  }

  /** Copies the credentials of `account` onto `task` (the task is reused for the retry). */
  _applyAccount(task, account) {
    task.accountId = account.id;
    task.username = account.username;
    task.password = account.password;
    task.apiKey = account.apiKey;
  }

  /**
   * Transient network errors: the account is fine, the network or the hoster's
   * own backend hiccuped. Such a failure must not blacklist the account, so
   * other jobs in the batch still get a fresh chance on it.
   *
   * @param {Error|null} err
   * @returns {boolean}
   */
  _isTransientNetworkError(err) {
    if (!err || !err.message) return false;
    const message = String(err.message);
    return TRANSIENT_NETWORK_PATTERNS.some((pattern) => pattern.test(message));
  }

  /**
   * Errors that mean "this account is the problem": the remaining retries are
   * skipped and the job goes straight to the fallback account.
   *
   * @param {Error|null} err
   * @returns {boolean}
   */
  _shouldSkipRetryOnAccountError(err) {
    if (!err || !err.message) return false;
    const message = String(err.message);
    return ACCOUNT_ERROR_PATTERNS.some((pattern) => pattern.test(message));
  }

  /**
   * Replaces the settings and applies the new limits to semaphores and the
   * global throttle that already exist. Omitted arguments keep the old value.
   */
  updateSettings(hosterSettings, globalSettings) {
    this.hosterSettings = hosterSettings || this.hosterSettings;
    this.globalSettings = globalSettings || this.globalSettings;

    for (const [hoster, semaphore] of Object.entries(this.semaphores)) {
      semaphore.updateLimit(this._getSettings(hoster).parallelCount);
    }

    // _getGlobalThrottle() creates or re-rates the throttle when a limit is
    // configured; without a limit the shared throttle is dropped.
    if (!this._getGlobalThrottle()) {
      this.globalThrottle = null;
    }

    const globalLimit = this._getGlobalParallelLimit();
    if (globalLimit > 0 && this.globalSemaphore) {
      this.globalSemaphore.updateLimit(globalLimit);
    }
  }

  /**
   * @param {string} hoster
   * @returns {Object} DEFAULT_SETTINGS merged with the hoster's overrides;
   *   parallelCount is capped by the global limit when scaleParallelUploads is set.
   */
  _getSettings(hoster) {
    const settings = { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
    const globalLimit = this._getGlobalParallelLimit();
    if (this.globalSettings.scaleParallelUploads && globalLimit > 0) {
      settings.parallelCount = Math.min(settings.parallelCount || 1, globalLimit);
    }
    return settings;
  }

  /** @returns {number} Global parallel-upload limit clamped to 1..100, or 0 when unset/invalid. */
  _getGlobalParallelLimit() {
    const raw = Number(this.globalSettings.parallelUploadCount || 0);
    if (!Number.isFinite(raw) || raw <= 0) return 0;
    return Math.max(1, Math.min(100, Math.round(raw)));
  }

  /** @returns {Semaphore|null} The shared global semaphore, or null when no global limit is set. */
  _getGlobalSemaphore() {
    const limit = this._getGlobalParallelLimit();
    if (limit <= 0) return null;
    if (this.globalSemaphore) {
      this.globalSemaphore.updateLimit(limit);
    } else {
      this.globalSemaphore = new Semaphore(limit);
    }
    return this.globalSemaphore;
  }

  /** @returns {Throttle|null} The shared global throttle, or null when no speed limit is set. */
  _getGlobalThrottle() {
    const kbs = Number(this.globalSettings.globalMaxSpeedKbs || 0);
    if (!Number.isFinite(kbs) || kbs <= 0) return null;
    if (this.globalThrottle) {
      this.globalThrottle.updateRate(kbs * 1024);
    } else {
      this.globalThrottle = new Throttle(kbs * 1024);
    }
    return this.globalThrottle;
  }

  /** @returns {Semaphore} The per-hoster semaphore, created on first use and re-limited on every call. */
  _getSemaphore(hoster) {
    const limit = this._getSettings(hoster).parallelCount;
    if (this.semaphores[hoster]) {
      this.semaphores[hoster].updateLimit(limit);
    } else {
      this.semaphores[hoster] = new Semaphore(limit);
    }
    return this.semaphores[hoster];
  }

  /**
   * Returns the result entry for the task's file, creating it on first sight.
   * A file that cannot be stat'ed is registered with size 0; _runJob reports
   * the missing file itself.
   */
  _registerFile(results, task) {
    let entry = results.get(task.file);
    if (!entry) {
      let size = 0;
      try {
        size = fs.statSync(task.file).size;
      } catch {
        // Best effort only, see above.
      }
      entry = { name: path.basename(task.file), size, results: [] };
      results.set(task.file, entry);
    }
    return entry;
  }

  /**
   * Runs all tasks (plus any added through addJobs() meanwhile), then emits
   * 'batch-done' with a summary. Resolves once every job has settled.
   *
   * @param {Array<Object>} tasks Task objects; this class reads file, hoster, jobId,
   *   accountId, username, password and apiKey.
   */
  async startBatch(tasks) {
    const batchTasks = Array.isArray(tasks) ? tasks : [];

    this.running = true;
    this.stopAfterActive = false;
    this.abortController = new AbortController();
    this.startTime = Date.now();
    this.sessionBytes = 0;
    this.activeJobs.clear();
    this.jobAbortControllers.clear();
    this.cancelledJobIds.clear();
    this.semaphores = {};
    this.globalSemaphore = null;
    this.globalThrottle = null;
    this.lastStartTime = {};
    // Reset account-rotation state each batch. Otherwise an account that
    // failed during the last batch (e.g. rate-limited) would stay blacklisted
    // until the app restarts, even after it recovered.
    this._failedAccounts.clear();
    this._accountOverrides.clear();
    this._rotLog('batch-start', { taskCount: batchTasks.length });

    const { signal } = this.abortController;
    const batchId = `batch-${Date.now()}`;
    const results = new Map(); // file path -> { name, size, results: [] }
    this._batchResults = results;
    this._additionalPromises = [];

    for (const task of batchTasks) {
      this._registerFile(results, task);
    }

    this._startStatsTimer();

    await Promise.allSettled(batchTasks.map((task) => this._runJob(task, results, signal)));
    // Jobs added mid-batch may themselves add more, so drain until empty.
    while (this._additionalPromises.length > 0) {
      await Promise.allSettled(this._additionalPromises.splice(0));
    }

    this._stopStatsTimer();
    this.running = false;

    const files = Array.from(results.values());
    const recorded = files.reduce((count, file) => count + file.results.length, 0);
    const succeeded = files.reduce(
      (count, file) => count + file.results.filter((result) => result.status === 'done').length,
      0,
    );
    // Jobs added through addJobs() are not part of `tasks`; counting recorded
    // results keeps `failed` from going negative when they succeed.
    const total = Math.max(batchTasks.length, recorded);

    this.emit('batch-done', {
      id: batchId,
      timestamp: new Date().toISOString(),
      total,
      succeeded,
      failed: total - succeeded,
      files,
    });
  }

  /**
   * Emits a 'progress' event for `job`. Every field defaults to its idle
   * value; `overrides` replaces individual fields.
   */
  _emitJobProgress(job, status, overrides = {}) {
    this._emitProgress(job.uploadId, job.fileName, job.task.hoster, {
      jobId: job.jobId,
      status,
      progress: 0,
      bytesUploaded: 0,
      bytesTotal: job.fileSize,
      speedKbs: 0,
      elapsed: 0,
      remaining: 0,
      error: null,
      result: null,
      attempt: 0,
      maxAttempts: job.maxAttempts,
      ...overrides,
    });
  }

  /**
   * Builds the throttle for one upload attempt: the per-hoster cap, the global
   * cap, both chained, or null when neither is configured.
   */
  _buildThrottle(settings) {
    const hosterThrottle = settings.maxSpeedKbs > 0 ? new Throttle(settings.maxSpeedKbs * 1024) : null;
    const globalThrottle = this._getGlobalThrottle();
    if (hosterThrottle && globalThrottle) {
      return {
        consume: async (bytes, sig) => {
          await hosterThrottle.consume(bytes, sig);
          await globalThrottle.consume(bytes, sig);
        },
      };
    }
    return hosterThrottle || globalThrottle;
  }

  /**
   * Performs exactly one upload attempt with the task's current credentials.
   * Never rejects: failures are reported through the returned object.
   *
   * @returns {Promise<Object>} `{ ok: true, result, speedKbs, elapsed }` on success,
   *   otherwise `{ ok: false, error, speedRestart }` where speedRestart tells
   *   that the low-speed monitor (not the caller) aborted the upload.
   */
  async _attemptUpload(job, attempt) {
    const { task, settings, signal, uploadId, jobId, fileSize } = job;
    const startedAt = Date.now();
    let sampleBytes = 0;
    let sampleTime = startedAt;
    let currentSpeedKbs = 0;
    let lowSpeedSince = 0;
    let lastEmitTime = 0;
    let speedAbort = null;
    let speedMonitor = null;
    let signalBundle = { signal, cleanup() {} };

    try {
      this._emitJobProgress(job, 'getting-server', { attempt });

      const throttle = this._buildThrottle(settings);

      if (settings.restartBelowKbs > 0) {
        speedAbort = new AbortController();
        signalBundle = this._combineManySignals([signal, speedAbort.signal]);
        speedMonitor = setInterval(() => {
          const tooSlow = currentSpeedKbs > 0 && currentSpeedKbs < settings.restartBelowKbs;
          if (!tooSlow) {
            lowSpeedSince = 0;
            return;
          }
          if (!lowSpeedSince) lowSpeedSince = Date.now();
          if (Date.now() - lowSpeedSince > LOW_SPEED_GRACE_MS) speedAbort.abort();
        }, LOW_SPEED_CHECK_INTERVAL_MS);
      }

      // One entry mutated in place: the callback fires on every stream chunk,
      // so allocating a fresh object each time would be wasteful.
      const activeEntry = { jobId, speedKbs: 0, bytesUploaded: 0 };
      this.activeJobs.set(uploadId, activeEntry);

      const progressCb = (bytesUploaded, bytesTotal) => {
        const now = Date.now();
        const sinceSample = (now - sampleTime) / 1000;
        if (sinceSample >= 1) {
          currentSpeedKbs = Math.round((bytesUploaded - sampleBytes) / sinceSample / 1024);
          sampleBytes = bytesUploaded;
          sampleTime = now;
        }
        activeEntry.speedKbs = currentSpeedKbs;
        activeEntry.bytesUploaded = bytesUploaded;

        // Throttle progress emissions to reduce IPC + rendering overhead.
        if (now - lastEmitTime < PROGRESS_EMIT_INTERVAL_MS) return;
        lastEmitTime = now;

        this._emitJobProgress(job, 'uploading', {
          progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
          bytesUploaded,
          bytesTotal,
          speedKbs: currentSpeedKbs,
          elapsed: Math.round((now - startedAt) / 1000),
          remaining: currentSpeedKbs > 0
            ? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024))
            : 0,
          attempt,
        });
      };

      const result = await this._executeUpload(task, progressCb, signalBundle.signal, throttle);
      this.sessionBytes += fileSize;
      return {
        ok: true,
        result,
        speedKbs: currentSpeedKbs,
        elapsed: Math.round((Date.now() - startedAt) / 1000),
      };
    } catch (err) {
      return {
        ok: false,
        error: err,
        speedRestart: Boolean(speedAbort && speedAbort.signal.aborted && !signal.aborted),
      };
    } finally {
      this.activeJobs.delete(uploadId);
      if (speedMonitor) clearInterval(speedMonitor);
      signalBundle.cleanup();
    }
  }

  /**
   * Tries the upload up to job.maxAttempts times on the task's current account.
   * Stops early when the job is aborted, the queue is being stopped, or the
   * error shows that the account itself is the problem.
   *
   * @returns {Promise<Object>} `{ done: true, result, speedKbs, elapsed, attempt }` on
   *   success, otherwise `{ done: false, lastError }` (lastError is null when
   *   no attempt was started).
   * @throws {Error} When the retry delay is interrupted by an abort.
   */
  async _uploadWithRetries(job) {
    const { task, signal, fileName, maxAttempts } = job;
    let lastError = null;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      if (signal.aborted || this.stopAfterActive) break;

      if (attempt > 1) {
        this._emitJobProgress(job, 'retrying', {
          error: lastError ? lastError.message : null,
          attempt,
        });
        await this._sleep(RETRY_DELAY_MS, signal);
      }

      const outcome = await this._attemptUpload(job, attempt);
      if (outcome.ok) {
        return {
          done: true,
          result: outcome.result,
          speedKbs: outcome.speedKbs,
          elapsed: outcome.elapsed,
          attempt,
        };
      }

      if (signal.aborted) {
        lastError = new Error('Abgebrochen');
        break;
      }
      if (this.stopAfterActive) {
        lastError = new Error('Angehalten');
        break;
      }
      if (outcome.speedRestart && attempt < maxAttempts) {
        lastError = new Error('Geschwindigkeit zu niedrig - Neustart');
        continue;
      }

      lastError = outcome.error;
      // Account-specific errors: don't waste retries on the same account,
      // hand over to rotation right away.
      if (this._shouldSkipRetryOnAccountError(lastError)) {
        this._rotLog('fast-fail', {
          hoster: task.hoster,
          fileName,
          accountId: task.accountId,
          attempt,
          error: lastError && lastError.message ? lastError.message : String(lastError),
        });
        break;
      }
    }

    return { done: false, lastError };
  }

  /**
   * Runs one upload task from validation to its final status: waits for a
   * hoster/global slot, uploads with retries, and rotates through fallback
   * accounts when the current account fails for a non-transient reason.
   * Exactly one result is pushed to the file's entry in `results`.
   *
   * @param {Object} task Mutated in place when the job switches accounts.
   * @param {Map} results file path -> { name, size, results: [] }
   * @param {AbortSignal} batchSignal Aborts every job of the batch.
   */
  async _runJob(task, results, batchSignal) {
    const settings = this._getSettings(task.hoster);
    const hosterSemaphore = this._getSemaphore(task.hoster);
    const globalSemaphore = this._getGlobalSemaphore();
    const uploadId = crypto.randomBytes(8).toString('hex');
    const jobId = task.jobId || uploadId;
    const fileName = path.basename(task.file);

    let fileSize = 0;
    let fileNotFound = false;
    try {
      fileSize = fs.statSync(task.file).size;
    } catch {
      fileNotFound = true;
    }

    const maxAttempts = Math.max(1, (settings.retries || 0) + 1);
    const jobAbortController = new AbortController();
    const { signal, cleanup: cleanupSignals } = this._combineSignals(batchSignal, jobAbortController.signal);
    this.jobAbortControllers.set(jobId, jobAbortController);

    const job = { task, settings, signal, uploadId, jobId, fileName, fileSize, maxAttempts };

    let hosterSlotAcquired = false;
    let globalSlotAcquired = false;
    let finalResultRecorded = false;

    const recordFinalResult = (status, payload = {}) => {
      if (finalResultRecorded) return;
      finalResultRecorded = true;
      const uploaded = payload.result || {};
      this._registerFile(results, task).results.push({
        hoster: task.hoster,
        status,
        error: payload.error || null,
        download_url: uploaded.download_url || null,
        embed_url: uploaded.embed_url || null,
        file_code: uploaded.file_code || null,
      });
    };

    const emitFinalStatus = (status, payload = {}) => {
      const isDone = status === 'done';
      this._emitJobProgress(job, status, {
        progress: isDone ? 1 : 0,
        bytesUploaded: isDone ? fileSize : 0,
        speedKbs: payload.speedKbs || 0,
        elapsed: payload.elapsed || 0,
        error: payload.error || null,
        result: payload.result || null,
        // `??` so an explicit attempt of 0 (skipped files) is reported as 0.
        attempt: payload.attempt ?? maxAttempts,
      });
    };

    // A file that is never uploaded is shown as 'skipped' but counted as an error.
    const skip = (error) => {
      emitFinalStatus('skipped', { error, attempt: 0 });
      recordFinalResult('error', { error });
    };

    const fail = (error) => {
      emitFinalStatus('error', { error });
      recordFinalResult('error', { error });
    };

    const succeed = (outcome) => {
      emitFinalStatus('done', {
        result: outcome.result,
        speedKbs: outcome.speedKbs,
        elapsed: outcome.elapsed,
        attempt: outcome.attempt,
      });
      recordFinalResult('done', { result: outcome.result });
    };

    // Reports the job as aborted when it was cancelled or the queue is being
    // stopped; returns whether it did so.
    const finishIfInterrupted = () => {
      const wasStopped = this.stopAfterActive && !signal.aborted;
      const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
      if (!wasStopped && !wasAborted) return false;
      const error = wasStopped ? 'Warteschlange angehalten' : 'Abgebrochen';
      emitFinalStatus('aborted', { error });
      recordFinalResult('aborted', { error });
      return true;
    };

    try {
      if (fileNotFound) {
        skip('Datei nicht gefunden');
        return;
      }
      if (fileSize <= 0) {
        skip('Datei ist leer (0 Bytes)');
        return;
      }
      if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
        skip(`Datei zu groß (Max: ${settings.maxSizeMb} MB)`);
        return;
      }

      // If this account already failed in this batch, switch to the fallback
      // immediately instead of wasting retries on a known-bad account.
      if (task.accountId && this._failedAccounts.has(this._accountKey(task.hoster, task.accountId))) {
        const override = this._accountOverrides.get(task.hoster);
        const overrideFailed = override
          ? this._failedAccounts.has(this._accountKey(task.hoster, override.id))
          : false;
        if (override && !overrideFailed) {
          this._rotLog('pre-job-swap', {
            hoster: task.hoster,
            fileName,
            fromAccountId: task.accountId,
            toAccountId: override.id,
          });
          this._applyAccount(task, override);
        } else {
          this._rotLog('pre-job-swap-blocked', {
            hoster: task.hoster,
            fileName,
            accountId: task.accountId,
            hasOverride: !!override,
            overrideAlsoFailed: overrideFailed,
          });
        }
      }

      this._emitJobProgress(job, 'queued');

      // Acquire the hoster slot first so jobs waiting for a hoster slot
      // don't occupy global slots (prevents underutilization).
      await hosterSemaphore.acquire(signal);
      hosterSlotAcquired = true;

      if (globalSemaphore) {
        await globalSemaphore.acquire(signal);
        globalSlotAcquired = true;
      }

      if (settings.timeIntervalSec > 0) {
        await this._waitForInterval(task.hoster, settings.timeIntervalSec * 1000, signal);
      }

      let outcome = await this._uploadWithRetries(job);
      if (outcome.done) {
        succeed(outcome);
        return;
      }
      let lastError = outcome.lastError;
      if (finishIfInterrupted()) return;

      this._rotLog('retries-exhausted', {
        hoster: task.hoster,
        fileName,
        accountId: task.accountId,
        lastError: lastError ? lastError.message : null,
      });

      // Account rotation: mark the current account failed (if not already),
      // give listeners time to resolve the next fallback, then retry. Loops so
      // A → B → C → ... works for hosters with 3+ accounts. Terminates because
      // every pass either ends the job or blacklists one more account.
      while (true) {
        // A transient network failure says nothing about the account, whether
        // it is the original or a fallback: fail only this file and leave the
        // account usable for the other jobs of the batch.
        if (this._isTransientNetworkError(lastError)) {
          this._rotLog('skip-rotation-transient', {
            hoster: task.hoster,
            fileName,
            accountId: task.accountId,
            lastError: lastError ? lastError.message : null,
          });
          fail(lastError.message || 'Netzwerkfehler');
          return;
        }

        if (!task.accountId) break;

        const currentKey = this._accountKey(task.hoster, task.accountId);
        if (this._failedAccounts.has(currentKey)) {
          this._rotLog('already-marked', {
            hoster: task.hoster,
            fileName,
            accountId: task.accountId,
          });
        } else {
          this._failedAccounts.set(currentKey, true);
          this._rotLog('mark-failed', {
            hoster: task.hoster,
            fileName,
            accountId: task.accountId,
            lastError: lastError ? lastError.message : null,
          });
          this.emit('account-failed', { hoster: task.hoster, accountId: task.accountId });
          await this._sleep(ROTATION_SETTLE_MS, signal);
        }

        // Always look for an override, even when another concurrent job
        // already marked this account failed; otherwise this job would end in
        // final-error instead of using the fallback that is already resolved.
        const override = this._accountOverrides.get(task.hoster);
        if (!override) {
          this._rotLog('rotation-end', {
            hoster: task.hoster,
            fileName,
            reason: 'no-override-set',
            lastFailedAccountId: task.accountId,
          });
          break;
        }
        if (this._failedAccounts.has(this._accountKey(task.hoster, override.id))) {
          this._rotLog('rotation-end', {
            hoster: task.hoster,
            fileName,
            reason: 'override-already-failed',
            overrideId: override.id,
            lastFailedAccountId: task.accountId,
          });
          break;
        }
        if (override.id === task.accountId) {
          this._rotLog('rotation-end', {
            hoster: task.hoster,
            fileName,
            reason: 'override-same-as-current',
            lastFailedAccountId: task.accountId,
          });
          break;
        }

        this._rotLog('rotate', {
          hoster: task.hoster,
          fileName,
          fromAccountId: task.accountId,
          toAccountId: override.id,
        });
        this._applyAccount(task, override);
        this._emitJobProgress(job, 'retrying', {
          error: 'Account-Wechsel zu Fallback',
          attempt: 1,
        });

        outcome = await this._uploadWithRetries(job);
        if (outcome.done) {
          succeed(outcome);
          return;
        }
        lastError = outcome.lastError || lastError;
        // A cancel/stop during rotation is an abort, not an upload error.
        if (finishIfInterrupted()) return;
      }

      const error = lastError && lastError.message ? lastError.message : 'Unbekannter Fehler';
      this._rotLog('final-error', {
        hoster: task.hoster,
        fileName,
        lastFailedAccountId: task.accountId,
        error,
      });
      fail(error);
    } catch (err) {
      // Reached when a wait (slot, interval, retry delay) was interrupted by
      // an abort, or when something unexpected threw.
      const wasStopped = this.stopAfterActive && !signal.aborted;
      const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
      let error;
      if (wasStopped) {
        error = 'Warteschlange angehalten';
      } else if (wasAborted) {
        error = 'Abgebrochen';
      } else {
        error = err && err.message ? err.message : 'Unbekannter Fehler';
      }
      const status = wasAborted || wasStopped ? 'aborted' : 'error';
      emitFinalStatus(status, { error });
      recordFinalResult(status, { error });
    } finally {
      this.activeJobs.delete(uploadId);
      this.jobAbortControllers.delete(jobId);
      cleanupSignals();
      // Release in reverse order of acquire (global first, then hoster).
      if (globalSlotAcquired && globalSemaphore) globalSemaphore.release();
      if (hosterSlotAcquired) hosterSemaphore.release();
    }
  }

  /**
   * Dispatches the upload to the hoster-specific uploader. Hosters with a
   * login-based uploader use it only when the task carries a username;
   * everything else goes through the generic uploadFile().
   *
   * @returns {Promise<Object>} Whatever the uploader resolves with.
   */
  async _executeUpload(task, progressCb, signal, throttle) {
    if (task.hoster === 'vidmoly.me' && task.username) {
      const vidmoly = new VidmolyUploader();
      await vidmoly.login(task.username, task.password);
      return vidmoly.upload(task.file, progressCb, signal, throttle);
    }
    if (task.hoster === 'voe.sx' && task.username) {
      const voe = new VoeUploader();
      await voe.login(task.username, task.password);
      return voe.upload(task.file, progressCb, signal, throttle);
    }
    if (task.hoster === 'doodstream.com' && task.username) {
      const dood = new DoodstreamUploader();
      await dood.login(task.username, task.password);
      return dood.upload(task.file, progressCb, signal, throttle);
    }
    if (task.hoster === 'clouddrop.cc') {
      const clouddrop = new ClouddropUploader(task.apiKey);
      return clouddrop.upload(task.file, progressCb, signal, throttle);
    }
    return uploadFile(task.hoster, task.file, task.apiKey, progressCb, signal, throttle);
  }

  /** Emits a 'progress' event carrying the job identity plus `data`. */
  _emitProgress(uploadId, fileName, hoster, data) {
    this.emit('progress', { uploadId, fileName, hoster, ...data });
  }

  /** Starts (or restarts) the once-per-second 'stats' event. */
  _startStatsTimer() {
    if (this.statsInterval) clearInterval(this.statsInterval);
    this.statsInterval = setInterval(() => {
      let globalSpeedKbs = 0;
      let activeCount = 0;
      let inProgressBytes = 0;
      for (const active of this.activeJobs.values()) {
        globalSpeedKbs += active.speedKbs || 0;
        inProgressBytes += active.bytesUploaded || 0;
        activeCount++;
      }

      let state = 'idle';
      if (this.running) state = this.stopAfterActive ? 'stopping' : 'uploading';

      this.emit('stats', {
        state,
        globalSpeedKbs,
        totalBytes: this.sessionBytes + inProgressBytes,
        elapsed: Math.round((Date.now() - this.startTime) / 1000),
        activeJobs: activeCount,
        pendingJobs: Object.values(this.semaphores).reduce((sum, semaphore) => sum + semaphore.pending, 0),
      });
    }, 1000);
  }

  /** Stops the 'stats' timer if it is running. */
  _stopStatsTimer() {
    if (!this.statsInterval) return;
    clearInterval(this.statsInterval);
    this.statsInterval = null;
  }

  /**
   * @returns {{signal: AbortSignal, cleanup: Function}} A signal that aborts as soon
   *   as either input aborts; cleanup() detaches the listeners from the inputs.
   */
  _combineSignals(signal1, signal2) {
    return this._combineManySignals([signal1, signal2]);
  }

  /**
   * Same as _combineSignals for any number of signals; falsy entries are ignored.
   *
   * @param {Array<AbortSignal|null|undefined>} signals
   * @returns {{signal: AbortSignal, cleanup: Function}}
   */
  _combineManySignals(signals) {
    const liveSignals = signals.filter(Boolean);
    const controller = new AbortController();

    if (liveSignals.some((signal) => signal.aborted)) {
      controller.abort();
      return { signal: controller.signal, cleanup() {} };
    }

    const onAbort = () => {
      controller.abort();
      cleanup();
    };
    const cleanup = () => {
      for (const signal of liveSignals) {
        signal.removeEventListener('abort', onAbort);
      }
    };
    for (const signal of liveSignals) {
      signal.addEventListener('abort', onAbort, { once: true });
    }

    return { signal: controller.signal, cleanup };
  }

  /**
   * Resolves after `ms` milliseconds; rejects with Error('Aborted') when
   * `signal` is already aborted or aborts during the wait.
   *
   * @param {number} ms
   * @param {AbortSignal} [signal]
   * @returns {Promise<void>}
   */
  _sleep(ms, signal) {
    return new Promise((resolve, reject) => {
      if (signal && signal.aborted) {
        reject(new Error('Aborted'));
        return;
      }

      let timer = null;
      const onAbort = () => {
        clearTimeout(timer);
        reject(new Error('Aborted'));
      };
      timer = setTimeout(() => {
        if (signal) signal.removeEventListener('abort', onAbort);
        resolve();
      }, ms);
      if (signal) signal.addEventListener('abort', onAbort, { once: true });
    });
  }

  /**
   * Waits until at least `intervalMs` has passed since the last upload start
   * on `hoster`. Waits are chained per hoster so concurrent jobs queue up
   * one after another instead of all starting at once.
   *
   * @returns {Promise<void>} Rejects when the wait is aborted.
   */
  _waitForInterval(hoster, intervalMs, signal) {
    const previous = this.intervalLocks[hoster] || Promise.resolve();
    const next = previous.then(async () => {
      const sinceLastStart = Date.now() - (this.lastStartTime[hoster] || 0);
      if (sinceLastStart < intervalMs) {
        await this._sleep(intervalMs - sinceLastStart, signal);
      }
      this.lastStartTime[hoster] = Date.now();
    });
    // An aborted wait must not poison the chain for the jobs behind it.
    this.intervalLocks[hoster] = next.catch(() => {});
    return next;
  }

  /**
   * Adds tasks to the running batch. Tasks whose jobId is already being
   * processed are skipped and reported back.
   *
   * @param {Array<Object>} tasks
   * @returns {{added: number, alreadyInBatchJobIds: Array}} `added` is 0 when no batch is running.
   */
  addJobs(tasks) {
    const addResult = { added: 0, alreadyInBatchJobIds: [] };
    if (!this.running || !tasks || tasks.length === 0) {
      return addResult;
    }

    const { signal } = this.abortController;
    const results = this._batchResults || new Map();
    if (!this._additionalPromises) this._additionalPromises = [];

    for (const task of tasks) {
      if (task.jobId && this.jobAbortControllers.has(task.jobId)) {
        addResult.alreadyInBatchJobIds.push(task.jobId);
        continue;
      }
      this._registerFile(results, task);
      this._additionalPromises.push(this._runJob(task, results, signal));
      addResult.added++;
    }
    return addResult;
  }

  /**
   * Cancels individual jobs: remembers their ids and aborts those that are
   * currently registered.
   *
   * @param {Array} jobIds
   */
  cancelJobs(jobIds) {
    for (const jobId of jobIds || []) {
      if (!jobId) continue;
      this.cancelledJobIds.add(jobId);
      const controller = this.jobAbortControllers.get(jobId);
      if (controller && !controller.signal.aborted) {
        controller.abort();
      }
    }
  }

  /** Lets running uploads finish but prevents further attempts from starting. */
  finishAfterActive() {
    this.stopAfterActive = true;
  }

  /** Aborts the whole batch, including every running job. No-op when idle. */
  cancel() {
    if (!this.running) return;
    this.abortController.abort();
    this.stopAfterActive = false;
    this.running = false;
    for (const controller of this.jobAbortControllers.values()) {
      if (!controller.signal.aborted) controller.abort();
    }
    this._stopStatsTimer();
  }
}
|
||
|
||
// The manager class is the module's only export.
module.exports = UploadManager;