diff --git a/Cargo.lock b/Cargo.lock index 9032ce18d..e3c29a702 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7082,19 +7082,29 @@ dependencies = [ name = "testsuite" version = "0.0.0" dependencies = [ + "agent-tunnel", + "agent-tunnel-proto", "anyhow", "assert_cmd", "base64 0.22.1", + "camino", + "devolutions-gateway-task", "dynosaur", "escargot", "expect-test", "fastrand", + "ipnetwork", "libsql", "mcp-proxy", "network-scanner", "network-scanner-proto", "proxy-socks", + "quinn", + "rcgen", "rstest", + "rustls 0.23.40", + "rustls-pemfile 2.2.0", + "rustls-pki-types", "serde", "serde_json", "sysevent", @@ -7107,6 +7117,7 @@ dependencies = [ "tokio-tungstenite 0.26.2", "tokio-util", "typed-builder", + "uuid", ] [[package]] diff --git a/testsuite/Cargo.toml b/testsuite/Cargo.toml index 6473a602b..2569df9a8 100644 --- a/testsuite/Cargo.toml +++ b/testsuite/Cargo.toml @@ -31,18 +31,29 @@ typed-builder = "0.21" tokio-tungstenite = { version = "0.26", features = ["rustls-tls-native-roots"] } [dev-dependencies] +agent-tunnel = { path = "../crates/agent-tunnel", features = ["test-utils"] } +agent-tunnel-proto = { path = "../crates/agent-tunnel-proto", features = ["serde"] } +devolutions-gateway-task = { path = "../crates/devolutions-gateway-task" } base64 = "0.22" -proxy-socks = { path = "../crates/proxy-socks" } +camino = "1" +ipnetwork = "0.20" libsql = { version = "0.9", default-features = false, features = ["core"] } mcp-proxy.path = "../crates/mcp-proxy" network-scanner = { path = "../crates/network-scanner", features = ["test-utils"] } network-scanner-proto = { path = "../crates/network-scanner-proto" } +proxy-socks = { path = "../crates/proxy-socks" } +quinn = "0.11" +rcgen = { version = "0.13", features = ["pem", "x509-parser"] } rstest = "0.25" +rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] } +rustls-pemfile = "2" +rustls-pki-types = "1" serde_json = "1" sysevent.path = "../crates/sysevent" tempfile = "3" test-utils.path = "../crates/test-utils" tokio-rustls = { version = "0.26", features = ["ring"] } +uuid = { version = "1", features = ["v4"] } [target.'cfg(unix)'.dev-dependencies] sysevent-syslog.path = "../crates/sysevent-syslog" diff --git a/testsuite/tests/agent_tunnel/cert.rs b/testsuite/tests/agent_tunnel/cert.rs new file mode 100644 index 000000000..0490f73ea --- /dev/null +++ b/testsuite/tests/agent_tunnel/cert.rs @@ -0,0 +1,74 @@ +//! Unit tests for `agent-tunnel/src/cert.rs`. +//! +//! Focus on the identity invariants exercised by enrollment (#1773) and +//! certificate renewal (#1775): the gateway must encode the agent's UUID in +//! the issued cert's URN SAN, and recover the same UUID from any cert it +//! later sees on the wire. + +use agent_tunnel::cert::{CaManager, extract_agent_id_from_pem}; +use camino::Utf8PathBuf; +use uuid::Uuid; + +use super::common::{generate_csr_with_cn, generate_test_key_and_csr}; + +fn fresh_ca() -> std::sync::Arc { + let temp_dir = tempfile::tempdir().expect("create tempdir"); + let data_dir = Utf8PathBuf::from_path_buf(temp_dir.path().to_path_buf()).expect("UTF-8 temp path"); + // Leak the TempDir for the test's lifetime: CaManager owns the files + // already loaded, so dropping the dir while still in use is fine, but + // leaking removes any chance of TOCTOU surprises. + std::mem::forget(temp_dir); + CaManager::load_or_generate(&data_dir).expect("CA generation") +} + +/// Security invariant from #1775 review: when the gateway re-signs (or +/// initially signs) an agent CSR, the issued cert's URN SAN encodes the +/// `agent_id` parameter passed in by the caller, **not** anything from the +/// CSR's own subject. A compromised agent crafting a CSR with someone else's +/// CN must not be able to impersonate. +#[test] +fn sign_agent_csr_ignores_csr_subject_uses_passed_identity() { + let ca_manager = fresh_ca(); + + let real_agent_id = Uuid::new_v4(); + let (_evil_key, evil_csr_pem) = generate_csr_with_cn("evil-impersonator"); + + let signed = ca_manager + .sign_agent_csr(real_agent_id, "legit-name", &evil_csr_pem, None) + .expect("sign agent CSR"); + + let recovered = extract_agent_id_from_pem(&signed.client_cert_pem).expect("issued cert has urn:uuid SAN"); + assert_eq!( + recovered, real_agent_id, + "issued cert must encode the agent_id passed by the caller, not the CSR subject" + ); +} + +#[test] +fn extract_agent_id_from_pem_round_trips() { + let ca_manager = fresh_ca(); + + let known_id = Uuid::new_v4(); + let (_key, csr_pem) = generate_test_key_and_csr("round-trip-agent"); + + let signed = ca_manager + .sign_agent_csr(known_id, "round-trip-agent", &csr_pem, None) + .expect("sign agent CSR"); + + let recovered = extract_agent_id_from_pem(&signed.client_cert_pem).expect("urn:uuid SAN present"); + assert_eq!(recovered, known_id); +} + +#[test] +fn extract_agent_id_from_pem_rejects_cert_without_san() { + let ca_manager = fresh_ca(); + + // The CA's own root cert does not carry an `urn:uuid:` SAN. + let error = extract_agent_id_from_pem(ca_manager.ca_cert_pem()).expect_err("CA cert has no urn:uuid SAN"); + + let msg = format!("{error:#}"); + assert!( + msg.contains("urn:uuid"), + "error should reference the missing urn:uuid SAN, got: {msg}" + ); +} diff --git a/testsuite/tests/agent_tunnel/common.rs b/testsuite/tests/agent_tunnel/common.rs new file mode 100644 index 000000000..46f1f75b8 --- /dev/null +++ b/testsuite/tests/agent_tunnel/common.rs @@ -0,0 +1,192 @@ +//! Shared helpers for the agent-tunnel test suite. +//! +//! These were originally private to `integration.rs`; consolidated here so +//! the cert-renewal E2E and the routing tests can reuse them without +//! duplicating ~80 lines of QUIC + mTLS scaffolding per test. + +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use agent_tunnel::AgentTunnelHandle; +use agent_tunnel::cert::CaManager; +use agent_tunnel::listener::AgentTunnelListener; +use agent_tunnel::registry::AgentRegistry; +use camino::Utf8PathBuf; +use devolutions_gateway_task::ShutdownHandle; +use tempfile::TempDir; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; +use tokio::task::JoinHandle; +use uuid::Uuid; + +/// Start a TCP echo server that echoes back whatever it receives. +pub(super) async fn start_echo_server() -> (SocketAddr, JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let handle = tokio::spawn(async move { + loop { + let (mut stream, _) = match listener.accept().await { + Ok(v) => v, + Err(_) => break, + }; + + tokio::spawn(async move { + let mut buf = vec![0u8; 65535]; + loop { + let n = match stream.read(&mut buf).await { + Ok(0) | Err(_) => break, + Ok(n) => n, + }; + if stream.write_all(&buf[..n]).await.is_err() { + break; + } + } + }); + } + }); + + (addr, handle) +} + +/// Generate a key pair and CSR (same as the real agent does during enrollment). +pub(super) fn generate_test_key_and_csr(agent_name: &str) -> (rcgen::KeyPair, String) { + generate_csr_with_cn(agent_name) +} + +/// Generate a key pair and CSR with the given Common Name on the CSR subject. +/// +/// Useful for the security-invariant test that checks `sign_agent_csr` ignores +/// the CSR subject in favor of the mTLS-authenticated agent name. +pub(super) fn generate_csr_with_cn(cn: &str) -> (rcgen::KeyPair, String) { + let key_pair = rcgen::KeyPair::generate_for(&rcgen::PKCS_ECDSA_P256_SHA256).expect("generate test key pair"); + let mut params = rcgen::CertificateParams::default(); + params.distinguished_name.push(rcgen::DnType::CommonName, cn); + let csr = params.serialize_request(&key_pair).expect("serialize test CSR"); + let csr_pem = csr.pem().expect("CSR to PEM"); + (key_pair, csr_pem) +} + +/// Create a Quinn client connection to the gateway with mTLS. +pub(super) async fn connect_quinn_client( + ca_cert_pem: &str, + client_cert_pem: &str, + client_key_pem: &str, + server_addr: SocketAddr, +) -> quinn::Connection { + use rustls_pemfile::{certs, private_key}; + + let _ = rustls::crypto::ring::default_provider().install_default(); + + let client_certs: Vec> = + certs(&mut std::io::BufReader::new(client_cert_pem.as_bytes())) + .collect::, _>>() + .expect("parse client certs"); + let client_key = private_key(&mut std::io::BufReader::new(client_key_pem.as_bytes())) + .expect("parse private key") + .expect("no private key found"); + + let mut roots = rustls::RootCertStore::empty(); + let ca_certs: Vec> = + certs(&mut std::io::BufReader::new(ca_cert_pem.as_bytes())) + .collect::, _>>() + .expect("parse CA certs"); + for cert in ca_certs { + roots.add(cert).expect("add CA cert to root store"); + } + + // Trust only the test CA. Hostname verification is still on (SNI = "localhost"). + let verifier = rustls::client::WebPkiServerVerifier::builder(Arc::new(roots)) + .build() + .expect("build verifier"); + + let mut client_crypto = rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(verifier) + .with_client_auth_cert(client_certs, client_key) + .expect("client auth config"); + + client_crypto.alpn_protocols = vec![agent_tunnel_proto::ALPN_PROTOCOL.to_vec()]; + + let client_config = quinn::ClientConfig::new(Arc::new( + quinn::crypto::rustls::QuicClientConfig::try_from(client_crypto).expect("QUIC client config"), + )); + + let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().expect("bind addr")).expect("create endpoint"); + endpoint.set_default_client_config(client_config); + + endpoint + .connect(server_addr, "localhost") + .expect("initiate connection") + .await + .expect("QUIC handshake") +} + +/// Live `AgentTunnelListener` running on a random localhost port, plus the +/// resources needed to drive and shut it down cleanly. +pub(super) struct TestListener { + pub handle: AgentTunnelHandle, + shutdown: ShutdownHandle, + task: JoinHandle<()>, + _temp_dir: TempDir, +} + +impl TestListener { + /// Signal shutdown and wait for the listener task to exit (or time out). + pub(super) async fn shutdown(self) { + self.shutdown.signal(); + let _ = tokio::time::timeout(Duration::from_secs(2), self.task).await; + } +} + +/// Bring up a fresh `AgentTunnelListener` on `127.0.0.1:0` with a freshly +/// generated CA in a temp directory. +pub(super) async fn bind_test_listener() -> TestListener { + let temp_dir = tempfile::tempdir().expect("create tempdir"); + let data_dir = Utf8PathBuf::from_path_buf(temp_dir.path().to_path_buf()).expect("UTF-8 temp path"); + let ca_manager = CaManager::load_or_generate(&data_dir).expect("CA generation"); + + let listen_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + let (listener, handle) = AgentTunnelListener::bind(listen_addr, ca_manager, "localhost") + .await + .expect("bind QUIC listener"); + + let (shutdown, shutdown_signal) = ShutdownHandle::new(); + let task = tokio::spawn(async move { + use devolutions_gateway_task::Task; + let _ = listener.run(shutdown_signal).await; + }); + + // Give listener time to be ready. + tokio::time::sleep(Duration::from_millis(50)).await; + + TestListener { + handle, + shutdown, + task, + _temp_dir: temp_dir, + } +} + +/// Poll the registry until `agent_id` is present and has applied at least one +/// route advertisement with epoch ≥ `min_epoch`, or panic after 5 seconds. +/// +/// Replaces the older fixed-sleep pattern that raced on slow CI runners: +/// `ctrl.send(&RouteAdvertise)` only guarantees the message is on the wire, +/// not that the gateway has processed it. Default `RouteAdvertisementState` +/// starts at epoch 0, so any successful RouteAdvertise bumps it to ≥ 1. +pub(super) async fn wait_for_route_advertised(registry: &AgentRegistry, agent_id: Uuid, min_epoch: u64) { + let deadline = Instant::now() + Duration::from_secs(5); + loop { + if let Some(peer) = registry.get(&agent_id).await + && peer.route_state().epoch >= min_epoch + { + return; + } + if Instant::now() >= deadline { + panic!("agent {agent_id} did not advertise route at epoch >= {min_epoch} within 5s"); + } + tokio::time::sleep(Duration::from_millis(10)).await; + } +} diff --git a/testsuite/tests/agent_tunnel/integration.rs b/testsuite/tests/agent_tunnel/integration.rs new file mode 100644 index 000000000..dcd541b64 --- /dev/null +++ b/testsuite/tests/agent_tunnel/integration.rs @@ -0,0 +1,351 @@ +//! Full-stack integration test for the QUIC agent tunnel (Quinn). +//! +//! Verifies the full data path: +//! TCP echo server ← Agent (Quinn client) ← QUIC mTLS ← Gateway listener ← TunnelStream +//! +//! This test runs entirely in-process with real UDP sockets on localhost. + +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use agent_tunnel::AgentTunnelListener; +use agent_tunnel::cert::{CaManager, extract_agent_id_from_pem}; +use agent_tunnel_proto::{ + CertRenewalResult, ConnectResponse, ControlMessage, ControlStream, DomainAdvertisement, SessionStream, +}; +use camino::Utf8PathBuf; +use ipnetwork::Ipv4Network; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use uuid::Uuid; + +use super::common::{ + connect_quinn_client, generate_csr_with_cn, generate_test_key_and_csr, start_echo_server, wait_for_route_advertised, +}; + +/// Full E2E integration test. +/// +/// 1. Start TCP echo server +/// 2. Start QUIC listener (gateway, in-process) +/// 3. Connect a simulated agent (Quinn client) with mTLS +/// 4. Agent sends RouteAdvertise on control stream +/// 5. Gateway opens a proxy stream via connect_via_agent +/// 6. Agent reads ConnectRequest, connects to echo server, sends ConnectResponse::Success +/// 7. Bidirectional data flows through the full tunnel +/// 8. Verify echo response matches +#[tokio::test] +async fn quic_agent_tunnel_e2e() { + // ── 1. Setup certificates ── + + let temp_dir = tempfile::tempdir().expect("create tempdir"); + let data_dir = Utf8PathBuf::from_path_buf(temp_dir.path().to_path_buf()).expect("UTF-8 temp path"); + + let ca_manager = CaManager::load_or_generate(&data_dir).expect("CA generation"); + + let agent_id = Uuid::new_v4(); + let (key_pair, csr_pem) = generate_test_key_and_csr("test-agent"); + let signed = ca_manager + .sign_agent_csr(agent_id, "test-agent", &csr_pem, Some("localhost")) + .expect("sign agent CSR"); + + // ── 2. Start TCP echo server ── + + let (echo_addr, _echo_handle) = start_echo_server().await; + let echo_subnet: Ipv4Network = format!("{}/32", echo_addr.ip()).parse().unwrap(); + + // ── 3. Start QUIC listener (gateway) ── + + let listen_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + let (listener, handle) = AgentTunnelListener::bind(listen_addr, Arc::clone(&ca_manager), "localhost") + .await + .expect("bind QUIC listener"); + + let server_addr = listener.local_addr(); + + let (shutdown_handle, shutdown_signal) = devolutions_gateway_task::ShutdownHandle::new(); + let listener_task = tokio::spawn(async move { + use devolutions_gateway_task::Task; + let _ = listener.run(shutdown_signal).await; + }); + + // Give listener time to be ready. + tokio::time::sleep(Duration::from_millis(50)).await; + + // ── 4. Connect simulated agent (Quinn client with mTLS) ── + + let connection = connect_quinn_client( + &signed.ca_cert_pem, + &signed.client_cert_pem, + &key_pair.serialize_pem(), + server_addr, + ) + .await; + + // ── 5. Open control stream and send RouteAdvertise ── + + let mut ctrl: ControlStream<_, _> = connection.open_bi().await.expect("open control stream").into(); + + let route_msg = ControlMessage::route_advertise(1, vec![echo_subnet], vec![]); + ctrl.send(&route_msg).await.expect("send RouteAdvertise"); + + // Wait for the gateway to actually process the RouteAdvertise. + wait_for_route_advertised(handle.registry(), agent_id, 1).await; + + // Verify agent is registered. + assert!( + handle.registry().get(&agent_id).await.is_some(), + "agent should be registered in the registry" + ); + assert_eq!(handle.registry().online_count().await, 1); + + // ── 6. Gateway opens proxy stream ── + + let session_id = Uuid::new_v4(); + let target_str = echo_addr.to_string(); + + let handle_clone = handle.clone(); + let target_clone = target_str.clone(); + let proxy_task = tokio::spawn(async move { + handle_clone + .connect_via_agent(agent_id, session_id, &target_clone) + .await + }); + + // ── 7. Agent accepts session stream ── + + let (send, recv) = connection + .accept_bi() + .await + .expect("accept session stream from gateway"); + let mut session: SessionStream<_, _> = (send, recv).into(); + + let connect_msg = session.recv_request().await.expect("recv ConnectRequest"); + assert_eq!(connect_msg.session_id(), session_id); + assert_eq!(connect_msg.target(), target_str); + + // Connect to echo server. + let mut tcp_stream = TcpStream::connect(echo_addr).await.expect("connect to echo server"); + + // Send success response. + session + .send_response(&ConnectResponse::success()) + .await + .expect("send ConnectResponse::Success"); + + // ── 8. Wait for proxy task to complete ── + + let tunnel_stream = tokio::time::timeout(Duration::from_secs(5), proxy_task) + .await + .expect("proxy task should complete in time") + .expect("proxy task should not panic") + .expect("connect_via_agent should succeed"); + + // ── 9. Bidirectional data test ── + + let test_data = b"Hello from the Quinn E2E integration test!"; + let (mut quic_read, mut quic_write) = tokio::io::split(tunnel_stream); + + // Gateway writes test data. + quic_write.write_all(test_data).await.expect("write to TunnelStream"); + + // Agent relays: QUIC → TCP echo → QUIC. + let (mut session_send, mut session_recv) = session.into_inner(); + let mut relay_buf = vec![0u8; test_data.len()]; + session_recv + .read_exact(&mut relay_buf) + .await + .expect("read from QUIC session stream"); + assert_eq!(&relay_buf, test_data); + + // Forward to echo server. + tcp_stream.write_all(&relay_buf).await.expect("write to echo server"); + + // Read echo response. + let mut echo_buf = vec![0u8; test_data.len()]; + tcp_stream.read_exact(&mut echo_buf).await.expect("read echo response"); + assert_eq!(&echo_buf, test_data); + + // Send echo response back through QUIC. + session_send + .write_all(&echo_buf) + .await + .expect("write echo response to QUIC"); + let _ = session_send.finish(); + + // Gateway reads the echoed data. + let mut response_buf = vec![0u8; test_data.len()]; + quic_read + .read_exact(&mut response_buf) + .await + .expect("read from TunnelStream"); + assert_eq!(&response_buf, test_data, "echo response should match"); + + // ── 10. Cleanup ── + + connection.close(0u32.into(), b"test done"); + shutdown_handle.signal(); + let _ = tokio::time::timeout(Duration::from_secs(2), listener_task).await; +} + +/// Domain routing E2E test. +/// +/// Same as above but agent advertises domain "test.local" alongside subnet. +/// Verifies domain appears in the registry. +#[tokio::test] +async fn quic_agent_tunnel_domain_routing_e2e() { + let temp_dir = tempfile::tempdir().expect("create tempdir"); + let data_dir = Utf8PathBuf::from_path_buf(temp_dir.path().to_path_buf()).expect("UTF-8 temp path"); + + let ca_manager = CaManager::load_or_generate(&data_dir).expect("CA generation"); + + let agent_id = Uuid::new_v4(); + let (key_pair, csr_pem) = generate_test_key_and_csr("domain-agent"); + let signed = ca_manager + .sign_agent_csr(agent_id, "domain-agent", &csr_pem, Some("localhost")) + .expect("sign agent CSR"); + + let (echo_addr, _echo_handle) = start_echo_server().await; + let echo_subnet: Ipv4Network = format!("{}/32", echo_addr.ip()).parse().unwrap(); + + let listen_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + let (listener, handle) = AgentTunnelListener::bind(listen_addr, Arc::clone(&ca_manager), "localhost") + .await + .expect("bind QUIC listener"); + + let server_addr = listener.local_addr(); + + let (shutdown_handle, shutdown_signal) = devolutions_gateway_task::ShutdownHandle::new(); + let listener_task = tokio::spawn(async move { + use devolutions_gateway_task::Task; + let _ = listener.run(shutdown_signal).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + let connection = connect_quinn_client( + &signed.ca_cert_pem, + &signed.client_cert_pem, + &key_pair.serialize_pem(), + server_addr, + ) + .await; + + // Send RouteAdvertise with domain. + let mut ctrl: ControlStream<_, _> = connection.open_bi().await.expect("open control stream").into(); + + let domains = vec![DomainAdvertisement { + domain: agent_tunnel_proto::DomainName::new("test.local"), + auto_detected: false, + }]; + let route_msg = ControlMessage::route_advertise(1, vec![echo_subnet], domains); + ctrl.send(&route_msg).await.expect("send RouteAdvertise"); + + wait_for_route_advertised(handle.registry(), agent_id, 1).await; + + // Verify agent + domain registered. + let peer = handle + .registry() + .get(&agent_id) + .await + .expect("agent should be registered"); + + let route_state = peer.route_state(); + assert_eq!(route_state.domains.len(), 1); + assert_eq!(route_state.domains[0].domain.as_str(), "test.local"); + assert!(!route_state.domains[0].auto_detected); + + // Cleanup. + connection.close(0u32.into(), b"test done"); + shutdown_handle.signal(); + let _ = tokio::time::timeout(Duration::from_secs(2), listener_task).await; +} + +/// Certificate renewal E2E test. +/// +/// Pins the security invariant introduced by #1775 review: the gateway must +/// re-sign with the agent's mTLS-authenticated identity, never the CSR +/// subject. Here the renewal CSR is deliberately filed under +/// `CN=evil-impersonator` — the renewed cert's URN SAN must still encode the +/// original `agent_id`. +#[tokio::test] +async fn cert_renewal_preserves_mtls_identity_e2e() { + let temp_dir = tempfile::tempdir().expect("create tempdir"); + let data_dir = Utf8PathBuf::from_path_buf(temp_dir.path().to_path_buf()).expect("UTF-8 temp path"); + + let ca_manager = CaManager::load_or_generate(&data_dir).expect("CA generation"); + + let agent_id = Uuid::new_v4(); + let (key_pair, csr_pem) = generate_test_key_and_csr("renewal-agent"); + let signed = ca_manager + .sign_agent_csr(agent_id, "renewal-agent", &csr_pem, Some("localhost")) + .expect("sign initial agent CSR"); + + let listen_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + let (listener, handle) = AgentTunnelListener::bind(listen_addr, Arc::clone(&ca_manager), "localhost") + .await + .expect("bind QUIC listener"); + + let server_addr = listener.local_addr(); + + let (shutdown_handle, shutdown_signal) = devolutions_gateway_task::ShutdownHandle::new(); + let listener_task = tokio::spawn(async move { + use devolutions_gateway_task::Task; + let _ = listener.run(shutdown_signal).await; + }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + let connection = connect_quinn_client( + &signed.ca_cert_pem, + &signed.client_cert_pem, + &key_pair.serialize_pem(), + server_addr, + ) + .await; + + let mut ctrl: ControlStream<_, _> = connection.open_bi().await.expect("open control stream").into(); + + // Agent must announce routes first so the control loop is established. + let route_msg = ControlMessage::route_advertise(1, vec![], vec![]); + ctrl.send(&route_msg).await.expect("send RouteAdvertise"); + wait_for_route_advertised(handle.registry(), agent_id, 1).await; + + // Build the renewal CSR with an attacker-chosen Common Name. + let (_renewal_key, evil_csr_pem) = generate_csr_with_cn("evil-impersonator"); + let renewal_msg = ControlMessage::cert_renewal_request(evil_csr_pem); + ctrl.send(&renewal_msg).await.expect("send CertRenewalRequest"); + + let response = tokio::time::timeout(Duration::from_secs(5), ctrl.recv()) + .await + .expect("renewal response within timeout") + .expect("decode renewal response"); + + let renewed_pem = match response { + ControlMessage::CertRenewalResponse { + result: + CertRenewalResult::Success { + client_cert_pem, + gateway_ca_cert_pem, + }, + .. + } => { + assert_eq!( + gateway_ca_cert_pem, signed.ca_cert_pem, + "renewal must echo back the same CA cert" + ); + client_cert_pem + } + other => panic!("expected CertRenewalResponse::Success, got {other:?}"), + }; + + let renewed_agent_id = extract_agent_id_from_pem(&renewed_pem).expect("renewed cert has urn:uuid SAN"); + assert_eq!( + renewed_agent_id, agent_id, + "renewed cert must encode the mTLS-authenticated agent_id, not the CSR subject" + ); + + connection.close(0u32.into(), b"test done"); + shutdown_handle.signal(); + let _ = tokio::time::timeout(Duration::from_secs(2), listener_task).await; +} diff --git a/testsuite/tests/agent_tunnel/mod.rs b/testsuite/tests/agent_tunnel/mod.rs new file mode 100644 index 000000000..8bfd61eed --- /dev/null +++ b/testsuite/tests/agent_tunnel/mod.rs @@ -0,0 +1,13 @@ +//! Agent-tunnel integration tests. +//! +//! Cover the QUIC tunnel data path end-to-end (`integration`), the registry +//! online/offline accounting (`registry`), the routing decision pipeline +//! (`routing`), and the certificate signing / identity-extraction helpers +//! (`cert`). All exercise the live `agent-tunnel` crate; no mocking of the +//! QUIC layer. + +mod cert; +mod common; +mod integration; +mod registry; +mod routing; diff --git a/testsuite/tests/agent_tunnel/registry.rs b/testsuite/tests/agent_tunnel/registry.rs new file mode 100644 index 000000000..e3a1bac1d --- /dev/null +++ b/testsuite/tests/agent_tunnel/registry.rs @@ -0,0 +1,192 @@ +use std::sync::Arc; + +use agent_tunnel::registry::{AGENT_OFFLINE_TIMEOUT, AgentPeer, AgentRegistry}; +use agent_tunnel_proto::{DomainAdvertisement, DomainName}; +use ipnetwork::Ipv4Network; +use uuid::Uuid; + +fn make_peer(name: &str) -> Arc { + Arc::new(AgentPeer::new( + Uuid::new_v4(), + String::from(name), + String::from("sha256:deadbeef"), + )) +} + +fn domain(name: &str, auto_detected: bool) -> DomainAdvertisement { + DomainAdvertisement { + domain: DomainName::new(name), + auto_detected, + } +} + +#[tokio::test] +async fn register_and_lookup() { + let registry = AgentRegistry::new(); + let peer = make_peer("test-agent"); + let agent_id = peer.agent_id; + + registry.register(Arc::clone(&peer)).await; + assert_eq!(registry.len().await, 1); + + let found = registry.get(&agent_id).await.expect("agent should be found"); + assert_eq!(found.agent_id, agent_id); +} + +#[tokio::test] +async fn unregister_removes_agent() { + let registry = AgentRegistry::new(); + let peer = make_peer("test-agent"); + let agent_id = peer.agent_id; + + registry.register(Arc::clone(&peer)).await; + let removed = registry.unregister(&agent_id).await; + assert!(removed.is_some()); + assert_eq!(registry.len().await, 0); + assert!(registry.get(&agent_id).await.is_none()); +} + +#[test] +fn is_online_within_timeout() { + let peer = make_peer("online-agent"); + peer.touch(); + assert!(peer.is_online(AGENT_OFFLINE_TIMEOUT)); +} + +#[test] +fn is_offline_after_timeout() { + let peer = AgentPeer::new( + Uuid::new_v4(), + String::from("offline-agent"), + String::from("sha256:deadbeef"), + ); + peer.set_last_seen_for_test(0); + assert!(!peer.is_online(AGENT_OFFLINE_TIMEOUT)); +} + +#[test] +fn update_routes_new_epoch_replaces() { + let peer = make_peer("route-agent"); + let subnet: Ipv4Network = "10.0.0.0/8".parse().expect("valid CIDR"); + + peer.update_routes(1, vec![subnet], vec![]); + let state = peer.route_state(); + assert_eq!(state.epoch, 1); + assert_eq!(state.subnets.len(), 1); + + let new_subnet: Ipv4Network = "192.168.0.0/16".parse().expect("valid CIDR"); + peer.update_routes(2, vec![new_subnet], vec![]); + let state = peer.route_state(); + assert_eq!(state.epoch, 2); + assert_eq!(state.subnets.len(), 1); + assert_eq!(state.subnets[0], new_subnet); +} + +#[test] +fn update_routes_same_epoch_refreshes_only() { + let peer = make_peer("refresh-agent"); + let subnet: Ipv4Network = "10.0.0.0/8".parse().expect("valid CIDR"); + + peer.update_routes(1, vec![subnet], vec![]); + let state_before = peer.route_state(); + let received_at_before = state_before.received_at; + + let different_subnet: Ipv4Network = "172.16.0.0/12".parse().expect("valid CIDR"); + peer.update_routes(1, vec![different_subnet], vec![]); + + let state_after = peer.route_state(); + assert_eq!(state_after.epoch, 1); + assert_eq!(state_after.subnets[0], subnet); + assert_eq!(state_after.received_at, received_at_before); + assert!(state_after.updated_at >= state_before.updated_at); +} + +#[test] +fn update_routes_stale_epoch_ignored() { + let peer = make_peer("stale-agent"); + let subnet: Ipv4Network = "10.0.0.0/8".parse().expect("valid CIDR"); + + peer.update_routes(5, vec![subnet], vec![]); + let old_subnet: Ipv4Network = "172.16.0.0/12".parse().expect("valid CIDR"); + peer.update_routes(3, vec![old_subnet], vec![]); + + let state = peer.route_state(); + assert_eq!(state.epoch, 5); + assert_eq!(state.subnets[0], subnet); +} + +#[tokio::test] +async fn agent_infos_snapshot() { + let registry = AgentRegistry::new(); + let peer = make_peer("info-agent"); + let subnet: Ipv4Network = "10.0.0.0/8".parse().expect("valid CIDR"); + peer.update_routes(1, vec![subnet], vec![]); + registry.register(peer).await; + + let infos = registry.agent_infos().await; + assert_eq!(infos.len(), 1); + assert_eq!(infos[0].name, "info-agent"); + assert!(infos[0].is_online); + assert_eq!(infos[0].subnets, vec!["10.0.0.0/8"]); + assert_eq!(infos[0].route_epoch, 1); +} + +#[tokio::test] +async fn online_count_accuracy() { + let registry = AgentRegistry::new(); + + let online_agent = make_peer("online"); + registry.register(Arc::clone(&online_agent)).await; + + let offline_agent = make_peer("offline"); + offline_agent.set_last_seen_for_test(0); + registry.register(offline_agent).await; + + assert_eq!(registry.len().await, 2); + assert_eq!(registry.online_count().await, 1); +} + +#[tokio::test] +async fn default_trait_creates_empty_registry() { + let registry = AgentRegistry::default(); + assert_eq!(registry.len().await, 0); +} + +#[test] +fn update_routes_stores_domains_with_source() { + let peer = make_peer("domain-agent"); + let subnet: Ipv4Network = "10.0.0.0/8".parse().expect("valid CIDR"); + + peer.update_routes(1, vec![subnet], vec![domain("contoso.local", false)]); + let state = peer.route_state(); + assert_eq!(state.domains.len(), 1); + assert_eq!(state.domains[0].domain.as_str(), "contoso.local"); + assert!(!state.domains[0].auto_detected); +} + +#[test] +fn update_routes_new_epoch_replaces_domains() { + let peer = make_peer("domain-agent"); + let subnet: Ipv4Network = "10.0.0.0/8".parse().expect("valid CIDR"); + + peer.update_routes(1, vec![subnet], vec![domain("old.local", false)]); + peer.update_routes(2, vec![subnet], vec![domain("new.local", true)]); + + let state = peer.route_state(); + assert_eq!(state.epoch, 2); + assert_eq!(state.domains[0].domain.as_str(), "new.local"); + assert!(state.domains[0].auto_detected); +} + +#[test] +fn update_routes_same_epoch_preserves_domains() { + let peer = make_peer("domain-agent"); + let subnet: Ipv4Network = "10.0.0.0/8".parse().expect("valid CIDR"); + + peer.update_routes(1, vec![subnet], vec![domain("contoso.local", false)]); + peer.update_routes(1, vec![subnet], vec![domain("different.local", true)]); + + let state = peer.route_state(); + assert_eq!(state.domains[0].domain.as_str(), "contoso.local"); + assert!(!state.domains[0].auto_detected); +} diff --git a/testsuite/tests/agent_tunnel/routing.rs b/testsuite/tests/agent_tunnel/routing.rs new file mode 100644 index 000000000..2f6b6d9bb --- /dev/null +++ b/testsuite/tests/agent_tunnel/routing.rs @@ -0,0 +1,283 @@ +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::Arc; + +use agent_tunnel::registry::{AgentPeer, AgentRegistry}; +use agent_tunnel::routing::{RouteTarget, RoutingDecision, resolve_route, route_and_connect, try_route}; +use agent_tunnel_proto::{DomainAdvertisement, DomainName}; +use ipnetwork::Ipv4Network; +use uuid::Uuid; + +use super::common::bind_test_listener; + +fn ip(s: &str) -> RouteTarget { + RouteTarget::Ip(IpAddr::V4(s.parse::().expect("valid test ipv4"))) +} + +fn host(s: &str) -> RouteTarget { + RouteTarget::hostname(s) +} + +fn make_peer(name: &str) -> Arc { + Arc::new(AgentPeer::new( + Uuid::new_v4(), + name.to_owned(), + "sha256:test".to_owned(), + )) +} + +fn domain(name: &str) -> DomainAdvertisement { + DomainAdvertisement { + domain: DomainName::new(name), + auto_detected: false, + } +} + +#[tokio::test] +async fn route_explicit_agent_id() { + let registry = AgentRegistry::new(); + let peer = make_peer("agent-a"); + let agent_id = peer.agent_id; + registry.register(Arc::clone(&peer)).await; + + match resolve_route(®istry, Some(agent_id), &host("anything")).await { + RoutingDecision::ViaAgent(agents) => { + assert_eq!(agents.len(), 1); + assert_eq!(agents[0].agent_id, agent_id); + } + other => panic!("expected ViaAgent, got {other:?}"), + } +} + +#[tokio::test] +async fn route_explicit_agent_id_not_found() { + let registry = AgentRegistry::new(); + let bogus_id = Uuid::new_v4(); + + match resolve_route(®istry, Some(bogus_id), &host("anything")).await { + RoutingDecision::ExplicitAgentNotFound(id) => { + assert_eq!(id, bogus_id); + } + other => panic!("expected ExplicitAgentNotFound, got {other:?}"), + } +} + +#[tokio::test] +async fn route_ip_target_via_subnet() { + let registry = AgentRegistry::new(); + let peer = make_peer("agent-a"); + let agent_id = peer.agent_id; + let subnet: Ipv4Network = "10.1.0.0/16".parse().expect("valid test subnet"); + peer.update_routes(1, vec![subnet], vec![]); + registry.register(peer).await; + + match resolve_route(®istry, None, &ip("10.1.5.50")).await { + RoutingDecision::ViaAgent(agents) => { + assert_eq!(agents[0].agent_id, agent_id); + } + other => panic!("expected ViaAgent, got {other:?}"), + } +} + +#[tokio::test] +async fn route_hostname_via_domain() { + let registry = AgentRegistry::new(); + let peer = make_peer("agent-a"); + let agent_id = peer.agent_id; + let subnet: Ipv4Network = "10.1.0.0/16".parse().expect("valid test subnet"); + peer.update_routes(1, vec![subnet], vec![domain("contoso.local")]); + registry.register(peer).await; + + match resolve_route(®istry, None, &host("dc01.contoso.local")).await { + RoutingDecision::ViaAgent(agents) => { + assert_eq!(agents[0].agent_id, agent_id); + } + other => panic!("expected ViaAgent, got {other:?}"), + } +} + +#[tokio::test] +async fn route_no_match_returns_direct() { + let registry = AgentRegistry::new(); + let peer = make_peer("agent-a"); + let subnet: Ipv4Network = "10.1.0.0/16".parse().expect("valid test subnet"); + peer.update_routes(1, vec![subnet], vec![domain("contoso.local")]); + registry.register(peer).await; + + assert!(matches!( + resolve_route(®istry, None, &host("external.example.com")).await, + RoutingDecision::Direct + )); +} + +#[tokio::test] +async fn route_ip_no_match_returns_direct() { + let registry = AgentRegistry::new(); + let peer = make_peer("agent-a"); + let subnet: Ipv4Network = "10.1.0.0/16".parse().expect("valid test subnet"); + peer.update_routes(1, vec![subnet], vec![]); + registry.register(peer).await; + + assert!(matches!( + resolve_route(®istry, None, &ip("192.168.1.1")).await, + RoutingDecision::Direct + )); +} + +#[tokio::test] +async fn route_skips_offline_agents() { + let registry = AgentRegistry::new(); + let peer = make_peer("offline-agent"); + let subnet: Ipv4Network = "10.1.0.0/16".parse().expect("valid test subnet"); + peer.update_routes(1, vec![subnet], vec![domain("contoso.local")]); + peer.set_last_seen_for_test(0); + registry.register(peer).await; + + assert!(matches!( + resolve_route(®istry, None, &host("dc01.contoso.local")).await, + RoutingDecision::Direct + )); +} + +#[tokio::test] +async fn route_domain_match_returns_multiple_agents_ordered() { + let registry = AgentRegistry::new(); + + let peer_a = make_peer("agent-a"); + let subnet_a: Ipv4Network = "10.1.0.0/16".parse().expect("valid test subnet"); + peer_a.update_routes(1, vec![subnet_a], vec![domain("contoso.local")]); + // Pin `received_at` explicitly — do NOT rely on thread::sleep to advance + // SystemTime, since Windows timer resolution is ~16 ms and makes the + // ordering assertion below flaky. + peer_a.set_received_at_for_test(std::time::UNIX_EPOCH + std::time::Duration::from_secs(1)); + registry.register(Arc::clone(&peer_a)).await; + + let peer_b = make_peer("agent-b"); + let id_b = peer_b.agent_id; + let subnet_b: Ipv4Network = "10.2.0.0/16".parse().expect("valid test subnet"); + peer_b.update_routes(1, vec![subnet_b], vec![domain("contoso.local")]); + peer_b.set_received_at_for_test(std::time::UNIX_EPOCH + std::time::Duration::from_secs(2)); + registry.register(Arc::clone(&peer_b)).await; + + match resolve_route(®istry, None, &host("dc01.contoso.local")).await { + RoutingDecision::ViaAgent(agents) => { + assert_eq!(agents.len(), 2); + assert_eq!(agents[0].agent_id, id_b, "most recent first"); + } + other => panic!("expected ViaAgent, got {other:?}"), + } +} + +// A token that names a specific `jet_agent_id` must NOT silently fall back to a +// direct connect when the gateway has no agent-tunnel configured — doing so +// would bypass the intended network boundary. +#[tokio::test] +async fn try_route_rejects_explicit_agent_when_handle_missing() { + let result = try_route( + None, + Some(Uuid::new_v4()), + &host("host.example.com"), + Uuid::new_v4(), + "host.example.com:443", + ) + .await; + + assert!(result.is_err(), "expected Err for explicit agent with no handle"); +} + +// Without an explicit `jet_agent_id`, a missing handle just means "no tunneling +// available" — falling back to a direct connect is correct. +#[tokio::test] +async fn try_route_without_explicit_agent_falls_through_when_handle_missing() { + let result = try_route( + None, + None, + &host("host.example.com"), + Uuid::new_v4(), + "host.example.com:443", + ) + .await; + + match result { + Ok(None) => {} + Ok(Some(_)) => panic!("expected Ok(None), got Ok(Some(_))"), + Err(e) => panic!("expected Ok(None), got Err: {e:#}"), + } +} + +#[tokio::test] +async fn route_and_connect_with_empty_candidates_errors() { + let listener = bind_test_listener().await; + + let err = match route_and_connect(&listener.handle, &[], Uuid::new_v4(), "10.1.1.1:22").await { + Ok(_) => panic!("expected Err for empty candidate list"), + Err(e) => e, + }; + + let msg = format!("{err:#}"); + assert!( + msg.contains("empty candidates"), + "error should mention empty candidates, got: {msg}" + ); + + listener.shutdown().await; +} + +// With a real handle but a target that no registered agent can reach, +// `try_route` must return `Ok(None)` so the caller falls through to a +// direct connection — without ever attempting QUIC traffic. +#[tokio::test] +async fn try_route_falls_through_when_no_agent_matches() { + let listener = bind_test_listener().await; + + let peer = make_peer("agent-a"); + let subnet: Ipv4Network = "10.0.0.0/8".parse().expect("valid CIDR"); + peer.update_routes(1, vec![subnet], vec![domain("contoso.local")]); + listener.handle.registry().register(peer).await; + + let result = try_route( + Some(&listener.handle), + None, + &host("external.example.com"), + Uuid::new_v4(), + "external.example.com:443", + ) + .await; + + match result { + Ok(None) => {} + Ok(Some(_)) => panic!("expected Ok(None), got Ok(Some(_))"), + Err(e) => panic!("expected Ok(None), got Err: {e:#}"), + } + + listener.shutdown().await; +} + +// With a real handle but an explicit `jet_agent_id` that no agent in the +// registry can satisfy, `try_route` must error rather than silently fall +// back to a direct connection — the explicit claim is a security boundary. +#[tokio::test] +async fn try_route_errors_on_explicit_agent_not_found() { + let listener = bind_test_listener().await; + + let bogus_id = Uuid::new_v4(); + let err = match try_route( + Some(&listener.handle), + Some(bogus_id), + &host("anywhere.example.com"), + Uuid::new_v4(), + "anywhere.example.com:443", + ) + .await + { + Ok(_) => panic!("expected Err for explicit agent_id not in registry"), + Err(e) => e, + }; + + let msg = format!("{err:#}"); + assert!( + msg.contains("not found in registry"), + "error should mention the missing agent, got: {msg}" + ); + + listener.shutdown().await; +} diff --git a/testsuite/tests/main.rs b/testsuite/tests/main.rs index 5d8b1e6c5..c2a339878 100644 --- a/testsuite/tests/main.rs +++ b/testsuite/tests/main.rs @@ -2,6 +2,7 @@ #![allow(clippy::print_stdout, reason = "test code uses print for diagnostics")] #![allow(clippy::print_stderr, reason = "test code uses print for diagnostics")] +mod agent_tunnel; mod cli; mod mcp_proxy; mod network_scanner;