Skip to content

Commit e51fb74

Browse files
authored
Fix LineagePluginTest on runners other than Direct runner (#38296)
1 parent 3776a99 commit e51fb74

2 files changed

Lines changed: 18 additions & 43 deletions

File tree

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@
44
"comment": "Modify this file in a trivial way to cause this test suite to run",
55
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
66
"https://github.com/apache/beam/pull/31270": "re-add specialized Samza translation of Redistribute",
7-
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
7+
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
8+
"modification": 1
89
}

sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java

Lines changed: 16 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import java.util.Arrays;
2828
import java.util.List;
29-
import org.apache.beam.sdk.PipelineResult;
3029
import org.apache.beam.sdk.metrics.Lineage;
3130
import org.apache.beam.sdk.options.PipelineOptionsFactory;
3231
import org.apache.beam.sdk.testing.NeedsRunner;
@@ -52,6 +51,8 @@ public class LineagePluginTest {
5251

5352
private static final Logger LOG = LoggerFactory.getLogger(LineagePluginTest.class);
5453

54+
@Rule public TestPipeline testPipeline = createTestPipelineWithLineage();
55+
5556
/**
5657
* TestWatcher that logs detailed lineage diagnostics only when tests fail. This keeps successful
5758
* test output clean while providing deep debugging for failures.
@@ -89,13 +90,11 @@ public void setUp() {
8990
}
9091

9192
/** Helper to create a TestPipeline with test lineage configured. */
92-
private TestPipeline createTestPipelineWithLineage() {
93-
LineageOptions options = PipelineOptionsFactory.create().as(LineageOptions.class);
93+
private static TestPipeline createTestPipelineWithLineage() {
94+
TestPipeline testPipeline = TestPipeline.create();
95+
LineageOptions options = testPipeline.getOptions().as(LineageOptions.class);
9496
options.setLineageType(TestLineage.class);
95-
TestPipeline pipeline = TestPipeline.fromOptions(options);
96-
// Disable enforcement since we're not using @Rule
97-
pipeline.enableAbandonedNodeEnforcement(false);
98-
return pipeline;
97+
return testPipeline;
9998
}
10099

101100
@Test
@@ -120,16 +119,11 @@ public void testExplicitLineageSelection() {
120119
@Test
121120
@Category(NeedsRunner.class)
122121
public void testLineageIntegrationWithSimpleFQN() {
123-
// Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
124-
TestPipeline pipeline = createTestPipelineWithLineage();
125-
126122
// Run pipeline that records lineage
127-
pipeline
123+
testPipeline
128124
.apply(Create.of("a", "b", "c"))
129125
.apply(ParDo.of(new RecordSourceLineageDoFn("testsystem", Arrays.asList("db", "table"))));
130-
131-
PipelineResult result = pipeline.run();
132-
result.waitUntilFinish();
126+
testPipeline.run();
133127

134128
// Verify lineage was recorded
135129
List<String> sources = TestLineage.getRecordedSources();
@@ -139,21 +133,16 @@ public void testLineageIntegrationWithSimpleFQN() {
139133
@Test
140134
@Category(NeedsRunner.class)
141135
public void testLineageIntegrationWithSubtype() {
142-
// Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
143-
TestPipeline pipeline = createTestPipelineWithLineage();
144-
145136
// Run pipeline that records lineage with subtype
146-
pipeline
137+
testPipeline
147138
.apply(Create.of(1, 2, 3))
148139
.apply(
149140
ParDo.of(
150141
new RecordSourceLineageWithSubtypeDoFn(
151142
"spanner",
152143
"table",
153144
Arrays.asList("project", "instance", "database", "table"))));
154-
155-
PipelineResult result = pipeline.run();
156-
result.waitUntilFinish();
145+
testPipeline.run();
157146

158147
// Verify lineage was recorded with subtype
159148
List<String> sources = TestLineage.getRecordedSources();
@@ -163,19 +152,14 @@ public void testLineageIntegrationWithSubtype() {
163152
@Test
164153
@Category(NeedsRunner.class)
165154
public void testLineageIntegrationWithLastSegmentSeparator() {
166-
// Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
167-
TestPipeline pipeline = createTestPipelineWithLineage();
168-
169155
// Run pipeline that records lineage with custom separator
170-
pipeline
156+
testPipeline
171157
.apply(Create.of("x", "y", "z"))
172158
.apply(
173159
ParDo.of(
174160
new RecordSourceLineageWithSeparatorDoFn(
175161
"gcs", Arrays.asList("bucket", "path/to/file.txt"), "/")));
176-
177-
PipelineResult result = pipeline.run();
178-
result.waitUntilFinish();
162+
testPipeline.run();
179163

180164
// Verify lineage was recorded with separator
181165
List<String> sources = TestLineage.getRecordedSources();
@@ -185,16 +169,11 @@ public void testLineageIntegrationWithLastSegmentSeparator() {
185169
@Test
186170
@Category(NeedsRunner.class)
187171
public void testLineageIntegrationWithBothSourcesAndSinks() {
188-
// Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
189-
TestPipeline pipeline = createTestPipelineWithLineage();
190-
191172
// Run pipeline that records both source and sink lineage
192-
pipeline
173+
testPipeline
193174
.apply(Create.of("data1", "data2"))
194175
.apply(ParDo.of(new RecordBothSourceAndSinkLineageDoFn()));
195-
196-
PipelineResult result = pipeline.run();
197-
result.waitUntilFinish();
176+
testPipeline.run();
198177

199178
// Verify both source and sink lineage were recorded
200179
List<String> sources = TestLineage.getRecordedSources();
@@ -207,16 +186,11 @@ public void testLineageIntegrationWithBothSourcesAndSinks() {
207186
@Test
208187
@Category(NeedsRunner.class)
209188
public void testLineageIntegrationWithMultipleElements() {
210-
// Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
211-
TestPipeline pipeline = createTestPipelineWithLineage();
212-
213189
// Run pipeline with multiple elements to test thread safety
214-
pipeline
190+
testPipeline
215191
.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
216192
.apply(ParDo.of(new RecordSourceLineageDoFn("system", Arrays.asList("resource"))));
217-
218-
PipelineResult result = pipeline.run();
219-
result.waitUntilFinish();
193+
testPipeline.run();
220194

221195
// Verify lineage was recorded for all elements (may have duplicates)
222196
List<String> sources = TestLineage.getRecordedSources();

0 commit comments

Comments
 (0)