【Spring Cloud学习】-6.Spring Cloud Bus 消息总线


1.简介

1.1 概述

Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This can then be used to broadcast state changes (e.g. configuration changes) or other management instructions. AMQP and Kafka broker implementations are included with the project. Alternatively, any Spring Cloud Stream binder found on the classpath will work out of the box as a transport.

Spring Cloud Bus将轻量级消息代理程序链接到分布式系统的节点。然后可以将其用于广播状态更改(例如配置更改)或其他管理指令。该项目包括AMQP和Kafka broker 实现。另外,在类路径上找到的任何Spring Cloud Stream绑定程序都可以作为传输工具使用。

1.2 特点

spring cloud bus 使用消息队列来作为分布式环境中信息沟通的工具,通常用于广播状态变更或配置更改等。

2.演示环境

  1. JDK 1.8.0_201
  2. Spring Boot 2.2.0.RELEASE、Spring Cloud Hoxton.RELEASE
  3. 构建工具(apache maven 3.6.3)
  4. 开发工具(IntelliJ IDEA )
  5. Kafka 2.11-2.0.0

3.演示代码

总体结构说明:

  • config-repo: 配置文件仓库
  • ofc-bus-eureka: 注册中心,config-server 和 config-client 都注册到配置中心上
  • ofc-bus-config-server: 集成了 spring-cloud-bus 实现的 config-server
  • ofc-bus-config-client: 集成了 spring-cloud-bus 实现的 config-client

3.1 config-repo

管理配置文件,内部包含四个配置文件,分别是:soulballad.properties、soulballad-dev.properties、soulballad-prod.properties、soulballad-test.properties

内容如下:

# soulballad.properties
my.name=soulballad

# soulballad-dev.properties
my.name=soulballad-dev

# soulballad-prod.properties
my.name=soulballad-pro

# soulballad-test.properties
my.name=soulballad-test

3.2 ofc-bus-eureka

3.2.1 代码说明

eureka-server 服务端,提供 config-server 和 config-client 注册服务。

3.2.2 maven 依赖

pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>

3.2.3 配置文件

application.yml

server:
  port: 11070

spring:
  application:
    name: ofc-bus-eureka

eureka:
  instance:
    hostname: localhost # 主机名
    prefer-ip-address: true # 优先使用ip
    instance-id: ${eureka.instance.hostname}:${spring.application.name}:${server.port} # 实例id
  client:
    register-with-eureka: false # eureka自我注册
    fetch-registry: false # 是否从注册中心获取注册信息
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/ # 注册中心地址

3.2.4 java代码

OfcBusEurekaApplication.java

@EnableEurekaServer
@SpringBootApplication
public class OfcBusEurekaApplication {

    public static void main(String[] args) {
        SpringApplication.run(OfcBusEurekaApplication.class, args);
    }
}

3.3 ofc-bus-config-server

3.3.1 代码说明

config-server 服务端,注册到 eureka 上。配置 spring-cloud-bus,使用 kafka 作为消息队列。

3.3.2 maven 依赖

pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-config-server</artifactId>
    </dependency>
</dependencies>

3.3.3 配置文件

application.yml

server:
  port: 11072

spring:
  application:
    name: ofc-bus-config-server
  cloud:
    config:
      server:
        git:
          # 仓库地址
          uri: https://gitee.com/soulballad/spring-usage-examples.git
          # 对应 {label} 部分,即 Git 的分支
          default-label: master
          # 仓库文件夹名称,多个以逗号分隔
          search-paths: spring-cloud/spring-cloud-03-official/spring-cloud-ofc-07-bus/config-repo
          # git 仓库用户名(公开库可以不用填写)
          username:
          # git 仓库密码(公开库可以不用填写)
          password:
    bus:
      # 开启消息跟踪
      enabled: true
      trace:
        enabled: true
      refresh:
        enabled: true
  kafka:
    consumer:
      # 指定默认消费者 group id
      # 如果不设置,将会使用 commons 工程中的 group-id,那时调用 /actuator/bus-refresh 接口只会刷新其中一个 client
      # 因为在同一个组中的 Consumer,同一个主题只会被一个 Consumer 接收
      group-id: ofc-bus-config-server-consumer-group

