# - Python CLI tool for rolling updates of CL-AppPipe-* and CL-SvcPipe-* stacks
# - Async update engine with configurable concurrency (asyncio.Semaphore)
# - Exponential backoff retry for API throttling
# - Dry-run mode for safe preview
# - IAM permission pre-validation
# - Comprehensive test suite (80 tests: 11 property-based + 69 unit)
# - Full spec documentation (requirements, design, tasks)
266 lines
9.1 KiB
Python
266 lines
9.1 KiB
Python
# Feature: one-click-cfn-stack-updater, Property 11: Permission validation correctness
|
|
"""Property-based tests for permission validation correctness.
|
|
|
|
**Validates: Requirements 7.1, 7.2**
|
|
|
|
Property 11: For any subset of required permissions marked as missing,
|
|
the validator returns exactly those missing permissions. When any
|
|
permissions are missing, no UpdateStack calls are made.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from botocore.exceptions import ClientError
|
|
from hypothesis import given, settings, strategies as st
|
|
|
|
from cfn_updater.permissions import REQUIRED_PERMISSIONS, validate_permissions
|
|
|
|
# The three permissions under test.
|
|
# Immutable set built from REQUIRED_PERMISSIONS (imported from
# cfn_updater.permissions above).
_ALL_PERMS = frozenset(REQUIRED_PERMISSIONS)
|
|
|
|
|
|
def _access_denied_error(operation: str = "Unknown") -> ClientError:
    """Create a ``ClientError`` carrying an ``AccessDenied`` error code.

    Args:
        operation: Operation name recorded on the error; defaults to
            ``"Unknown"``.

    Returns:
        A ``ClientError`` whose response payload has ``Code`` set to
        ``"AccessDenied"`` and ``Message`` set to ``"Access Denied"``.
    """
    error_response = {
        "Error": {"Code": "AccessDenied", "Message": "Access Denied"},
    }
    return ClientError(error_response, operation)
|
|
|
|
|
|
def _build_mocks(missing: frozenset[str]):
    """Create mocked CloudFormation, STS and IAM clients for one scenario.

    Every permission named in *missing* is made to look absent; every other
    permission handled here looks granted.

    Args:
        missing: Permission names that should appear to be denied.

    Returns:
        A ``(cfn_client, sts_client, iam_client)`` tuple of ``MagicMock``
        objects.
    """
    cfn_client, sts_client, iam_client = MagicMock(), MagicMock(), MagicMock()

    # --- ListStacks: AccessDenied when missing, a stub payload otherwise ---
    if "cloudformation:ListStacks" not in missing:
        cfn_client.list_stacks.return_value = {"StackStatusFilter": []}
    else:
        cfn_client.list_stacks.side_effect = _access_denied_error("ListStacks")

    # --- DescribeStacks: the call raises in both scenarios ---
    if "cloudformation:DescribeStacks" in missing:
        describe_failure = _access_denied_error("DescribeStacks")
    else:
        # Permission present: the probed stack does not exist, so the call
        # fails with a ValidationError, which the code under test is
        # expected to treat as "permission OK".
        describe_failure = ClientError(
            {
                "Error": {
                    "Code": "ValidationError",
                    "Message": "Stack does not exist",
                }
            },
            "DescribeStacks",
        )
    cfn_client.describe_stacks.side_effect = describe_failure

    # --- UpdateStack (via IAM simulation) ---
    # Only the simulated decision depends on whether the permission is
    # missing; the caller identity is the same either way.
    if "cloudformation:UpdateStack" in missing:
        decision = "implicitDeny"
    else:
        decision = "allowed"
    sts_client.get_caller_identity.return_value = {
        "Arn": "arn:aws:iam::123456789012:user/test"
    }
    iam_client.simulate_principal_policy.return_value = {
        "EvaluationResults": [
            {
                "EvalActionName": "cloudformation:UpdateStack",
                "EvalDecision": decision,
            }
        ]
    }

    return cfn_client, sts_client, iam_client
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Strategy: draw any subset of the three required permissions as "missing".
|
|
# ---------------------------------------------------------------------------
|
|
# Generates frozensets of required-permission names, from the empty set up to
# the full set. ``min_size`` is left at its default of 0; the names are sorted
# before sampling.
missing_perms_strategy = st.frozensets(
    st.sampled_from(sorted(REQUIRED_PERMISSIONS)),
    max_size=len(REQUIRED_PERMISSIONS),
)
|
|
|
|
|
|
@settings(max_examples=100)
@given(missing=missing_perms_strategy)
def test_permission_validation_returns_exactly_missing_perms(
    missing: frozenset[str],
) -> None:
    """Property 11: validate_permissions returns exactly the missing subset.

    For every generated subset of required permissions marked as missing,
    the validator must report those permissions and nothing else, report
    each one only once, and never call the mutating UpdateStack API on the
    CloudFormation client.

    **Validates: Requirements 7.1, 7.2**

    Args:
        missing: Permission names the mocks present as denied.
    """
    cfn, sts, iam = _build_mocks(missing)

    result = validate_permissions(cfn, sts_client=sts, iam_client=iam)

    assert set(result) == set(missing), (
        f"Expected missing={sorted(missing)}, got {sorted(result)}"
    )
    # The set comparison above would hide a permission reported twice.
    assert len(result) == len(set(result)), (
        f"Duplicate permissions reported: {sorted(result)}"
    )
    # Second half of Property 11: no UpdateStack calls are made while
    # validating permissions.
    cfn.update_stack.assert_not_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unit tests for permission validation
|
|
# Validates: Requirements 7.1, 7.2
|
|
# ---------------------------------------------------------------------------
|
|
|
|
import pytest
|
|
|
|
|
|
class TestValidatePermissionsHappyPath:
    """Scenario where nothing is missing: the validator reports no gaps."""

    def test_all_permissions_present(self):
        """With every permission granted the result is exactly ``[]``."""
        cfn_client, sts_client, iam_client = _build_mocks(missing=frozenset())

        missing_perms = validate_permissions(
            cfn_client, sts_client=sts_client, iam_client=iam_client
        )

        assert missing_perms == []

    def test_all_present_calls_list_stacks(self):
        """``list_stacks`` is called exactly once during validation."""
        cfn_client, sts_client, iam_client = _build_mocks(missing=frozenset())

        validate_permissions(
            cfn_client, sts_client=sts_client, iam_client=iam_client
        )

        cfn_client.list_stacks.assert_called_once()

    def test_all_present_calls_describe_stacks(self):
        """``describe_stacks`` is called exactly once during validation."""
        cfn_client, sts_client, iam_client = _build_mocks(missing=frozenset())

        validate_permissions(
            cfn_client, sts_client=sts_client, iam_client=iam_client
        )

        cfn_client.describe_stacks.assert_called_once()
|
|
|
|
|
|
class TestValidatePermissionsSingleMissing:
    """Scenarios in which exactly one permission is missing."""

    @staticmethod
    def _validate_with_missing(permission: str):
        """Run the validator against mocks where only *permission* is denied."""
        cfn_client, sts_client, iam_client = _build_mocks(
            missing=frozenset({permission})
        )
        return validate_permissions(
            cfn_client, sts_client=sts_client, iam_client=iam_client
        )

    def test_missing_list_stacks(self):
        """Only ListStacks denied: it is the sole reported permission."""
        reported = self._validate_with_missing("cloudformation:ListStacks")

        assert reported == ["cloudformation:ListStacks"]

    def test_missing_describe_stacks(self):
        """Only DescribeStacks denied: it is the sole reported permission."""
        reported = self._validate_with_missing("cloudformation:DescribeStacks")

        assert reported == ["cloudformation:DescribeStacks"]

    def test_missing_update_stack(self):
        """Only UpdateStack denied: it is the sole reported permission."""
        reported = self._validate_with_missing("cloudformation:UpdateStack")

        assert reported == ["cloudformation:UpdateStack"]
|
|
|
|
|
|
class TestValidatePermissionsAllMissing:
    """Scenario in which every required permission is missing."""

    def test_all_permissions_missing(self):
        """All required permissions are reported, with three entries in total."""
        every_permission = frozenset(REQUIRED_PERMISSIONS)
        cfn_client, sts_client, iam_client = _build_mocks(
            missing=every_permission
        )

        reported = validate_permissions(
            cfn_client, sts_client=sts_client, iam_client=iam_client
        )

        assert set(reported) == set(REQUIRED_PERMISSIONS)
        assert len(reported) == 3
|
|
|
|
|
|
class TestValidatePermissionsBoto3Errors:
    """How validation reacts to boto3 / botocore errors and absent clients."""

    @staticmethod
    def _client_error(code: str, message: str, operation: str) -> ClientError:
        """Build a ``ClientError`` with the given code, message and operation."""
        return ClientError(
            {"Error": {"Code": code, "Message": message}},
            operation,
        )

    def test_unexpected_client_error_propagates_from_list_stacks(self):
        """A non-access-denied ClientError should bubble up, not be swallowed."""
        cfn_client, sts_client, iam_client = _build_mocks(missing=frozenset())
        cfn_client.list_stacks.side_effect = self._client_error(
            "InternalError", "Something broke", "ListStacks"
        )

        with pytest.raises(ClientError) as raised:
            validate_permissions(
                cfn_client, sts_client=sts_client, iam_client=iam_client
            )

        assert raised.value.response["Error"]["Code"] == "InternalError"

    def test_unexpected_client_error_propagates_from_describe_stacks(self):
        """A non-access-denied ClientError from DescribeStacks is re-raised."""
        cfn_client, sts_client, iam_client = _build_mocks(missing=frozenset())
        cfn_client.describe_stacks.side_effect = self._client_error(
            "InternalError", "Something broke", "DescribeStacks"
        )

        with pytest.raises(ClientError) as raised:
            validate_permissions(
                cfn_client, sts_client=sts_client, iam_client=iam_client
            )

        assert raised.value.response["Error"]["Code"] == "InternalError"

    def test_no_sts_client_defers_update_check(self):
        """Without an STS client, UpdateStack is not reported as missing."""
        cfn_client, _unused_sts, iam_client = _build_mocks(missing=frozenset())

        reported = validate_permissions(
            cfn_client, sts_client=None, iam_client=iam_client
        )

        assert "cloudformation:UpdateStack" not in reported

    def test_no_iam_client_defers_update_check(self):
        """Without an IAM client, UpdateStack is not reported as missing."""
        cfn_client, sts_client, _unused_iam = _build_mocks(missing=frozenset())

        reported = validate_permissions(
            cfn_client, sts_client=sts_client, iam_client=None
        )

        assert "cloudformation:UpdateStack" not in reported

    def test_simulate_policy_access_denied_reports_missing(self):
        """If simulate_principal_policy itself is denied, UpdateStack is flagged."""
        cfn_client, sts_client, iam_client = _build_mocks(missing=frozenset())
        sts_client.get_caller_identity.return_value = {
            "Arn": "arn:aws:iam::123456789012:user/test"
        }
        iam_client.simulate_principal_policy.side_effect = _access_denied_error(
            "SimulatePrincipalPolicy"
        )

        reported = validate_permissions(
            cfn_client, sts_client=sts_client, iam_client=iam_client
        )

        assert "cloudformation:UpdateStack" in reported

    def test_simulate_policy_unexpected_error_defers(self):
        """Non-access-denied ClientError from simulation defers the check."""
        cfn_client, sts_client, iam_client = _build_mocks(missing=frozenset())
        sts_client.get_caller_identity.return_value = {
            "Arn": "arn:aws:iam::123456789012:user/test"
        }
        iam_client.simulate_principal_policy.side_effect = self._client_error(
            "ServiceException", "Oops", "SimulatePrincipalPolicy"
        )

        reported = validate_permissions(
            cfn_client, sts_client=sts_client, iam_client=iam_client
        )

        assert "cloudformation:UpdateStack" not in reported
|