566 lines
18 KiB
JavaScript
566 lines
18 KiB
JavaScript
const { EventEmitter } = require('events');
|
|
const path = require('path');
|
|
const fs = require('fs');
|
|
const crypto = require('crypto');
|
|
const { uploadFile } = require('./hosters');
|
|
const VidmolyUploader = require('./vidmoly-upload');
|
|
const VoeUploader = require('./voe-upload');
|
|
const DoodstreamUploader = require('./doodstream-upload');
|
|
const Semaphore = require('./semaphore');
|
|
const Throttle = require('./throttle');
|
|
|
|
const DEFAULT_SETTINGS = {
|
|
retries: 3,
|
|
maxSpeedKbs: 0,
|
|
parallelCount: 2,
|
|
restartBelowKbs: 0,
|
|
timeIntervalSec: 0,
|
|
maxSizeMb: 0
|
|
};
|
|
|
|
class UploadManager extends EventEmitter {
|
|
constructor(hosterSettings, globalSettings) {
|
|
super();
|
|
this.hosterSettings = hosterSettings || {};
|
|
this.globalSettings = globalSettings || {};
|
|
this.semaphores = {};
|
|
this.globalSemaphore = null;
|
|
this.abortController = new AbortController();
|
|
this.running = false;
|
|
this.stopAfterActive = false;
|
|
this.statsInterval = null;
|
|
this.startTime = 0;
|
|
this.activeJobs = new Map(); // uploadId -> { jobId, speedKbs, bytesUploaded }
|
|
this.jobAbortControllers = new Map(); // jobId -> AbortController
|
|
this.cancelledJobIds = new Set();
|
|
this.sessionBytes = 0;
|
|
this.lastStartTime = {}; // hoster -> timestamp of last upload start
|
|
this.globalThrottle = null;
|
|
}
|
|
|
|
_getSettings(hoster) {
|
|
const settings = { ...DEFAULT_SETTINGS, ...(this.hosterSettings[hoster] || {}) };
|
|
const globalLimit = this._getGlobalParallelLimit();
|
|
if (this.globalSettings.scaleParallelUploads && globalLimit > 0) {
|
|
settings.parallelCount = Math.max(settings.parallelCount || 1, globalLimit);
|
|
}
|
|
return settings;
|
|
}
|
|
|
|
_getGlobalParallelLimit() {
|
|
const raw = Number(this.globalSettings.parallelUploadCount || 0);
|
|
if (!Number.isFinite(raw) || raw <= 0) return 0;
|
|
return Math.max(1, Math.min(100, Math.round(raw)));
|
|
}
|
|
|
|
_getGlobalSemaphore() {
|
|
const limit = this._getGlobalParallelLimit();
|
|
if (limit <= 0) return null;
|
|
if (!this.globalSemaphore) {
|
|
this.globalSemaphore = new Semaphore(limit);
|
|
} else {
|
|
this.globalSemaphore.updateLimit(limit);
|
|
}
|
|
return this.globalSemaphore;
|
|
}
|
|
|
|
_getGlobalThrottle() {
|
|
const kbs = Number(this.globalSettings.globalMaxSpeedKbs || 0);
|
|
if (!Number.isFinite(kbs) || kbs <= 0) return null;
|
|
if (!this.globalThrottle) {
|
|
this.globalThrottle = new Throttle(kbs * 1024);
|
|
} else {
|
|
this.globalThrottle.updateRate(kbs * 1024);
|
|
}
|
|
return this.globalThrottle;
|
|
}
|
|
|
|
_getSemaphore(hoster) {
|
|
if (!this.semaphores[hoster]) {
|
|
const settings = this._getSettings(hoster);
|
|
this.semaphores[hoster] = new Semaphore(settings.parallelCount);
|
|
} else {
|
|
this.semaphores[hoster].updateLimit(this._getSettings(hoster).parallelCount);
|
|
}
|
|
return this.semaphores[hoster];
|
|
}
|
|
|
|
async startBatch(tasks) {
|
|
this.running = true;
|
|
this.stopAfterActive = false;
|
|
this.abortController = new AbortController();
|
|
this.startTime = Date.now();
|
|
this.sessionBytes = 0;
|
|
this.activeJobs.clear();
|
|
this.jobAbortControllers.clear();
|
|
this.cancelledJobIds.clear();
|
|
this.semaphores = {};
|
|
this.globalSemaphore = null;
|
|
this.globalThrottle = null;
|
|
this.lastStartTime = {};
|
|
|
|
const { signal } = this.abortController;
|
|
const batchId = `batch-${Date.now()}`;
|
|
const results = new Map(); // filePath -> { name, size, results: [] }
|
|
|
|
for (const task of tasks) {
|
|
const fileName = path.basename(task.file);
|
|
if (!results.has(task.file)) {
|
|
let size = 0;
|
|
try { size = fs.statSync(task.file).size; } catch {}
|
|
results.set(task.file, { name: fileName, size, results: [] });
|
|
}
|
|
}
|
|
|
|
this._startStatsTimer();
|
|
|
|
const promises = tasks.map((task) => this._runJob(task, results, signal));
|
|
await Promise.allSettled(promises);
|
|
|
|
this._stopStatsTimer();
|
|
this.running = false;
|
|
|
|
const files = Array.from(results.values());
|
|
const total = tasks.length;
|
|
const succeeded = files.reduce((count, file) => count + file.results.filter((result) => result.status === 'done').length, 0);
|
|
|
|
const summary = {
|
|
id: batchId,
|
|
timestamp: new Date().toISOString(),
|
|
total,
|
|
succeeded,
|
|
failed: total - succeeded,
|
|
files
|
|
};
|
|
|
|
this.emit('batch-done', summary);
|
|
}
|
|
|
|
async _runJob(task, results, batchSignal) {
|
|
const settings = this._getSettings(task.hoster);
|
|
const hosterSemaphore = this._getSemaphore(task.hoster);
|
|
const globalSemaphore = this._getGlobalSemaphore();
|
|
const uploadId = crypto.randomBytes(8).toString('hex');
|
|
const jobId = task.jobId || uploadId;
|
|
const fileName = path.basename(task.file);
|
|
let fileSize = 0;
|
|
try { fileSize = fs.statSync(task.file).size; } catch {}
|
|
|
|
const maxAttempts = Math.max(1, (settings.retries || 0) + 1);
|
|
const jobAbortController = new AbortController();
|
|
const { signal, cleanup: cleanupSignals } = this._combineSignals(batchSignal, jobAbortController.signal);
|
|
this.jobAbortControllers.set(jobId, jobAbortController);
|
|
|
|
let hosterSlotAcquired = false;
|
|
let globalSlotAcquired = false;
|
|
let finalResultRecorded = false;
|
|
let lastError = null;
|
|
|
|
const recordFinalResult = (status, payload = {}) => {
|
|
if (finalResultRecorded) return;
|
|
finalResultRecorded = true;
|
|
|
|
const result = {
|
|
hoster: task.hoster,
|
|
status,
|
|
error: payload.error || null,
|
|
download_url: payload.result ? payload.result.download_url || null : null,
|
|
embed_url: payload.result ? payload.result.embed_url || null : null,
|
|
file_code: payload.result ? payload.result.file_code || null : null
|
|
};
|
|
|
|
results.get(task.file).results.push(result);
|
|
};
|
|
|
|
const emitFinalStatus = (status, payload = {}) => {
|
|
this._emitProgress(uploadId, fileName, task.hoster, {
|
|
jobId,
|
|
status,
|
|
progress: status === 'done' ? 1 : 0,
|
|
bytesUploaded: status === 'done' ? fileSize : 0,
|
|
bytesTotal: fileSize,
|
|
speedKbs: payload.speedKbs || 0,
|
|
elapsed: payload.elapsed || 0,
|
|
remaining: 0,
|
|
error: payload.error || null,
|
|
result: payload.result || null,
|
|
attempt: payload.attempt || maxAttempts,
|
|
maxAttempts
|
|
});
|
|
};
|
|
|
|
try {
|
|
if (settings.maxSizeMb > 0 && fileSize > settings.maxSizeMb * 1024 * 1024) {
|
|
const error = `Datei zu groß (Max: ${settings.maxSizeMb} MB)`;
|
|
emitFinalStatus('skipped', { error, attempt: 0 });
|
|
recordFinalResult('error', { error });
|
|
return;
|
|
}
|
|
|
|
this._emitProgress(uploadId, fileName, task.hoster, {
|
|
jobId,
|
|
status: 'queued',
|
|
progress: 0,
|
|
bytesUploaded: 0,
|
|
bytesTotal: fileSize,
|
|
speedKbs: 0,
|
|
elapsed: 0,
|
|
remaining: 0,
|
|
error: null,
|
|
result: null,
|
|
attempt: 0,
|
|
maxAttempts
|
|
});
|
|
|
|
if (globalSemaphore) {
|
|
await globalSemaphore.acquire(signal);
|
|
globalSlotAcquired = true;
|
|
}
|
|
|
|
await hosterSemaphore.acquire(signal);
|
|
hosterSlotAcquired = true;
|
|
|
|
if (settings.timeIntervalSec > 0) {
|
|
await this._waitForInterval(task.hoster, settings.timeIntervalSec * 1000, signal);
|
|
}
|
|
|
|
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
|
if (signal.aborted || this.stopAfterActive) break;
|
|
|
|
if (attempt > 1) {
|
|
this._emitProgress(uploadId, fileName, task.hoster, {
|
|
jobId,
|
|
status: 'retrying',
|
|
progress: 0,
|
|
bytesUploaded: 0,
|
|
bytesTotal: fileSize,
|
|
speedKbs: 0,
|
|
elapsed: 0,
|
|
remaining: 0,
|
|
error: lastError ? lastError.message : null,
|
|
result: null,
|
|
attempt,
|
|
maxAttempts
|
|
});
|
|
await this._sleep(2500, signal);
|
|
}
|
|
|
|
const jobStart = Date.now();
|
|
let lastBytes = 0;
|
|
let lastSpeedTime = jobStart;
|
|
let currentSpeedKbs = 0;
|
|
let lowSpeedSince = 0;
|
|
let speedAbort = null;
|
|
let speedMonitor = null;
|
|
let uploadSignalBundle = { signal, cleanup() {} };
|
|
|
|
try {
|
|
this._emitProgress(uploadId, fileName, task.hoster, {
|
|
jobId,
|
|
status: 'getting-server',
|
|
progress: 0,
|
|
bytesUploaded: 0,
|
|
bytesTotal: fileSize,
|
|
speedKbs: 0,
|
|
elapsed: 0,
|
|
remaining: 0,
|
|
error: null,
|
|
result: null,
|
|
attempt,
|
|
maxAttempts
|
|
});
|
|
|
|
const hosterThrottle = settings.maxSpeedKbs > 0
|
|
? new Throttle(settings.maxSpeedKbs * 1024)
|
|
: null;
|
|
const globalThrottle = this._getGlobalThrottle();
|
|
const throttle = hosterThrottle && globalThrottle
|
|
? { consume: async (bytes, sig) => { await hosterThrottle.consume(bytes, sig); await globalThrottle.consume(bytes, sig); } }
|
|
: hosterThrottle || globalThrottle;
|
|
|
|
if (settings.restartBelowKbs > 0) {
|
|
speedAbort = new AbortController();
|
|
uploadSignalBundle = this._combineManySignals([signal, speedAbort.signal]);
|
|
speedMonitor = setInterval(() => {
|
|
if (currentSpeedKbs > 0 && currentSpeedKbs < settings.restartBelowKbs) {
|
|
if (!lowSpeedSince) lowSpeedSince = Date.now();
|
|
if (Date.now() - lowSpeedSince > 6000) {
|
|
speedAbort.abort();
|
|
}
|
|
} else {
|
|
lowSpeedSince = 0;
|
|
}
|
|
}, 2000);
|
|
}
|
|
|
|
this.activeJobs.set(uploadId, { jobId, speedKbs: 0, bytesUploaded: 0 });
|
|
|
|
const progressCb = (bytesUploaded, bytesTotal) => {
|
|
const now = Date.now();
|
|
const elapsed = Math.round((now - jobStart) / 1000);
|
|
const timeDelta = (now - lastSpeedTime) / 1000;
|
|
if (timeDelta >= 1) {
|
|
const bytesDelta = bytesUploaded - lastBytes;
|
|
currentSpeedKbs = Math.round(bytesDelta / timeDelta / 1024);
|
|
lastBytes = bytesUploaded;
|
|
lastSpeedTime = now;
|
|
}
|
|
|
|
const remaining = currentSpeedKbs > 0
|
|
? Math.round((bytesTotal - bytesUploaded) / (currentSpeedKbs * 1024))
|
|
: 0;
|
|
|
|
this.activeJobs.set(uploadId, { jobId, speedKbs: currentSpeedKbs, bytesUploaded });
|
|
|
|
this._emitProgress(uploadId, fileName, task.hoster, {
|
|
jobId,
|
|
status: 'uploading',
|
|
progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
|
|
bytesUploaded,
|
|
bytesTotal,
|
|
speedKbs: currentSpeedKbs,
|
|
elapsed,
|
|
remaining,
|
|
error: null,
|
|
result: null,
|
|
attempt,
|
|
maxAttempts
|
|
});
|
|
};
|
|
|
|
let result;
|
|
if (task.hoster === 'vidmoly.me' && task.username) {
|
|
const vidmoly = new VidmolyUploader();
|
|
await vidmoly.login(task.username, task.password);
|
|
result = await vidmoly.upload(task.file, progressCb, uploadSignalBundle.signal, throttle);
|
|
} else if (task.hoster === 'voe.sx' && task.username) {
|
|
const voe = new VoeUploader();
|
|
await voe.login(task.username, task.password);
|
|
result = await voe.upload(task.file, progressCb, uploadSignalBundle.signal, throttle);
|
|
} else if (task.hoster === 'doodstream.com' && task.username) {
|
|
const dood = new DoodstreamUploader();
|
|
await dood.login(task.username, task.password);
|
|
result = await dood.upload(task.file, progressCb, uploadSignalBundle.signal, throttle);
|
|
} else {
|
|
result = await uploadFile(task.hoster, task.file, task.apiKey, progressCb, uploadSignalBundle.signal, throttle);
|
|
}
|
|
|
|
const elapsed = Math.round((Date.now() - jobStart) / 1000);
|
|
this.sessionBytes += fileSize;
|
|
this.activeJobs.delete(uploadId);
|
|
|
|
emitFinalStatus('done', {
|
|
result,
|
|
speedKbs: currentSpeedKbs,
|
|
elapsed,
|
|
attempt
|
|
});
|
|
recordFinalResult('done', { result });
|
|
return;
|
|
} catch (err) {
|
|
this.activeJobs.delete(uploadId);
|
|
|
|
const isSpeedRestart = speedAbort && speedAbort.signal.aborted && !signal.aborted;
|
|
if (signal.aborted) {
|
|
lastError = new Error('Abgebrochen');
|
|
break;
|
|
}
|
|
|
|
if (this.stopAfterActive) {
|
|
lastError = new Error('Angehalten');
|
|
break;
|
|
}
|
|
|
|
if (isSpeedRestart && attempt < maxAttempts) {
|
|
lastError = new Error('Geschwindigkeit zu niedrig - Neustart');
|
|
await this._sleep(3000, signal);
|
|
continue;
|
|
}
|
|
|
|
lastError = err;
|
|
if (attempt >= maxAttempts) break;
|
|
// Wait 3 seconds before retry
|
|
await this._sleep(3000, signal);
|
|
} finally {
|
|
if (speedMonitor) clearInterval(speedMonitor);
|
|
uploadSignalBundle.cleanup();
|
|
}
|
|
}
|
|
|
|
const wasStopped = this.stopAfterActive && !signal.aborted;
|
|
const wasAborted = signal.aborted || this.cancelledJobIds.has(jobId);
|
|
if (wasStopped || wasAborted) {
|
|
const error = wasStopped ? 'Warteschlange angehalten' : 'Abgebrochen';
|
|
emitFinalStatus('aborted', { error });
|
|
recordFinalResult('aborted', { error });
|
|
return;
|
|
}
|
|
|
|
const error = lastError && lastError.message ? lastError.message : 'Unbekannter Fehler';
|
|
emitFinalStatus('error', { error });
|
|
recordFinalResult('error', { error });
|
|
} catch (err) {
|
|
const wasStopped = this.stopAfterActive && !signal.aborted;
|
|
const error = wasStopped
|
|
? 'Warteschlange angehalten'
|
|
: (signal.aborted || this.cancelledJobIds.has(jobId) ? 'Abgebrochen' : (err && err.message ? err.message : 'Unbekannter Fehler'));
|
|
const status = signal.aborted || this.cancelledJobIds.has(jobId) || wasStopped ? 'aborted' : 'error';
|
|
emitFinalStatus(status, { error });
|
|
recordFinalResult(status === 'error' ? 'error' : 'aborted', { error });
|
|
} finally {
|
|
this.activeJobs.delete(uploadId);
|
|
this.jobAbortControllers.delete(jobId);
|
|
cleanupSignals();
|
|
if (hosterSlotAcquired) hosterSemaphore.release();
|
|
if (globalSlotAcquired && globalSemaphore) globalSemaphore.release();
|
|
}
|
|
}
|
|
|
|
_emitProgress(uploadId, fileName, hoster, data) {
|
|
this.emit('progress', { uploadId, fileName, hoster, ...data });
|
|
}
|
|
|
|
_startStatsTimer() {
|
|
this.statsInterval = setInterval(() => {
|
|
let globalSpeedKbs = 0;
|
|
let activeCount = 0;
|
|
for (const job of this.activeJobs.values()) {
|
|
globalSpeedKbs += job.speedKbs || 0;
|
|
activeCount++;
|
|
}
|
|
|
|
const elapsed = Math.round((Date.now() - this.startTime) / 1000);
|
|
let inProgressBytes = 0;
|
|
for (const job of this.activeJobs.values()) {
|
|
inProgressBytes += job.bytesUploaded || 0;
|
|
}
|
|
|
|
this.emit('stats', {
|
|
state: this.running ? (this.stopAfterActive ? 'stopping' : 'uploading') : 'idle',
|
|
globalSpeedKbs,
|
|
totalBytes: this.sessionBytes + inProgressBytes,
|
|
elapsed,
|
|
activeJobs: activeCount,
|
|
pendingJobs: Object.values(this.semaphores).reduce((sum, semaphore) => sum + semaphore.pending, 0)
|
|
});
|
|
}, 1000);
|
|
}
|
|
|
|
_stopStatsTimer() {
|
|
if (this.statsInterval) {
|
|
clearInterval(this.statsInterval);
|
|
this.statsInterval = null;
|
|
}
|
|
}
|
|
|
|
_combineSignals(signal1, signal2) {
|
|
const controller = new AbortController();
|
|
if (signal1.aborted || signal2.aborted) {
|
|
controller.abort();
|
|
return { signal: controller.signal, cleanup() {} };
|
|
}
|
|
|
|
const onAbort = () => {
|
|
controller.abort();
|
|
cleanup();
|
|
};
|
|
|
|
const cleanup = () => {
|
|
signal1.removeEventListener('abort', onAbort);
|
|
signal2.removeEventListener('abort', onAbort);
|
|
};
|
|
|
|
signal1.addEventListener('abort', onAbort, { once: true });
|
|
signal2.addEventListener('abort', onAbort, { once: true });
|
|
return { signal: controller.signal, cleanup };
|
|
}
|
|
|
|
_combineManySignals(signals) {
|
|
const liveSignals = signals.filter(Boolean);
|
|
const controller = new AbortController();
|
|
|
|
if (liveSignals.some((signal) => signal.aborted)) {
|
|
controller.abort();
|
|
return { signal: controller.signal, cleanup() {} };
|
|
}
|
|
|
|
const listeners = liveSignals.map((signal) => {
|
|
const handler = () => {
|
|
controller.abort();
|
|
cleanup();
|
|
};
|
|
signal.addEventListener('abort', handler, { once: true });
|
|
return { signal, handler };
|
|
});
|
|
|
|
const cleanup = () => {
|
|
for (const entry of listeners) {
|
|
entry.signal.removeEventListener('abort', entry.handler);
|
|
}
|
|
};
|
|
|
|
return { signal: controller.signal, cleanup };
|
|
}
|
|
|
|
_sleep(ms, signal) {
|
|
return new Promise((resolve, reject) => {
|
|
const onAbort = () => {
|
|
clearTimeout(timer);
|
|
reject(new Error('Aborted'));
|
|
};
|
|
|
|
const timer = setTimeout(() => {
|
|
if (signal) signal.removeEventListener('abort', onAbort);
|
|
resolve();
|
|
}, ms);
|
|
|
|
if (signal) {
|
|
if (signal.aborted) {
|
|
clearTimeout(timer);
|
|
reject(new Error('Aborted'));
|
|
return;
|
|
}
|
|
signal.addEventListener('abort', onAbort, { once: true });
|
|
}
|
|
});
|
|
}
|
|
|
|
async _waitForInterval(hoster, intervalMs, signal) {
|
|
const now = Date.now();
|
|
const last = this.lastStartTime[hoster] || 0;
|
|
const elapsed = now - last;
|
|
if (elapsed < intervalMs) {
|
|
await this._sleep(intervalMs - elapsed, signal);
|
|
}
|
|
this.lastStartTime[hoster] = Date.now();
|
|
}
|
|
|
|
cancelJobs(jobIds) {
|
|
for (const jobId of jobIds || []) {
|
|
if (!jobId) continue;
|
|
this.cancelledJobIds.add(jobId);
|
|
const controller = this.jobAbortControllers.get(jobId);
|
|
if (controller && !controller.signal.aborted) {
|
|
controller.abort();
|
|
}
|
|
}
|
|
}
|
|
|
|
finishAfterActive() {
|
|
this.stopAfterActive = true;
|
|
}
|
|
|
|
cancel() {
|
|
if (!this.running) return;
|
|
this.abortController.abort();
|
|
this.stopAfterActive = false;
|
|
this.running = false;
|
|
for (const controller of this.jobAbortControllers.values()) {
|
|
if (!controller.signal.aborted) controller.abort();
|
|
}
|
|
this._stopStatsTimer();
|
|
}
|
|
}
|
|
|
|
module.exports = UploadManager;
|