eureka:
  instance:
    hostname: localhost
    prefer-ip-address: true
    instance-id: ${eureka.instance.hostname}:${spring.application.name}:${server.port}
  server:
    port: 11070
  client:
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${eureka.server.port}/eureka/

management:
  endpoints:
    web:
      exposure:
        # 开启 endpoint
        include: bus-refresh

kafka.properties

# kafka 配置

## kafka 服务地址
spring.kafka.bootstrap-servers=172.16.11.125:9092
## producer 提供者
### 如果该值大于零时,表示启用重试失败的发送次数
spring.kafka.producer.retries=0
### 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
### 指定消息 key 和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

## consumer 消费者
### 指定默认消费者 group id
spring.kafka.consumer.group-id=springcloud-config-bus-group
### 当 Kafka 中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为 latest,表示自动将偏移重置为最新的偏移量,可选的值为 latest, earliest, none
spring.kafka.consumer.auto-offset-reset=earliest
### 如果为 true,则消费者的偏移量将在后台定期提交,默认值为 true
spring.kafka.consumer.enable-auto-commit=false
### 如果 'enable.auto.commit'为true,则消费者偏移自动提交给 Kafka 的频率(以毫秒为单位),默认值为 5000
spring.kafka.consumer.auto-commit-interval=100
### 指定消息 key 和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.3.4 java代码

OfcBusConfigServerApplication.java

@EnableConfigServer
@EnableEurekaClient
@SpringBootApplication
@PropertySource("classpath:kafka.properties")
public class OfcBusConfigServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(OfcBusConfigServerApplication.class, args);
    }
}

3.4 ofc-bus-config-client

3.4.1 代码说明

config-client 客户端,注册到 eureka 上。配置 spring-cloud-bus,使用 kafka 作为消息队列。

3.4.2 maven 依赖

pom.xml

<dependencies>
    <!-- web和actuator同时存在时,web需放在前面,否则不生效 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-config</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-kafka</artifactId>
    </dependency>
</dependencies>

3.4.3 配置文件

bootstrap.yml

spring:
  cloud:
    config:
      # 对应 {label} 部分,即 Git 的分支
      label: master
      # 对应 config-repo 中文件名前缀
      name: soulballad
      # 对应 {profile} 部分
      profile: dev
      discovery:
        # 开启 Config 服务发现与注册
        enabled: true
        # 指定 config-server
        service-id: ofc-bus-config-server
    bus:
      # 开启消息跟踪
      enabled: true
      trace:
        enabled: true
      refresh:
        enabled: true
  kafka:
    consumer:
      # 指定默认消费者 group id
      # 如果不设置,将会使用 commons 工程中的 group-id,那时调用 /actuator/bus-refresh 接口只会刷新其中一个 client
      # 因为在同一个组中的 Consumer,同一个主题只会被一个 Consumer 接收
      group-id: ofc-bus-config-client-consumer-group

eureka:
  instance:
    hostname: localhost
    prefer-ip-address: true
    instance-id: ${eureka.instance.hostname}:${spring.application.name}:${server.port}
  server:
    port: 11070
  client:
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${eureka.server.port}/eureka/

management:
  # 开启 endpoint
  endpoints:
    web:
      exposure:
        include: bus-refresh

application.yml

server:
  port: 11073

spring:
  application:
    name: ofc-bus-config-client

kafka.properties

# kafka 配置

## kafka 服务地址
spring.kafka.bootstrap-servers=172.16.11.125:9092
## producer 提供者
### 如果该值大于零时,表示启用重试失败的发送次数
spring.kafka.producer.retries=0
### 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
### 指定消息 key 和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

## consumer 消费者
### 指定默认消费者 group id
spring.kafka.consumer.group-id=springcloud-config-bus-group
### 当 Kafka 中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为 latest,表示自动将偏移重置为最新的偏移量,可选的值为 latest, earliest, none
spring.kafka.consumer.auto-offset-reset=earliest
### 如果为 true,则消费者的偏移量将在后台定期提交,默认值为 true
spring.kafka.consumer.enable-auto-commit=false
### 如果 'enable.auto.commit'为true,则消费者偏移自动提交给 Kafka 的频率(以毫秒为单位),默认值为 5000
spring.kafka.consumer.auto-commit-interval=100
### 指定消息 key 和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.4.4 java代码

