avatar

目录
统一配置中心&消息和异步

第5章 统一配置中心

统一配置中心概述

出现的问题

  • 不方便维护(多人开发)
  • 配置内容的安全与权限(数据库密码)
  • 更新配置后需要重启(文案)

统一配置中心

Code
1
2
3
4
5
配置都放在git上(方便版本控制) ---> config-server <——>本地git
|
|
\|/
product、order从config-server拿配置

Config Server

创建config项目

创建远程git私有项目config-repo

pom

xml
1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>

yml

yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
spring:
application:
name: config
cloud:
config:
server:
git:
uri: https://gitee.com/machine4869/config-repo.git
username: hhdwwt@163.com
password: XXXXXXX
# git本地存在哪
basedir: /machine/codding/springcloud/config/src/main/resources/basedir
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/

启动类

java
1
2
3
//添加
@EnableDiscoveryClient
@EnableConfigServer

说明:

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
访问http://localhost:8080/order-dev.yml	可以查看从远程拉取的配置

1、关于路径格式
{name}-{profiles}.yml
/{lable}/{name}-{profiles}.yml
name 服务名
lable 分支(brunch 默认master)

访问路径:localhost:8080/release/order-dev.yml


2、配置格式变化
/order-a.yml
/order-a.properties
/order-a.json

3、加载顺序
order-dev.yml 会加载order.yml 再加载order-dev.yml(重复的内容会覆盖)
order.yml放公用配置

Config Client

pom

xml
1
2
3
4
<dependency>
<groupId>io.pivotal.spring.cloud</groupId>
<artifactId>spring-cloud-services-starter-config-client</artifactId>
</dependency>

先后?先载入本项目的配置:bootstrp.yml

yml

yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
spring:
application:
name: order
# 使用远程配置的内容
# datasource:
# driver-class-name: com.mysql.jdbc.Driver
# username: machine
# password: 4869
# url: jdbc:mysql://10.211.55.6:3306/SpringCloud_Sell?characterEncoding=utf-8&useSSL=false
# 使用统一配置中心
cloud:
config:
discovery:
enabled: true
service-id: config
# label: 默认使用master
# name: 默认使用order
# profile: 默认没有使用

验证远程配置已经拿到?注释掉了本地配置数据库的信息,但是仍然能从远程拿到数据库配置,正确启动。

Spring Cloud Bus自动更新配置

不需要重启应用?改了远程git就动态刷新?怎么做?

理论

![屏幕快照 2018-12-20 下午4.19.38](20181220150109873/屏幕快照 2018-12-20 下午4.19.38.png)

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
bus:总线

1、关键:
git远程修改配置后 --> config-server能通知order

2、怎么通知?
消息队列:使用RabbitMQ

config-server使用Spring Cloud Bus后,对外暴露/bus-refresh接口
访问该接口config-server就会把更新信息发送到MQ

3、谁来访问接口?
git来访问最合适--webhook 只需要配置接口地址

实操

1、打通config和order的rabbitMQ

order、config项目都要改:

pom

xml
1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

yml

yaml
1
2
3
4
spring:
rabbitmq:
host: 10.211.55.6
# 其他都是默认值

此时,rabbitMQ界面的Queues会出现2条

2、访问/bus-refresh触发 config通知 —rabbit—> order更新

yml

yaml
1
2
3
4
5
6
# 暴露/bus-refresh接口
management:
endpoints:
web:
exposure:
include: "*"

测试:

java
1
2
3
4
5
6
7
8
9
10
@RefreshScope
public class OrderController {
@Value("${girl.name}")
private String girlName;

@GetMapping("/getGirl")
public String getGirl() {
return "girlName:" + girlName;
}
}

解释

Code
1
2
3
查看日志:Mapped "{[/actuator/bus-refresh],methods=[POST]}" 

访问 POST /actuator/bus-refresh 代表通知更新:git通知 -> config通知 -> order更新

测试:

java
1
2
3
4
5
6
7
8
9
10
11
@RefreshScope
public class OrderController {
@Value("${girl.name}")
private String girlName;

@GetMapping("/getGirl")
public String getGirl() {
return "girlName:" + girlName;
}
}
//git修改 》 /bus-refresh触发 》 ...

