RabbitMQ 标准队列(Classic Queue)是消息传递系统中最基本的队列类型,消息按先进先出(FIFO)的顺序存储和传递。以下是标准队列的常见应用场景和详细说明:
// 生产者:将任务添加到队列
const amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
const queue = 'task_queue';
const msg = 'New Order - Order ID: 12345';
channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(msg), { persistent: true });
console.log(" [x] Sent '%s'", msg);
});
});
// 消费者:从队列中取出任务并处理
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
const queue = 'task_queue';
channel.assertQueue(queue, { durable: true });
channel.prefetch(1);
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
channel.consume(queue, function(msg) {
console.log(" [x] Received '%s'", msg.content.toString());
// 模拟任务处理时间
setTimeout(() => {
console.log(" [x] Done");
channel.ack(msg);
}, 1000);
});
});
});
// 生产者:将日志消息添加到队列
const logMessage = 'User clicked on the Home button';
channel.assertQueue('log_queue', { durable: true });
channel.sendToQueue('log_queue', Buffer.from(logMessage), { persistent: true });
console.log(" [x] Sent log: '%s'", logMessage);
// 消费者:从队列中读取日志并存储
channel.consume('log_queue', function(msg) {
console.log(" [x] Log received: '%s'", msg.content.toString());
// 模拟日志写入操作
fs.appendFile('app.log', msg.content.toString() + '\n', function (err) {
if (err) throw err;
console.log('Log saved!');
channel.ack(msg);
});
});
const notificationMsg = 'Order confirmed: Order ID 12345';
channel.assertQueue('notification_queue', { durable: true });
channel.sendToQueue('notification_queue', Buffer.from(notificationMsg), { persistent: true });
console.log(" [x] Sent notification: '%s'", notificationMsg);
channel.consume('notification_queue', function(msg) {
console.log(" [x] Notification to send: '%s'", msg.content.toString());
// 调用第三方API发送通知
sendNotificationAPI(msg.content.toString(), () => {
console.log('Notification sent!');
channel.ack(msg);
});
});
const imageTask = 'Resize Image ID: 54321';
channel.assertQueue('image_task_queue', { durable: true });
channel.sendToQueue('image_task_queue', Buffer.from(imageTask), { persistent: true });
console.log(" [x] Sent image task: '%s'", imageTask);
channel.consume('image_task_queue', function(msg) {
console.log(" [x] Processing image task: '%s'", msg.content.toString());
// 模拟图片处理
processImageTask(msg.content.toString(), () => {
console.log('Image processed!');
channel.ack(msg);
});
});
RabbitMQ 标准队列是最常用的队列类型,广泛应用于任务队列、异步日志处理、异步通知系统和工作分配等场景。其简单易用,适用于需要可靠传递和顺序处理的消息系统。在实际应用中,结合 RabbitMQ 的持久化、确认机制等功能,可以实现更可靠的消息传递和处理。