微服务-消息驱动 Spring Cloud Stream


1. 什么是SpringCloudStream

→官网 , 官网doc , Spring Cloud Stream中文指导手册
Spring Cloud Stream是一个构建事件消息驱动的微服务框架, 提供了灵活的编程模型。它构建在SpringBoot上,提供了Kafka,RabbitMQ等消息中间件的个性化配置,引入了发布订阅、消费组和分区的核心概念.

2. 为什么使用Spring Cloud Stream

2.1 问题场景

在实际开发过程中,服务与服务之间通信经常会用到消息中间件, 比如RabbitMQ等,使得中间件和系统之间的耦合很高,如果后面又要替换为 Kafka 变动会很大. 比如 RabbitMQ 和 Kafka ,这两个消息中间件架构上有很大不同,RabbitMQ 有exchange,而 kafka 有 Topic 和 Partitions 分区.

Spring Cloud Stream方案可以整合我们的消息中间件,降低系统和中间件的耦合。(目前仅支持RabbitMQ、Kafka)

简单理解, 就是屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型.

2.2 解决什么问题

无感知的使用消息中间件
Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知。有效简化开发人员对MQ使用的复杂度,使更多的精力投入到核心业务的处理。

中间件和服务的高度解耦
Spring Cloud Stream进行了配置隔离,开发中只需要调整配置,就可以动态的切换中间件(如rabbitmq切换为kafka),使得微服务开发高度解耦,服务可以更多的关注自己的业务流程。

2.3 Spring Cloud Stream凭什么可以统一底层差异

在没有绑定器这个概念的情况下,SpringBoot应用要直接与消息中间件进行信息交互,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器(Binder)交互中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Spring Cloud Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程.
在这里插入图片描述

3. Stream应用模型

3.1 模型原理

Spring Cloud Stream由一个中立的中间件内核组成。Spring Cloud Stream会注入输入和输出的channels,应用程序通过这些channels与外界通信,而channels则是通过一个明确的中间件Binder与外部brokers连接。
Spring Cloud Stream应用模型
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过配置binding来绑定具体的binder对象 ,而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便的使用消息驱动.

3.2 模型解读

3.2.1 Middleware

消息中间件, 目前仅支持RabbitMQ、Kafka. 目前流行的RocketMQ可以使用SpringCloudAlibaba整合.

3.2.2 Binder

Binder 是 Spring Cloud Stream 的一个抽象概念,是应用与消息中间件之间的粘合剂。目前 Spring Cloud Stream 提供了 Kafka 和 RabbitMQ 的binder实现。

Binder可以看成适配器,用来将Stream与中间连接起来,不同的Binder对应不同的中间件. 可以动态的改变消息的destinations, 需要我们修改外部配置项. 甚至可以任意的改变中间件的类型而不需要修改一行代码。

3.3.2 Bindings

Bindings, 是我们通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binding 的配置来达到动态修改消息的destination(对应 Kafka 的topic,RabbitMQ 的 exchanges)而不需要修改一行代码。

Binder生成Binding,Binding用来绑定消息驱动的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT是消费者,OUTPUT是生产者。

3.2.4 Application Core

由Stream封装的消息机制,很少自定义开发.

3.2.5 channels

inputs 输入通道,可以自定义开发;
outputs 输出通道,可以自定义开发.

3.2.6 分区支持

Spring Cloud Stream支持在一个应用程序的多个实例之间数据分区,在分区的情况下,物理通信介质(例如,topic代理)被视为多分区结构。一个或多个生产者应用程序实例将数据发送给多个消费应用实例,并保证共同的特性的数据由相同的消费者实例处理。

Spring Cloud Stream提供了一个通用的抽象,用于统一方式进行分区处理,因此分区可以用于自带分区的代理(如kafka)或者不带分区的代理(如rabbiemq).

分区在有状态处理中是一个很重要的概念,其重要性体现在性能和一致性上,要确保所有相关数据被一并处理,例如,在时间窗平均计算的例子中,给定传感器测量结果应该都由同一应用实例进行计算。

4. 常用api和注解

在这里插入图片描述

4.1 @EnableBinding

将@EnableBinding注解添加到应用的配置类,就可以把一个spring应用转换成Spring Cloud Stream应用,@EnableBinding注解本身就包含@Configuration注解,会触发Spring Cloud Stream 基本配置。

表示应用增加了通道监听功能,可以是一个或者多个,可以传入Sink和Source实现类; 或者是Processor(同时支持Sink和Source), 也可以自定义.

@Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Configuration
@Import({ BindingBeansRegistrar.class, BinderFactoryAutoConfiguration.class })
@EnableIntegration
public @interface EnableBinding {

    /**
     * A list of interfaces having methods annotated with {@link Input} and/or
     * {@link Output} to indicate binding targets.
     * @return list of interfaces
     */
    Class<?>[] value() default {};
}

4.2 Source/Sink/Processor

Spring Cloud Stream提供了三个开箱即用的预定义接口。它们是Source,Sink和Processor.

4.2.1 Source

Source接口用于有单一输出(outbound)通道的应用, 进行消息发送.

public interface Source {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output";

