Skip to content
Open
253 changes: 250 additions & 3 deletions crates/trusted-server-adapter-fastly/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::platform::{build_runtime_services, FastlyPlatformGeo};

const TRUSTED_SERVER_CONFIG_STORE: &str = "trusted_server_config";
const EDGEZERO_ENABLED_KEY: &str = "edgezero_enabled";
const EDGEZERO_ROLLOUT_PCT_KEY: &str = "edgezero_rollout_pct";

/// Result of routing a request, distinguishing buffered from streaming publisher responses.
///
Expand Down Expand Up @@ -85,6 +86,50 @@ fn parse_edgezero_flag(value: &str) -> bool {
v.eq_ignore_ascii_case("true") || v == "1"
}

/// Parses a rollout percentage string into a value in `0..=100`.
///
/// Accepts only integer strings in the range 0–100 (inclusive) after whitespace
/// trimming. Returns `None` for anything else: non-integer, out-of-range,
/// empty string.
fn parse_rollout_pct(value: &str) -> Option<u8> {
let n: u16 = value.trim().parse().ok()?;
if n > 100 {
return None;
}
Some(n as u8)
}

/// Maps an arbitrary string to a deterministic bucket in `0..100`.
///
/// Uses FNV-1a (32-bit variant) to produce a uniform-enough distribution for
/// canary traffic splitting without pulling in any hash crates. The same input
/// always produces the same output across Rust versions because the algorithm
/// is defined here, not delegated to `DefaultHasher`.
fn fnv1a_bucket(key: &str) -> u8 {
const FNV_OFFSET: u32 = 2_166_136_261;
const FNV_PRIME: u32 = 16_777_619;
let mut hash = FNV_OFFSET;
for byte in key.as_bytes() {
hash ^= u32::from(*byte);
hash = hash.wrapping_mul(FNV_PRIME);
}
(hash % 100) as u8
}

/// Returns `true` if the given bucket should be routed to the `EdgeZero` path.
///
/// `bucket` must be in `0..100`; `rollout_pct` in `0..=100`.
/// When `rollout_pct = 0` no bucket ever routes to `EdgeZero` (instant rollback).
/// When `rollout_pct = 100` every bucket routes to `EdgeZero` (full cutover).
fn canary_routes_to_edgezero(bucket: u8, rollout_pct: u8) -> bool {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpickcanary_routes_to_edgezero is just bucket < rollout_pct; "canary" is misleading at the call site.

The function works at any rollout_pct (0, 50, 100), not only canary stages. Reads awkwardly at the call site: if canary_routes_to_edgezero(bucket, rollout_pct).

Suggest routes_to_edgezero(bucket, rollout_pct) — shorter and accurate at any rollout value.

debug_assert!(bucket < 100, "should be a value produced by fnv1a_bucket");
debug_assert!(
rollout_pct <= 100,
"should be a value produced by read_rollout_pct"
);
bucket < rollout_pct
}

/// Opens the shared Fastly Config Store used by both the `EdgeZero` flag read and
/// `EdgeZero` dispatch metadata.
///
Expand Down Expand Up @@ -113,6 +158,42 @@ fn is_edgezero_enabled(config_store: &ConfigStoreHandle) -> Result<bool, fastly:
Ok(value.as_deref().is_some_and(parse_edgezero_flag))
}

