Skip to content

Commit bdfa722

Browse files
committed
feat(index): Refactor metadata table initialization logic based on index abstraction
1 parent edde4b2 commit bdfa722

20 files changed

Lines changed: 276 additions & 843 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java

Lines changed: 67 additions & 599 deletions
Large diffs are not rendered by default.

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.hudi.common.util.collection.Pair;
3434
import org.apache.hudi.config.HoodieWriteConfig;
3535
import org.apache.hudi.exception.HoodieMetadataException;
36+
import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
3637
import org.apache.hudi.storage.StorageConfiguration;
3738

3839
import java.util.Arrays;
@@ -72,17 +73,9 @@ protected HoodieBackedTableMetadataWriterTableVersionSix(StorageConfiguration<?>
7273
HoodieWriteConfig writeConfig,
7374
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
7475
HoodieEngineContext engineContext,
76+
ExpressionIndexRecordGenerator expressionIndexRecordGenerator,
7577
Option<String> inflightInstantTimestamp) {
76-
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
77-
}
78-
79-
@Override
80-
List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig metadataConfig, HoodieTableMetaClient metaClient) {
81-
return MetadataPartitionType.getEnabledPartitions(metadataConfig, metaClient).stream()
82-
.filter(partition -> !partition.equals(MetadataPartitionType.SECONDARY_INDEX))
83-
.filter(partition -> !partition.equals(MetadataPartitionType.EXPRESSION_INDEX))
84-
.filter(partition -> !partition.equals(MetadataPartitionType.PARTITION_STATS))
85-
.collect(Collectors.toList());
78+
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, expressionIndexRecordGenerator, inflightInstantTimestamp);
8679
}
8780

8881
@Override

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.metadata;
2020

21+
import org.apache.hudi.avro.model.HoodieCleanMetadata;
2122
import org.apache.hudi.avro.model.HoodieMetadataRecord;
2223
import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
2324
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
@@ -73,6 +74,7 @@
7374
import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
7475
import org.apache.hudi.exception.HoodieException;
7576
import org.apache.hudi.exception.HoodieMetadataException;
77+
import org.apache.hudi.metadata.index.Indexer;
7678
import org.apache.hudi.stats.HoodieColumnRangeMetadata;
7779
import org.apache.hudi.storage.StoragePath;
7880
import org.apache.hudi.storage.StoragePathInfo;
@@ -106,6 +108,7 @@
106108
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS;
107109
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToBloomFilterRecords;
108110
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToColumnStatsRecords;
111+
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToExpressionIndexRecords;
109112
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords;
110113
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToPartitionStatsRecords;
111114
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords;
@@ -362,6 +365,41 @@ public static HoodieWriteConfig createMetadataWriteConfig(
362365
return metadataWriteConfig;
363366
}
364367

