查看原文
其他

Spring Cloud(十一)高可用的分布式配置中心 Spring Cloud Bus 消息总线集成(RabbitMQ)

2018-01-20 搜云库 搜云库

上一篇文章,留了一个悬念, ConfigClient 实现配置的实时更新,我们可以使用 /refresh 接口触发,如果所有客户端的配置的更改,都需要手动触发客户端 /refresh ,当服务越来越多的时候,那岂不是维护成本很高,显然不太合适,而使用 SpringCloudBus 消息总线实现方案,可以优雅的解决以上问题,那就是通过消息代理中间件 RabbitMQ 加 Git的 Webhooks來触发配置的更新,那具体是怎么实现的,我会通过图文形式介绍。

Spring Cloud Bus

SpringCloudBus 将分布式系统的节点通过轻量级消息代理连接起来。用于在集群中传播状态更改(例如配置更改事件)或其他管理指令。 SpringCloudBus 的一个核心思想是通过分布式的启动器对 SpringBoot 应用进行扩展,也可以用来建立一个或多个应用之间的通信频道。目前唯一实现的方式是用 AMQP 消息代理作为通道,但是相同的基本功能集(还有一些取决于传输)在其他传输的路线图上

消息总线

消息总线是一种通信工具,可以在机器之间互相传输消息、文件等。消息总线扮演着一种消息路由的角色,拥有一套完备的路由机制来决定消息传输方向。发送段只需要向消息总线发出消息而不用管消息被如何转发。 Springcloud bus 通过轻量消息代理连接各个分布的节点。管理和传播所有分布式项目中的消息,本质是利用了MQ的广播机制在分布式的系统中传播消息,目前常用的有 Kafka和 RabbitMQ 。 下面是一个配置中心刷新配置的例子

[图片来源于网络如有侵权请私信删除]

  • 1、提交代码触发 post请求给 bus/refresh

  • 2、 server端接收到请求并发送给 SpringCloudBus

  • 3、 SpringCloudbus接到消息并通知给其它客户端

  • 4、其它客户端接收到通知,请求 Server端获取最新配置

  • 5、全部客户端均获取到最新的配置

消息代理

消息代理( MessageBroker)是一种消息验证、传输、路由的架构模式。消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。它包括独立的通信和消息传递协议,能够实现组织内部和组织间的网络通信。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作。

和组织间的网络通信。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作。 现有的消息代理开源产品:

  • ActiveMQ

  • Kafka

  • RabbitMQ

  • RocketMQ

目前 SpringCloudBus 支持 RabbitMQ 和 Kafkaspring-cloud-starter-bus-amqp 、 spring-cloud-starter-bus-kafka

RabbitMQ简介

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如: PythonRuby、.NETJavaJMSCPHPActionScriptXMPPSTOMP等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

AMQP,即 AdvancedmessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

Github:https://github.com/rabbitmq
官网地址:http://www.rabbitmq.com

安装RabbitMQ

安装RabbitMQ 可以参考之前的文章

CentOs7.3 搭建 RabbitMQ 3.6 单机服务:
https://segmentfault.com/a/1190000010693696

CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服务:
https://segmentfault.com/a/1190000010702020

Spring Boot 中使用 RabbitMQ: https://segmentfault.com/a/1190000011577243

准备工作

以下项目修改不做过多解释,部分代码不再展示,请阅读上篇文章,Spring Cloud(十)高可用的分布式配置中心 Spring Cloud Config 中使用 Refresh:http://www.ymq.io/2017/12/23/spring-cloud-config-eureka-refresh/

把上一篇,示例代码下载,才可以进行一下的操作,下载地址在文章末尾

spring-cloud-eureka-service
spring-cloud-config-server
spring-cloud-eureka-provider-1
spring-cloud-eureka-provider-2
spring-cloud-eureka-provider-3
spring-cloud-feign-consumer

Config Server

在项目 spring-cloud-config-server 进行以下操作

