- Spring Cloud的介绍
- Spring Cloud Config
- Spring Cloud Config的多种存储后端及使用
- Spring Cloud Eureka
- Spring Cloud Gateway
- Spring Cloud OpenFeign
- Spring Cloud Circuit Breaker
- Spring Cloud Sleuth
- Spring Cloud Stream
- Spring Cloud 部署
- Spring Cloud的多环境部署及日志收集
Spring Cloud Stream
class Spring Cloud StreamSpring Cloud Stream 是一个用于构建消息驱动微服务的框架。它通过使用 Spring Boot 提供的自动配置特性,极大地简化了与消息中间件的集成。Spring Cloud Stream 提供了一种基于事件的编程模型,使得开发者可以轻松地创建、处理和响应事件。
Spring Cloud Stream 支持多种消息中间件,包括 Apache Kafka、RabbitMQ 等。通过 Spring Cloud Stream,开发者可以实现发布/订阅、流处理等功能,并将业务逻辑与具体的消息中间件解耦。
Spring Cloud Stream 的核心概念
- Binder(绑定器):用于连接应用程序和消息中间件的组件。不同的中间件有不同的 Binder 实现,例如 Kafka Binder、RabbitMQ Binder 等。
- Binding(绑定):应用程序中定义的输入和输出通道与消息中间件之间的连接。
- Channel(通道):应用程序用于发送和接收消息的抽象接口。输入通道用于接收消息,输出通道用于发送消息。
- Source(源):表示应用程序的输出通道。
- Sink(汇):表示应用程序的输入通道。
- Processor(处理器):既是源又是汇的应用程序。
示例项目
我们将创建一个简单的 Spring Cloud Stream 应用,包括两个微服务:message-producer
和 message-consumer
。message-producer
将发送消息到消息中间件,message-consumer
将从消息中间件接收消息并处理。
项目结构
message-producer
:发送消息。message-consumer
:接收消息。
1. 创建 message-producer
添加依赖
在 pom.xml
中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
注意:这里使用的是 Kafka 作为消息中间件,可以根据需求选择其他 Binder,比如 RabbitMQ。
创建应用主类
package com.example.messageproducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MessageProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MessageProducerApplication.class, args);
}
}
创建消息发送器
package com.example.messageproducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageProducerController {
@Autowired
private StreamBridge streamBridge;
@GetMapping("/send")
public String sendMessage() {
String message = "Hello, Spring Cloud Stream!";
streamBridge.send("outputChannel", message);
return "Message sent: " + message;
}
}
配置文件
在 src/main/resources/application.yml
中添加以下配置:
spring:
application:
name: message-producer
cloud:
stream:
bindings:
outputChannel:
destination: my-topic
producer:
required-groups: my-group
kafka:
binder:
brokers: localhost:9092
2. 创建 message-consumer
添加依赖
在 pom.xml
中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
创建应用主类
package com.example.messageconsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MessageConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(MessageConsumerApplication.class, args);
}
}
创建消息接收器
package com.example.messageconsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Component
public class MessageConsumer {
@Bean
public Consumer<String> inputChannel() {
return message -> {
System.out.println("Received message: " + message);
};
}
}
配置文件
在 src/main/resources/application.yml
中添加以下配置:
spring:
application:
name: message-consumer
cloud:
stream:
bindings:
inputChannel:
destination: my-topic
group: my-group
kafka:
binder:
brokers: localhost:9092
3. 启动 Kafka
要运行此示例,需要在本地或使用 Docker 启动 Kafka 和 Zookeeper:
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.5
docker run -d --name kafka -p 9092:9092 --link zookeeper wurstmeister/kafka:2.12-2.5.0 \
bash -c "start-kafka.sh"
4. 启动项目
分别启动 message-producer
和 message-consumer
项目:
- 启动
MessageProducerApplication
。 - 启动
MessageConsumerApplication
。
5. 测试消息传递
使用 Curl 或浏览器访问 message-producer
的 /send
端点:
curl http://localhost:8080/send
在控制台中,message-consumer
将会输出接收到的消息:
Received message: Hello, Spring Cloud Stream!
小结
Spring Cloud Stream 提供了一个强大的消息驱动微服务解决方案,通过与消息中间件的集成,开发者可以轻松地实现消息的发送和接收。在实际应用中,可以根据需求配置不同的消息通道和 Binder,以实现更复杂的消息处理逻辑。
如果您有任何问题或需要进一步的帮助,请随时告诉我!