diff --git a/crates/core/src/dataframe.rs b/crates/core/src/dataframe.rs index 72595ba81..e2c1b8ad8 100644 --- a/crates/core/src/dataframe.rs +++ b/crates/core/src/dataframe.rs @@ -582,6 +582,14 @@ impl PyDataFrame { Ok(Self::new(df)) } + /// Apply window function expressions to the DataFrame + #[pyo3(signature = (*exprs))] + fn window(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> { + let window_exprs = exprs.into_iter().map(|e| e.into()).collect(); + let df = self.df.as_ref().clone().window(window_exprs)?; + Ok(Self::new(df)) + } + fn filter(&self, predicate: PyExpr) -> PyDataFusionResult<Self> { let df = self.df.as_ref().clone().filter(predicate.into())?; Ok(Self::new(df)) @@ -804,9 +812,30 @@ impl PyDataFrame { } /// Print the query plan - #[pyo3(signature = (verbose=false, analyze=false))] - fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyDataFusionResult<()> { - let df = self.df.as_ref().clone().explain(verbose, analyze)?; + #[pyo3(signature = (verbose=false, analyze=false, format=None))] + fn explain( + &self, + py: Python, + verbose: bool, + analyze: bool, + format: Option<&str>, + ) -> PyDataFusionResult<()> { + let explain_format = match format { + Some(f) => f + .parse::<datafusion::common::format::ExplainFormat>() + .map_err(|_| { + PyDataFusionError::Common(format!( + "Invalid explain format: '{}'. 
Valid options: indent, tree, pgjson, graphviz", + f + )) + })?, + None => datafusion::common::format::ExplainFormat::Indent, + }; + let opts = datafusion::logical_expr::ExplainOption::default() + .with_verbose(verbose) + .with_analyze(analyze) + .with_format(explain_format); + let df = self.df.as_ref().clone().explain_with_options(opts)?; print_dataframe(py, df) } @@ -875,11 +904,14 @@ impl PyDataFrame { Ok(Self::new(new_df)) } - #[pyo3(signature = (column, preserve_nulls=true))] - fn unnest_column(&self, column: &str, preserve_nulls: bool) -> PyDataFusionResult<Self> { - // TODO: expose RecursionUnnestOptions - // REF: https://github.com/apache/datafusion/pull/11577 - let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls); + #[pyo3(signature = (column, preserve_nulls=true, recursions=None))] + fn unnest_column( + &self, + column: &str, + preserve_nulls: bool, + recursions: Option<Vec<(String, String, usize)>>, + ) -> PyDataFusionResult<Self> { + let unnest_options = build_unnest_options(preserve_nulls, recursions); let df = self .df .as_ref() @@ -888,15 +920,14 @@ impl PyDataFrame { Ok(Self::new(df)) } - #[pyo3(signature = (columns, preserve_nulls=true))] + #[pyo3(signature = (columns, preserve_nulls=true, recursions=None))] fn unnest_columns( &self, columns: Vec<String>, preserve_nulls: bool, + recursions: Option<Vec<(String, String, usize)>>, ) -> PyDataFusionResult<Self> { - // TODO: expose RecursionUnnestOptions - // REF: https://github.com/apache/datafusion/pull/11577 - let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls); + let unnest_options = build_unnest_options(preserve_nulls, recursions); let cols = columns.iter().map(|s| s.as_ref()).collect::<Vec<&str>>(); let df = self .df @@ -922,6 +953,71 @@ impl PyDataFrame { Ok(Self::new(new_df)) } + /// Calculate the set difference with deduplication + fn except_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> { + let new_df = self + .df + .as_ref() + .clone() + .except_distinct(py_df.df.as_ref().clone())?; + Ok(Self::new(new_df)) + } + + 
/// Calculate the intersection with deduplication + fn intersect_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> { + let new_df = self + .df + .as_ref() + .clone() + .intersect_distinct(py_df.df.as_ref().clone())?; + Ok(Self::new(new_df)) + } + + /// Union two DataFrames matching columns by name + fn union_by_name(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> { + let new_df = self + .df + .as_ref() + .clone() + .union_by_name(py_df.df.as_ref().clone())?; + Ok(Self::new(new_df)) + } + + /// Union two DataFrames by name with deduplication + fn union_by_name_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> { + let new_df = self + .df + .as_ref() + .clone() + .union_by_name_distinct(py_df.df.as_ref().clone())?; + Ok(Self::new(new_df)) + } + + /// Deduplicate rows based on specific columns, keeping the first row per group + fn distinct_on( + &self, + on_expr: Vec<PyExpr>, + select_expr: Vec<PyExpr>, + sort_expr: Option<Vec<PySortExpr>>, + ) -> PyDataFusionResult<Self> { + let on_expr = on_expr.into_iter().map(|e| e.into()).collect(); + let select_expr = select_expr.into_iter().map(|e| e.into()).collect(); + let sort_expr = sort_expr.map(to_sort_expressions); + let df = self + .df + .as_ref() + .clone() + .distinct_on(on_expr, select_expr, sort_expr)?; + Ok(Self::new(df)) + } + + /// Sort by column expressions with ascending order and nulls last + fn sort_by(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> { + let exprs = exprs.into_iter().map(|e| e.into()).collect(); + let df = self.df.as_ref().clone().sort_by(exprs)?; + Ok(Self::new(df)) + } + /// Write a `DataFrame` to a CSV file. 
fn write_csv( &self, @@ -1295,6 +1391,26 @@ impl PyDataFrameWriteOptions { } } +fn build_unnest_options( + preserve_nulls: bool, + recursions: Option<Vec<(String, String, usize)>>, +) -> UnnestOptions { + let mut opts = UnnestOptions::default().with_preserve_nulls(preserve_nulls); + if let Some(recs) = recursions { + opts.recursions = recs + .into_iter() + .map( + |(input, output, depth)| datafusion::common::RecursionUnnestOption { + input_column: datafusion::common::Column::from(input.as_str()), + output_column: datafusion::common::Column::from(output.as_str()), + depth, + }, + ) + .collect(); + } + opts +} + /// Print DataFrame fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> { // Get string representation of record batches diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py index 2e6f81166..a736c3966 100644 --- a/python/datafusion/__init__.py +++ b/python/datafusion/__init__.py @@ -47,6 +47,7 @@ from .dataframe import ( DataFrame, DataFrameWriteOptions, + ExplainFormat, InsertOp, ParquetColumnOptions, ParquetWriterOptions, @@ -82,6 +83,7 @@ "DataFrameWriteOptions", "Database", "ExecutionPlan", + "ExplainFormat", "Expr", "InsertOp", "LogicalPlan", diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 10e2a913f..dc4ad6c19 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -65,6 +65,25 @@ from enum import Enum +class ExplainFormat(Enum): + """Output format for explain plans. + + Controls how the query plan is rendered in :py:meth:`DataFrame.explain`. 
+ """ + + INDENT = "indent" + """Default indented text format.""" + + TREE = "tree" + """Tree-style visual format with box-drawing characters.""" + + PGJSON = "pgjson" + """PostgreSQL-compatible JSON format for use with visualization tools.""" + + GRAPHVIZ = "graphviz" + """Graphviz DOT format for graph rendering.""" + + # excerpt from deltalake # https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163 class Compression(Enum): @@ -468,6 +487,36 @@ def drop(self, *columns: str) -> DataFrame: """ return DataFrame(self.df.drop(*columns)) + def window(self, *exprs: Expr) -> DataFrame: + """Add window function columns to the DataFrame. + + Applies the given window function expressions and appends the results + as new columns. + + Args: + exprs: Window function expressions to evaluate. + + Returns: + DataFrame with new window function columns appended. + + Examples: + Add a row number within each group: + + >>> import datafusion.functions as f + >>> from datafusion import col + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "x", "y"]}) + >>> df = df.window( + ... f.row_number( + ... partition_by=[col("b")], order_by=[col("a")] + ... ).alias("rn") + ... ) + >>> "rn" in df.schema().names + True + """ + raw = expr_list_to_raw_expr_list(exprs) + return DataFrame(self.df.window(*raw)) + def filter(self, *predicates: Expr | str) -> DataFrame: """Return a DataFrame for which ``predicate`` evaluates to ``True``. @@ -918,7 +967,12 @@ def join_on( exprs = [ensure_expr(expr) for expr in on_exprs] return DataFrame(self.df.join_on(right.df, exprs, how)) - def explain(self, verbose: bool = False, analyze: bool = False) -> None: + def explain( + self, + verbose: bool = False, + analyze: bool = False, + format: ExplainFormat | None = None, + ) -> None: """Print an explanation of the DataFrame's plan so far. If ``analyze`` is specified, runs the plan and reports metrics. 
@@ -926,8 +980,23 @@ def explain(self, verbose: bool = False, analyze: bool = False) -> None: Args: verbose: If ``True``, more details will be included. analyze: If ``True``, the plan will run and metrics reported. + format: Output format for the plan. Defaults to + :py:attr:`ExplainFormat.INDENT`. + + Examples: + Show the plan in tree format: + + >>> from datafusion import ExplainFormat + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [1, 2, 3]}) + >>> df.explain(format=ExplainFormat.TREE) # doctest: +SKIP + + Show plan with runtime metrics: + + >>> df.explain(analyze=True) # doctest: +SKIP """ - self.df.explain(verbose, analyze) + fmt = format.value if format is not None else None + self.df.explain(verbose, analyze, fmt) def logical_plan(self) -> LogicalPlan: """Return the unoptimized ``LogicalPlan``. @@ -1036,6 +1105,166 @@ def except_all(self, other: DataFrame) -> DataFrame: """ return DataFrame(self.df.except_all(other.df)) + def except_distinct(self, other: DataFrame) -> DataFrame: + """Calculate the set difference with deduplication. + + Returns rows that are in this DataFrame but not in ``other``, + removing any duplicates. In contrast, :py:meth:`except_all` preserves + duplicate rows. + + The two :py:class:`DataFrame` must have exactly the same schema. + + Args: + other: DataFrame to calculate exception with. + + Returns: + DataFrame after set difference with deduplication. + + Examples: + Remove rows present in ``df2`` and deduplicate: + + >>> ctx = dfn.SessionContext() + >>> df1 = ctx.from_pydict({"a": [1, 2, 3, 1], "b": [10, 20, 30, 10]}) + >>> df2 = ctx.from_pydict({"a": [1, 2], "b": [10, 20]}) + >>> df1.except_distinct(df2).sort("a").to_pydict() + {'a': [3], 'b': [30]} + """ + return DataFrame(self.df.except_distinct(other.df)) + + def intersect_distinct(self, other: DataFrame) -> DataFrame: + """Calculate the intersection with deduplication. + + Returns distinct rows that appear in both DataFrames. 
In contrast, + :py:meth:`intersect` preserves duplicate rows. + + The two :py:class:`DataFrame` must have exactly the same schema. + + Args: + other: DataFrame to intersect with. + + Returns: + DataFrame after intersection with deduplication. + + Examples: + Find rows common to both DataFrames: + + >>> ctx = dfn.SessionContext() + >>> df1 = ctx.from_pydict({"a": [1, 2, 3], "b": [10, 20, 30]}) + >>> df2 = ctx.from_pydict({"a": [1, 4], "b": [10, 40]}) + >>> df1.intersect_distinct(df2).to_pydict() + {'a': [1], 'b': [10]} + """ + return DataFrame(self.df.intersect_distinct(other.df)) + + def union_by_name(self, other: DataFrame) -> DataFrame: + """Union two :py:class:`DataFrame` matching columns by name. + + Unlike :py:meth:`union` which matches columns by position, this method + matches columns by their names, allowing DataFrames with different + column orders to be combined. + + Args: + other: DataFrame to union with. + + Returns: + DataFrame after union by name. + + Examples: + Combine DataFrames with different column orders: + + >>> ctx = dfn.SessionContext() + >>> df1 = ctx.from_pydict({"a": [1], "b": [10]}) + >>> df2 = ctx.from_pydict({"b": [20], "a": [2]}) + >>> df1.union_by_name(df2).sort("a").to_pydict() + {'a': [1, 2], 'b': [10, 20]} + """ + return DataFrame(self.df.union_by_name(other.df)) + + def union_by_name_distinct(self, other: DataFrame) -> DataFrame: + """Union two :py:class:`DataFrame` by name with deduplication. + + Combines :py:meth:`union_by_name` with deduplication of rows. + + Args: + other: DataFrame to union with. + + Returns: + DataFrame after union by name with deduplication. 
+ + Examples: + Union by name and remove duplicate rows: + + >>> ctx = dfn.SessionContext() + >>> df1 = ctx.from_pydict({"a": [1, 1], "b": [10, 10]}) + >>> df2 = ctx.from_pydict({"b": [10], "a": [1]}) + >>> df1.union_by_name_distinct(df2).to_pydict() + {'a': [1], 'b': [10]} + """ + return DataFrame(self.df.union_by_name_distinct(other.df)) + + def distinct_on( + self, + on_expr: list[Expr], + select_expr: list[Expr], + sort_expr: list[SortKey] | None = None, + ) -> DataFrame: + """Deduplicate rows based on specific columns. + + Returns a new DataFrame with one row per unique combination of the + ``on_expr`` columns, keeping the first row per group as determined by + ``sort_expr``. + + Args: + on_expr: Expressions that determine uniqueness. + select_expr: Expressions to include in the output. + sort_expr: Optional sort expressions to determine which row to keep. + + Returns: + DataFrame after deduplication. + + Examples: + Keep the row with the smallest ``b`` for each unique ``a``: + + >>> from datafusion import col + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [1, 1, 2, 2], "b": [10, 20, 30, 40]}) + >>> df.distinct_on( + ... [col("a")], + ... [col("a"), col("b")], + ... [col("a").sort(ascending=True), col("b").sort(ascending=True)], + ... ).sort("a").to_pydict() + {'a': [1, 2], 'b': [10, 30]} + """ + on_raw = expr_list_to_raw_expr_list(on_expr) + select_raw = expr_list_to_raw_expr_list(select_expr) + sort_raw = sort_list_to_raw_sort_list(sort_expr) if sort_expr else None + return DataFrame(self.df.distinct_on(on_raw, select_raw, sort_raw)) + + def sort_by(self, *exprs: Expr | str) -> DataFrame: + """Sort the DataFrame by column expressions in ascending order. + + This is a convenience method that sorts the DataFrame by the given + expressions in ascending order with nulls last. For more control over + sort direction and null ordering, use :py:meth:`sort` instead. + + Args: + exprs: Expressions or column names to sort by. 
+ + Returns: + DataFrame after sorting. + + Examples: + Sort by a single column: + + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [3, 1, 2]}) + >>> df.sort_by("a").to_pydict() + {'a': [1, 2, 3]} + """ + exprs = [self.parse_sql_expr(e) if isinstance(e, str) else e for e in exprs] + raw = expr_list_to_raw_expr_list(exprs) + return DataFrame(self.df.sort_by(raw)) + def write_csv( self, path: str | pathlib.Path, @@ -1296,23 +1525,52 @@ def count(self) -> int: return self.df.count() @deprecated("Use :py:func:`unnest_columns` instead.") - def unnest_column(self, column: str, preserve_nulls: bool = True) -> DataFrame: + def unnest_column( + self, + column: str, + preserve_nulls: bool = True, + ) -> DataFrame: """See :py:func:`unnest_columns`.""" return DataFrame(self.df.unnest_column(column, preserve_nulls=preserve_nulls)) - def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFrame: + def unnest_columns( + self, + *columns: str, + preserve_nulls: bool = True, + recursions: list[tuple[str, str, int]] | None = None, + ) -> DataFrame: """Expand columns of arrays into a single row per array element. Args: columns: Column names to perform unnest operation on. preserve_nulls: If False, rows with null entries will not be returned. + recursions: Optional list of ``(input_column, output_column, depth)`` + tuples that control how deeply nested columns are unnested. Any + column not mentioned here is unnested with depth 1. Returns: A DataFrame with the columns expanded. 
+ + Examples: + Unnest an array column: + + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [[1, 2], [3]], "b": ["x", "y"]}) + >>> df.unnest_columns("a").to_pydict() + {'a': [1, 2, 3], 'b': ['x', 'x', 'y']} + + With explicit recursion depth: + + >>> df.unnest_columns("a", recursions=[("a", "a", 1)]).to_pydict() + {'a': [1, 2, 3], 'b': ['x', 'x', 'y']} """ columns = list(columns) - return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls)) + return DataFrame( + self.df.unnest_columns( + columns, preserve_nulls=preserve_nulls, recursions=recursions + ) + ) def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: """Export the DataFrame as an Arrow C Stream. diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 759d6278c..960ccdb2b 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -3569,3 +3569,184 @@ def test_read_parquet_file_sort_order(tmp_path, file_sort_order): pa.parquet.write_table(table, path) df = ctx.read_parquet(path, file_sort_order=file_sort_order) assert df.collect()[0].column(0).to_pylist() == [1, 2] + + +@pytest.mark.parametrize( + ("df1_data", "df2_data", "method", "expected_a", "expected_b"), + [ + pytest.param( + {"a": [1, 2, 3, 1], "b": [10, 20, 30, 10]}, + {"a": [1, 2], "b": [10, 20]}, + "except_distinct", + [3], + [30], + id="except_distinct: removes matching rows and deduplicates", + ), + pytest.param( + {"a": [1, 2, 3, 1], "b": [10, 20, 30, 10]}, + {"a": [1, 4], "b": [10, 40]}, + "intersect_distinct", + [1], + [10], + id="intersect_distinct: keeps common rows and deduplicates", + ), + pytest.param( + {"a": [1], "b": [10]}, + {"b": [20], "a": [2]}, # reversed column order tests matching by name + "union_by_name", + [1, 2], + [10, 20], + id="union_by_name: matches columns by name not position", + ), + pytest.param( + {"a": [1, 1], "b": [10, 10]}, + {"b": [10], "a": [1]}, # reversed column order with duplicates + 
"union_by_name_distinct", + [1], + [10], + id="union_by_name_distinct: matches by name and deduplicates", + ), + ], +) +def test_set_operations_distinct(df1_data, df2_data, method, expected_a, expected_b): + ctx = SessionContext() + df1 = ctx.from_pydict(df1_data) + df2 = ctx.from_pydict(df2_data) + result = ( + getattr(df1, method)(df2).sort(column("a").sort(ascending=True)).collect()[0] + ) + assert result.column(0).to_pylist() == expected_a + assert result.column(1).to_pylist() == expected_b + + +def test_distinct_on(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [1, 1, 2, 2], "b": [10, 20, 30, 40]}) + result = ( + df.distinct_on( + [column("a")], + [column("a"), column("b")], + [column("a").sort(ascending=True), column("b").sort(ascending=True)], + ) + .sort(column("a").sort(ascending=True)) + .collect()[0] + ) + # Keeps the first row per group (smallest b per a) + assert result.column(0).to_pylist() == [1, 2] + assert result.column(1).to_pylist() == [10, 30] + + +@pytest.mark.parametrize( + ("input_values", "expected"), + [ + ([3, 1, 2], [1, 2, 3]), + ([1, 2, 3], [1, 2, 3]), + ([3, None, 1, 2], [1, 2, 3, None]), + ], +) +def test_sort_by(input_values, expected): + """sort_by always sorts ascending with nulls last regardless of input order.""" + ctx = SessionContext() + df = ctx.from_pydict({"a": input_values}) + result = df.sort_by(column("a")).collect()[0] + assert result.column(0).to_pylist() == expected + + +@pytest.mark.parametrize( + ("fmt", "verbose", "analyze", "expected_substring"), + [ + (None, False, False, None), + ("TREE", False, False, "---"), + ("INDENT", True, True, None), + ("PGJSON", False, False, '"Plan"'), + ("GRAPHVIZ", False, False, "digraph"), + ], +) +def test_explain_with_format(capsys, fmt, verbose, analyze, expected_substring): + from datafusion import ExplainFormat + + ctx = SessionContext() + df = ctx.from_pydict({"a": [1]}) + explain_fmt = ExplainFormat[fmt] if fmt is not None else None + df.explain(verbose=verbose, 
analyze=analyze, format=explain_fmt) + captured = capsys.readouterr() + assert "plan_type" in captured.out + if expected_substring is not None: + assert expected_substring in captured.out + + +@pytest.mark.parametrize( + ("window_exprs", "expected_columns"), + [ + pytest.param( + lambda: [ + f.row_number(partition_by=[column("b")], order_by=[column("a")]).alias( + "rn" + ), + ], + {"rn": [1, 2, 1]}, + id="single window expression", + ), + pytest.param( + lambda: [ + f.row_number(partition_by=[column("b")], order_by=[column("a")]).alias( + "rn" + ), + f.rank(partition_by=[column("b")], order_by=[column("a")]).alias("rnk"), + ], + {"rn": [1, 2, 1], "rnk": [1, 2, 1]}, + id="multiple window expressions", + ), + ], +) +def test_window(window_exprs, expected_columns): + ctx = SessionContext() + df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "x", "y"]}) + result = ( + df.window(*window_exprs()).sort(column("a").sort(ascending=True)).collect()[0] + ) + for col_name, expected_values in expected_columns.items(): + assert col_name in result.schema.names + assert ( + result.column(result.schema.get_field_index(col_name)).to_pylist() + == expected_values + ) + + +@pytest.mark.parametrize( + ("input_data", "recursions", "expected_a"), + [ + pytest.param( + {"a": [[1, 2], [3]], "b": ["x", "y"]}, + None, + [1, 2, 3], + id="basic unnest without recursions", + ), + pytest.param( + {"a": [[1, 2], [3]], "b": ["x", "y"]}, + [("a", "a", 1)], + [1, 2, 3], + id="explicit depth 1 matches basic unnest", + ), + pytest.param( + {"a": [[[1, 2], [3]], [[4]]], "b": ["x", "y"]}, + [("a", "a", 1)], + [[1, 2], [3], [4]], + id="depth 1 on nested lists keeps inner lists", + ), + pytest.param( + {"a": [[[1, 2], [3]], [[4]]], "b": ["x", "y"]}, + [("a", "a", 2)], + [1, 2, 3, 4], + id="depth 2 fully flattens nested lists", + ), + ], +) +def test_unnest_columns_with_recursions(input_data, recursions, expected_a): + ctx = SessionContext() + df = ctx.from_pydict(input_data) + kwargs = {} + if recursions 
is not None: + kwargs["recursions"] = recursions + result = df.unnest_columns("a", **kwargs).collect()[0] + assert result.column(0).to_pylist() == expected_a