bun-server-queue
1
总安装量
1
周安装量
#49730
全站排名
安装命令
npx skills add https://github.com/dangaogit/bun-server-skills --skill bun-server-queue
Agent 安装分布
trae
1
qoder
1
trae-cn
1
cursor
1
claude-code
1
Skill 文档
Bun Server Queue
Setup QueueModule
import { Module, QueueModule } from "@dangao/bun-server";
// Queue-wide defaults handed to QueueModule.forRoot.
const queueDefaults = {
  defaultConcurrency: 5, // Concurrent job processing
  defaultRetries: 3, // Retry failed jobs
  defaultBackoff: 1000, // Backoff between retries (ms)
};
QueueModule.forRoot(queueDefaults);
// Application module that lists QueueModule in its imports.
@Module({
imports: [QueueModule],
})
class AppModule {}
Define Queue Handlers
import { Injectable, Queue } from "@dangao/bun-server";
/**
 * Queue consumer for the "email:send" and "email:bulk" job names.
 *
 * NOTE(review): `this.emailService` is used below but is not declared in this
 * snippet — presumably it is injected elsewhere; confirm before copying.
 */
@Injectable()
class EmailQueue {
  /** Logs the recipient, then sends a single message through the email service. */
  @Queue("email:send")
  async handleSendEmail(job: { to: string; subject: string; body: string }) {
    const { to, subject, body } = job;
    console.log(`Sending email to ${to}`);
    await this.emailService.send(to, subject, body);
  }

  /** Sends the template to every recipient, awaiting each send before the next. */
  @Queue("email:bulk", { concurrency: 2, retries: 5 })
  async handleBulkEmail(job: { recipients: string[]; template: string }) {
    const { recipients, template } = job;
    for (const address of recipients) {
      await this.emailService.sendTemplate(address, template);
    }
  }
}
Add Jobs to Queue
import {
Injectable,
Inject,
QueueService,
QUEUE_SERVICE_TOKEN,
} from "@dangao/bun-server";
/**
 * Example producer that enqueues email jobs through the injected QueueService.
 *
 * NOTE(review): `this.db` and `CreateUserDto` are not defined in this snippet —
 * confirm where they come from in the real application.
 */
@Injectable()
class UserService {
  constructor(
    @Inject(QUEUE_SERVICE_TOKEN) private readonly queue: QueueService
  ) {}

  /**
   * Creates the user via `this.db`, enqueues a welcome "email:send" job
   * addressed to the new user's email, and returns the created user.
   */
  async createUser(data: CreateUserDto) {
    const created = await this.db.createUser(data);

    // Add job to queue
    const welcomeEmail = {
      to: created.email,
      subject: "Welcome!",
      body: "Thanks for signing up.",
    };
    await this.queue.add("email:send", welcomeEmail);

    return created;
  }

