diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 37dd25bf9029..34a6e02150e7 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 3
+  "modification": 4
 }
diff --git a/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json b/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json
index 5abe02fc09c7..3a009261f4f9 100644
--- a/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json
+++ b/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 1
+  "modification": 2
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index db95c6703857..c7981d697de4 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -190,17 +190,22 @@ private void appendManifestFiles(Table table, Iterable fileWrit
         int specId = entry.getKey();
         List files = entry.getValue();
         PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(specId));
-        ManifestWriter writer;
-        try (FileIO io = table.io()) {
-          writer = createManifestWriter(table.location(), uuid, spec, io);
-          for (DataFile file : files) {
-            writer.add(file);
-            committedDataFileByteSize.update(file.fileSizeInBytes());
-            committedDataFileRecordCount.update(file.recordCount());
-          }
-          writer.close();
-          update.appendManifest(writer.toManifestFile());
+        // Keep the FileIO open: table.io() may hand out an instance shared beyond this call,
+        // so closing it here could break later users of the same table.
+        FileIO io = table.io();
+        ManifestWriter writer = createManifestWriter(table.location(), uuid, spec, io);
+        try {
+          for (DataFile file : files) {
+            writer.add(file);
+            committedDataFileByteSize.update(file.fileSizeInBytes());
+            committedDataFileRecordCount.update(file.recordCount());
+          }
+        } finally {
+          // Close the writer even when add() fails so its output stream is not leaked.
+          writer.close();
         }
+        // The manifest is registered only after the writer has been closed without error.
+        update.appendManifest(writer.toManifestFile());
       }
       update.commit();
     }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index 748dd319c076..e01b174decb0 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.util.ReleaseInfo;
@@ -45,7 +46,6 @@
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.types.Type;
-import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.checkerframework.dataflow.qual.Pure;
 import org.slf4j.Logger;
@@ -54,7 +54,9 @@
 @AutoValue
 public abstract class IcebergCatalogConfig implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogConfig.class);
-  private transient @MonotonicNonNull Catalog cachedCatalog;
+  // Static cache of built catalogs, keyed by config. Nothing in this class evicts entries.
+  private static final ConcurrentHashMap CATALOG_CACHE =
+      new ConcurrentHashMap<>();
 
   @Pure
   @Nullable
