前言
总结 HUDI preCombinedField,分两大类总结,一类是Spark SQL,这里指的是merge,因为只有merge语句中有多条记录,讨论preCombinedField才有意义;一类是Spark DF,HUDI0.9版本支持SQL建表和增删改查
总结
先说结论:
Spark DF建表写数据时(含更新):
1、UPSERT,当数据重复时(这里指同一主键对应多条记录),程序在写数据前会根据预合并字段ts进行去重,去重保留ts值最大的那条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
2、INSERT时,没有预合并,程序依次写入,实际更新为最后一条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
Spark SQL建表,写数据时(含更新):
有ts时,预合并时如果数据重复取预合并字段值最大的那条记录,最大值相同的取第一个。写数据时,ts值大于等于历史ts值,才会更新,小于历史值则不更新。
没有ts时,则默认将主键字段的第一个值作为预合并字段,如果数据重复,去重时会取第一个值,写数据时,直接覆盖历史数据(因为这里的预合并字段为主键字段,等于历史值,其实原理跟上面有ts时一样)
1 PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
说明:
1、这里有ts代表设置了preCombinedField字段
2、hudi默认使用布隆索引,布隆索引只保证同一分区下同一个主键对应的值唯一,可以使用全局索引保证所有分区值唯一,这里不展开细说1
2
3
4
5
6
7
8
9
10
11private String getDefaultIndexType(EngineType engineType) {
switch (engineType) {
case SPARK:
return HoodieIndex.IndexType.BLOOM.name();
case FLINK:
case JAVA:
return HoodieIndex.IndexType.INMEMORY.name();
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}
3、如果在测试过程中,发现和我的结论不一致,可能和后面的注意事项有关。
4、当指定了hoodie.datasource.write.insert.drop.duplicates=true时,不管是insert还是upsert,如果存在历史数据则不更新。实际在源码中,如果为upsert,也会修改为insert。1
2
3
4
5
6
7
8
9if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
operation == WriteOperationType.UPSERT) {
log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
s"when $INSERT_DROP_DUPS is set to be true, " +
s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
operation = WriteOperationType.INSERT
}