Skip to content

Commit 6c5068a

Browse files
authored
fix: Resolve pending shapes in FlushTracker when consumer process receives commit fragment with no relevant changes (#4064)
## Summary

Fix #4063

When a `{Storage, :flushed, offset}` message arrives at a Consumer process while it's in the middle of processing a multi-fragment transaction, the flush notification could be lost. This caused the FlushTracker in ShapeLogCollector to get stuck waiting for a flush that was already completed.

### Changes

**Consumer (`consumer.ex`, `consumer/state.ex`):**

- Defer flush notifications that arrive during a pending transaction by storing the max flushed offset in `pending_flush_offset`
- Process deferred flush notifications when the pending transaction completes (commit, skip, or no relevant changes)

**FlushTracker (`flush_tracker.ex`):**

- Simplify to commit-only tracking: remove non-commit fragment handling since Consumer now defers flush notifications during pending transactions
- Remove the `shapes_with_changes` parameter from `handle_txn_fragment/4`, making it `handle_txn_fragment/3` — all affected shapes are tracked uniformly at commit time

**ShapeLogCollector (`shape_log_collector.ex`):**

- Only call `FlushTracker.handle_txn_fragment/3` on commit fragments
- Remove `shapes_with_changes` computation that is no longer needed

## Test plan

- [x] New regression test for stuck flush tracker scenario
- [x] Stricter assertions in EventRouterTest for txn fragment reslicing
- [x] Updated Consumer tests for deferred flush notification behavior
- [x] Existing test suite passes
1 parent 64a89a0 commit 6c5068a

8 files changed

Lines changed: 330 additions & 291 deletions

File tree

.changeset/brave-doors-kneel.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
Fix stuck flush tracker when storage flush notification arrives mid-transaction in Consumer

packages/sync-service/lib/electric/replication/shape_log_collector.ex

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -465,17 +465,14 @@ defmodule Electric.Replication.ShapeLogCollector do
465465

466466
OpenTelemetry.add_span_attributes("txn.is_dropped": true)
467467

468-
{:ok,
469-
%{
470-
state
471-
| flush_tracker:
472-
FlushTracker.handle_txn_fragment(
473-
state.flush_tracker,
474-
txn_fragment,
475-
[],
476-
MapSet.new()
477-
)
478-
}}
468+
flush_tracker =
469+
if txn_fragment.commit do
470+
FlushTracker.handle_txn_fragment(state.flush_tracker, txn_fragment, [])
471+
else
472+
state.flush_tracker
473+
end
474+
475+
{:ok, %{state | flush_tracker: flush_tracker}}
479476
end
480477

481478
defp handle_txn_fragment(
@@ -578,22 +575,9 @@ defmodule Electric.Replication.ShapeLogCollector do
578575

579576
flush_tracker =
580577
case event do
581-
%TransactionFragment{} ->
582-
shapes_with_changes =
583-
for {id, frag} <- events_by_handle,
584-
frag.change_count > 0,
585-
not MapSet.member?(undeliverable_set, id),
586-
do: id,
587-
into: MapSet.new()
588-
589-
if event.commit, do: LsnTracker.broadcast_last_seen_lsn(state.stack_id, lsn)
590-
591-
FlushTracker.handle_txn_fragment(
592-
flush_tracker,
593-
event,
594-
delivered_shapes,
595-
shapes_with_changes
596-
)
578+
%TransactionFragment{commit: commit} when not is_nil(commit) ->
579+
LsnTracker.broadcast_last_seen_lsn(state.stack_id, lsn)
580+
FlushTracker.handle_txn_fragment(flush_tracker, event, delivered_shapes)
597581

598582
_ ->
599583
flush_tracker

packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -95,41 +95,15 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTracker do
9595
last_flushed == %{} and :gb_trees.is_empty(tree)
9696
end
9797

98-
@spec handle_txn_fragment(
99-
t(),
100-
TransactionFragment.t(),
101-
Enumerable.t(shape_id()),
102-
MapSet.t(shape_id())
103-
) :: t()
104-
105-
# Non-commit fragment: track affected shapes but don't update last_seen_offset
106-
# or notify. This ensures shapes are registered early so flush notifications
107-
# from Consumers aren't lost when storage flushes before the commit arrives.
108-
def handle_txn_fragment(
109-
%__MODULE__{} = state,
110-
%TransactionFragment{commit: nil, last_log_offset: last_log_offset},
111-
affected_shapes,
112-
_shapes_with_changes
113-
) do
114-
track_shapes(state, last_log_offset, affected_shapes)
115-
end
98+
@spec handle_txn_fragment(t(), TransactionFragment.t(), Enumerable.t(shape_id())) :: t()
11699

117-
# Commit fragment: track shapes that have actual changes in this fragment
118-
# or are already being tracked (need last_sent updated to commit offset).
119-
# Skip shapes that only have a commit marker and already flushed from
120-
# earlier non-commit fragments — there's nothing new to flush for them.
100+
# Commit fragment: track all shapes affected by all fragments of the transaction and update last_seen_offset.
121101
def handle_txn_fragment(
122102
%__MODULE__{} = state,
123103
%TransactionFragment{commit: %Commit{}, last_log_offset: last_log_offset},
124-
affected_shapes,
125-
shapes_with_changes
104+
affected_shapes
126105
) do
127-
shapes_to_track =
128-
Enum.filter(affected_shapes, fn shape ->
129-
shape in shapes_with_changes or is_map_key(state.last_flushed, shape)
130-
end)
131-
132-
state = track_shapes(state, last_log_offset, shapes_to_track)
106+
state = track_shapes(state, last_log_offset, affected_shapes)
133107

134108
state = %{state | last_seen_offset: last_log_offset}
135109

@@ -211,14 +185,7 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTracker do
211185
min_incomplete_flush_tree: min_incomplete_flush_tree
212186
}
213187

214-
# Only update global offset if we've seen at least one commit.
215-
# Before any commit, last_seen_offset is before_all and there's
216-
# nothing meaningful to report.
217-
if state.last_seen_offset == LogOffset.before_all() do
218-
state
219-
else
220-
update_global_offset(state)
221-
end
188+
update_global_offset(state)
222189
end
223190

224191
# If the shape is not in the mapping, then we're processing a flush notification for a shape that was removed

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -270,10 +270,20 @@ defmodule Electric.Shapes.Consumer do
270270
end
271271
end
272272

273-
def handle_info({ShapeCache.Storage, :flushed, offset_in}, state) do
274-
{state, offset_txn} = State.align_offset_to_txn_boundary(state, offset_in)
273+
def handle_info({ShapeCache.Storage, :flushed, flushed_offset}, state) do
274+
state =
275+
if is_write_unit_txn(state.write_unit) or is_nil(state.pending_txn) do
276+
# We're not currently in the middle of processing a transaction. This flushed offset is either
277+
# from a previously processed transaction or a non-commit fragment of the most recently
278+
# seen transaction. Notify ShapeLogCollector about it immediately.
279+
confirm_flushed_and_notify(state, flushed_offset)
280+
else
281+
# Storage has signaled latest flushed offset in the middle of processing a multi-fragment
282+
# transaction. Save it for later, to be handled when the commit fragment arrives.
283+
updated_offset = more_recent_offset(state.pending_flush_offset, flushed_offset)
284+
%{state | pending_flush_offset: updated_offset}
285+
end
275286

276-
ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, offset_txn)
277287
{:noreply, state, state.hibernate_after}
278288
end
279289

