- Python CLI tool for rolling updates of CL-AppPipe-* and CL-SvcPipe-* stacks - Async update engine with configurable concurrency (asyncio.Semaphore) - Exponential backoff retry for API throttling - Dry-run mode for safe preview - IAM permission pre-validation - Comprehensive test suite (80 tests: 11 property-based + 69 unit) - Full spec documentation (requirements, design, tasks)
116 lines
3.8 KiB
Python
116 lines
3.8 KiB
Python
"""Permission validation for the CloudFormation Stack Updater."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from botocore.exceptions import ClientError
|
|
|
|
|
|
# IAM actions the caller must hold; validated up front so an update run
# does not start and then fail part-way through.
REQUIRED_PERMISSIONS = [
    "cloudformation:ListStacks",
    "cloudformation:DescribeStacks",
    "cloudformation:UpdateStack",
]
|
|
|
|
# ClientError codes treated as "the caller lacks the IAM permission".
# Kept as a frozenset: the helpers only ever do membership tests on it.
_ACCESS_DENIED_CODES = frozenset(
    (
        "AccessDenied",
        "AccessDeniedException",
        "UnauthorizedAccess",
        "AuthorizationError",
    )
)
|
|
|
|
|
|
def validate_permissions(
    cfn_client,
    sts_client=None,
    iam_client=None,
) -> list[str]:
    """Report which required CloudFormation permissions the caller lacks.

    Each permission is probed by its own helper, always in the same order:
    ListStacks, DescribeStacks, then UpdateStack. An exception raised by a
    helper propagates to the caller and aborts the remaining probes.

    Args:
        cfn_client: A boto3 CloudFormation client, used for the ListStacks
            and DescribeStacks probes.
        sts_client: Optional boto3 STS client, forwarded to the UpdateStack
            probe.
        iam_client: Optional boto3 IAM client, forwarded to the UpdateStack
            probe.

    Returns:
        The names of the permissions whose probe failed, in probe order.
        An empty list means no missing permission was detected.
    """
    # (permission name, probe function, probe arguments), in evaluation order.
    probes = (
        ("cloudformation:ListStacks", _check_list_stacks, (cfn_client,)),
        ("cloudformation:DescribeStacks", _check_describe_stacks, (cfn_client,)),
        ("cloudformation:UpdateStack", _check_update_stack, (sts_client, iam_client)),
    )
    return [
        permission
        for permission, probe, probe_args in probes
        if not probe(*probe_args)
    ]
|
|
|
|
|
|
def _check_list_stacks(cfn_client) -> bool:
    """Probe ``cloudformation:ListStacks`` by issuing a real ListStacks call.

    Args:
        cfn_client: A boto3 CloudFormation client.

    Returns:
        True if the call succeeds; False if it is rejected with one of the
        codes in ``_ACCESS_DENIED_CODES``.

    Raises:
        ClientError: Re-raised unchanged for any other error code.
    """
    try:
        cfn_client.list_stacks(StackStatusFilter=["CREATE_COMPLETE"])
    except ClientError as err:
        error_code = err.response["Error"]["Code"]
        if error_code not in _ACCESS_DENIED_CODES:
            # Not a permission problem (throttling, networking, ...): let the
            # caller see it instead of misreporting a missing permission.
            raise
        return False
    else:
        return True
|
|
|
|
|
|
def _check_describe_stacks(cfn_client) -> bool:
    """Probe ``cloudformation:DescribeStacks`` using a placeholder stack name.

    Args:
        cfn_client: A boto3 CloudFormation client.

    Returns:
        True if the call succeeds or fails with ``ValidationError``; False
        if it is rejected with one of the codes in ``_ACCESS_DENIED_CODES``.

    Raises:
        ClientError: Re-raised unchanged for any other error code.
    """
    probe_stack_name = "cfn-updater-permission-check-nonexistent"
    try:
        cfn_client.describe_stacks(StackName=probe_stack_name)
    except ClientError as err:
        error_code = err.response["Error"]["Code"]
        if error_code in _ACCESS_DENIED_CODES:
            return False
        if error_code != "ValidationError":
            raise
        # ValidationError is the expected outcome: the request was authorized
        # and only the placeholder stack is missing, so fall through to True.
    return True
|
|
|
|
|
|
def _check_update_stack(sts_client, iam_client) -> bool:
    """Best-effort check for cloudformation:UpdateStack via IAM simulation.

    Asks ``iam:SimulatePrincipalPolicy`` whether the calling principal is
    allowed to perform ``cloudformation:UpdateStack``. Only an explicit
    simulation verdict can fail the check. If the simulation cannot be
    performed, the check is deferred and the real UpdateStack call will
    surface any problem later.

    Args:
        sts_client: Optional boto3 STS client, used to look up the caller ARN.
        iam_client: Optional boto3 IAM client, used to run the simulation.

    Returns:
        False only when the simulation ran and at least one evaluation
        result has a decision other than ``"allowed"``. True when every
        result is ``"allowed"``, and True (deferred) when either client is
        missing or when the STS/IAM calls fail for any reason, including
        an access-denied error on the simulation call itself.
    """
    if sts_client is None or iam_client is None:
        # Cannot simulate without both clients — defer the check.
        return True

    try:
        # NOTE(review): for assumed-role sessions STS reports an
        # ``assumed-role`` ARN; confirm SimulatePrincipalPolicy accepts it.
        # If it is rejected, the error lands in the deferred path below.
        caller_arn = sts_client.get_caller_identity()["Arn"]
        result = iam_client.simulate_principal_policy(
            PolicySourceArn=caller_arn,
            ActionNames=["cloudformation:UpdateStack"],
        )
        # A denied UpdateStack shows up here as a non-"allowed" decision,
        # never as an exception. No results at all counts as "not denied".
        return all(
            evaluation.get("EvalDecision") == "allowed"
            for evaluation in result.get("EvaluationResults", [])
        )
    except Exception:
        # Deliberately broad: any failure of the simulation itself says
        # nothing about UpdateStack. In particular, an access-denied error
        # here means the caller lacks iam:SimulatePrincipalPolicy, which
        # must not be reported as a missing cloudformation:UpdateStack.
        return True
|