feat(index): Add Indexer abstraction and refactor metadata table init… #18348
cshuo wants to merge 5 commits into apache:master from
Conversation
 * including file-group initialization, commit, and partition state update.
 */
@Slf4j
public abstract class BaseIndexer implements Indexer {
To break the PR down, could we first add these abstraction classes to ease the reviews?
How about I split changes in the PR into 2 commits, since the indexer abstraction and its implementations involve some refactor of util class, which also affects metadata writer.
Force-pushed d2b4e7e to 116a487
Force-pushed 42e6f52 to ec26eb4
yihua
left a comment
Blocked on my review due to critical logic
Force-pushed bdfa722 to 3237e4d
Force-pushed 08eecef to db53699
Force-pushed db53699 to 8a03ea2
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review
Style & readability pass — a few naming and consistency nits below, mostly around boolean accessor conventions (flag() → isDeleted()), a misleading name() accessor that returns a full path, an unused @Slf4j, a stale Javadoc reference, a parameter/field name mismatch, a mutability inconsistency across the new data-holder classes, and a comment that drifted away from the logic it describes after the refactor. Nothing blocking; the code is otherwise well-structured and readable.
      reader.close();
    }
  } catch (IOException e) {
    throw new HoodieIOException("Error reading total records from file " + path, e);
🤖 nit: small typo in the Javadoc — "context ot use" should be "context to use".
 */
@Slf4j
public class SecondaryIndexer extends BaseIndexer {
🤖 nit: RECORD_INDEX_AVERAGE_RECORD_SIZE = 48 is also defined in BaseRecordIndexer. Since SecondaryIndexer can't inherit it, could this constant live in a shared place (e.g. HoodieTableMetadataUtil or a RecordIndexConstants class) to avoid the duplication?
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.createRecordIndexDefinition;
import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;

/**
🤖 nit: the Javadoc says "Implementation of the global RECORD_INDEX index" — but this class is the partitioned variant. RecordIndexer is the global one. Might be worth updating to avoid confusion.
Map<String, DataPartitionAndRecords> fileGroupCountAndRecordsPairMap = new HashMap<>(partitionFileSlicePairsMap.size());
int maxParallelismPerHudiPartition = partitionFileSlicePairsMap.isEmpty()
    ? 1 : Math.max(1, dataTableWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism() / partitionFileSlicePairsMap.size());
int totalFileGroupCount = 0;
🤖 nit: iterating over keySet() and then calling partitionFileSlicePairsMap.get(partition) inside is a classic two-lookup pattern. The second loop in this same method already uses entrySet() — could you align the first loop to do the same?
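The two-lookup pattern the comment refers to can be illustrated in isolation. This is a standalone sketch with a hypothetical map (not the actual Hudi code); it contrasts keySet()-plus-get() against a single entrySet() traversal:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EntrySetIteration {
  public static void main(String[] args) {
    // Hypothetical stand-in for partitionFileSlicePairsMap
    Map<String, List<String>> partitionFileSlicePairsMap = new HashMap<>();
    partitionFileSlicePairsMap.put("2024/01/01", List.of("fs-1", "fs-2"));
    partitionFileSlicePairsMap.put("2024/01/02", List.of("fs-3"));

    // Two-lookup pattern: each key is hashed once for iteration and again
    // for the get() inside the loop body.
    int total = 0;
    for (String partition : partitionFileSlicePairsMap.keySet()) {
      total += partitionFileSlicePairsMap.get(partition).size();
    }

    // Single-lookup pattern: entrySet() yields key and value together.
    int totalViaEntries = 0;
    for (Map.Entry<String, List<String>> entry : partitionFileSlicePairsMap.entrySet()) {
      totalViaEntries += entry.getValue().size();
    }

    System.out.println(total + " " + totalViaEntries);
  }
}
```

Both loops compute the same result; entrySet() simply avoids the redundant hash lookup per key.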
DataPartitionAndRecords init = new DataPartitionAndRecords(2, Option.empty(), records);
ExposedRecordIndexer indexer = new ExposedRecordIndexer(engineContext, writeConfig, metaClient, init);

try (MockedStatic<org.apache.hudi.metadata.HoodieTableMetadataUtil> mockedUtil = mockStatic(org.apache.hudi.metadata.HoodieTableMetadataUtil.class)) {
🤖 nit: same fully-qualified org.apache.hudi.metadata.HoodieTableMetadataUtil usage as in TestPartitionedRecordIndexer — an import would clean this up and it repeats on lines 63, 111, and 116 too.
 * A class which represents a directory and the files and directories inside it.
 * <p>
 * A {@code PartitionFileInfo} object saves the name of the partition and various properties of each file
 * required for initializing the metadata table. Saving limited properties reduces the total memory footprint when
🤖 nit: the Javadoc references PartitionFileInfo but the class is DirectoryInfo — looks like a stale copy-paste. Could you update it to match?
private final String partitionPath;
private final String fileName;
private final boolean flag;
🤖 nit: flag is pretty opaque — it's unclear what this boolean represents at the call site. Something like isDeleted or isAdded (or whatever semantic it carries) would read more clearly.
}

public static FileInfoAndPartition of(String partition, String path, long size) {
  return new FileInfoAndPartition(partition, path, size);
🤖 nit: the factory parameter is named path but the backing field is name — could you align them so readers don't have to mentally translate at the call site?
 */
@AllArgsConstructor
@Getter
@Setter
🤖 nit: @Setter makes this mutable, which is inconsistent with FileInfo and FileInfoAndPartition in the same package — both use final fields and no setters. If mutation isn't needed here, have you considered dropping @Setter and making the fields final?
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Nice work on the Indexer abstraction refactor! This architectural improvement significantly simplifies the metadata initialization path and enhances test coverage, though there are several functional issues regarding initialization edge cases, unmodifiable maps, and serialization that require attention.
    String instantTime) {
  if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
    throw new HoodieNotSupportedException("Table version 6 and below does not support expression index");
  }
🤖 The error message says "Table version 6 and below" but the check is lesserThan(HoodieTableVersion.EIGHT), which also covers version 7 (0.16.0). Should this say "Table version 7 and below" or "versions before 8"?
if (partitionToAllFilesMap.isEmpty()) {
  this.columnsToIndex = Lazy.lazily(Collections::emptyList);
  return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData()));
}
🤖 Overwriting this.columnsToIndex with an empty list when partitionToAllFilesMap is empty means postInitialization will register the column_stats index definition with zero source fields. Is that intentional? If this indexer instance is reused for a subsequent call with non-empty partitions, the columns would also be lost.
Map<String, Long> fileNameToSizeMap = partitionInfo.getValue().stream()
    .collect(Collectors.toMap(FileInfo::fileName, FileInfo::size));
return HoodieMetadataPayload.createPartitionFilesRecord(
    partitionInfo.getKey(), fileNameToSizeMap, Collections.emptyList());
🤖 Calling fileListRecords.count() triggers a full Spark action just for validation. For tables with many partitions, this doubles the initialization cost of this step. Could you validate using partitionToAllFilesMap.size() against partitions.size() instead (which is always true by construction), or remove this check?
partitions is the key set of partitionToAllFilesMap, so partitions.size() is always equal to partitionToAllFilesMap.size(). The validation here is unnecessary for the current code base, and it should be safe to remove.
if (partitionFileSlicePairs.isEmpty()) {
  return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, engineContext.emptyHoodieData()));
}
🤖 The Lazy wrapper here is created and immediately .get()-ed on the next line, so it doesn't actually provide any lazy evaluation benefit. Could simplify to a direct call: HoodieSchema tableSchema = HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient).orElseThrow(...);
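The point about a lazy wrapper that is immediately dereferenced can be shown with a minimal standalone sketch (using java.util.function.Supplier and Optional as stand-ins for Hudi's Lazy and the real tryResolveSchemaForTable, both hypothetical here):

```java
import java.util.Optional;
import java.util.function.Supplier;

public class LazyVsDirect {
  // Hypothetical stand-in for HoodieTableMetadataUtil.tryResolveSchemaForTable
  static Optional<String> tryResolveSchemaForTable() {
    return Optional.of("table-schema");
  }

  public static void main(String[] args) {
    // Wrapping in a Supplier and calling get() on the next line adds no
    // laziness: the resolution runs immediately either way.
    Supplier<String> lazySchema = () -> tryResolveSchemaForTable()
        .orElseThrow(() -> new IllegalStateException("cannot resolve schema"));
    String viaLazy = lazySchema.get();

    // Equivalent direct call, as the review suggests:
    String direct = tryResolveSchemaForTable()
        .orElseThrow(() -> new IllegalStateException("cannot resolve schema"));

    System.out.println(viaLazy.equals(direct));
  }
}
```

A lazy wrapper only pays off when the value may never be consumed, or is consumed well after construction.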
List<String> filesToDelete = filesDeleted.getOrDefault(partition, Collections.emptyList());
fileDeleteCount[0] += filesToDelete.size();
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesToAdd, filesToDelete);
Map<String, Long> filesToAddMap = filesToAdd.stream().collect(Collectors.toMap(FileInfo::fileName, FileInfo::size));
🤖 The old signature used Map<String, Long> which inherently prevented duplicate file names. The new List<FileInfo> can potentially contain duplicates, and Collectors.toMap(FileInfo::fileName, FileInfo::size) will throw IllegalStateException on duplicate keys. Could you add a merge function (e.g., Long::max) to handle this defensively, or document why duplicates can't occur?
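The duplicate-key hazard and the suggested merge-function fix can be demonstrated in isolation. This sketch uses a minimal hypothetical FileInfo record, not the Hudi class:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapMerge {
  // Hypothetical stand-in for the Hudi FileInfo POJO
  record FileInfo(String fileName, long size) {}

  public static void main(String[] args) {
    List<FileInfo> filesToAdd = List.of(
        new FileInfo("f1.parquet", 100L),
        new FileInfo("f2.parquet", 200L),
        new FileInfo("f1.parquet", 150L)); // duplicate file name

    // Without a merge function, Collectors.toMap throws IllegalStateException
    // on the duplicate key f1.parquet.
    boolean threw = false;
    try {
      filesToAdd.stream().collect(Collectors.toMap(FileInfo::fileName, FileInfo::size));
    } catch (IllegalStateException e) {
      threw = true;
    }

    // With Long::max as the merge function, duplicates are resolved defensively.
    Map<String, Long> merged = filesToAdd.stream()
        .collect(Collectors.toMap(FileInfo::fileName, FileInfo::size, Long::max));

    System.out.println(threw + " " + merged.get("f1.parquet"));
  }
}
```

Whether Long::max is the right merge policy depends on the invariant; documenting why duplicates cannot occur is an equally valid resolution.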
@EnumSource(classOf[HoodieTableType])
override def testRLIUpsertAndDropIndex(tableType: HoodieTableType): Unit = {
  val hudiOpts = commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true")
🤖 The comment mentions that partition stats should be present, but the assertion checks for a size of 2. If only files and col stats are expected, the count is correct, but the comment might be misleading.
// FILES partition should always be initialized first if enabled
if (!filesPartitionAvailable) {
  initializeMetadataPartition(FILES, indexerMapForPartitionsToInit.get(FILES),
      dataTableInstantTime, partitionIdToAllFilesMap, lazyLatestMergedPartitionFileSliceList);
🤖 This will NPE if the metadata table directory exists but the FILES partition is not marked as available.
this.enabledIndexerMap.remove(RECORD_INDEX);
indexerMapForPartitionsToInit.remove(RECORD_INDEX);
}
🤖 This will throw UnsupportedOperationException because enabledIndexerMap is unmodifiable.
Fixed — the RECORD_INDEX partition is now filtered out when enabledIndexerMap is initialized.
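The failure mode the reviewer flags is easy to reproduce standalone: remove() on a Collections.unmodifiableMap view throws at runtime, not compile time. A minimal sketch with a hypothetical enum and indexer map:

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

public class UnmodifiableMapRemove {
  // Hypothetical stand-in for MetadataPartitionType
  enum PartitionType { FILES, RECORD_INDEX }

  public static void main(String[] args) {
    Map<PartitionType, String> backing = new EnumMap<>(PartitionType.class);
    backing.put(PartitionType.FILES, "filesIndexer");
    backing.put(PartitionType.RECORD_INDEX, "recordIndexer");
    Map<PartitionType, String> enabledIndexerMap = Collections.unmodifiableMap(backing);

    // Any mutation on the unmodifiable view throws UnsupportedOperationException.
    boolean threw = false;
    try {
      enabledIndexerMap.remove(PartitionType.RECORD_INDEX);
    } catch (UnsupportedOperationException e) {
      threw = true;
    }
    System.out.println(threw);
  }
}
```

Filtering entries out while building the map, as the fix does, avoids ever needing to mutate the published view.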
protected final transient HoodieEngineContext engineContext;
@Getter
protected final List<MetadataPartitionType> enabledPartitionTypes;
protected final transient Map<MetadataPartitionType, Indexer> enabledIndexerMap;
🤖 Making these fields transient breaks the Serializable contract of HoodieTableMetadataWriter.
Marking these fields as transient is intentional. HoodieBackedTableMetadataWriter can be captured in Spark execution closures, so any field reachable from it must either be serializable or excluded from serialization. enabledIndexerMap / expressionIndexRecordGenerator and the existing field engineContext are runtime-only collaborators and are not serializable as part of the Spark closure. So the goal here is to avoid closure-serialization failures, not to make the writer meaningfully deserializable for later reuse.
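The trade-off the author describes — transient fields let closure serialization succeed but come back as null on deserialization — can be demonstrated with plain Java serialization. This is a hypothetical miniature of the writer, not the Hudi class:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {
  // Hypothetical stand-in for HoodieBackedTableMetadataWriter
  static class Writer implements Serializable {
    private static final long serialVersionUID = 1L;
    final String tableName;           // serialized into the closure
    transient Object engineContext;   // skipped during serialization, restored as null

    Writer(String tableName, Object engineContext) {
      this.tableName = tableName;
      this.engineContext = engineContext;
    }
  }

  public static void main(String[] args) throws Exception {
    // Object is not Serializable; without `transient` writeObject would fail.
    Writer original = new Writer("trips", new Object());

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(original);
    }
    Writer copy;
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      copy = (Writer) in.readObject();
    }
    System.out.println(copy.tableName + " " + (copy.engineContext == null));
  }
}
```

This matches the author's framing: the goal is surviving closure capture, and any code running after deserialization must not touch the transient collaborators.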
}
return Collections.unmodifiableMap(Arrays.stream(MetadataPartitionType.getValidValues(dataTableMetaClient.getTableConfig().getTableVersion()))
    .filter(partitionType ->
        (partitionType.isMetadataPartitionEnabled(dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient.getTableConfig())
🤖 Is there a versioned getValidValues method added to MetadataPartitionType? This might cause a compilation error.
Yes, there is a getValidValues method accepting a parameter of type HoodieTableVersion.
Force-pushed 87975f4 to 397cf29
Force-pushed 397cf29 to e4ee525
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review — The refactoring is clean and well-organized overall. Two minor nits: the parameter order of the factory method doesn't match the constructor's field order, which could confuse developers, and there is a minor style inconsistency in variable naming within a stream pipeline.
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Good progress on the Indexer abstraction — the decomposition into per-index-type indexers is clean and should make adding new index types much easier. A few items need attention before merging: FileAndPartitionFlag is missing Serializable (which will cause runtime failures on Spark), and there are edge cases to verify around empty-partition handling, HashMap iteration ordering for partition initialization, and ensuring hasPartitionsStateChanged is set correctly when initialization is skipped.
    dataTableWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
    columnsToIndex.get());
return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), records));
}
🤖 When columnsToIndex is empty and buildInitialization returns empty data, postInitialization will still be called and will register an index definition with empty source fields. Is that intentional? It seems like registering a column stats index with no source fields could confuse downstream readers.
I've checked the logic of the current master branch; the behavior is the same as in this PR. Not sure if it's intentional cc @yihua
There is a comment explaining this case:
log.info("Initializing partitioned record index from data partition {}", partition);
DataPartitionAndRecords dataPartitionAndRecords = initializeRecordIndexPartition(partition, fileSliceAndPartitions, maxParallelismPerHudiPartition);
fileGroupCountAndRecordsPairMap.put(partition, dataPartitionAndRecords);
totalFileGroupCount += dataPartitionAndRecords.numFileGroups();
🤖 When partitionFileSlicePairsMap is empty (e.g., empty table), totalFileGroupCount will be 0 and initializationList will be empty, but the code still returns an IndexPartitionInitialization with 0 file groups. Could this cause issues downstream? The non-partitioned RecordIndexer path goes through estimateFileGroupCount which guarantees at least 1 file group via Math.max(1, ...). Is an empty-table scenario expected to be gated earlier in the call chain?
The same logic exists in the master branch, and I think it's indeed an issue. If partitionFileSlicePairsMap is empty, buildInitialization() still returns a non-empty result with totalFileGroupCount = 0 and no DataPartitionAndRecords. That means the writer will not skip initialization; instead it can mark the record index partition as available without creating any file groups.
I think we should return an empty initialization result early when partitionFileSlicePairsMap is empty, so the metadata writer will skip initializing the partitioned RLI. WDYT? @yihua
@Override
public void postInitialization(HoodieTableMetaClient metadataMetaClient, HoodieData<HoodieRecord> records, int fileGroupCount, String relativePartitionPath) {
  super.postInitialization(metadataMetaClient, records, fileGroupCount, relativePartitionPath);
  // Validate record index after commit if validation is enabled
🤖 Worth noting that since records passed to postInitialization is the union of all per-partition records (built in initializeMetadataPartition), calling records.unpersist() on a Spark union RDD does not automatically unpersist the parent RDDs that were individually persisted inside estimateFileGroupCount. For non-partitioned RLI this is fine (single source), but for partitioned RLI, the individual per-partition records that were persisted may leak. The old code had the same pattern though, so it's not a regression.
@yihua all comments are resolved or replied to. Could you take another look?
yihua
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Nice updates — all the feedback from the previous review has been addressed cleanly:
- FileAndPartitionFlag Serializable (BUG): Fixed. All model POJOs (FileAndPartitionFlag, FileInfo, FileInfoAndPartition, FileSliceAndPartition, DirectoryInfo) now implement Serializable with explicit serialVersionUID.
- hasPartitionsStateChanged set unconditionally (BUG): Fixed. The flag is now set at the end of initializeMetadataPartition, after the early-return guard for empty initialization lists, so it's only set when work actually happens.
- Empty filesToIndex causing zero parallelism (BUG): Fixed. The emptiness check in ExpressionIndexer now correctly guards on filesToIndex.isEmpty() instead of partitionFileSlicePairs.isEmpty().
- records.unpersist() not recursive (QUESTION): Fixed. Changed to unpersistWithDependencies() to clean up per-partition persisted datasets.
- Empty columnsToIndex registering empty source fields (QUESTION): Fixed. postInitialization now checks records.isEmpty() and registers with an empty list when there are no records.
- SecondaryIndexer partition type and HashMap ordering questions were answered by the author with reasonable justifications.
…ialization logic
Describe the issue this Pull Request addresses
The metadata table initialization path had index-specific logic scattered across multiple metadata writer/util classes, which made the code hard to extend and maintain as new index types are added.
This PR addresses that by introducing a dedicated Indexer abstraction and refactoring initialization flows to use it consistently across engine-specific implementations. This PR is based on #12983.
Summary and Changelog
Commit 1: Add Indexer abstraction and related POJOs (Indexer, BaseIndexer, IndexerFactory, RecordIndexer, PartitionedRecordIndexer, BaseRecordIndexer)
Commit 2: Refactor metadata table initialization in HoodieBackedTableMetadataWriter to use the Indexer abstraction
Impact
Risk Level
low
Documentation Update
Contributor's checklist