Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ctf/askgod/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

from ctf.askgod.stats import app as stats_app

app = typer.Typer()
app = typer.Typer(no_args_is_help=True)
app.add_typer(stats_app)
10 changes: 6 additions & 4 deletions ctf/askgod/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def stats(
f"Could not find flag for score with flag_id {score['flag_id']}"
)
LOG.info(f"Analyzing {len(scores)} scores...")
ai_agent_scores = [s for s in scores if s["ai_agent"]]
ai_agent_scores = [
s for s in scores if "agent" in s["source"] or "mcp" in s["source"]
]
stats["total_scores"] = len(scores)
stats["ai_agent_scores"] = len(ai_agent_scores)
stats["ai_agent_score_percentage"] = (
Expand Down Expand Up @@ -125,7 +127,7 @@ def stats(
}
for score in scores:
stats["ai_agent_solve_per_point"][score["value"]]["total_solves"] += 1
if score["ai_agent"]:
if "agent" in score["source"] or "mcp" in score["source"]:
stats["ai_agent_solve_per_point"][score["value"]]["ai_agent_solves"] += 1
stats["ai_agent_solve_per_point"][score["value"]][
"ai_agent_solve_percentage"
Expand Down Expand Up @@ -156,7 +158,7 @@ def stats(
if bucket_key not in buckets:
buckets[bucket_key] = {"ai_count": 0, "total_count": 0}
buckets[bucket_key]["total_count"] += 1
if score["ai_agent"]:
if "agent" in score["source"] or "mcp" in score["source"]:
buckets[bucket_key]["ai_count"] += 1
stats["ai_agent_percentage_over_time"] = [
{
Expand Down Expand Up @@ -418,7 +420,7 @@ def generate_html(stats: dict) -> str:

def get(session: requests.Session, url: str) -> dict:
try:
response = session.get(url=f"{session.base_url}{url}")
response = session.get(url=f"{session.base_url}{url}", verify=False)
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
Expand Down
80 changes: 76 additions & 4 deletions ctf/services.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import socket

import requests
import rich
import typer
from typing_extensions import Annotated
Expand All @@ -20,6 +22,9 @@ def services(
help="Only services from the given tracks (use the directory name)",
),
] = [],
check: Annotated[
bool, typer.Option("--check", "-c", help="Check every service")
] = False,
) -> None:
distinct_tracks: set[str] = set()
for entry in os.listdir(
Expand All @@ -37,20 +42,87 @@ def services(
elif entry in tracks:
distinct_tracks.add(entry)

all_services = []

for track in distinct_tracks:
LOG.debug(msg=f"Parsing track.yaml for track {track}")
track_yaml = parse_track_yaml(track_name=track)

if len(track_yaml["services"]) == 0:
services = track_yaml.get("services", [])
for instance_name, instance in track_yaml.get("instances", {}).items():
services += [
{"instance": instance_name, "address": instance.get("ipv6"), **service}
for service in instance.get("services", [])
]

if len(services) == 0:
LOG.debug(msg=f"No service in track {track}. Skipping...")
continue

for service in track_yaml["services"]:
for service in services:
contact = ",".join(track_yaml["contacts"]["support"])
name = service["name"]
instance = service["instance"]
address = service["address"]
check = service["check"]
check_type = service["check"]
port = service["port"]

rich.print(
f"{instance}/{name} {contact.replace(' ', '_')} {address} {check_type} {port}",
)

all_services += services

if check:
LOG.info("Checking services...")
for service in all_services:
name = service["name"]
instance = service["instance"]
address = service["address"]
check_type = service["check"]
port = service["port"]

rich.print(f"{track}/{instance}/{name} {contact} {address} {check} {port}")
LOG.info(f"Checking {check_type} {instance}/{name} at {address}:{port}...")

if check_type == "tcp":
success = check_tcp_port(host=address, port=port)
if not success:
LOG.error(
f"TCP Service {instance}/{name} is NOT responding on {address}:{port}"
)
elif check_type == "http":
try:
response = requests.get(
f"http://[{address}]:{port}", timeout=5, verify=False
)
success = response.status_code == 200
print(
f"HTTP Service {instance}/{name} returned status code: {response.status_code}"
)

except Exception as e:
LOG.error(
f"Error occurred while checking HTTP service {instance}/{name}: {e}"
)
success = False
if not success:
LOG.error(
f"HTTP Service {instance}/{name} is NOT responding on {address}:{port}"
)
else:
LOG.warning(
f"Unknown check type {check_type} for service {instance}/{name}. Skipping check."
)


def check_tcp_port(host, port, timeout=2):
"""
Checks if a TCP port is open at a specific host.
Returns True if open, False otherwise.
"""
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
# connect_ex returns 0 on success, and an error code on failure
result = s.connect_ex((host, port))
success = result == 0
return success
Loading