|
| 1 | +""" |
| 2 | +Security-focused tests for pr_agent.git_providers.utils.apply_repo_settings. |
| 3 | +
|
| 4 | +These tests verify: |
| 5 | +- The repo settings fetch path is skipped when use_repo_settings_file is disabled. |
| 6 | +- Valid repo TOML overrides only the specified keys and preserves siblings. |
| 7 | +- Invalid TOML produces exactly one local-category configuration error and |
| 8 | + does not pollute global settings. |
| 9 | +- Forbidden directives (e.g. dynaconf_include) are rejected and produce a |
| 10 | + local-category configuration error without polluting settings. |
| 11 | +- The temporary file created from the repo settings bytes is removed after |
| 12 | + apply_repo_settings, both on success and on failure. |
| 13 | +""" |
| 14 | + |
| 15 | +import copy |
| 16 | +import os |
| 17 | +import tempfile |
| 18 | + |
| 19 | +import pytest |
| 20 | + |
| 21 | +from pr_agent.config_loader import get_settings |
| 22 | +from pr_agent.git_providers import utils as git_utils |
| 23 | +from pr_agent.git_providers.utils import apply_repo_settings |
| 24 | + |
| 25 | + |
| 26 | +class FakeGitProvider: |
| 27 | + """Minimal fake provider exposing the methods apply_repo_settings touches.""" |
| 28 | + |
| 29 | + def __init__(self, repo_settings_bytes=b""): |
| 30 | + self._repo_settings = repo_settings_bytes |
| 31 | + self.persistent_comments = [] |
| 32 | + self.comments = [] |
| 33 | + self.get_repo_settings_calls = 0 |
| 34 | + |
| 35 | + def get_repo_settings(self): |
| 36 | + self.get_repo_settings_calls += 1 |
| 37 | + return self._repo_settings |
| 38 | + |
| 39 | + def is_supported(self, capability): |
| 40 | + return capability == "gfm_markdown" |
| 41 | + |
| 42 | + def publish_persistent_comment(self, body, initial_header, update_header, final_update_message): |
| 43 | + self.persistent_comments.append( |
| 44 | + { |
| 45 | + "body": body, |
| 46 | + "initial_header": initial_header, |
| 47 | + "update_header": update_header, |
| 48 | + "final_update_message": final_update_message, |
| 49 | + } |
| 50 | + ) |
| 51 | + |
| 52 | + def publish_comment(self, body): |
| 53 | + self.comments.append(body) |
| 54 | + |
| 55 | + |
| 56 | +SNAPSHOT_SECTIONS = ("CONFIG", "PR_REVIEWER", "CUSTOM_SECTION_FOR_TEST") |
| 57 | + |
| 58 | + |
| 59 | +def _snapshot_settings_sections(settings): |
| 60 | + return {section: copy.deepcopy(settings.as_dict().get(section)) for section in SNAPSHOT_SECTIONS} |
| 61 | + |
| 62 | + |
| 63 | +def _restore_settings_sections(settings, snapshot): |
| 64 | + for section, data in snapshot.items(): |
| 65 | + try: |
| 66 | + settings.unset(section, force=True) |
| 67 | + except Exception: |
| 68 | + pass |
| 69 | + if data is not None: |
| 70 | + settings.set(section, copy.deepcopy(data), merge=False) |
| 71 | + |
| 72 | + |
| 73 | +_ENV_ABSENT = object() |
| 74 | + |
| 75 | + |
| 76 | +@pytest.fixture |
| 77 | +def settings_snapshot(): |
| 78 | + """Snapshot the keys mutated by these tests and restore them afterwards. |
| 79 | +
|
| 80 | + Also snapshots the ``AUTO_CAST_FOR_DYNACONF`` environment variable, which |
| 81 | + ``apply_repo_settings`` unconditionally sets to ``"false"``. Using a |
| 82 | + sentinel for "originally absent" ensures the env restore is exact: |
| 83 | + keys that were absent are deleted, never left as a stray ``None``-like |
| 84 | + string that could leak into other tests. |
| 85 | + """ |
| 86 | + settings = get_settings() |
| 87 | + snapshot = _snapshot_settings_sections(settings) |
| 88 | + env_before = os.environ.get("AUTO_CAST_FOR_DYNACONF", _ENV_ABSENT) |
| 89 | + try: |
| 90 | + yield |
| 91 | + finally: |
| 92 | + _restore_settings_sections(settings, snapshot) |
| 93 | + if env_before is _ENV_ABSENT: |
| 94 | + os.environ.pop("AUTO_CAST_FOR_DYNACONF", None) |
| 95 | + else: |
| 96 | + os.environ["AUTO_CAST_FOR_DYNACONF"] = env_before |
| 97 | + |
| 98 | + |
| 99 | +def _install_provider(monkeypatch, provider): |
| 100 | + captured = {"errors": None, "git_provider": None} |
| 101 | + |
| 102 | + def fake_get_git_provider_with_context(pr_url): |
| 103 | + return provider |
| 104 | + |
| 105 | + def fake_handle_configurations_errors(errors, git_provider): |
| 106 | + captured["errors"] = errors |
| 107 | + captured["git_provider"] = git_provider |
| 108 | + |
| 109 | + monkeypatch.setattr(git_utils, "get_git_provider_with_context", fake_get_git_provider_with_context) |
| 110 | + monkeypatch.setattr(git_utils, "handle_configurations_errors", fake_handle_configurations_errors) |
| 111 | + return captured |
| 112 | + |
| 113 | + |
| 114 | +def test_disabled_repo_settings_skips_provider_fetch(monkeypatch, settings_snapshot): |
| 115 | + provider = FakeGitProvider(repo_settings_bytes=b"[pr_reviewer]\nnum_max_findings = 99\n") |
| 116 | + captured = _install_provider(monkeypatch, provider) |
| 117 | + |
| 118 | + get_settings().set("config.use_repo_settings_file", False) |
| 119 | + original_num = get_settings().as_dict().get("PR_REVIEWER", {}).get("num_max_findings") |
| 120 | + |
| 121 | + apply_repo_settings("https://example.com/owner/repo/pull/1") |
| 122 | + |
| 123 | + assert provider.get_repo_settings_calls == 0 |
| 124 | + assert captured["errors"] is None |
| 125 | + # Settings were not touched. |
| 126 | + assert get_settings().as_dict().get("PR_REVIEWER", {}).get("num_max_findings") == original_num |
| 127 | + |
| 128 | + |
| 129 | +def _section(settings, name): |
| 130 | + """Return a section dict from settings using a case-insensitive lookup.""" |
| 131 | + data = settings.as_dict() |
| 132 | + upper = name.upper() |
| 133 | + for key, value in data.items(): |
| 134 | + if key.upper() == upper: |
| 135 | + return value if isinstance(value, dict) else {} |
| 136 | + return {} |
| 137 | + |
| 138 | + |
| 139 | +def test_valid_repo_settings_merge_overrides_key_and_preserves_siblings(monkeypatch, settings_snapshot): |
| 140 | + provider = FakeGitProvider(repo_settings_bytes=b"[pr_reviewer]\nnum_max_findings = 11\n") |
| 141 | + captured = _install_provider(monkeypatch, provider) |
| 142 | + |
| 143 | + get_settings().set("config.use_repo_settings_file", True) |
| 144 | + settings = get_settings() |
| 145 | + sibling_before = _section(settings, "pr_reviewer").get("require_tests_review") |
| 146 | + assert sibling_before is not None, "Test precondition: sibling key should already be present" |
| 147 | + |
| 148 | + apply_repo_settings("https://example.com/owner/repo/pull/1") |
| 149 | + |
| 150 | + assert provider.get_repo_settings_calls == 1 |
| 151 | + assert captured["errors"] is None, f"Unexpected configuration errors: {captured['errors']}" |
| 152 | + |
| 153 | + pr_reviewer = _section(settings, "pr_reviewer") |
| 154 | + assert pr_reviewer.get("num_max_findings") == 11 |
| 155 | + # Unrelated sibling key in the same section must be preserved by the merge logic. |
| 156 | + assert pr_reviewer.get("require_tests_review") == sibling_before |
| 157 | + |
| 158 | + |
| 159 | +def test_invalid_toml_does_not_pollute_settings(monkeypatch, settings_snapshot): |
| 160 | + """ |
| 161 | + Malformed TOML must never leak into the live settings. The custom loader |
| 162 | + currently swallows the TOMLDecodeError and logs it, so no local error is |
| 163 | + propagated to handle_configurations_errors; the surviving security |
| 164 | + guarantee is that the existing settings are untouched. |
| 165 | + """ |
| 166 | + malformed = b"[pr_reviewer\nnum_max_findings = 7\n" |
| 167 | + provider = FakeGitProvider(repo_settings_bytes=malformed) |
| 168 | + _install_provider(monkeypatch, provider) |
| 169 | + |
| 170 | + get_settings().set("config.use_repo_settings_file", True) |
| 171 | + settings = get_settings() |
| 172 | + before = copy.deepcopy(_section(settings, "pr_reviewer")) |
| 173 | + |
| 174 | + apply_repo_settings("https://example.com/owner/repo/pull/1") |
| 175 | + |
| 176 | + after = _section(settings, "pr_reviewer") |
| 177 | + assert after == before |
| 178 | + # Whatever errors may or may not be published, the malformed payload must |
| 179 | + # never be merged silently into pr_reviewer. |
| 180 | + assert after.get("num_max_findings") != 7 |
| 181 | + |
| 182 | + |
| 183 | +@pytest.mark.xfail( |
| 184 | + reason=( |
| 185 | + "Behavior gap: pr_agent.custom_merge_loader is invoked with silent=True, " |
| 186 | + "so TOMLDecodeError is logged and swallowed instead of being surfaced to " |
| 187 | + "handle_configurations_errors. apply_repo_settings therefore never publishes " |
| 188 | + "a 'local' configuration-error comment for malformed TOML." |
| 189 | + ), |
| 190 | + strict=True, |
| 191 | +) |
| 192 | +def test_invalid_toml_publishes_one_local_error(monkeypatch, settings_snapshot): |
| 193 | + malformed = b"[pr_reviewer\nnum_max_findings = 7\n" |
| 194 | + provider = FakeGitProvider(repo_settings_bytes=malformed) |
| 195 | + captured = _install_provider(monkeypatch, provider) |
| 196 | + get_settings().set("config.use_repo_settings_file", True) |
| 197 | + |
| 198 | + apply_repo_settings("https://example.com/owner/repo/pull/1") |
| 199 | + |
| 200 | + assert captured["errors"] is not None |
| 201 | + assert len(captured["errors"]) == 1 |
| 202 | + assert captured["errors"][0]["category"] == "local" |
| 203 | + assert captured["errors"][0]["settings"] == malformed |
| 204 | + |
| 205 | + |
| 206 | +def test_forbidden_directive_does_not_pollute_settings(monkeypatch, settings_snapshot): |
| 207 | + """ |
| 208 | + A repo TOML containing forbidden directives (e.g. dynaconf_include) must |
| 209 | + not leak into the live settings. As with malformed TOML, the loader's |
| 210 | + SecurityError is currently swallowed silently; the security guarantee |
| 211 | + checked here is that no part of the payload (including the legitimate |
| 212 | + pr_reviewer override) reaches the settings. |
| 213 | + """ |
| 214 | + forbidden_toml = b"dynaconf_include = ['evil.toml']\n[pr_reviewer]\nnum_max_findings = 42\n" |
| 215 | + provider = FakeGitProvider(repo_settings_bytes=forbidden_toml) |
| 216 | + _install_provider(monkeypatch, provider) |
| 217 | + |
| 218 | + get_settings().set("config.use_repo_settings_file", True) |
| 219 | + settings = get_settings() |
| 220 | + before = copy.deepcopy(_section(settings, "pr_reviewer")) |
| 221 | + |
| 222 | + apply_repo_settings("https://example.com/owner/repo/pull/1") |
| 223 | + |
| 224 | + after = _section(settings, "pr_reviewer") |
| 225 | + # The forbidden file must be rejected wholesale; neither the directive |
| 226 | + # nor the piggy-backed pr_reviewer override should be applied. |
| 227 | + assert after == before |
| 228 | + assert after.get("num_max_findings") != 42 |
| 229 | + assert "dynaconf_include" not in {k.lower() for k in settings.as_dict().keys()} |
| 230 | + |
| 231 | + |
| 232 | +@pytest.mark.xfail( |
| 233 | + reason=( |
| 234 | + "Behavior gap: forbidden-directive SecurityError raised by " |
| 235 | + "validate_file_security is swallowed by the silent-loader path, so " |
| 236 | + "apply_repo_settings does not publish a 'local' configuration-error " |
| 237 | + "comment for forbidden TOML directives." |
| 238 | + ), |
| 239 | + strict=True, |
| 240 | +) |
| 241 | +def test_forbidden_directive_publishes_one_local_error(monkeypatch, settings_snapshot): |
| 242 | + forbidden_toml = b"dynaconf_include = ['evil.toml']\n[pr_reviewer]\nnum_max_findings = 42\n" |
| 243 | + provider = FakeGitProvider(repo_settings_bytes=forbidden_toml) |
| 244 | + captured = _install_provider(monkeypatch, provider) |
| 245 | + get_settings().set("config.use_repo_settings_file", True) |
| 246 | + |
| 247 | + apply_repo_settings("https://example.com/owner/repo/pull/1") |
| 248 | + |
| 249 | + assert captured["errors"] is not None |
| 250 | + assert len(captured["errors"]) == 1 |
| 251 | + assert captured["errors"][0]["category"] == "local" |
| 252 | + assert captured["errors"][0]["settings"] == forbidden_toml |
| 253 | + |
| 254 | + |
| 255 | +def test_temp_file_is_removed_after_successful_apply(monkeypatch, tmp_path, settings_snapshot): |
| 256 | + provider = FakeGitProvider(repo_settings_bytes=b"[pr_reviewer]\nnum_max_findings = 5\n") |
| 257 | + _install_provider(monkeypatch, provider) |
| 258 | + get_settings().set("config.use_repo_settings_file", True) |
| 259 | + |
| 260 | + known_path = tmp_path / "repo_settings_success.toml" |
| 261 | + |
| 262 | + def fake_mkstemp(suffix=None, prefix=None, dir=None, text=False): |
| 263 | + fd = os.open(str(known_path), os.O_RDWR | os.O_CREAT | os.O_TRUNC) |
| 264 | + return fd, str(known_path) |
| 265 | + |
| 266 | + monkeypatch.setattr(tempfile, "mkstemp", fake_mkstemp) |
| 267 | + |
| 268 | + apply_repo_settings("https://example.com/owner/repo/pull/1") |
| 269 | + |
| 270 | + assert not known_path.exists(), "Temp settings file must be removed after successful apply" |
| 271 | + |
| 272 | + |
| 273 | +def test_temp_file_is_removed_after_failed_apply(monkeypatch, tmp_path, settings_snapshot): |
| 274 | + """The temp file must be removed even when the Dynaconf load step raises. |
| 275 | +
|
| 276 | + We use *valid* TOML bytes (so the failure cannot be confused with the |
| 277 | + silent-swallow malformed-TOML path) and force the failure by replacing |
| 278 | + the ``Dynaconf`` symbol bound inside ``pr_agent.git_providers.utils`` |
| 279 | + with a stub that raises *after* ``mkstemp`` has been called. We do not |
| 280 | + patch the external ``dynaconf`` module — only the imported reference |
| 281 | + that ``apply_repo_settings`` actually uses. |
| 282 | + """ |
| 283 | + valid_toml = b"[pr_reviewer]\nnum_max_findings = 3\n" |
| 284 | + provider = FakeGitProvider(repo_settings_bytes=valid_toml) |
| 285 | + captured = _install_provider(monkeypatch, provider) |
| 286 | + get_settings().set("config.use_repo_settings_file", True) |
| 287 | + |
| 288 | + known_path = tmp_path / "repo_settings_failure.toml" |
| 289 | + mkstemp_calls = {"n": 0} |
| 290 | + |
| 291 | + def fake_mkstemp(suffix=None, prefix=None, dir=None, text=False): |
| 292 | + mkstemp_calls["n"] += 1 |
| 293 | + fd = os.open(str(known_path), os.O_RDWR | os.O_CREAT | os.O_TRUNC) |
| 294 | + return fd, str(known_path) |
| 295 | + |
| 296 | + monkeypatch.setattr(tempfile, "mkstemp", fake_mkstemp) |
| 297 | + |
| 298 | + def exploding_dynaconf(*args, **kwargs): |
| 299 | + raise RuntimeError("boom") |
| 300 | + |
| 301 | + monkeypatch.setattr(git_utils, "Dynaconf", exploding_dynaconf) |
| 302 | + |
| 303 | + apply_repo_settings("https://example.com/owner/repo/pull/1") |
| 304 | + |
| 305 | + assert mkstemp_calls["n"] == 1, "mkstemp must have run before the failure" |
| 306 | + assert not known_path.exists(), "Temp settings file must be removed even after a failed apply" |
| 307 | + |
| 308 | + # The local-category configuration error path must have been exercised. |
| 309 | + assert captured["errors"] is not None, "handle_configurations_errors should have been called" |
| 310 | + assert len(captured["errors"]) == 1 |
| 311 | + err = captured["errors"][0] |
| 312 | + assert err["category"] == "local" |
| 313 | + assert err["settings"] == valid_toml |
| 314 | + assert "boom" in err["error"] |
| 315 | + |
| 316 | + |
| 317 | +def test_restore_settings_sections_removes_section_created_after_snapshot(): |
| 318 | + settings = get_settings() |
| 319 | + original_snapshot = _snapshot_settings_sections(settings) |
| 320 | + |
| 321 | + try: |
| 322 | + settings.unset("CUSTOM_SECTION_FOR_TEST", force=True) |
| 323 | + assert "CUSTOM_SECTION_FOR_TEST" not in settings.as_dict() |
| 324 | + |
| 325 | + snapshot = _snapshot_settings_sections(settings) |
| 326 | + assert snapshot["CUSTOM_SECTION_FOR_TEST"] is None |
| 327 | + |
| 328 | + settings.set("CUSTOM_SECTION_FOR_TEST", {"foo": "bar"}, merge=False) |
| 329 | + assert settings.as_dict()["CUSTOM_SECTION_FOR_TEST"] == {"foo": "bar"} |
| 330 | + |
| 331 | + _restore_settings_sections(settings, snapshot) |
| 332 | + |
| 333 | + assert "CUSTOM_SECTION_FOR_TEST" not in settings.as_dict() |
| 334 | + finally: |
| 335 | + _restore_settings_sections(settings, original_snapshot) |
0 commit comments