Skip to content

Commit cb4568d

Browse files
authored
Merge pull request #129 from datum-cloud/feat/datumrelaysfiltering
feat: support custom relays and filter that list the 5 best
2 parents 2252d4f + a67eaea commit cb4568d

3 files changed

Lines changed: 270 additions & 4 deletions

File tree

.github/workflows/bundle.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ jobs:
1919

2020
env:
2121
BUILD_N0DES_API_SECRET: ${{ secrets.N0DES_API_SECRET }}
22+
BUILD_DATUM_CONNECT_RELAY_URLS: ${{ vars.BUILD_DATUM_CONNECT_RELAY_URLS }}
2223
DATUM_API_ENV: production
2324

2425
steps:

.github/workflows/manual-release.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ jobs:
2323

2424
env:
2525
BUILD_N0DES_API_SECRET: ${{ secrets.N0DES_API_SECRET }}
26+
BUILD_DATUM_CONNECT_RELAY_URLS: ${{ vars.BUILD_DATUM_CONNECT_RELAY_URLS }}
2627
DATUM_API_ENV: production
2728

2829
steps:

lib/src/node.rs

Lines changed: 268 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
use std::{fmt::Debug, net::SocketAddr, str::FromStr, sync::Arc};
1+
use std::{fmt::Debug, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
22

33
use iroh::{
44
Endpoint, EndpointId, SecretKey, discovery::dns::DnsDiscovery, endpoint::default_relay_mode,
55
protocol::Router,
66
};
7+
use iroh_base::RelayUrl;
78
use iroh_n0des::ApiSecret;
89
use iroh_proxy_utils::upstream::UpstreamMetrics;
910
use iroh_proxy_utils::{
@@ -14,8 +15,13 @@ use iroh_proxy_utils::{
1415
upstream::{AuthError, AuthHandler, UpstreamProxy},
1516
};
1617
use iroh_relay::dns::{DnsProtocol, DnsResolver};
18+
use iroh_relay::{RelayConfig, RelayMap};
1719
use n0_error::{Result, StackResultExt, StdResultExt};
18-
use tokio::{net::TcpListener, sync::futures::Notified, task::JoinHandle};
20+
use tokio::{
21+
net::TcpListener,
22+
sync::futures::Notified,
23+
task::{JoinHandle, JoinSet},
24+
};
1925
use tracing::{Instrument, debug, error_span, info, instrument, warn};
2026

2127
use crate::{ProxyState, Repo, StateWrapper, TcpProxyData, config::Config};
@@ -297,12 +303,15 @@ impl OutboundProxyHandle {
297303
/// Build a new iroh endpoint, applying all relevant details from Configuration
298304
/// to the base endpoint setup
299305
pub(crate) async fn build_endpoint(secret_key: SecretKey, common: &Config) -> Result<Endpoint> {
306+
let relay_mode = relay_mode_from_env_or_build().await?;
300307
let mut builder = match common.discovery_mode {
301308
crate::config::DiscoveryMode::Dns => {
302-
Endpoint::empty_builder(default_relay_mode()).secret_key(secret_key)
309+
Endpoint::empty_builder(relay_mode).secret_key(secret_key)
303310
}
304311
crate::config::DiscoveryMode::Default | crate::config::DiscoveryMode::Hybrid => {
305-
Endpoint::builder().secret_key(secret_key)
312+
Endpoint::builder()
313+
.relay_mode(relay_mode)
314+
.secret_key(secret_key)
306315
}
307316
};
308317
if let Some(addr) = common.ipv4_addr {
@@ -334,6 +343,231 @@ pub(crate) async fn build_endpoint(secret_key: SecretKey, common: &Config) -> Re
334343
Ok(endpoint)
335344
}
336345

346+
const DATUM_CONNECT_RELAY_URLS: &str = "DATUM_CONNECT_RELAY_URLS";
347+
const BUILD_DATUM_CONNECT_RELAY_URLS: &str = "BUILD_DATUM_CONNECT_RELAY_URLS";
348+
const STARTUP_RELAY_SELECTION_MAX: usize = 5;
349+
const STARTUP_RELAY_PROBE_TIMEOUT: Duration = Duration::from_millis(800);
350+
351+
async fn relay_mode_from_env_or_build() -> Result<iroh::endpoint::RelayMode> {
352+
if let Ok(raw_urls) = std::env::var(DATUM_CONNECT_RELAY_URLS) {
353+
match parse_relay_urls(&raw_urls) {
354+
Ok(relays) => {
355+
let relays =
356+
select_best_relays_for_startup(relays, STARTUP_RELAY_SELECTION_MAX).await;
357+
info!(
358+
source = %DATUM_CONNECT_RELAY_URLS,
359+
count = relays.len(),
360+
"using custom iroh relay list from environment"
361+
);
362+
return Ok(iroh::endpoint::RelayMode::Custom(relays_to_map(relays)));
363+
}
364+
Err(err) => {
365+
warn!("invalid relay urls in {DATUM_CONNECT_RELAY_URLS}: {err:#}");
366+
}
367+
}
368+
}
369+
370+
if let Some(raw_urls) = option_env!("BUILD_DATUM_CONNECT_RELAY_URLS") {
371+
match parse_relay_urls(raw_urls) {
372+
Ok(relays) => {
373+
let relays =
374+
select_best_relays_for_startup(relays, STARTUP_RELAY_SELECTION_MAX).await;
375+
info!(
376+
source = %BUILD_DATUM_CONNECT_RELAY_URLS,
377+
count = relays.len(),
378+
"using custom iroh relay list from build environment"
379+
);
380+
return Ok(iroh::endpoint::RelayMode::Custom(relays_to_map(relays)));
381+
}
382+
Err(err) => {
383+
warn!("invalid relay urls in {BUILD_DATUM_CONNECT_RELAY_URLS}: {err:#}");
384+
}
385+
}
386+
}
387+
388+
Ok(default_relay_mode())
389+
}
390+
391+
fn parse_relay_urls(raw: &str) -> Result<Vec<RelayUrl>> {
392+
let relays: Vec<RelayUrl> = raw
393+
.split(|c: char| c == ',' || c == ';' || c.is_ascii_whitespace())
394+
.map(str::trim)
395+
.filter(|s| !s.is_empty())
396+
.map(normalize_relay_url)
397+
.map(|url| RelayUrl::from_str(&url))
398+
.collect::<std::result::Result<Vec<_>, _>>()
399+
.std_context(
400+
"Failed parsing relay URL list. Expected comma/space/newline separated URLs",
401+
)?;
402+
403+
if relays.is_empty() {
404+
n0_error::bail_any!("Relay URL list was provided but empty after parsing");
405+
}
406+
407+
let mut deduped = Vec::with_capacity(relays.len());
408+
for relay in relays {
409+
if !deduped.iter().any(|seen: &RelayUrl| seen == &relay) {
410+
deduped.push(relay);
411+
}
412+
}
413+
Ok(deduped)
414+
}
415+
416+
fn normalize_relay_url(raw: &str) -> String {
417+
if raw.contains("://") {
418+
raw.to_string()
419+
} else {
420+
format!("https://{raw}")
421+
}
422+
}
423+
424+
async fn select_best_relays_for_startup(relays: Vec<RelayUrl>, max_relays: usize) -> Vec<RelayUrl> {
425+
let total_candidates = relays.len();
426+
if relays.len() <= max_relays {
427+
return relays;
428+
}
429+
430+
let client = match reqwest::Client::builder()
431+
.timeout(STARTUP_RELAY_PROBE_TIMEOUT)
432+
.build()
433+
{
434+
Ok(client) => client,
435+
Err(err) => {
436+
warn!("relay probe setup failed, using first {max_relays} relays: {err:#}");
437+
return relays.into_iter().take(max_relays).collect();
438+
}
439+
};
440+
441+
let mut joinset = JoinSet::new();
442+
for relay in relays.iter().cloned() {
443+
let client = client.clone();
444+
joinset.spawn(async move {
445+
let latency = probe_relay_latency(&client, &relay).await;
446+
(relay, latency)
447+
});
448+
}
449+
450+
let mut successful = Vec::new();
451+
let mut failed = Vec::new();
452+
while let Some(joined) = joinset.join_next().await {
453+
match joined {
454+
Ok((relay, Ok(latency))) => successful.push((relay, latency)),
455+
Ok((relay, Err(reason))) => failed.push((relay, reason)),
456+
Err(err) => {
457+
debug!("relay probe task join error: {err:#}");
458+
}
459+
}
460+
}
461+
462+
successful.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.as_str().cmp(b.0.as_str())));
463+
let mut selected: Vec<RelayUrl> = successful
464+
.iter()
465+
.take(max_relays)
466+
.map(|(relay, _)| relay.clone())
467+
.collect();
468+
469+
if selected.len() < max_relays {
470+
failed.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
471+
for (relay, _) in &failed {
472+
if selected.len() == max_relays {
473+
break;
474+
}
475+
if !selected.iter().any(|r| r == relay) {
476+
selected.push(relay.clone());
477+
}
478+
}
479+
}
480+
481+
if selected.len() < max_relays {
482+
for relay in relays {
483+
if selected.len() == max_relays {
484+
break;
485+
}
486+
if !selected.iter().any(|r| r == &relay) {
487+
selected.push(relay);
488+
}
489+
}
490+
}
491+
492+
if !failed.is_empty() {
493+
let failure_samples: Vec<String> = failed
494+
.iter()
495+
.take(5)
496+
.map(|(relay, reason)| format!("{relay} -> {reason}"))
497+
.collect();
498+
warn!(
499+
failed = failed.len(),
500+
samples = ?failure_samples,
501+
"relay ping probe failures observed"
502+
);
503+
}
504+
info!(
505+
total = total_candidates,
506+
successful = successful.len(),
507+
selected = selected.len(),
508+
selected_relays = ?selected,
509+
"selected startup relay shortlist"
510+
);
511+
selected
512+
}
513+
514+
async fn probe_relay_latency(
515+
client: &reqwest::Client,
516+
relay: &RelayUrl,
517+
) -> std::result::Result<Duration, String> {
518+
let host = relay
519+
.host_str()
520+
.ok_or_else(|| "missing host in relay url".to_string())?
521+
// RelayUrl canonicalizes with trailing dot, which can fail strict TLS hostname checks.
522+
.trim_end_matches('.');
523+
let mut https_url = reqwest::Url::parse(&format!("https://{host}/ping"))
524+
.map_err(|err| format!("url parse: {err}"))?;
525+
https_url.set_query(None);
526+
debug!(
527+
relay = %relay,
528+
url = %https_url,
529+
timeout_ms = STARTUP_RELAY_PROBE_TIMEOUT.as_millis(),
530+
"starting relay ping probe"
531+
);
532+
let start = tokio::time::Instant::now();
533+
match client.get(https_url.clone()).send().await {
534+
Ok(resp) if resp.status().is_success() => {
535+
let elapsed = start.elapsed();
536+
debug!(
537+
relay = %relay,
538+
url = %https_url,
539+
status = %resp.status(),
540+
elapsed_ms = elapsed.as_millis(),
541+
"relay ping probe succeeded"
542+
);
543+
Ok(elapsed)
544+
}
545+
Ok(resp) => {
546+
debug!(
547+
relay = %relay,
548+
url = %https_url,
549+
status = %resp.status(),
550+
elapsed_ms = start.elapsed().as_millis(),
551+
"relay ping probe got non-success response"
552+
);
553+
Err(format!("status {}", resp.status()))
554+
}
555+
Err(err) => {
556+
debug!(
557+
relay = %relay,
558+
url = %https_url,
559+
elapsed_ms = start.elapsed().as_millis(),
560+
"relay ping probe request failed: {err:#}"
561+
);
562+
Err(format!("{err:#}"))
563+
}
564+
}
565+
}
566+
567+
fn relays_to_map(relays: Vec<RelayUrl>) -> RelayMap {
568+
RelayMap::from_iter(relays.into_iter().map(RelayConfig::from))
569+
}
570+
337571
pub(crate) fn n0des_api_secret_from_env() -> Result<Option<ApiSecret>> {
338572
let api_secret_str = match std::env::var("N0DES_API_SECRET") {
339573
Ok(s) => s,
@@ -380,3 +614,33 @@ pub(crate) async fn build_n0des_client(
380614
info!(remote=%remote_id.fmt_short(), "Connected to n0des endpoint for metrics collection");
381615
Ok(Arc::new(client))
382616
}
617+
618+
#[cfg(test)]
619+
mod tests {
620+
use super::*;
621+
622+
#[test]
623+
fn parse_relay_urls_accepts_bare_hostnames() {
624+
let input = "iroh-relay.us-east-1.datumconnect.net,iroh-relay.us-west-1.datumconnect.net";
625+
let parsed = parse_relay_urls(input).expect("should parse");
626+
assert_eq!(parsed.len(), 2);
627+
assert_eq!(parsed[0].scheme(), "https");
628+
assert_eq!(
629+
parsed[0].host_str(),
630+
Some("iroh-relay.us-east-1.datumconnect.net.")
631+
);
632+
assert_eq!(
633+
parsed[1].host_str(),
634+
Some("iroh-relay.us-west-1.datumconnect.net.")
635+
);
636+
}
637+
638+
#[test]
639+
fn parse_relay_urls_dedupes_and_skips_empty_tokens() {
640+
let input = " relay-a.example.com, relay-a.example.com;;relay-b.example.com ";
641+
let parsed = parse_relay_urls(input).expect("should parse");
642+
assert_eq!(parsed.len(), 2);
643+
assert_eq!(parsed[0].host_str(), Some("relay-a.example.com."));
644+
assert_eq!(parsed[1].host_str(), Some("relay-b.example.com."));
645+
}
646+
}

0 commit comments

Comments
 (0)