diff --git a/ctf/askgod/__init__.py b/ctf/askgod/__init__.py index c22cf6b..1f5dc95 100644 --- a/ctf/askgod/__init__.py +++ b/ctf/askgod/__init__.py @@ -2,5 +2,5 @@ from ctf.askgod.stats import app as stats_app -app = typer.Typer() +app = typer.Typer(no_args_is_help=True) app.add_typer(stats_app) diff --git a/ctf/askgod/stats.py b/ctf/askgod/stats.py index 070943e..f3ad844 100644 --- a/ctf/askgod/stats.py +++ b/ctf/askgod/stats.py @@ -45,7 +45,9 @@ def stats( f"Could not find flag for score with flag_id {score['flag_id']}" ) LOG.info(f"Analyzing {len(scores)} scores...") - ai_agent_scores = [s for s in scores if s["ai_agent"]] + ai_agent_scores = [ + s for s in scores if "agent" in s["source"] or "mcp" in s["source"] + ] stats["total_scores"] = len(scores) stats["ai_agent_scores"] = len(ai_agent_scores) stats["ai_agent_score_percentage"] = ( @@ -125,7 +127,7 @@ def stats( } for score in scores: stats["ai_agent_solve_per_point"][score["value"]]["total_solves"] += 1 - if score["ai_agent"]: + if "agent" in score["source"] or "mcp" in score["source"]: stats["ai_agent_solve_per_point"][score["value"]]["ai_agent_solves"] += 1 stats["ai_agent_solve_per_point"][score["value"]][ "ai_agent_solve_percentage" @@ -156,7 +158,7 @@ def stats( if bucket_key not in buckets: buckets[bucket_key] = {"ai_count": 0, "total_count": 0} buckets[bucket_key]["total_count"] += 1 - if score["ai_agent"]: + if "agent" in score["source"] or "mcp" in score["source"]: buckets[bucket_key]["ai_count"] += 1 stats["ai_agent_percentage_over_time"] = [ { @@ -418,7 +420,7 @@ def generate_html(stats: dict) -> str: def get(session: requests.Session, url: str) -> dict: try: - response = session.get(url=f"{session.base_url}{url}") + response = session.get(url=f"{session.base_url}{url}", verify=False) response.raise_for_status() return response.json() except requests.HTTPError as e: diff --git a/ctf/services.py b/ctf/services.py index 24cbe22..f0d99cc 100644 --- a/ctf/services.py +++ b/ctf/services.py @@ -1,5 +1,7 @@ import os +import socket +import requests import rich import typer from typing_extensions import Annotated @@ -20,6 +22,9 @@ def services( help="Only services from the given tracks (use the directory name)", ), ] = [], + check: Annotated[ + bool, typer.Option("--check", "-c", help="Check every service") + ] = False, ) -> None: distinct_tracks: set[str] = set() for entry in os.listdir( @@ -37,20 +42,87 @@ def services( elif entry in tracks: distinct_tracks.add(entry) + all_services = [] + for track in distinct_tracks: LOG.debug(msg=f"Parsing track.yaml for track {track}") track_yaml = parse_track_yaml(track_name=track) - if len(track_yaml["services"]) == 0: + services = track_yaml.get("services", []) + for instance_name, instance in track_yaml.get("instances", {}).items(): + services += [ + {"instance": instance_name, "address": instance.get("ipv6"), **service} + for service in instance.get("services", []) + ] + + if len(services) == 0: LOG.debug(msg=f"No service in track {track}. Skipping...") continue - for service in track_yaml["services"]: + for service in services: contact = ",".join(track_yaml["contacts"]["support"]) name = service["name"] instance = service["instance"] address = service["address"] - check = service["check"] + check_type = service["check"] + port = service["port"] + + rich.print( + f"{instance}/{name} {contact.replace(' ', '_')} {address} {check_type} {port}", + ) + + all_services += services + + if check: + LOG.info("Checking services...") + for service in all_services: + name = service["name"] + instance = service["instance"] + address = service["address"] + check_type = service["check"] port = service["port"] - rich.print(f"{track}/{instance}/{name} {contact} {address} {check} {port}") + LOG.info(f"Checking {check_type} {instance}/{name} at {address}:{port}...") + + if check_type == "tcp": + success = check_tcp_port(host=address, port=port) + if not success: + LOG.error( + f"TCP Service {instance}/{name} is NOT responding on {address}:{port}" + ) + elif check_type == "http": + try: + response = requests.get( + f"http://[{address}]:{port}", timeout=5, verify=False + ) + success = response.status_code == 200 + print( + f"HTTP Service {instance}/{name} returned status code: {response.status_code}" + ) + + except Exception as e: + LOG.error( + f"Error occurred while checking HTTP service {instance}/{name}: {e}" + ) + success = False + if not success: + LOG.error( + f"HTTP Service {instance}/{name} is NOT responding on {address}:{port}" + ) + else: + LOG.warning( + f"Unknown check type {check_type} for service {instance}/{name}. Skipping check." + ) + + +def check_tcp_port(host, port, timeout=2): + """ + Checks if a TCP port is open at a specific host. + Returns True if open, False otherwise. + """ + with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: + s.settimeout(timeout) + # connect_ex returns 0 on success, and an error code on failure + result = s.connect_ex((host, port)) + success = result == 0 + return success