368+
/**
369+
* Convert the clean action to metadata records.
370+
*/
371+
public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext,
372+
HoodieCleanMetadata cleanMetadata,
373+
String instantTime,
374+
HoodieTableMetaClient dataMetaClient,
375+
HoodieMetadataConfig metadataConfig,
376+
Map<MetadataPartitionType, Indexer> enabledIndexBuilderMap,
377+
int bloomIndexParallelism,
378+
Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
379+
final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
380+
final HoodieData<HoodieRecord> filesPartitionRecordsRDD = engineContext.parallelize(
381+
convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1);
382+
partitionToRecordsMap.put(MetadataPartitionType.FILES.getPartitionPath(), filesPartitionRecordsRDD);
383+
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.BLOOM_FILTERS)) {
384+
final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
385+
convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, bloomIndexParallelism);
386+
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath(), metadataBloomFilterRecordsRDD);
387+
}
388+
389+
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.COLUMN_STATS)) {
390+
final HoodieData<HoodieRecord> metadataColumnStatsRDD =
391+
convertMetadataToColumnStatsRecords(cleanMetadata, engineContext,
392+
dataMetaClient, metadataConfig, recordTypeOpt);
393+
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), metadataColumnStatsRDD);
394+
}
395+
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.EXPRESSION_INDEX)) {
396+
convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata, instantTime, dataMetaClient, metadataConfig, bloomIndexParallelism, partitionToRecordsMap,
397+
recordTypeOpt);
398+
}
399+
400+
return partitionToRecordsMap;
401+
}
402+
365403
/**
366404
* Convert commit action to metadata records for the enabled partition types.
367405
*
@@ -370,12 +408,14 @@ public static HoodieWriteConfig createMetadataWriteConfig(
370408
* @param commitMetadata - Commit action metadata
371409
* @param instantTime - Action instant time
372410
* @param dataMetaClient - HoodieTableMetaClient for data
373-
* @param tableMetadata
411+
* @param tableMetadata - metadata table reader
374412
* @param metadataConfig - HoodieMetadataConfig
375413
* @param enabledPartitionTypes - Set of enabled MDT partitions to update
376414
* @param bloomFilterType - Type of generated bloom filter records
377415
* @param bloomIndexParallelism - Parallelism for bloom filter record generation
378-
* @param enableOptimizeLogBlocksScan - flag used to enable scanInternalV2 for log blocks in data table
416+
* @param writesFileIdEncoding - file id encoding used while generating record index records
417+
* @param engineType - execution engine type
418+
* @param recordTypeOpt - record type override for generated metadata records
379419
* @return Map of partition to metadata records for the commit action
380420
*/
381421
public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext context, HoodieWriteConfig dataWriteConfig, HoodieCommitMetadata commitMetadata,
@@ -555,8 +595,10 @@ public static Set<String> getFilesToFetchColumnStats(List<HoodieWriteStat> parti
555595
// Get the latest merged file slices based on the committed files part of the latest snapshot and the new files of the current commit metadata
556596
List<StoragePathInfo> consolidatedPathInfos = new ArrayList<>();
557597
partitionedWriteStat.forEach(
558-
stat -> consolidatedPathInfos.add(
559-
new StoragePathInfo(new StoragePath(dataMetaClient.getBasePath(), stat.getPath()), stat.getFileSizeInBytes(), false, (short) 0, 0, 0)));
598+
stat -> {
599+
StoragePathInfo pathInfo = new StoragePathInfo(new StoragePath(dataMetaClient.getBasePath(), stat.getPath()), stat.getFileSizeInBytes(), false, (short) 0, 0, 0);
600+
consolidatedPathInfos.add(pathInfo);
601+
});
560602
SyncableFileSystemView fileSystemViewForCommitedFiles =
561603
FileSystemViewManager.createViewManager(new HoodieLocalEngineContext(dataMetaClient.getStorageConf()),
562604
dataWriteConfig.getMetadataConfig(), dataWriteConfig.getViewStorageConfig(), dataWriteConfig.getCommonConfig(),

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hudi.common.engine.ReaderContextFactory;
2828
import org.apache.hudi.common.fs.FSUtils;
2929
import org.apache.hudi.common.model.FileSlice;
30+
import org.apache.hudi.metadata.model.FileSliceAndPartition;
3031
import org.apache.hudi.common.model.HoodieBaseFile;
3132
import org.apache.hudi.common.model.HoodieFileFormat;
3233
import org.apache.hudi.common.model.HoodieIndexDefinition;
@@ -235,7 +236,7 @@ public static <T> Map<String, String> getRecordKeyToSecondaryKey(HoodieTableMeta
235236
}
236237

237238
public static <T> HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext,
238-
List<Pair<String, FileSlice>> partitionFileSlicePairs,
239+
List<FileSliceAndPartition> partitionFileSlicePairs,
239240
int secondaryIndexMaxParallelism,
240241
String activeModule, HoodieTableMetaClient metaClient,
241242
HoodieIndexDefinition indexDefinition,
@@ -255,8 +256,8 @@ public static <T> HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(Hoodi
255256
engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFileSlicePairs.size() + " file slices");
256257
HoodieFileFormat baseFileFormat = metaClient.getTableConfig().getBaseFileFormat();
257258
return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndBaseFile -> {
258-
final String partition = partitionAndBaseFile.getKey();
259-
final FileSlice fileSlice = partitionAndBaseFile.getValue();
259+
final String partition = partitionAndBaseFile.partitionPath();
260+
final FileSlice fileSlice = partitionAndBaseFile.fileSlice();
260261
Option<StoragePath> dataFilePath = Option.ofNullable(fileSlice.getBaseFile().map(baseFile -> FSUtils.getAbsoluteFilePath(basePath, partition, baseFile.getFileName())).orElseGet(null));
261262
HoodieSchema readerSchema;
262263
if (dataFilePath.isPresent()) {

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ void testConvertToColumnStatsRecordWithValidColumns() {
201201
// Mock convertFilesToColumnStatsRecords to return empty HoodieData
202202
HoodieData<HoodieRecord> mockHoodieData = mock(HoodieData.class);
203203
mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
204-
any(), any(), any(), any(), any(), anyInt(), anyInt(), any()))
204+
any(), any(), any(), any(), anyInt(), anyInt(), any()))
205205
.thenReturn(mockHoodieData);
206206

