Skip to content

Commit d2b4e7e

Browse files
committed
feat(index): Add Indexer abstraction and refactor metadata table initialization logic
1 parent 14a549f commit d2b4e7e

36 files changed

Lines changed: 2177 additions & 759 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java

Lines changed: 66 additions & 592 deletions
Large diffs are not rendered by default.

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.hudi.common.util.collection.Pair;
3434
import org.apache.hudi.config.HoodieWriteConfig;
3535
import org.apache.hudi.exception.HoodieMetadataException;
36+
import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
3637
import org.apache.hudi.storage.StorageConfiguration;
3738

3839
import java.util.Arrays;
@@ -72,17 +73,9 @@ protected HoodieBackedTableMetadataWriterTableVersionSix(StorageConfiguration<?>
7273
HoodieWriteConfig writeConfig,
7374
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
7475
HoodieEngineContext engineContext,
76+
ExpressionIndexRecordGenerator expressionIndexRecordGenerator,
7577
Option<String> inflightInstantTimestamp) {
76-
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
77-
}
78-
79-
@Override
80-
List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig metadataConfig, HoodieTableMetaClient metaClient) {
81-
return MetadataPartitionType.getEnabledPartitions(metadataConfig, metaClient).stream()
82-
.filter(partition -> !partition.equals(MetadataPartitionType.SECONDARY_INDEX))
83-
.filter(partition -> !partition.equals(MetadataPartitionType.EXPRESSION_INDEX))
84-
.filter(partition -> !partition.equals(MetadataPartitionType.PARTITION_STATS))
85-
.collect(Collectors.toList());
78+
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, expressionIndexRecordGenerator, inflightInstantTimestamp);
8679
}
8780

8881
@Override

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.metadata;
2020

21+
import org.apache.hudi.avro.model.HoodieCleanMetadata;
2122
import org.apache.hudi.avro.model.HoodieMetadataRecord;
2223
import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
2324
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
@@ -73,6 +74,7 @@
7374
import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
7475
import org.apache.hudi.exception.HoodieException;
7576
import org.apache.hudi.exception.HoodieMetadataException;
77+
import org.apache.hudi.metadata.index.Indexer;
7678
import org.apache.hudi.stats.HoodieColumnRangeMetadata;
7779
import org.apache.hudi.storage.StoragePath;
7880
import org.apache.hudi.storage.StoragePathInfo;
@@ -106,6 +108,7 @@
106108
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS;
107109
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToBloomFilterRecords;
108110
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToColumnStatsRecords;
111+
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToExpressionIndexRecords;
109112
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords;
110113
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToPartitionStatsRecords;
111114
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords;
@@ -362,6 +365,41 @@ public static HoodieWriteConfig createMetadataWriteConfig(
362365
return metadataWriteConfig;
363366
}
364367

368+
/**
369+
* Convert the clean action to metadata records.
370+
*/
371+
public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext,
372+
HoodieCleanMetadata cleanMetadata,
373+
String instantTime,
374+
HoodieTableMetaClient dataMetaClient,
375+
HoodieMetadataConfig metadataConfig,
376+
Map<MetadataPartitionType, Indexer> enabledIndexBuilderMap,
377+
int bloomIndexParallelism,
378+
Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
379+
final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
380+
final HoodieData<HoodieRecord> filesPartitionRecordsRDD = engineContext.parallelize(
381+
convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1);
382+
partitionToRecordsMap.put(MetadataPartitionType.FILES.getPartitionPath(), filesPartitionRecordsRDD);
383+
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.BLOOM_FILTERS)) {
384+
final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
385+
convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, bloomIndexParallelism);
386+
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath(), metadataBloomFilterRecordsRDD);
387+
}
388+
389+
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.COLUMN_STATS)) {
390+
final HoodieData<HoodieRecord> metadataColumnStatsRDD =
391+
convertMetadataToColumnStatsRecords(cleanMetadata, engineContext,
392+
dataMetaClient, metadataConfig, recordTypeOpt);
393+
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), metadataColumnStatsRDD);
394+
}
395+
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.EXPRESSION_INDEX)) {
396+
convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata, instantTime, dataMetaClient, metadataConfig, bloomIndexParallelism, partitionToRecordsMap,
397+
recordTypeOpt);
398+
}
399+
400+
return partitionToRecordsMap;
401+
}
402+
365403
/**
366404
* Convert commit action to metadata records for the enabled partition types.
367405
*
@@ -370,12 +408,14 @@ public static HoodieWriteConfig createMetadataWriteConfig(
370408
* @param commitMetadata - Commit action metadata
371409
* @param instantTime - Action instant time
372410
* @param dataMetaClient - HoodieTableMetaClient for data
373-
* @param tableMetadata
411+
* @param tableMetadata - metadata table reader
374412
* @param metadataConfig - HoodieMetadataConfig
375413
* @param enabledPartitionTypes - Set of enabled MDT partitions to update
376414
* @param bloomFilterType - Type of generated bloom filter records
377415
* @param bloomIndexParallelism - Parallelism for bloom filter record generation
378-
* @param enableOptimizeLogBlocksScan - flag used to enable scanInternalV2 for log blocks in data table
416+
* @param writesFileIdEncoding - file id encoding used while generating record index records
417+
* @param engineType - execution engine type
418+
* @param recordTypeOpt - record type override for generated metadata records
379419
* @return Map of partition to metadata records for the commit action
380420
*/
381421
public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext context, HoodieWriteConfig dataWriteConfig, HoodieCommitMetadata commitMetadata,

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hudi.common.engine.ReaderContextFactory;
2828
import org.apache.hudi.common.fs.FSUtils;
2929
import org.apache.hudi.common.model.FileSlice;
30+
import org.apache.hudi.metadata.model.FileSliceAndPartition;
3031
import org.apache.hudi.common.model.HoodieBaseFile;
3132
import org.apache.hudi.common.model.HoodieFileFormat;
3233
import org.apache.hudi.common.model.HoodieIndexDefinition;
@@ -235,7 +236,7 @@ public static <T> Map<String, String> getRecordKeyToSecondaryKey(HoodieTableMeta
235236
}
236237

237238
public static <T> HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext,
238-
List<Pair<String, FileSlice>> partitionFileSlicePairs,
239+
List<FileSliceAndPartition> partitionFileSlicePairs,
239240
int secondaryIndexMaxParallelism,
240241
String activeModule, HoodieTableMetaClient metaClient,
241242
HoodieIndexDefinition indexDefinition,
@@ -255,8 +256,8 @@ public static <T> HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(Hoodi
255256
engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFileSlicePairs.size() + " file slices");
256257
HoodieFileFormat baseFileFormat = metaClient.getTableConfig().getBaseFileFormat();
257258
return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndBaseFile -> {
258-
final String partition = partitionAndBaseFile.getKey();
259-
final FileSlice fileSlice = partitionAndBaseFile.getValue();
259+
final String partition = partitionAndBaseFile.partitionPath();
260+
final FileSlice fileSlice = partitionAndBaseFile.fileSlice();
260261
Option<StoragePath> dataFilePath = Option.ofNullable(fileSlice.getBaseFile().map(baseFile -> FSUtils.getAbsoluteFilePath(basePath, partition, baseFile.getFileName())).orElseGet(null));
261262
HoodieSchema readerSchema;
262263
if (dataFilePath.isPresent()) {

0 commit comments

Comments
 (0)