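Preface
A brief walk-through of the Apache Hudi bootstrap source code. If you are not familiar with Hudi bootstrap, see: Using Hudi Bootstrap to Convert the parquet/orc Files of an Existing Hive Table into a Hudi Table
Versions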
Hudi 0.12.0
Spark 2.4.4
Entry point
val bootstrapDF = spark.emptyDataFrame
As described in the article "Hudi Spark Source Code Study Notes: df.write.format("hudi").save", the save method ends up in DefaultSource.createRelation
override def createRelation(sqlContext: SQLContext,
It checks whether OPERATION equals BOOTSTRAP_OPERATION_OPT_VAL. That is true here, so HoodieSparkSqlWriter.bootstrap is called.
HoodieSparkSqlWriter.bootstrap
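This method first reads a few parameters and then checks whether the table exists. If it does not, this is the first write, so some table configuration has to be set and the table initialized through HoodieTableMetaClient.initTable. After that it calls writeClient.bootstrap.
def bootstrap(sqlContext: SQLContext,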
writeClient.bootstrap
The writeClient here is a SparkRDDWriteClient. It in turn calls bootstrap on a HoodieTable. Our table type is COW, so the table is a HoodieSparkCopyOnWriteTable.
initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)).bootstrap(context, extraMetadata);

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
                                                                         HoodieSparkEngineContext context,
                                                                         HoodieTableMetaClient metaClient) {
  HoodieSparkTable<T> hoodieSparkTable;
  switch (metaClient.getTableType()) {
    case COPY_ON_WRITE:
      hoodieSparkTable = new HoodieSparkCopyOnWriteTable<>(config, context, metaClient);
      break;
    case MERGE_ON_READ:
      hoodieSparkTable = new HoodieSparkMergeOnReadTable<>(config, context, metaClient);
      break;
    default:
      throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
  }
  return hoodieSparkTable;
}
HoodieSparkCopyOnWriteTable.bootstrap
public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
SparkBootstrapCommitActionExecutor.execute
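This method first calls listAndProcessSourcePartitions, which returns each mode together with its partitions. There are two modes, METADATA_ONLY and FULL_RECORD. Partition paths mapped to METADATA_ONLY go through metadataBootstrap, and those mapped to FULL_RECORD go through fullBootstrap. Two things follow from this: 1. the mode returned by listAndProcessSourcePartitions decides whether a partition is bootstrapped as METADATA_ONLY or as FULL_RECORD; 2. the actual logic lives in metadataBootstrap and fullBootstrap. Let's look at each in turn, starting with how listAndProcessSourcePartitions returns the mode.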
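The dispatch in execute can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Hudi's code: the class BootstrapDispatchSketch is hypothetical, partitions are reduced to path strings, and the two bootstrap methods are stand-ins that only report what they would process. Only the two mode names and the skip-when-empty behavior come from the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of dispatching partitions by bootstrap mode.
class BootstrapDispatchSketch {
    enum BootstrapMode { METADATA_ONLY, FULL_RECORD }

    // Stand-in for metadataBootstrap: only reports which partitions it received.
    static String metadataBootstrap(List<String> partitions) {
        return "metadata bootstrap: " + partitions;
    }

    // Stand-in for fullBootstrap: only reports which partitions it received.
    static String fullBootstrap(List<String> partitions) {
        return "full bootstrap: " + partitions;
    }

    // Each mode's partitions go to the matching method; a missing or empty list is skipped.
    static List<String> execute(Map<BootstrapMode, List<String>> partitionsByMode) {
        List<String> results = new ArrayList<>();
        List<String> metadataOnly = partitionsByMode.get(BootstrapMode.METADATA_ONLY);
        if (metadataOnly != null && !metadataOnly.isEmpty()) {
            results.add(metadataBootstrap(metadataOnly));
        }
        List<String> fullRecord = partitionsByMode.get(BootstrapMode.FULL_RECORD);
        if (fullRecord != null && !fullRecord.isEmpty()) {
            results.add(fullBootstrap(fullRecord));
        }
        return results;
    }

    public static void main(String[] args) {
        Map<BootstrapMode, List<String>> byMode = new EnumMap<>(BootstrapMode.class);
        byMode.put(BootstrapMode.METADATA_ONLY, Arrays.asList("2020/08/20", "2020/08/21"));
        byMode.put(BootstrapMode.FULL_RECORD, Arrays.asList("2020/08/10"));
        execute(byMode).forEach(System.out::println);
    }
}
```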
listAndProcessSourcePartitions
The main work here is selector.select. The selector is configured through MODE_SELECTOR_CLASS_NAME (hoodie.bootstrap.mode.selector), whose default is MetadataOnlyBootstrapModeSelector. In our example, the FULL_RECORD case sets it to FullRecordBootstrapModeSelector. Let's look at how each is implemented.
private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions() throws IOException {
selector.select
MetadataOnlyBootstrapModeSelector and FullRecordBootstrapModeSelector are both subclasses of UniformBootstrapModeSelector and differ only in their bootstrapMode. Their select method is implemented in the parent class UniformBootstrapModeSelector.
public class MetadataOnlyBootstrapModeSelector extends UniformBootstrapModeSelector {
UniformBootstrapModeSelector.select
Clearly the returned mode is simply bootstrapMode, so when MODE_SELECTOR_CLASS_NAME is MetadataOnlyBootstrapModeSelector or FullRecordBootstrapModeSelector there is exactly one mode: either the metadata logic runs or the full logic runs. Can both modes run in the same bootstrap? Yes, they can.
public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
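To make the "exactly one mode" point concrete, here is a minimal sketch of a uniform selector. It assumes partitions are plain path strings (the real method receives pairs of path and file statuses), and the class name UniformSelectorSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a selector that assigns one fixed mode to every partition.
class UniformSelectorSketch {
    enum BootstrapMode { METADATA_ONLY, FULL_RECORD }

    private final BootstrapMode bootstrapMode;

    UniformSelectorSketch(BootstrapMode bootstrapMode) {
        this.bootstrapMode = bootstrapMode;
    }

    // The result always has a single key, so only one bootstrap path can run.
    Map<BootstrapMode, List<String>> select(List<String> partitionPaths) {
        return Collections.singletonMap(bootstrapMode, partitionPaths);
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("2022-10-08", "2022-10-09");
        System.out.println(new UniformSelectorSketch(BootstrapMode.METADATA_ONLY).select(partitions));
        System.out.println(new UniformSelectorSketch(BootstrapMode.FULL_RECORD).select(partitions));
    }
}
```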
BootstrapRegexModeSelector
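We covered BootstrapRegexModeSelector in the earlier article. It has two settings: hoodie.bootstrap.mode.selector.regex.mode, default METADATA_ONLY, and hoodie.bootstrap.mode.selector.regex, default .*
With a non-default regex, for example 2020/08/2[0-9], and the partitions "2020/08/10, 2020/08/10/11, 2020/08/20, 2020/08/21", the matching partitions 2020/08/20 and 2020/08/21 get METADATA_ONLY, while the non-matching partitions 2020/08/10 and 2020/08/10/11 get FULL_RECORD. As for why all of mine came out as FULL_RECORD: my regex was wrong. I set 2022/10/0[0-9], but the actual partition values were 2022-10-08 and 2022-10-09 (a different separator). With the default .*, everything matches, so every partition is METADATA_ONLY (the default behavior).
public BootstrapRegexModeSelector(HoodieWriteConfig writeConfig) {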
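The regex behavior, including the separator mistake described above, can be reproduced with a small sketch. It rests on two assumptions that fit the description but are not shown in the source: the regex must match the whole partition path, and non-matching partitions get the opposite of the configured mode. The class name RegexSelectorSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch: choose a bootstrap mode per partition by regex match.
class RegexSelectorSketch {
    enum BootstrapMode { METADATA_ONLY, FULL_RECORD }

    private final Pattern pattern;
    private final BootstrapMode matchedMode;
    private final BootstrapMode defaultMode;

    RegexSelectorSketch(String regex, BootstrapMode matchedMode) {
        this.pattern = Pattern.compile(regex);
        this.matchedMode = matchedMode;
        // Assumption: partitions that do not match get the opposite mode.
        this.defaultMode = matchedMode == BootstrapMode.FULL_RECORD
            ? BootstrapMode.METADATA_ONLY
            : BootstrapMode.FULL_RECORD;
    }

    // Groups partition paths by the mode chosen for each of them.
    Map<BootstrapMode, List<String>> select(List<String> partitionPaths) {
        return partitionPaths.stream().collect(Collectors.groupingBy(
            path -> pattern.matcher(path).matches() ? matchedMode : defaultMode));
    }

    public static void main(String[] args) {
        List<String> slashPartitions =
            Arrays.asList("2020/08/10", "2020/08/10/11", "2020/08/20", "2020/08/21");
        System.out.println(
            new RegexSelectorSketch("2020/08/2[0-9]", BootstrapMode.METADATA_ONLY).select(slashPartitions));

        // The mistake from the text: '/' in the regex, '-' in the partition values.
        List<String> dashPartitions = Arrays.asList("2022-10-08", "2022-10-09");
        System.out.println(
            new RegexSelectorSketch("2022/10/0[0-9]", BootstrapMode.METADATA_ONLY).select(dashPartitions));
    }
}
```

With the mismatched separator no partition matches, so the result contains only the FULL_RECORD key.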
These three are the only BootstrapModeSelector implementations. Next, let's look at metadataBootstrap and fullBootstrap.
metadataBootstrap
This first creates the keyGenerator and then obtains bootstrapPaths. The core logic is the subsequent getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap. We covered getMetadataHandler in the earlier article: depending on the file type it returns either a ParquetBootstrapMetadataHandler or an OrcBootstrapMetadataHandler. Here it returns a ParquetBootstrapMetadataHandler.
private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
ParquetBootstrapMetadataHandler.runMetadataBootstrap
ParquetBootstrapMetadataHandler's runMetadataBootstrap is implemented in its parent class BaseBootstrapMetadataHandler. The core logic here is in executeBootstrap.
public BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath, KeyGeneratorInterface keyGenerator) {
executeBootstrap
executeBootstrap is implemented in ParquetBootstrapMetadataHandler. It first creates a ParquetReader, wraps the reader in a ParquetReaderIterator, passes that to the BoundedInMemoryExecutor constructor to build wrapper, and then calls wrapper.execute().
void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
wrapper.execute()
This is a producer-consumer model; for background see the article "The Producer-Consumer Model in Hudi".
public E execute() {
startProducers
The part to look at is producer.produce. The producers here are Collections.singletonList(new IteratorBasedQueueProducer<>(new ParquetReaderIterator(reader))), so the produce method is IteratorBasedQueueProducer.produce.
public ExecutorCompletionService<Boolean> startProducers() {
  // Latch to control when and which producer thread will close the queue
  final CountDownLatch latch = new CountDownLatch(producers.size());
  final ExecutorCompletionService<Boolean> completionService =
      new ExecutorCompletionService<Boolean>(producerExecutorService);
  producers.stream().map(producer -> {
    return completionService.submit(() -> {
      try {
        preExecuteRunnable.run();
        producer.produce(queue);
      } catch (Throwable e) {
        LOG.error("error producing records", e);
        queue.markAsFailed(e);
        throw e;
      } finally {
        synchronized (latch) {
          latch.countDown();
          if (latch.getCount() == 0) {
            // Mark production as done so that consumer will be able to exit
            queue.close();
          }
        }
      }
      return true;
    });
  }).collect(Collectors.toList());
  return completionService;
}
IteratorBasedQueueProducer.produce
queue.insertRecord first calls transformFunction.apply to obtain the payload and then puts the payload on the queue. We will come back to transformFunction later. First look at inputIterator.hasNext() and inputIterator.next(); the inputIterator here is a ParquetReaderIterator.
public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
ParquetReaderIterator.next
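hasNext checks whether next is null, and next returns the result of parquetReader.read(). parquetReader.read() is implemented in the ParquetReader class, which is already Parquet's own source code, so we will not go further down. In short, the producer reads the contents of the parquet file and puts them on the queue for the consumer. Next, let's look at the consumer.
public boolean hasNext() {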
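The look-ahead pattern behind hasNext and next can be sketched without Parquet. This is an illustration only: LookAheadIteratorSketch is a hypothetical class, and a Supplier that returns null when exhausted stands in for parquetReader.read().

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical sketch: adapt a "read() returns null at the end" reader to an Iterator.
class LookAheadIteratorSketch<T> implements Iterator<T> {
    private final Supplier<T> reader; // stand-in for a reader's read() method
    private T next;                   // buffered record, or null if nothing is buffered

    LookAheadIteratorSketch(Supplier<T> reader) {
        this.reader = reader;
    }

    @Override
    public boolean hasNext() {
        // Read ahead only when nothing is buffered, so repeated calls are harmless.
        if (next == null) {
            next = reader.get();
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException("no more records");
        }
        T result = next;
        next = null; // force the following hasNext() to read again
        return result;
    }

    public static void main(String[] args) {
        Queue<String> rows = new ArrayDeque<>(Arrays.asList("row-1", "row-2"));
        Iterator<String> it = new LookAheadIteratorSketch<>(rows::poll); // poll() returns null when empty
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```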
startConsumer
The key call here is consumer.consume, where the consumer is new BootstrapRecordConsumer(bootstrapHandle).
private Future<E> startConsumer() {
BootstrapRecordConsumer.consume
Its consume method is implemented in the parent class BoundedInMemoryQueueConsumer. It first fetches data from the queue through queue.iterator().next() and then calls consumeOneRecord.
public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
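The producer and consumer sides can be put together in a generic sketch built on java.util.concurrent. It is not Hudi's BoundedInMemoryExecutor: ProducerConsumerSketch and its run method are hypothetical, an empty Optional is used as an end-of-input marker instead of Hudi's queue.close(), and the transform is assumed never to return null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of a bounded producer-consumer pipeline.
class ProducerConsumerSketch {
    static <I, O> List<O> run(Iterator<I> input, Function<I, O> transform, int capacity) {
        BlockingQueue<Optional<O>> queue = new ArrayBlockingQueue<>(capacity);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Producer: transform each input record, then enqueue it (blocks when the queue is full).
            Future<?> producer = pool.submit(() -> {
                try {
                    while (input.hasNext()) {
                        queue.put(Optional.of(transform.apply(input.next())));
                    }
                } finally {
                    queue.put(Optional.empty()); // end marker so the consumer can exit
                }
                return null;
            });
            // Consumer: take records one by one until the end marker arrives.
            Future<List<O>> consumer = pool.submit(() -> {
                List<O> consumed = new ArrayList<>();
                for (Optional<O> item = queue.take(); item.isPresent(); item = queue.take()) {
                    consumed.add(item.get());
                }
                return consumed;
            });
            producer.get(); // surfaces any producer failure
            return consumer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("pipeline failed", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> keys = run(Arrays.asList(1, 2, 3).iterator(), id -> "key-" + id, 2);
        System.out.println(keys);
    }
}
```

Applying the transform on the producer side mirrors what the text says about queue.insertRecord: the consumer only ever sees already transformed records.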
BootstrapRecordConsumer.consumeOneRecord
What are record and the payload here? That depends on the transformFunction mentioned above, which is defined in the executeBootstrap method shown earlier.
protected void consumeOneRecord(HoodieRecord record) {
public Option<IndexedRecord> getInsertValue(Schema schema) {
transformFunction
The inp here should be the IndexedRecord that the producer read from the parquet file (inputIterator.next()), since, as noted above, queue.insertRecord applies transformFunction before putting the result on the queue. The function first derives recKey from the IndexedRecord, puts recKey into a GenericRecord as Hudi's record-key metadata field, builds a BootstrapRecordPayload with gr as its record, and finally returns new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload). So the record above is the rec below, and the HoodieRecordPayload above is obtained through record.getData(), that is, the data held by the HoodieAvroRecord, which is the BootstrapRecordPayload below.
inp -> {
  String recKey = keyGenerator.getKey(inp).getRecordKey();
  GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
  gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
  BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
  HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
  return rec;
}
BootstrapRecordPayload.getInsertValue
The logic here is simple: it returns record directly. From transformFunction above we know that record is a GenericRecord with a single field, RECORD_KEY_METADATA_FIELD. Now that we know what getInsertValue returns, we can move on to the write method.
public Option<IndexedRecord> getInsertValue(Schema schema) {
  return Option.ofNullable(record);
}
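The effect of transformFunction plus getInsertValue is that each source row shrinks to a record holding nothing but its key. A minimal sketch without Avro, assuming rows are plain maps and the key is read from a single column (the class SkeletonRecordSketch and the column names are hypothetical; the field name is the value of HoodieRecord.RECORD_KEY_METADATA_FIELD):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: reduce a full source row to a key-only skeleton record.
class SkeletonRecordSketch {
    // Value of HoodieRecord.RECORD_KEY_METADATA_FIELD.
    static final String RECORD_KEY_METADATA_FIELD = "_hoodie_record_key";

    // Simplified key generation: the key is the string form of one column.
    static Map<String, Object> toSkeleton(Map<String, Object> sourceRow, String keyField) {
        String recKey = String.valueOf(sourceRow.get(keyField));
        // All other columns are dropped; they stay in the original source file.
        return Collections.singletonMap(RECORD_KEY_METADATA_FIELD, recKey);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", "hudi");
        row.put("price", 10.0);
        System.out.println(toSkeleton(row, "id"));
    }
}
```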
HoodieBootstrapHandle.write
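HoodieBootstrapHandle's write method is implemented in the parent class HoodieCreateHandle. Its core logic uses fileWriter to write avroRecord into the corresponding parquet file; the actual writing again happens in Parquet's source code. Since the record here carries only the record key and partition information, this explains why metadata bootstrap generates only a skeleton file containing the keys and footers rather than rewriting all the data. That concludes METADATA_ONLY; next, let's look at FULL_RECORD.
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {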
SparkBootstrapCommitActionExecutor.fullBootstrap
First, inputProvider is constructed through reflection. It is configured through FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME (hoodie.bootstrap.full.input.provider), whose default is SparkParquetBootstrapDataProvider. inputProvider.generateInputRecords then reads the source table's parquet files and returns inputRecordsRDD, which contains the full content. Finally inputRecordsRDD is passed to getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute(), and SparkBulkInsertHelper.bulkInsert writes it to the target table using the normal Hudi write path, that is, a full copy/rewrite of the data as a Hudi table. The bulkInsert source is fairly extensive, so given limits of space, energy and ability I will not go deeper here; I may cover it in a separate post if I get the chance.
protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
  if (null == partitionFilesList || partitionFilesList.isEmpty()) {
    return Option.empty();
  }
  TypedProperties properties = new TypedProperties();
  properties.putAll(config.getProps());
  FullRecordBootstrapDataProvider inputProvider =
      (FullRecordBootstrapDataProvider) ReflectionUtils.loadClass(config.getFullBootstrapInputProvider(),
          properties, context);
  JavaRDD<HoodieRecord> inputRecordsRDD =
      (JavaRDD<HoodieRecord>) inputProvider.generateInputRecords("bootstrap_source", config.getBootstrapSourceBasePath(),
          partitionFilesList);
  // Start Full Bootstrap
  final HoodieInstant requested = new HoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),
      HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
  table.getActiveTimeline().createNewInstant(requested);
  // Setup correct schema and run bulk insert.
  return Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
}
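Loading the input provider by its configured class name is ordinary Java reflection. The sketch below shows the general technique only and is not Hudi's ReflectionUtils: the InputProvider interface, the DefaultProvider class and the source.base.path property are all hypothetical names introduced for illustration.

```java
import java.util.Properties;

// Hypothetical sketch: instantiate a pluggable provider from a class name.
class ReflectiveLoadSketch {
    interface InputProvider {
        String describe();
    }

    // Stand-in for a default provider; takes its settings through the constructor.
    public static class DefaultProvider implements InputProvider {
        private final Properties props;

        public DefaultProvider(Properties props) {
            this.props = props;
        }

        @Override
        public String describe() {
            return "reading from " + props.getProperty("source.base.path");
        }
    }

    // Looks up the class by name and calls its (Properties) constructor.
    static InputProvider load(String className, Properties props) {
        try {
            return (InputProvider) Class.forName(className)
                .getConstructor(Properties.class)
                .newInstance(props);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("source.base.path", "/tmp/source_table");
        // In a real setup the class name would come from configuration.
        InputProvider provider = load(DefaultProvider.class.getName(), props);
        System.out.println(provider.describe());
    }
}
```

Because only the class name is fixed in configuration, swapping in another provider needs no change to the calling code, as long as the constructor signature matches.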
generateInputRecords is implemented in the parent class SparkFullBootstrapDataProviderBase
public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
    List<Pair<String, List<HoodieFileStatus>>> partitionPathsWithFiles) {
  String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue)
      .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
      .toArray(String[]::new);
  Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths);
  try {
    KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
    String structName = tableName + "_record";
    String namespace = "hoodie." + tableName;
    RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
        Option.empty());
    return genericRecords.toJavaRDD().map(gr -> {
      String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
          gr, props.getString("hoodie.datasource.write.precombine.field"), false, props.getBoolean(
              KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
              Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
      try {
        return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
            props.getString("hoodie.datasource.write.payload.class"));
      } catch (IOException ioe) {
        throw new HoodieIOException(ioe.getMessage(), ioe);
      }
    });
  } catch (IOException ioe) {
    throw new HoodieIOException(ioe.getMessage(), ioe);
  }
}
getBulkInsertActionExecutor
Returns a SparkBulkInsertCommitActionExecutor
protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
SparkBulkInsertCommitActionExecutor.execute
Calls SparkBulkInsertHelper.bulkInsert
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
SparkBulkInsertHelper.bulkInsert
Finally, SparkBulkInsertHelper.bulkInsert writes the data to the target table using the normal Hudi write path.
public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(final HoodieData<HoodieRecord<T>> inputRecords,
                                                               final String instantTime,
                                                               final HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
                                                               final HoodieWriteConfig config,
                                                               final BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> executor,
                                                               final boolean performDedupe,
                                                               final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
  HoodieWriteMetadata result = new HoodieWriteMetadata();
  // transition bulk_insert state to inflight
  table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
      executor.getCommitActionType(), instantTime), Option.empty(),
      config.shouldAllowMultiWriteOnSameInstant());
  BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
  // write new files
  HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
      config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
  // update index
  ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
  return result;
}
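Summary
This post briefly analyzed some of the key source-code logic of Hudi bootstrap; I hope it helps. Given limited energy and ability, some parts may not go deep enough or may be wrong. Corrections are very welcome, so that we can all improve together.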