3、使用git的web-hook 触发接口

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1、使用github提供的WebHooks

使用https://www.ngrok.cc提供的内网穿透

content-type 选 application/json 其他默认
config为webhook提供了专用接口 /monitor

2、
添加如下
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-monitor</artifactId>
</dependency>

//此时 git修改 即可实时在order看到变化

第6章 消息和异步

异步和消息

异步:客户端请求不会阻塞进程,服务端的响应可以是非即时的

异步的常形态:

  • 通知
  • 请求/异步响应
  • 消息(发布订阅模式)

MQ应用场景:

  • 异步处理(发短信)
  • 流量削峰(秒杀:流量过大,通常加入消息队列控制活动人数,若消息队列长度超过最大数量,就抛弃请求)
  • 日志处理(kafka、大数据)
  • 应用解耦(用户下单后,订单服务通知商品系统[消息写入队列],商品服务订阅消息)

RabbitMQ基本使用

pom

xml
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

order \ message\MqReceiver

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.mxx.order.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MqReceiver {

// 1、@RabbitListener(queues = "myQueue")
// 2、自动新建队列 @RabbitListener(queuesToDeclare = @Queue("myQueue"))
// 3、队列和exchange绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue("myQueue"),
exchange = @Exchange("myExchange")
))
public void process(String massage){
log.info("MqReceiver: {}",massage);
}

// 4、分组发送消息(订单消息分别发给:数码供应商、水果供应商)
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange("myOrder"),
key = "computer",
value = @Queue("computerOrder")
))
public void processComputer(String massage){
log.info("MqReceiver-processComputer: {}",massage);
}

@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange("myOrder"),
key = "fruit",
value = @Queue("fruitOrder")
))
public void processFruit(String massage){
log.info("MqReceiver-processComputer: {}",massage);
}

}

发送测试

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqSendTest {

@Autowired
private AmqpTemplate amqpTemplate;

@Test
public void send(){
amqpTemplate.convertAndSend("myQueue","now "+new Date());
}

@Test
public void sendComputer(){
amqpTemplate.convertAndSend("myOrder","computer","now "+new Date());
}
}

Spring Cloud Stream

理论:

  • 为微服务应用构建消息驱动能力的框架
  • 应用程序通过input\output与binder交互,binder与中间件交互
  • 优势:对消息中间件进一步封装,可做到代码层面无感知\切换中间件
  • 局限:目前只支持rabbitMQ和kafka

![屏幕快照 2018-12-24 下午4.45.50](20181220150109873/屏幕快照 2018-12-24 下午4.45.50.png)

实操:

pom

xml
1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. 快速开始

message\StreamReceiver

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@Component
@EnableBinding(Sink.class)
@Slf4j
public class StreamReceiver {
// 消费MQ的消息
@StreamListener(Sink.INPUT)
public void process(String message) {
log.info("StreamReceiver :" + message);
}
}

发送测试:

rabbit > queue > input.anonymous.L92bTj6FRTyOC0QE-Pl0HA > publish message > payload处输入一个hello world,点Publlish message发送一个消息

  1. 自定义消息发送接收
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.mxx.order.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface StreamClient {
String INPUT = "mxxInput";
String OUTPUT = "mxxOutput";

@Input(StreamClient.INPUT)
SubscribableChannel input();

@Output(StreamClient.OUTPUT)
MessageChannel output();
}

yml 、把输入输出流绑定到rabbit的同一个exchanges(topic)

yaml
1
2
3
4
5
6
7
8
spring:    
cloud:
stream:
bindings:
mxxInput:
destination: mxxMessage
mxxOutput:
destination: mxxMessage

启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “mxxMessage”

测试output

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.mxx.order.controller;
import com.mxx.order.message.StreamClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(StreamClient.class)
public class OrderController {

@Autowired
private StreamClient streamClient;

@GetMapping("sendMessage")
public void sendMessage(){
//发送消息到MQ
streamClient.output().send(MessageBuilder.withPayload("now "+new Date()).build());
}

}
  1. 分组与持久化

