前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
在上一篇博客Hudi preCombinedField 总结中已经对preCombinedField进行总结过一次了,由于当时对源码理解还不够深入,导致分析的不全面,现在对源码有了进一步的理解,所以再进行总结补充一下。
历史比较值
上面总结中:
DF:无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
SQL:写数据时,ts值大于等于历史ts值,才会更新,小于历史值则不更新。
这里解释一下原因,首先Spark SQL PAYLOAD_CLASS_NAME 默认值为ExpressionPayload,而ExpressionPayload继承了DefaultHoodieRecordPayload
1 | class ExpressionPayload(record: GenericRecord, |
DefaultHoodieRecordPayload 里的needUpdatingPersistedRecord实现了历史值进行比较,具体实现,后面会进行分析
而 Spark DF在hudi0.9.0版本 PAYLOAD_CLASS_NAME的默认值为OverwriteWithLatestAvroPayload,它是DefaultHoodieRecordPayload的父类并没有实现和历史值进行比较
历史值比较实现
对源码进行简单的分析,首先说明历史比较值的配置项为:1
HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY = "hoodie.payload.ordering.field"
而它的默认值为ts,所以ordering_field和preCombineField并不一样,但是因为默认值一样而且实现都在PAYLOAD_CLASS里,所以给人的感觉是一样,故放在一起进行总结
HoodieMergeHandle
hudi 在 upsert进行小文件合并时,会走到HoodieMergeHandled的write方法:
1 | /** |
combineAndGetUpdateValue方法
看一下 DefaultHoodieRecordPayload的combineAndGetUpdateValue:
1 |
|
关于recordBytes的赋值,在父类BaseAvroPayload,我们写数据时需要先构造GenericRecord record,然后将record作为参数传给PayLoad,最后构造构造List
>,调用HoodieJavaWriteClient.upsert(List > records,
String instantTime)
1 | public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { |
needUpdatingPersistedRecord
和历史值的比较就在这里:
1 | protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue, |
PAYLOAD_ORDERING_FIELD_PROP_KEY默认值
可以看到在上面HoodieMergeHandle中传的properties参数为config.getPayloadConfig().getProps()
getPayloadConfig返回HoodiePayloadConfig,而在HoodiePayloadConfig定义了PAYLOAD_ORDERING_FIELD_PROP_KEY的默认值为ts1
2
3
4
5
6
7
8
9
10
11 public HoodiePayloadConfig getPayloadConfig() {
return hoodiePayloadConfig;
}
public class HoodiePayloadConfig extends HoodieConfig {
public static final ConfigProperty<String> ORDERING_FIELD = ConfigProperty
.key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
.defaultValue("ts")
.withDocumentation("Table column/field name to order records that have the same key, before "
+ "merging and writing to storage.");
预合并实现
首先说明,预合并实现方法为类 OverwriteWithLatestAvroPayload.preCombine
1 | public class OverwriteWithLatestAvroPayload extends BaseAvroPayload |
所以无论是Spark SQL 还是 Spark DF都默认实现了预合并ExpressionPayload、DefaultHoodieRecordPayload都继承了(extends
)OverwriteWithLatestAvroPayload,所以用这三个payload都可以实现预合并,关键看怎么构造paylod
构造Paylod
根据上面的代码,我们可以发现OverwriteWithLatestAvroPayload有两个构造函数,一个参数和两个参数,其中一个参数的并不能实现预合并,因为预合并方法中需要orderingVal比较,所以要用两个参数的构造函数构造OverwriteWithLatestAvroPayload,其中orderingVal 为 preCombineField对应的值,record为一行记录值。而无论是Spark SQL还是Spark DF,最终都会调用HoodieSparkSqlWriter.write
,构造paylod就是在这个write方法里实现的。
1 | // Convert to RDD[HoodieRecord] |
通过上面源码的注释中可以看到,如果需要进行预合并的话,则首先取出record中对应的PRECOMBINE_FIELD值orderingVal,然后构造payload,即1
new OverwriteWithLatestAvroPayload(record, orderingVal)
这里就构造好了payload,那么最终是在哪里实现的预合并呢?
调用preCombine
这里以cow表的upsert为例,即HoodieJavaCopyOnWriteTable.upsert
1 | // HoodieJavaCopyOnWriteTable |
这样就实现了预合并的功能
修改历史比较值
最后说一下历史比较值是怎么修改的,其实Spark SQL 和 Spark DF不用特意修改它的值,因为默认和preCombineField值是同步修改的,看一下程序怎么同步修改的。
无论是是SQL还是DF最终都会调用HoodieSparkSqlWriter.write
1 | // Create a HoodieWriteClient & issue the delete. |
如果确实想修改默认值,即和PRECOMBINE_FIELD不一样,
那么sql:
1 | set hoodie.payload.ordering.field=ts; |
DF:
1 | .option("hoodie.payload.ordering.field", "ts") |