@@ -567,7 +577,7 @@ defmodule Electric.Shapes.Consumer do
567577
# With write_unit=txn all fragments are buffered until the Commit change is seen. At that
568578
# point, a transaction struct is produced from the buffered fragments and is written to
569579
# storage.
570-
state.write_unit == State.write_unit_txn() ->
580+
is_write_unit_txn(state.write_unit) ->
571581
{txns, transaction_builder} =
572582
TransactionBuilder.build(txn_fragment, state.transaction_builder)
573583

@@ -597,6 +607,7 @@ defmodule Electric.Shapes.Consumer do
597607
defp skip_txn_fragment(state, %TransactionFragment{} = txn_fragment) do
598608
%{state | pending_txn: nil}
599609
|> consider_flushed(txn_fragment.last_log_offset)
610+
|> clear_pending_flush_offset()
600611
end
601612

602613
# This function does similar things to do_handle_txn/2 but with the following simplifications:
@@ -747,9 +758,10 @@ defmodule Electric.Shapes.Consumer do
747758
"No relevant changes written in transaction xid=#{txn.xid}"
748759
end)
749760

750-
state = %{state | pending_txn: nil}
751-
consider_flushed(state, txn_fragment.last_log_offset)
761+
%{state | pending_txn: nil}
762+
|> consider_flushed(txn_fragment.last_log_offset)
752763
end
764+
|> clear_pending_flush_offset()
753765
end
754766

