cfn-stack-updater/cfn_updater/updater.py
Vijaya Manne 632ac9e328 Initial commit: One-Click CloudFormation Stack Updater
- Python CLI tool for rolling updates of CL-AppPipe-* and CL-SvcPipe-* stacks
- Async update engine with configurable concurrency (asyncio.Semaphore)
- Exponential backoff retry for API throttling
- Dry-run mode for safe preview
- IAM permission pre-validation
- Comprehensive test suite (80 tests: 11 property-based + 69 unit)
- Full spec documentation (requirements, design, tasks)
2026-05-29 14:56:59 -04:00

170 lines
6.1 KiB
Python

"""Stack update engine for the CloudFormation Stack Updater."""
from __future__ import annotations
import asyncio
import time
from botocore.exceptions import ClientError
from cfn_updater.config import BASE_RETRY_DELAY, DEFAULT_CONCURRENCY, MAX_RETRIES, TEMPLATE_URL
from cfn_updater.models import DiscoveredStack, StackUpdateResult
async def update_stack(
cfn_client,
stack_name: str,
template_url: str,
max_retries: int = MAX_RETRIES,
) -> StackUpdateResult:
"""
Updates a single CloudFormation stack.
Fetches current parameters, calls UpdateStack with UsePreviousValue=True
for all existing parameters and the configured template URL.
Handles 'No updates are to be performed' as no-update-needed.
Retries on throttling errors with exponential backoff.
"""
start = time.monotonic()
try:
# Fetch current stack parameters
describe_resp = cfn_client.describe_stacks(StackName=stack_name)
stacks = describe_resp.get("Stacks", [])
if not stacks:
duration = time.monotonic() - start
return StackUpdateResult(
stack_name=stack_name,
status="failed",
error=f"Stack '{stack_name}' not found",
duration_seconds=duration,
)
existing_params = stacks[0].get("Parameters", [])
# Build parameter list with UsePreviousValue=True
params = [
{"ParameterKey": p["ParameterKey"], "UsePreviousValue": True}
for p in existing_params
]
# Attempt UpdateStack with retry logic for throttling
for attempt in range(max_retries + 1):
try:
cfn_client.update_stack(
StackName=stack_name,
TemplateURL=template_url,
Parameters=params,
UsePreviousTemplate=False,
Capabilities=["CAPABILITY_NAMED_IAM"],
)
# Update initiated successfully
duration = time.monotonic() - start
return StackUpdateResult(
stack_name=stack_name,
status="succeeded",
duration_seconds=duration,
)
except ClientError as e:
error_code = e.response["Error"]["Code"]
error_message = e.response["Error"].get("Message", str(e))
# Handle "No updates are to be performed"
if "No updates are to be performed" in error_message:
duration = time.monotonic() - start
return StackUpdateResult(
stack_name=stack_name,
status="no-update-needed",
duration_seconds=duration,
)
# Handle throttling with exponential backoff
if error_code in ("Throttling", "RequestLimitExceeded") and attempt < max_retries:
delay = BASE_RETRY_DELAY * (2 ** attempt)
await asyncio.sleep(delay)
continue
# Non-retryable error or retries exhausted
duration = time.monotonic() - start
return StackUpdateResult(
stack_name=stack_name,
status="failed",
error=error_message,
duration_seconds=duration,
)
except ClientError as e:
# Error from describe_stacks
duration = time.monotonic() - start
error_message = e.response["Error"].get("Message", str(e))
return StackUpdateResult(
stack_name=stack_name,
status="failed",
error=error_message,
duration_seconds=duration,
)
except Exception as e:
# Catch-all for unexpected errors (network issues, etc.)
duration = time.monotonic() - start
return StackUpdateResult(
stack_name=stack_name,
status="failed",
error=str(e),
duration_seconds=duration,
)
# Should not reach here, but safety net if max_retries loop completes
# without returning (all attempts were throttled)
duration = time.monotonic() - start
return StackUpdateResult(
stack_name=stack_name,
status="failed",
error="Max retries exhausted due to throttling",
duration_seconds=duration,
)
async def update_all_stacks(
cfn_client,
stacks: list[DiscoveredStack],
template_url: str,
concurrency: int = DEFAULT_CONCURRENCY,
max_retries: int = MAX_RETRIES,
) -> list[StackUpdateResult]:
"""
Updates all stacks with bounded concurrency using asyncio.Semaphore.
Non-updatable stacks are immediately recorded as skipped.
Updatable stacks are updated concurrently, bounded by the semaphore.
Results are returned in the same order as the input stacks list.
"""
sem = asyncio.Semaphore(concurrency)
async def _update_with_semaphore(stack_name: str) -> StackUpdateResult:
async with sem:
return await update_stack(cfn_client, stack_name, template_url, max_retries)
# Build a list of coroutines/futures in input order
coros: list = []
for stack in stacks:
if not stack.updatable:
# Wrap skipped results as a coroutine so gather() can handle them uniformly
async def _skipped(name: str = stack.name) -> StackUpdateResult:
return StackUpdateResult(stack_name=name, status="skipped")
coros.append(_skipped())
else:
coros.append(_update_with_semaphore(stack.name))
results: list[StackUpdateResult] = await asyncio.gather(*coros)
return list(results)
def format_dry_run_output(stacks: list[DiscoveredStack]) -> str:
"""Format discovered stacks for dry-run output. No updates are performed."""
if not stacks:
return "Dry-run: No stacks found."
lines = [f"Dry-run: {len(stacks)} stack(s) discovered:\n"]
for stack in stacks:
lines.append(f" {stack.name}{stack.status}")
return "\n".join(lines)