OfcBusConfigController.java

@RefreshScope
@RestController
public class OfcBusConfigController {

    @Value("${my.name}")
    private String name;

    @GetMapping(value = "/name")
    public String getName() {
        return name;
    }
}

OfcBusConfigClientApplication.java

@EnableEurekaClient
@SpringBootApplication
@PropertySource("classpath:kafka.properties")
public class OfcBusConfigClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(OfcBusConfigClientApplication.class, args);
    }
}

3.5 git 地址

spring-cloud-ofc-07-bus: Spring Cloud 官方提供的消息总线方案

4.效果展示

依次启动 ofc-bus-eureka、ofc-bus-config-server、ofc-bus-config-client,它们分别使用 11070、11072、11073 端口

4.1 ofc-bus-eureka

访问 eureka 管理台,查看 config-server 和 config-client 注册信息

### GET
GET http://localhost:11070/

4.2 ofc-bus-config-server

访问 spring-cloud-ofc-bus.http 中 ofc-bus-config-server 对应部分,查看输出的配置信息

查看profile=test的配置信息

### GET /soulballad/{profile} 获取配置信息
http://localhost:11072/soulballad/test

查看配置文件内容

### GET /soulballad-{profile}.properties 获取指定文件内容
http://localhost:11072/soulballad-test.properties

其他方式查看配置文件内容

### 接口访问支持以下几种格式
### /{application}/{profile}[/{label}]
GET http://localhost:11072/soulballad/dev/master
### /{application}-{profile}.yml
GET http://localhost:11072/soulballad-dev.yml
### /{label}/{application}-{profile}.yml
GET http://localhost:11072/master/soulballad-dev.yml
### /{application}-{profile}.properties
GET http://localhost:11072/soulballad-dev.properties
### /{label}/{application}-{profile}.properties
GET http://localhost:11072/master/soulballad-dev.properties

4.3 ofc-bus-config-client

访问 spring-cloud-ofc-bus.http 中如下请求,查看输出的配置信息

使用http请求访问配置

### GET /name
GET http://localhost:11073/name

4.4 配置变更

修改 server 上 soulballad-dev.properties 内容:soulballad-dev -> soulballad-dev…changed

my.name=soulballad-dev...changed

在服务端访问,查看修改是否生效

### /{application}-{profile}.yml
GET http://localhost:11072/soulballad-dev.yml

在客户端访问 /name,还是获取到修改前的值: soulballad-dev

### GET /name
GET http://localhost:11073/name

调用 /actuator/bus-refresh 进行刷新

### POST /actuator/bus-refresh
POST http://localhost:11072/actuator/bus-refresh

再次访问 /name,获取到 soulballad-dev…changed

### GET /name
GET http://localhost:11073/name

当有多个 config-client 时,也可以只刷新单个 config-client 的取值,使用如下方式

### POST http://localhost:11072/actuator/bus-refresh/${spring.application.name}:{server.port},
### spring.application.name 和 server.port都是要刷新client的
POST http://localhost:11072/actuator/bus-refresh/ofc-bus-config-client:11073

如果要想在配置文件提交 git 后自动刷新配置,可以使用 Webhook

5.参考

  1. 官方文档–Spring-Cloud-Bus

文章作者: Soulballad
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Soulballad !
评论
 上一篇
【Python】-1.Python数据类型及运算符 【Python】-1.Python数据类型及运算符
1. 常见类型1.1 数字在Python中最数字是一种常见的类型。 1.1.1加减乘除运算print(3 + 2) # 5 print(8 - 6) # 2 print(16 - 3 * 2) # 10 print((17 - 2) / 5
2020-09-01
下一篇 
【Spring Cloud学习】-5.Spring Cloud Config 配置中心 【Spring Cloud学习】-5.Spring Cloud Config 配置中心
1.简介1.1 概述 Spring Cloud Config provides server and client-side support for externalized configuration in a distributed s
  目录