- 消息队列的介绍
- RabbitMQ的安装与配置
- RabbitMQ的队列类型
- RabbitMQ 标准队列
- RabbitMQ镜像队列(Mirrored Queue)的使用详解
- RabbitMQ死信队列(Dead Letter Queue, DLQ)的使用详解
- RabbitMQ延时队列(Delayed Queue)的使用详解
- RabbitMQ优先级队列(Priority Queue)的使用详解
- RabbitMQ持久化队列(Durable Queue)的使用详解
- RabbitMQ分片队列(Quorum Queue)的使用详解
- RabbitMQ RPC队列(Remote Procedure Call)的使用详解
RabbitMQ 标准队列
class 标准队列RabbitMQ 标准队列(Classic Queue)是消息传递系统中最基本的队列类型,消息按先进先出(FIFO)的顺序存储和传递。以下是标准队列的常见应用场景和详细说明:
1. 任务队列(Task Queue)
- 场景描述:在后台处理任务时,如图像处理、视频转码、数据分析等,需要将任务分配给多个工作节点进行处理,避免阻塞主应用。
- 应用说明:在任务队列中,每个任务都会被推送到 RabbitMQ 的标准队列中,多个消费者从队列中获取任务并处理,任务处理完毕后删除队列中的消息。
- 举例:
- 应用:电商网站后台处理订单数据,如生成发货单、发送确认邮件等。这些任务可以异步处理,以提高系统的响应速度。
- 代码示例(Node.js):
// 生产者:将任务添加到队列 const amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } const queue = 'task_queue'; const msg = 'New Order - Order ID: 12345'; channel.assertQueue(queue, { durable: true }); channel.sendToQueue(queue, Buffer.from(msg), { persistent: true }); console.log(" [x] Sent '%s'", msg); }); }); // 消费者:从队列中取出任务并处理 amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } const queue = 'task_queue'; channel.assertQueue(queue, { durable: true }); channel.prefetch(1); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue); channel.consume(queue, function(msg) { console.log(" [x] Received '%s'", msg.content.toString()); // 模拟任务处理时间 setTimeout(() => { console.log(" [x] Done"); channel.ack(msg); }, 1000); }); }); });
2. 异步日志处理
- 场景描述:在高并发的系统中,记录日志的过程可能会影响系统的性能。通过将日志写入消息队列,再由专门的日志处理服务异步地写入到文件或数据库中,可以减轻主系统的压力。
- 应用说明:当应用程序需要记录日志时,它将日志消息发送到 RabbitMQ 标准队列中,然后日志服务从队列中取出日志并存储在持久化存储中。
- 举例:
- 应用:Web 服务器记录用户行为日志,如访问页面、点击按钮等。日志处理程序从队列中读取这些日志并存储到数据库或文件系统中。
- 代码示例(Node.js):
// 生产者:将日志消息添加到队列 const logMessage = 'User clicked on the Home button'; channel.assertQueue('log_queue', { durable: true }); channel.sendToQueue('log_queue', Buffer.from(logMessage), { persistent: true }); console.log(" [x] Sent log: '%s'", logMessage); // 消费者:从队列中读取日志并存储 channel.consume('log_queue', function(msg) { console.log(" [x] Log received: '%s'", msg.content.toString()); // 模拟日志写入操作 fs.appendFile('app.log', msg.content.toString() + '\n', function (err) { if (err) throw err; console.log('Log saved!'); channel.ack(msg); }); });
3. 异步通知系统
- 场景描述:在电子商务、社交网络等应用中,经常需要向用户发送通知,如订单更新、消息提醒等。为了避免阻塞主应用,可以使用标准队列进行异步通知处理。
- 应用说明:将通知任务放入 RabbitMQ 的标准队列中,由专门的服务来处理并发送通知,确保主应用的响应时间不受影响。
- 举例:
- 应用:电商平台在用户下单成功后发送订单确认短信或邮件。通知服务从队列中取出任务,调用第三方 API 发送短信或邮件。
- 代码示例(Node.js):
const notificationMsg = 'Order confirmed: Order ID 12345'; channel.assertQueue('notification_queue', { durable: true }); channel.sendToQueue('notification_queue', Buffer.from(notificationMsg), { persistent: true }); console.log(" [x] Sent notification: '%s'", notificationMsg); channel.consume('notification_queue', function(msg) { console.log(" [x] Notification to send: '%s'", msg.content.toString()); // 调用第三方API发送通知 sendNotificationAPI(msg.content.toString(), () => { console.log('Notification sent!'); channel.ack(msg); }); });
4. 工作分配(Work Distribution)
- 场景描述:在分布式系统中,将工作任务分配给多个消费者处理,以充分利用系统资源,实现负载均衡。
- 应用说明:标准队列允许将任务均匀分配给多个消费者,避免某个消费者过载。
- 举例:
- 应用:图片处理应用程序,上传的图片需要经过多个步骤处理(如缩放、裁剪、添加水印),每个步骤可以由不同的消费者处理。
- 代码示例:
const imageTask = 'Resize Image ID: 54321'; channel.assertQueue('image_task_queue', { durable: true }); channel.sendToQueue('image_task_queue', Buffer.from(imageTask), { persistent: true }); console.log(" [x] Sent image task: '%s'", imageTask); channel.consume('image_task_queue', function(msg) { console.log(" [x] Processing image task: '%s'", msg.content.toString()); // 模拟图片处理 processImageTask(msg.content.toString(), () => { console.log('Image processed!'); channel.ack(msg); }); });
总结
RabbitMQ 标准队列是最常用的队列类型,广泛应用于任务队列、异步日志处理、异步通知系统和工作分配等场景。其简单易用,适用于需要可靠传递和顺序处理的消息系统。在实际应用中,结合 RabbitMQ 的持久化、确认机制等功能,可以实现更可靠的消息传递和处理。
评论区
评论列表
{{ item.user.nickname || item.user.username }}