- Python CLI tool for rolling updates of CL-AppPipe-* and CL-SvcPipe-* stacks - Async update engine with configurable concurrency (asyncio.Semaphore) - Exponential backoff retry for API throttling - Dry-run mode for safe preview - IAM permission pre-validation - Comprehensive test suite (80 tests: 11 property-based + 69 unit) - Full spec documentation (requirements, design, tasks)
170 lines
6.1 KiB
Python
170 lines
6.1 KiB
Python
"""Stack update engine for the CloudFormation Stack Updater."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
|
|
from botocore.exceptions import ClientError
|
|
|
|
from cfn_updater.config import BASE_RETRY_DELAY, DEFAULT_CONCURRENCY, MAX_RETRIES, TEMPLATE_URL
|
|
from cfn_updater.models import DiscoveredStack, StackUpdateResult
|
|
|
|
|
|
async def update_stack(
|
|
cfn_client,
|
|
stack_name: str,
|
|
template_url: str,
|
|
max_retries: int = MAX_RETRIES,
|
|
) -> StackUpdateResult:
|
|
"""
|
|
Updates a single CloudFormation stack.
|
|
|
|
Fetches current parameters, calls UpdateStack with UsePreviousValue=True
|
|
for all existing parameters and the configured template URL.
|
|
Handles 'No updates are to be performed' as no-update-needed.
|
|
Retries on throttling errors with exponential backoff.
|
|
"""
|
|
start = time.monotonic()
|
|
|
|
try:
|
|
# Fetch current stack parameters
|
|
describe_resp = cfn_client.describe_stacks(StackName=stack_name)
|
|
stacks = describe_resp.get("Stacks", [])
|
|
if not stacks:
|
|
duration = time.monotonic() - start
|
|
return StackUpdateResult(
|
|
stack_name=stack_name,
|
|
status="failed",
|
|
error=f"Stack '{stack_name}' not found",
|
|
duration_seconds=duration,
|
|
)
|
|
|
|
existing_params = stacks[0].get("Parameters", [])
|
|
# Build parameter list with UsePreviousValue=True
|
|
params = [
|
|
{"ParameterKey": p["ParameterKey"], "UsePreviousValue": True}
|
|
for p in existing_params
|
|
]
|
|
|
|
# Attempt UpdateStack with retry logic for throttling
|
|
for attempt in range(max_retries + 1):
|
|
try:
|
|
cfn_client.update_stack(
|
|
StackName=stack_name,
|
|
TemplateURL=template_url,
|
|
Parameters=params,
|
|
UsePreviousTemplate=False,
|
|
Capabilities=["CAPABILITY_NAMED_IAM"],
|
|
)
|
|
# Update initiated successfully
|
|
duration = time.monotonic() - start
|
|
return StackUpdateResult(
|
|
stack_name=stack_name,
|
|
status="succeeded",
|
|
duration_seconds=duration,
|
|
)
|
|
except ClientError as e:
|
|
error_code = e.response["Error"]["Code"]
|
|
error_message = e.response["Error"].get("Message", str(e))
|
|
|
|
# Handle "No updates are to be performed"
|
|
if "No updates are to be performed" in error_message:
|
|
duration = time.monotonic() - start
|
|
return StackUpdateResult(
|
|
stack_name=stack_name,
|
|
status="no-update-needed",
|
|
duration_seconds=duration,
|
|
)
|
|
|
|
# Handle throttling with exponential backoff
|
|
if error_code in ("Throttling", "RequestLimitExceeded") and attempt < max_retries:
|
|
delay = BASE_RETRY_DELAY * (2 ** attempt)
|
|
await asyncio.sleep(delay)
|
|
continue
|
|
|
|
# Non-retryable error or retries exhausted
|
|
duration = time.monotonic() - start
|
|
return StackUpdateResult(
|
|
stack_name=stack_name,
|
|
status="failed",
|
|
error=error_message,
|
|
duration_seconds=duration,
|
|
)
|
|
|
|
except ClientError as e:
|
|
# Error from describe_stacks
|
|
duration = time.monotonic() - start
|
|
error_message = e.response["Error"].get("Message", str(e))
|
|
return StackUpdateResult(
|
|
stack_name=stack_name,
|
|
status="failed",
|
|
error=error_message,
|
|
duration_seconds=duration,
|
|
)
|
|
except Exception as e:
|
|
# Catch-all for unexpected errors (network issues, etc.)
|
|
duration = time.monotonic() - start
|
|
return StackUpdateResult(
|
|
stack_name=stack_name,
|
|
status="failed",
|
|
error=str(e),
|
|
duration_seconds=duration,
|
|
)
|
|
|
|
# Should not reach here, but safety net if max_retries loop completes
|
|
# without returning (all attempts were throttled)
|
|
duration = time.monotonic() - start
|
|
return StackUpdateResult(
|
|
stack_name=stack_name,
|
|
status="failed",
|
|
error="Max retries exhausted due to throttling",
|
|
duration_seconds=duration,
|
|
)
|
|
|
|
|
|
async def update_all_stacks(
|
|
cfn_client,
|
|
stacks: list[DiscoveredStack],
|
|
template_url: str,
|
|
concurrency: int = DEFAULT_CONCURRENCY,
|
|
max_retries: int = MAX_RETRIES,
|
|
) -> list[StackUpdateResult]:
|
|
"""
|
|
Updates all stacks with bounded concurrency using asyncio.Semaphore.
|
|
|
|
Non-updatable stacks are immediately recorded as skipped.
|
|
Updatable stacks are updated concurrently, bounded by the semaphore.
|
|
Results are returned in the same order as the input stacks list.
|
|
"""
|
|
sem = asyncio.Semaphore(concurrency)
|
|
|
|
async def _update_with_semaphore(stack_name: str) -> StackUpdateResult:
|
|
async with sem:
|
|
return await update_stack(cfn_client, stack_name, template_url, max_retries)
|
|
|
|
# Build a list of coroutines/futures in input order
|
|
coros: list = []
|
|
for stack in stacks:
|
|
if not stack.updatable:
|
|
# Wrap skipped results as a coroutine so gather() can handle them uniformly
|
|
async def _skipped(name: str = stack.name) -> StackUpdateResult:
|
|
return StackUpdateResult(stack_name=name, status="skipped")
|
|
|
|
coros.append(_skipped())
|
|
else:
|
|
coros.append(_update_with_semaphore(stack.name))
|
|
|
|
results: list[StackUpdateResult] = await asyncio.gather(*coros)
|
|
return list(results)
|
|
|
|
def format_dry_run_output(stacks: list[DiscoveredStack]) -> str:
|
|
"""Format discovered stacks for dry-run output. No updates are performed."""
|
|
if not stacks:
|
|
return "Dry-run: No stacks found."
|
|
|
|
lines = [f"Dry-run: {len(stacks)} stack(s) discovered:\n"]
|
|
for stack in stacks:
|
|
lines.append(f" {stack.name} — {stack.status}")
|
|
return "\n".join(lines)
|
|
|