跳到主要内容

LakeSoul CDC 表

CDC (Change Data Capture) 是湖仓重要的数据源之一. LakeSoul CDC 表的目标是能够从在线 OLTP 数据库快速同步数据到 LakeSoul 表中,从而下游分析计算任务在较小的时间间隔后就可以读到在线数据库的同步数据,消除了传统 T+1 复制的开销。

LakeSoul 使用一个额外的操作列(列名可以自定义)来记录 CDC 的操作类型,可以支持从 Debezium, canalFlink CDC 中导入 CDC 数据。

创建 LakeSoul CDC 表,需要添加一个表属性 lakesoul_cdc_change_column 来配置 CDC 变更类型的列名。这一列需要是 string 类型,包含三种取值之一: update, insert, delete.

在 LakeSoul 读数据自动合并时,会保留最新的 insertupdate 数据,并自动过滤掉 delete 的行。

创建 LakeSoul CDC 表

使用 Scala API 或者 SQL,假设操作类型列名为 change_type:

import com.dmetasoul.lakesoul.tables.LakeSoulTable
LakeSoulTable.createTable(data, path).shortTableName("cdc_ingestion").hashPartitions("id").hashBucketNum(2).rangePartitions("rangeid").tableProperty("lakesoul_cdc_change_column" -> "change_type").create()

注意 LakeSoul CDC 表必须是主键表,并且主键列需要和 CDC 上游数据库表相同。

LakeSoul CDC 表的增量读取

LakeSoul 的增量采用的是主键分片的模式,因此增量数据写入时不需要与存量数据做合并操作。对于 CDC 表,增量数据就是原始的 CDC 流的内容。对 LakeSoul 表的 CDC 增量读取,可以完整保留 CDC 操作标记,即 insert、update、delete。目前 2.2.0 版本在 Spark 中已经支持了增量流式读取。下个版本会发布 Flink Stream Source,支持流式增量读取为 Flink ChangeLog Stream。