在 RabbitMQ 中,队列是消息存储的核心组件,不同类型的队列可以满足不同的业务需求。以下是一些常见的 RabbitMQ 队列类型和它们的应用场景:
const queueOptions = {
deadLetterExchange: 'dlx', // 死信交换器
deadLetterRoutingKey: 'dlx.routingKey' // 死信路由键
};
channel.assertQueue('myQueue', queueOptions);
rabbitmq-delayed-message-exchange
插件)来实现。x-delayed-message
插件的配置示例:安装插件(如果尚未安装):
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
配置延时队列:
const exchange = 'my-delayed-exchange';
const exchangeOptions = {
arguments: {
'x-delayed-type': 'direct' // 使用 direct 类型交换器
}
};
channel.assertExchange(exchange, 'x-delayed-message', exchangeOptions);
const queue = 'myQueue';
const queueOptions = { durable: true };
channel.assertQueue(queue, queueOptions);
channel.bindQueue(queue, exchange, 'routingKey');
// 发送消息并设置延迟时间
const msg = 'Hello after delay';
const msgOptions = {
headers: { 'x-delay': 5000 } // 延迟5秒
};
channel.publish(exchange, 'routingKey', Buffer.from(msg), msgOptions);
const queueOptions = {
arguments: {
'x-max-priority': 10 // 设置最大优先级
}
};
channel.assertQueue('priorityQueue', queueOptions);
const msg = 'High priority message';
const msgOptions = {
priority: 9 // 设置消息优先级
};
channel.sendToQueue('priorityQueue', Buffer.from(msg), msgOptions);
channel.assertQueue('durableQueue', { durable: true });
const msg = 'Persistent message';
channel.sendToQueue('durableQueue', Buffer.from(msg), { persistent: true });
const queueOptions = {
arguments: {
'x-queue-type': 'quorum' // 设置为 Quorum 队列
}
};
channel.assertQueue('quorumQueue', queueOptions);
// 客户端发送请求
channel.assertQueue('', { exclusive: true }, (err, q) => {
const correlationId = generateUuid(); // 唯一的请求标识
const msg = 'RPC request';
channel.consume(q.queue, (msg) => {
if (msg.properties.correlationId === correlationId) {
console.log('Received:', msg.content.toString());
}
}, { noAck: true });
channel.sendToQueue('rpc_queue', Buffer.from(msg), {
correlationId: correlationId,
replyTo: q.queue
});
});
RabbitMQ 提供了多种队列类型以满足不同的业务需求,如延时队列、优先级队列、镜像队列、持久化队列等。根据具体的业务场景,选择合适的队列类型可以显著提高系统的可靠性、可用性和性能。