Skip to content

Commit 7799ee7

Browse files
committed
DynamoDB CDC: Fix MODIFY operation by propagating NewImage fully
1 parent 4c510c5 commit 7799ee7

4 files changed

Lines changed: 60 additions & 58 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## Unreleased
4+
- DynamoDB CDC: Fix `MODIFY` operation by propagating `NewImage` fully
45

56
## 2024/09/25 v0.0.18
67
- MongoDB: Improved `MongoDBCrateDBConverter.decode_canonical` to also

src/commons_codec/transform/dynamodb.py

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from commons_codec.model import (
1111
DualRecord,
1212
SQLOperation,
13-
SQLParameterizedSetClause,
1413
SQLParameterizedWhereClause,
1514
)
1615
from commons_codec.util.data import TaggableList
@@ -182,11 +181,14 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
182181
del new_image[key]
183182

184183
dual_record = self.decode_record(event["dynamodb"]["NewImage"])
185-
set_clause = self.update_clause(dual_record)
186184

187185
where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
188-
sql = f"UPDATE {self.table_name} SET {set_clause.to_sql()} WHERE {where_clause.to_sql()};"
189-
parameters = set_clause.values # noqa: PD011
186+
sql = (
187+
f"UPDATE {self.table_name} "
188+
f"SET {self.TYPED_COLUMN}=:typed, {self.UNTYPED_COLUMN}=:untyped "
189+
f"WHERE {where_clause.to_sql()};"
190+
)
191+
parameters = {"typed": dual_record.typed, "untyped": dual_record.untyped}
190192
parameters.update(where_clause.values)
191193

192194
elif event_name == "REMOVE":
@@ -199,35 +201,6 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
199201

200202
return SQLOperation(sql, parameters)
201203

202-
def update_clause(self, dual_record: DualRecord) -> SQLParameterizedSetClause:
203-
"""
204-
Serializes an image to a comma-separated list of column/values pairs
205-
that can be used in the `SET` clause of an `UPDATE` statement.
206-
207-
IN:
208-
{'humidity': {'N': '84.84'}, 'temperature': {'N': '55.66'}}
209-
210-
OUT:
211-
data['humidity'] = '84.84', data['temperature'] = '55.66'
212-
"""
213-
214-
clause = SQLParameterizedSetClause()
215-
self.record_to_set_clause(dual_record.typed, self.TYPED_COLUMN, clause)
216-
self.record_to_set_clause(dual_record.untyped, self.UNTYPED_COLUMN, clause)
217-
return clause
218-
219-
@staticmethod
220-
def record_to_set_clause(record: t.Dict[str, t.Any], container_column: str, clause: SQLParameterizedSetClause):
221-
for column, value in record.items():
222-
rval = None
223-
if isinstance(value, dict):
224-
rval = f"CAST(:{column} AS OBJECT)"
225-
226-
elif isinstance(value, list) and value and isinstance(value[0], dict):
227-
rval = f"CAST(:{column} AS OBJECT[])"
228-
229-
clause.add(lval=f"{container_column}['{column}']", name=column, value=value, rval=rval)
230-
231204
def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> SQLParameterizedWhereClause:
232205
"""
233206
Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax.

tests/transform/test_dynamodb_cdc.py

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
pytestmark = pytest.mark.dynamodb
1010

11-
1211
READING_BASIC = {"device": "foo", "temperature": 42.42, "humidity": 84.84}
1312

1413
MSG_UNKNOWN_SOURCE = {
@@ -240,44 +239,44 @@ def test_decode_cdc_insert_nested():
240239

241240
def test_decode_cdc_modify_basic():
242241
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_BASIC) == SQLOperation(
243-
statement="UPDATE foo SET "
244-
"data['humidity']=:humidity, data['temperature']=:temperature, data['location']=:location, "
245-
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
246-
"data['empty_string']=:empty_string, data['null_string']=:null_string "
242+
statement="UPDATE foo SET data=:typed, aux=:untyped "
247243
"WHERE data['device']=:device AND data['timestamp']=:timestamp;",
248244
parameters={
249245
"device": "foo",
250246
"timestamp": "2024-07-12T01:17:42",
251-
"humidity": 84.84,
252-
"temperature": 55.66,
253-
"location": "Sydney",
254-
"string_set": ["location_1"],
255-
"number_set": [0.34, 1.0, 2.0, 3.0],
256-
"binary_set": ["U3Vubnk="],
257-
"empty_string": "",
258-
"null_string": None,
247+
"typed": {
248+
"humidity": 84.84,
249+
"temperature": 55.66,
250+
"location": "Sydney",
251+
"string_set": ["location_1"],
252+
"number_set": [0.34, 1.0, 2.0, 3.0],
253+
"binary_set": ["U3Vubnk="],
254+
"empty_string": "",
255+
"null_string": None,
256+
},
257+
"untyped": {},
259258
},
260259
)
261260

262261

263262
def test_decode_cdc_modify_nested():
264263
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_NESTED) == SQLOperation(
265-
statement="UPDATE foo SET "
266-
"data['tags']=:tags, data['empty_map']=CAST(:empty_map AS OBJECT), data['empty_list']=:empty_list, "
267-
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
268-
"data['somemap']=CAST(:somemap AS OBJECT), data['list_of_objects']=CAST(:list_of_objects AS OBJECT[]) "
264+
statement="UPDATE foo SET data=:typed, aux=:untyped "
269265
"WHERE data['device']=:device AND data['timestamp']=:timestamp;",
270266
parameters={
271267
"device": "foo",
272268
"timestamp": "2024-07-12T01:17:42",
273-
"tags": ["foo", "bar"],
274-
"empty_map": {},
275-
"empty_list": [],
276-
"string_set": ["location_1"],
277-
"number_set": [0.34, 1.0, 2.0, 3.0],
278-
"binary_set": ["U3Vubnk="],
279-
"somemap": {"test": 1.0, "test2": 2.0},
280-
"list_of_objects": [{"foo": "bar"}, {"baz": "qux"}],
269+
"typed": {
270+
"tags": ["foo", "bar"],
271+
"empty_map": {},
272+
"empty_list": [],
273+
"string_set": ["location_1"],
274+
"number_set": [0.34, 1.0, 2.0, 3.0],
275+
"binary_set": ["U3Vubnk="],
276+
"somemap": {"test": 1.0, "test2": 2.0},
277+
"list_of_objects": [{"foo": "bar"}, {"baz": "qux"}],
278+
},
279+
"untyped": {},
281280
},
282281
)
283282

tests/transform/test_model.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from commons_codec.model import SQLParameterizedClause
2+
3+
4+
def test_parameterized_clause_rval_set():
5+
clause = SQLParameterizedClause()
6+
7+
container_column = "data"
8+
column = "foo"
9+
value = "bar"
10+
rval = f"CAST(:{column} AS OBJECT)"
11+
12+
clause.add(lval=f"{container_column}['{column}']", name=column, value=value, rval=rval)
13+
14+
assert clause == SQLParameterizedClause(
15+
lvals=["data['foo']"], rvals=["CAST(:foo AS OBJECT)"], values={"foo": "bar"}
16+
)
17+
18+
19+
def test_parameterized_clause_rval_unset():
20+
clause = SQLParameterizedClause()
21+
22+
container_column = "data"
23+
column = "foo"
24+
value = "bar"
25+
rval = None
26+
27+
clause.add(lval=f"{container_column}['{column}']", name=column, value=value, rval=rval)
28+
29+
assert clause == SQLParameterizedClause(lvals=["data['foo']"], rvals=[":foo"], values={"foo": "bar"})

0 commit comments

Comments
 (0)