cfn-stack-updater/cfn_updater/permissions.py
Vijaya Manne 632ac9e328 Initial commit: One-Click CloudFormation Stack Updater
- Python CLI tool for rolling updates of CL-AppPipe-* and CL-SvcPipe-* stacks
- Async update engine with configurable concurrency (asyncio.Semaphore)
- Exponential backoff retry for API throttling
- Dry-run mode for safe preview
- IAM permission pre-validation
- Comprehensive test suite (80 tests: 11 property-based + 69 unit)
- Full spec documentation (requirements, design, tasks)
2026-05-29 14:56:59 -04:00

116 lines
3.8 KiB
Python

"""Permission validation for the CloudFormation Stack Updater."""
from __future__ import annotations
from botocore.exceptions import ClientError
# Permissions we need to validate before proceeding with an update run.
REQUIRED_PERMISSIONS = [
"cloudformation:ListStacks",
"cloudformation:DescribeStacks",
"cloudformation:UpdateStack",
]
# Error codes that indicate missing IAM permissions.
_ACCESS_DENIED_CODES = frozenset({
"AccessDenied",
"AccessDeniedException",
"UnauthorizedAccess",
"AuthorizationError",
})
def validate_permissions(
cfn_client,
sts_client=None,
iam_client=None,
) -> list[str]:
"""Check required IAM permissions by performing dry-run API calls.
Args:
cfn_client: A boto3 CloudFormation client.
sts_client: Optional boto3 STS client (used for UpdateStack simulation).
iam_client: Optional boto3 IAM client (used for UpdateStack simulation).
Returns:
A list of missing permission names. An empty list means all
permissions are present.
"""
missing: list[str] = []
# --- cloudformation:ListStacks ---
if not _check_list_stacks(cfn_client):
missing.append("cloudformation:ListStacks")
# --- cloudformation:DescribeStacks ---
if not _check_describe_stacks(cfn_client):
missing.append("cloudformation:DescribeStacks")
# --- cloudformation:UpdateStack ---
if not _check_update_stack(sts_client, iam_client):
missing.append("cloudformation:UpdateStack")
return missing
def _check_list_stacks(cfn_client) -> bool:
"""Return True if the caller has cloudformation:ListStacks."""
try:
cfn_client.list_stacks(StackStatusFilter=["CREATE_COMPLETE"])
return True
except ClientError as exc:
if exc.response["Error"]["Code"] in _ACCESS_DENIED_CODES:
return False
raise
def _check_describe_stacks(cfn_client) -> bool:
"""Return True if the caller has cloudformation:DescribeStacks."""
try:
cfn_client.describe_stacks(
StackName="cfn-updater-permission-check-nonexistent",
)
return True
except ClientError as exc:
code = exc.response["Error"]["Code"]
if code in _ACCESS_DENIED_CODES:
return False
if code == "ValidationError":
# "Stack … does not exist" — permission is present, stack just
# doesn't exist. This is the expected happy-path response.
return True
raise
def _check_update_stack(sts_client, iam_client) -> bool:
"""Best-effort check for cloudformation:UpdateStack via IAM simulation.
Uses ``iam:SimulatePrincipalPolicy`` to verify the permission. If the
STS or IAM clients are not provided, or if the simulation call itself
is denied, the check is deferred (returns True) — the real UpdateStack
call will surface the error later.
"""
if sts_client is None or iam_client is None:
# Cannot simulate without both clients — defer the check.
return True
try:
caller_arn = sts_client.get_caller_identity()["Arn"]
result = iam_client.simulate_principal_policy(
PolicySourceArn=caller_arn,
ActionNames=["cloudformation:UpdateStack"],
)
for eval_result in result.get("EvaluationResults", []):
if eval_result.get("EvalDecision") != "allowed":
return False
return True
except ClientError as exc:
if exc.response["Error"]["Code"] in _ACCESS_DENIED_CODES:
return False
# If simulation itself fails for another reason (e.g. missing
# iam:SimulatePrincipalPolicy), defer — the real call will fail later.
return True
except Exception:
# Any unexpected error — defer.
return True