/// Reads `edgezero_rollout_pct` from the config store.
///
/// | Config store state | Return value | Effect |
/// |---------------------------------|--------------|----------------------------|
/// | Key absent | `100` | Full rollout (backward compat) |
/// | Key present, valid 0–100 | parsed value | Partial or full rollout |
/// | Key present, invalid | `0` | All legacy (safe default) |
/// | Key read error | `0` | All legacy (safe default) |
fn read_rollout_pct(config_store: &ConfigStoreHandle) -> u8 {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ refactor — Add unit tests for read_rollout_pct using an in-memory ConfigStore.

The plan asserts ConfigStoreHandle cannot be constructed in unit tests; this is not accurate at the pinned edgezero-core rev. The ConfigStore trait is public, and ConfigStoreHandle::new(Arc::new(...)) is public — edgezero-core's own tests use exactly this pattern.

The four documented branches are safety-critical; only the first is indirectly covered (via parse_rollout_pct tests).

Suggested addition in the existing #[cfg(test)] mod tests:

use edgezero_core::config_store::{ConfigStore, ConfigStoreError};

struct TestConfigStore {
    response: Result<Option<String>, ConfigStoreError>,
}

impl ConfigStore for TestConfigStore {
    fn get(&self, _key: &str) -> Result<Option<String>, ConfigStoreError> {
        self.response.clone()
    }
}

fn handle(response: Result<Option<String>, ConfigStoreError>) -> ConfigStoreHandle {
    ConfigStoreHandle::new(Arc::new(TestConfigStore { response }))
}

#[test]
fn read_rollout_pct_absent_defaults_to_full_rollout() {
    assert_eq!(read_rollout_pct(&handle(Ok(None))), 100);
}

#[test]
fn read_rollout_pct_invalid_value_defaults_to_zero() {
    assert_eq!(read_rollout_pct(&handle(Ok(Some("abc".into())))), 0);
}

#[test]
fn read_rollout_pct_out_of_range_defaults_to_zero() {
    assert_eq!(read_rollout_pct(&handle(Ok(Some("101".into())))), 0);
}

#[test]
fn read_rollout_pct_read_error_defaults_to_zero() {
    assert_eq!(
        read_rollout_pct(&handle(Err(ConfigStoreError::unavailable("boom")))),
        0,
    );
}

These tests pin the safety-critical defaults under refactoring — the part of the design that matters in incidents.

match config_store.get(EDGEZERO_ROLLOUT_PCT_KEY) {
Ok(Some(value)) => match parse_rollout_pct(&value) {
Some(pct) => pct,
None => {
log::warn!(
"invalid edgezero_rollout_pct value {:?}, defaulting to 0 (legacy path)",
value
);
0
}
},
Ok(None) => {
// Fires per-request when key is absent and edgezero_enabled=true.
// At production scale this creates one warn per request until the key is set.
// Resolution: set edgezero_rollout_pct = "0" before setting edgezero_enabled = "true".
log::warn!(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 wrench — Per-request warn log on absent edgezero_rollout_pct.

This emits a log::warn! for every request when the key is absent. The code comment acknowledges "At production scale this creates one warn per request until the key is set" but does not fix it. Combined with the absent-key default = 100% rollout, this is a real operational hazard: a single ops ordering mistake (setting edgezero_enabled = "true" before edgezero_rollout_pct) results in both an immediate full cutover and a sustained warn flood. The warn doesn't tell anyone anything is broken (traffic flows fine), so it's likely to be silenced/ignored — not acted on.

Fix (pick one):

  • Demote to log::debug! — the runbook + setup procedure is the real safety net, not this log line:
    Ok(None) => {
        log::debug!(
            "edgezero_rollout_pct key absent, defaulting to 100 (full rollout — backward compat)"
        );
        100
    }
  • Better: change the absent-key default to 0 (fail-safe) — see the design question in the review body — and drop the warn entirely. An absent key would then surface as zero canary traffic, observable from real-time stats, rather than as a log flood.

"edgezero_rollout_pct key absent, defaulting to 100 (full rollout — backward compat)"
);
100
}
Err(e) => {
log::warn!("failed to read edgezero_rollout_pct: {e}, defaulting to 0 (legacy path)");
0
}
}
}

