Skip to content

Commit 73a7cee

Browse files
committed
feat(gooddata-pandas): Arrow IPC execution path
Add a parallel Arrow IPC execution path to DataFrameFactory and SeriesFactory that fetches results via the binary endpoint instead of JSON pagination:
- arrow_convertor: pa.Table -> DataFrame conversion with label_overrides, grand_totals reordering, column_totals_indexes, primary_labels resolution, and a metric field index helper
- dataframe: for_exec_def_arrow(), for_arrow_table(), and an Arrow branch in for_exec_result_id; the Arrow path is wired through for_visualization() and for_created_visualization()
- series: use_arrow=True on indexed() / not_indexed()
- ArrowConfig holds the conversion parameters (self_destruct, types_mapper, custom_mapping); use_arrow is a dedicated DataFrameFactory.__init__ parameter

risk: nonprod
1 parent 7dddd3a commit 73a7cee

6 files changed

Lines changed: 514 additions & 83 deletions

File tree

packages/gooddata-pandas/src/gooddata_pandas/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# (C) 2021 GoodData Corporation
22

33
from gooddata_pandas._version import __version__
4-
from gooddata_pandas.arrow_types import TypesMapper
4+
from gooddata_pandas.arrow_types import ArrowConfig, TypesMapper
55

66
try:
77
from gooddata_pandas.arrow_convertor import convert_arrow_table_to_dataframe

packages/gooddata-pandas/src/gooddata_pandas/arrow_convertor.py

Lines changed: 223 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,28 @@
4343
_REQUIRED_SCHEMA_KEYS = (_META_XTAB, _META_MODEL, _META_VIEW)
4444

4545

46+
def build_metric_field_index(table: pa.Table) -> dict[int, str]:
    """Map each metric dimension index to the Arrow field that carries it.

    A schema field contributes an entry only when its name starts with the
    ``_FIELD_METRIC_GROUP`` prefix and its field-level metadata contains a
    ``gdc`` entry. The JSON-decoded ``gdc["index"]`` becomes the key and the
    field name the value, so a caller that knows only a metric's index can
    find the matching column. Fields that do not qualify are skipped silently.

    Raises:
        ValueError: a qualifying field's ``gdc`` metadata has no ``"index"`` key.
    """
    index_to_name: dict[int, str] = {}
    for field in table.schema:
        if not field.name.startswith(_FIELD_METRIC_GROUP):
            continue
        metadata = field.metadata
        if not metadata or b"gdc" not in metadata:
            continue
        gdc = json.loads(metadata[b"gdc"])
        if "index" not in gdc:
            raise ValueError(
                f"Metric field {field.name!r} 'gdc' metadata is missing required key 'index'. "
                "The Arrow table must originate from the GoodData /binary execution endpoint."
            )
        index_to_name[gdc["index"]] = field.name
    return index_to_name
66+
67+
4668
def _parse_schema_metadata(table: pa.Table) -> dict:
4769
"""
4870
Decode and return all GoodData schema metadata keys from an Arrow table.
@@ -55,7 +77,9 @@ def _parse_schema_metadata(table: pa.Table) -> dict:
5577
raise ValueError(
5678
"Arrow table has no schema metadata. Expected GoodData metadata keys: " + ", ".join(_REQUIRED_SCHEMA_KEYS)
5779
)
58-
schema_meta = {k.decode(): json.loads(v) for k, v in table.schema.metadata.items()}
80+
schema_meta = {
81+
k.decode(): json.loads(v) for k, v in table.schema.metadata.items() if k.decode() in _REQUIRED_SCHEMA_KEYS
82+
}
5983
missing = [k for k in _REQUIRED_SCHEMA_KEYS if k not in schema_meta]
6084
if missing:
6185
raise ValueError(
@@ -85,7 +109,18 @@ def _label_ref_to_id_map(xtab_meta: dict) -> dict[str, str]:
85109
return {ref: info["labelId"] for ref, info in xtab_meta["labelMetadata"].items()}
86110

87111

88-
def _label_title(label_id: str, model_meta: dict) -> str | None:
112+
def _get_computed_shape(xtab_meta: dict) -> dict:
113+
"""Return computedShape from xtab metadata, raising ValueError when absent."""
114+
computed_shape = xtab_meta.get("computedShape")
115+
if computed_shape is None:
116+
raise ValueError(
117+
"Arrow table xtab metadata is missing required key 'computedShape'. "
118+
"The table must originate from the GoodData /binary execution endpoint."
119+
)
120+
return computed_shape
121+
122+
123+
def _label_title(label_id: str, model_meta: dict) -> str:
89124
"""Get the display title for a label from model metadata."""
90125
info = model_meta["labels"].get(label_id, {})
91126
return info.get("labelTitle") or info.get("attributeTitle") or label_id
@@ -99,18 +134,34 @@ def _metric_title(metric_idx: int, model_meta: dict, xtab_meta: dict) -> str:
99134
otherwise fall back to the local identifier.
100135
"""
101136
local_ids = model_meta["requestedShape"]["metrics"]
137+
if metric_idx >= len(local_ids):
138+
raise ValueError(
139+
f"metric_idx {metric_idx} is out of range for requestedShape.metrics "
140+
f"(length {len(local_ids)}). The Arrow schema metadata may be inconsistent "
141+
"with the model metadata."
142+
)
102143
local_id = local_ids[metric_idx]
103144
info = model_meta["metrics"].get(local_id, {})
104145
return info.get("title") or local_id
105146