  /** Enqueues one "email:bulk" job carrying the recipient list and template. */
  async sendBulkEmail(recipients: string[], template: string) {
    const bulkJob = { recipients, template };
    await this.queue.add("email:bulk", bulkJob);
  }
}
Job Options
// Per-job options, passed as the third argument to queue.add.
const jobOptions = {
  delay: 5000, // Delay 5 seconds before processing
  priority: 1, // Higher priority (default: 0)
  retries: 5, // Override default retries
  backoff: 2000, // Override default backoff
  removeOnComplete: true, // Remove job data after completion
  removeOnFail: false, // Keep failed job data
};
await this.queue.add("email:send", payload, jobOptions);
Cron Jobs (Scheduled Tasks)
import { Injectable, Cron } from "@dangao/bun-server";
// Example class whose methods are each scheduled with a @Cron expression
// (five fields: minute, hour, day of month, month, day of week).
// NOTE(review): cleanupOldData, generateWeeklyReport and checkSystemHealth are
// called on `this` but are not defined in this snippet — confirm they exist.
@Injectable()
class ScheduledTasks {
// Every minute
@Cron("* * * * *")
async everyMinute() {
console.log("Running every minute");
}
// Every hour at minute 0
@Cron("0 * * * *")
async everyHour() {
console.log("Running every hour");
}
// Every day at midnight (minute 0, hour 0)
@Cron("0 0 * * *")
async daily() {
await this.cleanupOldData();
}
// Every Monday at 9 AM (day of week 1, hour 9, minute 0)
@Cron("0 9 * * 1")
async weeklyReport() {
await this.generateWeeklyReport();
}
// Every 5 minutes, with an options object (name and runOnInit).
// NOTE(review): runOnInit presumably also triggers a run at startup — confirm.
@Cron("*/5 * * * *", { name: "health-check", runOnInit: true })
async healthCheck() {
await this.checkSystemHealth();
}
}
Cron Expression Reference
┌──────────── minute (0-59)
│ ┌────────── hour (0-23)
│ │ ┌──────── day of month (1-31)
│ │ │ ┌────── month (1-12)
│ │ │ │ ┌──── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
Examples:
"* * * * *" - Every minute
"*/5 * * * *" - Every 5 minutes
"0 * * * *" - Every hour
"0 0 * * *" - Daily at midnight
"0 0 * * 0" - Weekly on Sunday
"0 0 1 * *" - Monthly on 1st
"0 9-17 * * 1-5" - Weekdays 9 AM to 5 PM, every hour
Job Events and Monitoring
/**
 * Registers listeners for the queue's "completed", "failed" and "progress"
 * events in its constructor and writes each event to the console.
 */
@Injectable()
class QueueMonitor {
  constructor(
    @Inject(QUEUE_SERVICE_TOKEN) private readonly queue: QueueService
  ) {
    // Listen to queue events. The constructor parameter `queue` is the same
    // object as `this.queue`, so it is used directly here.
    queue.on("completed", (job, result) =>
      console.log(`Job ${job.id} completed:`, result)
    );

    queue.on("failed", (job, error) =>
      console.error(`Job ${job.id} failed:`, error)
    );

    queue.on("progress", (job, progress) =>
      console.log(`Job ${job.id} progress: ${progress}%`)
    );
  }
}
Job Progress
/**
 * Handles "import:data" jobs: processes `job.items` one at a time and, after
 * each item, reports the completed share as a percentage via
 * `context.updateProgress`.
 */
@Queue("import:data")
async handleImport(job: JobData, context: JobContext) {
  const { items } = job;
  const count = items.length;

  let done = 0;
  while (done < count) {
    await this.processItem(items[done]);
    done += 1;
    // Percentage of items finished so far (reaches 100 after the last item).
    await context.updateProgress((done / count) * 100);
  }
}
Queue Patterns
Delayed Jobs
// Send reminder after 24 hours
const oneDayMs = 24 * 60 * 60 * 1000; // hours * minutes * seconds * milliseconds
await this.queue.add("reminder:send", { userId, message }, { delay: oneDayMs });
Rate Limited Processing
/**
 * Handles "api:call" jobs by fetching the job's endpoint and returning the
 * resulting promise.
 *
 * Registered with concurrency 1, i.e. one job at a time.
 * NOTE(review): the original comment promised "1s between calls", but backoff
 * is described elsewhere in this document as the delay between retries —
 * confirm whether it also spaces out successful calls.
 */
@Queue("api:call", { concurrency: 1, backoff: 1000 })
async handleApiCall(job: { endpoint: string }) {
  const { endpoint } = job;
  return fetch(endpoint);
}
Retry with Exponential Backoff
// Handles "webhook:send" jobs by delegating to this.sendWebhook and awaiting it.
// Registered with 5 retries and an exponential backoff type starting at 1000 ms.
// NOTE(review): sendWebhook and WebhookPayload are not defined in this snippet — confirm.
@Queue("webhook:send", {
retries: 5,
backoffType: "exponential",
backoff: 1000, // 1s, 2s, 4s, 8s, 16s
})
async handleWebhook(job: WebhookPayload) {
await this.sendWebhook(job);
}