-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[yaml] - mongodb read normalization #38772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
43bc9f4
3a05926
eb55f98
205d95a
8095180
73aad3a
0a09db3
5808923
9983d7a
3bec200
e288769
79f4ff1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.io.mongodb; | ||
|
|
||
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; | ||
|
|
||
| import com.google.auto.value.AutoValue; | ||
| import java.io.Serializable; | ||
| import org.apache.beam.sdk.schemas.AutoValueSchema; | ||
| import org.apache.beam.sdk.schemas.annotations.DefaultSchema; | ||
| import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; | ||
| import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; | ||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
|
||
| /** | ||
| * Configuration class for the MongoDB Read transform. | ||
| * | ||
| * <p>The URI, database, collection and schema properties are required. The filter and error | ||
| * handling properties are annotated {@code @Nullable} and may be left unset. Instances are created | ||
| * through {@link #builder()}. | ||
| */ | ||
| @DefaultSchema(AutoValueSchema.class) | ||
| @AutoValue | ||
| public abstract class MongoDbReadSchemaTransformConfiguration implements Serializable { | ||
|
|
||
| @SchemaFieldDescription("The connection URI for the MongoDB server.") | ||
| public abstract String getUri(); | ||
|
|
||
| @SchemaFieldDescription("The MongoDB database to read from.") | ||
| public abstract String getDatabase(); | ||
|
|
||
| @SchemaFieldDescription("The MongoDB collection to read from.") | ||
| public abstract String getCollection(); | ||
|
|
||
| @SchemaFieldDescription( | ||
| "The schema in which the data is encoded, defined with JSON-schema syntax (https://json-schema.org/).") | ||
| public abstract String getSchema(); | ||
|
|
||
| @SchemaFieldDescription( | ||
| "An optional BSON filter to apply to the read. This should be a valid JSON string.") | ||
| @Nullable | ||
| public abstract String getFilter(); | ||
|
|
||
| @SchemaFieldDescription( | ||
| "This option specifies whether and where to output rows that failed to be read.") | ||
| @Nullable | ||
| public abstract ErrorHandling getErrorHandling(); | ||
|
|
||
| /** | ||
| * Verifies that the URI, database, collection and schema are all non-null and non-empty. | ||
| * | ||
| * @throws IllegalArgumentException if any of those values is missing or empty | ||
| */ | ||
| public void validate() { | ||
| checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); | ||
| checkArgument( | ||
| getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); | ||
| checkArgument( | ||
| getCollection() != null && !getCollection().isEmpty(), | ||
| "MongoDB collection must be specified."); | ||
| checkArgument( | ||
| getSchema() != null && !getSchema().isEmpty(), "MongoDB schema must be specified."); | ||
| } | ||
|
|
||
| /** Returns a new builder for {@link MongoDbReadSchemaTransformConfiguration}. */ | ||
| public static Builder builder() { | ||
| return new AutoValue_MongoDbReadSchemaTransformConfiguration.Builder(); | ||
| } | ||
|
|
||
| /** Builder for {@link MongoDbReadSchemaTransformConfiguration}. */ | ||
| @AutoValue.Builder | ||
| public abstract static class Builder { | ||
| public abstract Builder setUri(String uri); | ||
|
|
||
| public abstract Builder setDatabase(String database); | ||
|
|
||
| public abstract Builder setCollection(String collection); | ||
|
|
||
| public abstract Builder setSchema(String schema); | ||
|
|
||
| public abstract Builder setFilter(String filter); | ||
|
|
||
| public abstract Builder setErrorHandling(ErrorHandling errorHandling); | ||
|
|
||
| /** Builds the configuration from the values set on this builder. */ | ||
| public abstract MongoDbReadSchemaTransformConfiguration build(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.io.mongodb; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import org.apache.beam.sdk.schemas.Schema; | ||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransform; | ||
| import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; | ||
| import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; | ||
| import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; | ||
| import org.apache.beam.sdk.schemas.utils.JsonUtils; | ||
| import org.apache.beam.sdk.transforms.DoFn; | ||
| import org.apache.beam.sdk.transforms.ParDo; | ||
| import org.apache.beam.sdk.values.PCollection; | ||
| import org.apache.beam.sdk.values.PCollectionRowTuple; | ||
| import org.apache.beam.sdk.values.PCollectionTuple; | ||
| import org.apache.beam.sdk.values.Row; | ||
| import org.apache.beam.sdk.values.TupleTag; | ||
| import org.apache.beam.sdk.values.TupleTagList; | ||
| import org.bson.Document; | ||
|
|
||
| /** | ||
| * An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. | ||
| * | ||
| * <p>The transform reads documents with {@link MongoDbIO}, converts each one to a Beam {@link Row} | ||
| * using the JSON-schema given in the configuration, and, when error handling is configured, emits | ||
| * documents that fail conversion on a separate error output instead of throwing. | ||
| */ | ||
| @AutoService(SchemaTransformProvider.class) | ||
| public class MongoDbReadSchemaTransformProvider | ||
| extends TypedSchemaTransformProvider<MongoDbReadSchemaTransformConfiguration> { | ||
|
|
||
| private static final String OUTPUT_TAG_NAME = "output"; | ||
| public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {}; | ||
| public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {}; | ||
|
|
||
| /** Incremented once for every document that is routed to the error output. */ | ||
| private static final org.apache.beam.sdk.metrics.Counter errorCounter = | ||
| org.apache.beam.sdk.metrics.Metrics.counter( | ||
| MongoDbReadSchemaTransformProvider.class, "MongoDB-read-error-counter"); | ||
|
|
||
| /** Creates the read transform for the given configuration. */ | ||
| @Override | ||
| protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) { | ||
| return new MongoDbReadSchemaTransform(configuration); | ||
| } | ||
|
|
||
| /** Returns the identifier under which this transform is registered. */ | ||
| @Override | ||
| public String identifier() { | ||
| return "beam:schematransform:org.apache.beam:mongodb_read:v1"; | ||
| } | ||
|
|
||
| /** Returns an empty list: this transform is a source and takes no input collections. */ | ||
| @Override | ||
| public List<String> inputCollectionNames() { | ||
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| /** Returns the name of the single main output collection. */ | ||
| @Override | ||
| public List<String> outputCollectionNames() { | ||
| return Collections.singletonList(OUTPUT_TAG_NAME); | ||
| } | ||
|
|
||
| /** The {@link SchemaTransform} that performs the read operation. */ | ||
| private static class MongoDbReadSchemaTransform extends SchemaTransform { | ||
| private final MongoDbReadSchemaTransformConfiguration configuration; | ||
|
|
||
| /** Validates the configuration eagerly so that a bad configuration fails at construction. */ | ||
| MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) { | ||
| configuration.validate(); | ||
| this.configuration = configuration; | ||
| } | ||
|
|
||
| /** | ||
| * Reads documents from MongoDB and converts them to rows. | ||
| * | ||
| * @param input only used to obtain the pipeline; no input collections are read | ||
| * @return a tuple holding the converted rows under {@code "output"} and, when error handling is | ||
| * configured, the failed documents under the configured error output name | ||
| */ | ||
| @Override | ||
| public PCollectionRowTuple expand(PCollectionRowTuple input) { | ||
| Schema schema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema()); | ||
|
|
||
| MongoDbIO.Read read = | ||
| MongoDbIO.read() | ||
| .withUri(configuration.getUri()) | ||
| .withDatabase(configuration.getDatabase()) | ||
| .withCollection(configuration.getCollection()); | ||
|
|
||
| // The filter is optional; without it the default read of the whole collection is used. | ||
| final String filterStr = configuration.getFilter(); | ||
| if (filterStr != null) { | ||
| read = | ||
| read.withQueryFn(collection -> collection.find(Document.parse(filterStr)).iterator()); | ||
| } | ||
|
derrickaw marked this conversation as resolved.
|
||
|
|
||
| PCollection<Document> mongoDocs = input.getPipeline().apply("ReadFromMongoDb", read); | ||
|
|
||
| boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling()); | ||
| Schema errorSchema = ErrorHandling.errorSchemaBytes(); | ||
|
|
||
| PCollectionTuple outputTuple = | ||
| mongoDocs.apply( | ||
| "ConvertToBeamRows", | ||
| ParDo.of(new DocumentToRowFn(schema, handleErrors, errorSchema)) | ||
| .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); | ||
|
|
||
| PCollection<Row> beamRows = outputTuple.get(OUTPUT_TAG).setRowSchema(schema); | ||
| PCollection<Row> errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema); | ||
|
|
||
| // The error collection is only exposed when the configuration names an error output. | ||
| PCollectionRowTuple output = PCollectionRowTuple.of(OUTPUT_TAG_NAME, beamRows); | ||
| ErrorHandling errorHandling = configuration.getErrorHandling(); | ||
| if (handleErrors && errorHandling != null) { | ||
| output = output.and(errorHandling.getOutput(), errorOutput); | ||
| } | ||
| return output; | ||
| } | ||
| } | ||
|
|
||
| /** Converts a MongoDB BSON {@link Document} to a Beam {@link Row}. */ | ||
| static class DocumentToRowFn extends DoFn<Document, Row> { | ||
| private final Schema schema; | ||
| private final boolean handleErrors; | ||
| private final Schema errorSchema; | ||
|
|
||
| /** | ||
| * @param schema the schema every output row must match | ||
| * @param handleErrors whether failed documents go to the error output instead of throwing | ||
| * @param errorSchema the schema used to build error records | ||
| */ | ||
| DocumentToRowFn(Schema schema, boolean handleErrors, Schema errorSchema) { | ||
| this.schema = schema; | ||
| this.handleErrors = handleErrors; | ||
| this.errorSchema = errorSchema; | ||
| } | ||
|
|
||
| /** | ||
| * Converts one document and emits it on the main output, or handles the conversion failure. | ||
| * | ||
| * @throws RuntimeException wrapping the conversion failure when error handling is disabled | ||
| */ | ||
| @ProcessElement | ||
| public void processElement(@Element Document doc, MultiOutputReceiver receiver) { | ||
| try { | ||
| receiver.get(OUTPUT_TAG).output(MongoDbUtils.toRow(doc, schema)); | ||
| } catch (Exception e) { | ||
| // Rendering the document must not throw, otherwise the conversion failure would be masked. | ||
| String rendered = renderDocument(doc); | ||
| if (!handleErrors) { | ||
| throw new RuntimeException( | ||
| "Failed to convert BSON Document to Beam Row: " + rendered, e); | ||
| } | ||
| errorCounter.inc(); | ||
| byte[] docBytes = rendered.getBytes(java.nio.charset.StandardCharsets.UTF_8); | ||
| receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e)); | ||
|
Comment on lines
+142
to
+143
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If byte[] docBytes;
try {
docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8);
} catch (Exception jsonEx) {
docBytes = doc.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
}
receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e)); |
||
| } | ||
| } | ||
|
|
||
| /** Renders a document as JSON, falling back to {@code toString()} if JSON encoding fails. */ | ||
| private static String renderDocument(Document doc) { | ||
| try { | ||
| return doc.toJson(); | ||
| } catch (RuntimeException jsonFailure) { | ||
| return doc.toString(); | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,13 +18,18 @@ | |
| package org.apache.beam.sdk.io.mongodb; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.beam.sdk.schemas.Schema; | ||
| import org.apache.beam.sdk.schemas.Schema.Field; | ||
| import org.apache.beam.sdk.schemas.Schema.FieldType; | ||
| import org.apache.beam.sdk.values.Row; | ||
| import org.bson.BsonNull; | ||
| import org.bson.Document; | ||
| import org.bson.types.Binary; | ||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
| import org.joda.time.Instant; | ||
|
|
||
| /** Utility methods for MongoDB IO. */ | ||
| public class MongoDbUtils { | ||
|
|
@@ -71,4 +76,116 @@ public static Document toDocument(Row row) { | |
| } | ||
| return value; | ||
| } | ||
|
|
||
| /** | ||
| * Builds a Beam {@link Row} for the given {@link Schema} from a BSON {@link Document} or any | ||
| * other Map of field names to values. Each schema field is looked up by name in the map and | ||
| * converted to the field's Beam type; the values are added in schema field order. | ||
| */ | ||
| public static Row toRow(Map<?, ?> doc, Schema schema) { | ||
| List<@Nullable Object> converted = new ArrayList<>(); | ||
| for (Field column : schema.getFields()) { | ||
| converted.add(convertFromBsonValue(doc.get(column.getName()), column.getType())); | ||
| } | ||
| return Row.withSchema(schema).addValues(converted).build(); | ||
| } | ||
|
|
||
| /** | ||
| * Converts a single BSON value to the Java representation used for {@code fieldType}. | ||
| * | ||
| * <p>Values that already have a compatible Java type are narrowed or unwrapped directly; any other | ||
| * value is converted through its string form. Arrays, maps and rows are converted recursively. | ||
| * | ||
| * @param value the raw value taken from the document, possibly {@code null} or {@link BsonNull} | ||
| * @param fieldType the Beam type the value must be converted to | ||
| * @return the converted value, or {@code null} when {@code value} is {@code null} or BSON null | ||
| * @throws IllegalArgumentException if the value is not an Iterable, Map or Map-backed row where | ||
| * the field type requires one, or if the field type is not supported (this includes the | ||
| * {@link NumberFormatException} raised when a string form cannot be parsed as a number) | ||
| */ | ||
| @SuppressWarnings("JavaUtilDate") | ||
| private static @Nullable Object convertFromBsonValue( | ||
| @Nullable Object value, FieldType fieldType) { | ||
| if (value == null || value instanceof BsonNull) { | ||
| return null; | ||
| } | ||
|
|
||
| switch (fieldType.getTypeName()) { | ||
| case BYTE: | ||
| return (value instanceof Number) | ||
| ? ((Number) value).byteValue() | ||
| : Byte.parseByte(value.toString()); | ||
| case INT16: | ||
| return (value instanceof Number) | ||
| ? ((Number) value).shortValue() | ||
| : Short.parseShort(value.toString()); | ||
| case INT32: | ||
| return (value instanceof Number) | ||
| ? ((Number) value).intValue() | ||
| : Integer.parseInt(value.toString()); | ||
| case INT64: | ||
| return (value instanceof Number) | ||
| ? ((Number) value).longValue() | ||
| : Long.parseLong(value.toString()); | ||
| case FLOAT: | ||
| return (value instanceof Number) | ||
| ? ((Number) value).floatValue() | ||
| : Float.parseFloat(value.toString()); | ||
| case DOUBLE: | ||
| return (value instanceof Number) | ||
| ? ((Number) value).doubleValue() | ||
| : Double.parseDouble(value.toString()); | ||
| case DECIMAL: | ||
| // Only floating-point inputs go through double. Every other value, including integral and | ||
| // decimal Number types such as Long, is parsed from its string form so no digits are lost. | ||
| return (value instanceof Double || value instanceof Float) | ||
| ? java.math.BigDecimal.valueOf(((Number) value).doubleValue()) | ||
| : new java.math.BigDecimal(value.toString()); | ||
| case STRING: | ||
| return value.toString(); | ||
| case BOOLEAN: | ||
| return (value instanceof Boolean) | ||
| ? (Boolean) value | ||
| : Boolean.parseBoolean(value.toString()); | ||
| case DATETIME: | ||
| if (value instanceof java.util.Date) { | ||
| return new Instant(((java.util.Date) value).getTime()); | ||
| } else if (value instanceof Number) { | ||
| return new Instant(((Number) value).longValue()); | ||
| } else { | ||
| return Instant.parse(value.toString()); | ||
| } | ||
| case BYTES: | ||
| if (value instanceof Binary) { | ||
| return ((Binary) value).getData(); | ||
| } else if (value instanceof byte[]) { | ||
| return (byte[]) value; | ||
| } else { | ||
| return value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8); | ||
| } | ||
| case ARRAY: | ||
| case ITERABLE: | ||
| // Fail with a descriptive message instead of a bare ClassCastException. | ||
| if (!(value instanceof Iterable)) { | ||
| throw new IllegalArgumentException( | ||
| "Expected Iterable for type " + fieldType + ", but got: " + value.getClass().getName()); | ||
| } | ||
| Iterable<?> iterable = (Iterable<?>) value; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To prevent a if (!(value instanceof Iterable)) {
throw new IllegalArgumentException(
"Expected Iterable for type " + fieldType + ", but got: " + value.getClass().getName());
}
Iterable<?> iterable = (Iterable<?>) value; |
||
| List<@Nullable Object> rowList = new ArrayList<>(); | ||
| FieldType elementType = fieldType.getCollectionElementType(); | ||
| if (elementType == null) { | ||
| throw new IllegalArgumentException( | ||
| "Collection element type cannot be null for type: " + fieldType); | ||
| } | ||
| for (Object item : iterable) { | ||
| rowList.add(convertFromBsonValue(item, elementType)); | ||
| } | ||
| return rowList; | ||
| case MAP: | ||
| // Fail with a descriptive message instead of a bare ClassCastException. | ||
| if (!(value instanceof Map)) { | ||
| throw new IllegalArgumentException( | ||
| "Expected Map for type " + fieldType + ", but got: " + value.getClass().getName()); | ||
| } | ||
| Map<?, ?> map = (Map<?, ?>) value; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To prevent a if (!(value instanceof Map)) {
throw new IllegalArgumentException(
"Expected Map for type " + fieldType + ", but got: " + value.getClass().getName());
}
Map<?, ?> map = (Map<?, ?>) value; |
||
| Map<String, @Nullable Object> rowMap = new HashMap<>(); | ||
| FieldType valueType = fieldType.getMapValueType(); | ||
| if (valueType == null) { | ||
| throw new IllegalArgumentException( | ||
| "Map value type cannot be null for type: " + fieldType); | ||
| } | ||
| for (Map.Entry<?, ?> entry : map.entrySet()) { | ||
| rowMap.put( | ||
| String.valueOf(entry.getKey()), convertFromBsonValue(entry.getValue(), valueType)); | ||
| } | ||
| return rowMap; | ||
| case ROW: | ||
| Schema rowSchema = fieldType.getRowSchema(); | ||
| if (rowSchema == null) { | ||
| throw new IllegalArgumentException("Row schema cannot be null for type: " + fieldType); | ||
| } | ||
| if (value instanceof Map) { | ||
| return toRow((Map<?, ?>) value, rowSchema); | ||
| } else { | ||
| throw new IllegalArgumentException("Cannot convert value to Row: " + value); | ||
| } | ||
|
derrickaw marked this conversation as resolved.
|
||
| default: | ||
| throw new IllegalArgumentException("Unsupported field type: " + fieldType); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `.iterator()` call on `FindIterable` returns a `MongoCursor<Document>`, which does not match the expected return type of `withQueryFn` (which expects `FindIterable<Document>`). This will cause a compilation error. Removing `.iterator()` will correctly return the `FindIterable<Document>`.