Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo};
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
pub use spacetimedb_durability::{DurabilityExited, DurableOffset};
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
use spacetimedb_execution::pipelined::PipelinedProject;
use spacetimedb_execution::RelValue;
use spacetimedb_expr::expr::CollectViews;
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
Expand Down Expand Up @@ -3307,13 +3307,9 @@ impl ModuleHost {
let table_name = table_name.into();
let delta_tx = DeltaTx::from(tx);
let (rows, _, metrics) = if returns_view_table && num_private_cols > 0 {
let optimized = optimized
.into_iter()
.map(|plan| ViewProject::new(plan, num_cols, num_private_cols))
.collect::<Vec<_>>();
execute_plan_for_view::<F>(&optimized, &delta_tx, rlb_pool)
execute_plan_for_view::<F>(optimized.iter(), num_cols, num_private_cols, &delta_tx, rlb_pool)
} else {
execute_plan::<F>(&optimized, &delta_tx, rlb_pool)
execute_plan::<F>(optimized.iter(), &delta_tx, rlb_pool)
}
.context("One-off queries are not allowed to modify the database")?;

Expand Down
9 changes: 3 additions & 6 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use spacetimedb_datastore::error::{DatastoreError, ViewError};
use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload};
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo};
use spacetimedb_datastore::traits::{IsolationLevel, Program};
use spacetimedb_execution::pipelined::PipelinedProject;
use spacetimedb_lib::buffer::DecodeError;
use spacetimedb_lib::db::raw_def::v9::{Lifecycle, ViewResultHeader};
use spacetimedb_lib::de::DeserializeSeed;
Expand Down Expand Up @@ -188,11 +187,10 @@ pub(crate) fn run_query_for_view(

// Validate shape and disallow views-on-views.
for plan in &plans {
let phys = plan.optimized_physical_plan();
let Some(source_schema) = phys.return_table() else {
let Some(source_schema) = plan.return_table() else {
bail!("query does not return plain table rows");
};
if phys.reads_from_view(true) || phys.reads_from_view(false) {
if plan.reads_from_view(true) || plan.reads_from_view(false) {
bail!("view definition cannot read from other views");
}
if source_schema.row_type != *expected_row_type {
Expand All @@ -215,8 +213,7 @@ pub(crate) fn run_query_for_view(
tx.record_table_scan(&op, table_id);
}

let pipelined = PipelinedProject::from(plan.optimized_physical_plan().clone());
pipelined.execute(&*tx, &mut metrics, &mut |row| {
plan.base_plan().execute(&*tx, &mut metrics, &mut |row| {
rows.push(row.to_product_value());
Ok(())
})?;
Expand Down
102 changes: 5 additions & 97 deletions crates/core/src/subscription/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,5 @@
use spacetimedb_physical_plan::plan::PhysicalPlan;
use spacetimedb_schema::{schema::TableSchema, table_name::TableName};
use std::sync::Arc;

/// Scan strategy types for subscription queries
#[derive(Debug, Clone, Copy)]
enum ScanStrategy {
/// Full table scan - no indexes used
Sequential,
/// Uses index but requires post-filtering on non-indexed columns
IndexedWithFilter,
/// Fully indexed - no post-filtering needed
FullyIndexed,
/// Mixed strategy (combination of index and table scans)
Mixed,
/// Unknown/other strategy
Unknown,
}
use spacetimedb_schema::table_name::TableName;
use spacetimedb_subscription::SubscriptionPlanMetrics;

/// Metrics data for a single subscription query execution
#[derive(Debug)]
Expand All @@ -27,93 +11,17 @@ pub struct QueryMetrics {
pub execution_time_micros: u64,
}

impl std::fmt::Display for ScanStrategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Sequential => write!(f, "sequential"),
Self::IndexedWithFilter => write!(f, "indexed_with_filter"),
Self::FullyIndexed => write!(f, "fully_indexed"),
Self::Mixed => write!(f, "mixed"),
Self::Unknown => write!(f, "unknown"),
}
}
}

/// Recursively extracts column names from filter expressions
fn extract_columns(
expr: &spacetimedb_physical_plan::plan::PhysicalExpr,
schema: Option<&Arc<TableSchema>>,
columns: &mut Vec<String>,
) {
use spacetimedb_physical_plan::plan::PhysicalExpr;

match expr {
PhysicalExpr::Field(tuple_field) => {
let col_name = schema
.and_then(|s| s.columns.get(tuple_field.field_pos))
.map(|col| col.col_name.to_string())
.unwrap_or_else(|| format!("col_{}", tuple_field.field_pos));
columns.push(col_name);
}
PhysicalExpr::BinOp(_, lhs, rhs) => {
extract_columns(lhs, schema, columns);
extract_columns(rhs, schema, columns);
}
PhysicalExpr::LogOp(_, exprs) => {
for expr in exprs {
extract_columns(expr, schema, columns);
}
}
PhysicalExpr::Value(_) => {}
}
}

/// Analyzes subscription scan strategy and creates QueryMetrics
pub fn get_query_metrics(
table_name: TableName,
plan: &PhysicalPlan,
plan_metrics: &SubscriptionPlanMetrics,
rows_scanned: u64,
execution_time_micros: u64,
) -> QueryMetrics {
let has_table_scan = plan.any(&|p| matches!(p, PhysicalPlan::TableScan(..)));
let has_index_scan = plan.any(&|p| matches!(p, PhysicalPlan::IxScan(..)));
let has_post_filter = plan.any(&|p| matches!(p, PhysicalPlan::Filter(..)));

let strategy = if has_table_scan && has_index_scan {
ScanStrategy::Mixed
} else if has_table_scan {
ScanStrategy::Sequential
} else if has_index_scan && has_post_filter {
ScanStrategy::IndexedWithFilter
} else if has_index_scan {
ScanStrategy::FullyIndexed
} else {
ScanStrategy::Unknown
};

// Extract the schema from the plan
let mut schema: Option<Arc<TableSchema>> = None;
plan.visit(&mut |p| match p {
PhysicalPlan::TableScan(scan, _) => {
schema = Some(scan.schema.clone());
}
PhysicalPlan::IxScan(scan, _) => {
schema = Some(scan.schema.clone());
}
_ => {}
});

let mut columns = Vec::new();
plan.visit(&mut |p| {
if let PhysicalPlan::Filter(_, expr) = p {
extract_columns(expr, schema.as_ref(), &mut columns);
}
});

QueryMetrics {
scan_type: strategy.to_string(),
scan_type: plan_metrics.scan_type().to_owned(),
table_name,
unindexed_columns: columns.join(","),
unindexed_columns: plan_metrics.unindexed_columns().to_owned(),
rows_scanned,
execution_time_micros,
}
Expand Down
Loading
Loading