@@ -75,27 +77,33 @@ public static Builder builder() {
 
   public abstract Builder toBuilder();
 
-  public org.apache.iceberg.catalog.Catalog catalog() {
-    if (cachedCatalog == null) {
-      String catalogName = getCatalogName();
-      if (catalogName == null) {
-        catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
-      }
-      Map catalogProps = getCatalogProperties();
-      if (catalogProps == null) {
-        catalogProps = Maps.newHashMap();
-      }
-      Map confProps = getConfigProperties();
-      if (confProps == null) {
-        confProps = Maps.newHashMap();
-      }
-      Configuration config = new Configuration();
-      for (Map.Entry prop : confProps.entrySet()) {
-        config.set(prop.getKey(), prop.getValue());
-      }
-      cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
+  /**
+   * Returns the {@link Catalog} for this config, building it on first use and returning the
+   * cached instance on later calls.
+   */
+  public Catalog catalog() {
+    return CATALOG_CACHE.computeIfAbsent(this, IcebergCatalogConfig::buildCatalog);
+  }
+
+  /** Builds a new catalog from the config's name, catalog properties and config properties. */
+  private static Catalog buildCatalog(IcebergCatalogConfig catalogConfig) {
+    String catalogName = catalogConfig.getCatalogName();
+    if (catalogName == null) {
+      catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
+    }
+    Map catalogProps = catalogConfig.getCatalogProperties();
+    if (catalogProps == null) {
+      catalogProps = Maps.newHashMap();
+    }
+    Map confProps = catalogConfig.getConfigProperties();
+    if (confProps == null) {
+      confProps = Maps.newHashMap();
+    }
+    Configuration config = new Configuration();
+    for (Map.Entry prop : confProps.entrySet()) {
+      config.set(prop.getKey(), prop.getValue());
     }
-    return cachedCatalog;
+    return CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
   }
 
   private void checkSupportsNamespaces() {
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index eb79513df4f9..ba3b5eee5914 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -29,10 +29,8 @@
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.schemas.Schema;
@@ -60,7 +58,6 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.transforms.Transforms;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -434,20 +431,6 @@ public void close() throws IOException {
         state.dataFiles.clear();
       }
     } finally {
-      // Close unique FileIO instances now that all writers are done.
-      // table.io() may return a shared FileIO; we deduplicate by identity
-      // so we close each underlying connection pool exactly once.
-      Set closedIOs = new HashSet<>();
-      for (DestinationState state : destinations.values()) {
-        FileIO io = state.table.io();
-        if (io != null && closedIOs.add(io)) {
-          try {
-            io.close();
-          } catch (Exception e) {
-            LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e);
-          }
-        }
-      }
       destinations.clear();
     }
     checkArgument(
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index 81ec229df70f..b3485a7bcc4f 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -209,10 +209,7 @@ public void close() throws IOException {
       fileScanTasks.clear();
       fileScanTasks = null;
     }
-    if (io != null) {
-      io.close();
-      io = null;
-    }
+    io = null;
   }
 
   @Override
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index db6ac943b8d1..d29ba8a4c82e 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -1040,7 +1040,7 @@ public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOExcepti
    * closes the shared FileIO.
    */
   @Test
-  public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws IOException {
+  public void testRecordWriterManagerDoesNotCloseSharedFileIO() throws IOException {
     String tableName1 =
         "table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
     String tableName2 =
@@ -1078,7 +1078,8 @@ public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws IOExcep
         writerManager.getSerializableDataFiles();
     assertTrue("Should have data files for dest1", dataFiles.containsKey(dest1));
     assertTrue("Should have data files for dest2", dataFiles.containsKey(dest2));
-    assertTrue("Shared FileIO should be closed", sharedTrackingIO.closed);
+    assertFalse(
+        "Shared FileIO should NOT be closed by RecordWriterManager", sharedTrackingIO.closed);
   }
 
   private static final class CloseTrackingFileIO implements FileIO {
@@ -1201,4 +1202,44 @@ public void testGetOrCreateTable_refreshLogic() {
     // Verify that refresh() WAS called exactly once because the entry was stale.
     verify(mockTable, times(1)).refresh();
   }
+
+  /**
+   * Verifies that the shared FileIO survives across multiple bundles. This is the core regression
+   * test: if RecordWriterManager.close() closed the FileIO, the second bundle would fail with
+   * "Connection pool shut down".
+   */
+  @Test
+  public void testFileIOSurvivesAcrossBundles() throws IOException {
+    String tableName =
+        "table_survive_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
+    TableIdentifier tableId = TableIdentifier.of("default", tableName);
+
+    Table realTable = warehouse.createTable(tableId, ICEBERG_SCHEMA);
+
+    CloseTrackingFileIO sharedIO = new CloseTrackingFileIO(realTable.io());
+    Table spyTable = Mockito.spy(realTable);
+    Mockito.doReturn(sharedIO).when(spyTable).io();
+
+    Catalog spyCatalog = Mockito.spy(catalog);
+    Mockito.doReturn(spyTable).when(spyCatalog).loadTable(tableId);
+
+    WindowedValue dest = getWindowedDestination(tableName, null);
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+
+    // Bundle 1: write and close
+    RecordWriterManager bundle1 = new RecordWriterManager(spyCatalog, "file_b1", 1000, 3);
+    assertTrue(bundle1.write(dest, row));
+    bundle1.close();
+    assertFalse("FileIO must survive after bundle 1 close", sharedIO.closed);
+    assertTrue(
+        "Bundle 1 should produce data files", bundle1.getSerializableDataFiles().containsKey(dest));
+
+    // Bundle 2: write and close using the same catalog (simulates DoFn reuse)
+    RecordWriterManager bundle2 = new RecordWriterManager(spyCatalog, "file_b2", 1000, 3);
+    assertTrue(bundle2.write(dest, row));
+    bundle2.close();
+    assertFalse("FileIO must survive after bundle 2 close", sharedIO.closed);
+    assertTrue(
+        "Bundle 2 should produce data files", bundle2.getSerializableDataFiles().containsKey(dest));
+  }
 }