diff --git a/ctfcli/cli/challenges.py b/ctfcli/cli/challenges.py index 64edae7..6b26b63 100644 --- a/ctfcli/cli/challenges.py +++ b/ctfcli/cli/challenges.py @@ -19,7 +19,7 @@ LintException, RemoteChallengeNotFound, ) -from ctfcli.utils.git import check_if_git_subrepo_is_installed, get_git_repo_head_branch +from ctfcli.utils.git import check_if_git_subrepo_is_installed, resolve_repo_url log = logging.getLogger("ctfcli.cli.challenges") @@ -157,8 +157,8 @@ def add( if yaml_path: challenge_key = challenge_key / yaml_path - # Add a new challenge to the config - config["challenges"][str(challenge_key)] = repo + # Add a new challenge to the config, with the branch if specified + config["challenges"][str(challenge_key)] = f"{repo}@{branch}" if branch else repo if use_subrepo: # Clone with subrepo if configured @@ -171,8 +171,8 @@ def add( cmd += ["-f"] else: # Otherwise default to the built-in subtree - head_branch = get_git_repo_head_branch(repo) - cmd = ["git", "subtree", "add", "--prefix", challenge_path, repo, head_branch, "--squash"] + _, target_branch = resolve_repo_url(repo, branch=branch) + cmd = ["git", "subtree", "add", "--prefix", challenge_path, repo, target_branch, "--squash"] log.debug(f"call({cmd}, cwd='{project_path}')") if subprocess.call(cmd, cwd=project_path) != 0: @@ -211,8 +211,8 @@ def add( click.secho(f"Could not process the challenge path: '{repo}'", fg="red") return 1 - def push(self, challenge: str | None = None, no_auto_pull: bool = False, quiet=False) -> int: - log.debug(f"push: (challenge={challenge}, no_auto_pull={no_auto_pull}, quiet={quiet})") + def push(self, challenge: str | None = None, quiet=False) -> int: + log.debug(f"push: (challenge={challenge}, quiet={quiet})") config = Config() if challenge: @@ -226,91 +226,91 @@ def push(self, challenge: str | None = None, no_auto_pull: bool = False, quiet=F failed_pushes = [] - if quiet or len(challenges) <= 1: - context = contextlib.nullcontext(challenges) - else: - context = click.progressbar(challenges, label="Pushing challenges") - use_subrepo = config["config"].getboolean("use_subrepo", fallback=False) if use_subrepo and not check_if_git_subrepo_is_installed(): click.secho("This project is configured to use git subrepo, but it's not installed.") return 1 - with context as context_challenges: - for challenge_instance in context_challenges: - click.echo() - - # Get a relative path from project root to the challenge - # As this is what git subtree push requires - challenge_path = challenge_instance.challenge_directory.resolve().relative_to(config.project_path) - challenge_repo = config.challenges.get(str(challenge_path), None) + # Validate all challenges and check for uncommitted changes upfront + challenges_to_push = [] + challenges_with_uncommitted_changes = [] - # if we don't find the challenge by the directory, - # check if it's saved with a direct path to challenge.yml - if not challenge_repo: - challenge_repo = config.challenges.get(str(challenge_path / "challenge.yml"), None) + for challenge_instance in challenges: + # Get a relative path from project root to the challenge + challenge_path = challenge_instance.challenge_directory.resolve().relative_to(config.project_path) + challenge_repo = config.challenges.get(str(challenge_path), None) - if not challenge_repo: - click.secho( - f"Could not find added challenge '{challenge_path}' " - "Please check that the challenge is added to .ctf/config and that your path matches", - fg="red", - ) - failed_pushes.append(challenge_instance) - continue + # if we don't find the challenge by the directory, + # check if it's saved with a direct path to challenge.yml + if not challenge_repo: + challenge_repo = config.challenges.get(str(challenge_path / "challenge.yml"), None) - if not challenge_repo.endswith(".git"): - click.secho( - f"Cannot push challenge '{challenge_path}', as it's not a git-based challenge", - fg="yellow", - ) - failed_pushes.append(challenge_instance) - continue + if not challenge_repo: + click.secho( + f"Could not find added challenge '{challenge_path}' " + "Please check that the challenge is added to .ctf/config and that your path matches", + fg="red", + ) + failed_pushes.append(challenge_instance) + continue - click.secho(f"Pushing '{challenge_path}' to '{challenge_repo}'", fg="blue") + challenge_repo, challenge_branch = resolve_repo_url(challenge_repo) - log.debug( - f"call(['git', 'status', '--porcelain'], cwd='{config.project_path / challenge_path}'," - f" stdout=subprocess.PIPE, text=True)" - ) - git_status = subprocess.run( - ["git", "status", "--porcelain"], - cwd=config.project_path / challenge_path, - stdout=subprocess.PIPE, - text=True, + if not challenge_repo.endswith(".git"): + click.secho( + f"Cannot push challenge '{challenge_path}', as it's not a git-based challenge", + fg="yellow", ) + failed_pushes.append(challenge_instance) + continue - if git_status.stdout.strip() == "" and git_status.returncode == 0: - click.secho(f"No changes to be pushed for {challenge_path}", fg="green") - continue + # Check for uncommitted changes + log.debug( + f"call(['git', 'status', '--porcelain'], cwd='{config.project_path / challenge_path}'," + f" stdout=subprocess.PIPE, text=True)" + ) + git_status = subprocess.run( + ["git", "status", "--porcelain"], + cwd=config.project_path / challenge_path, + stdout=subprocess.PIPE, + text=True, + ) - log.debug(f"call(['git', 'add', '.'], cwd='{config.project_path / challenge_path}')") - git_add = subprocess.call(["git", "add", "."], cwd=config.project_path / challenge_path) + if git_status.stdout.strip() != "" and git_status.returncode == 0: + challenges_with_uncommitted_changes.append(challenge_path) - log.debug( - f"call(['git', 'commit', '-m', 'Pushing changes to {challenge_path}'], " - f"cwd='{config.project_path / challenge_path}')" - ) - git_commit = subprocess.call( - ["git", "commit", "-m", f"Pushing changes to {challenge_path}"], - cwd=config.project_path / challenge_path, - ) + challenges_to_push.append((challenge_instance, challenge_path, challenge_repo, challenge_branch)) - if any(r != 0 for r in [git_add, git_commit]): - click.secho( - "Could not commit the challenge changes. Please check git error messages above.", - fg="red", - ) - failed_pushes.append(challenge_instance) - continue + # If any challenges have uncommitted changes, error out + if challenges_with_uncommitted_changes: + click.secho( + "Cannot push: the following challenges have uncommitted changes:", + fg="red", + ) + for challenge_path in challenges_with_uncommitted_changes: + click.echo(f" - {challenge_path}") + + click.echo() + click.secho("Please commit your changes before pushing.", fg="yellow") + return 1 + + # Push all challenges (working directory is clean) + if not quiet and len(challenges_to_push) > 1: + push_context = click.progressbar(challenges_to_push, label="Pushing challenges") + else: + push_context = contextlib.nullcontext(challenges_to_push) + + with push_context as challenges_iterator: + for challenge_instance, challenge_path, challenge_repo, challenge_branch in challenges_iterator: + click.echo() + click.secho(f"Pushing '{challenge_path}' to '{challenge_repo}'", fg="blue") if use_subrepo: cmd = ["git", "subrepo", "push", challenge_path] else: - head_branch = get_git_repo_head_branch(challenge_repo) - cmd = ["git", "subtree", "push", "--prefix", challenge_path, challenge_repo, head_branch] + cmd = ["git", "subtree", "push", "--prefix", challenge_path, challenge_repo, challenge_branch] - log.debug(f"call({cmd}, cwd='{config.project_path / challenge_path}')") + log.debug(f"call({cmd}, cwd='{config.project_path}')") if subprocess.call(cmd, cwd=config.project_path) != 0: click.secho( "Could not push the challenge repository. Please check git error messages above.", @@ -319,10 +319,6 @@ def push(self, challenge: str | None = None, no_auto_pull: bool = False, quiet=F failed_pushes.append(challenge_instance) continue - # if auto pull is not disabled - if not no_auto_pull: - self.pull(str(challenge_path), quiet=True) - if len(failed_pushes) == 0: if not quiet: click.secho("Success! All challenges pushed!", fg="green") @@ -383,6 +379,8 @@ def pull(self, challenge: str | None = None, strategy: str = "fast-forward", qui failed_pulls.append(challenge_instance) continue + challenge_repo, challenge_branch = resolve_repo_url(challenge_repo) + if not challenge_repo.endswith(".git"): click.secho( f"Cannot pull challenge '{challenge_path}', as it's not a git-based challenge", @@ -408,7 +406,6 @@ def pull(self, challenge: str | None = None, strategy: str = "fast-forward", qui else: click.secho(f"Cannot pull challenge - '{strategy}' is not a valid pull strategy", fg="red") else: - head_branch = get_git_repo_head_branch(challenge_repo) pull_env["GIT_MERGE_AUTOEDIT"] = "no" cmd = [ "git", @@ -417,7 +414,7 @@ def pull(self, challenge: str | None = None, strategy: str = "fast-forward", qui "--prefix", challenge_path, challenge_repo, - head_branch, + challenge_branch, "--squash", ] @@ -527,11 +524,12 @@ def restore(self, challenge: str | None = None) -> int: f"Restoring git repo '{challenge_source}' to '{challenge_key}'", fg="blue", ) - head_branch = get_git_repo_head_branch(challenge_source) + + challenge_source, challenge_branch = resolve_repo_url(challenge_source) log.debug( f"call(['git', 'subtree', 'add', '--prefix', '{challenge_key}', '{challenge_source}', " - f"'{head_branch}', '--squash'], cwd='{config.project_path}')" + f"'{challenge_branch}', '--squash'], cwd='{config.project_path}')" ) git_subtree_add = subprocess.call( [ @@ -541,7 +539,7 @@ def restore(self, challenge: str | None = None) -> int: "--prefix", challenge_key, challenge_source, - head_branch, + challenge_branch, "--squash", ], cwd=config.project_path, diff --git a/ctfcli/utils/git.py b/ctfcli/utils/git.py index baac71f..2daa866 100644 --- a/ctfcli/utils/git.py +++ b/ctfcli/utils/git.py @@ -2,34 +2,60 @@ from os import PathLike -def check_if_git_subrepo_is_installed() -> bool: - output = subprocess.run(["git", "subrepo"], capture_output=True, text=True) - return "git: 'subrepo' is not a git command" not in output.stderr +def resolve_repo_url(repo: str, branch: str | None = None) -> tuple[str, str | None]: + """ + Resolves a repo string to (clean_url, branch). + Resolution order: + 1. The `branch` parameter, if provided + 2. An inline @branch parsed from the repo string + 3. The remote's HEAD branch, detected via git ls-remote -def get_git_repo_head_branch(repo: str) -> str | None: - """ - A helper method to get the reference of the HEAD branch of a git remote repo. - https://stackoverflow.com/a/41925348 + Returns (url, None) if no branch can be determined. """ + # Strip an inline @branch suffix if present + marker = ".git@" + idx = repo.rfind(marker) + if idx != -1: + inline_branch = repo[idx + 5 :] + repo = repo[: idx + 4] # clean URL up to .git + if not branch and inline_branch: + branch = inline_branch + + # Branch already resolved + if branch: + return repo, branch + + # Non-git paths have no remote to query + if not repo.endswith(".git"): + return repo, None + + # Fall back to detecting the remote HEAD branch + # https://stackoverflow.com/a/41925348 try: - output = subprocess.check_output(["git", "ls-remote", "--symref", repo, "HEAD"], stderr=subprocess.DEVNULL) + output = subprocess.check_output( + ["git", "ls-remote", "--symref", repo, "HEAD"], + stderr=subprocess.DEVNULL, + ) - # if for some reason subprocess didn't error, but returned None or an empty byte-string - return None - # this can happen if a repository exists, but doesn't have a head branch + # repo exists but doesn't have a head branch if type(output) != bytes or len(output) == 0: - return None + return repo, None except subprocess.CalledProcessError: - return None + return repo, None - # otherwise process the output output = output.decode().strip() head_branch_line = output.split()[1] if head_branch_line.startswith("refs/heads/"): - return head_branch_line[11:] + return repo, head_branch_line[11:] + + return repo, None + - return None +def check_if_git_subrepo_is_installed() -> bool: + output = subprocess.run(["git", "subrepo"], capture_output=True, text=True) + return "git: 'subrepo' is not a git command" not in output.stderr def check_if_dir_is_inside_git_repo(cwd: str | PathLike | None = None) -> bool: diff --git a/tests/utils/test_git.py b/tests/utils/test_git.py index 983804d..fb1c7ca 100644 --- a/tests/utils/test_git.py +++ b/tests/utils/test_git.py @@ -3,74 +3,105 @@ from pathlib import Path from unittest import mock -from ctfcli.utils.git import check_if_dir_is_inside_git_repo, get_git_repo_head_branch +from ctfcli.utils.git import check_if_dir_is_inside_git_repo, resolve_repo_url -class TestGetGitRepoHeadBranch(unittest.TestCase): - def test_gets_head_branch_if_head_exists(self): +class TestResolveRepoUrl(unittest.TestCase): + def test_parses_branch_from_https_url(self): + with mock.patch("ctfcli.utils.git.subprocess.check_output") as mock_check_output: + url, branch = resolve_repo_url("https://github.com/user/repo.git@develop") + + self.assertEqual("https://github.com/user/repo.git", url) + self.assertEqual("develop", branch) + mock_check_output.assert_not_called() + + def test_parses_branch_from_ssh_url(self): + with mock.patch("ctfcli.utils.git.subprocess.check_output") as mock_check_output: + url, branch = resolve_repo_url("git@github.com:user/repo.git@develop") + + self.assertEqual("git@github.com:user/repo.git", url) + self.assertEqual("develop", branch) + mock_check_output.assert_not_called() + + def test_explicit_branch_overrides_inline(self): + with mock.patch("ctfcli.utils.git.subprocess.check_output") as mock_check_output: + url, branch = resolve_repo_url("https://github.com/user/repo.git@inline", branch="explicit") + + self.assertEqual("https://github.com/user/repo.git", url) + self.assertEqual("explicit", branch) + mock_check_output.assert_not_called() + + def test_explicit_branch_with_no_inline(self): + with mock.patch("ctfcli.utils.git.subprocess.check_output") as mock_check_output: + url, branch = resolve_repo_url("https://github.com/user/repo.git", branch="develop") + + self.assertEqual("https://github.com/user/repo.git", url) + self.assertEqual("develop", branch) + mock_check_output.assert_not_called() + + def test_detects_head_branch_when_none_specified(self): # example output taken from ctfcli repo mock_output = b""" ref: refs/heads/master HEAD 7b4a09af8414eb1f5f6da9a8422fb53b5e9cbc15 HEAD 0370595efd5e9a211b05c55778fc4c0ae2fe70af refs/heads/15-blank-challenge-template -410d49503971cf0e16b29a1707b52d911945a59f refs/heads/21-add-deploy-functionality -ca1f8be5c207e911a3110f1e0223a1f3db9aa269 refs/heads/30-writeup-folder -2f12163e6c6a4d105bd5e4ac25167fac8c6f5168 refs/heads/32-add-flag-data-into-spec -58270ef683a3f92c4beffc3528e1188e11ddb04f refs/heads/46-install-state-simplified -4d0e530edd08baebccafb71b6b613c49b67458bd refs/heads/47-topics -47cb2285604da69c9e5c9b6cd9b59a9b9c50b647 refs/heads/70-pages-support -c75b674c1b582852f18719b08b474515d138a14d refs/heads/75-challenge-healthcheck """ with mock.patch("ctfcli.utils.git.subprocess.check_output", return_value=mock_output) as mock_check_output: - expected_head_branch = "master" - head_branch = get_git_repo_head_branch("https://github.com/CTFd/ctfcli") + url, branch = resolve_repo_url("https://github.com/CTFd/ctfcli.git") + self.assertEqual("https://github.com/CTFd/ctfcli.git", url) + self.assertEqual("master", branch) mock_check_output.assert_called_once_with( - [ - "git", - "ls-remote", - "--symref", - "https://github.com/CTFd/ctfcli", - "HEAD", - ], + ["git", "ls-remote", "--symref", "https://github.com/CTFd/ctfcli.git", "HEAD"], stderr=subprocess.DEVNULL, ) - self.assertEqual(expected_head_branch, head_branch) - def test_returns_none_if_repository_not_found(self): - with mock.patch("ctfcli.utils.git.subprocess.check_output") as mock_check_output: - mock_check_output.side_effect = subprocess.CalledProcessError(128, []) - head_branch = get_git_repo_head_branch("https://github.com/example/does-not-exist") + def test_detects_head_branch_for_ssh_url(self): + mock_output = b"ref: refs/heads/main HEAD\nabc123 HEAD\n" + with mock.patch("ctfcli.utils.git.subprocess.check_output", return_value=mock_output) as mock_check_output: + url, branch = resolve_repo_url("git@github.com:user/repo.git") + + self.assertEqual("git@github.com:user/repo.git", url) + self.assertEqual("main", branch) mock_check_output.assert_called_once_with( - [ - "git", - "ls-remote", - "--symref", - "https://github.com/example/does-not-exist", - "HEAD", - ], + ["git", "ls-remote", "--symref", "git@github.com:user/repo.git", "HEAD"], stderr=subprocess.DEVNULL, ) - self.assertIsNone(head_branch) - def test_returns_none_if_head_not_found(self): - mock_output = b"" + def test_trailing_at_falls_through_to_head_detection(self): + mock_output = b"ref: refs/heads/main HEAD\nabc123 HEAD\n" with mock.patch("ctfcli.utils.git.subprocess.check_output", return_value=mock_output) as mock_check_output: - head_branch = get_git_repo_head_branch("https://github.com/example/does-not-have-a-head-branch") + url, branch = resolve_repo_url("https://github.com/user/repo.git@") - mock_check_output.assert_called_once_with( - [ - "git", - "ls-remote", - "--symref", - "https://github.com/example/does-not-have-a-head-branch", - "HEAD", - ], - stderr=subprocess.DEVNULL, - ) - self.assertIsNone(head_branch) + self.assertEqual("https://github.com/user/repo.git", url) + self.assertEqual("main", branch) + mock_check_output.assert_called_once() + + def test_returns_none_if_repository_not_found(self): + with mock.patch("ctfcli.utils.git.subprocess.check_output") as mock_check_output: + mock_check_output.side_effect = subprocess.CalledProcessError(128, []) + url, branch = resolve_repo_url("https://github.com/example/does-not-exist.git") + + self.assertEqual("https://github.com/example/does-not-exist.git", url) + self.assertIsNone(branch) + + def test_returns_none_if_head_not_set(self): + with mock.patch("ctfcli.utils.git.subprocess.check_output", return_value=b"") as mock_check_output: + url, branch = resolve_repo_url("https://github.com/example/no-head.git") + + self.assertEqual("https://github.com/example/no-head.git", url) + self.assertIsNone(branch) + mock_check_output.assert_called_once() + + def test_returns_none_for_non_git_path_without_subprocess(self): + with mock.patch("ctfcli.utils.git.subprocess.check_output") as mock_check_output: + url, branch = resolve_repo_url("some/local/path") + + self.assertEqual("some/local/path", url) + self.assertIsNone(branch) + mock_check_output.assert_not_called() class TestCheckIfDirIsInsideGitRepo(unittest.TestCase):