Spring Cloud Stream

class Spring Cloud Stream

Spring Cloud Stream 是一个用于构建消息驱动微服务的框架。它通过使用 Spring Boot 提供的自动配置特性,极大地简化了与消息中间件的集成。Spring Cloud Stream 提供了一种基于事件的编程模型,使得开发者可以轻松地创建、处理和响应事件。

Spring Cloud Stream 支持多种消息中间件,包括 Apache Kafka、RabbitMQ 等。通过 Spring Cloud Stream,开发者可以实现发布/订阅、流处理等功能,并将业务逻辑与具体的消息中间件解耦。

Spring Cloud Stream 的核心概念

  1. Binder(绑定器):用于连接应用程序和消息中间件的组件。不同的中间件有不同的 Binder 实现,例如 Kafka Binder、RabbitMQ Binder 等。
  2. Binding(绑定):应用程序中定义的输入和输出通道与消息中间件之间的连接。
  3. Channel(通道):应用程序用于发送和接收消息的抽象接口。输入通道用于接收消息,输出通道用于发送消息。
  4. Source(源):表示应用程序的输出通道。
  5. Sink(汇):表示应用程序的输入通道。
  6. Processor(处理器):既是源又是汇的应用程序。

示例项目

我们将创建一个简单的 Spring Cloud Stream 应用,包括两个微服务:message-producermessage-consumermessage-producer 将发送消息到消息中间件,message-consumer 将从消息中间件接收消息并处理。

项目结构

  • message-producer:发送消息。
  • message-consumer:接收消息。

1. 创建 message-producer

添加依赖

pom.xml 中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

注意:这里使用的是 Kafka 作为消息中间件,可以根据需求选择其他 Binder,比如 RabbitMQ。

创建应用主类

package com.example.messageproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MessageProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MessageProducerApplication.class, args);
    }
}

创建消息发送器

package com.example.messageproducer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageProducerController {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/send")
    public String sendMessage() {
        String message = "Hello, Spring Cloud Stream!";
        streamBridge.send("outputChannel", message);
        return "Message sent: " + message;
    }
}

配置文件

src/main/resources/application.yml 中添加以下配置:

spring:
  application:
    name: message-producer
  cloud:
    stream:
      bindings:
        outputChannel:
          destination: my-topic
          producer:
            required-groups: my-group
      kafka:
        binder:
          brokers: localhost:9092

2. 创建 message-consumer

添加依赖

pom.xml 中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

创建应用主类

package com.example.messageconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MessageConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MessageConsumerApplication.class, args);
    }
}

创建消息接收器

package com.example.messageconsumer;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
public class MessageConsumer {

    @Bean
    public Consumer<String> inputChannel() {
        return message -> {
            System.out.println("Received message: " + message);
        };
    }
}

配置文件

src/main/resources/application.yml 中添加以下配置:

spring:
  application:
    name: message-consumer
  cloud:
    stream:
      bindings:
        inputChannel:
          destination: my-topic
          group: my-group
      kafka:
        binder:
          brokers: localhost:9092

3. 启动 Kafka

要运行此示例,需要在本地或使用 Docker 启动 Kafka 和 Zookeeper:

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.5

docker run -d --name kafka -p 9092:9092 --link zookeeper wurstmeister/kafka:2.12-2.5.0 \
  bash -c "start-kafka.sh"

4. 启动项目

分别启动 message-producermessage-consumer 项目:

  1. 启动 MessageProducerApplication
  2. 启动 MessageConsumerApplication

5. 测试消息传递

使用 Curl 或浏览器访问 message-producer/send 端点:

curl http://localhost:8080/send

在控制台中,message-consumer 将会输出接收到的消息:

Received message: Hello, Spring Cloud Stream!

小结

Spring Cloud Stream 提供了一个强大的消息驱动微服务解决方案,通过与消息中间件的集成,开发者可以轻松地实现消息的发送和接收。在实际应用中,可以根据需求配置不同的消息通道和 Binder,以实现更复杂的消息处理逻辑。

如果您有任何问题或需要进一步的帮助,请随时告诉我!

评论区
评论列表
menu