• 欢迎访问ByWei.Cn,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,加入百味博客 软件定制QQ群
  • 已升级为最新版主题,并将持续优化改造中,支持说说碎语功能,可像添加文章一样直接添加说说,博客主题升级啦
  • 感谢您百度求点赞啊!百度网址
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏百味博客吧
  • 博主热烈欢迎 软件定制开发 联系:http://www.bywei.cn

Canal消息使用RocketMQ去中心化架构方案

编程语言 百味博客 3个月前 (01-03) 119次浏览 0个评论 扫描二维码

项目背景

原官方的canal单点消费处理实现作为数据库监听服务,在应对大型多数据库架构等重要场景下,随着后续业务发展,大数据量的处理,为提升canal消息处理及时性、处理速度,并且为保障系统稳定性、可维护性,优化重构canal消息处理机制实现横向扩展成为必要趋势。

项目目标

  •  废弃现有canal官方单一节点消费运行处理模式
  •  消息拆分到各个领域,实现canal消息按需配置生产和消费

项目价值

  1. 目前情况:
    1. **运维单一:**所有的数据库监听消息处理都在单一项目,各业务线改动点都需要改动、发布部署该项目,影响面较大
    2. **耦合性高:**代码依赖所有业务线以及oms等项目Dto,项目涉及业务知识领域较广,不利于专业求精
    3. **扩展性差:**当前处理数据量较大,单纯依靠增加服务器资源,不能完全解决消息处理及时性及处理速度
    4. **维护成本高:**需自行处理大并发量数据处理、大数据量分库分表存储等方案,系统稳定性研发维护成本较高
  2. 项目上线后:
    1. **系统安全性高:**在去中心化的应用中,生产消费消息直接对接阿里云MQ,利用现成的平台补偿、重试等机制,消息可靠性得到保障
    2. **可维护性高:**业务消息拆分到各自业务线,公共代码部分在成熟后作为基础工具处理器,整体系统架构更为简单,分散业务运维成本,现有专业业务知识研发维护,提升整体运营维护效率
    3. **系统性能好:**由于去中心化处理方式较传统处理方式更为简单与便捷,因此在大数据量处理同时进行时,去中心化的方式会节约资源
    4. **自主高效性:**去中心化的技术,点对点直接交互,使得高效率、无中心化代理、大规模的信息交互方式成为现实。横向扩展可增加topic、分表消息,消费端可根据自身性能调配服务器资源

技术调研

参考资料

  1. 官方开源地址:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart ​​

 

架构思路

  1. 架构方案 ​