fn health_response(req: &FastlyRequest) -> Option<FastlyResponse> {
if req.get_method() == FastlyMethod::GET && req.get_path() == "/health" {
return Some(FastlyResponse::from_status(200).with_body_text_plain("ok"));
Expand Down Expand Up @@ -146,14 +227,36 @@ fn main() {
}
};

if is_edgezero_enabled(&edgezero_config_store).unwrap_or_else(|e| {
if !is_edgezero_enabled(&edgezero_config_store).unwrap_or_else(|e| {
log::warn!("failed to read edgezero_enabled flag, falling back to legacy path: {e}");
false
}) {
log::debug!("routing request through EdgeZero path");
log::debug!("routing request through legacy path (edgezero_enabled=false)");
legacy_main(req);
return;
}

let rollout_pct = read_rollout_pct(&edgezero_config_store);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 thinking — Two config-store reads per request.

Each request now hits config_store.get(...) twice (is_edgezero_enabled then read_rollout_pct). Fastly Config Store reads are local-edge and fast, but at full QPS that's a measurable doubling. Consider consolidating into one helper that returns (enabled, rollout_pct) after a single read sequence, or wrapping both behind a cached snapshot if the SDK supports it. Not urgent — flag only.

let routing_key = match req.get_client_ip_addr() {
Some(ip) => ip.to_string(),
None => {
log::debug!(
"no client IP available, using empty routing key (deterministic bucket 61)"
);
String::new()
}
};
let bucket = fnv1a_bucket(&routing_key);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🌱 seedling — Skip the FNV hash + IpAddr.to_string() allocation when rollout_pct == 0 || == 100.

routing_key = ip.to_string() allocates a String per request, and the FNV computation runs even when the decision is degenerate (0 → always legacy, 100 → always EdgeZero). For steady-state rollback (0) and post-cutover (100) — which together cover most of the canary's lifetime — this work is wasted.

let rollout_pct = read_rollout_pct(&edgezero_config_store);

let route_to_edgezero = match rollout_pct {
    0 => false,
    100 => true,
    pct => {
        let routing_key = req.get_client_ip_addr()
            .map(|ip| ip.to_string())
            .unwrap_or_default();
        let bucket = fnv1a_bucket(&routing_key);
        canary_routes_to_edgezero(bucket, pct)
    }
};

If you want to skip the to_string() allocation in the canary path too, FNV-1a can be fed IpAddr octets directly:

fn fnv1a_bucket_octets(bytes: &[u8]) -> u8 { /* same algorithm */ }
let bucket = match req.get_client_ip_addr() {
    Some(IpAddr::V4(v)) => fnv1a_bucket_octets(&v.octets()),
    Some(IpAddr::V6(v)) => fnv1a_bucket_octets(&v.octets()),
    None => fnv1a_bucket_octets(&[]),
};

Not urgent.


if canary_routes_to_edgezero(bucket, rollout_pct) {
log::debug!(
"routing request through EdgeZero path (bucket={bucket}, rollout_pct={rollout_pct})"
);
edgezero_main(req, edgezero_config_store);
} else {
log::debug!("routing request through legacy path");
log::debug!(
"routing request through legacy path (bucket={bucket}, rollout_pct={rollout_pct})"
);
legacy_main(req);
}
}
Expand Down Expand Up @@ -595,6 +698,150 @@ mod tests {
assert!(!parse_edgezero_flag("yes"), "should not parse 'yes'");
}

// ---------------------------------------------------------------------------
// parse_rollout_pct
// ---------------------------------------------------------------------------

#[test]
fn parses_valid_rollout_percentages() {
assert_eq!(parse_rollout_pct("0"), Some(0), "should parse '0'");
assert_eq!(parse_rollout_pct("1"), Some(1), "should parse '1'");
assert_eq!(parse_rollout_pct("50"), Some(50), "should parse '50'");
assert_eq!(parse_rollout_pct("100"), Some(100), "should parse '100'");
assert_eq!(
parse_rollout_pct(" 50 "),
Some(50),
"should trim whitespace"
);
}

#[test]
fn rejects_invalid_rollout_percentages() {
assert_eq!(
parse_rollout_pct("101"),
None,
"should reject values above 100"
);
assert_eq!(parse_rollout_pct(""), None, "should reject empty string");
assert_eq!(parse_rollout_pct("abc"), None, "should reject non-integer");
assert_eq!(
parse_rollout_pct("-1"),
None,
"should reject negative value"
);
assert_eq!(
parse_rollout_pct("1.5"),
None,
"should reject decimal value"
);
}

// ---------------------------------------------------------------------------
// fnv1a_bucket
// ---------------------------------------------------------------------------

#[test]
fn bucket_is_in_range_0_to_99() {
for key in &["1.2.3.4", "255.255.255.255", "::1", "", "unknown"] {
let b = fnv1a_bucket(key);
assert!(b < 100, "bucket must be 0..100 for key {key:?}, got {b}");
}
}

#[test]
fn bucket_is_deterministic() {
let key = "192.168.1.1";
assert_eq!(
fnv1a_bucket(key),
fnv1a_bucket(key),
"same key must produce the same bucket"
);
}

#[test]
fn bucket_matches_known_fnv1a_vector() {
// FNV-1a 32-bit: XOR-then-multiply. Verified against reference implementation.
assert_eq!(
fnv1a_bucket("1.2.3.4"),
85,
"should match pinned FNV-1a vector"
);
assert_eq!(
fnv1a_bucket(""),
61,
"should match pinned FNV-1a vector for empty key"
);
}

#[test]
fn bucket_distributes_across_range() {
// Smoke-test that fnv1a_bucket produces a spread of values (not a constant).
// 256 distinct IP-like keys must produce at least 50 unique buckets.
let buckets: std::collections::HashSet<u8> = (0u16..=255)
.map(|i| fnv1a_bucket(&format!("10.0.0.{i}")))
.collect();
assert!(
buckets.len() > 50,
"fnv1a_bucket should distribute across buckets; got only {} unique values in 256 keys",
buckets.len()
);
}

#[test]
fn empty_key_bucket_is_valid() {
let b = fnv1a_bucket("");
assert!(
b < 100,
"empty key must still produce a valid bucket, got {b}"
);
}

// ---------------------------------------------------------------------------
// canary_routes_to_edgezero
// ---------------------------------------------------------------------------

#[test]
fn rollout_zero_routes_all_to_legacy() {
for bucket in 0u8..100 {
assert!(
!canary_routes_to_edgezero(bucket, 0),
"pct=0 should route all to legacy, bucket={bucket}"
);
}
}

#[test]
fn rollout_hundred_routes_all_to_edgezero() {
for bucket in 0u8..100 {
assert!(
canary_routes_to_edgezero(bucket, 100),
"pct=100 should route all to EdgeZero, bucket={bucket}"
);
}
}

#[test]
fn rollout_fifty_routes_exactly_half_of_bucket_space() {
let edgezero_count = (0u8..100)
.filter(|&b| canary_routes_to_edgezero(b, 50))
.count();
assert_eq!(
edgezero_count, 50,
"pct=50 should route exactly 50 out of 100 buckets to EdgeZero"
);
}

#[test]
fn rollout_one_routes_exactly_one_bucket() {
let edgezero_count = (0u8..100)
.filter(|&b| canary_routes_to_edgezero(b, 1))
.count();
assert_eq!(
edgezero_count, 1,
"pct=1 should route exactly 1 out of 100 buckets to EdgeZero"
);
}

#[test]
fn health_response_short_circuits_get_health() {
let req = FastlyRequest::get("https://example.com/health");
Expand Down
Loading
Loading