问题引出:同一应用中只需有一个实例消费该消息,但是不同实例现在都在消费“mxxMessage”里的消息

解释:默认创建的临时队列,程序关闭的时候,队列也会消失。我们需要一个持久化的队列,并且指定一个分组。当一个应用程序不同实例放置在一个具有竞争关系的消费组中,组里面的实例中只有一个能够消费消息

yaml
1
2
3
4
5
6
spring:
cloud:
stream:
bindings:
input:
group: group-1

改进:

yaml
1
2
3
4
5
6
7
8
9
10
spring:    
cloud:
stream:
bindings:
mxxInput:
destination: mxxMessage
group: order #
mxxOutput:
destination: mxxMessage
group: order #

不同实例只会有一个在消费消息,默认轮训方式

  1. 传递对象

发送

java
1
2
// orderDTO是一个对象类型
streamClient.output().send(MessageBuilder.withPayload(orderDTO).build());

接收

java
1
2
3
4
5
// 消费MQ的消息
@StreamListener(StreamClient.INPUT)
public void process(OrderDTO message) {
log.info("StreamReceiver: {}" ,message);
}

为了在rabbit页面方便调试,指定输出格式

yaml
1
2
3
4
5
6
spring:
cloud:
stream:
bindings:
mxxInput:
contentType: application/json # 默认值也是这个

点击“Get Messages”按钮可以查看消息

  1. 消费完后回应

关键: @SendTo

java
1
2
3
4
5
6
7
@StreamListener(StreamClient.INPUT)
@SendTo(StreamClient.OUTPUT) // 将返回值发送到OUTPUT对应的MQ
public String processInput(String message) {
log.info("StreamReceiver input: {}" ,message);

return "完成消费";
}

stream简化了消息队列的开发:不需要关注exchange…啥的

业务:商品和订单服务中使用MQ

业务分析

订单 <—库存变化— 消息队列 <—库存变化— 商品

  • 订单拿到库存变化消息后,将数据记录到redis里

  • 库存变化场景:

    • 第一次录入商品
    • 补货
    • 扣库存
  • 以扣库存为例使用消息队列


实操

1、改成统一配置中心

2、在product 扣库存的地方发送消息

pom

xml
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

service

java
1
2
3
4
5
6
7
8
9
10
11
@Autowired
private AmqpTemplate amqpTemplate;

@Override
public void decreaseStock(List<DecreaseStockInput> decreaseStockInputList) {
//扣库存
//...

//发消息
amqpTemplate.convertAndSend("productInfoOutputList", JsonUtil.toJson(productInfoOutputList));
}

yml :配置mq​

测试:在rabbit创建队列观察

3、在order 接收消息,将消息存到redis

使用docker安装redis

bash
1
$ docker run -d -p 6379:6379 hub.c.163.com/library/redis:latest

pom

xml
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

yml:redis配置

message/ProductInfoReceiver

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.mxx.order.message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;

@Component
public class ProductInfoReceiver {

private static final String PRODUCT_STOCK_TEMPLATE = "product_stock_%s";

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RabbitListener(queuesToDeclare = @Queue("productInfoOutputList"))
public void process(String message){
List<ProductInfoOutput> productInfoOutputList = (List<ProductInfoOutput>)JsonUtil
.fromJson(message, new TypeReference<List<ProductInfoOutput>>(){});

//存到redis
for (ProductInfoOutput productInfoOutput : productInfoOutputList){

stringRedisTemplate.opsForValue().set(
String.format(PRODUCT_STOCK_TEMPLATE,productInfoOutput.getProductId()),
String.valueOf(productInfoOutput.getProductStock()));
}


}
}

业务:异步扣库存分析

保证数据一致性!

  • 可靠的消息投递
  • 用户体验变化
  • 改为异步需要考虑很多细节问题来保证一致性!如果高并发需求不大慎用!

(高并发场景的业务方案:秒杀等,结合redis,缓存,事务,分布式事务,一致性)

文章作者: Machine
文章链接: https://machine4869.gitee.io/2018/12/20/20181220150109873/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论