diff --git a/ctf/__main__.py b/ctf/__main__.py index 23f1648..97fe8c8 100644 --- a/ctf/__main__.py +++ b/ctf/__main__.py @@ -24,6 +24,7 @@ from ctf.list import app as list_app from ctf.logger import LOG from ctf.new import app as new_app +from ctf.post import app as post_app from ctf.redeploy import app as redeploy_app from ctf.services import app as services_app from ctf.stats import app as stats_app @@ -43,6 +44,12 @@ app.add_typer(init_app) app.add_typer(list_app) app.add_typer(new_app) +app.add_typer( + post_app, + name="post", + help="Commands to manage discourse post files.", + rich_help_panel="Subcommands", +) app.add_typer(redeploy_app) app.add_typer(services_app) app.add_typer(stats_app) diff --git a/ctf/models.py b/ctf/models.py index 3caa404..1a48fdd 100644 --- a/ctf/models.py +++ b/ctf/models.py @@ -26,6 +26,12 @@ class Track(BaseModel): has_virtual_machine: bool = False already_deployed: bool = False + @property + def location(self): + from ctf.utils import find_ctf_root_directory + + return find_ctf_root_directory() / self.name + def __eq__(self, other: Any) -> bool: match other: case str(): diff --git a/ctf/post/__init__.py b/ctf/post/__init__.py new file mode 100644 index 0000000..8d41116 --- /dev/null +++ b/ctf/post/__init__.py @@ -0,0 +1,6 @@ +import typer + +from .new import app as new_app + +app = typer.Typer(no_args_is_help=True) +app.add_typer(new_app) diff --git a/ctf/post/new.py b/ctf/post/new.py new file mode 100644 index 0000000..a8378d1 --- /dev/null +++ b/ctf/post/new.py @@ -0,0 +1,222 @@ +import os +import re +from enum import StrEnum +from pathlib import Path + +import typer +from typing_extensions import Annotated + +from ctf.logger import LOG +from ctf.models import Track +from ctf.utils import ( + get_all_available_tracks, + parse_track_yaml, +) + +app = typer.Typer() + + +class ApiUser(StrEnum): + NSEC = "nsec" + SYSTEM = "system" + + +class TriggerType(StrEnum): + FLAG = "flag" + + +def _format_yaml_block(text: str) -> str: + lines = text.splitlines() or [""] + return "\n".join(f" {line}" for line in lines) + + +def _default_post_filename(track: Track, tag: str) -> str: + normalized_track = track.name.replace("-", "_") + suffix = tag + if tag.startswith(normalized_track + "_"): + suffix = tag[len(normalized_track) + 1 :] + suffix = re.sub(r"[^a-zA-Z0-9_-]+", "-", suffix).strip("-_") + if not suffix: + suffix = "post" + return f"{track}-{suffix.replace('_', '-')}.yaml" + + +def _get_available_discourse_tags(track: Track) -> list[str]: + track_yaml = parse_track_yaml(track_name=track.name) + tags: set[str] = set() + for flag in track_yaml.get("flags", []): + discourse_tag = ((flag or {}).get("tags") or {}).get("discourse") + if isinstance(discourse_tag, str) and discourse_tag.strip(): + tags.add(discourse_tag.strip()) + return sorted(tags) + + +def _add_counter_to_filename(posts_directory: Path, filename: str) -> str: + base, ext = os.path.splitext(filename) + if not ext: + ext = ".yaml" + + candidate = f"{base}{ext}" + if not (posts_directory / candidate).exists(): + return candidate + + counter = 2 + while (posts_directory / f"{base}-{counter}{ext}").exists(): + counter += 1 + return f"{base}-{counter}{ext}" + + +def _resolve_post_file_path( + posts_directory: Path, + track: Track, + name: str | None, + tag: str | None, + force: bool, +) -> str: + filename = ( + f"{track}-{name}.yaml" + if name + else ( + _default_post_filename(track=track, tag=tag) + if tag + else f"{track}-post.yaml" + ) + ) + + if not force: + filename = _add_counter_to_filename(posts_directory, filename) + + return os.path.join(posts_directory, filename) + + +def _render_post_yaml( + track: Track, + user: ApiUser, + body: str, + trigger: TriggerType | None = None, + tag: str | None = None, +) -> str: + lines = [ + "type: post", + f"topic: {track}", + ] + + if trigger == TriggerType.FLAG: + lines.extend( + [ + "trigger:", + " type: flag", + f" tag: {tag}", + ] + ) + + lines.extend( + [ + "api:", + f" user: {user.value}", + "body: |-", + _format_yaml_block(body), + ] + ) + + return "\n".join(lines) + "\n" + + +@app.command( + "new", + help="Create a new discourse post YAML file for a track.", + no_args_is_help=True, +) +def new( + track: Annotated[ + str, + typer.Option( + "--track", + "-t", + help="Track name (challenge directory name).", + ), + ], + tag: Annotated[ + str | None, + typer.Option( + "--tag", + help="Discourse trigger tag, usually from track.yaml flag tags.discourse. Required when --trigger flag is set.", + ), + ] = None, + trigger: Annotated[ + TriggerType | None, + typer.Option( + "--trigger", + help="Trigger type for this post. If omitted, no trigger block is added.", + ), + ] = None, + name: Annotated[ + str | None, + typer.Option( + "--name", + "-n", + help="Post file name. Defaults to a name derived from the track and tag.", + ), + ] = None, + user: Annotated[ + ApiUser, + typer.Option("--user", help="Discourse user posting this message."), + ] = ApiUser.NSEC, + body: Annotated[ + str, + typer.Option("--body", help="Post body. Markdown is supported."), + ] = "CHANGE_ME", + force: Annotated[ + bool, + typer.Option("--force", help="Overwrite the post file if it already exists."), + ] = False, +) -> None: + if (track_obj := Track(name=track)) not in get_all_available_tracks(): + LOG.critical(f"Track directory not found: {track_obj.name}. Verify --track.") + raise typer.Exit(1) + + posts_directory: Path = track_obj.location / "posts" + os.makedirs(posts_directory, exist_ok=True) + + # TODO: add support for other triggers + if trigger == TriggerType.FLAG and not tag: + LOG.critical("--tag is required when --trigger flag is provided.") + raise typer.Exit(1) + + if trigger != TriggerType.FLAG and tag: + LOG.critical("--tag can only be used with --trigger flag.") + raise typer.Exit(1) + + if trigger == TriggerType.FLAG and tag: + valid_tags = _get_available_discourse_tags(track=track_obj) + if tag not in valid_tags: + if valid_tags: + LOG.critical( + f'Invalid --tag "{tag}" for track "{track_obj.name}". Valid tags: {", ".join(valid_tags)}' + ) + else: + LOG.critical( + f'Invalid --tag "{tag}" for track "{track_obj.name}". No discourse tags were found in track.yaml flags[].tags.discourse.' + ) + raise typer.Exit(1) + + post_file_path = _resolve_post_file_path( + posts_directory=posts_directory, + track=track_obj, + name=name, + tag=tag, + force=force, + ) + + post_yaml = _render_post_yaml( + track=track_obj, + user=user, + body=body, + trigger=trigger, + tag=tag, + ) + + with open(post_file_path, "w", encoding="utf-8") as f: + f.write(post_yaml) + + LOG.info(f"Created post file: {post_file_path}") diff --git a/ctf/utils.py b/ctf/utils.py index 3f2c8e9..3c88ad1 100644 --- a/ctf/utils.py +++ b/ctf/utils.py @@ -4,6 +4,7 @@ import shutil import subprocess import textwrap +from pathlib import Path from typing import Any, Generator import jinja2 @@ -407,10 +408,10 @@ def parse_post_yamls(track_name: str) -> list[dict]: return posts -def find_ctf_root_directory() -> str: +def find_ctf_root_directory() -> Path: global __CTF_ROOT_DIRECTORY if __CTF_ROOT_DIRECTORY: - return __CTF_ROOT_DIRECTORY + return Path(__CTF_ROOT_DIRECTORY) path: str = ( str(ENV.get("CTF_ROOT_DIR")) @@ -431,7 +432,7 @@ def find_ctf_root_directory() -> str: exit(1) LOG.debug(msg=f"Found root directory: {path}") - return (__CTF_ROOT_DIRECTORY := path) + return Path(__CTF_ROOT_DIRECTORY := path) def is_ctf_dir(path):