前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
学习总结Flink MySQL CDC,主要目的是同步MySQL数据至其他数据源如Hudi、MySQL等,本文主要以 MySQL2Hudi、MySQL2MySQL两个场景进行示例验证。
版本
Flink | 版本 |
---|---|
Flink | 1.14.3、1.15.4、1.16.1 |
Hudi | 0.13.0 |
MYSQL CDC | 2.3.0 |
安装
将下面的Jar包拷贝到flink/lib下面 (以flink1.15.4为例)
- MySQL CDC(CDC读取MySQL): flink-sql-connector-mysql-cdc-2.3.0.jar,下载地址: https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar
- Hudi (Sink Hudi): hudi-flink1.15-bundle-0.13.0.jar,自己对应版本的打包
- Jdbc (Sink MySQL): flink-connector-jdbc-1.15.4.jar, 下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.4/flink-connector-jdbc-1.15.4.jar
Flink CDC,只是对于Source表,比如MySQL CDC,就是抽取MySQL Source表,CDC 官方文档:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#,可以查看官方文档了解目前Flink CDC支持哪些数据源,每一种数据源都需要下载对应的Jar包
MySQL CDC 参数
1 | CREATE TABLE mysql_cdc_source ( |
要使用MySQL CDC Source首先要开启MySQL binlog日志,其他参数和详细信息可以查看官方文档:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc%28ZH%29.html#id6
示例
创建MySQL Source物理表
1 | mysql -uroot -proot-123 cdc |
1 | CREATE TABLE `mysql_cdc_source` ( |
造数1
2
3insert into mysql_cdc_source(id,name,price,ts,dt) values (1,'hudi1',1.1,1000,'20230331');
insert into mysql_cdc_source(id,name,price,ts,dt) values (2,'hudi2',2.2,2000,'20230331');
......
CDC MySQL2Hudi
1 | set yarn.application.name=cdc_mysql2hudi; |
注意,要求source表和sink表字段顺序要对应
CDC MySQL2Mysql
创建MySQL Sink物理表
1 | CREATE TABLE `test_sink_mysql` ( |
1 | set yarn.application.name=cdc_mysql2mysql; |
验证
1、对源表mysql_cdc_source执行insert/update/delete操作,查看目标表数据同步情况,数据实时同步且一致
2、找个比较大的source表,在历史数据同步中间过程中,kill掉任务,利用checkpoint恢复任务,验证全量数据的断点续传
3、对源表执行truncate操作,目标表数据不会同步truncate
4、其他验证……