Skip to content

Commit e4da4de

Browse files
authored
fix: cache object stores and bucket regions to reduce DNS query volume (#3802) (#3935)
1 parent 6211315 commit e4da4de

2 files changed

Lines changed: 132 additions & 9 deletions

File tree

native/core/src/parquet/objectstore/s3.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use log::{debug, error};
1919
use std::collections::HashMap;
20+
use std::sync::OnceLock;
2021
use url::Url;
2122

2223
use crate::execution::jni_api::get_runtime;
@@ -111,13 +112,48 @@ pub fn create_store(
111112
Ok((Box::new(object_store), path))
112113
}
113114

115+
/// Process-wide cache of resolved S3 bucket regions, keyed by bucket name.
116+
///
117+
/// ## Why static / process lifetime?
118+
///
119+
/// See the equivalent rationale on `object_store_cache` in `parquet_support.rs`: the JNI
120+
/// call site creates a new `RuntimeEnv` per file, leaving the executor process as the only
121+
/// available scope for cross-call state. In the standard Spark-on-Kubernetes deployment
122+
/// model each executor is dedicated to a single application, so process and application
123+
/// lifetimes are equivalent.
124+
///
125+
/// ## Unbounded size
126+
///
127+
/// A Spark job accesses a bounded, typically small set of S3 buckets, so the number of
128+
/// entries stays proportional to the number of distinct buckets. Entries are just
129+
/// `(String, String)` pairs and the set does not grow beyond what the job actually touches.
130+
///
131+
/// ## Invalidation
132+
///
133+
/// An S3 bucket's region is permanently fixed at creation time and cannot change; no
134+
/// invalidation is therefore needed. This is what makes a static, never-evicting cache
135+
/// safe here and on the equivalent region-resolution path inside the `object_store` crate.
136+
fn region_cache() -> &'static RwLock<HashMap<String, String>> {
    // Lazily-initialized, process-wide bucket→region map; rationale for the
    // static lifetime and lack of eviction is documented above.
    static REGIONS: OnceLock<RwLock<HashMap<String, String>>> = OnceLock::new();
    REGIONS.get_or_init(RwLock::default)
}
140+
114141
/// Get the bucket region using the [HeadBucket API]. This will fail if the bucket does not exist.
142+
/// Results are cached per bucket to avoid redundant network calls.
115143
///
116144
/// [HeadBucket API]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
117145
///
118146
/// TODO this is copied from the object store crate and has been adapted as a workaround
119147
/// for https://github.com/apache/arrow-rs-object-store/issues/479
120148
pub async fn resolve_bucket_region(bucket: &str) -> Result<String, Box<dyn Error>> {
149+
// Check cache first
150+
if let Ok(cache) = region_cache().read() {
151+
if let Some(region) = cache.get(bucket) {
152+
debug!("Using cached region '{region}' for bucket '{bucket}'");
153+
return Ok(region.clone());
154+
}
155+
}
156+
121157
let endpoint = format!("https://{bucket}.s3.amazonaws.com");
122158
let client = reqwest::Client::new();
123159

@@ -142,6 +178,12 @@ pub async fn resolve_bucket_region(bucket: &str) -> Result<String, Box<dyn Error
142178
.to_str()?
143179
.to_string();
144180

181+
// Cache the resolved region
182+
if let Ok(mut cache) = region_cache().write() {
183+
debug!("Caching region '{region}' for bucket '{bucket}'");
184+
cache.insert(bucket.to_string(), region.clone());
185+
}
186+
145187
Ok(region)
146188
}
147189

native/core/src/parquet/parquet_support.rs

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,13 @@ use datafusion::execution::object_store::ObjectStoreUrl;
3535
use datafusion::execution::runtime_env::RuntimeEnv;
3636
use datafusion::physical_plan::ColumnarValue;
3737
use datafusion_comet_spark_expr::EvalMode;
38+
use log::debug;
3839
use object_store::path::Path;
3940
use object_store::{parse_url, ObjectStore};
4041
use std::collections::HashMap;
42+
use std::sync::OnceLock;
4143
use std::time::Duration;
44+
use std::{collections::hash_map::DefaultHasher, hash::Hasher, sync::RwLock};
4245
use std::{fmt::Debug, hash::Hash, sync::Arc};
4346
use url::Url;
4447

@@ -444,6 +447,56 @@ fn create_hdfs_object_store(
444447
})
445448
}
446449

