From 08cf1dd9368933ecf867f683de73170ef77bc472 Mon Sep 17 00:00:00 2001 From: Simon Bouchard Date: Sat, 16 May 2026 12:01:57 -0400 Subject: [PATCH 1/3] Added new ctf post command --- ctf/__main__.py | 7 ++ ctf/post.py | 218 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 225 insertions(+) create mode 100644 ctf/post.py diff --git a/ctf/__main__.py b/ctf/__main__.py index 23f1648..97fe8c8 100644 --- a/ctf/__main__.py +++ b/ctf/__main__.py @@ -24,6 +24,7 @@ from ctf.list import app as list_app from ctf.logger import LOG from ctf.new import app as new_app +from ctf.post import app as post_app from ctf.redeploy import app as redeploy_app from ctf.services import app as services_app from ctf.stats import app as stats_app @@ -43,6 +44,12 @@ app.add_typer(init_app) app.add_typer(list_app) app.add_typer(new_app) +app.add_typer( + post_app, + name="post", + help="Commands to manage discourse post files.", + rich_help_panel="Subcommands", +) app.add_typer(redeploy_app) app.add_typer(services_app) app.add_typer(stats_app) diff --git a/ctf/post.py b/ctf/post.py new file mode 100644 index 0000000..3f209b2 --- /dev/null +++ b/ctf/post.py @@ -0,0 +1,218 @@ +import os +import re +import textwrap +from enum import StrEnum + +import typer +from typing_extensions import Annotated + +from ctf.logger import LOG +from ctf.utils import find_ctf_root_directory, parse_track_yaml + +app = typer.Typer(no_args_is_help=True) + + +class ApiUser(StrEnum): + NSEC = "nsec" + SYSTEM = "system" + + +class TriggerType(StrEnum): + FLAG = "flag" + + +def _format_yaml_block(text: str) -> str: + lines = text.splitlines() or [""] + return "\n".join(f" {line}" for line in lines) + + +def _default_post_filename(track: str, tag: str) -> str: + normalized_track = track.replace("-", "_") + suffix = tag + if tag.startswith(normalized_track + "_"): + suffix = tag[len(normalized_track) + 1 :] + suffix = re.sub(r"[^a-zA-Z0-9_-]+", "-", suffix).strip("-_") + if not suffix: + suffix = "post" + return f"{track}-{suffix.replace('_', '-')}.yaml" + + +def _get_available_discourse_tags(track: str) -> list[str]: + track_yaml = parse_track_yaml(track_name=track) + tags: set[str] = set() + for flag in track_yaml.get("flags", []): + discourse_tag = ((flag or {}).get("tags") or {}).get("discourse") + if isinstance(discourse_tag, str) and discourse_tag.strip(): + tags.add(discourse_tag.strip()) + return sorted(tags) + + +def _add_counter_to_filename(posts_directory: str, filename: str) -> str: + base, ext = os.path.splitext(filename) + if not ext: + ext = ".yaml" + + candidate = f"{base}{ext}" + if not os.path.exists(os.path.join(posts_directory, candidate)): + return candidate + + counter = 2 + while os.path.exists(os.path.join(posts_directory, f"{base}-{counter}{ext}")): + counter += 1 + return f"{base}-{counter}{ext}" + + +def _resolve_post_file_path( + posts_directory: str, + track: str, + name: str | None, + tag: str | None, + force: bool, +) -> str: + filename = ( + f"{track}-{name}.yaml" + if name + else ( + _default_post_filename(track=track, tag=tag) + if tag + else f"{track}-post.yaml" + ) + ) + + if not force: + filename = _add_counter_to_filename(posts_directory, filename) + + return os.path.join(posts_directory, filename) + + +def _render_post_yaml( + track: str, + user: ApiUser, + body: str, + trigger: TriggerType | None = None, + tag: str | None = None, +) -> str: + trigger_block = "" + if trigger == TriggerType.FLAG: + trigger_block = textwrap.dedent( + f"""\ + trigger: + type: flag + tag: {tag} + """ + ) + + return ( + textwrap.dedent( + f"""\ + type: post + topic: {track} + {trigger_block}api: + user: {user.value} + body: |- + {_format_yaml_block(body)} + """ + ).rstrip() + + "\n" + ) + + +@app.command("new", help="Create a new discourse post YAML file for a track.") +def new_post( + track: Annotated[ + str, + typer.Option( + "--track", + "-t", + help="Track name (challenge directory name).", + ), + ], + tag: Annotated[ + str | None, + typer.Option( + "--tag", + help="Discourse trigger tag, usually from track.yaml flag tags.discourse. Required when --trigger flag is set.", + ), + ] = None, + trigger: Annotated[ + TriggerType | None, + typer.Option( + "--trigger", + help="Trigger type for this post. If omitted, no trigger block is added.", + ), + ] = None, + name: Annotated[ + str | None, + typer.Option( + "--name", + "-n", + help="Post file name. Defaults to a name derived from the track and tag.", + ), + ] = None, + user: Annotated[ + ApiUser, + typer.Option("--user", help="Discourse user posting this message."), + ] = ApiUser.NSEC, + body: Annotated[ + str, + typer.Option("--body", help="Post body. Markdown is supported."), + ] = "CHANGE_ME", + force: Annotated[ + bool, + typer.Option("--force", help="Overwrite the post file if it already exists."), + ] = False, +) -> None: + challenges_track_directory = os.path.join( + find_ctf_root_directory(), "challenges", track + ) + if not os.path.isdir(challenges_track_directory): + LOG.critical( + f"Track directory not found: {challenges_track_directory}. Verify --track." + ) + raise typer.Exit(code=1) + + posts_directory = os.path.join(challenges_track_directory, "posts") + os.makedirs(posts_directory, exist_ok=True) + + # TODO: add support for other triggers + if trigger == TriggerType.FLAG and not tag: + LOG.critical("--tag is required when --trigger flag is provided.") + raise typer.Exit(code=1) + + if trigger != TriggerType.FLAG and tag: + LOG.critical("--tag can only be used with --trigger flag.") + raise typer.Exit(code=1) + + if trigger == TriggerType.FLAG and tag: + valid_tags = _get_available_discourse_tags(track=track) + if tag not in valid_tags: + if valid_tags: + LOG.critical( + f'Invalid --tag "{tag}" for track "{track}". Valid tags: {", ".join(valid_tags)}' + ) + else: + LOG.critical( + f'Invalid --tag "{tag}" for track "{track}". No discourse tags were found in track.yaml flags[].tags.discourse.' + ) + raise typer.Exit(code=1) + + post_file_path = _resolve_post_file_path( + posts_directory=posts_directory, + track=track, + name=name, + tag=tag, + force=force, + ) + + post_yaml = _render_post_yaml( + track=track, + user=user, + body=body, + trigger=trigger, + tag=tag, + ) + + with open(post_file_path, "w", encoding="utf-8") as f: + f.write(post_yaml) + + LOG.info(f"Created post file: {post_file_path}") From e9125265c877404916fe0631180f4e2573cde940 Mon Sep 17 00:00:00 2001 From: Simon Bouchard Date: Sat, 16 May 2026 12:06:27 -0400 Subject: [PATCH 2/3] Fixed yaml format --- ctf/post.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/ctf/post.py b/ctf/post.py index 3f209b2..48fb329 100644 --- a/ctf/post.py +++ b/ctf/post.py @@ -1,6 +1,5 @@ import os import re -import textwrap from enum import StrEnum import typer @@ -92,30 +91,31 @@ def _render_post_yaml( trigger: TriggerType | None = None, tag: str | None = None, ) -> str: - trigger_block = "" + lines = [ + "type: post", + f"topic: {track}", + ] + if trigger == TriggerType.FLAG: - trigger_block = textwrap.dedent( - f"""\ - trigger: - type: flag - tag: {tag} - """ + lines.extend( + [ + "trigger:", + " type: flag", + f" tag: {tag}", + ] ) - return ( - textwrap.dedent( - f"""\ - type: post - topic: {track} - {trigger_block}api: - user: {user.value} - body: |- - {_format_yaml_block(body)} - """ - ).rstrip() - + "\n" + lines.extend( + [ + "api:", + f" user: {user.value}", + "body: |-", + _format_yaml_block(body), + ] ) + return "\n".join(lines) + "\n" + @app.command("new", help="Create a new discourse post YAML file for a track.") def new_post( From 4d87bdd47c4102fe2fc4694d9eda5ef2756618df Mon Sep 17 00:00:00 2001 From: Marc Olivier Bergeron Date: Thu, 21 May 2026 21:02:41 -0400 Subject: [PATCH 3/3] Fixed a few things for post subcommand --- ctf/models.py | 6 ++++ ctf/post/__init__.py | 6 ++++ ctf/{post.py => post/new.py} | 66 +++++++++++++++++++----------------- ctf/utils.py | 7 ++-- 4 files changed, 51 insertions(+), 34 deletions(-) create mode 100644 ctf/post/__init__.py rename ctf/{post.py => post/new.py} (74%) diff --git a/ctf/models.py b/ctf/models.py index 3caa404..1a48fdd 100644 --- a/ctf/models.py +++ b/ctf/models.py @@ -26,6 +26,12 @@ class Track(BaseModel): has_virtual_machine: bool = False already_deployed: bool = False + @property + def location(self): + from ctf.utils import find_ctf_root_directory + + return find_ctf_root_directory() / self.name + def __eq__(self, other: Any) -> bool: match other: case str(): diff --git a/ctf/post/__init__.py b/ctf/post/__init__.py new file mode 100644 index 0000000..8d41116 --- /dev/null +++ b/ctf/post/__init__.py @@ -0,0 +1,6 @@ +import typer + +from .new import app as new_app + +app = typer.Typer(no_args_is_help=True) +app.add_typer(new_app) diff --git a/ctf/post.py b/ctf/post/new.py similarity index 74% rename from ctf/post.py rename to ctf/post/new.py index 48fb329..a8378d1 100644 --- a/ctf/post.py +++ b/ctf/post/new.py @@ -1,14 +1,19 @@ import os import re from enum import StrEnum +from pathlib import Path import typer from typing_extensions import Annotated from ctf.logger import LOG -from ctf.utils import find_ctf_root_directory, parse_track_yaml +from ctf.models import Track +from ctf.utils import ( + get_all_available_tracks, + parse_track_yaml, +) -app = typer.Typer(no_args_is_help=True) +app = typer.Typer() class ApiUser(StrEnum): @@ -25,8 +30,8 @@ def _format_yaml_block(text: str) -> str: return "\n".join(f" {line}" for line in lines) -def _default_post_filename(track: str, tag: str) -> str: - normalized_track = track.replace("-", "_") +def _default_post_filename(track: Track, tag: str) -> str: + normalized_track = track.name.replace("-", "_") suffix = tag if tag.startswith(normalized_track + "_"): suffix = tag[len(normalized_track) + 1 :] @@ -36,8 +41,8 @@ def _default_post_filename(track: str, tag: str) -> str: return f"{track}-{suffix.replace('_', '-')}.yaml" -def _get_available_discourse_tags(track: str) -> list[str]: - track_yaml = parse_track_yaml(track_name=track) +def _get_available_discourse_tags(track: Track) -> list[str]: + track_yaml = parse_track_yaml(track_name=track.name) tags: set[str] = set() for flag in track_yaml.get("flags", []): discourse_tag = ((flag or {}).get("tags") or {}).get("discourse") @@ -46,24 +51,24 @@ def _get_available_discourse_tags(track: str) -> list[str]: return sorted(tags) -def _add_counter_to_filename(posts_directory: str, filename: str) -> str: +def _add_counter_to_filename(posts_directory: Path, filename: str) -> str: base, ext = os.path.splitext(filename) if not ext: ext = ".yaml" candidate = f"{base}{ext}" - if not os.path.exists(os.path.join(posts_directory, candidate)): + if not (posts_directory / candidate).exists(): return candidate counter = 2 - while os.path.exists(os.path.join(posts_directory, f"{base}-{counter}{ext}")): + while (posts_directory / f"{base}-{counter}{ext}").exists(): counter += 1 return f"{base}-{counter}{ext}" def _resolve_post_file_path( - posts_directory: str, - track: str, + posts_directory: Path, + track: Track, name: str | None, tag: str | None, force: bool, @@ -85,7 +90,7 @@ def _resolve_post_file_path( def _render_post_yaml( - track: str, + track: Track, user: ApiUser, body: str, trigger: TriggerType | None = None, @@ -117,8 +122,12 @@ def _render_post_yaml( return "\n".join(lines) + "\n" -@app.command("new", help="Create a new discourse post YAML file for a track.") -def new_post( +@app.command( + "new", + help="Create a new discourse post YAML file for a track.", + no_args_is_help=True, +) +def new( track: Annotated[ str, typer.Option( @@ -162,50 +171,45 @@ def new_post( typer.Option("--force", help="Overwrite the post file if it already exists."), ] = False, ) -> None: - challenges_track_directory = os.path.join( - find_ctf_root_directory(), "challenges", track - ) - if not os.path.isdir(challenges_track_directory): - LOG.critical( - f"Track directory not found: {challenges_track_directory}. Verify --track." - ) - raise typer.Exit(code=1) + if (track_obj := Track(name=track)) not in get_all_available_tracks(): + LOG.critical(f"Track directory not found: {track_obj.name}. Verify --track.") + raise typer.Exit(1) - posts_directory = os.path.join(challenges_track_directory, "posts") + posts_directory: Path = track_obj.location / "posts" os.makedirs(posts_directory, exist_ok=True) # TODO: add support for other triggers if trigger == TriggerType.FLAG and not tag: LOG.critical("--tag is required when --trigger flag is provided.") - raise typer.Exit(code=1) + raise typer.Exit(1) if trigger != TriggerType.FLAG and tag: LOG.critical("--tag can only be used with --trigger flag.") - raise typer.Exit(code=1) + raise typer.Exit(1) if trigger == TriggerType.FLAG and tag: - valid_tags = _get_available_discourse_tags(track=track) + valid_tags = _get_available_discourse_tags(track=track_obj) if tag not in valid_tags: if valid_tags: LOG.critical( - f'Invalid --tag "{tag}" for track "{track}". Valid tags: {", ".join(valid_tags)}' + f'Invalid --tag "{tag}" for track "{track_obj.name}". Valid tags: {", ".join(valid_tags)}' ) else: LOG.critical( - f'Invalid --tag "{tag}" for track "{track}". No discourse tags were found in track.yaml flags[].tags.discourse.' + f'Invalid --tag "{tag}" for track "{track_obj.name}". No discourse tags were found in track.yaml flags[].tags.discourse.' ) - raise typer.Exit(code=1) + raise typer.Exit(1) post_file_path = _resolve_post_file_path( posts_directory=posts_directory, - track=track, + track=track_obj, name=name, tag=tag, force=force, ) post_yaml = _render_post_yaml( - track=track, + track=track_obj, user=user, body=body, trigger=trigger, diff --git a/ctf/utils.py b/ctf/utils.py index 3f2c8e9..3c88ad1 100644 --- a/ctf/utils.py +++ b/ctf/utils.py @@ -4,6 +4,7 @@ import shutil import subprocess import textwrap +from pathlib import Path from typing import Any, Generator import jinja2 @@ -407,10 +408,10 @@ def parse_post_yamls(track_name: str) -> list[dict]: return posts -def find_ctf_root_directory() -> str: +def find_ctf_root_directory() -> Path: global __CTF_ROOT_DIRECTORY if __CTF_ROOT_DIRECTORY: - return __CTF_ROOT_DIRECTORY + return Path(__CTF_ROOT_DIRECTORY) path: str = ( str(ENV.get("CTF_ROOT_DIR")) @@ -431,7 +432,7 @@ def find_ctf_root_directory() -> str: exit(1) LOG.debug(msg=f"Found root directory: {path}") - return (__CTF_ROOT_DIRECTORY := path) + return Path(__CTF_ROOT_DIRECTORY := path) def is_ctf_dir(path):