添加依赖

  1. <dependency>

  2.    <groupId>org.springframework.cloud</groupId>

  3.    <artifactId>spring-cloud-starter-bus-amqp</artifactId>

  4. </dependency>

添加配置

在 application.properties 添加以下配置.关闭安全认证

RabbitMQ 的 ymq用户是手动创建的,具体阅读上面 安装 RabbitMQ 部分

  1. #关闭刷新安全认证

  2. management.security.enabled=false

  3. spring.rabbitmq.host=192.168.252.126

  4. spring.rabbitmq.port=5672

  5. spring.rabbitmq.username=ymq

  6. spring.rabbitmq.password=123456

Config Client

修改第上一篇文章项目

spring-cloud-eureka-provider-1
spring-cloud-eureka-provider-2
spring-cloud-eureka-provider-3

添加依赖

  1. <dependency>

  2.    <groupId>org.springframework.cloud</groupId>

  3.    <artifactId>spring-cloud-starter-bus-amqp</artifactId>

  4. </dependency>

添加配置

在 application.properties 添加以下配置.关闭安全认证

  1. spring.rabbitmq.host=192.168.252.126

  2. spring.rabbitmq.port=5672

  3. spring.rabbitmq.username=ymq

  4. spring.rabbitmq.password=123456

测试服务

启动RabbitMQ

启动MQ服务

  1. $ service rabbitmq-server start

  2. Redirecting to /bin/systemctl start  rabbitmq-server.service

查看MQ状态

  1. $ service rabbitmq-server status

  1. [root@node6 rabbitmq]# service rabbitmq-server status

  2. Redirecting to /bin/systemctl status  rabbitmq-server.service

  3. ● rabbitmq-server.service - RabbitMQ broker

  4.   Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)

  5.   Active: active (running) since Fri 2017-12-29 17:44:10 CST; 9min ago

  6.  Process: 2814 ExecStop=/usr/sbin/rabbitmqctl stop (code=exited, status=0/SUCCESS)

  7. Main PID: 2948 (beam)

  8.   Status: "Initialized"

  9.   CGroup: /system.slice/rabbitmq-server.service

  10.           ├─2948 /usr/lib64/erlang/erts-8.0.3/bin/beam -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 32000 -K true -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr...

  11.           ├─3131 /usr/lib64/erlang/erts-8.0.3/bin/epmd -daemon

  12.           ├─3233 erl_child_setup 1024

  13.           ├─3240 inet_gethost 4

  14.           └─3241 inet_gethost 4

  15. Dec 29 17:44:08 node6 rabbitmq-server[2948]: RabbitMQ 3.6.10. Copyright (C) 2007-2017 Pivotal Software, Inc.

  16. Dec 29 17:44:08 node6 rabbitmq-server[2948]: ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/

  17. Dec 29 17:44:08 node6 rabbitmq-server[2948]: ##  ##

  18. Dec 29 17:44:08 node6 rabbitmq-server[2948]: ##########  Logs: /var/log/rabbitmq/rabbit@node6.log

  19. Dec 29 17:44:08 node6 rabbitmq-server[2948]: ######  ##        /var/log/rabbitmq/rabbit@node6-sasl.log

  20. Dec 29 17:44:08 node6 rabbitmq-server[2948]: ##########

  21. Dec 29 17:44:08 node6 rabbitmq-server[2948]: Starting broker...

  22. Dec 29 17:44:10 node6 rabbitmq-server[2948]: systemd unit for activation check: "rabbitmq-server.service"

  23. Dec 29 17:44:10 node6 systemd[1]: Started RabbitMQ broker.

  24. Dec 29 17:44:10 node6 rabbitmq-server[2948]: completed with 6 plugins.

  25. [root@node6 rabbitmq]#

启动项目

按照顺序依次启动项目

spring-cloud-eureka-service
spring-cloud-config-server
spring-cloud-eureka-provider-1
spring-cloud-eureka-provider-2
spring-cloud-eureka-provider-3
spring-cloud-feign-consumer