450+
type ObjectStoreCache = RwLock<HashMap<(String, u64), Arc<dyn ObjectStore>>>;
451+
452+
/// Process-wide cache of object stores, keyed by `(scheme://host:port, config_hash)`.
453+
///
454+
/// ## Why static / process lifetime?
455+
///
456+
/// Comet's JNI architecture calls `initRecordBatchReader` once per Parquet file, and each
457+
/// call constructs a fresh `RuntimeEnv`. There is therefore no executor-scoped Rust object
458+
/// with a lifetime longer than a single file read that could own this cache. The executor
459+
/// process itself is the natural scope for HTTP connection-pool reuse, so process lifetime
460+
/// (i.e. `static`) is the appropriate choice here. In the standard Spark-on-Kubernetes
461+
/// deployment model each executor process is dedicated to a single Spark application, so
462+
/// process lifetime and application lifetime are equivalent; the cache is reclaimed when
463+
/// the executor pod terminates.
464+
///
465+
/// ## Unbounded size
466+
///
467+
/// Cache entries are indexed by `(scheme://host:port, hash-of-configs)`. A typical Spark
468+
/// job accesses a small, fixed set of buckets with a stable configuration, so the number of
469+
/// distinct keys is O(buckets × credential-configs) and remains small throughout the job.
470+
/// Entries are cheap relative to the cost of creating a new object store (new HTTP
471+
/// connection pool + DNS resolution), and there is no meaningful benefit from eviction, so
472+
/// no eviction policy is applied.
473+
///
474+
/// ## Credential invalidation
475+
///
476+
/// Object stores that use dynamic credentials (IMDS, WebIdentity, ECS role, STS assume-role)
477+
/// delegate credential refresh to a `CometCredentialProvider` that fetches fresh credentials
478+
/// on every request, so credential rotation is transparent and requires no cache
479+
/// invalidation. Object stores whose credentials are embedded in the Hadoop configuration
480+
/// (e.g. `fs.s3a.access.key` / `fs.s3a.secret.key`) produce a different `config_hash` when
481+
/// those values change, which causes a new store to be created and inserted under the new
482+
/// key; the old entry is harmlessly superseded.
483+
fn object_store_cache() -> &'static ObjectStoreCache {
484+
static CACHE: OnceLock<ObjectStoreCache> = OnceLock::new();
485+
CACHE.get_or_init(|| RwLock::new(HashMap::new()))
486+
}
487+
488+
/// Compute a stable hash of the object store configuration for cache keying.
///
/// Entries are visited in sorted-key order so that two maps with identical
/// contents hash equally regardless of `HashMap` iteration order; each key is
/// fed to the hasher immediately followed by its value. The hash is only used
/// as a within-process cache key, so `DefaultHasher`'s fixed-key determinism
/// is sufficient.
fn hash_object_store_configs(configs: &HashMap<String, String>) -> u64 {
    // Collect (key, value) pairs up front instead of sorting keys and then
    // re-looking up each value by `configs[key]`: this avoids a second hash
    // lookup (and its panic path) per entry. Keys in a HashMap are unique, so
    // sorting the tuples is equivalent to sorting by key alone.
    let mut entries: Vec<(&String, &String)> = configs.iter().collect();
    entries.sort_unstable();
    let mut hasher = DefaultHasher::new();
    for (key, value) in entries {
        key.hash(&mut hasher);
        value.hash(&mut hasher);
    }
    hasher.finish()
}
499+
447500
/// Parses the url, registers the object store with configurations, and returns a tuple of the object store url
448501
/// and object store path
449502
pub(crate) fn prepare_object_store_with_configs(
@@ -467,17 +520,45 @@ pub(crate) fn prepare_object_store_with_configs(
467520
&url[url::Position::BeforeHost..url::Position::AfterPort],
468521
);
469522

470-
let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
471-
create_hdfs_object_store(&url)
472-
} else if scheme == "s3" {
473-
objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
474-
} else {
475-
parse_url(&url)
476-
}
477-
.map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
523+
let config_hash = hash_object_store_configs(object_store_configs);
524+
let cache_key = (url_key.clone(), config_hash);
525+
526+
// Check the cache first to reuse existing object store instances.
527+
// This enables HTTP connection pooling and avoids redundant DNS lookups.
528+
let cached = {
529+
let cache = object_store_cache()
530+
.read()
531+
.map_err(|e| ExecutionError::GeneralError(format!("Object store cache error: {e}")))?;
532+
cache.get(&cache_key).cloned()
533+
};
534+
535+
let (object_store, object_store_path): (Arc<dyn ObjectStore>, Path) =
536+
if let Some(store) = cached {
537+
debug!("Reusing cached object store for {url_key}");
538+
let path = Path::from_url_path(url.path())
539+
.map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
540+
(store, path)
541+
} else {
542+
debug!("Creating new object store for {url_key}");
543+
let (store, path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
544+
create_hdfs_object_store(&url)
545+
} else if scheme == "s3" {
546+
objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
547+
} else {
548+
parse_url(&url)
549+
}
550+
.map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
551+
552+
let store: Arc<dyn ObjectStore> = Arc::from(store);
553+
// Insert into cache
554+
if let Ok(mut cache) = object_store_cache().write() {
555+
cache.insert(cache_key, Arc::clone(&store));
556+
}
557+
(store, path)
558+
};
478559

479560
let object_store_url = ObjectStoreUrl::parse(url_key.clone())?;
480-
runtime_env.register_object_store(&url, Arc::from(object_store));
561+
runtime_env.register_object_store(&url, object_store);
481562
Ok((object_store_url, object_store_path))
482563
}
483564

0 commit comments

Comments (0)