Merged
Commits (20)
d892f23 feat: replace fixed MetricDataPoint fields with dynamic tag HashMap (mattmkim, Mar 18, 2026)
4e928fe feat: replace ParquetField enum with constants and dynamic validation (mattmkim, Mar 18, 2026)
d8d71ed feat: derive sort order and bloom filters from batch schema (mattmkim, Mar 18, 2026)
ad115bf feat: union schema accumulation and schema-agnostic ingest validation (mattmkim, Mar 18, 2026)
d0a995e feat: dynamic column lookup in split writer (mattmkim, Mar 18, 2026)
35c3942 feat: remove ParquetSchema dependency from indexing actors (mattmkim, Mar 18, 2026)
33c4070 refactor: deduplicate test batch helpers (mattmkim, Mar 18, 2026)
7a5979f lint (mattmkim, Mar 30, 2026)
440631b Merge branch 'main' into matthew.kim/metrics-wide-schema (mattmkim, Mar 31, 2026)
5eae799 Merge branch 'main' into matthew.kim/metrics-wide-schema (mattmkim, Mar 31, 2026)
5b2304c feat(31): sort schema foundation — proto, parser, display, validation… (g-talbot, Mar 31, 2026)
4d42fd9 fix: rustdoc link errors — use backticks for private items (g-talbot, Apr 1, 2026)
b6eb595 feat(31): compaction metadata types — extend split metadata, postgres… (g-talbot, Mar 31, 2026)
759c2ca Update quickwit/quickwit-parquet-engine/src/table_config.rs (g-talbot, Apr 7, 2026)
6454f1d Update quickwit/quickwit-parquet-engine/src/table_config.rs (g-talbot, Apr 7, 2026)
4d8b6b2 Merge quickwit-oss/main into gtt/phase-31-sort-schema (g-talbot, Apr 8, 2026)
1e67900 Merge branch 'gtt/phase-31-sort-schema' into gtt/phase-31-compaction-… (g-talbot, Apr 8, 2026)
4481bef style: rustfmt long match arm in default_sort_fields (g-talbot, Apr 8, 2026)
64c5d5f Merge branch 'gtt/phase-31-sort-schema' into gtt/phase-31-compaction-… (g-talbot, Apr 8, 2026)
93e1cc7 fix: make parquet_file field backward-compatible in MetricsSplitMetadata (g-talbot, Apr 8, 2026)
5 changes: 4 additions & 1 deletion quickwit/Cargo.lock

(Generated file; diff not rendered.)

4 changes: 2 additions & 2 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -68,7 +68,7 @@ pub struct IndexingSchedulerState {
 ///
 /// Scheduling executes the following steps:
 /// 1. Builds a [`PhysicalIndexingPlan`] from the list of logical indexing tasks. See
-///    [`build_physical_indexing_plan`] for the implementation details.
+///    `build_physical_indexing_plan` for the implementation details.
 /// 2. Apply the [`PhysicalIndexingPlan`]: for each indexer, the scheduler send the indexing tasks
 ///    by gRPC. An indexer immediately returns an Ok and apply asynchronously the received plan. Any
 ///    errors (network) happening in this step are ignored. The scheduler runs a control loop that
@@ -98,7 +98,7 @@ pub struct IndexingSchedulerState {
 /// Concretely, it will send the faulty nodes of the plan they are supposed to follow.
 //
 /// Finally, in order to give the time for each indexer to run their indexing tasks, the control
-/// plane will wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] before comparing the desired
+/// plane will wait at least `MIN_DURATION_BETWEEN_SCHEDULING` before comparing the desired
 /// plan with the running plan.
 pub struct IndexingScheduler {
     cluster_id: String,
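Note on the two doc-comment fixes above: bracketed [`Name`] syntax asks rustdoc to resolve an intra-doc link, and linking from public docs to a private item trips the `rustdoc::private_intra_doc_links` lint, which a CI setup that denies warnings turns into a build failure. Plain backticks render the name as inline code with no link resolution. A minimal illustration, with hypothetical stand-in items:

use std::time::Duration;

const MIN_DURATION_BETWEEN_SCHEDULING: Duration = Duration::from_secs(30);

// With brackets, rustdoc warns (private_intra_doc_links) because the constant
// is private while the struct's docs are public:
//     /// ... wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] ...
// With plain backticks the name renders as inline code, and nothing is linked:

/// The control plane waits at least `MIN_DURATION_BETWEEN_SCHEDULING` between rounds.
pub struct Scheduler;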
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/Cargo.toml
@@ -123,6 +123,7 @@ sqlx = { workspace = true, features = ["runtime-tokio", "postgres"] }
 tempfile = { workspace = true }

 quickwit-actors = { workspace = true, features = ["testsuite"] }
+quickwit-parquet-engine = { workspace = true, features = ["testsuite"] }
 quickwit-cluster = { workspace = true, features = ["testsuite"] }
 quickwit-common = { workspace = true, features = ["testsuite"] }
 quickwit-config = { workspace = true, features = ["testsuite"] }
2 changes: 0 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -637,10 +637,8 @@ impl IndexingPipeline {
             .spawn(parquet_uploader);

         // ParquetPackager
-        let parquet_schema = quickwit_parquet_engine::schema::ParquetSchema::new();
         let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default();
         let split_writer = quickwit_parquet_engine::storage::ParquetSplitWriter::new(
-            parquet_schema,
             writer_config,
             self.params.indexing_directory.path(),
         );
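With the fixed ParquetSchema gone, the writer is constructed from config and path alone; per the commit "feat: derive sort order and bloom filters from batch schema", schema-dependent writer settings now come from the batches themselves. Below is a sketch of that derivation against the parquet crate's public API; the function name and the `timestamp_secs`/`tag_` column conventions are assumptions taken from the deleted test code, not the actual ParquetSplitWriter internals.

use arrow::record_batch::RecordBatch;
use parquet::file::properties::WriterProperties;
use parquet::format::SortingColumn;
use parquet::schema::types::ColumnPath;

// Hypothetical: derive per-file writer properties from a batch's schema
// instead of a hard-coded ParquetSchema.
fn properties_for_batch(batch: &RecordBatch) -> WriterProperties {
    let mut builder = WriterProperties::builder();
    let mut sorting_columns = Vec::new();
    for (idx, field) in batch.schema().fields().iter().enumerate() {
        // Sort row groups by timestamp when the batch carries one.
        if field.name() == "timestamp_secs" {
            sorting_columns.push(SortingColumn {
                column_idx: idx as i32,
                descending: false,
                nulls_first: false,
            });
        }
        // Enable bloom filters only for the dynamic tag columns.
        if field.name().starts_with("tag_") {
            builder = builder
                .set_column_bloom_filter_enabled(ColumnPath::from(field.name().as_str()), true);
        }
    }
    builder.set_sorting_columns(Some(sorting_columns)).build()
}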
221 changes: 7 additions & 214 deletions quickwit/quickwit-indexing/src/actors/parquet_doc_processor.rs
@@ -24,7 +24,6 @@ use quickwit_common::rate_limited_tracing::rate_limited_warn;
 use quickwit_common::runtimes::RuntimeType;
 use quickwit_metastore::checkpoint::SourceCheckpointDelta;
 use quickwit_parquet_engine::ingest::{IngestError, ParquetIngestProcessor};
-use quickwit_parquet_engine::schema::ParquetSchema;
 use quickwit_proto::types::{IndexId, SourceId};
 use serde::Serialize;
 use tokio::runtime::Handle;
@@ -143,8 +142,7 @@ impl ParquetDocProcessor {
         source_id: SourceId,
         indexer_mailbox: Mailbox<ParquetIndexer>,
     ) -> Self {
-        let schema = ParquetSchema::new();
-        let processor = ParquetIngestProcessor::new(schema);
+        let processor = ParquetIngestProcessor;
         let counters = ParquetDocProcessorCounters::new(index_id.clone(), source_id.clone());

         info!(
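Since validation no longer compares batches against a single fixed schema (see the "union schema accumulation and schema-agnostic ingest validation" commit), the processor carries no state and becomes a unit struct. A hedged sketch of what presence-based validation can look like; the constant, function, and column names are hypothetical, not the actual ParquetIngestProcessor API:

use arrow::record_batch::RecordBatch;

// Hypothetical required core columns; any number of extra tag columns may follow.
const REQUIRED_COLUMNS: &[&str] = &["metric_name", "timestamp_secs", "value"];

// Accept any batch that carries the core columns, whatever else its schema holds.
fn validate_batch(batch: &RecordBatch) -> Result<(), String> {
    let schema = batch.schema();
    for required in REQUIRED_COLUMNS {
        if !schema.fields().iter().any(|field| field.name() == required) {
            return Err(format!("missing required column: {required}"));
        }
    }
    Ok(())
}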
@@ -306,7 +304,7 @@ impl Handler<RawDocBatch> for ParquetDocProcessor {
         // forever.
         if !checkpoint_forwarded && !checkpoint_delta.is_empty() {
             let empty_batch =
-                RecordBatch::new_empty(self.processor.schema().arrow_schema().clone());
+                RecordBatch::new_empty(std::sync::Arc::new(arrow::datatypes::Schema::empty()));
             let processed_batch =
                 ProcessedParquetBatch::new(empty_batch, checkpoint_delta, force_commit);
             ctx.send_message(&self.indexer_mailbox, processed_batch)
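The checkpoint-only path above previously cloned the processor's Arrow schema; with no schema on hand, an empty schema suffices because the batch apparently exists solely to carry the checkpoint delta downstream. A minimal standalone illustration:

use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;

// Zero columns and zero rows: the payload is irrelevant, only the attached
// checkpoint delta matters to the downstream indexer.
fn empty_checkpoint_batch() -> RecordBatch {
    RecordBatch::new_empty(Arc::new(Schema::empty()))
}

#[test]
fn empty_checkpoint_batch_is_empty() {
    let batch = empty_checkpoint_batch();
    assert_eq!(batch.num_rows(), 0);
    assert_eq!(batch.num_columns(), 0);
}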
@@ -399,14 +397,8 @@ mod tests {

     #[tokio::test]
     async fn test_metrics_doc_processor_valid_arrow_ipc() {
-        use std::sync::Arc as StdArc;
+        use quickwit_parquet_engine::test_helpers::create_test_batch_with_tags;

-        use arrow::array::{
-            ArrayRef, BinaryViewArray, DictionaryArray, Float64Array, Int32Array, StringArray,
-            StructArray, UInt8Array, UInt64Array,
-        };
-        use arrow::datatypes::{DataType, Field, Int32Type};
-        use arrow::record_batch::RecordBatch;
         let universe = Universe::with_accelerated_time();

         let (indexer_mailbox, _indexer_inbox) = universe.create_test_mailbox::<ParquetIndexer>();
@@ -419,103 +411,7 @@ mod tests {
         let (metrics_doc_processor_mailbox, metrics_doc_processor_handle) =
             universe.spawn_builder().spawn(metrics_doc_processor);

-        // Create a test batch matching the metrics schema
-        let schema = ParquetSchema::new();
-        let num_rows = 3;
-
-        // Helper to create dictionary arrays
-        fn create_dict_array(values: &[&str]) -> ArrayRef {
-            let keys: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
-            let string_array = StringArray::from(values.to_vec());
-            StdArc::new(
-                DictionaryArray::<Int32Type>::try_new(
-                    Int32Array::from(keys),
-                    StdArc::new(string_array),
-                )
-                .unwrap(),
-            )
-        }
-
-        fn create_nullable_dict_array(values: &[Option<&str>]) -> ArrayRef {
-            let keys: Vec<Option<i32>> = values
-                .iter()
-                .enumerate()
-                .map(|(i, v)| v.map(|_| i as i32))
-                .collect();
-            let string_values: Vec<&str> = values.iter().filter_map(|v| *v).collect();
-            let string_array = StringArray::from(string_values);
-            StdArc::new(
-                DictionaryArray::<Int32Type>::try_new(
-                    Int32Array::from(keys),
-                    StdArc::new(string_array),
-                )
-                .unwrap(),
-            )
-        }
-
-        let metric_name: ArrayRef = create_dict_array(&vec!["cpu.usage"; num_rows]);
-        let metric_type: ArrayRef = StdArc::new(UInt8Array::from(vec![0u8; num_rows]));
-        let metric_unit: ArrayRef = StdArc::new(StringArray::from(vec![Some("bytes"); num_rows]));
-        let timestamp_secs: ArrayRef = StdArc::new(UInt64Array::from(vec![100u64, 101u64, 102u64]));
-        let start_timestamp_secs: ArrayRef =
-            StdArc::new(UInt64Array::from(vec![None::<u64>; num_rows]));
-        let value: ArrayRef = StdArc::new(Float64Array::from(vec![42.0, 43.0, 44.0]));
-        let tag_service: ArrayRef = create_nullable_dict_array(&vec![Some("web"); num_rows]);
-        let tag_env: ArrayRef = create_nullable_dict_array(&vec![Some("prod"); num_rows]);
-        let tag_datacenter: ArrayRef =
-            create_nullable_dict_array(&vec![Some("us-east-1"); num_rows]);
-        let tag_region: ArrayRef = create_nullable_dict_array(&vec![None; num_rows]);
-        let tag_host: ArrayRef = create_nullable_dict_array(&vec![Some("host-001"); num_rows]);
-
-        // Create empty Variant (Struct with metadata and value BinaryView fields)
-        let metadata_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
-        let value_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
-        let attributes: ArrayRef = StdArc::new(StructArray::from(vec![
-            (
-                StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
-                metadata_array.clone() as ArrayRef,
-            ),
-            (
-                StdArc::new(Field::new("value", DataType::BinaryView, false)),
-                value_array.clone() as ArrayRef,
-            ),
-        ]));
-
-        let service_name: ArrayRef = create_dict_array(&vec!["my-service"; num_rows]);
-
-        let resource_attributes: ArrayRef = StdArc::new(StructArray::from(vec![
-            (
-                StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
-                metadata_array as ArrayRef,
-            ),
-            (
-                StdArc::new(Field::new("value", DataType::BinaryView, false)),
-                value_array as ArrayRef,
-            ),
-        ]));
-
-        let batch = RecordBatch::try_new(
-            schema.arrow_schema().clone(),
-            vec![
-                metric_name,
-                metric_type,
-                metric_unit,
-                timestamp_secs,
-                start_timestamp_secs,
-                value,
-                tag_service,
-                tag_env,
-                tag_datacenter,
-                tag_region,
-                tag_host,
-                attributes,
-                service_name,
-                resource_attributes,
-            ],
-        )
-        .unwrap();
-
-        // Serialize to Arrow IPC
+        let batch = create_test_batch_with_tags(3, &["service"]);
         let ipc_bytes = record_batch_to_ipc(&batch).unwrap();

         // Create RawDocBatch with the IPC bytes
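Both tests now build their input through the shared `create_test_batch_with_tags` helper introduced by the "deduplicate test batch helpers" commit. Its implementation is not part of this diff; judging from the call sites and the deleted inline code, a simplified reconstruction (plain rather than dictionary-encoded columns, all names assumed) might look like:

use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array, StringArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// Hypothetical reconstruction: core metric columns plus one nullable
// "tag_<name>" column per requested tag.
pub fn create_test_batch_with_tags(num_rows: usize, tags: &[&str]) -> RecordBatch {
    let mut fields = vec![
        Field::new("metric_name", DataType::Utf8, false),
        Field::new("timestamp_secs", DataType::UInt64, false),
        Field::new("value", DataType::Float64, false),
    ];
    let mut columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["cpu.usage"; num_rows])),
        Arc::new(UInt64Array::from(
            (0..num_rows).map(|i| 100 + i as u64).collect::<Vec<_>>(),
        )),
        Arc::new(Float64Array::from(
            (0..num_rows).map(|i| 42.0 + i as f64).collect::<Vec<_>>(),
        )),
    ];
    for tag in tags {
        fields.push(Field::new(format!("tag_{tag}"), DataType::Utf8, true));
        columns.push(Arc::new(StringArray::from(vec![Some("web"); num_rows])));
    }
    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap()
}

With a helper of this shape, the two call sites reduce to `create_test_batch_with_tags(3, &["service"])` and `create_test_batch_with_tags(5, &["service"])`, as the diff shows.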
@@ -624,13 +520,8 @@ mod tests {
     async fn test_metrics_doc_processor_with_indexer() {
         use std::sync::Arc as StdArc;

-        use arrow::array::{
-            ArrayRef, BinaryViewArray, DictionaryArray, Float64Array, Int32Array, StringArray,
-            StructArray, UInt8Array, UInt64Array,
-        };
-        use arrow::datatypes::{DataType, Field, Int32Type};
-        use arrow::record_batch::RecordBatch;
         use quickwit_parquet_engine::storage::{ParquetSplitWriter, ParquetWriterConfig};
+        use quickwit_parquet_engine::test_helpers::create_test_batch_with_tags;
         use quickwit_proto::metastore::MockMetastoreService;
         use quickwit_storage::RamStorage;
@@ -657,9 +548,8 @@ mod tests {
         let (uploader_mailbox, _uploader_handle) = universe.spawn_builder().spawn(uploader);

         // Create ParquetPackager
-        let parquet_schema = ParquetSchema::new();
         let writer_config = ParquetWriterConfig::default();
-        let split_writer = ParquetSplitWriter::new(parquet_schema, writer_config, temp_dir.path());
+        let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path());
         let packager = ParquetPackager::new(split_writer, uploader_mailbox);
         let (packager_mailbox, packager_handle) = universe.spawn_builder().spawn(packager);
@@ -681,104 +571,7 @@ mod tests {
         let (metrics_doc_processor_mailbox, metrics_doc_processor_handle) =
             universe.spawn_builder().spawn(metrics_doc_processor);

-        // Create a test batch
-        let schema = ParquetSchema::new();
-        let num_rows = 5;
-
-        fn create_dict_array(values: &[&str]) -> ArrayRef {
-            let keys: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
-            let string_array = StringArray::from(values.to_vec());
-            StdArc::new(
-                DictionaryArray::<Int32Type>::try_new(
-                    Int32Array::from(keys),
-                    StdArc::new(string_array),
-                )
-                .unwrap(),
-            )
-        }
-
-        fn create_nullable_dict_array(values: &[Option<&str>]) -> ArrayRef {
-            let keys: Vec<Option<i32>> = values
-                .iter()
-                .enumerate()
-                .map(|(i, v)| v.map(|_| i as i32))
-                .collect();
-            let string_values: Vec<&str> = values.iter().filter_map(|v| *v).collect();
-            let string_array = StringArray::from(string_values);
-            StdArc::new(
-                DictionaryArray::<Int32Type>::try_new(
-                    Int32Array::from(keys),
-                    StdArc::new(string_array),
-                )
-                .unwrap(),
-            )
-        }
-
-        let metric_name: ArrayRef = create_dict_array(&vec!["cpu.usage"; num_rows]);
-        let metric_type: ArrayRef = StdArc::new(UInt8Array::from(vec![0u8; num_rows]));
-        let metric_unit: ArrayRef = StdArc::new(StringArray::from(vec![Some("bytes"); num_rows]));
-        let timestamps: Vec<u64> = (0..num_rows).map(|i| 100 + i as u64).collect();
-        let timestamp_secs: ArrayRef = StdArc::new(UInt64Array::from(timestamps));
-        let start_timestamp_secs: ArrayRef =
-            StdArc::new(UInt64Array::from(vec![None::<u64>; num_rows]));
-        let values: Vec<f64> = (0..num_rows).map(|i| 42.0 + i as f64).collect();
-        let value: ArrayRef = StdArc::new(Float64Array::from(values));
-        let tag_service: ArrayRef = create_nullable_dict_array(&vec![Some("web"); num_rows]);
-        let tag_env: ArrayRef = create_nullable_dict_array(&vec![Some("prod"); num_rows]);
-        let tag_datacenter: ArrayRef =
-            create_nullable_dict_array(&vec![Some("us-east-1"); num_rows]);
-        let tag_region: ArrayRef = create_nullable_dict_array(&vec![None; num_rows]);
-        let tag_host: ArrayRef = create_nullable_dict_array(&vec![Some("host-001"); num_rows]);
-
-        // Create empty Variant (Struct with metadata and value BinaryView fields)
-        let metadata_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
-        let value_array = StdArc::new(BinaryViewArray::from(vec![b"" as &[u8]; num_rows]));
-        let attributes: ArrayRef = StdArc::new(StructArray::from(vec![
-            (
-                StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
-                metadata_array.clone() as ArrayRef,
-            ),
-            (
-                StdArc::new(Field::new("value", DataType::BinaryView, false)),
-                value_array.clone() as ArrayRef,
-            ),
-        ]));
-
-        let service_name: ArrayRef = create_dict_array(&vec!["my-service"; num_rows]);
-
-        let resource_attributes: ArrayRef = StdArc::new(StructArray::from(vec![
-            (
-                StdArc::new(Field::new("metadata", DataType::BinaryView, false)),
-                metadata_array as ArrayRef,
-            ),
-            (
-                StdArc::new(Field::new("value", DataType::BinaryView, false)),
-                value_array as ArrayRef,
-            ),
-        ]));
-
-        let batch = RecordBatch::try_new(
-            schema.arrow_schema().clone(),
-            vec![
-                metric_name,
-                metric_type,
-                metric_unit,
-                timestamp_secs,
-                start_timestamp_secs,
-                value,
-                tag_service,
-                tag_env,
-                tag_datacenter,
-                tag_region,
-                tag_host,
-                attributes,
-                service_name,
-                resource_attributes,
-            ],
-        )
-        .unwrap();
-
-        // Serialize to Arrow IPC
+        let batch = create_test_batch_with_tags(5, &["service"]);
         let ipc_bytes = record_batch_to_ipc(&batch).unwrap();

         // Create RawDocBatch with force_commit to trigger split production