File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 44from json import JSONEncoder
55from .hooks import HookMap , Hook , hooks_to_dict
66from .options import SourceOptions , TargetOptions
7- from .enum import Mode , Format , Compression
7+ from .enum import Mode , Format , Compression , MergeStrategy
88from .bin import SLING_BIN
99
1010# Try to import pyarrow, fallback to CSV if not available
@@ -46,6 +46,8 @@ def default(self, o):
4646 return o .value
4747 elif isinstance (o , Compression ):
4848 return o .value
49+ elif isinstance (o , MergeStrategy ):
50+ return o .value
4951 elif isinstance (o , datetime .datetime ):
5052 return str (o )
5153 elif isinstance (o , datetime .date ):
Original file line number Diff line number Diff line change @@ -47,7 +47,7 @@ class Format(Enum):
4747class Compression (Enum ):
4848 """
4949 Enum representing the available Sling compression types.
50-
50+
5151 AUTO: Auto-detect compression type
5252 NONE: No compression
5353 ZIP: ZIP compression
@@ -60,4 +60,19 @@ class Compression(Enum):
6060 ZIP = "zip"
6161 GZIP = "gzip"
6262 SNAPPY = "snappy"
63- ZSTD = "zstd"
63+ ZSTD = "zstd"
64+
65+
66+ class MergeStrategy (Enum ):
67+ """
68+ Enum representing the available Sling merge strategies for incremental/backfill modes.
69+
70+ UPDATE_INSERT: Update existing rows, insert new rows (standard upsert behavior)
71+ DELETE_INSERT: Delete matching rows, then insert all (safe and reliable)
72+ INSERT: Insert only, skip existing (append-only scenarios)
73+ UPDATE: Update only, skip new (update existing records only)
74+ """
75+ UPDATE_INSERT = "update_insert"
76+ DELETE_INSERT = "delete_insert"
77+ INSERT = "insert"
78+ UPDATE = "update"
Original file line number Diff line number Diff line change 11from typing import Union
2- from .enum import Format , Compression
2+ from .enum import Format , Compression , MergeStrategy
33
44class SourceOptions :
55 trim_space : bool
@@ -81,6 +81,7 @@ class TargetOptions:
8181 use_bulk : bool
8282 ignore_existing : bool
8383 delete_missing : bool
84+ merge_strategy : Union [MergeStrategy , str ]
8485 column_casing : str
8586 column_typing : dict
8687 add_new_columns : bool
@@ -105,6 +106,7 @@ def __init__(self,
105106 use_bulk : bool = None ,
106107 ignore_existing : bool = None ,
107108 delete_missing : bool = None ,
109+ merge_strategy : Union [MergeStrategy , str ] = None ,
108110 column_casing : str = None ,
109111 column_typing : dict = None ,
110112 add_new_columns : bool = None ,
@@ -128,6 +130,7 @@ def __init__(self,
128130 self .use_bulk = use_bulk
129131 self .ignore_existing = ignore_existing
130132 self .delete_missing = delete_missing
133+ self .merge_strategy = merge_strategy
131134 self .column_casing = column_casing
132135 self .column_typing = column_typing
133136 self .add_new_columns = add_new_columns
You can’t perform that action at this time.
0 commit comments