Skip to content

Commit f1c9497

Browse files
antiguruclaude
and committed
replace InternalMerger with ColumnationMerger for per-item capacity checks
The generic InternalMerger from differential-dataflow delegates extract() to InternalMerge::extract which processes entire buffers without capacity checks, leading to oversized output containers. This was introduced in TimelyDataflow/differential-dataflow#689. Replace it with ColumnationMerger, a direct Merger impl that iterates items individually with capacity checks after each push — restoring the pre-#689 behavior. The InternalMerge impl on ColumnationStack is retained because DD's Builder requires it as a trait bound. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4149a20 commit f1c9497

1 file changed

Lines changed: 202 additions & 7 deletions

File tree

src/timely-util/src/columnation.rs

Lines changed: 202 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use columnation::{Columnation, Region};
2626
use differential_dataflow::consolidation::consolidate_updates;
2727
use differential_dataflow::difference::Semigroup;
2828
use differential_dataflow::lattice::Lattice;
29+
use differential_dataflow::trace::implementations::merge_batcher::Merger;
2930
use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
3031
use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput};
3132
use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
@@ -633,28 +634,222 @@ where
633634

634635
    /// Partitions the suffix of `self` starting at `*position` into `keep`
    /// (updates whose time is at or beyond `upper`) and `ship` (updates
    /// strictly before `upper`), advancing `*position` one item at a time.
    ///
    /// Iteration stops as soon as either output container reports
    /// `at_capacity`, so neither `keep` nor `ship` is overfilled. Since
    /// `position` persists across calls, the caller can presumably resume
    /// from where this left off with fresh containers — confirm against the
    /// `InternalMerge` trait contract.
    ///
    /// Every kept time is also folded into `frontier`, informing the caller
    /// of a lower bound on the retained updates.
    fn extract(
        &mut self,
        position: &mut usize,
        upper: AntichainRef<T>,
        frontier: &mut Antichain<T>,
        keep: &mut Self,
        ship: &mut Self,
    ) {
        let len = self[..].len();
        // Per-item loop with capacity checks after each push, rather than
        // draining the whole buffer unchecked.
        while *position < len && !keep.at_capacity() && !ship.at_capacity() {
            let (data, time, diff) = &self[*position];
            if upper.less_equal(time) {
                // Not yet ready to ship: record the time and retain.
                frontier.insert_with(time, |time| time.clone());
                keep.copy_destructured(data, time, diff);
            } else {
                ship.copy_destructured(data, time, diff);
            }
            *position += 1;
        }
    }
650655
}
651656

652657
// ---------------------------------------------------------------------------
653-
// ColInternalMerger type alias
658+
// ColumnationMerger
654659
// ---------------------------------------------------------------------------
655660

656-
/// A `Merger` using internal iteration for `ColumnationStack` containers.
657-
pub type ColInternalMerger<D, T, R> =
658-
differential_dataflow::trace::implementations::merge_batcher::InternalMerger<
659-
ColumnationStack<(D, T, R)>,
660-
>;
661+
/// A `Merger` for `ColumnationStack` containers that performs per-item
/// merge and extract with capacity checks.
///
/// This replaces differential-dataflow's generic `InternalMerger<MC>` which
/// delegates to `InternalMerge::extract` and processes entire buffers without
/// capacity checks, leading to oversized output containers.
pub struct ColumnationMerger<D, T, R> {
    // Zero-sized: the type parameters only pin down the chunk type.
    _marker: std::marker::PhantomData<(D, T, R)>,
}

