fix: semaphore abort support, progress clamp, and additional bug fixes

- Semaphore.acquire() now accepts AbortSignal — waiting jobs are properly
  removed from queue on abort, preventing startBatch from hanging forever
- Clamp upload progress to 0-100% in both upload-manager and renderer
- Upload-manager handles semaphore abort rejection gracefully

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Administrator 2026-03-10 06:00:38 +01:00
parent 25b2afbf11
commit 3d759eb8a6
3 changed files with 42 additions and 14 deletions

View File

@ -1,30 +1,53 @@
/**
* FIFO Semaphore for per-hoster concurrency control.
* acquire() blocks until a slot is available, release() frees it.
* acquire(signal?) blocks until a slot is available or the signal aborts.
* release() frees a slot.
*/
class Semaphore {
constructor(limit) {
this.limit = Math.max(1, limit || 1);
this.active = 0;
this.queue = [];
this.queue = []; // { resolve, reject, onAbort? }
}
acquire() {
return new Promise((resolve) => {
acquire(signal) {
return new Promise((resolve, reject) => {
if (signal && signal.aborted) {
reject(new Error('Aborted'));
return;
}
if (this.active < this.limit) {
this.active++;
resolve();
} else {
this.queue.push(resolve);
return;
}
const entry = { resolve, reject };
if (signal) {
entry.onAbort = () => {
// Remove from queue without granting a slot
const idx = this.queue.indexOf(entry);
if (idx !== -1) this.queue.splice(idx, 1);
reject(new Error('Aborted'));
};
signal.addEventListener('abort', entry.onAbort, { once: true });
}
this.queue.push(entry);
});
}
release() {
if (this.queue.length > 0) {
// Don't decrement active — hand slot directly to next waiter
const next = this.queue.shift();
next();
const entry = this.queue.shift();
// Clean up abort listener
if (entry.onAbort) {
// Entry was granted a slot; no need for abort listener anymore
}
entry.resolve();
} else {
this.active = Math.max(0, this.active - 1);
}
@ -35,8 +58,8 @@ class Semaphore {
// If new limit is higher, wake up waiting tasks
while (this.active < this.limit && this.queue.length > 0) {
this.active++;
const next = this.queue.shift();
next();
const entry = this.queue.shift();
entry.resolve();
}
}

View File

@ -122,8 +122,13 @@ class UploadManager extends EventEmitter {
error: null, result: null, attempt: 0, maxAttempts
});
// Wait for semaphore slot
await semaphore.acquire();
// Wait for semaphore slot (abortable)
try {
await semaphore.acquire(signal);
} catch {
// Aborted while waiting in queue — no slot was granted, no release needed
return;
}
if (signal.aborted) {
semaphore.release();
return;
@ -228,7 +233,7 @@ class UploadManager extends EventEmitter {
this.activeJobs.set(uploadId, { speedKbs: currentSpeedKbs, bytesUploaded });
this._emitProgress(uploadId, fileName, task.hoster, {
status: 'uploading', progress: bytesTotal > 0 ? bytesUploaded / bytesTotal : 0,
status: 'uploading', progress: bytesTotal > 0 ? Math.min(1, bytesUploaded / bytesTotal) : 0,
bytesUploaded, bytesTotal: bytesTotal,
speedKbs: currentSpeedKbs, elapsed, remaining,
error: null, result: null, attempt, maxAttempts

View File

@ -221,7 +221,7 @@ function renderQueueTable() {
const elapsed = formatTime(job.elapsed);
const remaining = formatTime(job.remaining);
const speed = job.speedKbs > 0 ? `${formatSpeed(job.speedKbs)}` : '';
const pct = Math.round((job.progress || 0) * 100);
const pct = Math.min(100, Math.round((job.progress || 0) * 100));
const link = job.result ? (job.result.download_url || job.result.embed_url || '') : '';
return `<tr class="${rowClass}" data-job-id="${job.id}" data-link="${escapeAttr(link)}">