    /**
     * @return output channel
     */
    @Output(Source.OUTPUT)
    MessageChannel output();
}

接口声明了一个名为 “output”的binding , output的配置

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: test
          contentType: text/plain
  • contentType:用于指定消息的类型。
  • destination:  指定了消息发送的目的地,对应RabbitMQ的exchange是test的所有消息队
    列中。

示例代码

@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {

    @Autowired
    MessageChannel output;

    @Override
    public void run(String... strings) throws Exception {
        // 字符串类型发送MQ
        System.out.println("字符串信息发送");
        output.send(MessageBuilder.withPayload("大家好").build());
    }
    
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

通过注入Source 接口的方式, 使用MessageChannel发送消息。

4.2.2 Sink

Sink接口用于有单一输入(inbound)通道的应用, 用于消息接收.

public interface Sink {

    /**
     * Input channel name.
     */
    String INPUT = "input";

    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    SubscribableChannel input();
}

接口声明了一个名为input的binding, input通道的配置.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test   

destination:指定了消息获取的目的地,就是RabbitMQ的exchange.

示例代码

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
    // 监听 binding 为 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("一般监听收到:" + message.getPayload());
    }
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}
  • @StreamListener : 定义在方法中,被修饰的方法注册为消息中间件上数据流的事件监听器,注解中属
    性值对应了监听的消息通道名。

4.2.3 Processor

Processor接口用于单个应用同时包含输入和输出通道的情况。可以发送消息和接收消息.

public interface Processor extends Source, Sink {}

4.2.4 自定义消息发送和接收

Spring Cloud Stream内置了两种接口,分别定义了 binding 为 “input” 的输入流和 “output” 的输出流,而在我们实际使用中,需要定义各种输入输出流。使用方法也很简单。
首先自定义一个接口, 包含input和output通道流.

interface MyProcessor {

    String MY_INPUT = "my_input";
    String MY_OUTPUT = "my_output";

    @Input(MyProcessor.MY_INPUT)
    SubscribableChannel input();

    @Output(MyProcessor.MY_OUTPUT)
    MessageChannel output();
}
  • MY_INPUT 、MY_OUTPUT表示channel名称.
  • @Input、@Output表示注入参数为对应的channel名称.
  • Spring会为每一个标注了@Output,@Input的管道接口生成一个实现类

配置名为my_input和my_output的binding,指定destination为 test 的输入输出流。

spring:
  cloud:
    stream:
      defaultBinder: defaultRabbit
      bindings:
        my_input:
          destination: test
        my_output:
          destination: test

使用时,需要在@EnableBinding注解中添加自定义的接口MyProcessor。
使用名为my_output的通道发送消息.

@SpringBootApplication
@EnableBinding(MyProcessor.class)
public class Application implements CommandLineRunner {

    @Autowired
    MessageChannel output;

    @Override
    public void run(String... strings) throws Exception {
        // 字符串类型发送MQ
        System.out.println("字符串信息发送");
        output.send(MessageBuilder.withPayload("大家好").build());
    }
    
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

使用@StreamListener监听消息,指定 MyProcessor.MY_INPUT消息通道.

@SpringBootApplication
@EnableBinding(MyProcessor.class)
public class Application {
    // 监听 binding 为 Sink.INPUT 的消息
    @StreamListener(MyProcessor.MY_INPUT)
    public void input(Message<String> message) {
        System.out.println("一般监听收到:" + message.getPayload());
    }
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

5. Spring Messaging

Spring Messaging 是 Spring Framework 中的一个模块,其作用就是统一消息的编程模型。比如消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header.
在这里插入图片描述

package org.springframework.messaging;
public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

消息通道 MessageChannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:
在这里插入图片描述

package org.springframework.messaging;
@FunctionalInterface
public interface MessageChannel {
    long INDEFINITE_TIMEOUT = -1;
    default boolean send(Message<?> message) {

         return send(message, INDEFINITE_TIMEOUT);

     }
     boolean send(Message<?> message, long timeout);
}

消息通道里的消息如何被消费呢?
由消息通道的子接口可订阅的消息通道SubscribableChannel实现,被MessageHandler消息处理器所订阅:

package org.springframework.messaging;
public interface SubscribableChannel extends MessageChannel {
    boolean subscribe(MessageHandler handler);
    boolean unsubscribe(MessageHandler handler);
}

由MessageHandler真正地消费/处理消息:

package org.springframework.messaging;
@FunctionalInterface
public interface MessageHandler {
    void handleMessage(Message<?> message) throws MessagingException;
}

6. Spring Integration

Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

它提出了不少新的概念,包括消息路由MessageRoute、消息分发MessageDispatcher、消息过滤Filter、消息转换Transformer、消息聚合Aggregator、消息分割Splitter等等。同时还提供了MessageChannel和MessageHandler的实现,分别包括DirectChannel、ExecutorChannel、PublishSubscribeChannel和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。
介绍几种消息的处理方式:

