[Java IO] Add ArrowFlight IO connector #37904
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a new Java I/O connector for Apache Arrow Flight, significantly enhancing data transfer capabilities within Beam pipelines. The connector leverages Arrow Flight's high-performance RPC framework to facilitate efficient reading from and writing to Arrow Flight servers, minimizing serialization overhead and enabling faster data movement. This integration expands Beam's ecosystem by providing a robust solution for interacting with various Arrow Flight-compatible data systems.
b384457 to 1b3518d
Assigning reviewers: R: @chamikaramj for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments).
Abacn left a comment
Thanks for the new IO! Had an initial comment to kick off.
e1e26fd to d0551f0
Add a new IO connector for Apache Arrow Flight, enabling high-performance data transfer over gRPC using the Arrow columnar format. Includes read (BoundedSource) and write (DoFn with doPut) support with endpoint-level split parallelism and bearer token authentication. Fixes apache#20116
- Register :sdks:java:io:arrow-flight in javaioPreCommit task (build.gradle.kts)
- Add --add-opens JVM arg for Arrow native memory on JDK 17+
- Make host() nullable at AutoValue level to fix factory method NPE
- Eagerly materialize rows from Arrow buffers to prevent stale access
- Move root.setRowCount() after vector population for correct ordering
- Use AtomicInteger for thread-safe write record counting in tests
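The last item in the list above (thread-safe record counting) can be illustrated with a minimal, standalone sketch. `RecordCounter` and `countConcurrently` are hypothetical names invented for this illustration; they are not the PR's `TestFlightProducer`, only a stand-in showing why an `AtomicInteger` keeps the count exact when several worker threads report records at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a test producer that counts received records.
final class RecordCounter {
  private final AtomicInteger received = new AtomicInteger();

  // Called once per record; incrementAndGet is atomic, so no updates are lost.
  void onRecord() {
    received.incrementAndGet();
  }

  int count() {
    return received.get();
  }

  // Simulates several worker threads each reporting recordsPerThread records.
  static int countConcurrently(int threads, int recordsPerThread) {
    RecordCounter counter = new RecordCounter();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.submit(() -> {
        for (int i = 0; i < recordsPerThread; i++) {
          counter.onRecord();
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    return counter.count();
  }
}
```

With a plain `int` field and `received++`, concurrent increments could be lost and a test asserting an exact record count could fail intermittently.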
d0551f0 to 7252108
Run Java_Amazon-Web-Services2_IO_Direct PreCommit
Reminder, please take a look at this PR: @chamikaramj @damccorm @shunping
      break;
    default:
      throw new IllegalArgumentException(
          String.format(
Can we add a TODO here about supporting aggregation and logical types? Currently supported types are pretty limited
}

test {
  jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED'
I understand the add-opens are needed:
Caused by: java.lang.ExceptionInInitializerError: Exception java.lang.RuntimeException: Failed to initialize MemoryUtil. You must start Java with `--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED` (See https://arrow.apache.org/docs/java/install.html) [in thread "direct-runner-worker"]
at org.apache.arrow.memory.util.MemoryUtil.<clinit>(MemoryUtil.java:140)
This means that anyone using ArrowFlightIO on Java 17+ needs to set the pipeline option --JdkAddOpenModules=java.base/java.nio=ALL-UNNAMED. We should note this here and in the ArrowFlightIO Javadoc.
Also, a Java 8 runtime doesn't recognize the '--add-opens' pipeline option. However, we'll soon remove Java 8 support, so it's fine here.
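As a rough illustration of the requirement discussed above, the following sketch checks at runtime whether `java.base` opens `java.nio` to the calling code, using only the standard `java.lang.Module` API (Java 9+, so it will not compile on Java 8). `AddOpensCheck` and its methods are hypothetical helpers written for this example; they are not part of Beam or Arrow, and Arrow's own `MemoryUtil` check may differ.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: reports whether java.nio is reflectively open to a caller.
final class AddOpensCheck {
  static final String FLAG = "--add-opens=java.base/java.nio=ALL-UNNAMED";

  // True if java.base opens the java.nio package to the module of the given
  // class (the unnamed module for ordinary classpath code).
  static boolean javaNioOpenTo(Class<?> caller) {
    Module javaBase = ByteBuffer.class.getModule();
    return javaBase.isOpen("java.nio", caller.getModule());
  }

  // Builds a short diagnostic message for logs or test output.
  static String describe(boolean open) {
    return open
        ? "java.nio is open to the caller"
        : "java.nio is not open to the caller; start the JVM with " + FLAG;
  }
}
```

A pipeline author could call `AddOpensCheck.describe(AddOpensCheck.javaNioOpenTo(MyPipeline.class))` (with `MyPipeline` being their own class) early in `main` to get a clearer message than the `ExceptionInInitializerError` quoted above.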
Read spec = source.spec;

String hostName = checkNotNull(spec.host(), "host");
if (source.endpoint != null) {
A potential issue arises when a source with endpoint=null is first read (from endpoint[0]) and split is then called on getSource(): because source.endpoint is null here and source.split() splits unconditionally, it will produce one source per available endpoint. Could this cause problems such as duplicate records being read?
Note that sources are usually split before being read, with a few exceptions (e.g. the BoundedReadFromUnbounded fallback scenario).
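The concern above can be made concrete with a toy model. This is a sketch under stated assumptions, not Beam's `BoundedSource` API and not the PR's code: `EndpointSource`, `split()`, and `endpointsToRead()` are hypothetical names, and endpoints are plain strings. It shows one possible invariant that avoids the mismatch: an unsplit source covers every endpoint, so reading it and reading its splits cover the same data, and a source already pinned to an endpoint does not split again.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of a source that is either unpinned (endpoint == null) or pinned
// to exactly one endpoint.
final class EndpointSource {
  private final List<String> allEndpoints;
  private final String endpoint; // null means "not yet pinned"

  EndpointSource(List<String> allEndpoints, String endpoint) {
    this.allEndpoints = Collections.unmodifiableList(new ArrayList<>(allEndpoints));
    this.endpoint = endpoint;
  }

  // An unpinned source yields one pinned sub-source per endpoint;
  // a pinned source returns itself instead of splitting again.
  List<EndpointSource> split() {
    if (endpoint != null) {
      return Collections.singletonList(this);
    }
    List<EndpointSource> subSources = new ArrayList<>();
    for (String e : allEndpoints) {
      subSources.add(new EndpointSource(allEndpoints, e));
    }
    return subSources;
  }

  // Endpoints a reader over this source would consume. An unpinned source
  // covers all endpoints, not only the first one.
  List<String> endpointsToRead() {
    return endpoint != null ? Collections.singletonList(endpoint) : allEndpoints;
  }
}
```

Whether the connector should read all endpoints from an unsplit source, or handle the case some other way, is a design decision for the PR; the sketch only shows what a consistent read/split contract looks like.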
      return ArrowType.Binary.INSTANCE;
    case DATETIME:
      return new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC");
    default:
same here, add TODO for aggregation and logical types
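A minimal sketch of what the requested TODO could look like in a type-mapping switch. Everything here is illustrative: `TypeMappingSketch` and the `BeamKind` enum are hypothetical, Arrow types are represented as plain strings so the example needs no dependencies, and the `BYTES` case label is an assumption (the diff excerpt above does not show which case returns the binary type).

```java
// Hypothetical, dependency-free model of a Beam-to-Arrow type mapping.
final class TypeMappingSketch {
  // Stand-in for Beam field type names; not the real Schema.TypeName enum.
  enum BeamKind { BYTES, DATETIME, ARRAY, LOGICAL_TYPE }

  static String toArrowTypeName(BeamKind kind) {
    switch (kind) {
      case BYTES: // assumed case label
        return "Binary";
      case DATETIME:
        return "Timestamp(MILLISECOND, UTC)";
      default:
        // TODO: support aggregation and logical types.
        throw new IllegalArgumentException(
            String.format("Unsupported Beam type for Arrow conversion: %s", kind));
    }
  }
}
```

Keeping the TODO next to the `default` branch makes the current limitation visible at the exact point where an unsupported type is rejected.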
}

@Override
public PDone expand(PCollection<Row> input) {
PDone is discouraged. Please add a TODO about implementing dead letter queue (preferred) or WriteWithResult
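To illustrate the dead letter queue idea in isolation, here is a small sketch that does not use Beam at all. `DeadLetterSketch`, `Failure`, `Result`, and `writeAll` are hypothetical names; the sink is modelled as a `Consumer` that may throw. The point is only that a write step can return the elements it failed to write, together with the error, instead of returning nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a write step that reports failures instead of hiding them.
final class DeadLetterSketch {
  // An element that could not be written, plus the reason.
  static final class Failure<T> {
    final T element;
    final String error;

    Failure(T element, String error) {
      this.element = element;
      this.error = error;
    }
  }

  // Outcome of a write: how many elements succeeded and which ones failed.
  static final class Result<T> {
    final int written;
    final List<Failure<T>> failures;

    Result(int written, List<Failure<T>> failures) {
      this.written = written;
      this.failures = failures;
    }
  }

  // Tries to write every element; failures are collected rather than rethrown.
  static <T> Result<T> writeAll(List<T> elements, Consumer<T> sink) {
    int written = 0;
    List<Failure<T>> failures = new ArrayList<>();
    for (T element : elements) {
      try {
        sink.accept(element);
        written++;
      } catch (RuntimeException e) {
        failures.add(new Failure<>(element, String.valueOf(e.getMessage())));
      }
    }
    return new Result<>(written, failures);
  }
}
```

In a Beam transform, the analogous design would return the failed rows as an additional output collection so that downstream steps can inspect or retry them.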
  return new Field(beamField.getName(), fieldType, Collections.emptyList());
}

private ArrowType beamTypeToArrowType(Schema.FieldType beamType) {
Consider moving the Beam schema -> Arrow schema conversion into ArrowConversion as a public API (the inverse of toBeamSchema).
Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
# Conflicts:
#   CHANGES.md
[review-prs-verification] Files reviewed:
Tests/checks run and results:
Fixes committed/pushed:
Final CI status:
Merge/close result:
[review-prs-verification] Verified after resolving the master merge conflict and pushing
Notes:
Summary
Add a new I/O connector for Apache Arrow Flight, enabling high-performance data transfer over gRPC using the Arrow columnar format. Arrow Flight avoids serialization/deserialization overhead and can achieve 10-50x faster data transfer compared to JDBC/ODBC.
Fixes #20116
Changes
- ArrowFlightIO.Read: BoundedSource-based read transform that fetches Arrow Flight schemas via getInfo, splits work by FlightEndpoint, and converts record batches to Beam Row using the existing ArrowConversion extension.
- ArrowFlightIO.Write: DoFn-based write transform that buffers Beam Rows into configurable batches, converts them to Arrow vectors, and streams them to a Flight server via doPut.
- Each FlightEndpoint returned by the server becomes a separate sub-source for parallel reads.
- Bearer token authentication: withToken().
- flight-core and flight-sql artifacts in BeamModulePlugin.groovy; added the :sdks:java:io:arrow-flight module to settings.gradle.kts.
- CHANGES.md (2.73.0) and the built-in connectors table in connectors.md.
Testing
Tests use a real in-process FlightServer with a TestFlightProducer:
- testRead(): starts a Flight server serving two rows, reads them with ArrowFlightIO.read(), and asserts the output via PAssert.
- testWrite(): writes two rows with ArrowFlightIO.write() and verifies the producer received exactly 2 records.
Both tests run the full pipeline end-to-end using TestPipeline and the DirectRunner.
Did this cause any problems?
Rollback/revert this commit and redeploy the service.
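The write path described under Changes buffers rows into configurable batches before sending them. A minimal, dependency-free sketch of that buffering pattern follows. `BatchBuffer` is a hypothetical class written for this illustration; the flush target is a plain `Consumer` standing in for the Flight `doPut` stream, and the real DoFn lifecycle (per-bundle setup and finish) is only hinted at in comments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer that groups elements into fixed-size batches.
final class BatchBuffer<T> {
  private final int batchSize;
  private final Consumer<List<T>> flusher; // stand-in for sending a batch to a server
  private final List<T> buffer = new ArrayList<>();

  BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.flusher = flusher;
  }

  // Called once per element; flushes automatically when the batch is full.
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  // Called at the end of a unit of work so a partial batch is not lost.
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    flusher.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
    buffer.clear();
  }
}
```

The explicit final `flush()` matters: without it, the last partial batch would never be sent.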