Skip to content

Commit d0551f0

Browse files
committed
Add arrow-flight to javaioPreCommit and fix test issues
- Register :sdks:java:io:arrow-flight in javaioPreCommit task (build.gradle.kts) - Add --add-opens JVM arg for Arrow native memory on JDK 17+ - Make host() nullable at AutoValue level to fix factory method NPE - Eagerly materialize rows from Arrow buffers to prevent stale access - Move root.setRowCount() after vector population for correct ordering - Use AtomicInteger for thread-safe write record counting in tests
1 parent 4b7b933 commit d0551f0

4 files changed

Lines changed: 36 additions & 15 deletions

File tree

build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ tasks.register("javaPreCommit") {
343343
// a precommit task build multiple IOs (except those splitting into single jobs)
344344
tasks.register("javaioPreCommit") {
345345
dependsOn(":sdks:java:io:amqp:build")
346+
dependsOn(":sdks:java:io:arrow-flight:build")
346347
dependsOn(":sdks:java:io:cassandra:build")
347348
dependsOn(":sdks:java:io:csv:build")
348349
dependsOn(":sdks:java:io:cdap:build")

sdks/java/io/arrow-flight/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,7 @@ dependencies {
3737
testRuntimeOnly library.java.slf4j_simple
3838
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
3939
}
40+
41+
test {
42+
jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED'
43+
}

sdks/java/io/arrow-flight/src/main/java/org/apache/beam/sdk/io/arrowflight/ArrowFlightIO.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ int getPort(int defaultPort) {
192192
@AutoValue
193193
public abstract static class Read extends PTransform<PBegin, PCollection<Row>> {
194194

195-
abstract String host();
195+
abstract @Nullable String host();
196196

197197
abstract int port();
198198

@@ -252,7 +252,8 @@ public PCollection<Row> expand(PBegin input) {
252252

253253
Schema beamSchema;
254254
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
255-
FlightClient client = createClient(allocator, host(), port(), useTls())) {
255+
FlightClient client =
256+
createClient(allocator, checkNotNull(host(), "host"), port(), useTls())) {
256257
FlightInfo info =
257258
client.getInfo(
258259
FlightDescriptor.command(
@@ -283,7 +284,7 @@ HeaderCallOption[] callOptions() {
283284
@Override
284285
public void populateDisplayData(DisplayData.Builder builder) {
285286
super.populateDisplayData(builder);
286-
builder.add(DisplayData.item("host", host()));
287+
builder.addIfNotNull(DisplayData.item("host", host()));
287288
builder.add(DisplayData.item("port", port()));
288289
builder.add(DisplayData.item("useTls", useTls()));
289290
builder.addIfNotNull(DisplayData.item("command", command()));
@@ -315,15 +316,18 @@ public List<? extends BoundedSource<Row>> split(
315316

316317
List<BoundedSource<Row>> sources = new ArrayList<>();
317318
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
318-
FlightClient client = createClient(allocator, spec.host(), spec.port(), spec.useTls())) {
319+
FlightClient client =
320+
createClient(
321+
allocator, checkNotNull(spec.host(), "host"), spec.port(), spec.useTls())) {
319322
FlightInfo info =
320323
client.getInfo(
321324
FlightDescriptor.command(
322325
checkNotNull(spec.command(), "command").getBytes(StandardCharsets.UTF_8)),
323326
spec.callOptions());
324327
for (FlightEndpoint fe : info.getEndpoints()) {
325328
SerializableEndpoint se =
326-
SerializableEndpoint.fromFlightEndpoint(fe, spec.host(), spec.port());
329+
SerializableEndpoint.fromFlightEndpoint(
330+
fe, checkNotNull(spec.host(), "host"), spec.port());
327331
sources.add(new FlightBoundedSource(spec, beamSchema, se));
328332
}
329333
}
@@ -340,7 +344,9 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
340344
return -1;
341345
}
342346
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
343-
FlightClient client = createClient(allocator, spec.host(), spec.port(), spec.useTls())) {
347+
FlightClient client =
348+
createClient(
349+
allocator, checkNotNull(spec.host(), "host"), spec.port(), spec.useTls())) {
344350
FlightInfo info =
345351
client.getInfo(
346352
FlightDescriptor.command(
@@ -388,13 +394,14 @@ public boolean start() throws IOException {
388394
allocator = new RootAllocator(Long.MAX_VALUE);
389395
Read spec = source.spec;
390396

397+
String hostName = checkNotNull(spec.host(), "host");
391398
if (source.endpoint != null) {
392-
String host = source.endpoint.getHost(spec.host());
399+
String host = source.endpoint.getHost(hostName);
393400
int port = source.endpoint.getPort(spec.port());
394401
client = createClient(allocator, host, port, spec.useTls());
395402
stream = client.getStream(source.endpoint.getTicket(), spec.callOptions());
396403
} else {
397-
client = createClient(allocator, spec.host(), spec.port(), spec.useTls());
404+
client = createClient(allocator, hostName, spec.port(), spec.useTls());
398405
FlightInfo info =
399406
client.getInfo(
400407
FlightDescriptor.command(
@@ -422,7 +429,15 @@ public boolean advance() throws IOException {
422429
if (stream.next()) {
423430
VectorSchemaRoot root = stream.getRoot();
424431
if (root.getRowCount() > 0) {
425-
currentBatchIterator = ArrowConversion.rowsFromRecordBatch(source.beamSchema, root);
432+
Iterator<Row> lazyIterator =
433+
ArrowConversion.rowsFromRecordBatch(source.beamSchema, root);
434+
List<Row> materializedRows = new ArrayList<>();
435+
while (lazyIterator.hasNext()) {
436+
Row lazyRow = lazyIterator.next();
437+
materializedRows.add(
438+
Row.withSchema(source.beamSchema).addValues(lazyRow.getValues()).build());
439+
}
440+
currentBatchIterator = materializedRows.iterator();
426441
}
427442
} else {
428443
return false;
@@ -472,7 +487,7 @@ public BoundedSource<Row> getCurrentSource() {
472487
@AutoValue
473488
public abstract static class Write extends PTransform<PCollection<Row>, PDone> {
474489

475-
abstract String host();
490+
abstract @Nullable String host();
476491

477492
abstract int port();
478493

@@ -547,7 +562,7 @@ public PDone expand(PCollection<Row> input) {
547562
@Override
548563
public void populateDisplayData(DisplayData.Builder builder) {
549564
super.populateDisplayData(builder);
550-
builder.add(DisplayData.item("host", host()));
565+
builder.addIfNotNull(DisplayData.item("host", host()));
551566
builder.add(DisplayData.item("port", port()));
552567
builder.add(DisplayData.item("useTls", useTls()));
553568
builder.addIfNotNull(DisplayData.item("descriptor", descriptor()));
@@ -665,7 +680,6 @@ private void flush() {
665680
}
666681
ensureConnection();
667682

668-
root.setRowCount(batch.size());
669683
for (int colIdx = 0; colIdx < beamSchema.getFieldCount(); colIdx++) {
670684
FieldVector vector = root.getVector(colIdx);
671685
vector.allocateNew();
@@ -680,6 +694,7 @@ private void flush() {
680694
}
681695
vector.setValueCount(batch.size());
682696
}
697+
root.setRowCount(batch.size());
683698

684699
listener.putNext();
685700
RECORDS_WRITTEN.inc(batch.size());

sdks/java/io/arrow-flight/src/test/java/org/apache/beam/sdk/io/arrowflight/ArrowFlightIOTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.nio.charset.StandardCharsets;
2323
import java.util.Collections;
24+
import java.util.concurrent.atomic.AtomicInteger;
2425
import org.apache.arrow.flight.Action;
2526
import org.apache.arrow.flight.ActionType;
2627
import org.apache.arrow.flight.Criteria;
@@ -121,14 +122,14 @@ public void testWrite() throws Exception {
121122

122123
pipeline.run().waitUntilFinish();
123124

124-
assertEquals(2, producer.writtenRecords);
125+
assertEquals(2, producer.writtenRecords.get());
125126
}
126127

127128
/** A simple FlightProducer that returns predefined data for reads and counts writes. */
128129
private static class TestFlightProducer implements FlightProducer {
129130

130131
private final BufferAllocator allocator;
131-
int writtenRecords = 0;
132+
final AtomicInteger writtenRecords = new AtomicInteger();
132133

133134
TestFlightProducer(BufferAllocator allocator) {
134135
this.allocator = allocator;
@@ -189,7 +190,7 @@ public Runnable acceptPut(
189190
try {
190191
while (flightStream.next()) {
191192
VectorSchemaRoot root = flightStream.getRoot();
192-
writtenRecords += root.getRowCount();
193+
writtenRecords.addAndGet(root.getRowCount());
193194
}
194195
ackStream.onCompleted();
195196
} catch (Exception e) {

0 commit comments

Comments
 (0)