前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
总结 HUDI preCombinedField,分两大类总结,一类是Spark SQL,这里指的是merge,因为只有merge语句中有多条记录,讨论preCombinedField才有意义;一类是Spark DF,HUDI0.9版本支持SQL建表和增删改查
总结
先说结论:
Spark DF建表写数据时(含更新):
1、UPSERT,当数据重复时(这里指同一主键对应多条记录),程序在写数据前会根据预合并字段ts进行去重,去重保留ts值最大的那条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
2、INSERT时,没有预合并,程序依次写入,实际更新为最后一条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
Spark SQL建表,写数据时(含更新):
有ts时,预合并时如果数据重复取预合并字段值最大的那条记录,最大值相同的取第一个。写数据时,ts值大于等于历史ts值,才会更新,小于历史值则不更新。
没有ts时,则默认将主键字段的第一个值作为预合并字段,如果数据重复,去重时会取第一个值,写数据时,直接覆盖历史数据(因为这里的预合并字段为主键字段,等于历史值,其实原理跟上面有ts时一样)
1 PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
说明:
1、这里有ts代表设置了preCombinedField字段
2、hudi默认使用布隆索引,布隆索引只保证同一分区下同一个主键对应的值唯一,可以使用全局索引保证所有分区值唯一,这里不展开细说1
2
3
4
5
6
7
8
9
10
11private String getDefaultIndexType(EngineType engineType) {
switch (engineType) {
case SPARK:
return HoodieIndex.IndexType.BLOOM.name();
case FLINK:
case JAVA:
return HoodieIndex.IndexType.INMEMORY.name();
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}
3、如果在测试过程中,发现和我的结论不一致,可能和后面的注意事项有关。
4、当指定了hoodie.datasource.write.insert.drop.duplicates=true时,不管是insert还是upsert,如果存在历史数据则不更新。实际在源码中,如果为upsert,也会修改为insert。1
2
3
4
5
6
7
8
9if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
operation == WriteOperationType.UPSERT) {
log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
s"when $INSERT_DROP_DUPS is set to be true, " +
s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
operation = WriteOperationType.INSERT
}
Spark DF
先说DF建表,DF写hudi表时,默认情况下,hudi,必须指定preCombinedField,否则,会抛出异常(当为insert等其他类型时,preCombinedField可以不用设置,具体见后面的源码解读部分),示例如下
1 | import org.apache.hudi.DataSourceWriteOptions._ |
Spark DF写数据默认OPERATION为UPSERT,当数据重复时(这里指同一主键对应多条记录),程序在写数据前会根据预合并字段ts进行去重,去重保留ts值最大的那条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
当OPERATION为INSERT时(option(OPERATION_OPT_KEY.key(), “INSERT”)),ts不是必须的,可以不设置,没有预合并,程序依次写入,实际更新为最后一条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
附hoodie.properties:
1 | #Properties saved on Sat Jul 10 15:08:16 CST 2021 |
可见,hudi表元数据里有hoodie.table.precombine.field=ts,代表preCombinedField生效
SQL
SQL与DF不同,分为两种,有预合并和没有预合并
没有预合并
SQL默认没有预合并
1 | spark.sql( |
没有设置预合并字段值,如果数据重复,去重时会取第一个值,写数据时,直接覆盖历史数据
查看hoodie.properties和在spark.sql(s”show create table ${tableName}”).show(false)打印信息里发现表的元数据信息确实没有preCombinedField,示例中虽然有ts字段,但是没有没有显示设置,当然可以直接去掉ts字段,大家可以自行测试。
有预合并
1 | spark.sql( |
SQL的唯一的区别是在建表语句中加了配置preCombineField = ‘ts’,同样可以在hoodie.properties和打印信息里查看是否有hoodie.table.precombine.field=ts信息。
预合并时如果数据重复取预合并字段值最大的那条记录,最大值相同的取第一个。写数据时,ts值大于等于历史ts值,才会更新,小于历史值则不更新。
SQL与DF结合
先用SQL建表,再用DF写数据。这种情况主要是想建表时不想多一列ts字段,而在预合并时可以添加一列预合并字段进行去重,因为目前的版本SQL没有实现该功能。在SQL建表时如果指定了 preCombineField = ‘ts’,则表结构中必须有ts这个字段。
1 | val tableName = "test_hudi_table4" |
上面的程序主要是用SQL先建了表的元数据,然后再用程序指定了PRECOMBINE_FIELD_OPT_KEY=ts,这样就实现了既可以预合并去重,也不用在建表中指定ts字段。但是打印中发现用程序读parquet文件时多了ts列,读表时因为元数据里没有ts列,没有打印出来,实际文件存储的有ts这一列。
上面只是模拟了这一场景,而我们想实现的是下面的
1 | spark.sql( |
在建表时指定了preCombineField = ‘ts’,但是表结构中没有ts字段,而且后面的merge sql拼接时添加这一列。目前master分支还不支持这种情况,如果想实现这一情况,可以自己尝试修改源码支持。
代码
示例代码已上传到gitee,由于公司网把github屏蔽了,以后暂时转到gitee上。
源码解读
解读部分源码
更新:2021-09-26,因为0.9.0版本已发布,故更新源代码解析,可能存在部分源代码没有更新
程序写hudi时ts的必须性
默认配置时,如果不指定PRECOMBINE_FIELD_OPT_KEY,则会抛出以下异常:
1 | 21/07/13 20:04:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) |
对应的源码:
HoodieSparkSqlWriter.scala 230、233行1
2
3
4
5
6
7
8
9
10
11
12
13230 val hoodieAllIncomingRecords = genericRecords.map(gr => {
val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
val hoodieRecord = if (shouldCombine) {
233 val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(processedRecord,
orderingVal, keyGenerator.getKey(gr),
hoodieConfig.getString(PAYLOAD_CLASS_NAME))
} else {
DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
}
hoodieRecord
}).toJavaRDD()
getNestedFieldVal1
2
3
4
5
6
7
8
9
10
11
12public static Object getNestedFieldVal(GenericRecord record, String fieldName, boolean returnNullIfNotFound) {
String[] parts = fieldName.split("\\.");
......
if (returnNullIfNotFound) {
return null;
434 } else if (valueNode.getSchema().getField(parts[i]) == null) {
throw new HoodieException(
fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
437 + valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
} else {
throw new HoodieException("The value of " + parts[i] + " can not be null");
}
233行,如果shouldCombine==true,则会调用getNestedFieldVal,并将PRECOMBINE_FIELD_OPT_KEY的值作为fieldName参数传给getNestedFieldVal,而在getNestedFieldVal的434行发现当PRECOMBINE_FIELD_OPT_KEY的值==null时抛出上面的异常。
可以发现当shouldCombine==true,才会调用getNestedFieldVal,才会抛出该异常,而shouldCombine何时为true呢
1 | val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean || |
当INSERT_DROP_DUPS为true或者操作类型为UPSERT时,shouldCombine为true,默认的INSERT_DROP_DUPS=false1
2
3
4
5
6
7
8
9
/**
* Flag to indicate whether to drop duplicates upon insert.
* By default insert will accept duplicates, to gain extra performance.
*/
val INSERT_DROP_DUPS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.insert.drop.duplicates")
.defaultValue("false")
.withDocumentation("If set to true, filters out all duplicate records from incoming dataframe, during insert operations.")
也就是默认情况下,upsert操作,ts是必须的,而insert等其他操作可以没有ts值。这样我们就可以根据实际情况灵活运用了。
注意
用SQL创建新表或者DF append模式创建新表时,如果对应的数据目录已存在,需要先将文件夹删掉,因为hoodie.properties里保存了表的元数据信息,程序里会根据文件信息判断表是否存在,如果存在,会复用旧表的元数据。这种情况存在于想用同一个表测试上面多种情况