Skip to content

Commit db53699

Browse files
committed
feat(index): Refactor metadata table initialization logic based on index abstraction
1 parent 4d19980 commit db53699

20 files changed

Lines changed: 357 additions & 863 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java

Lines changed: 115 additions & 587 deletions
Large diffs are not rendered by default.

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.hudi.common.util.collection.Pair;
3434
import org.apache.hudi.config.HoodieWriteConfig;
3535
import org.apache.hudi.exception.HoodieMetadataException;
36+
import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
3637
import org.apache.hudi.storage.StorageConfiguration;
3738

3839
import java.util.Arrays;
@@ -72,17 +73,9 @@ protected HoodieBackedTableMetadataWriterTableVersionSix(StorageConfiguration<?>
7273
HoodieWriteConfig writeConfig,
7374
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
7475
HoodieEngineContext engineContext,
76+
ExpressionIndexRecordGenerator expressionIndexRecordGenerator,
7577
Option<String> inflightInstantTimestamp) {
76-
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
77-
}
78-
79-
@Override
80-
List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig metadataConfig, HoodieTableMetaClient metaClient) {
81-
return MetadataPartitionType.getEnabledPartitions(metadataConfig, metaClient).stream()
82-
.filter(partition -> !partition.equals(MetadataPartitionType.SECONDARY_INDEX))
83-
.filter(partition -> !partition.equals(MetadataPartitionType.EXPRESSION_INDEX))
84-
.filter(partition -> !partition.equals(MetadataPartitionType.PARTITION_STATS))
85-
.collect(Collectors.toList());
78+
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, expressionIndexRecordGenerator, inflightInstantTimestamp);
8679
}
8780

8881
@Override

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.metadata;
2020

21+
import org.apache.hudi.avro.model.HoodieCleanMetadata;
2122
import org.apache.hudi.avro.model.HoodieMetadataRecord;
2223
import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
2324
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
@@ -73,6 +74,7 @@
7374
import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
7475
import org.apache.hudi.exception.HoodieException;
7576
import org.apache.hudi.exception.HoodieMetadataException;
77+
import org.apache.hudi.metadata.index.Indexer;
7678
import org.apache.hudi.stats.HoodieColumnRangeMetadata;
7779
import org.apache.hudi.storage.StoragePath;
7880
import org.apache.hudi.storage.StoragePathInfo;
@@ -106,6 +108,7 @@
106108
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS;
107109
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToBloomFilterRecords;
108110
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToColumnStatsRecords;
111+
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToExpressionIndexRecords;
109112
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords;
110113
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToPartitionStatsRecords;
111114
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords;
@@ -362,6 +365,41 @@ public static HoodieWriteConfig createMetadataWriteConfig(
362365
return metadataWriteConfig;
363366
}
364367

368+
/**
369+
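* Converts clean action metadata into metadata table records keyed by metadata partition path: always the FILES partition, plus BLOOM_FILTERS, COLUMN_STATS and EXPRESSION_INDEX when present in {@code enabledIndexBuilderMap}.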
370+
*/
371+
public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext,
372+
HoodieCleanMetadata cleanMetadata,
373+
String instantTime,
374+
HoodieTableMetaClient dataMetaClient,
375+
HoodieMetadataConfig metadataConfig,
376+
Map<MetadataPartitionType, Indexer> enabledIndexBuilderMap,
377+
int bloomIndexParallelism,
378+
Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
379+
final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
380+
final HoodieData<HoodieRecord> filesPartitionRecordsRDD = engineContext.parallelize(
381+
convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1);
382+
partitionToRecordsMap.put(MetadataPartitionType.FILES.getPartitionPath(), filesPartitionRecordsRDD);
383+
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.BLOOM_FILTERS)) {
384+
final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
385+
convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, bloomIndexParallelism);
386+
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath(), metadataBloomFilterRecordsRDD);
387+
}
388+
389+
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.COLUMN_STATS)) {
390+
final HoodieData<HoodieRecord> metadataColumnStatsRDD =
391+
convertMetadataToColumnStatsRecords(cleanMetadata, engineContext,
392+
dataMetaClient, metadataConfig, recordTypeOpt);
393+
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), metadataColumnStatsRDD);
394+
}
395+
if (enabledIndexBuilderMap.containsKey(MetadataPartitionType.EXPRESSION_INDEX)) {
396+
convertMetadataToExpressionIndexRecords(engineContext, cleanMetadata, instantTime, dataMetaClient, metadataConfig, bloomIndexParallelism, partitionToRecordsMap,
397+
recordTypeOpt);
398+
}
399+
400+
return partitionToRecordsMap;
401+
}
402+
365403
/**
366404
* Convert commit action to metadata records for the enabled partition types.
367405
*
@@ -370,12 +408,14 @@ public static HoodieWriteConfig createMetadataWriteConfig(
370408
* @param commitMetadata - Commit action metadata
371409
* @param instantTime - Action instant time
372410
* @param dataMetaClient - HoodieTableMetaClient for data
373-
* @param tableMetadata
411+
* @param tableMetadata - metadata table reader
374412
* @param metadataConfig - HoodieMetadataConfig
375413
* @param enabledPartitionTypes - Set of enabled MDT partitions to update
376414
* @param bloomFilterType - Type of generated bloom filter records
377415
* @param bloomIndexParallelism - Parallelism for bloom filter record generation
378-
* @param enableOptimizeLogBlocksScan - flag used to enable scanInternalV2 for log blocks in data table
416+
* @param writesFileIdEncoding - file id encoding used while generating record index records
417+
* @param engineType - execution engine type
418+
* @param recordTypeOpt - record type override for generated metadata records
379419
* @return Map of partition to metadata records for the commit action
380420
*/
381421
public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext context, HoodieWriteConfig dataWriteConfig, HoodieCommitMetadata commitMetadata,

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hudi.common.engine.ReaderContextFactory;
2828
import org.apache.hudi.common.fs.FSUtils;
2929
import org.apache.hudi.common.model.FileSlice;
30+
import org.apache.hudi.metadata.model.FileSliceAndPartition;
3031
import org.apache.hudi.common.model.HoodieBaseFile;
3132
import org.apache.hudi.common.model.HoodieFileFormat;
3233
import org.apache.hudi.common.model.HoodieIndexDefinition;
@@ -235,7 +236,7 @@ public static <T> Map<String, String> getRecordKeyToSecondaryKey(HoodieTableMeta
235236
}
236237

237238
public static <T> HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext,
238-
List<Pair<String, FileSlice>> partitionFileSlicePairs,
239+
List<FileSliceAndPartition> partitionFileSlicePairs,
239240
int secondaryIndexMaxParallelism,
240241
String activeModule, HoodieTableMetaClient metaClient,
241242
HoodieIndexDefinition indexDefinition,
@@ -255,8 +256,8 @@ public static <T> HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(Hoodi
255256
engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFileSlicePairs.size() + " file slices");
256257
HoodieFileFormat baseFileFormat = metaClient.getTableConfig().getBaseFileFormat();
257258
return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndBaseFile -> {
258-
final String partition = partitionAndBaseFile.getKey();
259-
final FileSlice fileSlice = partitionAndBaseFile.getValue();
259+
final String partition = partitionAndBaseFile.partitionPath();
260+
final FileSlice fileSlice = partitionAndBaseFile.fileSlice();
260261
Option<StoragePath> dataFilePath = Option.ofNullable(fileSlice.getBaseFile().map(baseFile -> FSUtils.getAbsoluteFilePath(basePath, partition, baseFile.getFileName())).orElseGet(null));
261262
HoodieSchema readerSchema;
262263
if (dataFilePath.isPresent()) {

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.hudi.config.HoodieCleanConfig;
3434
import org.apache.hudi.config.HoodieWriteConfig;
3535
import org.apache.hudi.exception.HoodieException;
36+
import org.apache.hudi.metadata.model.FileInfo;
3637
import org.apache.hudi.storage.StorageConfiguration;
3738

3839
import org.junit.jupiter.api.BeforeEach;
@@ -44,6 +45,7 @@
4445
import org.mockito.MockedStatic;
4546

4647
import java.util.ArrayList;
48+
import java.util.Collections;
4749
import java.util.HashMap;
4850
import java.util.List;
4951
import java.util.Map;
@@ -156,7 +158,7 @@ void rollbackFailedWrites_avoidsTimelineReload() {
156158

157159
@Test
158160
void testConvertToColumnStatsRecordWithEmptyInputs() {
159-
Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
161+
Map<String, List<FileInfo>> partitionFilesToAdd = new HashMap<>();
160162
Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
161163

162164
Map<String, HoodieData<HoodieRecord>> result =
@@ -174,9 +176,9 @@ void testConvertToColumnStatsRecordWithEmptyColumnsToIndex() {
174176
any(), any(), any(), eq(false), any()))
175177
.thenReturn(emptyColumnsMap);
176178

177-
Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
178-
Map<String, Long> filesToAdd = new HashMap<>();
179-
filesToAdd.put("file1.parquet", 1024L);
179+
Map<String, List<FileInfo>> partitionFilesToAdd = new HashMap<>();
180+
List<FileInfo> filesToAdd = new ArrayList<>();
181+
filesToAdd.add(FileInfo.of("file1.parquet", 1024L));
180182
partitionFilesToAdd.put("partition1", filesToAdd);
181183
Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
182184

@@ -201,11 +203,11 @@ void testConvertToColumnStatsRecordWithValidColumns() {
201203
// Mock convertFilesToColumnStatsRecords to return empty HoodieData
202204
HoodieData<HoodieRecord> mockHoodieData = mock(HoodieData.class);
203205
mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
204-
any(), any(), any(), any(), any(), anyInt(), anyInt(), any()))
206+
any(), any(), any(), any(), anyInt(), anyInt(), any()))
205207
.thenReturn(mockHoodieData);
206208

207-
Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
208-
partitionFilesToAdd.put("partition1", new HashMap<>());
209+
Map<String, List<FileInfo>> partitionFilesToAdd = new HashMap<>();
210+
partitionFilesToAdd.put("partition1", Collections.emptyList());
209211
Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
210212

211213
Map<String, HoodieData<HoodieRecord>> result = HoodieBackedTableMetadataWriter.convertToColumnStatsRecord(
@@ -223,7 +225,6 @@ void testConvertToColumnStatsRecordWithValidColumns() {
223225
eq(partitionFilesToDelete),
224226
eq(partitionFilesToAdd),
225227
eq(dataMetaClient),
226-
eq(metadataConfig),
227228
eq(4),
228229
eq(1024),
229230
any()
@@ -246,13 +247,13 @@ void testConvertToColumnStatsRecordWithMixedInputs() {
246247
// Mock convertFilesToColumnStatsRecords to return empty HoodieData
247248
HoodieData<HoodieRecord> mockHoodieData = mock(HoodieData.class);
248249
mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
249-
any(), any(), any(), any(), any(), anyInt(), anyInt(), any()))
250+
any(), any(), any(), any(), anyInt(), anyInt(), any()))
250251
.thenReturn(mockHoodieData);
251252

252-
Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
253-
Map<String, Long> filesToAdd = new HashMap<>();
254-
filesToAdd.put("file1.parquet", 1024L);
255-
filesToAdd.put("file2.parquet", 2048L);
253+
Map<String, List<FileInfo>> partitionFilesToAdd = new HashMap<>();
254+
List<FileInfo> filesToAdd = new ArrayList<>();
255+
filesToAdd.add(FileInfo.of("file1.parquet", 1024L));
256+
filesToAdd.add(FileInfo.of("file2.parquet", 2048L));
256257
partitionFilesToAdd.put("partition1", filesToAdd);
257258

258259
Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
@@ -275,7 +276,6 @@ void testConvertToColumnStatsRecordWithMixedInputs() {
275276
eq(partitionFilesToDelete),
276277
eq(partitionFilesToAdd),
277278
eq(dataMetaClient),
278-
eq(metadataConfig),
279279
eq(4),
280280
eq(1024),
281281
any()

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,14 @@
2727
import org.apache.hudi.common.engine.HoodieEngineContext;
2828
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
2929
import org.apache.hudi.common.model.HoodieFileGroupId;
30-
import org.apache.hudi.common.model.HoodieIndexDefinition;
3130
import org.apache.hudi.common.model.HoodieRecord;
32-
import org.apache.hudi.common.schema.HoodieSchema;
33-
import org.apache.hudi.common.table.HoodieTableMetaClient;
3431
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
3532
import org.apache.hudi.common.table.timeline.HoodieTimeline;
3633
import org.apache.hudi.common.util.Option;
3734
import org.apache.hudi.common.util.collection.Pair;
3835
import org.apache.hudi.config.HoodieWriteConfig;
3936
import org.apache.hudi.exception.HoodieNotSupportedException;
37+
import org.apache.hudi.metadata.index.UnsupportedExpressionIndexRecordGenerator;
4038
import org.apache.hudi.storage.StorageConfiguration;
4139
import org.apache.hudi.table.BulkInsertPartitioner;
4240

@@ -82,7 +80,7 @@ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
8280
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
8381
HoodieEngineContext engineContext,
8482
Option<String> inFlightInstantTimestamp) {
85-
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inFlightInstantTimestamp);
83+
this(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inFlightInstantTimestamp, false);
8684
}
8785

8886
FlinkHoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf,
@@ -91,7 +89,7 @@ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
9189
HoodieEngineContext engineContext,
9290
Option<String> inFlightInstantTimestamp,
9391
boolean streamingWrites) {
94-
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inFlightInstantTimestamp, streamingWrites);
92+
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, new UnsupportedExpressionIndexRecordGenerator(EngineType.FLINK), inFlightInstantTimestamp, streamingWrites);
9593
}
9694

9795
@Override
@@ -166,13 +164,6 @@ protected void preWrite(String instantTime) {
166164
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
167165
}
168166

169-
@Override
170-
protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
171-
HoodieTableMetaClient metaClient, int parallelism, HoodieSchema tableSchema, HoodieSchema readerSchema,
172-
StorageConfiguration<?> storageConf, String instantTime) {
173-
throw new HoodieNotSupportedException("Flink metadata table does not support expression index yet.");
174-
}
175-
176167
@Override
177168
protected List<WriteStatus> streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> fileGroupIdToTaggedRecords, String instantTime) {
178169
return getWriteClient().upsertPreppedRecords(fileGroupIdToTaggedRecords.getValue().collectAsList(), instantTime);

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,12 @@
2727
import org.apache.hudi.common.engine.HoodieEngineContext;
2828
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
2929
import org.apache.hudi.common.model.HoodieFileGroupId;
30-
import org.apache.hudi.common.model.HoodieIndexDefinition;
3130
import org.apache.hudi.common.model.HoodieRecord;
32-
import org.apache.hudi.common.schema.HoodieSchema;
33-
import org.apache.hudi.common.table.HoodieTableMetaClient;
3431
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
3532
import org.apache.hudi.common.util.Option;
36-
import org.apache.hudi.common.util.collection.Pair;
3733
import org.apache.hudi.config.HoodieWriteConfig;
3834
import org.apache.hudi.exception.HoodieNotSupportedException;
35+
import org.apache.hudi.metadata.index.UnsupportedExpressionIndexRecordGenerator;
3936
import org.apache.hudi.storage.StorageConfiguration;
4037
import org.apache.hudi.table.BulkInsertPartitioner;
4138

@@ -59,7 +56,7 @@ public class JavaHoodieBackedTableMetadataWriter extends HoodieBackedTableMetada
5956
protected JavaHoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
6057
HoodieEngineContext engineContext,
6158
Option<String> inflightInstantTimestamp) {
62-
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
59+
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, new UnsupportedExpressionIndexRecordGenerator(EngineType.JAVA), inflightInstantTimestamp);
6360
}
6461

6562
public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
@@ -147,13 +144,6 @@ protected void preWrite(String instantTime) {
147144
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
148145
}
149146

150-
@Override
151-
protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
152-
HoodieTableMetaClient metaClient, int parallelism, HoodieSchema tableSchema, HoodieSchema readerSchema,
153-
StorageConfiguration<?> storageConf, String instantTime) {
154-
throw new HoodieNotSupportedException("Expression index not supported for Java metadata table writer yet.");
155-
}
156-
157147
@Override
158148
protected EngineType getEngineType() {
159149
return EngineType.JAVA;

0 commit comments

Comments
 (0)