diff --git a/.claude-plugin/marketplace.json b/.claude-plugin/marketplace.json index 32cdfbd7..c030c08b 100644 --- a/.claude-plugin/marketplace.json +++ b/.claude-plugin/marketplace.json @@ -115,6 +115,30 @@ "source": "./plugins/aws-amplify", "tags": ["aws", "amplify", "fullstack"], "version": "1.0.0" + }, + { + "category": "cost-optimization", + "description": "Analyze DynamoDB tables for cost optimization: capacity mode, table class, utilization, and unused GSI detection.", + "keywords": [ + "aws", + "aws agent skills", + "amazon", + "dynamodb", + "cost-optimization", + "capacity", + "on-demand", + "provisioned", + "table-class", + "gsi" + ], + "name": "dynamodb-cost-optimizer", + "source": "./plugins/dynamodb-cost-optimizer", + "tags": [ + "aws", + "dynamodb", + "cost-optimization" + ], + "version": "1.0.0" } ] } diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 7d0aeeed..00166778 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -34,6 +34,7 @@ plugins/amazon-location-service @awslabs/agent-plugins-admins @awslabs/agent plugins/aws-amplify @awslabs/agent-plugins-admins @awslabs/agent-plugins-maintainers @awslabs/agent-plugins-amplify plugins/aws-serverless @awslabs/agent-plugins-admins @awslabs/agent-plugins-maintainers @awslabs/agent-plugins-aws-serverless plugins/deploy-on-aws @awslabs/agent-plugins-admins @awslabs/agent-plugins-maintainers @awslabs/agent-plugins-deploy-on-aws +plugins/dynamodb-cost-optimizer @awslabs/agent-plugins-admins @awslabs/agent-plugins-maintainers @LeeroyHannigan plugins/migration-to-aws @awslabs/agent-plugins-admins @awslabs/agent-plugins-maintainers @awslabs/agent-plugins-migrate-to-aws ## File must end with CODEOWNERS file diff --git a/.gitignore b/.gitignore index 3c6f9ca9..bb47e012 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,10 @@ build/ .DS_Store Thumbs.db +# Python +__pycache__/ +*.pyc + # IDE .idea/ .vscode/ diff --git a/README.md b/README.md index 8c306951..f0b8d09f 100644 --- a/README.md +++ 
b/README.md @@ -33,6 +33,7 @@ To maximize the benefits of plugin-assisted development while maintaining securi | **migration-to-aws** | Migrate GCP infrastructure to AWS with resource discovery, architecture mapping, cost analysis, and execution planning | Available | | **aws-amplify** | Build full-stack apps with AWS Amplify Gen 2 using guided workflows for auth, data, storage, and functions | Available | | **aws-serverless** | Build serverless applications with Lambda, API Gateway, EventBridge, Step Functions, and durable functions | Available | +| **dynamodb-cost-optimizer** | Analyze DynamoDB tables for cost optimization: capacity mode, table class, utilization, and unused GSI detection | Available | ## Installation @@ -48,6 +49,7 @@ To maximize the benefits of plugin-assisted development while maintaining securi ```bash /plugin install deploy-on-aws@agent-plugins-for-aws +/plugin install dynamodb-cost-optimizer@agent-plugins-for-aws ``` or @@ -199,6 +201,27 @@ Design, build, deploy, test, and debug serverless applications with AWS Lambda, | --------------------------- | --------------------------------------------- | --------------------------------------------- | | **SAM template validation** | After edits to `template.yaml`/`template.yml` | Runs `sam validate` and reports errors inline | +## dynamodb-cost-optimizer + +Analyzes DynamoDB tables for cost optimization opportunities across four dimensions: capacity mode (on-demand vs provisioned), table class (Standard vs Standard-IA), utilization right-sizing, and unused GSI detection. + +### Workflow + +1. **Discover** - List and describe DynamoDB tables in target region(s) +2. **Analyze** - Run all four analyzers in parallel with automatic pricing lookup +3. 
**Report** - Present savings recommendations in a formatted table + +### Agent Skill Triggers + +| Agent Skill | Triggers | +| --------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------- | +| **optimize-dynamodb** | "optimize DynamoDB", "DynamoDB cost analysis", "reduce DynamoDB costs", "on-demand vs provisioned", "table class analysis", "find unused GSIs" | + +### Prerequisites + +- Python 3.9+ with boto3 +- AWS credentials with: `dynamodb:DescribeTable`, `dynamodb:ListTables`, `dynamodb:DescribeContinuousBackups`, `cloudwatch:GetMetricData`, `pricing:GetProducts`, `ce:GetCostAndUsage` + ## Requirements - Claude Code >=2.1.29 or [Cursor >= 2.5](https://cursor.com/changelog/2-5) diff --git a/plugins/dynamodb-cost-optimizer/.claude-plugin/plugin.json b/plugins/dynamodb-cost-optimizer/.claude-plugin/plugin.json new file mode 100644 index 00000000..f750c31f --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/.claude-plugin/plugin.json @@ -0,0 +1,22 @@ +{ + "author": { + "name": "Amazon Web Services" + }, + "description": "Analyze DynamoDB tables for cost optimization: capacity mode, table class, utilization, and unused GSI detection.", + "homepage": "https://github.com/awslabs/agent-plugins", + "keywords": [ + "aws", + "dynamodb", + "cost-optimization", + "capacity", + "provisioned", + "on-demand", + "table-class", + "utilization", + "gsi" + ], + "license": "Apache-2.0", + "name": "dynamodb-cost-optimizer", + "repository": "https://github.com/awslabs/agent-plugins", + "version": "1.0.0" +} diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/SKILL.md b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/SKILL.md new file mode 100644 index 00000000..12bb2099 --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/SKILL.md @@ -0,0 +1,105 @@ +--- +name: optimize-dynamodb +description: "Analyze DynamoDB tables for cost 
optimization opportunities. Triggers on: optimize DynamoDB, DynamoDB cost analysis, reduce DynamoDB costs, DynamoDB capacity mode, on-demand vs provisioned, table class analysis, unused GSI, DynamoDB utilization, right-size DynamoDB." +license: Apache-2.0 +metadata: + tags: aws, dynamodb, cost-optimization, capacity, provisioned, on-demand, table-class, utilization, gsi + dependencies: python>=3.9, boto3 +--- + +# DynamoDB Cost Optimizer + +Scripts are fully self-contained — they fetch pricing, metrics, and costs from AWS +via boto3 and return only a small summary. Execute them, do NOT reimplement the logic. + +## Prerequisites + +Before running any scripts, detect the Python command: + +1. Run `python --version`. If it returns Python 3.9+, use `python` for all scripts. +2. If not, try `python3 --version`. If 3.9+, use `python3` for all scripts. +3. If neither works, tell the user to install Python 3. +4. Run `<python-cmd> -c "import boto3"` (using the command detected above). If it fails, tell the user: `pip install boto3`. +5. AWS credentials configured with: `dynamodb:DescribeTable`, `dynamodb:ListTables`, + `dynamodb:DescribeContinuousBackups`, `cloudwatch:GetMetricData`, `pricing:GetProducts`, + `ce:GetCostAndUsage` + +## Workflow + +### Step 1: Region + +Ask user for AWS region(s). Default: `us-east-1`. Supports multiple regions. + +### Step 2: Run Analysis + +Use the batch script to analyze tables. It auto-discovers all tables when `tables` is omitted. +Pricing is fetched automatically per region — no need to pass it. + +Script: `scripts/analyze_all.py` + +IMPORTANT: Run the script from the user's current working directory using the absolute path +to the script. This ensures the report is saved locally.
+ +Example: +`python3 /path/to/skill/scripts/analyze_all.py '{"region":"REGION","days":14}'` + +All tables in a region: +`{"region":"REGION","days":14}` + +Specific tables: +`{"region":"REGION","tables":["table1","table2"],"days":14}` + +Multi-region: +`{"regions":{"us-east-1":["t1","t2"],"eu-west-1":["t3"]},"days":14}` + +This runs all four analyzers (capacity mode, table class, utilization, unused GSIs) +with parallel execution (10 concurrent by default). One command, one approval. + +Individual scripts are also available if the user only wants one type of analysis. +These require a `prices` object — use `scripts/get_pricing.py REGION` to fetch it first: + +- `scripts/capacity_mode.py` — Input: `{"region":"REGION","tableName":"TABLE","days":14,"prices":PRICING}` +- `scripts/table_class.py` — Input: `{"region":"REGION","tableName":"TABLE","days":14,"prices":PRICING}` +- `scripts/utilization.py` — Input: `{"region":"REGION","tableName":"TABLE","days":14,"prices":PRICING}` +- `scripts/unused_gsi.py` — Input: `{"region":"REGION","tableName":"TABLE","days":14}` + +### Step 3: Present Results + +The script outputs a summary line and saves the full report to `dynamodb-cost-report.md` +in the user's current working directory. + +DO NOT read or summarize the report file. Simply display the script's output, which +includes the summary and file path. The user can open the file themselves if needed. + +After displaying the output, ask if the user wants CLI commands for any recommendations. 
+ +### Step 4: Generate Actions + +For accepted recommendations: + +```bash +# Switch to on-demand +aws dynamodb update-table --table-name TABLE --billing-mode PAY_PER_REQUEST + +# Switch to provisioned +aws dynamodb update-table --table-name TABLE --billing-mode PROVISIONED \ + --provisioned-throughput ReadCapacityUnits=RCU,WriteCapacityUnits=WCU + +# Change table class +aws dynamodb update-table --table-name TABLE --table-class STANDARD_INFREQUENT_ACCESS + +# Delete unused GSI +aws dynamodb update-table --table-name TABLE \ + --global-secondary-index-updates '[{"Delete":{"IndexName":"GSI_NAME"}}]' +``` + +DO NOT execute update commands without explicit user confirmation. + +## Error Handling + +- Script fails → show error output, DO NOT reimplement logic. +- Reserved capacity detected → table class script handles this, reports it. +- ON_DEMAND table → utilization script handles this, reports it. +- CloudWatch throttling → scripts retry with exponential backoff (up to 5 retries). +- Per-table errors → reported in the output, other tables still analyzed. +- AWS credentials missing → scripts exit with clear error message. diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/requirements.txt b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/requirements.txt new file mode 100644 index 00000000..30ddf823 --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/requirements.txt @@ -0,0 +1 @@ +boto3 diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/analyze_all.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/analyze_all.py new file mode 100644 index 00000000..6a3857fd --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/analyze_all.py @@ -0,0 +1,219 @@ +"""Batch analyzer - runs all analyzers for multiple tables in parallel with formatted output. 
+ +Usage: echo '{"region":"eu-west-1"}' | python analyze_all.py # all tables + echo '{"region":"eu-west-1","tables":["t1","t2"],"days":14}' | python analyze_all.py # specific tables + +Multi-region: echo '{"regions":{"eu-west-1":["t1"],"us-east-1":["t2"]},"days":14}' | python analyze_all.py +Optional: "concurrency": 10 (default 10) +""" +import json +import sys +import os +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any, Dict, List + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from capacity_mode import analyze as analyze_capacity +from table_class import analyze as analyze_table_class +from utilization import analyze as analyze_utilization +from unused_gsi import analyze as analyze_unused_gsi +from get_pricing import get_pricing +from discover import discover + +MODE_LABELS = {'ON_DEMAND': 'On-Demand', 'PROVISIONED': 'Provisioned'} +CLASS_LABELS = {'STANDARD': 'Standard', 'STANDARD_INFREQUENT_ACCESS': 'Standard-IA'} + +def analyze_table(region: str, table_name: str, days: int, prices: Dict[str, float]) -> Dict[str, Any]: + entry = {'tableName': table_name, 'region': region, 'errors': []} + + # Fetch protection status once + try: + from config import get_client + ddb = get_client('dynamodb', region) + info = ddb.describe_table(TableName=table_name)['Table'] + entry['deletionProtection'] = info.get('DeletionProtectionEnabled', False) + try: + cb = ddb.describe_continuous_backups(TableName=table_name) + entry['pointInTimeRecovery'] = cb.get('ContinuousBackupsDescription', {}).get( + 'PointInTimeRecoveryDescription', {}).get('PointInTimeRecoveryStatus') == 'ENABLED' + except Exception: + entry['pointInTimeRecovery'] = False + except Exception: + entry['deletionProtection'] = False + entry['pointInTimeRecovery'] = False + + for key, fn, inp in [ + ('capacityMode', analyze_capacity, {'region': region, 'tableName': table_name, 'days': days, 'prices': prices}), + ('tableClass', analyze_table_class, {'region': 
region, 'tableName': table_name, 'days': days, 'prices': prices}), + ('utilization', analyze_utilization, {'region': region, 'tableName': table_name, 'days': days, 'prices': prices}), + ('unusedGsi', analyze_unused_gsi, {'region': region, 'tableName': table_name, 'days': days, 'prices': prices}), + ]: + try: + entry[key] = fn(inp) + except Exception as e: + entry[key] = {'error': str(e)} + entry['errors'].append(f"{key}: {e}") + return entry + +def format_results(days: int, results: List[Dict[str, Any]]) -> str: + recs = [] + optimized = [] + errors = [] + total_savings = 0.0 + + for t in results: + name = t['tableName'] + region = t.get('region', '') + label = f"{name} ({region})" if len(set(r.get('region', '') for r in results)) > 1 else name + + if t['errors']: + errors.append({'table': label, 'errors': t['errors']}) + + table_recs = [] + + cm = t.get('capacityMode', {}) + if cm.get('potentialMonthlySavings', 0) > 0: + table_recs.append({ + 'type': 'Billing Mode', + 'change': f"{MODE_LABELS.get(cm['currentMode'], cm['currentMode'])} → {MODE_LABELS.get(cm['recommendedMode'], cm['recommendedMode'])}", + 'savings': cm['potentialMonthlySavings'], + }) + + tc = t.get('tableClass', {}) + if tc.get('potentialMonthlySavings', 0) > 0: + table_recs.append({ + 'type': 'Table Class', + 'change': f"{CLASS_LABELS.get(tc['currentClass'], tc['currentClass'])} → {CLASS_LABELS.get(tc['recommendedClass'], tc['recommendedClass'])}", + 'savings': tc['potentialMonthlySavings'], + }) + + ut = t.get('utilization', {}) + for r in ut.get('recommendations', []): + if r.get('monthlySavings', 0) > 0: + rt = r['recommendationType'] + if rt == 'SWITCH_TO_ON_DEMAND': + desc = 'Switch to On-Demand (low utilization)' + else: + desc = f"Right-size (Read: {r.get('recommendedRead')}, Write: {r.get('recommendedWrite')})" + rtype = f"Utilization ({r['resourceName'].split('#')[-1]})" if r.get('resourceType') == 'GSI' else 'Utilization' + table_recs.append({'type': rtype, 'change': desc, 'savings': 
r['monthlySavings']}) + + gsi = t.get('unusedGsi', {}) + for g in gsi.get('unusedGSIs', []): + table_recs.append({ + 'type': 'Unused GSI', + 'change': f"Review {g['indexName']} (zero reads in {days} days — verify not needed)", + 'savings': g.get('monthlySavings', 0), + }) + + # Check protection status + if not t.get('deletionProtection', True): + table_recs.append({'type': 'Protection', 'change': 'Enable Deletion Protection', 'savings': 0}) + if not t.get('pointInTimeRecovery', True): + table_recs.append({'type': 'Protection', 'change': 'Enable Point-in-Time Recovery (PITR)', 'savings': 0}) + + if table_recs: + s = sum(r['savings'] for r in table_recs) + total_savings += s + recs.append({'table': label, 'recommendations': table_recs, 'totalSavings': s}) + else: + optimized.append(label) + + recs.sort(key=lambda x: x['totalSavings'], reverse=True) + + regions = sorted(set(r.get('region', '') for r in results)) + region_str = ', '.join(regions) if len(regions) > 1 else regions[0] if regions else '' + + lines = [] + lines.append(f"Region: {region_str} | Analysis: {days} days | Tables: {len(results)} | Savings: ${total_savings:,.2f}/month (${total_savings * 12:,.2f}/year)") + lines.append("") + + if recs: + # Calculate column widths + col_t = max(max((len(t['table']) for t in recs), default=5), 5) + col_r = max(max((len(f"{r['type']}: {r['change']}") for t in recs for r in t['recommendations']), default=14), 14) + col_s = 12 + + def row(a, b, c): + return f"│ {a:<{col_t}} │ {b:<{col_r}} │ {c:>{col_s}} │" + def sep(l, m, r, f='─'): + return f"{l}{f*(col_t+2)}{m}{f*(col_r+2)}{m}{f*(col_s+2)}{r}" + + lines.append(sep('┌', '┬', '┐')) + lines.append(row('Table', 'Recommendation', 'Savings')) + lines.append(sep('├', '┼', '┤')) + for idx, t in enumerate(recs): + if idx > 0: + lines.append(sep('├', '┼', '┤')) + first = True + for r in t['recommendations']: + tname = t['table'] if first else '' + sav = f"${r['savings']:,.2f}/mo" if r['savings'] > 0 else ('⚠ enable' if r['type'] 
== 'Protection' else 'cleanup') + lines.append(row(tname, f"{r['type']}: {r['change']}", sav)) + first = False + lines.append(sep('├', '┼', '┤')) + lines.append(row('TOTAL', '', f"${total_savings:,.2f}/mo")) + lines.append(sep('└', '┴', '┘')) + lines.append("") + + if optimized: + lines.append(f"Already optimized ({len(optimized)}): {', '.join(optimized)}") + lines.append("") + + if errors: + lines.append(f"Errors ({len(errors)} tables):") + for e in errors: + lines.append(f" {e['table']}: {'; '.join(e['errors'])}") + lines.append("") + + return '\n'.join(lines) + +def analyze_all(data: Dict[str, Any]) -> str: + # Support single-region or multi-region input + if 'regions' in data: + region_tables = data['regions'] + else: + region = data['region'] + tables = data.get('tables') + if not tables: + tables = [t['tableName'] for t in discover(region)] + region_tables = {region: tables} + + days = data.get('days', 14) + workers = data.get('concurrency', 10) + + # Auto-fetch pricing per region if not provided + prices_by_region: Dict[str, Dict[str, float]] = {} + for region in region_tables: + if 'prices' in data: + prices_by_region[region] = data['prices'] + else: + prices_by_region[region] = get_pricing(region) + + all_tasks = [] + for region, tables in region_tables.items(): + for table in tables: + all_tasks.append((region, table, prices_by_region[region])) + + results = [None] * len(all_tasks) + with ThreadPoolExecutor(max_workers=min(workers, len(all_tasks))) as ex: + futures = {ex.submit(analyze_table, r, t, days, p): i for i, (r, t, p) in enumerate(all_tasks)} + for f in as_completed(futures): + results[futures[f]] = f.result() + + report = format_results(days, results) + + import os + report_path = os.path.join(os.getcwd(), 'dynamodb-cost-report.md') + with open(report_path, 'w') as f: + f.write(f"# DynamoDB Cost Optimization Report\n\n```\n{report}\n```\n") + summary = report.split('\n')[0] + return f"{summary}\n\nFull report saved to: {report_path}" + +if 
__name__ == '__main__': + from config import parse_input, fail + data = parse_input() + if 'regions' not in data and 'region' not in data: + fail("Missing required field: 'region' or 'regions'") + print(analyze_all(data)) diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/autoscaling_sim.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/autoscaling_sim.py new file mode 100644 index 00000000..b2740b0f --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/autoscaling_sim.py @@ -0,0 +1,95 @@ +""" +DynamoDB Autoscaling Simulation. + +Simulates how DynamoDB autoscaling would provision capacity based on consumed +capacity metrics, enabling accurate cost comparison between On-Demand and +optimally-autoscaled Provisioned modes. + +Autoscaling Rules (per AWS documentation): +- Scale-Out: consumption > target utilization for 2 consecutive minutes +- Scale-In: consumption < (target - 20%) for 15 consecutive minutes +- First 4 scale-ins per day can happen anytime +- After 4 scale-ins, only 1 scale-in per hour +""" +import json +import sys +from typing import List + + +def simulate( + metrics: List[float], + target_utilization: float = 0.7, + min_cap: int = 1, + max_cap: int = 40000, +) -> List[float]: + """ + Simulate autoscaling on a list of per-minute consumed units/sec values. 
+ + Args: + metrics: consumed units per second, one per minute + target_utilization: target % (0.7 = 70%) + min_cap: minimum provisioned capacity + max_cap: maximum provisioned capacity + + Returns: + list of simulated provisioned capacity values, one per minute + """ + n = len(metrics) + if n == 0: + return [] + + prov: List[float] = [0.0] * n + prov[0] = max(min_cap, min(metrics[0] / target_utilization, max_cap)) + + scale_in_count: int = 0 + last_scale_in_minute: int = -61 + + for i in range(1, n): + prov[i] = prov[i - 1] + + if i % 1440 == 0: + scale_in_count = 0 + + # Scale-out: 2 consecutive minutes above target + if i >= 2: + util_prev = metrics[i - 1] / prov[i - 1] if prov[i - 1] > 0 else 0 + util_curr = metrics[i] / prov[i] if prov[i] > 0 else 0 + if util_prev > target_utilization and util_curr > target_utilization: + needed = max(metrics[i - 1], metrics[i]) / target_utilization + prov[i] = min(max(needed, prov[i]), max_cap) + + # Scale-in: 15 consecutive minutes below (target - 20%) + if i >= 15: + scale_in_threshold: float = target_utilization - 0.2 + all_below: bool = True + peak: float = 0.0 + for j in range(i - 14, i + 1): + util = metrics[j] / prov[j] if prov[j] > 0 else 0 + if util >= scale_in_threshold: + all_below = False + break + peak = max(peak, metrics[j]) + + if all_below: + can_scale_in = ( + scale_in_count < 4 or (i - last_scale_in_minute) >= 60 + ) + if can_scale_in: + new_cap = max(min_cap, peak / target_utilization) + if new_cap * 1.2 < prov[i]: + prov[i] = new_cap + scale_in_count += 1 + last_scale_in_minute = i + + return prov + + +if __name__ == '__main__': + data = json.loads(sys.stdin.read()) + result = simulate( + data['metrics'], + data.get('targetUtilization', 0.7), + data.get('minCapacity', 1), + data.get('maxCapacity', 40000), + ) + print(json.dumps(result)) diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/capacity_mode.py 
b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/capacity_mode.py new file mode 100644 index 00000000..6184dda5 --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/capacity_mode.py @@ -0,0 +1,104 @@ +"""Capacity mode analysis - uses GetMetricData batch API. + +Usage: echo '{"region":"eu-west-1","tableName":"my-table","days":14,"prices":{...}}' | python capacity_mode.py +""" +import json +import sys +import os +import boto3 +from datetime import datetime, timedelta, timezone +from decimal import Decimal + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from autoscaling_sim import simulate +from cw_batch import batch_get_metrics +from config import get_client, parse_input, validate_keys +from typing import Any, Dict, List + +def analyze(data: Dict[str, Any]) -> Dict[str, Any]: + region = data['region'] + table_name = data['tableName'] + days = data.get('days', 14) + prices = data['prices'] + + ddb = get_client('dynamodb', region) + now = datetime.now(timezone.utc) + start = now - timedelta(days=days) + + info = ddb.describe_table(TableName=table_name)['Table'] + mode = info.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED') + if mode == 'PAY_PER_REQUEST': + mode = 'ON_DEMAND' + cur_rcu = info.get('ProvisionedThroughput', {}).get('ReadCapacityUnits', 0) + cur_wcu = info.get('ProvisionedThroughput', {}).get('WriteCapacityUnits', 0) + + # Select pricing keys based on table class + from config import get_price_keys + pk = get_price_keys(info) + + # Single batch call for all metrics + metrics = batch_get_metrics(region, [ + {'id': 'cr', 'table': table_name, 'metric': 'ConsumedReadCapacityUnits', 'period': 300, 'stat': 'Sum'}, + {'id': 'cw', 'table': table_name, 'metric': 'ConsumedWriteCapacityUnits', 'period': 300, 'stat': 'Sum'}, + ], start, now) + + reads = metrics.get('cr', []) + writes = metrics.get('cw', []) + + total_r = sum(dp['value'] for dp in reads) + total_w = sum(dp['value'] for 
dp in writes) + + # On-demand cost + period_cost = Decimal(str(total_r)) * Decimal(str(prices[pk['read_req']])) + \ + Decimal(str(total_w)) * Decimal(str(prices[pk['write_req']])) + od_cost = (period_cost / Decimal(str(days))) * Decimal('30.4') + + # Current provisioned cost + cur_prov_cost = (Decimal(str(cur_rcu)) * Decimal('730') * Decimal(str(prices[pk['rcu']]))) + \ + (Decimal(str(cur_wcu)) * Decimal('730') * Decimal(str(prices[pk['wcu']]))) + + # Autoscaling simulation + read_ups = [dp['value'] / 300.0 for dp in reads] + write_ups = [dp['value'] / 300.0 for dp in writes] + sim_r = simulate(read_ups) if read_ups else [] + sim_w = simulate(write_ups) if write_ups else [] + + if sim_r and sim_w: + avg_sim_rcu = sum(sim_r) / len(sim_r) + avg_sim_wcu = sum(sim_w) / len(sim_w) + optimal_cost = (Decimal(str(avg_sim_rcu)) * Decimal('730') * Decimal(str(prices[pk['rcu']]))) + \ + (Decimal(str(avg_sim_wcu)) * Decimal('730') * Decimal(str(prices[pk['wcu']]))) + else: + optimal_cost = cur_prov_cost + + if total_r == 0 and total_w == 0: + rec = 'ON_DEMAND' + else: + rec = 'ON_DEMAND' if od_cost < optimal_cost else 'PROVISIONED' + + current_cost = cur_prov_cost if mode == 'PROVISIONED' else od_cost + savings = max(Decimal('0'), current_cost - min(od_cost, optimal_cost)) + + result = { + 'tableName': table_name, + 'currentMode': mode, + 'recommendedMode': rec, + 'currentMonthlyCost': float(current_cost), + 'onDemandMonthlyCost': float(od_cost), + 'currentProvisionedMonthlyCost': float(cur_prov_cost), + 'optimalProvisionedMonthlyCost': float(optimal_cost), + 'potentialMonthlySavings': float(savings), + 'savingsPercentage': float(savings / current_cost * 100) if current_cost > 0 else 0, + 'analysisDays': days, + } + if rec == 'PROVISIONED' and sim_r and sim_w: + result['recommendedMinRead'] = max(1, int(min(sim_r))) + result['recommendedMaxRead'] = int(max(sim_r)) + result['recommendedMinWrite'] = max(1, int(min(sim_w))) + result['recommendedMaxWrite'] = int(max(sim_w)) + 
return result + +if __name__ == '__main__': + data = parse_input() + validate_keys(data, ['region', 'tableName', 'prices']) + print(json.dumps(analyze(data), indent=2, default=str)) diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/config.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/config.py new file mode 100644 index 00000000..cfee3085 --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/config.py @@ -0,0 +1,83 @@ +"""Shared configuration and utilities for all analyzer scripts.""" +import json +import sys +from typing import Any, Dict, List, NoReturn + +import boto3 +from botocore.exceptions import NoCredentialsError, ClientError +from decimal import Decimal + +# Storage-to-throughput cost ratio thresholds (from AWS pricing structure) +# Standard storage: $0.25/GB, Standard-IA storage: $0.10/GB (60% cheaper) +# Standard-IA throughput: ~2.5x more expensive than Standard +# Switch Standard → IA when storage cost dominates throughput cost +STANDARD_TO_IA_RATIO: Decimal = Decimal('0.25') / Decimal('0.6') # ≈0.417 + +# Switch IA → Standard when throughput cost dominates storage cost +IA_TO_STANDARD_RATIO: Decimal = Decimal('0.2') / Decimal('1.5') # ≈0.133 + +# Utilization below this % triggers right-sizing recommendation +UTILIZATION_THRESHOLD: int = 45 + +# Below this %, recommend switching to On-Demand entirely +ON_DEMAND_THRESHOLD: int = 30 + +# Minimum monthly savings to surface a recommendation (USD) +MIN_SAVINGS: Decimal = Decimal('1.0') + +# Autoscaling simulation target utilization +AUTOSCALE_TARGET: float = 0.7 + +# Default analysis window +DEFAULT_DAYS: int = 14 + +# Maximum analysis window (CloudWatch retains 15 months of data) +MAX_DAYS: int = 90 + +# Parallel workers for batch analysis +CONCURRENT_WORKERS: int = 10 + + +def get_client(service: str, region: str) -> Any: + """Create a boto3 client with credential error handling.""" + try: + return boto3.client(service, 
region_name=region) + except NoCredentialsError: + fail('AWS credentials not configured. Run `aws configure` or set AWS_PROFILE.') + except ClientError as e: + fail(f'AWS client error: {e}') + + +def parse_input() -> Dict[str, Any]: + """Parse JSON from argv[1] or stdin with validation.""" + try: + raw = sys.argv[1] if len(sys.argv) > 1 else sys.stdin.read() + data = json.loads(raw) + except (json.JSONDecodeError, IndexError) as e: + fail(f'Invalid JSON input: {e}') + return data + + +def validate_keys(data: Dict[str, Any], required: List[str]) -> None: + """Validate required keys exist in data dict.""" + missing = [k for k in required if k not in data] + if missing: + fail(f"Missing required fields: {', '.join(missing)}") + if 'days' in data: + data['days'] = max(1, min(int(data['days']), MAX_DAYS)) + + +def get_price_keys(table_info: Dict[str, Any]) -> Dict[str, str]: + """Return pricing dict keys appropriate for the table's class.""" + tc = table_info.get('TableClassSummary', {}).get('TableClass', 'STANDARD') + if tc == 'STANDARD_INFREQUENT_ACCESS': + return {'rcu': 'ia_rcu_hour', 'wcu': 'ia_wcu_hour', + 'read_req': 'ia_read', 'write_req': 'ia_write'} + return {'rcu': 'rcu_hour', 'wcu': 'wcu_hour', + 'read_req': 'read_request', 'write_req': 'write_request'} + + +def fail(message: str) -> NoReturn: + """Print error JSON and exit.""" + print(json.dumps({'error': message})) + sys.exit(1) diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/cw_batch.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/cw_batch.py new file mode 100644 index 00000000..2627ee68 --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/cw_batch.py @@ -0,0 +1,100 @@ +"""Shared CloudWatch helper using GetMetricData batch API with throttle handling.""" +import time +from typing import Any, Dict, List + +from botocore.exceptions import ClientError +from datetime import datetime + +import os +import sys +sys.path.insert(0, 
os.path.dirname(os.path.abspath(__file__))) +from config import get_client + + +def batch_get_metrics( + region: str, + queries: List[Dict[str, Any]], + start: datetime, + end: datetime, +) -> Dict[str, List[Dict[str, Any]]]: + """ + Fetch multiple metrics in a single GetMetricData call. + + Args: + region: AWS region + queries: list of dicts with keys: id, table, metric, period, stat + optional: gsi (GSI name) + start: start datetime + end: end datetime + + Returns: + dict mapping query id → list of {timestamp, value} + """ + cw = get_client('cloudwatch', region) + + metric_queries: List[Dict[str, Any]] = [] + for q in queries: + dims: List[Dict[str, str]] = [{'Name': 'TableName', 'Value': q['table']}] + if q.get('gsi'): + dims.append({'Name': 'GlobalSecondaryIndexName', 'Value': q['gsi']}) + + metric_queries.append({ + 'Id': q['id'], + 'MetricStat': { + 'Metric': { + 'Namespace': 'AWS/DynamoDB', + 'MetricName': q['metric'], + 'Dimensions': dims, + }, + 'Period': q['period'], + 'Stat': q['stat'], + }, + 'ReturnData': True, + }) + + results: Dict[str, List[Dict[str, Any]]] = {} + + for i in range(0, len(metric_queries), 500): + batch = metric_queries[i:i+500] + next_token = None # type: str | None + + while True: + params: Dict[str, Any] = { + 'MetricDataQueries': batch, + 'StartTime': start, + 'EndTime': end, + } + if next_token: + params['NextToken'] = next_token + + resp = _call_with_retry(cw.get_metric_data, **params) + + for r in resp.get('MetricDataResults', []): + qid: str = r['Id'] + if qid not in results: + results[qid] = [] + for ts, val in zip(r.get('Timestamps', []), r.get('Values', [])): + results[qid].append({'timestamp': ts, 'value': val}) + + next_token = resp.get('NextToken') + if not next_token: + break + + for qid in results: + results[qid].sort(key=lambda x: x['timestamp']) + + return results + + +def _call_with_retry(fn: Any, max_retries: int = 5, **kwargs: Any) -> Dict[str, Any]: + """Call with exponential backoff on throttling.""" + for 
attempt in range(max_retries): + try: + return fn(**kwargs) + except ClientError as e: + code = e.response['Error']['Code'] + if code in ('Throttling', 'ThrottlingException') and attempt < max_retries - 1: + time.sleep(2 ** attempt) + else: + raise + return {} # unreachable, satisfies type checker diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/discover.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/discover.py new file mode 100644 index 00000000..1b4794ee --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/discover.py @@ -0,0 +1,57 @@ +"""DynamoDB table discovery. + +Usage: + python discover.py REGION # all tables + python discover.py REGION my-table # single table + python discover.py REGION table-1 table-2 table-3 # specific tables +""" +import json +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from config import get_client +from typing import Any, Dict, List, Optional + +def discover(region: str, table_names: Optional[List[str]] = None) -> List[Dict[str, Any]]: + ddb = get_client('dynamodb', region) + + if table_names is None: + table_names = [] + for page in ddb.get_paginator('list_tables').paginate(): + table_names.extend(page['TableNames']) + + tables = [] + for name in table_names: + try: + t = ddb.describe_table(TableName=name)['Table'] + billing = t.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED') + if billing == 'PAY_PER_REQUEST': + billing = 'ON_DEMAND' + pitr = False + try: + cb = ddb.describe_continuous_backups(TableName=name) + pitr = cb.get('ContinuousBackupsDescription', {}).get( + 'PointInTimeRecoveryDescription', {}).get('PointInTimeRecoveryStatus') == 'ENABLED' + except Exception: # nosec B110 - PITR check is best-effort; missing permissions shouldn't block discovery + pass + tables.append({ + 'tableName': name, + 'billingMode': billing, + 'tableClass': t.get('TableClassSummary', {}).get('TableClass', 
# NOTE(review): flattened-diff span covering the tail of discover.py, all of
# get_pricing.py, and the head of table_class.py. get_pricing queries the AWS
# Price List API (which is only served from us-east-1) across three product
# families; 'Provisioned IOPS' is presumably the family name for provisioned
# RCU/WCU — TODO confirm against current Price List attributes. Required keys
# are validated after the loop so a partial pricing map fails loudly via fail().
'STANDARD'), + 'deletionProtection': t.get('DeletionProtectionEnabled', False), + 'pointInTimeRecovery': pitr, + 'itemCount': t.get('ItemCount', 0), + 'tableSizeBytes': t.get('TableSizeBytes', 0), + 'provisionedRead': t.get('ProvisionedThroughput', {}).get('ReadCapacityUnits', 0), + 'provisionedWrite': t.get('ProvisionedThroughput', {}).get('WriteCapacityUnits', 0), + 'gsiCount': len(t.get('GlobalSecondaryIndexes', [])), + }) + except Exception as e: + tables.append({'tableName': name, 'error': str(e)}) + return tables + +if __name__ == '__main__': + region = sys.argv[1] if len(sys.argv) > 1 else 'us-east-1' + names = sys.argv[2:] if len(sys.argv) > 2 else None + result = discover(region, names) + print(json.dumps({'tables': result, 'count': len(result)}, indent=2)) diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/get_pricing.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/get_pricing.py new file mode 100644 index 00000000..08d8f3fb --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/get_pricing.py @@ -0,0 +1,90 @@ +"""Fetch all DynamoDB pricing for a region via boto3 Pricing API.""" +import json +import sys +import os +from typing import Any, Dict + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from config import get_client + +def get_pricing(region: str) -> Dict[str, float]: + """Fetch DynamoDB pricing. 
Pricing API is always in us-east-1.""" + pricing = get_client('pricing', 'us-east-1') + prices: Dict[str, float] = {} + + for family, mappings in [ + ('Amazon DynamoDB PayPerRequest Throughput', { + 'DDB-WriteUnits': 'write_request', + 'DDB-ReadUnits': 'read_request', + 'DDB-WriteUnitsIA': 'ia_write', + 'DDB-ReadUnitsIA': 'ia_read', + }), + ('Provisioned IOPS', { + 'DDB-WriteUnits': 'wcu_hour', + 'DDB-ReadUnits': 'rcu_hour', + 'DDB-WriteUnitsIA': 'ia_wcu_hour', + 'DDB-ReadUnitsIA': 'ia_rcu_hour', + }), + ('Database Storage', {}), + ]: + next_token = None + while True: + params: Dict[str, Any] = { + 'ServiceCode': 'AmazonDynamoDB', + 'Filters': [ + {'Type': 'TERM_MATCH', 'Field': 'regionCode', 'Value': region}, + {'Type': 'TERM_MATCH', 'Field': 'productFamily', 'Value': family}, + ], + 'MaxResults': 100, + } + if next_token: + params['NextToken'] = next_token + + resp = pricing.get_products(**params) + + for item in resp['PriceList']: + data = json.loads(item) + attrs = data.get('product', {}).get('attributes', {}) + group = attrs.get('group', '') + usage = attrs.get('usagetype', '') + vol = attrs.get('volumeType', '') + + for term in data.get('terms', {}).get('OnDemand', {}).values(): + for dim in term.get('priceDimensions', {}).values(): + p = float(dim['pricePerUnit']['USD']) + if p <= 0: + continue + + # On-demand / provisioned + for key, name in mappings.items(): + if (group == key or usage == key) and name not in prices: + prices[name] = p + + # Storage + if family == 'Database Storage': + if '- IA' in vol and 'ia_storage' not in prices: + prices['ia_storage'] = p + elif '- IA' not in vol and 'standard_storage' not in prices: + prices['standard_storage'] = p + + next_token = resp.get('NextToken') + if not next_token: + break + + required = ['read_request', 'write_request', 'rcu_hour', 'wcu_hour', 'standard_storage'] + missing = [k for k in required if k not in prices] + if missing: + from config import fail + fail(f"Could not fetch pricing for: {', 
'.join(missing)} in {region}") + + # Aliases + prices.setdefault('standard_read', prices['read_request']) + prices.setdefault('standard_write', prices['write_request']) + prices.setdefault('on_demand_read', prices['read_request']) + prices.setdefault('on_demand_write', prices['write_request']) + + return prices + +if __name__ == '__main__': + region = sys.argv[1] if len(sys.argv) > 1 else 'us-east-1' + print(json.dumps(get_pricing(region), indent=2)) diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/table_class.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/table_class.py new file mode 100644 index 00000000..982c1732 --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/table_class.py @@ -0,0 +1,125 @@ +"""Table class analysis - uses GetMetricData batch API. + +Usage: echo '{"region":"eu-west-1","tableName":"my-table","days":14,"prices":{...}}' | python table_class.py +""" +import json +import sys +import os +import boto3 +from datetime import datetime, timedelta, timezone +from decimal import Decimal + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from cw_batch import batch_get_metrics +from config import STANDARD_TO_IA_RATIO, IA_TO_STANDARD_RATIO, MIN_SAVINGS, get_client, parse_input, validate_keys +from typing import Any, Dict, Optional + +def analyze(data: Dict[str, Any]) -> Dict[str, Any]: + region = data['region'] + table_name = data['tableName'] + days = data.get('days', 14) + prices = data.get('prices') + min_savings = Decimal(str(data.get('minMonthlySavings', MIN_SAVINGS))) + + if not prices: + return {'tableName': table_name, 'error': 'prices object is required'} + + ddb = get_client('dynamodb', region) + info = ddb.describe_table(TableName=table_name)['Table'] + current_class = info.get('TableClassSummary', {}).get('TableClass', 'STANDARD') + size_gb = info.get('TableSizeBytes', 0) / (1024 ** 3) + + reserved = _check_reserved_capacity(region) + if 
# NOTE(review): tail of table_class.analyze plus _check_reserved_capacity and
# the head of unused_gsi.py. The class-switch projection multiplies storage by
# 0.4 and throughput by 2.5 when moving STANDARD -> IA (and the inverse on the
# way back); the IA/standard price ratios visible in this repo's test fixture
# are ~0.4 for storage but only ~1.25 for throughput — the x2.5 looks
# deliberately conservative; TODO confirm intent against current AWS pricing.
# _check_reserved_capacity is tri-state: True (reserved capacity found, skip),
# False (none found), None (Cost Explorer call failed — result is hedged with a
# note rather than blocking the analysis).
reserved: + return {'tableName': table_name, 'currentClass': current_class, + 'recommendedClass': current_class, 'potentialMonthlySavings': 0.0, + 'note': 'Account uses DynamoDB reserved capacity — estimate may differ'} + + note = None + if reserved is None: + note = 'Could not verify reserved capacity status — savings estimate may differ' + + now = datetime.now(timezone.utc) + start = now - timedelta(days=days) + + metrics = batch_get_metrics(region, [ + {'id': 'cr', 'table': table_name, 'metric': 'ConsumedReadCapacityUnits', 'period': 86400, 'stat': 'Sum'}, + {'id': 'cw', 'table': table_name, 'metric': 'ConsumedWriteCapacityUnits', 'period': 86400, 'stat': 'Sum'}, + ], start, now) + + total_reads = sum(dp['value'] for dp in metrics.get('cr', [])) + total_writes = sum(dp['value'] for dp in metrics.get('cw', [])) + + storage_cost = Decimal(str(size_gb * prices['standard_storage'])) + scale = Decimal('30.4') / Decimal(str(days)) + throughput_cost = (Decimal(str(total_reads * prices['standard_read'])) + + Decimal(str(total_writes * prices['standard_write']))) * scale + + total = storage_cost + throughput_cost + if total == 0: + return {'tableName': table_name, 'currentClass': current_class, + 'recommendedClass': current_class, 'potentialMonthlySavings': 0.0} + + ratio = storage_cost / throughput_cost if throughput_cost > Decimal('0.01') else Decimal('999.99') + + rec = current_class + savings = Decimal('0') + + if current_class == 'STANDARD': + if ratio > STANDARD_TO_IA_RATIO or (throughput_cost <= Decimal('0.01') and storage_cost > Decimal('1.0')): + proj_s = storage_cost * Decimal('0.4') + proj_t = throughput_cost * Decimal('2.5') + savings = total - (proj_s + proj_t) + rec = 'STANDARD_INFREQUENT_ACCESS' if savings >= min_savings else current_class + if rec == current_class: + savings = Decimal('0') + else: + if ratio < IA_TO_STANDARD_RATIO: + proj_s = storage_cost * Decimal('2.5') + proj_t = throughput_cost * Decimal('0.4') + savings = total - (proj_s + proj_t) + 
rec = 'STANDARD' if savings >= min_savings else current_class + if rec == current_class: + savings = Decimal('0') + + result = { + 'tableName': table_name, + 'currentClass': current_class, + 'recommendedClass': rec, + 'monthlyStorageCost': float(storage_cost), + 'monthlyThroughputCost': float(throughput_cost), + 'potentialMonthlySavings': float(savings), + 'storageToThroughputRatio': float(ratio), + 'analysisDays': days, + } + if note: + result['note'] = note + return result + +def _check_reserved_capacity(region: str) -> Optional[bool]: + """Check for reserved capacity. Returns True/False, or None if check failed.""" + try: + ce = boto3.client('ce', region_name='us-east-1') + now = datetime.now(timezone.utc) + resp = ce.get_cost_and_usage( + TimePeriod={'Start': (now - timedelta(days=30)).strftime('%Y-%m-%d'), + 'End': now.strftime('%Y-%m-%d')}, + Granularity='MONTHLY', Metrics=['UnblendedCost'], + Filter={'And': [ + {'Dimensions': {'Key': 'SERVICE', 'Values': ['Amazon DynamoDB']}}, + {'Dimensions': {'Key': 'REGION', 'Values': [region]}}, + ]}, + GroupBy=[{'Type': 'DIMENSION', 'Key': 'USAGE_TYPE'}], + ) + for period in resp.get('ResultsByTime', []): + for group in period.get('Groups', []): + if 'Commit' in group['Keys'][0]: + return True + except Exception: + return None + return False + +if __name__ == '__main__': + data = parse_input() + validate_keys(data, ['region', 'tableName', 'prices']) + print(json.dumps(analyze(data), indent=2, default=str)) diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/unused_gsi.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/unused_gsi.py new file mode 100644 index 00000000..d0b51f03 --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/unused_gsi.py @@ -0,0 +1,82 @@ +"""Unused GSI detection - uses GetMetricData batch API. 
# NOTE(review): unused_gsi.py body plus the head of utilization.py. A GSI is
# flagged "unused" when its ConsumedReadCapacityUnits sum over the window is 0;
# savings for on-demand tables count only replicated write request units, and
# for provisioned tables the average provisioned RCU/WCU x 730h. GSI storage is
# not included in the estimate — presumably intentional; verify, since
# DescribeTable exposes per-index IndexSizeBytes. The 'boto3' import here is
# unused (clients come from config.get_client).
+ +Usage: echo '{"region":"eu-west-1","tableName":"my-table","days":14,"prices":{...}}' | python unused_gsi.py +""" +import json +import sys +import os +import boto3 +from datetime import datetime, timedelta, timezone + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from cw_batch import batch_get_metrics +from config import get_client, get_price_keys, parse_input, validate_keys +from typing import Any, Dict + +def analyze(data: Dict[str, Any]) -> Dict[str, Any]: + region = data['region'] + table_name = data['tableName'] + days = data.get('days', 14) + prices = data.get('prices') + + ddb = get_client('dynamodb', region) + now = datetime.now(timezone.utc) + start = now - timedelta(days=days) + + info = ddb.describe_table(TableName=table_name)['Table'] + gsis = info.get('GlobalSecondaryIndexes', []) + billing = info.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED') + is_on_demand = billing == 'PAY_PER_REQUEST' + pk = get_price_keys(info) if prices else None + + if not gsis: + return {'tableName': table_name, 'hasGSIs': False, 'unusedGSIs': [], 'analysisDays': days} + + queries = [] + for i, gsi in enumerate(gsis): + queries.append({'id': f'r{i}', 'table': table_name, 'gsi': gsi['IndexName'], + 'metric': 'ConsumedReadCapacityUnits', 'period': 86400, 'stat': 'Sum'}) + if is_on_demand: + queries.append({'id': f'w{i}', 'table': table_name, 'gsi': gsi['IndexName'], + 'metric': 'ConsumedWriteCapacityUnits', 'period': 86400, 'stat': 'Sum'}) + else: + queries.append({'id': f'pr{i}', 'table': table_name, 'gsi': gsi['IndexName'], + 'metric': 'ProvisionedReadCapacityUnits', 'period': 86400, 'stat': 'Average'}) + queries.append({'id': f'pw{i}', 'table': table_name, 'gsi': gsi['IndexName'], + 'metric': 'ProvisionedWriteCapacityUnits', 'period': 86400, 'stat': 'Average'}) + + metrics = batch_get_metrics(region, queries, start, now) + + unused = [] + total_savings = 0.0 + for i, gsi in enumerate(gsis): + total_reads = sum(dp['value'] for dp in 
metrics.get(f'r{i}', [])) + if total_reads > 0: + continue + + savings = 0.0 + if prices and pk: + if is_on_demand: + total_writes = sum(dp['value'] for dp in metrics.get(f'w{i}', [])) + savings = (total_writes * prices.get(pk['write_req'], 0) / days) * 30.4 + else: + pr = metrics.get(f'pr{i}', []) + pw = metrics.get(f'pw{i}', []) + avg_r = sum(dp['value'] for dp in pr) / len(pr) if pr else 0 + avg_w = sum(dp['value'] for dp in pw) / len(pw) if pw else 0 + savings = (avg_r * prices.get(pk['rcu'], 0) + avg_w * prices.get(pk['wcu'], 0)) * 730 + + total_savings += savings + entry: Dict[str, Any] = {'indexName': gsi['IndexName'], 'monthlySavings': round(savings, 2)} + unused.append(entry) + + return { + 'tableName': table_name, 'hasGSIs': True, + 'totalGSIs': len(gsis), 'unusedGSIs': unused, + 'totalMonthlySavings': round(total_savings, 2), 'analysisDays': days, + } + +if __name__ == '__main__': + data = parse_input() + validate_keys(data, ['region', 'tableName']) + print(json.dumps(analyze(data), indent=2, default=str)) diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/utilization.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/utilization.py new file mode 100644 index 00000000..79bac09d --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/scripts/utilization.py @@ -0,0 +1,120 @@ +"""Utilization analysis - uses GetMetricData batch API for table + GSIs. 
# NOTE(review): utilization.py body plus the head of tests/test_analyzers.py.
# avg_r/avg_w divide 300s-period Sums by 300 to get average units/sec, which is
# what the provisioned-capacity comparison needs. The 'rm'/'wm' queries use
# Stat=Maximum over the same 300s periods; max_r/max_w are therefore per-sample
# maxima of a consumed-capacity metric, not guaranteed per-second peaks — TODO
# confirm the units before trusting the x1.2 headroom in REDUCE_CAPACITY
# sizing. The 'boto3' import here is unused (clients come from
# config.get_client).
+ +Usage: echo '{"region":"eu-west-1","tableName":"my-table","days":14,"prices":{...}}' | python utilization.py +""" +import json +import sys +import os +import boto3 +from datetime import datetime, timedelta, timezone + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from cw_batch import batch_get_metrics +from config import UTILIZATION_THRESHOLD, ON_DEMAND_THRESHOLD, get_client, get_price_keys, parse_input, validate_keys +from typing import Any, Dict + +# Seconds per month (30.4 days) — converts avg units/sec to monthly request units +SECONDS_PER_MONTH: float = 30.4 * 86400 # 2,626,560 + +# Hours per month for provisioned cost +HOURS_PER_MONTH: int = 730 + +def analyze(data: Dict[str, Any]) -> Dict[str, Any]: + region = data['region'] + table_name = data['tableName'] + days = data.get('days', 14) + prices = data['prices'] + threshold = data.get('utilizationThreshold', UTILIZATION_THRESHOLD) + + ddb = get_client('dynamodb', region) + now = datetime.now(timezone.utc) + start = now - timedelta(days=days) + + info = ddb.describe_table(TableName=table_name)['Table'] + billing = info.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED') + if billing == 'PAY_PER_REQUEST': + return {'tableName': table_name, 'billingMode': 'ON_DEMAND', + 'message': 'Utilization analysis only applies to PROVISIONED tables'} + + pk = get_price_keys(info) + + # Build queries for table + all GSIs in one batch + resources = [{ + 'name': table_name, 'type': 'TABLE', + 'provR': info['ProvisionedThroughput']['ReadCapacityUnits'], + 'provW': info['ProvisionedThroughput']['WriteCapacityUnits'], + }] + for gsi in info.get('GlobalSecondaryIndexes', []): + resources.append({ + 'name': f"{table_name}#{gsi['IndexName']}", 'type': 'GSI', + 'gsi': gsi['IndexName'], + 'provR': gsi.get('ProvisionedThroughput', {}).get('ReadCapacityUnits', 0), + 'provW': gsi.get('ProvisionedThroughput', {}).get('WriteCapacityUnits', 0), + }) + + queries = [] + for i, res in enumerate(resources): + 
base = {'table': table_name, 'period': 300} + if res.get('gsi'): + base['gsi'] = res['gsi'] + queries.append({**base, 'id': f'r{i}', 'metric': 'ConsumedReadCapacityUnits', 'stat': 'Sum'}) + queries.append({**base, 'id': f'w{i}', 'metric': 'ConsumedWriteCapacityUnits', 'stat': 'Sum'}) + queries.append({**base, 'id': f'rm{i}', 'metric': 'ConsumedReadCapacityUnits', 'stat': 'Maximum'}) + queries.append({**base, 'id': f'wm{i}', 'metric': 'ConsumedWriteCapacityUnits', 'stat': 'Maximum'}) + + metrics = batch_get_metrics(region, queries, start, now) + + results = [] + total_savings = 0 + + for i, res in enumerate(resources): + cr = metrics.get(f'r{i}', []) + cw = metrics.get(f'w{i}', []) + cr_max = metrics.get(f'rm{i}', []) + cw_max = metrics.get(f'wm{i}', []) + + avg_r = sum(dp['value'] / 300 for dp in cr) / len(cr) if cr else 0 + avg_w = sum(dp['value'] / 300 for dp in cw) / len(cw) if cw else 0 + max_r = max((dp['value'] for dp in cr_max), default=0) + max_w = max((dp['value'] for dp in cw_max), default=0) + + r_util = (avg_r / res['provR'] * 100) if res['provR'] > 0 else 0 + w_util = (avg_w / res['provW'] * 100) if res['provW'] > 0 else 0 + + if r_util >= threshold and w_util >= threshold: + continue + + if r_util < ON_DEMAND_THRESHOLD and w_util < ON_DEMAND_THRESHOLD: + rec_type = 'SWITCH_TO_ON_DEMAND' + current = (res['provR'] * prices[pk['rcu']] + res['provW'] * prices[pk['wcu']]) * HOURS_PER_MONTH + od = (avg_r * SECONDS_PER_MONTH * prices.get(pk['read_req'], 0)) + \ + (avg_w * SECONDS_PER_MONTH * prices.get(pk['write_req'], 0)) + sav = max(0, current - od) + rec_r, rec_w = None, None + else: + rec_type = 'REDUCE_CAPACITY' + rec_r = max(5, int(max_r * 1.2)) if r_util < threshold else res['provR'] + rec_w = max(5, int(max_w * 1.2)) if w_util < threshold else res['provW'] + sav = max(0, (res['provR'] - rec_r) * prices[pk['rcu']] * HOURS_PER_MONTH) + \ + max(0, (res['provW'] - rec_w) * prices[pk['wcu']] * HOURS_PER_MONTH) + + results.append({ + 'resourceName': 
res['name'], 'resourceType': res['type'], + 'readUtilization': round(r_util, 1), 'writeUtilization': round(w_util, 1), + 'recommendationType': rec_type, + 'recommendedRead': rec_r, 'recommendedWrite': rec_w, + 'monthlySavings': round(sav, 2), + }) + total_savings += sav + + return { + 'tableName': table_name, + 'recommendations': results, + 'totalMonthlySavings': round(total_savings, 2), + 'analysisDays': days, + } + +if __name__ == '__main__': + data = parse_input() + validate_keys(data, ['region', 'tableName', 'prices']) + print(json.dumps(analyze(data), indent=2, default=str)) diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/tests/test_analyzers.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/tests/test_analyzers.py new file mode 100644 index 00000000..e020c547 --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/tests/test_analyzers.py @@ -0,0 +1,387 @@ +"""Test suite for DynamoDB Cost Optimizer scripts.""" +import json +import sys +import os +import unittest +from unittest.mock import patch, MagicMock +from decimal import Decimal +from datetime import datetime, timezone, timedelta + +# Add scripts to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts')) + +from autoscaling_sim import simulate + +# Shared test pricing +PRICES = { + 'read_request': 0.00000025, 'write_request': 0.00000125, + 'rcu_hour': 0.00013, 'wcu_hour': 0.00065, + 'ia_read': 0.00000031, 'ia_write': 0.00000156, + 'ia_rcu_hour': 0.00016, 'ia_wcu_hour': 0.00081, + 'standard_read': 0.00000025, 'standard_write': 0.00000125, + 'on_demand_read': 0.00000025, 'on_demand_write': 0.00000125, + 'standard_storage': 0.25, 'ia_storage': 0.10, +} + +def mock_describe_table(billing='PROVISIONED', rcu=100, wcu=50, size_bytes=0, + table_class='STANDARD', gsis=None): + """Build a mock describe_table response.""" + resp = { + 'Table': { + 'TableArn': 'arn:aws:dynamodb:us-east-1:123:table/test', + 'TableStatus': 'ACTIVE', + 
# NOTE(review): unittest suites for the analyzers (autoscaling_sim,
# capacity_mode, table_class, utilization, unused_gsi, analyze_all output
# formatting) plus the head of test_config.py. All AWS calls are replaced with
# unittest.mock.patch fixtures, so these run offline; the PRICES fixture above
# pins realistic us-east-1 rates. The view ends inside test_config.py's
# TestConstants class.
'BillingModeSummary': {'BillingMode': billing}, + 'ProvisionedThroughput': {'ReadCapacityUnits': rcu, 'WriteCapacityUnits': wcu}, + 'TableSizeBytes': size_bytes, + 'TableClassSummary': {'TableClass': table_class}, + 'GlobalSecondaryIndexes': gsis or [], + } + } + return resp + +def mock_batch_metrics(metric_map): + """Build mock return for batch_get_metrics. metric_map: {id: [(value, ts_offset_min), ...]}""" + base = datetime(2025, 1, 15, tzinfo=timezone.utc) + result = {} + for qid, points in metric_map.items(): + result[qid] = [{'timestamp': base + timedelta(minutes=offset), 'value': val} + for val, offset in points] + return result + + +class TestAutoscalingSim(unittest.TestCase): + + def test_empty_metrics(self): + self.assertEqual(simulate([]), []) + + def test_constant_load(self): + metrics = [10.0] * 100 + prov = simulate(metrics, target_utilization=0.7) + self.assertEqual(len(prov), 100) + # Should provision above consumed to hit 70% target + self.assertGreater(prov[0], 10.0) + + def test_scale_out_on_spike(self): + metrics = [5.0] * 20 + [50.0] * 10 + [5.0] * 20 + prov = simulate(metrics, target_utilization=0.7) + # After spike, provisioned should increase + self.assertGreater(max(prov[20:30]), max(prov[0:5])) + + def test_scale_in_after_drop(self): + metrics = [50.0] * 5 + [1.0] * 100 + prov = simulate(metrics, target_utilization=0.7) + # After sustained low usage, should scale in + self.assertLess(prov[-1], prov[5]) + + def test_respects_min_capacity(self): + metrics = [0.001] * 50 + prov = simulate(metrics, min_cap=5) + self.assertTrue(all(p >= 5 for p in prov)) + + def test_respects_max_capacity(self): + metrics = [99999.0] * 50 + prov = simulate(metrics, max_cap=100) + self.assertTrue(all(p <= 100 for p in prov)) + + def test_daily_scale_in_reset(self): + # 1440 minutes = 1 day, scale-in count should reset + metrics = [50.0] * 5 + [0.1] * 1500 + prov = simulate(metrics, target_utilization=0.7) + self.assertEqual(len(prov), 1505) + + +class 
TestCapacityMode(unittest.TestCase): + + @patch('capacity_mode.batch_get_metrics') + @patch('capacity_mode.get_client') + def test_zero_usage_recommends_on_demand(self, mock_gc, mock_cw): + from capacity_mode import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table() + mock_cw.return_value = mock_batch_metrics({'cr': [], 'cw': []}) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertEqual(result['recommendedMode'], 'ON_DEMAND') + self.assertEqual(result['currentMode'], 'PROVISIONED') + + @patch('capacity_mode.batch_get_metrics') + @patch('capacity_mode.get_client') + def test_high_usage_recommends_provisioned(self, mock_gc, mock_cw): + from capacity_mode import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + billing='PAY_PER_REQUEST') + points = [(50000.0, i * 5) for i in range(4032)] + mock_cw.return_value = mock_batch_metrics({'cr': points, 'cw': points}) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertEqual(result['currentMode'], 'ON_DEMAND') + self.assertGreater(result['onDemandMonthlyCost'], 0) + + @patch('capacity_mode.batch_get_metrics') + @patch('capacity_mode.get_client') + def test_savings_zero_when_already_on_demand_no_usage(self, mock_gc, mock_cw): + from capacity_mode import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + billing='PAY_PER_REQUEST') + mock_cw.return_value = mock_batch_metrics({'cr': [], 'cw': []}) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertEqual(result['recommendedMode'], 'ON_DEMAND') + self.assertEqual(result['potentialMonthlySavings'], 0.0) + + @patch('capacity_mode.batch_get_metrics') + @patch('capacity_mode.get_client') + def test_ia_table_uses_ia_pricing(self, mock_gc, mock_cw): + from capacity_mode import analyze + 
mock_gc.return_value.describe_table.return_value = mock_describe_table( + table_class='STANDARD_INFREQUENT_ACCESS', rcu=100, wcu=50) + mock_cw.return_value = mock_batch_metrics({'cr': [], 'cw': []}) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + # Provisioned cost should use IA rates: 100*730*0.00016 + 50*730*0.00081 = 41.245 + expected = 100 * 730 * 0.00016 + 50 * 730 * 0.00081 + self.assertAlmostEqual(result['currentProvisionedMonthlyCost'], expected, places=2) + + +class TestTableClass(unittest.TestCase): + + @patch('table_class._check_reserved_capacity', return_value=False) + @patch('table_class.batch_get_metrics') + @patch('table_class.get_client') + def test_large_storage_low_throughput_recommends_ia(self, mock_gc, mock_cw, mock_rc): + from table_class import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + billing='PAY_PER_REQUEST', size_bytes=100 * 1024**3) + mock_cw.return_value = mock_batch_metrics({ + 'cr': [(100.0, i * 1440) for i in range(14)], + 'cw': [(50.0, i * 1440) for i in range(14)], + }) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertEqual(result['recommendedClass'], 'STANDARD_INFREQUENT_ACCESS') + self.assertGreater(result['potentialMonthlySavings'], 0) + + @patch('table_class._check_reserved_capacity', return_value=False) + @patch('table_class.batch_get_metrics') + @patch('table_class.get_client') + def test_high_throughput_stays_standard(self, mock_gc, mock_cw, mock_rc): + from table_class import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + billing='PAY_PER_REQUEST', size_bytes=1 * 1024**3) + mock_cw.return_value = mock_batch_metrics({ + 'cr': [(999999999.0, i * 1440) for i in range(14)], + 'cw': [(999999999.0, i * 1440) for i in range(14)], + }) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + 
self.assertEqual(result['recommendedClass'], 'STANDARD') + self.assertEqual(result['potentialMonthlySavings'], 0.0) + + @patch('table_class._check_reserved_capacity', return_value=True) + @patch('table_class.get_client') + def test_reserved_capacity_skips(self, mock_gc, mock_rc): + from table_class import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + size_bytes=100 * 1024**3) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertIn('note', result) + self.assertEqual(result['potentialMonthlySavings'], 0.0) + + @patch('table_class._check_reserved_capacity', return_value=False) + @patch('table_class.batch_get_metrics') + @patch('table_class.get_client') + def test_empty_table_no_recommendation(self, mock_gc, mock_cw, mock_rc): + from table_class import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + billing='PAY_PER_REQUEST', size_bytes=0) + mock_cw.return_value = mock_batch_metrics({'cr': [], 'cw': []}) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertEqual(result['potentialMonthlySavings'], 0.0) + + def test_missing_prices_returns_error(self): + from table_class import analyze + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14}) + self.assertIn('error', result) + + +class TestUtilization(unittest.TestCase): + + @patch('utilization.batch_get_metrics') + @patch('utilization.get_client') + def test_low_utilization_recommends_on_demand(self, mock_gc, mock_cw): + from utilization import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table(rcu=100, wcu=50) + mock_cw.return_value = mock_batch_metrics({ + 'r0': [(1.0, i * 5) for i in range(4032)], + 'w0': [(1.0, i * 5) for i in range(4032)], + 'rm0': [(0.01, i * 5) for i in range(4032)], + 'wm0': [(0.01, i * 5) for i in range(4032)], + }) + + result = analyze({'region': 'us-east-1', 
'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertGreater(len(result['recommendations']), 0) + self.assertEqual(result['recommendations'][0]['recommendationType'], 'SWITCH_TO_ON_DEMAND') + + @patch('utilization.get_client') + def test_on_demand_table_skipped(self, mock_gc): + from utilization import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + billing='PAY_PER_REQUEST') + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertEqual(result['billingMode'], 'ON_DEMAND') + self.assertIn('message', result) + + @patch('utilization.batch_get_metrics') + @patch('utilization.get_client') + def test_well_utilized_no_recommendations(self, mock_gc, mock_cw): + from utilization import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table(rcu=10, wcu=10) + mock_cw.return_value = mock_batch_metrics({ + 'r0': [(3000.0, i * 5) for i in range(4032)], + 'w0': [(3000.0, i * 5) for i in range(4032)], + 'rm0': [(10.0, i * 5) for i in range(4032)], + 'wm0': [(10.0, i * 5) for i in range(4032)], + }) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertEqual(len(result['recommendations']), 0) + + @patch('utilization.batch_get_metrics') + @patch('utilization.get_client') + def test_ia_table_uses_ia_pricing(self, mock_gc, mock_cw): + from utilization import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + rcu=100, wcu=50, table_class='STANDARD_INFREQUENT_ACCESS') + mock_cw.return_value = mock_batch_metrics({ + 'r0': [(1.0, i * 5) for i in range(4032)], + 'w0': [(1.0, i * 5) for i in range(4032)], + 'rm0': [(0.01, i * 5) for i in range(4032)], + 'wm0': [(0.01, i * 5) for i in range(4032)], + }) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + rec = result['recommendations'][0] + # Should use IA on-demand rates for 
comparison, not standard + self.assertEqual(rec['recommendationType'], 'SWITCH_TO_ON_DEMAND') + + +class TestUnusedGsi(unittest.TestCase): + + @patch('unused_gsi.batch_get_metrics') + @patch('unused_gsi.get_client') + def test_unused_gsi_detected(self, mock_gc, mock_cw): + from unused_gsi import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + gsis=[{'IndexName': 'gsi-email', 'IndexStatus': 'ACTIVE', + 'ProvisionedThroughput': {'ReadCapacityUnits': 10, 'WriteCapacityUnits': 5}}]) + mock_cw.return_value = mock_batch_metrics({'r0': [], 'pr0': [(10.0, 0)], 'pw0': [(5.0, 0)]}) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertTrue(result['hasGSIs']) + self.assertEqual(len(result['unusedGSIs']), 1) + self.assertEqual(result['unusedGSIs'][0]['indexName'], 'gsi-email') + self.assertGreater(result['unusedGSIs'][0]['monthlySavings'], 0) + self.assertGreater(result['totalMonthlySavings'], 0) + + @patch('unused_gsi.batch_get_metrics') + @patch('unused_gsi.get_client') + def test_used_gsi_not_flagged(self, mock_gc, mock_cw): + from unused_gsi import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + gsis=[{'IndexName': 'gsi-email', 'IndexStatus': 'ACTIVE', + 'ProvisionedThroughput': {'ReadCapacityUnits': 10, 'WriteCapacityUnits': 5}}]) + mock_cw.return_value = mock_batch_metrics({'r0': [(1000.0, 0)]}) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + self.assertEqual(len(result['unusedGSIs']), 0) + + @patch('unused_gsi.get_client') + def test_no_gsis(self, mock_gc): + from unused_gsi import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table() + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14}) + self.assertFalse(result['hasGSIs']) + + @patch('unused_gsi.batch_get_metrics') + @patch('unused_gsi.get_client') + def test_ia_table_uses_ia_pricing(self, 
mock_gc, mock_cw): + from unused_gsi import analyze + mock_gc.return_value.describe_table.return_value = mock_describe_table( + table_class='STANDARD_INFREQUENT_ACCESS', + gsis=[{'IndexName': 'gsi-email', 'IndexStatus': 'ACTIVE', + 'ProvisionedThroughput': {'ReadCapacityUnits': 10, 'WriteCapacityUnits': 5}}]) + mock_cw.return_value = mock_batch_metrics({'r0': [], 'pr0': [(10.0, 0)], 'pw0': [(5.0, 0)]}) + + result = analyze({'region': 'us-east-1', 'tableName': 'test', 'days': 14, 'prices': PRICES}) + # Should use IA rates: 10*0.00016*730 + 5*0.00081*730 = 4.125 + expected = round(10 * 0.00016 * 730 + 5 * 0.00081 * 730, 2) + self.assertAlmostEqual(result['unusedGSIs'][0]['monthlySavings'], expected, places=2) + + +class TestOutputFormatting(unittest.TestCase): + + def test_format_with_recommendations(self): + from analyze_all import format_results + results = [{ + 'tableName': 'orders', 'region': 'us-east-1', 'errors': [], + 'capacityMode': {'potentialMonthlySavings': 10.0, 'currentMode': 'PROVISIONED', + 'recommendedMode': 'ON_DEMAND'}, + 'tableClass': {'potentialMonthlySavings': 0}, + 'utilization': {'recommendations': []}, + 'unusedGsi': {'unusedGSIs': []}, + }] + output = format_results(14, results) + self.assertIn('orders', output) + self.assertIn('On-Demand', output) + self.assertIn('$10.00/mo', output) + self.assertIn('┌', output) # box drawing + + def test_format_with_no_recommendations(self): + from analyze_all import format_results + results = [{ + 'tableName': 'logs', 'region': 'us-east-1', 'errors': [], + 'capacityMode': {'potentialMonthlySavings': 0}, + 'tableClass': {'potentialMonthlySavings': 0}, + 'utilization': {'recommendations': []}, + 'unusedGsi': {'unusedGSIs': []}, + }] + output = format_results(14, results) + self.assertIn('Already optimized', output) + self.assertIn('logs', output) + + def test_format_with_protection_warnings(self): + from analyze_all import format_results + results = [{ + 'tableName': 'orders', 'region': 'us-east-1', 
'errors': [], + 'deletionProtection': False, 'pointInTimeRecovery': False, + 'capacityMode': {'potentialMonthlySavings': 0}, + 'tableClass': {'potentialMonthlySavings': 0}, + 'utilization': {'recommendations': []}, + 'unusedGsi': {'unusedGSIs': []}, + }] + output = format_results(14, results) + self.assertIn('Deletion Protection', output) + self.assertIn('PITR', output) + self.assertIn('⚠ enable', output) + self.assertIn('┌', output) + + def test_format_with_errors(self): + from analyze_all import format_results + results = [{ + 'tableName': 'broken', 'region': 'us-east-1', + 'errors': ['capacityMode: timeout'], + 'capacityMode': {'error': 'timeout'}, + 'tableClass': {'potentialMonthlySavings': 0}, + 'utilization': {'recommendations': []}, + 'unusedGsi': {'unusedGSIs': []}, + }] + output = format_results(14, results) + self.assertIn('Errors', output) + self.assertIn('broken', output) + + +if __name__ == '__main__': + unittest.main() diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/tests/test_config.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/tests/test_config.py new file mode 100644 index 00000000..dcebc971 --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/tests/test_config.py @@ -0,0 +1,109 @@ +"""Tests for config.py - credential handling, input parsing, validation.""" +import json +import sys +import os +import unittest +from unittest.mock import patch, MagicMock + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts')) + +from config import get_client, parse_input, validate_keys, fail, STANDARD_TO_IA_RATIO, IA_TO_STANDARD_RATIO +from decimal import Decimal + + +class TestConstants(unittest.TestCase): + + def test_breakeven_ratios(self): + self.assertAlmostEqual(float(STANDARD_TO_IA_RATIO), 0.4167, places=3) + self.assertAlmostEqual(float(IA_TO_STANDARD_RATIO), 0.1333, places=3) + + def test_ratio_ordering(self): + self.assertGreater(STANDARD_TO_IA_RATIO, IA_TO_STANDARD_RATIO) 
+ + +class TestGetClient(unittest.TestCase): + + @patch('config.boto3') + def test_returns_client(self, mock_boto): + mock_boto.client.return_value = MagicMock() + client = get_client('dynamodb', 'us-east-1') + mock_boto.client.assert_called_once_with('dynamodb', region_name='us-east-1') + self.assertIsNotNone(client) + + @patch('config.boto3') + def test_no_credentials_exits(self, mock_boto): + from botocore.exceptions import NoCredentialsError + mock_boto.client.side_effect = NoCredentialsError() + with self.assertRaises(SystemExit), patch('builtins.print'): + get_client('dynamodb', 'us-east-1') + + @patch('config.boto3') + def test_client_error_exits(self, mock_boto): + from botocore.exceptions import ClientError + mock_boto.client.side_effect = ClientError( + {'Error': {'Code': 'InvalidRegion', 'Message': 'bad'}}, 'op') + with self.assertRaises(SystemExit), patch('builtins.print'): + get_client('dynamodb', 'us-east-1') + + +class TestParseInput(unittest.TestCase): + + def test_from_argv(self): + with patch('sys.argv', ['script', '{"region":"us-east-1"}']): + data = parse_input() + self.assertEqual(data['region'], 'us-east-1') + + def test_from_stdin(self): + with patch('sys.argv', ['script']), \ + patch('sys.stdin') as mock_stdin: + mock_stdin.read.return_value = '{"tableName":"test"}' + data = parse_input() + self.assertEqual(data['tableName'], 'test') + + def test_invalid_json_exits(self): + with patch('sys.argv', ['script', 'not json']): + with self.assertRaises(SystemExit), patch('builtins.print'): + parse_input() + + def test_empty_stdin_exits(self): + with patch('sys.argv', ['script']), \ + patch('sys.stdin') as mock_stdin, \ + patch('builtins.print'): + mock_stdin.read.return_value = '' + with self.assertRaises(SystemExit): + parse_input() + + +class TestValidateKeys(unittest.TestCase): + + def test_all_present(self): + validate_keys({'region': 'us-east-1', 'prices': {}}, ['region', 'prices']) + + def test_missing_key_exits(self): + with 
self.assertRaises(SystemExit), patch('builtins.print'): + validate_keys({'region': 'us-east-1'}, ['region', 'prices']) + + def test_multiple_missing_exits(self): + with self.assertRaises(SystemExit), patch('builtins.print'): + validate_keys({}, ['region', 'prices', 'tableName']) + + +class TestFail(unittest.TestCase): + + def test_prints_json_error(self): + with self.assertRaises(SystemExit) as ctx: + with patch('builtins.print') as mock_print: + fail('something broke') + mock_print.assert_called_once() + output = json.loads(mock_print.call_args[0][0]) + self.assertEqual(output['error'], 'something broke') + + def test_exits_with_code_1(self): + with self.assertRaises(SystemExit) as ctx: + with patch('builtins.print'): + fail('err') + self.assertEqual(ctx.exception.code, 1) + + +if __name__ == '__main__': + unittest.main() diff --git a/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/tests/test_infrastructure.py b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/tests/test_infrastructure.py new file mode 100644 index 00000000..509a438a --- /dev/null +++ b/plugins/dynamodb-cost-optimizer/skills/optimize-dynamodb/tests/test_infrastructure.py @@ -0,0 +1,320 @@ +"""Tests for cw_batch, get_pricing, discover, and analyze_all orchestration.""" +import json +import sys +import os +import unittest +from unittest.mock import patch, MagicMock, call +from datetime import datetime, timezone, timedelta + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts')) + + +class TestCwBatch(unittest.TestCase): + + @patch('cw_batch.get_client') + def test_basic_query(self, mock_gc): + from cw_batch import batch_get_metrics + ts = datetime(2025, 1, 15, tzinfo=timezone.utc) + mock_cw = MagicMock() + mock_gc.return_value = mock_cw + mock_cw.get_metric_data.return_value = { + 'MetricDataResults': [ + {'Id': 'r0', 'Timestamps': [ts], 'Values': [42.0]}, + ], + } + + result = batch_get_metrics('us-east-1', [ + {'id': 'r0', 'table': 'tbl', 'metric': 
'ConsumedReadCapacityUnits', 'period': 300, 'stat': 'Sum'}, + ], ts - timedelta(days=1), ts) + + self.assertEqual(len(result['r0']), 1) + self.assertEqual(result['r0'][0]['value'], 42.0) + + @patch('cw_batch.get_client') + def test_gsi_dimension_added(self, mock_gc): + from cw_batch import batch_get_metrics + ts = datetime(2025, 1, 15, tzinfo=timezone.utc) + mock_cw = MagicMock() + mock_gc.return_value = mock_cw + mock_cw.get_metric_data.return_value = {'MetricDataResults': []} + + batch_get_metrics('us-east-1', [ + {'id': 'g0', 'table': 'tbl', 'gsi': 'idx', 'metric': 'ConsumedReadCapacityUnits', 'period': 300, 'stat': 'Sum'}, + ], ts - timedelta(days=1), ts) + + call_kwargs = mock_cw.get_metric_data.call_args[1] + dims = call_kwargs['MetricDataQueries'][0]['MetricStat']['Metric']['Dimensions'] + self.assertEqual(len(dims), 2) + self.assertEqual(dims[1]['Value'], 'idx') + + @patch('cw_batch.get_client') + def test_pagination(self, mock_gc): + from cw_batch import batch_get_metrics + ts = datetime(2025, 1, 15, tzinfo=timezone.utc) + mock_cw = MagicMock() + mock_gc.return_value = mock_cw + mock_cw.get_metric_data.side_effect = [ + {'MetricDataResults': [{'Id': 'r0', 'Timestamps': [ts], 'Values': [1.0]}], 'NextToken': 'tok'}, + {'MetricDataResults': [{'Id': 'r0', 'Timestamps': [ts + timedelta(minutes=5)], 'Values': [2.0]}]}, + ] + + result = batch_get_metrics('us-east-1', [ + {'id': 'r0', 'table': 'tbl', 'metric': 'ConsumedReadCapacityUnits', 'period': 300, 'stat': 'Sum'}, + ], ts - timedelta(days=1), ts) + + self.assertEqual(len(result['r0']), 2) + self.assertEqual(mock_cw.get_metric_data.call_count, 2) + + @patch('cw_batch.time.sleep') + @patch('cw_batch.get_client') + def test_retry_on_throttle(self, mock_gc, mock_sleep): + from cw_batch import batch_get_metrics + from botocore.exceptions import ClientError + ts = datetime(2025, 1, 15, tzinfo=timezone.utc) + mock_cw = MagicMock() + mock_gc.return_value = mock_cw + throttle_err = ClientError({'Error': {'Code': 
'ThrottlingException', 'Message': 'Rate exceeded'}}, 'GetMetricData') + mock_cw.get_metric_data.side_effect = [ + throttle_err, + {'MetricDataResults': [{'Id': 'r0', 'Timestamps': [ts], 'Values': [1.0]}]}, + ] + + result = batch_get_metrics('us-east-1', [ + {'id': 'r0', 'table': 'tbl', 'metric': 'ConsumedReadCapacityUnits', 'period': 300, 'stat': 'Sum'}, + ], ts - timedelta(days=1), ts) + + self.assertEqual(len(result['r0']), 1) + mock_sleep.assert_called_once_with(1) + + @patch('cw_batch.time.sleep') + @patch('cw_batch.get_client') + def test_non_throttle_error_raises(self, mock_gc, mock_sleep): + from cw_batch import batch_get_metrics + from botocore.exceptions import ClientError + ts = datetime(2025, 1, 15, tzinfo=timezone.utc) + mock_cw = MagicMock() + mock_gc.return_value = mock_cw + mock_cw.get_metric_data.side_effect = ClientError( + {'Error': {'Code': 'AccessDenied', 'Message': 'nope'}}, 'GetMetricData') + + with self.assertRaises(ClientError): + batch_get_metrics('us-east-1', [ + {'id': 'r0', 'table': 'tbl', 'metric': 'ConsumedReadCapacityUnits', 'period': 300, 'stat': 'Sum'}, + ], ts - timedelta(days=1), ts) + + @patch('cw_batch.get_client') + def test_results_sorted_by_timestamp(self, mock_gc): + from cw_batch import batch_get_metrics + ts1 = datetime(2025, 1, 15, 10, 0, tzinfo=timezone.utc) + ts2 = datetime(2025, 1, 15, 9, 0, tzinfo=timezone.utc) + mock_cw = MagicMock() + mock_gc.return_value = mock_cw + mock_cw.get_metric_data.return_value = { + 'MetricDataResults': [{'Id': 'r0', 'Timestamps': [ts1, ts2], 'Values': [2.0, 1.0]}], + } + + result = batch_get_metrics('us-east-1', [ + {'id': 'r0', 'table': 'tbl', 'metric': 'ConsumedReadCapacityUnits', 'period': 300, 'stat': 'Sum'}, + ], ts2, ts1) + + self.assertEqual(result['r0'][0]['value'], 1.0) + self.assertEqual(result['r0'][1]['value'], 2.0) + + +class TestGetPricing(unittest.TestCase): + + @patch('get_pricing.get_client') + def test_parses_pricing(self, mock_gc): + from get_pricing import get_pricing 
+ mock_pricing = MagicMock() + mock_gc.return_value = mock_pricing + + def make_price_item(group, usage, price, vol=''): + return json.dumps({ + 'product': {'attributes': {'group': group, 'usagetype': usage, 'volumeType': vol}}, + 'terms': {'OnDemand': {'t1': {'priceDimensions': {'d1': {'pricePerUnit': {'USD': str(price)}}}}}}, + }) + + mock_pricing.get_products.side_effect = [ + {'PriceList': [ + make_price_item('DDB-ReadUnits', '', 0.00000025), + make_price_item('DDB-WriteUnits', '', 0.00000125), + ]}, + {'PriceList': [ + make_price_item('DDB-ReadUnits', 'EU-ReadCapacityUnit-Hrs', 0.00013), + make_price_item('DDB-WriteUnits', 'EU-WriteCapacityUnit-Hrs', 0.00065), + ]}, + {'PriceList': [ + make_price_item('', '', 0.25, vol='Amazon DynamoDB'), + make_price_item('', '', 0.10, vol='Amazon DynamoDB - IA'), + ]}, + ] + + prices = get_pricing('us-east-1') + self.assertEqual(prices['read_request'], 0.00000025) + self.assertEqual(prices['write_request'], 0.00000125) + self.assertEqual(prices['rcu_hour'], 0.00013) + self.assertEqual(prices['standard_storage'], 0.25) + self.assertEqual(prices['ia_storage'], 0.10) + # Check aliases + self.assertEqual(prices['standard_read'], prices['read_request']) + self.assertEqual(prices['on_demand_write'], prices['write_request']) + + @patch('get_pricing.get_client') + def test_pagination(self, mock_gc): + from get_pricing import get_pricing + mock_pricing = MagicMock() + mock_gc.return_value = mock_pricing + + def make_item(group, price, vol=''): + return json.dumps({ + 'product': {'attributes': {'group': group, 'usagetype': '', 'volumeType': vol}}, + 'terms': {'OnDemand': {'t1': {'priceDimensions': {'d1': {'pricePerUnit': {'USD': str(price)}}}}}}, + }) + + mock_pricing.get_products.side_effect = [ + {'PriceList': [make_item('DDB-ReadUnits', 0.25e-6)], 'NextToken': 'page2'}, + {'PriceList': [make_item('DDB-WriteUnits', 1.25e-6)]}, + {'PriceList': [make_item('DDB-ReadUnits', 0.00013), make_item('DDB-WriteUnits', 0.00065)]}, + 
{'PriceList': [make_item('', 0.25, vol='Amazon DynamoDB')]}, + ] + + get_pricing('us-east-1') + self.assertEqual(mock_pricing.get_products.call_count, 4) + + @patch('builtins.print') + @patch('get_pricing.get_client') + def test_missing_prices_fails_fast(self, mock_gc, mock_print): + from get_pricing import get_pricing + mock_pricing = MagicMock() + mock_gc.return_value = mock_pricing + mock_pricing.get_products.return_value = {'PriceList': []} + + with self.assertRaises(SystemExit): + get_pricing('us-east-1') + + +class TestDiscover(unittest.TestCase): + + @patch('discover.get_client') + def test_lists_all_tables(self, mock_gc): + from discover import discover + mock_ddb = MagicMock() + mock_gc.return_value = mock_ddb + mock_ddb.get_paginator.return_value.paginate.return_value = [ + {'TableNames': ['t1', 't2']}, + ] + mock_ddb.describe_table.side_effect = [ + {'Table': {'BillingModeSummary': {'BillingMode': 'PROVISIONED'}, + 'ProvisionedThroughput': {'ReadCapacityUnits': 10, 'WriteCapacityUnits': 5}, + 'ItemCount': 100, 'TableSizeBytes': 5000}}, + {'Table': {'BillingModeSummary': {'BillingMode': 'PAY_PER_REQUEST'}, + 'ProvisionedThroughput': {'ReadCapacityUnits': 0, 'WriteCapacityUnits': 0}, + 'ItemCount': 50, 'TableSizeBytes': 2000}}, + ] + + result = discover('us-east-1') + self.assertEqual(len(result), 2) + self.assertEqual(result[0]['billingMode'], 'PROVISIONED') + self.assertEqual(result[1]['billingMode'], 'ON_DEMAND') + + @patch('discover.get_client') + def test_specific_tables(self, mock_gc): + from discover import discover + mock_ddb = MagicMock() + mock_gc.return_value = mock_ddb + mock_ddb.describe_table.return_value = { + 'Table': {'BillingModeSummary': {'BillingMode': 'PROVISIONED'}, + 'ProvisionedThroughput': {'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}, + 'ItemCount': 10, 'TableSizeBytes': 100}} + + result = discover('us-east-1', ['my-table']) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['tableName'], 'my-table') + 
mock_ddb.get_paginator.assert_not_called() + + @patch('discover.get_client') + def test_error_on_single_table(self, mock_gc): + from discover import discover + mock_ddb = MagicMock() + mock_gc.return_value = mock_ddb + mock_ddb.describe_table.side_effect = Exception('not found') + + result = discover('us-east-1', ['bad-table']) + self.assertEqual(len(result), 1) + self.assertIn('error', result[0]) + + @patch('discover.get_client') + def test_deletion_protection_and_pitr(self, mock_gc): + from discover import discover + mock_ddb = MagicMock() + mock_gc.return_value = mock_ddb + mock_ddb.describe_table.return_value = { + 'Table': {'BillingModeSummary': {'BillingMode': 'PROVISIONED'}, + 'ProvisionedThroughput': {'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}, + 'DeletionProtectionEnabled': True, + 'ItemCount': 10, 'TableSizeBytes': 100}} + mock_ddb.describe_continuous_backups.return_value = { + 'ContinuousBackupsDescription': { + 'PointInTimeRecoveryDescription': {'PointInTimeRecoveryStatus': 'ENABLED'}}} + + result = discover('us-east-1', ['my-table']) + self.assertTrue(result[0]['deletionProtection']) + self.assertTrue(result[0]['pointInTimeRecovery']) + + +class TestAnalyzeAllOrchestration(unittest.TestCase): + + @patch('analyze_all.get_pricing') + @patch('analyze_all.analyze_table') + def test_single_region(self, mock_at, mock_gp): + from analyze_all import analyze_all + mock_gp.return_value = {'rcu_hour': 0.00013} + mock_at.return_value = { + 'tableName': 't1', 'region': 'us-east-1', 'errors': [], + 'capacityMode': {'potentialMonthlySavings': 0}, + 'tableClass': {'potentialMonthlySavings': 0}, + 'utilization': {'recommendations': []}, + 'unusedGsi': {'unusedGSIs': []}, + } + + output = analyze_all({'region': 'us-east-1', 'tables': ['t1'], 'days': 14}) + self.assertIn('us-east-1', output) + self.assertIn('dynamodb-cost-report.md', output) + mock_at.assert_called_once() + + @patch('analyze_all.get_pricing') + @patch('analyze_all.analyze_table') + def 
test_multi_region(self, mock_at, mock_gp): + from analyze_all import analyze_all + mock_gp.return_value = {'rcu_hour': 0.00013} + mock_at.side_effect = lambda r, t, d, p: { + 'tableName': t, 'region': r, 'errors': [], + 'capacityMode': {'potentialMonthlySavings': 0}, + 'tableClass': {'potentialMonthlySavings': 0}, + 'utilization': {'recommendations': []}, + 'unusedGsi': {'unusedGSIs': []}, + } + + output = analyze_all({'regions': {'us-east-1': ['t1'], 'eu-west-1': ['t2']}, 'days': 7}) + self.assertIn('Tables: 2', output) + self.assertEqual(mock_gp.call_count, 2) + + @patch('analyze_all.get_pricing') + @patch('analyze_all.analyze_table') + def test_uses_provided_prices(self, mock_at, mock_gp): + from analyze_all import analyze_all + mock_at.return_value = { + 'tableName': 't1', 'region': 'us-east-1', 'errors': [], + 'capacityMode': {'potentialMonthlySavings': 0}, + 'tableClass': {'potentialMonthlySavings': 0}, + 'utilization': {'recommendations': []}, + 'unusedGsi': {'unusedGSIs': []}, + } + + analyze_all({'region': 'us-east-1', 'tables': ['t1'], 'days': 14, 'prices': {'rcu_hour': 0.1}}) + mock_gp.assert_not_called() + + +if __name__ == '__main__': + unittest.main()