755767
def process_buffered_txn_fragments(%State{buffer: buffer} = state) do
@@ -1006,6 +1018,31 @@ defmodule Electric.Shapes.Consumer do
10061018
end
10071019
end
10081020

1021+
defp confirm_flushed_and_notify(state, flushed_offset) do
1022+
{state, txn_offset} = State.align_offset_to_txn_boundary(state, flushed_offset)
1023+
ShapeLogCollector.notify_flushed(state.stack_id, state.shape_handle, txn_offset)
1024+
state
1025+
end
1026+
1027+
# After a pending transaction completes and txn_offset_mapping is populated,
1028+
# process the deferred flushed offset (if any).
1029+
#
1030+
# Even if the most recent transaction is skipped or no changes from it end up satisfying the
1031+
# shape's `where` condition, Storage may have signaled a flush offset from the previous transaction
1032+
# while we were still processing fragments of the current one. Therefore this function must
1033+
# be called any time `state.pending_txn` is reset to nil in a multi-fragment transaction
1034+
# processing setting.
1035+
defp clear_pending_flush_offset(%{pending_flush_offset: nil} = state), do: state
1036+
1037+
defp clear_pending_flush_offset(%{pending_flush_offset: flushed_offset} = state) do
1038+
%{state | pending_flush_offset: nil}
1039+
|> confirm_flushed_and_notify(flushed_offset)
1040+
end
1041+
1042+
defp more_recent_offset(nil, offset), do: offset
1043+
defp more_recent_offset(offset, nil), do: offset
1044+
defp more_recent_offset(offset1, offset2), do: LogOffset.max(offset1, offset2)
1045+
10091046
defp subscribe(state, action) do
10101047
case ShapeLogCollector.add_shape(state.stack_id, state.shape_handle, state.shape, action) do
10111048
:ok ->

packages/sync-service/lib/electric/shapes/consumer/state.ex

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ defmodule Electric.Shapes.Consumer.State do
4242
# Tracks in-progress transaction, initialized when a txn fragment with has_begin?=true is seen.
4343
# It is used to check whether the entire txn is visible in the snapshot and to mark it
4444
# as flushed in order to handle its remaining fragments appropriately.
45-
pending_txn: nil
45+
pending_txn: nil,
46+
# When a {Storage, :flushed, offset} message arrives during a pending
47+
# transaction, we defer the notification and store the max flushed offset
48+
# here. Multiple deferred notifications are collapsed into a single most recent offset.
49+
pending_flush_offset: nil
4650
]
4751

4852
@type pg_snapshot() :: SnapshotQuery.pg_snapshot()
@@ -393,6 +397,6 @@ defmodule Electric.Shapes.Consumer.State do
393397
]
394398
end
395399

396-
def write_unit_txn, do: @write_unit_txn
397-
def write_unit_txn_fragment, do: @write_unit_txn_fragment
400+
defguard is_write_unit_txn(write_unit) when write_unit == @write_unit_txn
401+
defguard is_write_unit_txn_fragment(write_unit) when write_unit == @write_unit_txn_fragment
398402
end

0 commit comments

Comments
 (0)