@@ -789,132 +789,18 @@ def _validate_primary_key_hidden_columns() -> None:
789789 _truncated_right_name = truncated_right ,
790790 )
791791
792- # --- Schemas ---
793- schemas : SummaryDataSchemas | None = None
794- # NOTE: In slim mode, we only print the section if there are differences.
795- if not slim or not comp .schemas .equal ():
796- in_common = sorted (comp .schemas .in_common ().items ())
797- mismatching = sorted (comp .schemas .in_common ().mismatching_dtypes ().items ())
798- schemas = SummaryDataSchemas (
799- left_only_names = sorted (comp .schemas .left_only ().column_names ()),
800- in_common = [
801- (name , str (left_dtype ), str (right_dtype ))
802- for name , (left_dtype , right_dtype ) in in_common
803- ],
804- right_only_names = sorted (comp .schemas .right_only ().column_names ()),
805- _equal = comp .schemas .equal (),
806- _mismatching_dtypes = [
807- (name , str (left_dtype ), str (right_dtype ))
808- for name , (left_dtype , right_dtype ) in mismatching
809- ],
810- )
811-
812- # --- Rows ---
813- rows : SummaryDataRows | None = None
814- if comp .primary_key is not None :
815- rows_equal = comp ._equal_rows ()
816- else :
817- rows_equal = comp .equal_num_rows ()
818- # NOTE: In slim mode, we only print the section if there are differences.
819- if not slim or not rows_equal :
820- if comp .primary_key is not None :
821- rows = SummaryDataRows (
822- n_left = comp .num_rows_left (),
823- n_right = comp .num_rows_right (),
824- n_left_only = comp .num_rows_left_only (),
825- n_joined_equal = comp .num_rows_joined_equal (),
826- n_joined_unequal = comp .num_rows_joined_unequal (),
827- n_right_only = comp .num_rows_right_only (),
828- _equal_rows = comp ._equal_rows (),
829- _equal_num_rows = comp .equal_num_rows (),
830- # NOTE: In slim mode, we omit the row counts section and only show the
831- # row matches section.
832- _show_row_counts = not (comp .equal_num_rows () and slim ),
833- )
834- else :
835- rows = SummaryDataRows (
836- n_left = comp .num_rows_left (),
837- n_right = comp .num_rows_right (),
838- n_left_only = None ,
839- n_joined_equal = None ,
840- n_joined_unequal = None ,
841- n_right_only = None ,
842- _equal_rows = False ,
843- _equal_num_rows = comp .equal_num_rows (),
844- _show_row_counts = True ,
845- )
846-
847- # --- Columns ---
848- columns : list [SummaryDataColumn ] | None = None
849- # NOTE: We can only compute column matches if there are primary key columns and at
850- # least one joined row.
851- match_rates_can_be_computed = (
852- comp .primary_key is not None and comp .num_rows_joined () > 0
792+ schemas = _compute_schemas (comp , slim )
793+ rows = _compute_rows (comp , slim )
794+ columns = _compute_columns (
795+ comp ,
796+ slim ,
797+ show_perfect_column_matches ,
798+ top_k_changes_by_column ,
799+ show_sample_primary_key_per_change ,
800+ )
801+ sample_rows_left_only , sample_rows_right_only = _compute_sample_rows (
802+ comp , sample_k_rows_only
853803 )
854- if match_rates_can_be_computed :
855- match_rates = comp .fraction_same ()
856- # NOTE: In slim mode, we only print the columns section if there are
857- # non-primary key columns and at least one column has a match rate < 1.
858- if not slim or (comp ._other_common_columns and min (match_rates .values ()) < 1 ):
859- columns = []
860- for col_name in sorted (match_rates ):
861- rate = match_rates [col_name ]
862- if not show_perfect_column_matches and rate >= 1 :
863- continue
864- top_k = top_k_changes_by_column [col_name ]
865- changes : list [SummaryDataColumnChange ] | None = None
866- n_total_changes = 0
867- if top_k > 0 and rate < 1 :
868- all_change_counts = comp .change_counts (
869- col_name ,
870- include_sample_primary_key = show_sample_primary_key_per_change ,
871- )
872- n_total_changes = len (all_change_counts )
873- top_change_counts = all_change_counts .head (top_k )
874- changes = []
875- for row in top_change_counts .iter_rows (named = True ):
876- sample_pk : tuple [Any , ...] | None = None
877- if show_sample_primary_key_per_change :
878- pk_cols = comp .primary_key
879- assert isinstance (pk_cols , list )
880- sample_pk = tuple (row [f"sample_{ c } " ] for c in pk_cols )
881- changes .append (
882- SummaryDataColumnChange (
883- old = row [Side .LEFT ],
884- new = row [Side .RIGHT ],
885- count = row ["count" ],
886- sample_pk = sample_pk ,
887- )
888- )
889- columns .append (
890- SummaryDataColumn (
891- name = col_name ,
892- match_rate = rate ,
893- n_total_changes = n_total_changes ,
894- changes = changes ,
895- )
896- )
897-
898- # --- Sample rows left/right only ---
899- sample_rows_left_only : list [tuple [Any , ...]] | None = None
900- sample_rows_right_only : list [tuple [Any , ...]] | None = None
901- if comp .primary_key is not None and sample_k_rows_only > 0 :
902- pk = comp .primary_key
903- assert isinstance (pk , list )
904-
905- if comp .num_rows_left_only () > 0 :
906- df = comp .left_only (lazy = True ).select (pk ).head (sample_k_rows_only ).collect ()
907- sample_rows_left_only = [tuple (row ) for row in df .iter_rows ()]
908- else :
909- sample_rows_left_only = []
910-
911- if comp .num_rows_right_only () > 0 :
912- df = (
913- comp .right_only (lazy = True ).select (pk ).head (sample_k_rows_only ).collect ()
914- )
915- sample_rows_right_only = [tuple (row ) for row in df .iter_rows ()]
916- else :
917- sample_rows_right_only = []
918804
919805 return SummaryData (
920806 equal = False ,
@@ -933,6 +819,144 @@ def _validate_primary_key_hidden_columns() -> None:
933819 )
934820
935821
def _compute_schemas(
    comp: DataFrameComparison, slim: bool
) -> SummaryDataSchemas | None:
    """Assemble the schema section of the summary.

    Args:
        comp: The comparison whose ``schemas`` are summarized.
        slim: Whether the summary is produced in slim mode.

    Returns:
        ``None`` if ``slim`` is set and ``comp.schemas.equal()`` holds. Otherwise a
        ``SummaryDataSchemas`` with the sorted left-only and right-only column names
        and, for the common columns, sorted ``(name, left dtype, right dtype)``
        triples whose dtypes are rendered with ``str``.
    """
    # NOTE: Slim mode drops the whole section unless the schemas differ.
    section_hidden = slim and comp.schemas.equal()
    if section_hidden:
        return None

    shared = sorted(comp.schemas.in_common().items())
    differing = sorted(comp.schemas.in_common().mismatching_dtypes().items())

    left_only_names = sorted(comp.schemas.left_only().column_names())
    shared_dtypes = [(col, str(lhs), str(rhs)) for col, (lhs, rhs) in shared]
    right_only_names = sorted(comp.schemas.right_only().column_names())
    schemas_equal = comp.schemas.equal()
    differing_dtypes = [(col, str(lhs), str(rhs)) for col, (lhs, rhs) in differing]

    return SummaryDataSchemas(
        left_only_names=left_only_names,
        in_common=shared_dtypes,
        right_only_names=right_only_names,
        _equal=schemas_equal,
        _mismatching_dtypes=differing_dtypes,
    )
843+
844+
def _compute_rows(comp: DataFrameComparison, slim: bool) -> SummaryDataRows | None:
    """Assemble the rows section of the summary.

    Args:
        comp: The comparison whose row counts are summarized.
        slim: Whether the summary is produced in slim mode.

    Returns:
        ``None`` if ``slim`` is set and the rows are considered equal, which means
        ``comp._equal_rows()`` when a primary key is set and
        ``comp.equal_num_rows()`` otherwise. Otherwise a ``SummaryDataRows``; without
        a primary key only the left/right totals are filled in and the join-based
        counts are ``None``.
    """
    has_primary_key = comp.primary_key is not None
    rows_equal = comp._equal_rows() if has_primary_key else comp.equal_num_rows()

    # NOTE: Slim mode drops the whole section unless the rows differ.
    if slim and rows_equal:
        return None

    if not has_primary_key:
        # Without a primary key no join is available, so only totals are reported.
        return SummaryDataRows(
            n_left=comp.num_rows_left(),
            n_right=comp.num_rows_right(),
            n_left_only=None,
            n_joined_equal=None,
            n_joined_unequal=None,
            n_right_only=None,
            _equal_rows=False,
            _equal_num_rows=comp.equal_num_rows(),
            _show_row_counts=True,
        )

    return SummaryDataRows(
        n_left=comp.num_rows_left(),
        n_right=comp.num_rows_right(),
        n_left_only=comp.num_rows_left_only(),
        n_joined_equal=comp.num_rows_joined_equal(),
        n_joined_unequal=comp.num_rows_joined_unequal(),
        n_right_only=comp.num_rows_right_only(),
        _equal_rows=comp._equal_rows(),
        _equal_num_rows=comp.equal_num_rows(),
        # NOTE: In slim mode the row counts part is hidden when both sides have
        #   the same number of rows; only the row matches part is shown then.
        _show_row_counts=not comp.equal_num_rows() or not slim,
    )
878+
879+
def _compute_columns(
    comp: DataFrameComparison,
    slim: bool,
    show_perfect_column_matches: bool,
    top_k_changes_by_column: dict[str, int],
    show_sample_primary_key_per_change: bool,
) -> list[SummaryDataColumn] | None:
    """Assemble the columns section of the summary.

    Args:
        comp: The comparison whose per-column match rates are summarized.
        slim: Whether the summary is produced in slim mode.
        show_perfect_column_matches: Whether columns with a match rate of at least 1
            are included.
        top_k_changes_by_column: Number of most frequent changes to list per column.
            It is subscripted for every column that is not skipped.
        show_sample_primary_key_per_change: Whether each listed change carries a
            sample primary key read from the ``sample_<key column>`` fields.

    Returns:
        ``None`` if there is no primary key or no joined row, or if ``slim`` is set
        and there is nothing imperfect to report. Otherwise one ``SummaryDataColumn``
        per reported column, ordered by column name.
    """

    def _sample_key(record: dict[str, Any]) -> tuple[Any, ...] | None:
        # Pull the sample primary key out of one change-count row, if requested.
        if not show_sample_primary_key_per_change:
            return None
        key_columns = comp.primary_key
        assert isinstance(key_columns, list)
        return tuple(record[f"sample_{key}"] for key in key_columns)

    # NOTE: Match rates are only available with a primary key and at least one
    #   joined row.
    if comp.primary_key is None:
        return None
    if comp.num_rows_joined() == 0:
        return None

    match_rates = comp.fraction_same()
    if slim:
        # NOTE: Slim mode only keeps the section when there are non-primary key
        #   columns and at least one of them has a match rate below 1.
        any_imperfect = comp._other_common_columns and min(match_rates.values()) < 1
        if not any_imperfect:
            return None

    summaries: list[SummaryDataColumn] = []
    for column, rate in sorted(match_rates.items()):
        if rate >= 1 and not show_perfect_column_matches:
            continue

        limit = top_k_changes_by_column[column]
        changes: list[SummaryDataColumnChange] | None = None
        n_total_changes = 0
        if limit > 0 and rate < 1:
            change_table = comp.change_counts(
                column,
                include_sample_primary_key=show_sample_primary_key_per_change,
            )
            # The total counts every distinct change, not just the listed ones.
            n_total_changes = len(change_table)
            changes = [
                SummaryDataColumnChange(
                    old=record[Side.LEFT],
                    new=record[Side.RIGHT],
                    count=record["count"],
                    sample_pk=_sample_key(record),
                )
                for record in change_table.head(limit).iter_rows(named=True)
            ]

        summaries.append(
            SummaryDataColumn(
                name=column,
                match_rate=rate,
                n_total_changes=n_total_changes,
                changes=changes,
            )
        )
    return summaries
935+
936+
def _compute_sample_rows(
    comp: DataFrameComparison, sample_k_rows_only: int
) -> tuple[list[tuple[Any, ...]] | None, list[tuple[Any, ...]] | None]:
    """Sample primary keys of rows that exist on one side only.

    Args:
        comp: The comparison to sample from.
        sample_k_rows_only: Maximum number of primary keys to take per side.

    Returns:
        ``(None, None)`` if there is no primary key or ``sample_k_rows_only`` is not
        positive. Otherwise a ``(left_only, right_only)`` pair of lists holding one
        tuple of primary key values per sampled row; a side without such rows
        yields an empty list.
    """
    key_columns = comp.primary_key
    if key_columns is None or sample_k_rows_only <= 0:
        return None, None
    assert isinstance(key_columns, list)

    left_sample: list[tuple[Any, ...]] = []
    if comp.num_rows_left_only() > 0:
        left_frame = (
            comp.left_only(lazy=True)
            .select(key_columns)
            .head(sample_k_rows_only)
            .collect()
        )
        left_sample = list(map(tuple, left_frame.iter_rows()))

    right_sample: list[tuple[Any, ...]] = []
    if comp.num_rows_right_only() > 0:
        right_frame = (
            comp.right_only(lazy=True)
            .select(key_columns)
            .head(sample_k_rows_only)
            .collect()
        )
        right_sample = list(map(tuple, right_frame.iter_rows()))

    return left_sample, right_sample
958+
959+
936960# ------------------------------------------------------------------------------------ #
937961# UTILS #
938962# ------------------------------------------------------------------------------------ #
0 commit comments