Skip to content

[yaml] - mongodb read normalization#38772

Draft
derrickaw wants to merge 12 commits into
apache:masterfrom
derrickaw:20260602_readFromMongoDb
Draft

[yaml] - mongodb read normalization#38772
derrickaw wants to merge 12 commits into
apache:masterfrom
derrickaw:20260602_readFromMongoDb

Conversation

@derrickaw

Copy link
Copy Markdown
Collaborator
  1. Fixes [yaml] Normalize MongoDBIO #28690
  2. Adds the ReadFromMongoDb transform for yaml

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

arnavarora2004 and others added 10 commits June 9, 2026 13:21
Squashed draft MongoDB read connector changes from PR apache#35802.
…, and tests

- Fully implemented MongoDB read configuration and Provider with JSON schema parsing
- Enhanced MongoDbUtils with a deep BSON-to-Beam row conversion supporting all primitives, arrays, maps, and nested rows
- Added comprehensive Java unit tests for MongoDbUtils and MongoDbReadSchemaTransformProvider
- Mapped WriteToMongoDB and ReadFromMongoDB in standard_io.yaml
- Implemented end-to-end integration test verifying write/read pipeline against containerized MongoDB
…ansforms

- Standardized standard_io mappings to snake_case (error_handling, batch_size)
- Extended integration test to verify error-handling queues are empty for clean runs
@derrickaw derrickaw force-pushed the 20260602_readFromMongoDb branch from 9df8c17 to 3bec200 Compare June 9, 2026 13:21
@derrickaw

Copy link
Copy Markdown
Collaborator Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for reading from MongoDB as a SchemaTransform in the Java SDK, along with exposing this capability in Python YAML pipelines. It includes configuration classes, conversion utilities between BSON Documents and Beam Rows, and corresponding unit tests. Key feedback from the review highlights a potential serialization issue in the Java read transform due to an anonymous inner class, performance optimization opportunities in the BSON-to-Row conversion by using Map instead of Document to avoid copying nested structures, and a robustness improvement in the Python YAML reader to support string-based filters.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread sdks/python/apache_beam/yaml/yaml_io.py
@derrickaw

Copy link
Copy Markdown
Collaborator Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a MongoDB Read SchemaTransform to Apache Beam, enabling schema-based reads from MongoDB in both Java and YAML pipelines. Key additions include configuration and provider classes in Java, utility methods for converting BSON documents to Beam Rows, and integration with Python YAML IO. Feedback on the changes highlights a compilation error in MongoDbReadSchemaTransformProvider due to an incorrect .iterator() call, potential robustness issues in the error-handling path if doc.toJson() fails, missing defensive type checks in MongoDbUtils that could lead to ClassCastExceptions, and a suggestion to parse the projection parameter in Python YAML for consistency.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

final String filterStr = configuration.getFilter();
if (filterStr != null) {
read =
read.withQueryFn(collection -> collection.find(Document.parse(filterStr)).iterator());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The .iterator() call on FindIterable returns a MongoCursor<Document>, which does not match the expected return type of withQueryFn (which expects FindIterable<Document>). This will cause a compilation error. Removing .iterator() will correctly return the FindIterable<Document>.

Suggested change
read.withQueryFn(collection -> collection.find(Document.parse(filterStr)).iterator());
read.withQueryFn(collection -> collection.find(Document.parse(filterStr)));

Comment on lines +142 to +143
byte[] docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8);
receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If doc.toJson() throws an exception (e.g., due to unsupported BSON types or serialization issues), calling it again inside the catch block will cause another exception to be thrown, bypassing the error handling mechanism and crashing the pipeline. Wrapping doc.toJson() in a try-catch block and falling back to doc.toString() ensures robust error handling.

        byte[] docBytes;
        try {
          docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        } catch (Exception jsonEx) {
          docBytes = doc.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
        receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e));

}
case ARRAY:
case ITERABLE:
Iterable<?> iterable = (Iterable<?>) value;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To prevent a ClassCastException when the BSON value does not match the expected schema type, we should defensively check if value is an instance of Iterable before casting.

        if (!(value instanceof Iterable)) {
          throw new IllegalArgumentException(
              "Expected Iterable for type " + fieldType + ", but got: " + value.getClass().getName());
        }
        Iterable<?> iterable = (Iterable<?>) value;

}
return rowList;
case MAP:
Map<?, ?> map = (Map<?, ?>) value;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To prevent a ClassCastException when the BSON value does not match the expected schema type, we should defensively check if value is an instance of Map before casting.

        if (!(value instanceof Map)) {
          throw new IllegalArgumentException(
              "Expected Map for type " + fieldType + ", but got: " + value.getClass().getName());
        }
        Map<?, ?> map = (Map<?, ?>) value;

Comment on lines +761 to +762
if isinstance(filter, str):
filter = json.loads(filter)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with schema and filter, we should also parse projection if it is provided as a JSON string in the YAML configuration.

Suggested change
if isinstance(filter, str):
filter = json.loads(filter)
if isinstance(filter, str):
filter = json.loads(filter)
if isinstance(projection, str):
projection = json.loads(projection)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[yaml] Normalize MongoDBIO

2 participants