Files
codex/scripts/codex_package/v8.py
T
Adam Perry @ OpenAI 281b416c44 Check root Python script formatting in CI (#25165)
## Why

Python files under `scripts/` were not covered by the repository
formatting recipe or the CI formatting job, so formatting drift could
merge unnoticed.

## What

- Add a dedicated `scripts/pyproject.toml` and `scripts/uv.lock` so
root-script formatting uses a locked Ruff version.
- Extend `just fmt` to format root Python scripts and add
`fmt-scripts-check` for CI.
- Run `just fmt-scripts-check` from `.github/workflows/ci.yml`,
installing `uv` through SHA-pinned `astral-sh/setup-uv` while retaining
the `uv` `0.11.3` pin.
- Apply Ruff formatting to the root Python scripts, including
`scripts/just-shell.py`, and extend
`sdk/python/tests/test_artifact_workflow_and_binaries.py` to cover the
root formatting recipe.
- Update `AGENTS.md` so agents run `just fmt` after code changes
anywhere in the repository.

## Validation

- Extended the existing Python SDK workflow test to assert that `just
fmt` includes root Python scripts.
2026-06-01 18:50:23 +00:00

181 lines
5.4 KiB
Python

"""Codex-built V8 artifact overrides for package Cargo builds."""
from __future__ import annotations
import hashlib
import os
import shutil
import tempfile
from collections.abc import Mapping
from dataclasses import dataclass
from pathlib import Path
from urllib.request import urlopen
from .targets import REPO_ROOT
from .targets import TargetSpec
# Seconds; passed as the `timeout` argument to `urlopen` in `download_file`.
DOWNLOAD_TIMEOUT_SECS = 120
@dataclass(frozen=True)
class RustyV8ArtifactPair:
    """Immutable pair of on-disk paths for one target's Codex-built V8 artifacts.

    `fetch_codex_v8_artifacts` returns an instance of this class, and
    `resolve_codex_v8_cargo_env` exports the two paths as the
    `RUSTY_V8_ARCHIVE` and `RUSTY_V8_SRC_BINDING_PATH` environment values.
    """

    # Path of the `librusty_v8_release_<target>.a.gz` archive.
    archive: Path
    # Path of the `src_binding_release_<target>.rs` binding source file.
    binding: Path
def resolve_codex_v8_cargo_env(
    spec: TargetSpec,
    *,
    environ: Mapping[str, str] | None = None,
    cache_root: Path | None = None,
) -> dict[str, str]:
    """Return the extra Cargo environment pointing at Codex-built V8 artifacts.

    Args:
        spec: Build target; only `spec.is_windows` is read here before the
            spec is handed to `fetch_codex_v8_artifacts`.
        environ: Environment to inspect. Defaults to `os.environ`.
        cache_root: Optional cache directory forwarded to
            `fetch_codex_v8_artifacts`.

    Returns:
        An empty dict when no override should be injected: Windows targets,
        `V8_FROM_SOURCE` set to `true`/`1`/`yes`, or both override variables
        already present and non-empty. Otherwise a dict with
        `RUSTY_V8_ARCHIVE` and `RUSTY_V8_SRC_BINDING_PATH` set to the fetched
        artifact paths.

    Raises:
        RuntimeError: If exactly one of the two override variables is set.
    """
    if spec.is_windows:
        return {}

    env = os.environ if environ is None else environ
    if env.get("V8_FROM_SOURCE") in ("true", "1", "yes"):
        return {}

    # The caller may supply its own artifacts, but only as a complete pair.
    overrides = [
        env.get(name) for name in ("RUSTY_V8_ARCHIVE", "RUSTY_V8_SRC_BINDING_PATH")
    ]
    if all(overrides):
        return {}
    if any(overrides):
        raise RuntimeError(
            "Cargo package builds need RUSTY_V8_ARCHIVE and RUSTY_V8_SRC_BINDING_PATH set together."
        )

    pair = fetch_codex_v8_artifacts(spec, cache_root=cache_root)
    cargo_env: dict[str, str] = {}
    cargo_env["RUSTY_V8_ARCHIVE"] = str(pair.archive)
    cargo_env["RUSTY_V8_SRC_BINDING_PATH"] = str(pair.binding)
    return cargo_env
def fetch_codex_v8_artifacts(
    spec: TargetSpec,
    *,
    version: str | None = None,
    cache_root: Path | None = None,
) -> RustyV8ArtifactPair:
    """Download (or reuse cached) Codex-built V8 artifacts for `spec.target`.

    The checksum manifest is downloaded on every call. The archive and the
    binding are downloaded only when the cached copy is missing or does not
    match its manifest checksum.

    Args:
        spec: Build target; `spec.is_windows` and `spec.target` are read.
        version: V8 crate version. A falsy value falls back to
            `resolved_v8_crate_version()`.
        cache_root: Directory holding per-version/target cache folders. A
            falsy value falls back to `default_cache_root()`.

    Returns:
        The cached archive and binding paths.

    Raises:
        RuntimeError: For Windows targets, or when a helper rejects the
            manifest or an artifact.
    """
    if spec.is_windows:
        raise RuntimeError(
            f"No Codex-built V8 release artifacts for target: {spec.target}"
        )

    if not version:
        version = resolved_v8_crate_version()
    triple = spec.target
    base_url = (
        f"https://github.com/openai/codex/releases/download/rusty-v8-v{version}"
    )

    root = cache_root or default_cache_root()
    cache_dir = root / f"rusty-v8-{version}-{triple}"
    manifest = cache_dir / f"rusty_v8_release_{triple}.sha256"
    pair = RustyV8ArtifactPair(
        archive=cache_dir / f"librusty_v8_release_{triple}.a.gz",
        binding=cache_dir / f"src_binding_release_{triple}.rs",
    )

    # Always refresh the manifest, then validate each artifact against it.
    download_file(f"{base_url}/{manifest.name}", manifest)
    digests = load_checksums(manifest, {pair.archive.name, pair.binding.name})
    for path in (pair.archive, pair.binding):
        ensure_valid_artifact(path, digests[path.name], f"{base_url}/{path.name}")
    return pair
def resolved_v8_crate_version() -> str:
    """Return the single `v8` crate version recorded in `codex-rs/Cargo.lock`.

    Returns:
        The version string of the one `v8` package entry in the lockfile.

    Raises:
        RuntimeError: If the lockfile lists zero or more than one distinct
            version of the `v8` package.
    """
    # Imported lazily so that importing this module does not require tomllib.
    import tomllib

    # TOML documents are UTF-8; decode explicitly rather than relying on the
    # locale-dependent default encoding of `Path.read_text`.
    lock_text = (REPO_ROOT / "codex-rs" / "Cargo.lock").read_text(encoding="utf-8")
    cargo_lock = tomllib.loads(lock_text)
    versions = sorted(
        {
            package["version"]
            for package in cargo_lock["package"]
            if package["name"] == "v8"
        }
    )
    if len(versions) != 1:
        raise RuntimeError(
            f"Expected exactly one resolved v8 version, found: {versions}"
        )
    return versions[0]
def default_cache_root() -> Path:
    """Return the `codex-package` directory path under the system temp dir.

    The directory is not created here.
    """
    return Path(tempfile.gettempdir(), "codex-package")
def load_checksums(checksums_path: Path, artifact_names: set[str]) -> dict[str, str]:
    """Parse a checksum manifest into a mapping of artifact name to digest.

    Each line must be `<digest> <artifact name>` separated by whitespace,
    where the digest is exactly 64 lowercase hexadecimal characters. The
    manifest must contain one line per expected artifact and nothing else.

    Args:
        checksums_path: UTF-8 manifest file to read.
        artifact_names: Exact set of artifact names the manifest must cover.

    Returns:
        Mapping of artifact name to its digest string.

    Raises:
        RuntimeError: On a wrong line count, a malformed line or digest, an
            unexpected artifact name, or incomplete coverage (for example a
            duplicated entry).
    """
    entries = checksums_path.read_text(encoding="utf-8").splitlines()
    if len(entries) != len(artifact_names):
        raise RuntimeError(
            f"Expected {len(artifact_names)} V8 checksums in {checksums_path}, found {len(entries)}."
        )

    lowercase_hex = set("0123456789abcdef")
    found: dict[str, str] = {}
    for entry in entries:
        fields = entry.split(maxsplit=1)
        if len(fields) != 2:
            raise RuntimeError(
                f"Invalid V8 checksum line in {checksums_path}: {entry!r}"
            )
        digest = fields[0]
        # split() leaves trailing whitespace on the remainder; drop it.
        name = fields[1].strip()
        if len(digest) != 64 or not set(digest) <= lowercase_hex:
            raise RuntimeError(
                f"Invalid V8 checksum digest in {checksums_path}: {digest}"
            )
        if name not in artifact_names:
            raise RuntimeError(
                f"Unexpected V8 checksum artifact in {checksums_path}: {name}"
            )
        found[name] = digest

    # Line count matched, so a mismatch here means an artifact was repeated.
    if set(found) != artifact_names:
        raise RuntimeError(
            f"V8 checksum manifest {checksums_path} does not cover {artifact_names}."
        )
    return found
def ensure_valid_artifact(artifact: Path, checksum: str, url: str) -> None:
    """Ensure `artifact` exists on disk and matches `checksum`.

    A file that already matches is left untouched. Otherwise any existing
    file is removed and `url` is downloaded once; a download that still does
    not match is deleted.

    Args:
        artifact: Destination path of the artifact.
        checksum: Expected SHA-256 hex digest.
        url: Location to download the artifact from.

    Raises:
        RuntimeError: If the freshly downloaded file fails validation.
    """
    if not has_checksum(artifact, checksum):
        # Missing or stale: discard whatever is there and fetch a fresh copy.
        artifact.unlink(missing_ok=True)
        download_file(url, artifact)
        if not has_checksum(artifact, checksum):
            # Never leave a file that failed validation in the cache.
            artifact.unlink(missing_ok=True)
            raise RuntimeError(
                f"Codex-built V8 artifact {artifact} failed checksum validation."
            )
def has_checksum(path: Path, expected: str) -> bool:
    """Return whether `path` is a regular file whose SHA-256 equals `expected`.

    Args:
        path: File to hash; anything that is not an existing file yields False.
        expected: Hex digest to compare against. The comparison is an exact
            string match with the lowercase output of `hexdigest()`.
    """
    if not path.is_file():
        return False

    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # Hash in 1 MiB blocks so the whole file is never held in memory.
        while block := stream.read(1024 * 1024):
            hasher.update(block)
    return hasher.hexdigest() == expected
def download_file(url: str, dest: Path) -> None:
    """Download `url` to `dest`, staging the data in a sibling `.tmp` file.

    The response is streamed to `<dest name>.tmp` and only moved onto `dest`
    once the copy has finished, so a partially written download never appears
    at `dest`. The parent directory is created if needed, and the temporary
    file is removed whether or not the download succeeds.

    Args:
        url: Location to fetch, opened with `urlopen` using
            `DOWNLOAD_TIMEOUT_SECS` as the timeout.
        dest: Final path of the downloaded file.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)

    staging_suffix = f"{dest.suffix}.tmp"
    staging = dest.with_suffix(staging_suffix)
    # Clear leftovers from an earlier interrupted download.
    staging.unlink(missing_ok=True)
    try:
        with (
            urlopen(url, timeout=DOWNLOAD_TIMEOUT_SECS) as response,
            staging.open("wb") as sink,
        ):
            shutil.copyfileobj(response, sink)
        staging.replace(dest)
    finally:
        staging.unlink(missing_ok=True)