207207
Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
@@ -223,7 +223,6 @@ void testConvertToColumnStatsRecordWithValidColumns() {
223223
eq(partitionFilesToDelete),
224224
eq(partitionFilesToAdd),
225225
eq(dataMetaClient),
226-
eq(metadataConfig),
227226
eq(4),
228227
eq(1024),
229228
any()
@@ -246,7 +245,7 @@ void testConvertToColumnStatsRecordWithMixedInputs() {
246245
// Mock convertFilesToColumnStatsRecords to return empty HoodieData
247246
HoodieData<HoodieRecord> mockHoodieData = mock(HoodieData.class);
248247
mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
249-
any(), any(), any(), any(), any(), anyInt(), anyInt(), any()))
248+
any(), any(), any(), any(), anyInt(), anyInt(), any()))
250249
.thenReturn(mockHoodieData);
251250

252251
Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
@@ -275,7 +274,6 @@ void testConvertToColumnStatsRecordWithMixedInputs() {
275274
eq(partitionFilesToDelete),
276275
eq(partitionFilesToAdd),
277276
eq(dataMetaClient),
278-
eq(metadataConfig),
279277
eq(4),
280278
eq(1024),
281279
any()

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,14 @@
2727
import org.apache.hudi.common.engine.HoodieEngineContext;
2828
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
2929
import org.apache.hudi.common.model.HoodieFileGroupId;
30-
import org.apache.hudi.common.model.HoodieIndexDefinition;
3130
import org.apache.hudi.common.model.HoodieRecord;
32-
import org.apache.hudi.common.schema.HoodieSchema;
33-
import org.apache.hudi.common.table.HoodieTableMetaClient;
3431
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
3532
import org.apache.hudi.common.table.timeline.HoodieTimeline;
3633
import org.apache.hudi.common.util.Option;
3734
import org.apache.hudi.common.util.collection.Pair;
3835
import org.apache.hudi.config.HoodieWriteConfig;
3936
import org.apache.hudi.exception.HoodieNotSupportedException;
37+
import org.apache.hudi.metadata.index.UnsupportedExpressionIndexRecordGenerator;
4038
import org.apache.hudi.storage.StorageConfiguration;
4139
import org.apache.hudi.table.BulkInsertPartitioner;
4240

@@ -82,7 +80,7 @@ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
8280
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
8381
HoodieEngineContext engineContext,
8482
Option<String> inFlightInstantTimestamp) {
85-
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inFlightInstantTimestamp);
83+
this(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inFlightInstantTimestamp, false);
8684
}
8785