启动该工程后,访问服务注册中心,查看服务是否都已注册成功:http://127.0.0.1:8761/

Exchanges

任何发送到 FanoutExchange 的消息都会被转发到与该 Exchange绑定( Binding)的所有 springCloudBus 队列 Queue上。

检查Queues

浏览器打开 :http://192.168.252.128:15672/

修改配置

修改 Git仓库配置,在 content=hello dev 后面加上 SpringCloudBusTest

查看 Config Server

通过 Postman 发送 GET 请求到:http://localhost:8888/springCloudConfig/dev/master 查看 ConfigServer 是否是最新的值

查看 Config Client

命令窗口,通过 curl http://127.0.0.1:9000/hello 访问服务,或者在浏览器访问 http://127.0.0.1:9000/hello F5 刷新

发现没有得到最新的值

因为我们没有主动触发 ConfigServerbus/refresh接口

刷新配置

通过 Postman 发送 POST请求到:http://localhost:8888/bus/refresh ,我们可以看到以下内容:

注意是 PSOT 请求

三个 ConfigClient 客户端控制台,分别会打印以下内容意思就是,收到远程更新请求, config.clientKEYS 刷新, key 是 content

  1. 2017-12-29 18:38:49.023  INFO 28944 --- [jeTgrKRGzgj9g-1] o.s.cloud.bus.event.RefreshListener      : Received remote refresh request. Keys refreshed [config.client.version, content]

  2. 2017-12-29 18:38:49.025  INFO 28944 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient    : DiscoveryClient_EUREKA-PROVIDER/localhost:eureka-provider:8081: registering service...

  3. 2017-12-29 18:38:49.035  INFO 28944 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient    : DiscoveryClient_EUREKA-PROVIDER/localhost:eureka-provider:8081 - registration status: 204

  4. 2017-12-29 18:38:49.067  INFO 28944 --- [jeTgrKRGzgj9g-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SpringAMQP#31e87320:0/SimpleConnection@39151d4e [delegate=amqp://ymq@192.168.252.126:5672/, localPort= 64885]

再次查看 Config Client

访问:http://localhost:8081/ ,http://localhost:8082/ ,http://localhost:8083/ 已经刷新了配置

扩展阅读

Git webhooks

现在虽然可以不用重启服务就更新配置了,但还是需要我们手动操作,这样还是不可取的。所以,这里就要用到Git的webhooks来达到自动更新配置。

打开git上配置仓库的地址,添加 webhooks,上面 PayloadURL我写的域名,当然我没有部署,上面的 PayloadURL就填写我们的配置中心触发刷新的地址,当然这里不能写 localhost啦,要外网访问地址才行。

还有这里面有个Secret的秘钥验证,如果这里填写的话,在配置文件上要写上 encrypt.key与之对应。

局部刷新

某些场景下(例如灰度发布),我们可能只想刷新部分微服务的配置,此时可通过 /bus/refresh端点的 destination参数来定位要刷新的应用程序。

例如: /bus/refresh?destination=customers:8000,这样消息总线上的微服务实例就会根据 destination参数的值来判断是否需要要刷新。其中, customers:8000指的是各个微服务的 ApplicationContextID

destination参数也可以用来定位特定的微服务。例如: /bus/refresh?destination=customers:**,这样就可以触发 customers微服务所有实例的配置刷新。

跟踪总线事件

一些场景下,我们可能希望知道 SpringCloudBus事件传播的细节。此时,我们可以跟踪总线事件( RemoteApplicationEvent的子类都是总线事件)。

跟踪总线事件非常简单,只需设置 spring.cloud.bus.trace.enabled=true,这样在 /bus/refresh端点被请求后,访问 /trace端点就可获得类似如下的结果:

发送 GET请求到:http://localhost:8888/trace

  1. [

  2.    {

  3.        "timestamp": 1514543931362,

  4.        "info": {

  5.            "method": "GET",

  6.            "path": "/eureka-provider/dev/master",

  7.            "headers": {

  8.                "request": {

  9.                    "accept": "application/json, application/*+json",

  10.                    "user-agent": "Java/1.8.0_112",

  11.                    "host": "localhost:8888",

  12.                    "connection": "keep-alive"

  13.                },

  14.                "response": {

  15.                    "X-Application-Context": "config-server:8888",

  16.                    "Content-Type": "application/json;charset=UTF-8",

  17.                    "Transfer-Encoding": "chunked",

  18.                    "Date": "Fri, 29 Dec 2017 10:38:51 GMT",

  19.                    "status": "200"

  20.                }

  21.            },

  22.            "timeTaken": "6002"

  23.        }

  24.    },

  25.    {

  26.        "timestamp": 1514543927451,

  27.        "info": {

  28.            "method": "GET",

  29.            "path": "/eureka-provider/dev/master",

  30.            "headers": {

  31.                "request": {

  32.                    "accept": "application/json, application/*+json",

  33.                    "user-agent": "Java/1.8.0_112",

  34.                    "host": "localhost:8888",

  35.                    "connection": "keep-alive"

  36.                },

  37.                "response": {

  38.                    "X-Application-Context": "config-server:8888",

  39.                    "Content-Type": "application/json;charset=UTF-8",

  40.                    "Transfer-Encoding": "chunked",

  41.                    "Date": "Fri, 29 Dec 2017 10:38:47 GMT",

  42.                    "status": "200"

  43.                }

  44.            },

  45.            "timeTaken": "4927"

  46.        }

  47.    },

  48.    {

  49.        "timestamp": 1514543925254,

  50.        "info": {

  51.            "method": "GET",

  52.            "path": "/eureka-provider/dev/master",

  53.            "headers": {

  54.                "request": {

  55.                    "accept": "application/json, application/*+json",

  56.                    "user-agent": "Java/1.8.0_112",

  57.                    "host": "localhost:8888",

  58.                    "connection": "keep-alive"

  59.                },

  60.                "response": {

  61.                    "X-Application-Context": "config-server:8888",

  62.                    "Content-Type": "application/json;charset=UTF-8",

  63.                    "Transfer-Encoding": "chunked",

  64.                    "Date": "Fri, 29 Dec 2017 10:38:45 GMT",

  65.                    "status": "200"

  66.                }

  67.            },

  68.            "timeTaken": "2862"

  69.        }

  70.    },

  71.    {

  72.        "timestamp": 1514543923565,

  73.        "info": {

  74.            "method": "POST",

  75.            "path": "/bus/refresh",

  76.            "headers": {

  77.                "request": {

  78.                    "cache-control": "no-cache",

  79.                    "postman-token": "0e497ec1-0c03-4dc2-bb61-ce2a266227d3",

  80.                    "user-agent": "PostmanRuntime/7.1.1",

  81.                    "accept": "*/*",

  82.                    "host": "127.0.0.1:8888",

  83.                    "accept-encoding": "gzip, deflate",

  84.                    "content-length": "0",

  85.                    "connection": "keep-alive"

  86.                },

  87.                "response": {

  88.                    "X-Application-Context": "config-server:8888",

  89.                    "status": "200"

  90.                }

  91.            },

  92.            "timeTaken": "6616"

  93.        }

  94.    }

  95. ]

源码下载

GitHub:https://github.com/souyunku/spring-cloud-examples/tree/master/spring-cloud-config-bus-rabbitMQ

码云:https://gitee.com/souyunku/spring-cloud-examples/tree/master/spring-cloud-config-bus-rabbitMQ

Contact

  • 作者:鹏磊

  • 出处:http://www.souyunku.com/2017/12/24/spring-cloud-config-bus-rabbitMQ

  • Email:admin@souyunku.com

  • 版权归作者所有,转载请注明出处

  • Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存