SpringCloud学习笔记(十)ElasticSearch9数据同步

10.9 数据同步

因为数据库双写,es中的数据来自于mysql,所以需要保证这两者的数据是同步的。而微服务项目中,两个数据库(mysql和es)可能在不同的微服务上,需要跨服务同步。

解决方案分析:

1.同步调用

这何种方式需要等到hotel-demo写入成功返回结果才算酒店新增完毕,相对比较耗时

image-20230709111519891

2.异步通知

使用MQ作为中间件,hotel-admin更新mysql的事件通知MQ,hotel-demo随之更新。但是这种方式比较依赖于MQ的性能。

image-20230709111643845

3.监听MySQL的binlog

MySQL主从同步原理中根据binlog更新数据库,使用canal中间件直接监听binlog,直接与hotel-admin服务完全解耦合。不过这种方式也会依赖于中间件canal的性能,且因为MySQL需要开启binlog,增加了MySQL的负担,且实现复杂度高。

image-20230709112013722

4.Logstash方法

根据GPT4的回答还有一种方式是使用Logstash,您可以使用Logstash的JDBC输入插件连接MySQL数据库,然后使用Elasticsearch输出插件将数据推送到Elasticsearch中。感觉这个挺好玩的,之后有时间可以搞一下。

MQ方式同步

1.导入amqp依赖

  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

2.配置rabbitmq地址

 spring:
  rabbitmq:
    host: xx.xx.xx.xx
    port: 5672
    username: yuan
    password: 123321
    virtual-host: /

3.定义常量

 public class MqConstants {
  /**
  * 交换机
  */
  public final static String HOTEL_EXCHANGE = "hotel.topic";
  /**
  * 监听新增和修改的队列
  */
  public final static String HOTEL_INSERT_QUEUE = "hotel.insert";
  /**
  * 监听删除的队列
  */
  public final static String HOTEL_DELETE_QUEUE = "hotel.delete";
 ​
  /**
  * 新增和修改
  */
  public final static String HOTEL_INSERT_KEY = "hotel.insert";
 ​
  /**
  * 删除
  */
  public final static String HOTEL_DELETE_KEY = "hotel.delete";
 }

4.消息发送者

 //简单起见直接写在controller里,实际应该写在service更合理  
 //直接注入rabbitTemplate
 @Autowired
 private RabbitTemplate rabbitTemplate;
 ​
 @PostMapping
 @CrossOrigin
 public void saveHotel(@RequestBody Hotel hotel){
     hotelService.save(hotel);
     rabbitTemplate.convertAndSend(HOTEL_EXCHANGE,HOTEL_DELETE_KEY,hotel);
 }
 ​
 @PutMapping()
 @CrossOrigin
 public void updateById(@RequestBody Hotel hotel){
     if (hotel.getId() == null) {
         throw new InvalidParameterException("id不能为空");
    }
     hotelService.updateById(hotel);
     rabbitTemplate.convertAndSend(HOTEL_EXCHANGE,HOTEL_DELETE_KEY,hotel);
 }
 ​
 @DeleteMapping("/{id}")
 @CrossOrigin
 public void deleteById(@PathVariable("id") Long id) {
     hotelService.removeById(id);
     rabbitTemplate.convertAndSend(HOTEL_EXCHANGE,HOTEL_DELETE_KEY,id);
 }

注意:

1.为什么要加@CrossOrigin

解决跨域问题。跨域问题指的是当客户端(如浏览器)通过JavaScript代码访问不同域名或端口的服务器资源时,浏览器会采取安全限制措施来防止跨域攻击。如果前端代码和后端API不在同一个域名或端口下,就会遇到这个问题。在这种情况下,需要在后端API中添加@CrossOrigin注解来解决跨域问题。

2.关于MQ里应该发送全部变更信息还是只发送id?

  • 只发送id:优点:系统解耦,且不会占用过多消息队列资源;缺点:增加消息不一致风险,因为消息接收者只收到id,还需要通过feign等远程调用方式调用消息发送者的findById接口获取更新后的全部数据,然后再更新es的内容。
  • 发送全部变更信息:优点:有助于数据一致性,消息接收方直接根据接收到的消息更新es,且系统复杂性低;缺点:消耗MQ的性能

我个人更倾向于直接发送全部变更信息,黑马的教程中hotel-admin只向消息队列发送id,然后在hotel-demo中根据Id查询了MySQL,压根就不符合他说的hotel-demo不能直接访问MySQL的前提。

5.消息接收者

 //先在service接口定义对应的接口,然后在接口实现类中实现相应功能,没啥好写的这里省略不写了
 //直接看监听器
 @Component
 public class HotelListener {
  @Autowired
  IHotelService hotelService;
 ​
     /**
  * 监听酒店更新业务
  * @param hotel
  */
  @RabbitListener(queues = HOTEL_INSERT_QUEUE)
  public void listenHotelInsertOrUpdate(Hotel hotel){
  hotelService.insertDocument(hotel);
  }
 ​
  /**
  * 监听酒店删除业务
  * @param id
  */
  @RabbitListener(queues = HOTEL_DELETE_QUEUE)
  public void listenHotelDelete(Long id){
  hotelService.deleteById(id);
  }
 }

注意定义MessageConverter.

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注