  • 消息的分割
    在这里插入图片描述
  • 消息的聚合
    在这里插入图片描述
  • 消息的过滤
    在这里插入图片描述
  • 消息的分发
    在这里插入图片描述

7. 最佳实践

7.1 创建生产者

创建生产者[cloud-stream-rabbitmq-provider-8801] , 使用rabbitMQ消息队列.

7.1.1 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>atguigu-cloud-2020</artifactId>
        <groupId>com.atguigu.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider-8801</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--基础配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

7.1.2 编写配置

server:
  port: 8801
spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      bindings: # 服务的整合处理
        output: #通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.65.129
                port: 5672
                username: guest
                password: guest
# eureka配置
eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://www.eureka01.com:7001/eureka,http://www.eureka02.com:7002/eureka
  instance:
    prefer-ip-address: true
    hostname: localhost
    instance-id: cloud-stream-provider-8801
    lease-expiration-duration-in-seconds: 5 # 服务失效时间(默认90s)
    lease-renewal-interval-in-seconds: 2 # 服务续约时间(默认30s), 发送心跳到eureka注册中心

bindings配置

  • input:表示channelName, @EnableBinding(Source.class)注解当中配置Source接口,该接口中默认
    定义了channelName的名称,也可以自定义Source接口.

  • destination:消息中间件的exchange或Topic.

  • binder:当前bingding绑定的对应的适配器,该实例表示适配rabbitmq,名称默认为defaultRabbit,可
    以自定义,接着需要配置该名称对应的类型,环境信息等.

binders配置

  • defaultRabbit:binder配置的适配器的名称,和spring.cloud.stream.bindings.input.binder值一样
  • environment:表示当前binder对应的配置信息

7.1.3 编写启动类

package com.atguigu.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 类描述:spring-cloud-stream 消息驱动启动类
 * @Author wang_qz
 * @Date 2021/11/30 20:32
 * @Version 1.0
 */
@SpringBootApplication
public class StreamProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamProviderApplication.class);
    }
}

7.1.4 编写service

先定义发送消息的接口 com.atguigu.springcloud.service.IMessageProvider

package com.atguigu.springcloud.service;
public interface IMessageProvider {
    String send();
}

发送消息接口的是实现类 com.atguigu.springcloud.service.impl.MessageProviderImpl

package com.atguigu.springcloud.service.impl;

import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 发送消息接口
 * @see Source 发送消息通道的定义
 */
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
    /**
     * 消息发送管道
     */
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        // 发送消息, withPayload构建消息体
        this.output.send(MessageBuilder.withPayload(serial).build());
        System.out.println(">>>serial: " + serial);
        return serial;
    }
}

7.1.5 编写controller

com.atguigu.springcloud.controller.SendMessageController

package com.atguigu.springcloud.controller;

import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 类描述:
 * @Author wang_qz
 * @Date 2021/11/30 20:57
 * @Version 1.0
 */
@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}

7.2 创建消费者

创建消费者[cloud-stream-rabbitmq-consumer-8802]

7.2.1 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>atguigu-cloud-2020</artifactId>
        <groupId>com.atguigu.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer-8802</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--基础配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

7.2.2 编写配置

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      bindings: # 服务的整合处理
        input: #通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: atguiguA
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.65.129
                port: 5672
                username: guest
                password: guest
# eureka配置
eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://www.eureka01.com:7001/eureka,http://www.eureka02.com:7002/eureka
  instance:
    prefer-ip-address: true
    hostname: localhost
    instance-id: cloud-stream-consumer-8802
    lease-expiration-duration-in-seconds: 5 # 服务失效时间(默认90s)
    lease-renewal-interval-in-seconds: 2 # 服务续约时间(默认30s), 发送心跳到eureka注册中心

7.2.3 编写启动类

package com.atguigu.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication.class, args);
    }

}

7.2.4 编写消息监听

package com.atguigu.springcloud.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * 类描述:
 * @Author wang_qz
 * @Date 2021/11/30 21:09
 * @Version 1.0
 */
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消费者接收到消息>>>" + message.getPayload() + "\t" + serverPort);
    }
}

7.3 测试消息发送和接收

启动生产者,消费者. 请求 http://localhost:8801/sendMessage 往rabbitMQ的studyExchange交换机的output通道中发送消息.
在这里插入图片描述
进入rabbitMQ控制台查看. 已经有了名为studyExchange的交换机.
在这里插入图片描述
bindings绑定到了studyExchange交换机的atguiguA群组上.
在这里插入图片描述
同时, 消费者的监听器已经监听到通道的消息, 通过INPUT通道读取队列中的消息.
在这里插入图片描述

重复消费问题

  • 消费方如果没有配置group群组, 存在多个消费者时会存在重复消费的情况.
  • 多个消费者在同一个group中,就能够保证消息只会被其中一个应用消费一次。
  • 不同的组是可以重复消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

持久化问题

  • 消费者如果没有指定group群组, 会有一个默认的. 生产者在消费者停机的时候发送一条消息到消息队列, 消费者开机后不会消费,
    造成消息丢失.

  • 配置了group群组, 消费者开机后可以正常读取到之前的消息.

  • 指定group群组 spring.cloud.stream.bindings.input.group=atguiguA

references

Spring Cloud Stream 体系及原理介绍


文章作者: 王子
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 王子 !
评论
  目录