1414
1515use quickwit_common:: rand:: append_random_suffix;
1616use quickwit_config:: IndexConfig ;
17- use quickwit_parquet_engine:: split:: {
18- MetricsSplitMetadata , MetricsSplitState , SplitId , TimeRange , TAG_ENV , TAG_HOST , TAG_SERVICE ,
19- } ;
17+ use quickwit_parquet_engine:: split:: { MetricsSplitMetadata , MetricsSplitState , SplitId , TimeRange } ;
2018use quickwit_proto:: metastore:: {
2119 CreateIndexRequest , DeleteMetricsSplitsRequest , EntityKind , ListMetricsSplitsRequest ,
2220 MarkMetricsSplitsForDeletionRequest , MetastoreError , PublishMetricsSplitsRequest ,
@@ -32,14 +30,10 @@ use crate::{
3230} ;
3331
3432/// Helper to create a test index and return the actual IndexUid assigned by the metastore.
35- async fn create_test_index (
36- metastore : & mut dyn MetastoreServiceExt ,
37- index_id : & str ,
38- ) -> IndexUid {
33+ async fn create_test_index ( metastore : & mut dyn MetastoreServiceExt , index_id : & str ) -> IndexUid {
3934 let index_uri = format ! ( "ram:///indexes/{index_id}" ) ;
4035 let index_config = IndexConfig :: for_test ( index_id, & index_uri) ;
41- let create_index_request =
42- CreateIndexRequest :: try_from_index_config ( & index_config) . unwrap ( ) ;
36+ let create_index_request = CreateIndexRequest :: try_from_index_config ( & index_config) . unwrap ( ) ;
4337 metastore
4438 . create_index ( create_index_request)
4539 . await
@@ -92,18 +86,15 @@ pub async fn test_metastore_stage_metrics_splits<
9286 let split_1 = build_test_split ( & split_id_1, & index_uid, TimeRange :: new ( 1000 , 2000 ) ) ;
9387 let split_2 = build_test_split ( & split_id_2, & index_uid, TimeRange :: new ( 2000 , 3000 ) ) ;
9488
95- let request = StageMetricsSplitsRequest :: try_from_splits_metadata (
96- index_uid. clone ( ) ,
97- & [ split_1, split_2] ,
98- )
99- . unwrap ( ) ;
89+ let request =
90+ StageMetricsSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , & [ split_1, split_2] )
91+ . unwrap ( ) ;
10092 metastore. stage_metrics_splits ( request) . await . unwrap ( ) ;
10193
10294 // Verify both splits are listed in Staged state.
10395 let query = ListMetricsSplitsQuery :: for_index ( index_uid. clone ( ) )
10496 . with_split_states ( vec ! [ "Staged" . to_string( ) ] ) ;
105- let list_request =
106- ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
97+ let list_request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
10798 let response = metastore. list_metrics_splits ( list_request) . await . unwrap ( ) ;
10899 let splits = response. deserialize_splits ( ) . unwrap ( ) ;
109100 assert_eq ! ( splits. len( ) , 2 ) ;
@@ -135,11 +126,9 @@ pub async fn test_metastore_stage_metrics_splits_upsert<
135126 . parquet_file ( format ! ( "{split_id}.parquet" ) )
136127 . build ( ) ;
137128
138- let request = StageMetricsSplitsRequest :: try_from_splits_metadata (
139- index_uid. clone ( ) ,
140- & [ split_v1] ,
141- )
142- . unwrap ( ) ;
129+ let request =
130+ StageMetricsSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , & [ split_v1] )
131+ . unwrap ( ) ;
143132 metastore. stage_metrics_splits ( request) . await . unwrap ( ) ;
144133
145134 // Stage the same split_id again with 200 rows (upsert).
@@ -153,18 +142,15 @@ pub async fn test_metastore_stage_metrics_splits_upsert<
153142 . parquet_file ( format ! ( "{split_id}.parquet" ) )
154143 . build ( ) ;
155144
156- let request = StageMetricsSplitsRequest :: try_from_splits_metadata (
157- index_uid. clone ( ) ,
158- & [ split_v2] ,
159- )
160- . unwrap ( ) ;
145+ let request =
146+ StageMetricsSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , & [ split_v2] )
147+ . unwrap ( ) ;
161148 metastore. stage_metrics_splits ( request) . await . unwrap ( ) ;
162149
163150 // Verify only one split exists and it has the updated num_rows.
164151 let query = ListMetricsSplitsQuery :: for_index ( index_uid. clone ( ) )
165152 . with_split_states ( vec ! [ "Staged" . to_string( ) ] ) ;
166- let list_request =
167- ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
153+ let list_request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
168154 let response = metastore. list_metrics_splits ( list_request) . await . unwrap ( ) ;
169155 let splits = response. deserialize_splits ( ) . unwrap ( ) ;
170156 assert_eq ! ( splits. len( ) , 1 ) ;
@@ -186,11 +172,9 @@ pub async fn test_metastore_list_metrics_splits_by_state<
186172 let split_id_2 = format ! ( "{index_id}--split-2" ) ;
187173 let split_1 = build_test_split ( & split_id_1, & index_uid, TimeRange :: new ( 1000 , 2000 ) ) ;
188174 let split_2 = build_test_split ( & split_id_2, & index_uid, TimeRange :: new ( 2000 , 3000 ) ) ;
189- let request = StageMetricsSplitsRequest :: try_from_splits_metadata (
190- index_uid. clone ( ) ,
191- & [ split_1, split_2] ,
192- )
193- . unwrap ( ) ;
175+ let request =
176+ StageMetricsSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , & [ split_1, split_2] )
177+ . unwrap ( ) ;
194178 metastore. stage_metrics_splits ( request) . await . unwrap ( ) ;
195179
196180 // Publish split_1 only.
@@ -278,8 +262,7 @@ pub async fn test_metastore_list_metrics_splits_by_time_range<
278262 let query = ListMetricsSplitsQuery :: for_index ( index_uid. clone ( ) )
279263 . with_split_states ( vec ! [ "Staged" . to_string( ) ] )
280264 . with_time_range ( 1500 , 3500 ) ;
281- let list_request =
282- ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
265+ let list_request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
283266 let response = metastore. list_metrics_splits ( list_request) . await . unwrap ( ) ;
284267 let splits = response. deserialize_splits ( ) . unwrap ( ) ;
285268 // Should match splits whose time range overlaps [1500, 3500].
@@ -341,8 +324,7 @@ pub async fn test_metastore_list_metrics_splits_by_metric_name<
341324 let query = ListMetricsSplitsQuery :: for_index ( index_uid. clone ( ) )
342325 . with_split_states ( vec ! [ "Staged" . to_string( ) ] )
343326 . with_metric_names ( vec ! [ "cpu.usage" . to_string( ) ] ) ;
344- let list_request =
345- ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
327+ let list_request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
346328 let response = metastore. list_metrics_splits ( list_request) . await . unwrap ( ) ;
347329 let splits = response. deserialize_splits ( ) . unwrap ( ) ;
348330 assert_eq ! ( splits. len( ) , 2 ) ;
@@ -408,8 +390,7 @@ pub async fn test_metastore_list_metrics_splits_by_compaction_scope<
408390 let query = ListMetricsSplitsQuery :: for_index ( index_uid. clone ( ) )
409391 . with_split_states ( vec ! [ "Staged" . to_string( ) ] )
410392 . with_compaction_scope ( 1700000000 , "metric_name|host|timestamp/V2" ) ;
411- let list_request =
412- ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
393+ let list_request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
413394 let response = metastore. list_metrics_splits ( list_request) . await . unwrap ( ) ;
414395 let splits = response. deserialize_splits ( ) . unwrap ( ) ;
415396 // Only split_1 matches both window_start and sort_fields.
@@ -435,11 +416,9 @@ pub async fn test_metastore_publish_metrics_splits<
435416 let split_2 = build_test_split ( & split_id_2, & index_uid, TimeRange :: new ( 2000 , 3000 ) ) ;
436417
437418 // Stage both.
438- let request = StageMetricsSplitsRequest :: try_from_splits_metadata (
439- index_uid. clone ( ) ,
440- & [ split_1, split_2] ,
441- )
442- . unwrap ( ) ;
419+ let request =
420+ StageMetricsSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , & [ split_1, split_2] )
421+ . unwrap ( ) ;
443422 metastore. stage_metrics_splits ( request) . await . unwrap ( ) ;
444423
445424 // Publish both.
@@ -456,8 +435,7 @@ pub async fn test_metastore_publish_metrics_splits<
456435 // Verify they are now Published.
457436 let query = ListMetricsSplitsQuery :: for_index ( index_uid. clone ( ) )
458437 . with_split_states ( vec ! [ "Published" . to_string( ) ] ) ;
459- let list_request =
460- ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
438+ let list_request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
461439 let response = metastore. list_metrics_splits ( list_request) . await . unwrap ( ) ;
462440 let splits = response. deserialize_splits ( ) . unwrap ( ) ;
463441 assert_eq ! ( splits. len( ) , 2 ) ;
@@ -469,8 +447,7 @@ pub async fn test_metastore_publish_metrics_splits<
469447 // Verify no Staged splits remain.
470448 let query = ListMetricsSplitsQuery :: for_index ( index_uid. clone ( ) )
471449 . with_split_states ( vec ! [ "Staged" . to_string( ) ] ) ;
472- let list_request =
473- ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
450+ let list_request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
474451 let response = metastore. list_metrics_splits ( list_request) . await . unwrap ( ) ;
475452 let splits = response. deserialize_splits ( ) . unwrap ( ) ;
476453 assert_eq ! ( splits. len( ) , 0 ) ;
@@ -516,11 +493,9 @@ pub async fn test_metastore_mark_metrics_splits_for_deletion<
516493 let split_2 = build_test_split ( & split_id_2, & index_uid, TimeRange :: new ( 2000 , 3000 ) ) ;
517494
518495 // Stage and publish.
519- let request = StageMetricsSplitsRequest :: try_from_splits_metadata (
520- index_uid. clone ( ) ,
521- & [ split_1, split_2] ,
522- )
523- . unwrap ( ) ;
496+ let request =
497+ StageMetricsSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , & [ split_1, split_2] )
498+ . unwrap ( ) ;
524499 metastore. stage_metrics_splits ( request) . await . unwrap ( ) ;
525500
526501 let publish_request = PublishMetricsSplitsRequest {
@@ -546,8 +521,7 @@ pub async fn test_metastore_mark_metrics_splits_for_deletion<
546521 // Verify split_1 is MarkedForDeletion.
547522 let query = ListMetricsSplitsQuery :: for_index ( index_uid. clone ( ) )
548523 . with_split_states ( vec ! [ "MarkedForDeletion" . to_string( ) ] ) ;
549- let list_request =
550- ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
524+ let list_request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
551525 let response = metastore. list_metrics_splits ( list_request) . await . unwrap ( ) ;
552526 let splits = response. deserialize_splits ( ) . unwrap ( ) ;
553527 assert_eq ! ( splits. len( ) , 1 ) ;
@@ -556,8 +530,7 @@ pub async fn test_metastore_mark_metrics_splits_for_deletion<
556530 // Verify split_2 is still Published.
557531 let query = ListMetricsSplitsQuery :: for_index ( index_uid. clone ( ) )
558532 . with_split_states ( vec ! [ "Published" . to_string( ) ] ) ;
559- let list_request =
560- ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
533+ let list_request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
561534 let response = metastore. list_metrics_splits ( list_request) . await . unwrap ( ) ;
562535 let splits = response. deserialize_splits ( ) . unwrap ( ) ;
563536 assert_eq ! ( splits. len( ) , 1 ) ;
@@ -577,11 +550,8 @@ pub async fn test_metastore_delete_metrics_splits<
577550 let split_1 = build_test_split ( & split_id_1, & index_uid, TimeRange :: new ( 1000 , 2000 ) ) ;
578551
579552 // Stage, publish, mark for deletion.
580- let request = StageMetricsSplitsRequest :: try_from_splits_metadata (
581- index_uid. clone ( ) ,
582- & [ split_1] ,
583- )
584- . unwrap ( ) ;
553+ let request =
554+ StageMetricsSplitsRequest :: try_from_splits_metadata ( index_uid. clone ( ) , & [ split_1] ) . unwrap ( ) ;
585555 metastore. stage_metrics_splits ( request) . await . unwrap ( ) ;
586556
587557 let publish_request = PublishMetricsSplitsRequest {
@@ -619,8 +589,7 @@ pub async fn test_metastore_delete_metrics_splits<
619589 "Published" . to_string( ) ,
620590 "MarkedForDeletion" . to_string( ) ,
621591 ] ) ;
622- let list_request =
623- ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
592+ let list_request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) . unwrap ( ) ;
624593 let response = metastore. list_metrics_splits ( list_request) . await . unwrap ( ) ;
625594 let splits = response. deserialize_splits ( ) . unwrap ( ) ;
626595 assert_eq ! ( splits. len( ) , 0 ) ;
0 commit comments