// Manual impl rather than `#[derive(Default)]`, which would impose
// unwanted `D: Default`, `T: Default`, `R: Default` bounds.
impl<D, T, R> Default for ColumnationMerger<D, T, R> {
    fn default() -> Self {
        Self { _marker: std::marker::PhantomData }
    }
}
678+
679+
/// Shorthand for the chunk type this merger operates on.
type CS<D, T, R> = ColumnationStack<(D, T, R)>;
680+
681+
impl<D, T, R> ColumnationMerger<D, T, R>
682+
where
683+
D: Columnation,
684+
T: Columnation,
685+
R: Columnation,
686+
{
687+
#[inline]
688+
fn empty(&self, stash: &mut Vec<CS<D, T, R>>) -> CS<D, T, R> {
689+
stash.pop().unwrap_or_else(|| {
690+
let mut container = CS::<D, T, R>::default();
691+
container.ensure_capacity(&mut None);
692+
container
693+
})
694+
}
695+
696+
#[inline]
697+
fn recycle(&self, mut chunk: CS<D, T, R>, stash: &mut Vec<CS<D, T, R>>) {
698+
chunk.clear();
699+
stash.push(chunk);
700+
}
701+
}
702+
703+
impl<D, T, R> Merger for ColumnationMerger<D, T, R>
704+
where
705+
D: Ord + Columnation + Clone + 'static,
706+
T: Ord + Columnation + Clone + PartialOrder + 'static,
707+
R: Default + Semigroup + Columnation + Clone + 'static,
708+
{
709+
type Time = T;
710+
type Chunk = CS<D, T, R>;
711+
712+
fn merge(
713+
&mut self,
714+
list1: Vec<Self::Chunk>,
715+
list2: Vec<Self::Chunk>,
716+
output: &mut Vec<Self::Chunk>,
717+
stash: &mut Vec<Self::Chunk>,
718+
) {
719+
use std::cmp::Ordering;
720+
721+
let mut list1 = list1.into_iter();
722+
let mut list2 = list2.into_iter();
723+
724+
let mut head1 = list1.next().unwrap_or_default();
725+
let mut head2 = list2.next().unwrap_or_default();
726+
let mut pos1 = 0usize;
727+
let mut pos2 = 0usize;
728+
729+
let mut result = self.empty(stash);
730+
let mut diff_stash = R::default();
731+
732+
// Main merge loop: both sides have data.
733+
while pos1 < head1[..].len() && pos2 < head2[..].len() {
734+
while pos1 < head1[..].len() && pos2 < head2[..].len() && !result.at_capacity() {
735+
let (d1, t1, _) = &head1[pos1];
736+
let (d2, t2, _) = &head2[pos2];
737+
match (d1, t1).cmp(&(d2, t2)) {
738+
Ordering::Less => {
739+
result.copy(&head1[pos1]);
740+
pos1 += 1;
741+
}
742+
Ordering::Greater => {
743+
result.copy(&head2[pos2]);
744+
pos2 += 1;
745+
}
746+
Ordering::Equal => {
747+
let (_, _, r1) = &head1[pos1];
748+
let (_, _, r2) = &head2[pos2];
749+
diff_stash.clone_from(r1);
750+
diff_stash.plus_equals(r2);
751+
if !diff_stash.is_zero() {
752+
let (d, t, _) = &head1[pos1];
753+
result.copy_destructured(d, t, &diff_stash);
754+
}
755+
pos1 += 1;
756+
pos2 += 1;
757+
}
758+
}
759+
}
760+
761+
// Advance exhausted heads.
762+
if pos1 >= head1[..].len() {
763+
self.recycle(std::mem::take(&mut head1), stash);
764+
head1 = list1.next().unwrap_or_default();
765+
pos1 = 0;
766+
}
767+
if pos2 >= head2[..].len() {
768+
self.recycle(std::mem::take(&mut head2), stash);
769+
head2 = list2.next().unwrap_or_default();
770+
pos2 = 0;
771+
}
772+
if result.at_capacity() {
773+
output.push(std::mem::take(&mut result));
774+
result = self.empty(stash);
775+
}
776+
}
777+
778+
// Drain remaining from head1.
779+
if pos1 < head1[..].len() {
780+
for i in pos1..head1[..].len() {
781+
result.copy(&head1[i]);
782+
}
783+
}
784+
if !result.is_empty() {
785+
output.push(std::mem::take(&mut result));
786+
result = self.empty(stash);
787+
}
788+
output.extend(list1);
789+
790+
// Drain remaining from head2.
791+
if pos2 < head2[..].len() {
792+
for i in pos2..head2[..].len() {
793+
result.copy(&head2[i]);
794+
}
795+
}
796+
if !result.is_empty() {
797+
output.push(std::mem::take(&mut result));
798+
}
799+
output.extend(list2);
800+
}
801+
802+
fn extract(
803+
&mut self,
804+
merged: Vec<Self::Chunk>,
805+
upper: AntichainRef<Self::Time>,
806+
frontier: &mut Antichain<Self::Time>,
807+
readied: &mut Vec<Self::Chunk>,
808+
kept: &mut Vec<Self::Chunk>,
809+
stash: &mut Vec<Self::Chunk>,
810+
) {
811+
let mut keep = self.empty(stash);
812+
let mut ship = self.empty(stash);
813+
814+
for buffer in merged {
815+
for (data, time, diff) in buffer.iter() {
816+
if upper.less_equal(time) {
817+
frontier.insert_with(time, |time| time.clone());
818+
keep.copy_destructured(data, time, diff);
819+
if keep.at_capacity() {
820+
kept.push(std::mem::take(&mut keep));
821+
keep = self.empty(stash);
822+
}
823+
} else {
824+
ship.copy_destructured(data, time, diff);
825+
if ship.at_capacity() {
826+
readied.push(std::mem::take(&mut ship));
827+
ship = self.empty(stash);
828+
}
829+
}
830+
}
831+
self.recycle(buffer, stash);
832+
}
833+
834+
if !keep.is_empty() {
835+
kept.push(keep);
836+
}
837+
if !ship.is_empty() {
838+
readied.push(ship);
839+
}
840+
}
841+
842+
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
843+
let (mut size, mut capacity, mut allocations) = (0, 0, 0);
844+
let cb = |siz, cap| {
845+
size += siz;
846+
capacity += cap;
847+
allocations += 1;
848+
};
849+
chunk.heap_size(cb);
850+
(chunk.len(), size, capacity, allocations)
851+
}
852+
}
853+
854+
/// A `Merger` for `ColumnationStack` containers with per-item capacity checks.
///
/// Alias to [`ColumnationMerger`]; presumably retained so call sites written
/// against the previous `InternalMerger`-based name keep compiling — confirm
/// no external users remain before removing.
pub type ColInternalMerger<D, T, R> = ColumnationMerger<D, T, R>;

0 commit comments

Comments
 (0)