[yaml] - mongodb read normalization#38772
Conversation
Squashed draft MongoDB read connector changes from PR apache#35802.
…, and tests - Fully implemented MongoDB read configuration and Provider with JSON schema parsing - Enhanced MongoDbUtils with a deep BSON-to-Beam row conversion supporting all primitives, arrays, maps, and nested rows - Added comprehensive Java unit tests for MongoDbUtils and MongoDbReadSchemaTransformProvider - Mapped WriteToMongoDB and ReadFromMongoDB in standard_io.yaml - Implemented end-to-end integration test verifying write/read pipeline against containerized MongoDB
…ansforms - Standardized standard_io mappings to snake_case (error_handling, batch_size) - Extended integration test to verify error-handling queues are empty for clean runs
9df8c17 to
3bec200
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces support for reading from MongoDB as a SchemaTransform in the Java SDK, along with exposing this capability in Python YAML pipelines. It includes configuration classes, conversion utilities between BSON Documents and Beam Rows, and corresponding unit tests. Key feedback from the review highlights a potential serialization issue in the Java read transform due to an anonymous inner class, performance optimization opportunities in the BSON-to-Row conversion by using Map instead of Document to avoid copying nested structures, and a robustness improvement in the Python YAML reader to support string-based filters.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a MongoDB Read SchemaTransform to Apache Beam, enabling schema-based reads from MongoDB in both Java and YAML pipelines. Key additions include configuration and provider classes in Java, utility methods for converting BSON documents to Beam Rows, and integration with Python YAML IO. Feedback on the changes highlights a compilation error in MongoDbReadSchemaTransformProvider due to an incorrect .iterator() call, potential robustness issues in the error-handling path if doc.toJson() fails, missing defensive type checks in MongoDbUtils that could lead to ClassCastExceptions, and a suggestion to parse the projection parameter in Python YAML for consistency.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| final String filterStr = configuration.getFilter(); | ||
| if (filterStr != null) { | ||
| read = | ||
| read.withQueryFn(collection -> collection.find(Document.parse(filterStr)).iterator()); |
There was a problem hiding this comment.
The .iterator() call on FindIterable returns a MongoCursor<Document>, which does not match the expected return type of withQueryFn (which expects FindIterable<Document>). This will cause a compilation error. Removing .iterator() will correctly return the FindIterable<Document>.
| read.withQueryFn(collection -> collection.find(Document.parse(filterStr)).iterator()); | |
| read.withQueryFn(collection -> collection.find(Document.parse(filterStr))); |
| byte[] docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8); | ||
| receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e)); |
There was a problem hiding this comment.
If doc.toJson() throws an exception (e.g., due to unsupported BSON types or serialization issues), calling it again inside the catch block will cause another exception to be thrown, bypassing the error handling mechanism and crashing the pipeline. Wrapping doc.toJson() in a try-catch block and falling back to doc.toString() ensures robust error handling.
byte[] docBytes;
try {
docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8);
} catch (Exception jsonEx) {
docBytes = doc.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
}
receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e));| } | ||
| case ARRAY: | ||
| case ITERABLE: | ||
| Iterable<?> iterable = (Iterable<?>) value; |
There was a problem hiding this comment.
To prevent a ClassCastException when the BSON value does not match the expected schema type, we should defensively check if value is an instance of Iterable before casting.
if (!(value instanceof Iterable)) {
throw new IllegalArgumentException(
"Expected Iterable for type " + fieldType + ", but got: " + value.getClass().getName());
}
Iterable<?> iterable = (Iterable<?>) value;| } | ||
| return rowList; | ||
| case MAP: | ||
| Map<?, ?> map = (Map<?, ?>) value; |
There was a problem hiding this comment.
To prevent a ClassCastException when the BSON value does not match the expected schema type, we should defensively check if value is an instance of Map before casting.
if (!(value instanceof Map)) {
throw new IllegalArgumentException(
"Expected Map for type " + fieldType + ", but got: " + value.getClass().getName());
}
Map<?, ?> map = (Map<?, ?>) value;| if isinstance(filter, str): | ||
| filter = json.loads(filter) |
There was a problem hiding this comment.
For consistency with schema and filter, we should also parse projection if it is provided as a JSON string in the YAML configuration.
| if isinstance(filter, str): | |
| filter = json.loads(filter) | |
| if isinstance(filter, str): | |
| filter = json.loads(filter) | |
| if isinstance(projection, str): | |
| projection = json.loads(projection) |
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.