技术框架选型

  1. 阿里云RocketMQ暂不支持批量发送消息,当canal.mq.flatMessage = true时,会发送失败
  2. mq事务开始结束处理,消息ack问题。MQ消息处理 https://github.com/alibaba/canal/blob/master/example/src/main/java/com/alibaba/otter/canal/example/rocketmq/CanalRocketMQClientExample.java
  3. mq配置需 (>=1.1.5版本) rocketmq.tag=rocketmq的tag配置
  4. 自定义动态tag方式修改源码,利用动态topic逻辑调整tag​
    /**
    * 按 schema 或者 schema+table 将 message 分配到对应topic
    *
    * @param message 原message
    * @param defaultTopic 默认topic
    * @param dynamicTopicConfigs 动态topic规则
    * @return 分隔后的message map
    */
    public static Map<String, Message> messageTopics(Message message, String defaultTopic, String dynamicTopicConfigs) 
    
    
    //修改发送消息
    private void sendMessage(Message message, int partition)
    消费tag过滤
    public static final String ROCKETMQ_SUBSCRIBE_FILTER             = ROOT + "." + "subscribe.filter";
    rocketMQConsumer.subscribe(this.topic, "ES_UPDATE_EVENT");
    rocketMQConsumer.registerMessageListener(new MessageListenerOrderly()
  1. MQ配置案例
    1. 动态配置topic,预计9个topic
      database_a:database_a_test\..*_order.*,
      database_b:database_b_test\..*_order.*,
      database_c:database_b_test\..*_order.*,
      database_d:database_b_test\..*_order.*,
      database_e:database_b_test\..*_order.*,
      database_f:database_b_test\..*_order.*,
    2. 动态配置tag,1个topic9个tag,按照动态topic模式修改源码实现
      database_a:database_a_test\..*_order.*,
      database_b:database_b_test\..*_order.*,
      database_c:database_b_test\..*_order.*,
      database_d:database_b_test\..*_order.*,
      database_e:database_b_test\..*_order.*,
      database_f:database_b_test\..*_order.*,

      过滤ddl数据

       # binlog filter config
      canal.instance.filter.druid.ddl = true
      canal.instance.filter.query.dcl = true
      canal.instance.filter.query.ddl = true
  1. 关于mq顺序:https://help.aliyun.com/document_detail/117459.htm?spm=5176.smartservice_service_chat.help.dexternal.2290709a7Oh57w ​多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
  2. 同一信息在批量事务内
    — 同一事务修改两个表数据,一条消息
    set autocommit = 0; — //1是自动提交
    UPDATE database_a_order set UPDATE_TIME = UPDATE_TIME +1 where ID=126828587192704;
    UPDATE database_a_order_passenger set UPDATE_TIME = UPDATE_TIME +1 where ORDER_ID=126828587192704;
    commit; — //或者rollback;– 每个事务修改单数据,分多条消息
    set autocommit = 0; — //1是自动提交
    UPDATE database_a_order set UPDATE_TIME = UPDATE_TIME -1 where ID=126828587192704;
    commit; — //或者rollback;
    UPDATE database_a_order_p set UPDATE_TIME = UPDATE_TIME -1 where ORDER_ID=126828587192704;
    commit; — //或者rollback;

    — 一个事务修改全表数据,多条消息
    set autocommit = 0; — //1是自动提交
    UPDATE database_a_order set UPDATE_TIME = UPDATE_TIME -1 ;
    commit; — //或者rollback;

    — 一个事务修改同一个订单,一条消息
    set autocommit = 0; — //1是自动提交
    UPDATE database_a_order set UPDATE_TIME = UPDATE_TIME -1 where ID=126828587192704;
    UPDATE database_a_order set UPDATE_TIME = UPDATE_TIME +1 where ID=126828587192704;
    UPDATE database_a_order set UPDATE_TIME = UPDATE_TIME -1 where ID=126828587192704;
    UPDATE database_a_order set UPDATE_TIME = UPDATE_TIME +1 where ID=126828587192704;
    commit; — //或者rollback;

ES消息处理流程与架构

  1. 方案前提说明

​​1)确认使用rocketMQ

2)确定按照单topic多tag模式对接,修改源代码发布canal-server

3)事务内的消息发MQ处理方案?

(1)消费端构建一个Redis队列:判断事务开始,就放到redis队列中,直到结束,最大100条。然后取出队列合并处理

(2)改源码canal-server端过滤处理事务内的消息,调研改动方式

  1. 序列图

技术问题答疑

1、CanalMQ是如何保障服务异常或重启时的消息的一致性问题;

答:canal-server处理消息时会记录读取binlog日志的指针,服务重启时会从记录指针点开始继续处理

2、MQ的费用问题?

答:基础topic 2块钱,按多topic模式预计9个topic。2*9*30=540块/月

(需修改源码实现)按多tag模式,1个topic多个tag。  2*30=60块/月

消息量  82024764*0.001*3=  ?   10 亿=2000块/月(当前16亿消息量)

替换省成本

3、如何减少转存到MQ的binlog数据量?

1)canal配置监听指定表

2)canal配置监听指定类型的binlog操作日志

3)可修改源码:过滤业务场景中所需要的业务操作数据

4)可以采用kafka来替代RocketMQ

4、CanalMQ如何保障binlog转存的顺序性?

答:

1)单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS

2)多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题

5、消费端如何保障处理的顺序性?

答:依赖消息生成端的顺序性,保证消费封装 处理线程的顺序

6、MQ没法批量获取消息,同一个事务内如果有多条操作记录的话,如何合并发送一次MQ通知?

1)消费端构建一个Redis队列?

2)改源码canal-server端过滤处理事务内的消息?

7、canalMQ方案与es-service 是否可以并行执行?

答:可以,只是会重复发送,灰度中可并行发送,长远是替换掉

8、ota rpc 的mq处理独立rpc服务? 单topic多tag模式

项目地址

canal

 


百味博客 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:Canal消息使用RocketMQ去中心化架构方案
喜欢 (0)
[微信扫一扫]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址