8886
FlinkHoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf,
@@ -91,7 +89,7 @@ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
9189
HoodieEngineContext engineContext,
9290
Option<String> inFlightInstantTimestamp,
9391
boolean streamingWrites) {
94-
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inFlightInstantTimestamp, streamingWrites);
92+
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, new UnsupportedExpressionIndexRecordGenerator(EngineType.FLINK), inFlightInstantTimestamp, streamingWrites);
9593
}
9694

9795
@Override
@@ -166,13 +164,6 @@ protected void preWrite(String instantTime) {
166164
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
167165
}
168166

169-
@Override
170-
protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
171-
HoodieTableMetaClient metaClient, int parallelism, HoodieSchema tableSchema, HoodieSchema readerSchema,
172-
StorageConfiguration<?> storageConf, String instantTime) {
173-
throw new HoodieNotSupportedException("Flink metadata table does not support expression index yet.");
174-
}
175-
176167
@Override
177168
protected List<WriteStatus> streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> fileGroupIdToTaggedRecords, String instantTime) {
178169
return getWriteClient().upsertPreppedRecords(fileGroupIdToTaggedRecords.getValue().collectAsList(), instantTime);

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,12 @@
2727
import org.apache.hudi.common.engine.HoodieEngineContext;
2828
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
2929
import org.apache.hudi.common.model.HoodieFileGroupId;
30-
import org.apache.hudi.common.model.HoodieIndexDefinition;
3130
import org.apache.hudi.common.model.HoodieRecord;
32-
import org.apache.hudi.common.schema.HoodieSchema;
33-
import org.apache.hudi.common.table.HoodieTableMetaClient;
3431
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
3532
import org.apache.hudi.common.util.Option;
36-
import org.apache.hudi.common.util.collection.Pair;
3733
import org.apache.hudi.config.HoodieWriteConfig;
3834
import org.apache.hudi.exception.HoodieNotSupportedException;
35+
import org.apache.hudi.metadata.index.UnsupportedExpressionIndexRecordGenerator;
3936
import org.apache.hudi.storage.StorageConfiguration;
4037
import org.apache.hudi.table.BulkInsertPartitioner;
4138

@@ -59,7 +56,7 @@ public class JavaHoodieBackedTableMetadataWriter extends HoodieBackedTableMetada
5956
protected JavaHoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
6057
HoodieEngineContext engineContext,
6158
Option<String> inflightInstantTimestamp) {
62-
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
59+
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, new UnsupportedExpressionIndexRecordGenerator(EngineType.JAVA), inflightInstantTimestamp);
6360
}
6461

6562
public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
@@ -147,13 +144,6 @@ protected void preWrite(String instantTime) {
147144
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
148145
}
149146

150-
@Override
151-
protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
152-
HoodieTableMetaClient metaClient, int parallelism, HoodieSchema tableSchema, HoodieSchema readerSchema,
153-
StorageConfiguration<?> storageConf, String instantTime) {
154-
throw new HoodieNotSupportedException("Expression index not supported for Java metadata table writer yet.");
155-
}
156-
157147
@Override
158148
protected EngineType getEngineType() {
159149
return EngineType.JAVA;

hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2839,7 +2839,7 @@ private void validateMetadata(HoodieJavaWriteClient testClient, Option<String> i
28392839
// check if the last instant is restore, then the metadata table should have only the partitions that are not deleted
28402840
metaClient.reloadActiveTimeline().getReverseOrderedInstants().findFirst().ifPresent(instant -> {
28412841
if (instant.getAction().equals(HoodieActiveTimeline.RESTORE_ACTION)) {
2842-
metadataWriter.getEnabledPartitionTypes().stream().filter(partitionType -> !MetadataPartitionType.shouldDeletePartitionOnRestore(partitionType.getPartitionPath()))
2842+
metadataWriter.getEnabledIndexerMap().keySet().stream().filter(partitionType -> !MetadataPartitionType.shouldDeletePartitionOnRestore(partitionType.getPartitionPath()))
28432843
.forEach(partitionType -> assertTrue(metadataTablePartitions.contains(partitionType.getPartitionPath())));
28442844
}
28452845
});

0 commit comments

Comments
 (0)