Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .tekton/on-pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ spec:
script: |
#!/bin/bash
set -eou pipefail

print_banner() {
echo
echo "----------- ${1} -----------"
Expand All @@ -219,10 +219,12 @@ spec:
# Mark the workspace as safe before any tool (uv, git, etc.) touches it.
git config --global --add safe.directory /workspace/source

# Install uv and Python 3.12 to shared PVC
# Install uv
print_banner "Installing uv"
curl -LsSf https://astral.sh/uv/install.sh | sh
source $HOME/.local/bin/env
# Download Python 3.12 to the shared PVC before creating the venv.
# UV_PYTHON_INSTALL_DIR alone does not trigger a download during uv venv.
uv python install 3.12

print_banner "CREATING AND ACTIVATING TEST ENV"
Expand Down Expand Up @@ -292,7 +294,6 @@ spec:
# This is handled in the Makefile's lint-pr target and should be reverted after migration.
make lint-pr TARGET_BRANCH=$TARGET_BRANCH_NAME


print_banner "RUNNING UNIT TESTS"
make test-unit PYTEST_OPTS="--log-cli-level=DEBUG"

Expand Down
113 changes: 101 additions & 12 deletions src/exploit_iq_commons/utils/dep_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,20 @@
ROOT_LEVEL_SENTINEL = 'root-top-level-agent-morpheus'

# Directory name of the virtual environment created inside the analysed repository.
TRANSITIVE_ENV_NAME = 'transitive_env'
# File written at the repository root with a freeze-format list of the packages
# installed in that virtual environment.
INSTALLED_PACKAGES_FILE = 'installed_packages.txt'

# File names of the Python project manifests / lock files recognised by this module.
PYPROJECT_TOML = 'pyproject.toml'
SETUP_PY = 'setup.py'
SETUP_CFG = 'setup.cfg'
PIPFILE = 'Pipfile'
UV_LOCK = 'uv.lock'
POETRY_LOCK = 'poetry.lock'
README_MD = 'README.md'

# Manifest formats tried in priority order when requirements.txt is absent.
# Each entry is the filename; the install strategy is determined in _install_from_best_manifest.
# NOTE(review): _install_from_best_manifest appears to hard-code this same order
# instead of iterating over this list - keep the two in sync, or confirm where
# this list is actually consumed.
_PYTHON_MANIFEST_FALLBACK_ORDER = [UV_LOCK, POETRY_LOCK, PYPROJECT_TOML, SETUP_PY, SETUP_CFG, PIPFILE]

_WALK_EXCLUDE_DIRS = frozenset({
".venv",
"venv",
Expand Down Expand Up @@ -125,10 +134,9 @@ def detect_ecosystem(git_repo_path: Path) -> Ecosystem | None:
"""
if os.path.isfile(git_repo_path / GOLANG_MANIFEST):
return MANIFESTS_TO_ECOSYSTEMS[GOLANG_MANIFEST]
if (
os.path.isfile(git_repo_path / PYTHON_MANIFEST)
or os.path.isfile(git_repo_path / PYPROJECT_TOML)
or os.path.isfile(git_repo_path / SETUP_PY)
if any(
os.path.isfile(git_repo_path / m)
for m in (PYTHON_MANIFEST, PYPROJECT_TOML, SETUP_PY, SETUP_CFG, UV_LOCK, POETRY_LOCK, PIPFILE)
):
return MANIFESTS_TO_ECOSYSTEMS[PYTHON_MANIFEST]
if os.path.isfile(git_repo_path / JS_MANIFEST):
Expand Down Expand Up @@ -1594,28 +1602,109 @@ def _try_file(path: Path, extractor) -> str | None:

return None

def _ensure_venv(self, manifest_path: Path) -> str:
    """Return the path of the ``transitive_env`` interpreter, creating the venv when missing.

    When ``<manifest_path>/transitive_env/bin/python`` already exists nothing is
    executed. Otherwise ``uv venv`` is run inside ``manifest_path`` with the
    version reported by :meth:`determine_python_version`, or with the
    ``major.minor`` of the running interpreter when no version is reported.

    Args:
        manifest_path: Root directory of the repository that hosts the venv.

    Returns:
        The interpreter path as a string. The outcome of ``uv venv`` is not
        checked, so the path is returned even if venv creation failed.
    """
    import sys

    interpreter = "/".join((str(manifest_path), TRANSITIVE_ENV_NAME, "bin", "python"))

    if not Path(interpreter).exists():
        logger.warning("Venv python not found at %s — creating venv", interpreter)

        version = self.determine_python_version(str(manifest_path))
        if not version:
            # No version could be derived from the project: mirror the interpreter
            # that is running this code.
            version = "{}.{}".format(sys.version_info.major, sys.version_info.minor)
            logger.info("Python version undetermined; using current interpreter %s", version)

        logger.info("Creating transitive_env with Python %s using uv", version)
        uv_venv_command = ["uv", "venv", TRANSITIVE_ENV_NAME, "--python", version]
        run_command(uv_venv_command, cwd=manifest_path)

    return interpreter

def install_dependencies(self, manifest_path: Path):
    """Install Python dependencies for the given repository into a virtual environment.

    Calls :meth:`_ensure_venv` to select the interpreter and create the venv.
    Installs packages from whichever manifest format is present, trying formats
    in this priority order:

    1. ``requirements.txt``: line-by-line install (original behaviour)
    2. ``uv.lock`` or ``poetry.lock``: ``uv export | uv pip install -r -``
    3. ``pyproject.toml``, ``setup.py``, or ``setup.cfg``: ``uv pip install .``
    4. ``Pipfile``: ``pipenv requirements | uv pip install -r -``

    After installation, writes ``installed_packages.txt`` containing a
    freeze-format snapshot of every package in the venv so that Code
    Keyword Search can answer "is package X installed?" without source
    traversal.

    Args:
        manifest_path: Absolute path to the root of the cloned repository.
    """
    # _ensure_venv is called exactly once; its return value is the interpreter
    # path that the manifest-specific installers pass to ``uv pip --python``.
    venv_python = self._ensure_venv(manifest_path)
    site_packages = self._find_site_packages(manifest_path)

    installed_via = self._install_from_best_manifest(manifest_path, venv_python, site_packages)
    if installed_via:
        logger.info("Installed Python dependencies via %s", installed_via)
    else:
        logger.warning("No supported Python manifest found in %s; transitive_env will be empty", manifest_path)

    # The snapshot is written even when nothing was installed, so downstream
    # consumers always see the current state of the venv.
    self._write_installed_packages(manifest_path)

def _install_from_best_manifest(self, manifest_path: Path, venv_python: str,
                                site_packages: Optional[Path]) -> Optional[str]:
    """Try each Python manifest format in priority order; return the format name on success.

    Order: ``requirements.txt``, then lock files (``uv.lock``, ``poetry.lock``),
    then project manifests (``pyproject.toml``, ``setup.py``, ``setup.cfg``),
    then ``Pipfile``. A strategy whose export step yields no output is logged
    and skipped so that a later strategy can still run.

    Args:
        manifest_path: Root directory of the repository being analysed.
        venv_python: Path of the venv interpreter, forwarded to ``uv pip --python``.
        site_packages: site-packages directory of the venv, or ``None``; only
            used by the ``requirements.txt`` strategy.

    Returns:
        The file name of the manifest that was used, or ``None`` when no
        strategy could be applied.
    """
    req_txt = manifest_path / PYTHON_MANIFEST
    if req_txt.exists():
        self._install_from_requirements_txt(req_txt, manifest_path, site_packages)
        return PYTHON_MANIFEST

    # Shared by every strategy that pipes a requirements listing through stdin.
    install_from_stdin = ["uv", "pip", "install", "-r", "-", "--python", venv_python]

    # Lock files: export to requirements format, then pipe to uv pip install.
    # run_command receives an argument list, so shell redirections such as
    # "2>/dev/null" would be handed to the tool as a literal argument
    # (presumably no shell is involved - verify against run_command).
    # NOTE(review): whether `uv export` can consume a poetry.lock is not
    # established here - confirm; on failure we fall through to `uv pip install .`.
    for lock_file in (UV_LOCK, POETRY_LOCK):
        if not (manifest_path / lock_file).exists():
            continue
        exported = run_command(
            args=["uv", "export", "--format", "requirements-txt", "--no-dev"],
            cwd=manifest_path,
        )
        if exported is None:
            logger.warning("Could not export requirements from %s in %s; trying next manifest format",
                           lock_file, manifest_path)
            continue
        run_command(args=install_from_stdin, input_data=exported)
        return lock_file

    # Project manifests: uv pip install . resolves and installs all declared deps.
    for manifest_name in (PYPROJECT_TOML, SETUP_PY, SETUP_CFG):
        if (manifest_path / manifest_name).exists():
            # "--python" and the interpreter path must be two separate arguments.
            run_command(["uv", "pip", "install", ".", "--python", venv_python], cwd=manifest_path)
            return manifest_name

    # Pipfile: requires pipenv; nothing is installed when pipenv yields no output.
    if (manifest_path / PIPFILE).exists():
        exported = run_command(["pipenv", "requirements"], cwd=manifest_path)
        if exported is not None:
            run_command(install_from_stdin, input_data=exported)
            return PIPFILE
        logger.warning("Could not export requirements from %s in %s", PIPFILE, manifest_path)

    return None

def _install_from_requirements_txt(self, req_txt: Path, manifest_path: Path,
                                   site_packages: Optional[Path]) -> None:
    """Install dependencies line-by-line from requirements.txt (original behaviour).

    Blank lines and comment lines are skipped. Every other line is handed to
    :meth:`install_dependency`; when ``site_packages`` is given, the bare
    package name is then passed to :meth:`_fallback_if_stub_only`.

    Args:
        req_txt: Path of the requirements file to read.
        manifest_path: Root directory of the repository being analysed.
        site_packages: site-packages directory of the venv, or ``None`` to
            skip the stub-only check.
    """
    with open(req_txt, 'r') as manifest:
        for line in tqdm(manifest):
            requirement = line.strip()
            if not requirement or PythonLanguageFunctionsParser.is_comment_line(line):
                continue
            self.install_dependency(line, manifest_path)
            if site_packages:
                # Cut at the first version operator, extras bracket, environment
                # marker or whitespace so that "pkg!=1.0", "pkg~=1.0",
                # "pkg[extra]>=1" and "pkg ; marker" all yield "pkg".
                package_name = re.split(r'[=><!~;\[\s]', requirement)[0]
                self._fallback_if_stub_only(package_name, site_packages)

def _write_installed_packages(self, manifest_path: Path) -> None:
    """Write a freeze-format snapshot of the venv to installed_packages.txt.

    Runs ``<manifest_path>/transitive_env/bin/pip list --format=freeze`` and
    stores its output at ``<manifest_path>/installed_packages.txt``. When the
    command yields no output, only a warning is logged and no file is written.

    Args:
        manifest_path: Root directory of the repository that hosts the venv.
    """
    # NOTE(review): venvs created by `uv venv` may not contain a pip executable
    # unless seeded - confirm that bin/pip exists in transitive_env.
    venv_pip = "/".join((str(manifest_path), TRANSITIVE_ENV_NAME, "bin", "pip"))
    snapshot = run_command([venv_pip, "list", "--format=freeze"])

    if not snapshot:
        logger.warning("Could not generate installed packages list for %s", manifest_path)
        return

    target = manifest_path / INSTALLED_PACKAGES_FILE
    target.write_text(snapshot)
    logger.info("Wrote installed packages snapshot to %s/%s", manifest_path, INSTALLED_PACKAGES_FILE)

def install_dependency(self, dependency, repo_path):
dependency = dependency.strip()
valid_signs = ['==', '>=', '<=', '!=']
Expand Down
70 changes: 68 additions & 2 deletions src/exploit_iq_commons/utils/source_code_git_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,26 @@
_credential_id_ctx,
fetch_and_decrypt_credential,
)
from exploit_iq_commons.utils.dep_tree import INSTALLED_PACKAGES_FILE, TRANSITIVE_ENV_NAME
from exploit_iq_commons.utils.transitive_code_searcher_tool import (
TransitiveCodeSearcher,
)

# Maximum number of .py files a site-packages package may contain before it
# is excluded from automatic indexing.
_SITE_PKG_MAX_PY_FILES: int = 150

# Directory-name suffixes/names to skip when scanning site-packages.
# Both sets are matched against the names of the immediate children of a
# site-packages directory only; a ``tests`` or ``docs`` directory nested inside
# a package is not filtered by these names.
_SITE_PKG_SKIP_SUFFIXES: frozenset[str] = frozenset({".dist-info", ".egg-info"})
_SITE_PKG_SKIP_DIRS: frozenset[str] = frozenset({
"__pycache__",
"ansible_collections", # excluded: exceeds file-count threshold by a large margin
"tests",
"test",
"docs",
"doc",
})

PathLike = typing.Union[str, os.PathLike]


Expand Down Expand Up @@ -426,8 +442,18 @@ def yield_blobs(self) -> typing.Iterator[Blob]:
for exc in self.exclude or {}:
exclude_files = exclude_files.union(set(str(x.relative_to(base_path)) for x in base_path.glob(exc)))

# Filter out files that are not in the repo
# include_files = include_files.intersection(all_files_in_repo)
# Always include installed_packages.txt when present so that Code
# Keyword Search can answer "is package X installed?" for transitive deps.
installed_pkg_file = base_path / INSTALLED_PACKAGES_FILE
if installed_pkg_file.is_file():
include_files.add(INSTALLED_PACKAGES_FILE)
logger.debug("Including %s in document index", INSTALLED_PACKAGES_FILE)

# Include Python source from site-packages so that CCA and Code Keyword
# Search can trace transitive call chains across package boundaries.
# Packages exceeding _SITE_PKG_MAX_PY_FILES .py files and known noisy
# directories are excluded to bound indexing cost.
self._add_site_packages_blobs(base_path, include_files)

# Take the include files and remove the exclude files.
final_files = include_files - exclude_files
Expand All @@ -449,3 +475,43 @@ def yield_blobs(self) -> typing.Iterator[Blob]:
logger.warning("Failed to read blob for '%s'. Ignoring this file. Error: %s", abs_file_path, e)
else:
logger.debug("Skipping path as it is a directory, not a file: '%s'", abs_file_path)

@staticmethod
def _add_site_packages_blobs(base_path: Path, include_files: set[str]) -> None:
    """Register Python sources from the transitive_env site-packages in ``include_files``.

    Every directory directly below ``<base_path>/transitive_env/lib/*/site-packages``
    is treated as one package. Metadata directories (``.dist-info`` /
    ``.egg-info``) and directories named in ``_SITE_PKG_SKIP_DIRS`` are ignored.
    A package is indexed only when it holds at most ``_SITE_PKG_MAX_PY_FILES``
    ``.py`` files, not counting files below any ``__pycache__`` directory.

    Args:
        base_path: Repository root; added entries are paths relative to it.
        include_files: Set of relative file paths, updated in place.
    """
    indexed: list[str] = []
    oversized: list[str] = []
    metadata_suffixes = tuple(_SITE_PKG_SKIP_SUFFIXES)

    for site_packages in base_path.glob(f"{TRANSITIVE_ENV_NAME}/lib/*/site-packages"):
        if not site_packages.is_dir():
            continue

        for candidate in site_packages.iterdir():
            name = candidate.name
            # Only real package directories qualify; metadata and noisy
            # top-level directories are dropped by name.
            if (
                not candidate.is_dir()
                or name.endswith(metadata_suffixes)
                or name in _SITE_PKG_SKIP_DIRS
            ):
                continue

            sources = [
                path for path in candidate.rglob("*.py")
                if "__pycache__" not in path.parts
            ]
            source_count = len(sources)

            if source_count > _SITE_PKG_MAX_PY_FILES:
                oversized.append(f"{name}({source_count} files)")
                continue

            include_files.update(str(path.relative_to(base_path)) for path in sources)
            indexed.append(name)

    if indexed:
        logger.info("Indexed %d site-packages package(s) for transitive analysis: %s",
                    len(indexed), ", ".join(indexed))
    if oversized:
        logger.info("Skipped %d oversized site-packages package(s): %s",
                    len(oversized), ", ".join(oversized))
8 changes: 4 additions & 4 deletions src/vuln_analysis/utils/output_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def _add_header(markdown_content, model_dict: AgentMorpheusOutput):
"""
input_image = model_dict.input.image
# iterate over a list of dict objects, with vuln_id and ghsa being 2 keys in each element
for output in model_dict.output:
for output in model_dict.output.analysis:
cve_id = output.vuln_id
markdown_content[cve_id].append(f"# Vulnerability Analysis Report for {cve_id}")
markdown_content[cve_id].append(f"> **Container Analyzed:** `{input_image.name}:{input_image.tag}`\n\n")
Expand Down Expand Up @@ -279,7 +279,7 @@ def _add_table_of_contents(markdown_content, model_dict: AgentMorpheusOutput):
None
This function modifies `markdown_content` in place.
"""
for entry in model_dict.output:
for entry in model_dict.output.analysis:
cve_id = entry.vuln_id
checklist = entry.checklist
markdown_content[cve_id].append("### Checklist <a name='checklist-toc' id='checklist-toc'></a>")
Expand Down Expand Up @@ -313,7 +313,7 @@ def _add_checklist_info(markdown_content, model_dict: AgentMorpheusOutput):
None
This function modifies `markdown_content` in place.
"""
for entry in model_dict.output:
for entry in model_dict.output.analysis:
cve_id = entry.vuln_id
checklist = entry.checklist
if checklist:
Expand Down Expand Up @@ -424,7 +424,7 @@ def _add_vulnerability_analysis(markdown_content, model_dict: AgentMorpheusOutpu
None
This function modifies `markdown_content` in place.
"""
for entry in model_dict.output:
for entry in model_dict.output.analysis:
cve_id = entry.vuln_id
summary = entry.summary
justification = entry.justification
Expand Down
16 changes: 0 additions & 16 deletions tests/test_python_version_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,19 +271,3 @@ def test_python2_project(self, builder, tmp_path):
"setup.py": "from setuptools import setup\nsetup(python_requires='>=2.7,<3')\n",
})
assert builder.determine_python_version(str(repo)) == "2.7"


class TestInstallDependenciesVersionGuard:
    """Checks that dependency installation forwards the detected Python version."""

    def test_uses_python_flag_when_version_known(self, builder, tmp_path, monkeypatch):
        """A ``requires-python`` bound must surface as ``--python`` plus the version."""
        fixture_files = {
            "requirements.txt": "requests\n",
            "pyproject.toml": "[project]\nrequires-python = \">=3.9\"\n",
        }
        for file_name, content in fixture_files.items():
            (tmp_path / file_name).write_text(content)

        calls = []

        def fake_run_command(args, cwd="", input_data=None):
            # Record every argument of every invocation in one flat list and
            # report empty output.
            calls.extend(args)
            return ""

        monkeypatch.setattr("exploit_iq_commons.utils.dep_tree.run_command", fake_run_command)

        builder.install_dependencies(tmp_path)

        saw_python_flag = any("--python" in c for c in calls)
        saw_version = any("3.9" in c for c in calls)
        assert saw_python_flag and saw_version, f"calls: {calls}"
# def run_command(args: list[str], cwd: str | Path | None = None, input_data: str | None = None) -> str | None: