前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun

版本
- Flink 1.15.3
- CDC 2.3.0
- PolarDB-for-PostgreSQL 11.22
CDC Jar包
https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.3.0/
PolarDB-for-PostgreSQL 安装
PostgreSQL 配置
启用逻辑解码
1 | vi /var/lib/polardb/data/postgresql.conf |
添加 / 调整以下参数:1
2
3
4
5
6
7
8# 开启逻辑解码(必须,CDC 依赖此功能获取增量变更)
wal_level = logical
# 设置 WAL 发送进程数量(需大于 0,根据业务并发调整,如 50 )
max_wal_senders = 50
# 保留 WAL 段数量(控制日志保留时长,根据业务需求调整,如 64 ,每个段默认 16MB )
wal_keep_segments = 64
# 可选:设置复制槽最大数量(若用多个 CDC 任务,需大于任务数,如 50,默认值10 )
max_replication_slots = 50
这些参数调整后,必须重启 PostgreSQL 服务才能生效:1
pg_ctl -D /var/lib/polardb/data restart
权限与用户配置
配置客户端访问权限
1 | vi /var/lib/polardb/data/pg_hba.conf |
1 | # 允许 cdc_user 用户从所有 IP 通过密码认证访问(开发环境临时用,生产环境要限定特定IP) |
说明:
- replication 用于创建复制槽等操作;
- 修改后需重载配置(无需重启服务):pg_ctl -D /var/lib/polardb/data reload 。
创建 CDC 专用用户并赋权
登录 PostgreSQL(psql -U postgres -d postgres -h 127.0.0.1),执行 SQL 命令:
1 | -- 创建带复制权限的用户(根据实际需求改用户名、密码) |
说明:REPLICATION 权限是创建复制槽的必要条件,后续 CDC 工具需用该用户连接。
我们在 PolarDB-for-PostgreSQL 安装配置 已经创建了 cdc_user ,当时的权限是不足的,要让 cdc_user 支持基于逻辑解码的 CDC(如 Flink CDC、Debezium),需补充:
复制协议权限(必配)
1
2
3-- 赋予复制权限(创建/使用逻辑复制槽的基础)
-- 作用:允许用户通过复制协议连接数据库,创建 / 占用逻辑复制槽,拉取 WAL 日志解析变更。
ALTER USER cdc_user WITH REPLICATION;授予数据库 CREATE 权限(非必须)
1
2
3
4
5-- 登录 PostgreSQL 超级用户(如 postgres),为 cdc_user 授予 testdb 数据库的 CREATE 权限:
-- 切换到 testdb 数据库(发布需要在目标数据库中创建)
\c testdb;
-- 授予 cdc_user 在 testdb 中创建对象的权限
GRANT CREATE ON DATABASE testdb TO cdc_user;
如果没有 CREATE 权限 并且没有手动提前创建 dbz_publication , 会报错:1
22025-08-29 16:32:15.613 CST [11787] STATEMENT: CREATE PUBLICATION dbz_publication FOR ALL TABLES;
2025-08-29 16:32:16.711 CST [11828] ERROR: permission denied for database testdb
- 模式(Schema)权限(按需)
若表在自定义模式(如 my_schema)下,需授权模式访问:
1 | -- 允许访问模式下的对象(如查询表、使用序列) |
cdc_pg2mysql
具体参数可以查看官网:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/
备注:上面的官网文档是 3.1 版本,有的参数在 2.3 版本是没有的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40set yarn.application.name=cdc_pg2mysql;
set execution.target=yarn-per-job;
set execution.checkpointing.interval=1000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_pg2mysql;
set execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION;
SET 'table.dml-sync' = 'true';
set pipeline.operator-chaining=false;
CREATE TABLE postgres_cdc_source (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '192.168.1.1',
'port' = '5432',
'username' = 'cdc_user',
'password' = 'cdc_password',
'database-name' = 'testdb',
'schema-name' = 'public',
'table-name' = 'test_cdc',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'cdc_slot'
);
create table test_sink_mysql (
id int PRIMARY KEY NOT ENFORCED,
name string
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.1.2:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'Root-123',
'table-name' = 'test_sink_mysql_pg',
'sink.buffer-flush.max-rows' = '1000000'
);
insert into test_sink_mysql(id,name) select * from postgres_cdc_source;
逻辑解码插件
上面 CDC SQL中的参数 decoding.plugin.name 对应的是逻辑解码插件, 默认值 decoderbufs。逻辑解码插件是 PostgreSQL 实现逻辑复制和变更数据捕获(CDC)的核心组件,其核心作用是将数据库的物理 WAL(Write-Ahead Log,预写日志)转换为可读性强、结构化的逻辑变更数据(如 INSERT/UPDATE/DELETE 操作详情)。
简单来说,WAL 是 PostgreSQL 用于崩溃恢复的物理日志,记录的是数据块级别的修改,人类无法直接理解;而逻辑解码插件的作用就是 “翻译” 这些物理日志,生成能被外部系统(如复制工具、CDC 工具)识别的逻辑变更信息(例如 “在表 t 中插入了一条 id=1、name=’xxx’ 的记录”)。
官方内置的逻辑解码插件
在 PostgreSQL 10 及以上版本中,官方内置的逻辑解码插件主要有 test_decoding 和 pgoutput 两个(经测试 PolarDB-for-PostgreSQL 只有 pgoutput),两者的核心功能都是将数据库的物理 WAL(Write-Ahead Log)转换为可读的逻辑变更数据,但在设计目标、功能特性和适用场景上有明显区别,具体对比如下:
对比维度 | test_decoding | pgoutput |
---|---|---|
设计目标 | 用于测试和演示逻辑解码机制,验证功能可行性 | 面向生产环境,提供标准化的逻辑变更输出 |
输出格式 | 自定义文本格式,结构简单(如 INSERT: …) | 遵循 PostgreSQL 官方协议的二进制格式 |
元数据完整性 | 仅包含基础变更数据,元数据有限 | 提供完整元数据(事务 ID、时间戳、数据类型等) |
数据类型支持 | 支持基础类型,对复杂类型(如 JSON、数组)支持有限 | 全面支持 PostgreSQL 所有数据类型 |
工具兼容性 | 几乎不支持主流 CDC 工具(如 Debezium、Flink CDC) | 被所有主流 CDC 工具官方推荐和优先支持 |
生产环境适用性 | 不推荐,仅用于调试和学习 | 推荐,官方主推的生产级插件 |
版本支持 | PostgreSQL 9.4 及以上(早期版本已存在) | PostgreSQL 10 及以上(新增官方插件) |
配置复杂度 | 简单,无需复杂参数 | 需配合工具配置(如指定发布者、订阅规则) |
总结:
- test_decoding 适合作为逻辑解码的入门学习工具,或用于验证数据库 WAL 解析功能,但因其格式不标准、功能有限,不适合生产环境。
- pgoutput 是官方为生产环境设计的插件,提供标准化输出和完整功能,是与 Debezium、Flink CDC 等工具集成的首选,能够可靠支持复杂业务场景的数据变更捕获需求。
逻辑复制槽
- slot.name 的作用是指定工具要连接和读取的逻辑复制槽名称。当 CDC 工具(如 Debezium、Flink CDC)连接 PostgreSQL 时,会通过这个参数找到对应的逻辑复制槽,然后持续从该槽中获取由逻辑解码插件(通过 decoding.plugin.name 指定)解析后的逻辑变更数据。
- 逻辑复制槽(Logical Replication Slot)是 PostgreSQL 中实现逻辑复制和变更数据捕获(CDC)的关键组件,用于持久化存储经逻辑解码插件处理后的逻辑变更数据,并为订阅者(如其他数据库、CDC 工具)提供稳定的变更数据读取接口。
- 可以通过 SQL 手动创建逻辑复制槽,但 Flink CDC 任务会根据 slot.name 自动创建对应的逻辑复制槽,如果该复制槽已经提前创建,会报错。
核心作用
- 数据持久化:逻辑复制槽会将逻辑解码插件(如 pgoutput)解析后的变更数据(INSERT/UPDATE/DELETE 等操作)持久化存储,即使订阅者断开连接,未被消费的数据也不会丢失。
- 顺序性保证:变更数据在槽中按事务提交顺序排列,确保订阅者能按原始顺序消费数据,避免乱序问题。
- 断点续传:每个复制槽会记录订阅者的消费位置(LSN,日志序列号),当订阅者重新连接时,可从上次中断的位置继续读取,无需重复消费历史数据。
- 资源隔离:不同的业务场景可创建独立的复制槽,避免相互干扰(例如一个槽用于数据同步,另一个槽用于审计日志)。
常用操作 SQL
创建逻辑复制槽
1 | -- 创建名为 my_slot 的逻辑复制槽,使用 pgoutput 插件 |
查看所有复制槽:
1 | SELECT slot_name, plugin, active FROM pg_replication_slots WHERE slot_type = 'logical'; |
删除复制槽:
1 | SELECT pg_drop_replication_slot('my_slot'); |
异常解决
must be superuser to create FOR ALL TABLES publication
1 | io.debezium.jdbc.JdbcConnectionException: ERROR: must be superuser to create FOR ALL TABLES publication |
原因解析
PostgreSQL 出于安全考虑,对 FOR ALL TABLES 这种 “包含数据库中所有表” 的发布设置了严格权限限制:
- 只有超级用户能创建 FOR ALL TABLES 发布(因为它会自动包含所有现有表和未来新表,可能涉及敏感数据)。
- 普通用户(即使有 CREATE 权限)只能创建指定具体表的发布(如 FOR TABLE table1, table2)。
方案 1:使用超级用户运行 Debezium(不推荐生产环境)
如果是测试环境,可以临时使用超级用户(如 postgres)连接数据库:
- 修改 CDC 配置,将 username 改为 postgres,并使用对应密码。
- 重启 CDC 任务,此时 CREATE PUBLICATION dbz_publication FOR ALL TABLES 会因超级用户权限而成功执行。
缺点:生产环境中使用超级用户存在安全风险,不建议长期使用。
方案 2:创建指定表的发布(推荐生产环境)
让 cdc_user 创建仅包含需要捕获的表的发布(避免 FOR ALL TABLES),步骤如下:
1 | -- 手动创建指定表的发布(用超级用户执行): |
- 生产环境中应严格遵循最小权限原则,避免使用超级用户运行 CDC 工具。
- 通过方案 2,既能满足 CDC 需求,又能保证数据库安全,是推荐的解决方式。
修改默认发布
Flink CDC 使用的默认发布为 dbz_publication ,可以通过参数修改它的默认值:1
'debezium.publication.name' = 'order_publication'
logical decoding requires wal_level >= logical
1 | io.debezium.DebeziumException: Creation of replication slot failed |
wal_level 默认值为 replica ,应当配置为 logical ,我们在前面已经配置过了,但是需要重启
可以通过下面的 SQL 查看当前 wal_level 配置1
SHOW wal_level;
ERROR: replication slot “cdc_slot” already exists
原因:该逻辑复制槽已经存在,可能提前手动创建了,也可能和其他任务重复了。
解决(二选一):
- 如果逻辑复制槽为提前手动创建也没有使用的话可以手动删除
- 为该任务指定别的逻辑复制槽名称
1 | io.debezium.DebeziumException: Creation of replication slot failed; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each. |
The “before” field of UPDATE/DELETE message is null
1 | java.lang.IllegalStateException: The "before" field of UPDATE/DELETE message is null, please check the Postgres table has been set REPLICA IDENTITY to FULL level. You can update the setting by running the command in Postgres 'ALTER TABLE public.test_cdc REPLICA IDENTITY FULL'. Please see more in Debezium documentation: https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-replica-identity |
这个错误的原因是 PostgreSQL 表的 REPLICA IDENTITY 配置级别不足,导致 Flink CDC 无法获取 UPDATE/DELETE 操作的旧数据(before 字段)。以下是具体解释和解决方法:
错误根源:REPLICA IDENTITY 配置
PostgreSQL 的 REPLICA IDENTITY 用于定义表在逻辑复制时,如何标识被更新或删除的行,决定了 WAL 日志中是否记录行的旧值(before 数据)。默认值为 DEFAULT,仅记录主键列的旧值,而非全量旧数据。
当 CDC 工具需要捕获 UPDATE/DELETE 操作的完整旧数据(如 Flink CDC 进行数据同步或对账时),若表的 REPLICA IDENTITY 级别不足,就会导致 before 字段为 null,触发该错误。
解决方法:修改表的 REPLICA IDENTITY 为 FULL
1 | -- 将表的 REPLICA IDENTITY 设为 FULL,记录所有列的旧值 |
REPLICA IDENTITY 各级别说明
级别 | 含义 | 适用场景 |
---|---|---|
DEFAULT | 仅记录主键列的旧值(默认值) | 仅需主键标识行的场景 |
USING INDEX | 记录指定唯一索引列的旧值(需指定索引) | 无主键但有唯一索引的表 |
FULL | 记录所有列的旧值(完整的 before 数据) | CDC 需要全量旧数据的场景 |
NOTHING | 不记录任何旧值 | 无需捕获 UPDATE/DELETE 的场景 |
CDC 工具通常需要 FULL 级别,以获取完整的变更前后数据。
验证设置是否生效
1 | -- 查看表的 REPLICA IDENTITY 配置 |
- 若 relreplident 显示为 f,表示已设置为 FULL(d 为 DEFAULT,n 为 NOTHING)。
注意事项
- 性能影响:FULL 级别会增加 WAL 日志的体积(因为记录全量旧值),建议仅对需要 CDC 捕获的表设置,避免对所有表生效。
- 主键要求:若表无主键或唯一索引,DEFAULT 和 USING INDEX 级别可能无法正常工作,需设置为 FULL。
- 生效时间:修改后立即生效,无需重启数据库或 CDC 任务(但建议重启 CDC 任务以确保重新加载配置)。
replication slot “cdc_slot” is active for PID 2538084
原因:两个任务使用了相同的逻辑复制槽
解决:为该任务指定别的逻辑复制槽名称
1 | com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. |
superuser_reserved_connections (3) plus max_wal_senders (100) must be less than max_connections (100)
PostgreSQL 配置中存在参数冲突,具体是:
superuser_reserved_connections
(超级用户预留连接数) + max_wal_senders
(WAL 发送进程数)的值,必须小于 max_connections
(最大连接数)
当前配置:
- superuser_reserved_connections = 3, 默认值 3,为超级用户预留的连接数(即使连接数达到上限,超级用户仍能登录)
- max_wal_senders = 100 默认值10,用于流复制的 WAL 发送进程最大数量(逻辑复制和物理复制都会占用)
- max_connections = 100 ,默认值 100, 数据库允许的最大并发连接数