Flink-CDC
添加依赖
<!-- Flink CDC source connector; pick the connector artifact that matches your database. -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <!-- Published to Maven Central only for stable releases. -->
    <version>2.1.1</version>
</dependency>
<!-- MySQL JDBC driver. -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.16</version>
</dependency>
DataStream方式
// DataStream API variant: read table dice.sys_login_log through Flink CDC's MySqlSource
// and print every change record.
// Fix: the previous version used `env` without creating it and never called env.execute(),
// so the pipeline was only built, never submitted.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The official Flink CDC MySqlSource example enables checkpointing; interval is in milliseconds.
env.enableCheckpointing(3000);

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("k8s-master")
        .port(30054)
        .username("root")
        // NOTE(review): credentials are hard-coded for the example only.
        .password("123456")
        // Several databases may be listed here at once.
        .databaseList("dice")
        // Optional. If omitted, all tables of the listed databases are captured;
        // if set, each entry must be written as "db.table".
        .tableList("dice.sys_login_log")
        // Deserializer that turns each change record into a String.
        .deserializer(new StringDebeziumDeserializationSchema())
        // On first start, take an initial snapshot (read the existing rows) before following changes.
        .startupOptions(StartupOptions.initial())
        .build();

DataStreamSource<String> streamSource = env
        .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        // set 4 parallel source tasks
        .setParallelism(4);

// use parallelism 1 for sink to keep message ordering
streamSource.print().setParallelism(1);

// Submit the job; without this call nothing is executed.
env.execute("FlinkCDCWithDataStream");
FlinkSQL方式
除上面的依赖外，还需要添加 Table Planner 依赖：
<!-- Blink table planner, used by the Flink SQL example below. -->
<!-- Assumes ${scala.binary.version} and ${flink.version} are defined as pom properties. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
// Flink SQL variant: register dice.meta through the mysql-cdc connector, select every column,
// convert the result to a retract stream and print it.

// 1. Execution environment (parallelism 1) and a table environment on top of it.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// 2. Declare the CDC-backed table with DDL. Flink only accepts primary keys declared NOT ENFORCED.
String createTableDdl = "CREATE TABLE mysql_binlog ( "
        + " id bigint NOT NULL, "
        + " name STRING, "
        + " type string,"
        + " primary key(id) not enforced"
        + ") WITH ( "
        + " 'connector' = 'mysql-cdc', "
        + " 'hostname' = 'k8s-master', "
        + " 'port' = '30054', "
        + " 'username' = 'root', "
        + " 'password' = '123456', "
        + " 'database-name' = 'dice', "
        + " 'table-name' = 'meta' "
        + ")";
tEnv.executeSql(createTableDdl);

// 3. Query the dynamic table.
Table binlogTable = tEnv.sqlQuery("select * from mysql_binlog");

// 4. Convert the dynamic table to a retract stream: each element pairs a flag
//    (true = insert/add, false = retraction) with the affected row.
DataStream<Tuple2<Boolean, Row>> changes = tEnv.toRetractStream(binlogTable, Row.class);
changes.print();

// 5. Submit the job.
env.execute("FlinkCDCWithSQL");