Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ctf/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ctf.list import app as list_app
from ctf.logger import LOG
from ctf.new import app as new_app
from ctf.post import app as post_app
from ctf.redeploy import app as redeploy_app
from ctf.services import app as services_app
from ctf.stats import app as stats_app
Expand All @@ -43,6 +44,12 @@
app.add_typer(init_app)
app.add_typer(list_app)
app.add_typer(new_app)
app.add_typer(
post_app,
name="post",
help="Commands to manage discourse post files.",
rich_help_panel="Subcommands",
)
app.add_typer(redeploy_app)
app.add_typer(services_app)
app.add_typer(stats_app)
Expand Down
6 changes: 6 additions & 0 deletions ctf/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ class Track(BaseModel):
has_virtual_machine: bool = False
already_deployed: bool = False

@property
def location(self):
from ctf.utils import find_ctf_root_directory

return find_ctf_root_directory() / self.name

def __eq__(self, other: Any) -> bool:
match other:
case str():
Expand Down
6 changes: 6 additions & 0 deletions ctf/post/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import typer

from .new import app as new_app

app = typer.Typer(no_args_is_help=True)
app.add_typer(new_app)
222 changes: 222 additions & 0 deletions ctf/post/new.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import os
import re
from enum import StrEnum
from pathlib import Path

import typer
from typing_extensions import Annotated

from ctf.logger import LOG
from ctf.models import Track
from ctf.utils import (
get_all_available_tracks,
parse_track_yaml,
)

app = typer.Typer()


class ApiUser(StrEnum):
NSEC = "nsec"
SYSTEM = "system"


class TriggerType(StrEnum):
FLAG = "flag"


def _format_yaml_block(text: str) -> str:
lines = text.splitlines() or [""]
return "\n".join(f" {line}" for line in lines)


def _default_post_filename(track: Track, tag: str) -> str:
normalized_track = track.name.replace("-", "_")
suffix = tag
if tag.startswith(normalized_track + "_"):
suffix = tag[len(normalized_track) + 1 :]
suffix = re.sub(r"[^a-zA-Z0-9_-]+", "-", suffix).strip("-_")
if not suffix:
suffix = "post"
return f"{track}-{suffix.replace('_', '-')}.yaml"


def _get_available_discourse_tags(track: Track) -> list[str]:
track_yaml = parse_track_yaml(track_name=track.name)
tags: set[str] = set()
for flag in track_yaml.get("flags", []):
discourse_tag = ((flag or {}).get("tags") or {}).get("discourse")
if isinstance(discourse_tag, str) and discourse_tag.strip():
tags.add(discourse_tag.strip())
return sorted(tags)


def _add_counter_to_filename(posts_directory: Path, filename: str) -> str:
base, ext = os.path.splitext(filename)
if not ext:
ext = ".yaml"

candidate = f"{base}{ext}"
if not (posts_directory / candidate).exists():
return candidate

counter = 2
while (posts_directory / f"{base}-{counter}{ext}").exists():
counter += 1
return f"{base}-{counter}{ext}"


def _resolve_post_file_path(
posts_directory: Path,
track: Track,
name: str | None,
tag: str | None,
force: bool,
) -> str:
filename = (
f"{track}-{name}.yaml"
if name
else (
_default_post_filename(track=track, tag=tag)
if tag
else f"{track}-post.yaml"
)
)

if not force:
filename = _add_counter_to_filename(posts_directory, filename)

return os.path.join(posts_directory, filename)


def _render_post_yaml(
track: Track,
user: ApiUser,
body: str,
trigger: TriggerType | None = None,
tag: str | None = None,
) -> str:
lines = [
"type: post",
f"topic: {track}",
]

if trigger == TriggerType.FLAG:
lines.extend(
[
"trigger:",
" type: flag",
f" tag: {tag}",
]
)

lines.extend(
[
"api:",
f" user: {user.value}",
"body: |-",
_format_yaml_block(body),
]
)

return "\n".join(lines) + "\n"


@app.command(
"new",
help="Create a new discourse post YAML file for a track.",
no_args_is_help=True,
)
def new(
track: Annotated[
str,
typer.Option(
"--track",
"-t",
help="Track name (challenge directory name).",
),
],
tag: Annotated[
str | None,
typer.Option(
"--tag",
help="Discourse trigger tag, usually from track.yaml flag tags.discourse. Required when --trigger flag is set.",
),
] = None,
trigger: Annotated[
TriggerType | None,
typer.Option(
"--trigger",
help="Trigger type for this post. If omitted, no trigger block is added.",
),
] = None,
name: Annotated[
str | None,
typer.Option(
"--name",
"-n",
help="Post file name. Defaults to a name derived from the track and tag.",
),
] = None,
user: Annotated[
ApiUser,
typer.Option("--user", help="Discourse user posting this message."),
] = ApiUser.NSEC,
body: Annotated[
str,
typer.Option("--body", help="Post body. Markdown is supported."),
] = "CHANGE_ME",
force: Annotated[
bool,
typer.Option("--force", help="Overwrite the post file if it already exists."),
] = False,
) -> None:
if (track_obj := Track(name=track)) not in get_all_available_tracks():
LOG.critical(f"Track directory not found: {track_obj.name}. Verify --track.")
raise typer.Exit(1)

posts_directory: Path = track_obj.location / "posts"
os.makedirs(posts_directory, exist_ok=True)

# TODO: add support for other triggers
if trigger == TriggerType.FLAG and not tag:
LOG.critical("--tag is required when --trigger flag is provided.")
raise typer.Exit(1)

if trigger != TriggerType.FLAG and tag:
LOG.critical("--tag can only be used with --trigger flag.")
raise typer.Exit(1)

if trigger == TriggerType.FLAG and tag:
valid_tags = _get_available_discourse_tags(track=track_obj)
if tag not in valid_tags:
if valid_tags:
LOG.critical(
f'Invalid --tag "{tag}" for track "{track_obj.name}". Valid tags: {", ".join(valid_tags)}'
)
else:
LOG.critical(
f'Invalid --tag "{tag}" for track "{track_obj.name}". No discourse tags were found in track.yaml flags[].tags.discourse.'
)
raise typer.Exit(1)

post_file_path = _resolve_post_file_path(
posts_directory=posts_directory,
track=track_obj,
name=name,
tag=tag,
force=force,
)

post_yaml = _render_post_yaml(
track=track_obj,
user=user,
body=body,
trigger=trigger,
tag=tag,
)

with open(post_file_path, "w", encoding="utf-8") as f:
f.write(post_yaml)

LOG.info(f"Created post file: {post_file_path}")
7 changes: 4 additions & 3 deletions ctf/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import shutil
import subprocess
import textwrap
from pathlib import Path
from typing import Any, Generator

import jinja2
Expand Down Expand Up @@ -407,10 +408,10 @@ def parse_post_yamls(track_name: str) -> list[dict]:
return posts


def find_ctf_root_directory() -> str:
def find_ctf_root_directory() -> Path:
global __CTF_ROOT_DIRECTORY
if __CTF_ROOT_DIRECTORY:
return __CTF_ROOT_DIRECTORY
return Path(__CTF_ROOT_DIRECTORY)

path: str = (
str(ENV.get("CTF_ROOT_DIR"))
Expand All @@ -431,7 +432,7 @@ def find_ctf_root_directory() -> str:
exit(1)

LOG.debug(msg=f"Found root directory: {path}")
return (__CTF_ROOT_DIRECTORY := path)
return Path(__CTF_ROOT_DIRECTORY := path)


def is_ctf_dir(path):
Expand Down
Loading