Apache Flink is a framework and distributed processing engine for stateful computations over bounded and unbounded data streams. Flink is designed to run in all common cluster environments and to perform computations at any scale and at in-memory speed.
Configuration
Note: jobmanager.archive.fs.dir and historyserver.archive.fs.dir must be configured with the same path.
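A minimal sketch of the corresponding flink-conf.yaml entries (the archive directory below is an illustrative assumption, not from the original setup):

# flink-conf.yaml
jobmanager.archive.fs.dir: file:///tmp/flink-data/completed-jobs
historyserver.archive.fs.dir: file:///tmp/flink-data/completed-jobs
# how often the HistoryServer polls the archive directory for new jobs (ms)
historyserver.archive.fs.refresh-interval: 10000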
./bin/stop-cluster.sh
./bin/start-cluster.sh
# view the run history of finished jobs
./bin/historyserver.sh start
# program arguments go after the jar; flink run options must come before it
./bin/flink run ~/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT-all.jar
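For example, options such as -d (detached), -p (parallelism), and -c (entry class) go before the jar path, while program arguments follow it (class, jar path, and argument below are hypothetical):

./bin/flink run -d -p 2 -c com.example.Main /path/to/app.jar --env dev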
./bin/flink run -c com.gsm.TableJdbcDeal /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar
./bin/flink run -s file:///tmp/flink-data/flink-checkpoints/6045e59994d8a05b94c08f1ef20a152f/chk-7/_metadata -c com.gsm.TableJdbcDeal /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar
./bin/flink run -d -c com.gsm.DSJdbcDeal /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar
./bin/flink run -s file:///tmp/flink-data/flink-checkpoints/1697c78d9dfbd1dbfd9f2c80ef18ce30/chk-2/_metadata -c com.gsm.DSJdbcDeal /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar
./bin/flink run -d -c com.gsm.job.CdcMysql /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar
./bin/flink run -d -s hdfs://localhost:9000/flink-checkpoints/bee678b1cf49eaa9a3faa432351a5365/chk-2/_metadata -c com.gsm.CdcMysql /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar
./bin/flink run -d -c com.gsm.CdcPostgres /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar
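The -s restore paths above come from checkpoints; you can also create a restore point explicitly with a savepoint. A minimal sketch (job ID and target directory are placeholders):

./bin/flink list                                              # find the running job's ID
./bin/flink savepoint <jobId> file:///tmp/flink-savepoints    # trigger a savepoint
./bin/flink stop --savepointPath file:///tmp/flink-savepoints <jobId>   # stop with a final savepoint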
# https://flink.apache.org/downloads.html Additional Components Pre-bundled Hadoop
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
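The uber jar has to be on Flink's classpath; the usual approach is to drop it into the lib/ directory of the Flink distribution and restart the cluster (run from FLINK_HOME):

cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar lib/
./bin/stop-cluster.sh && ./bin/start-cluster.sh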
REPLICA IDENTITY for 'office.t_country_dict' is 'FULL'; UPDATE AND DELETE events will contain the previous values of all the columns
ALTER TABLE office.t_customer_info_bank REPLICA IDENTITY FULL;
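To verify the setting, query the pg_class catalog; relreplident is 'f' for FULL and 'd' for the default (primary key only):

SELECT relname, relreplident FROM pg_class WHERE relname = 't_customer_info_bank';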
./bin/flink run -d -c com.gsm.job.CdcMysql /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar --env dev
./bin/flink run -d -c com.gsm.job.CdcPostgresBasic /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar
./bin/flink run -d -c com.gsm.job.CdcPostgresLevel2 /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar
./bin/flink run -d -c com.gsm.job.CdcPostgres /Users/ynthm/Workspace/work/demo/crm-flink-cdc/build/libs/crm-flink-cdc-1.0-SNAPSHOT.jar
UAT environment:
./bin/flink run -d -c com.gsm.job.CdcPostgresBasic /opt/crm/crm-flink-cdc-1.0-SNAPSHOT.jar --env.active uat
./bin/flink run -d -c com.gsm.job.CdcPostgresLevel2 /opt/crm/crm-flink-cdc-1.0-SNAPSHOT.jar --env.active uat
./bin/flink run -d -c com.gsm.job.CdcPostgres /opt/crm/crm-flink-cdc-1.0-SNAPSHOT.jar --env.active uat
./bin/flink run -d -c com.gsm.job.CdcMysql /opt/crm/crm-flink-cdc-1.0-SNAPSHOT.jar --env.active uat
Consuming data produced by the Debezium Postgres Connector
When REPLICA IDENTITY is configured as FULL, update and delete events contain the complete previous values of all columns. With any other setting, the "before" field of update and delete events contains only the primary-key columns, or is null if the table has no primary key. You can change the REPLICA IDENTITY configuration by running ALTER TABLE <table> REPLICA IDENTITY FULL.
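A minimal sketch of building such a source with the DataStream API, assuming the flink-connector-postgres-cdc 2.x dependency (com.ververica); hostname, credentials, slot name, and table names are placeholders:

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CdcPostgresSketch {
    public static void main(String[] args) throws Exception {
        // Debezium-based Postgres source; emits one JSON string per change event
        SourceFunction<String> source = PostgreSQLSource.<String>builder()
                .hostname("localhost")
                .port(5432)
                .database("crm")
                .schemaList("office")
                .tableList("office.t_country_dict")   // schema-qualified
                .username("postgres")
                .password("postgres")
                .slotName("flink_cdc")                // replication slot, unique per job
                .decodingPluginName("pgoutput")       // logical decoding plugin
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("cdc-postgres-sketch");
    }
}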
For the MySQL connector, setting 'debezium.snapshot.locking.mode' = 'none' is enough. The lock is very lightweight anyway: it is released as soon as the current binlog offset has been read, not held until the snapshot finishes.
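In the DataStream API the debezium.* keys are passed as plain Properties without the prefix. A minimal sketch assuming the flink-connector-mysql-cdc 2.x dependency (connection details and table names are placeholders):

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class CdcMysqlSketch {
    public static void main(String[] args) throws Exception {
        Properties debeziumProps = new Properties();
        // DataStream equivalent of 'debezium.snapshot.locking.mode' = 'none' in SQL DDL
        debeziumProps.setProperty("snapshot.locking.mode", "none");

        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("crm")
                .tableList("crm.t_customer")
                .username("cdc_user")
                .password("secret")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .debeziumProperties(debeziumProps)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc").print();
        env.execute("cdc-mysql-sketch");
    }
}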
Make sure the user that reads the binlog has the RELOAD privilege.
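A typical grant for a dedicated CDC user (user name is a placeholder; the connector also needs SELECT and the replication privileges):

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;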
// https://mvnrepository.com/artifact/org.apache.flink/flink-format-common
implementation 'org.apache.flink:flink-format-common:1.13.6'
Developing a Flink application requires a minimal set of API dependencies: flink-java and flink-streaming-java. Most applications additionally need specific connectors and other libraries, such as the Kafka connector, the Table API, or the CEP library. These are not part of Flink's core dependencies and must be added to the application manually.
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.3</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- package a plain project into a fat jar -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind the jar merge to the package phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
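With this configuration, running the standard packaging command produces an extra <artifactId>-<version>-jar-with-dependencies.jar under target/; that fat jar is the one to pass to flink run:

mvn clean package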