106147

148+
def _apply_label_override(base_title: str, local_id: str, overrides: dict) -> str:
149+
"""
150+
Return the override title for local_id when present, otherwise base_title.
151+
152+
overrides is either label_overrides["labels"] or label_overrides["metrics"].
153+
"""
154+
return overrides.get(local_id, {}).get("title", base_title)
155+
156+
107157
def _build_inline_index(
108158
table: pa.Table,
109159
row_label_refs: list[str],
110160
label_ref_to_id: dict[str, str],
111161
model_meta: dict,
112162
xtab_meta: dict,
113163
resolved_mapper: Callable | None = None,
164+
label_overrides: dict | None = None,
114165
) -> pandas.Index | None:
115166
"""
116167
Build the pandas index from Arrow row attribute columns.
@@ -175,7 +226,8 @@ def _build_inline_index(
175226
processed.append(v)
176227
arrays.append(processed)
177228

178-
names = [_label_title(lid, model_meta) for lid in col_ids]
229+
attr_overrides = (label_overrides or {}).get("labels", {})
230+
names = [_apply_label_override(_label_title(lid, model_meta), lid, attr_overrides) for lid in col_ids]
179231

180232
# Apply resolved_mapper to the string arrays if provided.
181233
string_dtype = resolved_mapper(pa.string()) if resolved_mapper else None
@@ -194,6 +246,7 @@ def _build_field_index(
194246
label_ref_to_id: dict[str, str],
195247
model_meta: dict,
196248
xtab_meta: dict,
249+
label_overrides: dict | None = None,
197250
) -> pandas.Index:
198251
"""
199252
Build the pandas index from Arrow field (column) metadata.
@@ -217,19 +270,43 @@ def _build_field_index(
217270
"""
218271
n_col_labels = len(col_label_refs)
219272
col_label_ids = [label_ref_to_id[ref] for ref in col_label_refs]
273+
attr_overrides = (label_overrides or {}).get("labels", {})
274+
metric_overrides = (label_overrides or {}).get("metrics", {})
275+
metric_local_ids: list[str] = model_meta.get("requestedShape", {}).get("metrics", [])
220276

221277
tuples: list = []
222278
for field in data_fields:
279+
if not field.metadata or b"gdc" not in field.metadata:
280+
raise ValueError(
281+
f"Data field {field.name!r} is missing required 'gdc' field metadata. "
282+
"The Arrow table must originate from the GoodData /binary execution endpoint."
283+
)
223284
gdc = json.loads(field.metadata[b"gdc"])
224285
label_values: list = list(gdc.get("label_values", []))
225286

226-
if gdc["type"] == _GDC_TYPE_METRIC:
227-
m_title = _metric_title(gdc["index"], model_meta, xtab_meta)
287+
gdc_type = gdc.get("type")
288+
if gdc_type is None:
289+
raise ValueError(
290+
f"Data field {field.name!r} 'gdc' metadata is missing required key 'type'. "
291+
"The Arrow table must originate from the GoodData /binary execution endpoint."
292+
)
293+
if gdc_type == _GDC_TYPE_METRIC:
294+
if "index" not in gdc:
295+
raise ValueError(f"Metric field {field.name!r} 'gdc' metadata is missing required key 'index'.")
296+
metric_idx = gdc["index"]
228297
else: # "total"
229-
m_title = _metric_title(gdc["metric_index"], model_meta, xtab_meta)
298+
for _key in ("metric_index", "agg_function"):
299+
if _key not in gdc:
300+
raise ValueError(f"Total field {field.name!r} 'gdc' metadata is missing required key {_key!r}.")
301+
metric_idx = gdc["metric_index"]
230302
agg = gdc["agg_function"].upper()
231-
# Pad remaining column-label levels with the aggregation name
232-
label_values = label_values + [agg] * (n_col_labels - len(label_values))
303+
# Truncate to n_col_labels first (guard against malformed schema),
304+
# then pad remaining column-label levels with the aggregation name.
305+
label_values = label_values[:n_col_labels] + [agg] * (n_col_labels - min(len(label_values), n_col_labels))
306+
307+
m_title = _metric_title(metric_idx, model_meta, xtab_meta)
308+
if metric_overrides and metric_idx < len(metric_local_ids):
309+
m_title = _apply_label_override(m_title, metric_local_ids[metric_idx], metric_overrides)
233310

234311
if n_col_labels == 0:
235312
tuples.append(m_title)
@@ -240,7 +317,9 @@ def _build_field_index(
240317
# No column attribute labels — just a flat Index of metric names
241318
return pandas.Index(tuples, name=None)
242319

243-
names = [_label_title(lid, model_meta) for lid in col_label_ids] + [None]
320+
names = [_apply_label_override(_label_title(lid, model_meta), lid, attr_overrides) for lid in col_label_ids] + [
321+
None
322+
]
244323
return pandas.MultiIndex.from_tuples(tuples, names=names)
245324

246325

@@ -256,6 +335,114 @@ def _wrap_for_columns(idx: pandas.Index | None) -> pandas.Index | None:
256335
return pandas.Index([(v,) for v in idx])
257336

258337

338+
def reorder_grand_totals(
    table: pa.Table,
    grand_totals_position: str | None,
) -> pa.Table:
    """
    Hoist grand total rows (row type ``2``) to the top of the table when requested.

    The input table is returned as-is when ``grand_totals_position`` is neither
    ``"top"`` nor ``"pinnedTop"``, when the table has no ``_COL_ROW_TYPE`` column,
    or when no row has row type ``2``. Otherwise the result lists the row-type-2
    rows first, followed by every remaining row (data rows and any other row
    types), which are not repositioned relative to one another.
    """
    wants_top = grand_totals_position in ("top", "pinnedTop")
    if not wants_top or _COL_ROW_TYPE not in table.schema.names:
        return table

    # One pass over the row-type column; the complement mask is derived from it.
    is_grand_total = [value == 2 for value in table.column(_COL_ROW_TYPE).to_pylist()]
    grand_totals = table.filter(pa.array(is_grand_total, type=pa.bool_()))
    if grand_totals.num_rows == 0:
        return table

    remaining = table.filter(pa.array([not flag for flag in is_grand_total], type=pa.bool_()))
    return pa.concat_tables([grand_totals, remaining])
360+
361+
362+
def compute_column_totals_indexes(table: pa.Table, execution_dims: list) -> list[list[int]]:
    """
    Derive ``column_totals_indexes`` (DataFrameMetadata-compatible) from an Arrow table.

    The result holds one inner list per header of the column execution dimension.
    Each inner list contains positions of total fields, counted within the table's
    data fields (those whose names start with ``_FIELD_METRIC_GROUP`` or
    ``_FIELD_GRAND_TOTAL``).

    Non-transposed case:
        For the j-th header that resolves to a column label, a data field is listed
        when its ``gdc`` type is ``_GDC_TYPE_TOTAL`` and ``j >= len(gdc["label_values"])``.
        Measure-group headers and attribute headers that are not column labels get
        an empty list.

    Returns ``[]`` when the view is transposed, when there are no row labels, or when
    the schema has no ``_FIELD_GRAND_TOTAL``-prefixed field.
    """
    schema_meta = _parse_schema_metadata(table)
    xtab_meta = schema_meta[_META_XTAB]
    transposed = schema_meta[_META_VIEW]["isTransposed"]

    shape = _get_computed_shape(xtab_meta)
    row_refs: list[str] = shape.get("rows", [])
    col_refs: list[str] = shape.get("cols", [])

    # Transposed / metrics-only layouts have no column totals to report here.
    if transposed or not row_refs:
        return []

    # Collect data fields and note whether any grand-total field exists at all.
    data_fields = []
    has_grand_total = False
    for f in table.schema:
        is_total_field = f.name.startswith(_FIELD_GRAND_TOTAL)
        has_grand_total = has_grand_total or is_total_field
        if is_total_field or f.name.startswith(_FIELD_METRIC_GROUP):
            data_fields.append(f)
    if not has_grand_total:
        return []

    ref_to_id = _label_ref_to_id_map(xtab_meta)
    ref_by_id = {label_id: ref for ref, label_id in ref_to_id.items()}
    col_refs_lookup = set(col_refs)

    def _headers(dim: dict) -> list:
        return dim.get("headers", [])

    # Locate the column execution dimension: the first one containing every column
    # label, or (without column labels) the first one with a measure group header.
    if col_refs:
        wanted_ids = {ref_to_id[r] for r in col_refs}

        def _is_col_dim(dim: dict) -> bool:
            present = {h["attributeHeader"]["label"]["id"] for h in _headers(dim) if "attributeHeader" in h}
            return wanted_ids <= present

    else:

        def _is_col_dim(dim: dict) -> bool:
            return any("measureGroupHeaders" in h for h in _headers(dim))

    col_dim = next((dim for dim in execution_dims if _is_col_dim(dim)), {})

    # Decode each field's gdc metadata once, up front, rather than per header.
    gdc_by_position: list[dict | None] = []
    for f in data_fields:
        has_gdc = f.metadata and b"gdc" in f.metadata
        gdc_by_position.append(json.loads(f.metadata[b"gdc"]) if has_gdc else None)

    totals_per_header: list[list[int]] = []
    matched_levels = 0  # number of column-label headers handled so far
    for header in _headers(col_dim):
        level_totals: list[int] = []
        if "measureGroupHeaders" not in header:
            ref = ref_by_id.get(header["attributeHeader"]["label"]["id"])
            if ref is not None and ref in col_refs_lookup:
                level_totals = [
                    pos
                    for pos, gdc in enumerate(gdc_by_position)
                    if gdc is not None
                    and gdc["type"] == _GDC_TYPE_TOTAL
                    and matched_levels >= len(gdc.get("label_values", []))
                ]
                matched_levels += 1
        totals_per_header.append(level_totals)
    return totals_per_header
444+
445+
259446
def compute_row_totals_indexes(table: pa.Table, execution_dims: list) -> list[list[int]]:
260447
"""
261448
Compute row_totals_indexes compatible with DataFrameMetadata from an Arrow table.
@@ -276,8 +463,9 @@ def compute_row_totals_indexes(table: pa.Table, execution_dims: list) -> list[li
276463
xtab_meta = schema_meta[_META_XTAB]
277464
is_transposed = schema_meta[_META_VIEW]["isTransposed"]
278465

279-
row_label_refs: list[str] = xtab_meta["computedShape"]["rows"]
280-
col_label_refs: list[str] = xtab_meta["computedShape"]["cols"]
466+
computed_shape = _get_computed_shape(xtab_meta)
467+
row_label_refs: list[str] = computed_shape.get("rows", [])
468+
col_label_refs: list[str] = computed_shape.get("cols", [])
281469
label_ref_to_id = _label_ref_to_id_map(xtab_meta)
282470
id_to_ref = {v: k for k, v in label_ref_to_id.items()}
283471

@@ -296,13 +484,13 @@ def _label_ids_in_dim(dim: dict) -> set:
296484
ref_label_ids = {label_ref_to_id[r] for r in output_row_refs}
297485
row_dim = next(
298486
(dim for dim in execution_dims if ref_label_ids <= _label_ids_in_dim(dim)),
299-
execution_dims[0] if execution_dims else {},
487+
{},
300488
)
301489
else:
302490
# Metrics-only: the dimension containing measureGroupHeaders is the output-row dim.
303491
row_dim = next(
304492
(dim for dim in execution_dims if any("measureGroupHeaders" in h for h in dim.get("headers", []))),
305-
execution_dims[0] if execution_dims else {},
493+
{},
306494
)
307495

308496
if use_field_rows:
@@ -311,6 +499,11 @@ def _label_ids_in_dim(dim: dict) -> set:
311499
f for f in table.schema if f.name.startswith(_FIELD_METRIC_GROUP) or f.name.startswith(_FIELD_GRAND_TOTAL)
312500
]
313501

502+
# Pre-parse gdc metadata once to avoid O(N×M) json.loads calls in the header loop.
503+
parsed_gdcs: list[dict | None] = [
504+
json.loads(f.metadata[b"gdc"]) if f.metadata and b"gdc" in f.metadata else None for f in all_data_fields
505+
]
506+
314507
result: list[list[int]] = []
315508
attr_level = 0 # position within output_row_refs
316509
for header in row_dim.get("headers", []):
@@ -325,9 +518,8 @@ def _label_ids_in_dim(dim: dict) -> set:
325518
j = attr_level
326519
total_idxs = [
327520
k
328-
for k, f in enumerate(all_data_fields)
329-
if (gdc := json.loads(f.metadata[b"gdc"]))["type"] == _GDC_TYPE_TOTAL
330-
and j >= len(gdc.get("label_values", []))
521+
for k, gdc in enumerate(parsed_gdcs)
522+
if gdc is not None and gdc["type"] == _GDC_TYPE_TOTAL and j >= len(gdc.get("label_values", []))
331523
]
332524
result.append(total_idxs)
333525
attr_level += 1
@@ -416,6 +608,8 @@ def _compute_primary_labels_from_fields(
416608
return result
417609

418610
for field in all_data_fields:
611+
if not field.metadata or b"gdc" not in field.metadata:
612+
continue
419613
gdc = json.loads(field.metadata[b"gdc"])
420614
if gdc["type"] != _GDC_TYPE_METRIC:
421615
continue
@@ -450,8 +644,9 @@ def compute_primary_labels(
450644
is_transposed = schema_meta[_META_VIEW]["isTransposed"]
451645

452646
label_ref_to_id = _label_ref_to_id_map(xtab_meta)
453-
row_label_refs: list[str] = xtab_meta["computedShape"]["rows"]
454-
col_label_refs: list[str] = xtab_meta["computedShape"]["cols"]
647+
computed_shape = _get_computed_shape(xtab_meta)
648+
row_label_refs: list[str] = computed_shape.get("rows", [])
649+
col_label_refs: list[str] = computed_shape.get("cols", [])
455650

456651
all_data_fields = [
457652
f for f in table.schema if f.name.startswith(_FIELD_METRIC_GROUP) or f.name.startswith(_FIELD_GRAND_TOTAL)
@@ -474,6 +669,7 @@ def convert_arrow_table_to_dataframe(
474669
self_destruct: bool = False,
475670
types_mapper: TypesMapper = TypesMapper.DEFAULT,
476671
custom_mapping: dict | None = None,
672+
label_overrides: dict | None = None,
477673
) -> pandas.DataFrame:
478674
"""
479675
Convert a pyarrow Table returned by the GoodData /binary execution endpoint
@@ -547,11 +743,16 @@ def convert_arrow_table_to_dataframe(
547743

548744
# computedShape.rows → label refs for Arrow row attribute columns
549745
# computedShape.cols → label refs encoded in field metadata
550-
row_label_refs: list[str] = xtab_meta["computedShape"]["rows"]
551-
col_label_refs: list[str] = xtab_meta["computedShape"]["cols"]
746+
computed_shape = _get_computed_shape(xtab_meta)
747+
row_label_refs: list[str] = computed_shape.get("rows", [])
748+
col_label_refs: list[str] = computed_shape.get("cols", [])
552749

553-
inline_index = _build_inline_index(table, row_label_refs, label_ref_to_id, model_meta, xtab_meta, resolved_mapper)
554-
field_index = _build_field_index(all_data_fields, col_label_refs, label_ref_to_id, model_meta, xtab_meta)
750+
inline_index = _build_inline_index(
751+
table, row_label_refs, label_ref_to_id, model_meta, xtab_meta, resolved_mapper, label_overrides
752+
)
753+
field_index = _build_field_index(
754+
all_data_fields, col_label_refs, label_ref_to_id, model_meta, xtab_meta, label_overrides
755+
)
555756

556757
# When there are no row attribute labels (inline_index is None) the server
557758
# packs everything into the field dimension; always use field_index as rows.

0 commit comments

Comments
 (0)