SuperClaude/scripts/analyze_workflow_metrics.py
BlackBear 18c0e4e127
Add missing install.sh script (#483)
* feat: add missing install.sh script referenced in README\n\n- Create comprehensive installation script with POSIX compatibility\n- Add interactive and non-interactive installation modes\n- Include prerequisites checking and MCP server setup guidance\n- Replace echo -e with printf for better POSIX compliance

* fix: resolve linting errors in install_mcp.py and clean_command_names.py

Fix multiple ruff linting errors to ensure CI/CD pipeline passes:

- install_mcp.py: Remove unused pathlib.Path import, replace bare except
  with specific exception types (ValueError, IndexError), remove
  extraneous f-string prefixes on lines without placeholders
- clean_command_names.py: Remove unused os import, convert f-strings
  without placeholders to regular strings
- pyproject.toml: Exclude docs/ directory from ruff checks to avoid
  N999 module naming violations in documentation templates

All linting checks now pass successfully.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* style: apply ruff format to Python source files

Apply ruff formatting rules to CLI and scripts modules to ensure
consistent code style across the codebase.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(ci): remove incompatible pip cache from quick-check workflow

## Problem
GitHub Actions was failing with error:
"Cache folder path is retrieved for pip but doesn't exist on disk:
/home/runner/.cache/pip. This likely indicates that there are no
dependencies to cache."

## Root Cause
The quick-check.yml workflow specified `cache: 'pip'` in the Python
setup step, but the workflow uses UV (not pip) for package management
via `uv pip install --system -e ".[dev]"`.

UV uses its own cache directory (~/.cache/uv), so the pip cache path
was never created, causing the error.

This was a migration oversight:
- When UV was adopted as the project standard (commit 00706f0), the
  CLAUDE.md established "CRITICAL: Never use pip directly" rule
- The test.yml workflow was created correctly without pip cache
- The quick-check.yml workflow incorrectly included pip cache from
  initial creation (commit 8c0559c) and was not updated during migration

## Solution
Remove `cache: 'pip'` line to align with:
- Project's UV-first architecture (CLAUDE.md)
- test.yml workflow (which runs successfully without pip cache)
- readme-quality-check.yml workflow (no cache needed)

Note: publish-pypi.yml intentionally uses pip cache as it directly
runs `python -m pip install` commands, which is correct for that workflow.

## Impact
-  Eliminates GitHub Actions cache warning
-  Aligns all UV-based workflows consistently
-  Follows project standards documented in CLAUDE.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-11-14 08:03:04 +05:30

332 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Workflow Metrics Analysis Script
Analyzes workflow_metrics.jsonl for continuous optimization and A/B testing.
Usage:
python scripts/analyze_workflow_metrics.py --period week
python scripts/analyze_workflow_metrics.py --period month
python scripts/analyze_workflow_metrics.py --task-type bug_fix
"""
import argparse
import json
import statistics
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List
class WorkflowMetricsAnalyzer:
"""Analyze workflow metrics for optimization"""
def __init__(self, metrics_file: Path):
self.metrics_file = metrics_file
self.metrics: List[Dict] = []
self._load_metrics()
def _load_metrics(self):
"""Load metrics from JSONL file"""
if not self.metrics_file.exists():
print(f"Warning: {self.metrics_file} not found")
return
with open(self.metrics_file, 'r') as f:
for line in f:
if line.strip():
self.metrics.append(json.loads(line))
print(f"Loaded {len(self.metrics)} metric records")
def filter_by_period(self, period: str) -> List[Dict]:
"""Filter metrics by time period"""
now = datetime.now()
if period == "week":
cutoff = now - timedelta(days=7)
elif period == "month":
cutoff = now - timedelta(days=30)
elif period == "all":
return self.metrics
else:
raise ValueError(f"Invalid period: {period}")
filtered = [
m for m in self.metrics
if datetime.fromisoformat(m['timestamp']) >= cutoff
]
print(f"Filtered to {len(filtered)} records in last {period}")
return filtered
def analyze_by_task_type(self, metrics: List[Dict]) -> Dict:
"""Analyze metrics grouped by task type"""
by_task = defaultdict(list)
for m in metrics:
by_task[m['task_type']].append(m)
results = {}
for task_type, task_metrics in by_task.items():
results[task_type] = {
'count': len(task_metrics),
'avg_tokens': statistics.mean(m['tokens_used'] for m in task_metrics),
'avg_time_ms': statistics.mean(m['time_ms'] for m in task_metrics),
'success_rate': sum(m['success'] for m in task_metrics) / len(task_metrics) * 100,
'avg_files_read': statistics.mean(m.get('files_read', 0) for m in task_metrics),
}
return results
def analyze_by_complexity(self, metrics: List[Dict]) -> Dict:
"""Analyze metrics grouped by complexity level"""
by_complexity = defaultdict(list)
for m in metrics:
by_complexity[m['complexity']].append(m)
results = {}
for complexity, comp_metrics in by_complexity.items():
results[complexity] = {
'count': len(comp_metrics),
'avg_tokens': statistics.mean(m['tokens_used'] for m in comp_metrics),
'avg_time_ms': statistics.mean(m['time_ms'] for m in comp_metrics),
'success_rate': sum(m['success'] for m in comp_metrics) / len(comp_metrics) * 100,
}
return results
def analyze_by_workflow(self, metrics: List[Dict]) -> Dict:
"""Analyze metrics grouped by workflow variant"""
by_workflow = defaultdict(list)
for m in metrics:
by_workflow[m['workflow_id']].append(m)
results = {}
for workflow_id, wf_metrics in by_workflow.items():
results[workflow_id] = {
'count': len(wf_metrics),
'avg_tokens': statistics.mean(m['tokens_used'] for m in wf_metrics),
'median_tokens': statistics.median(m['tokens_used'] for m in wf_metrics),
'avg_time_ms': statistics.mean(m['time_ms'] for m in wf_metrics),
'success_rate': sum(m['success'] for m in wf_metrics) / len(wf_metrics) * 100,
}
return results
def identify_best_workflows(self, metrics: List[Dict]) -> Dict[str, str]:
"""Identify best workflow for each task type"""
by_task_workflow = defaultdict(lambda: defaultdict(list))
for m in metrics:
by_task_workflow[m['task_type']][m['workflow_id']].append(m)
best_workflows = {}
for task_type, workflows in by_task_workflow.items():
best_workflow = None
best_score = float('inf')
for workflow_id, wf_metrics in workflows.items():
# Score = avg_tokens (lower is better)
avg_tokens = statistics.mean(m['tokens_used'] for m in wf_metrics)
success_rate = sum(m['success'] for m in wf_metrics) / len(wf_metrics)
# Only consider if success rate >= 95%
if success_rate >= 0.95:
if avg_tokens < best_score:
best_score = avg_tokens
best_workflow = workflow_id
if best_workflow:
best_workflows[task_type] = best_workflow
return best_workflows
def identify_inefficiencies(self, metrics: List[Dict]) -> List[Dict]:
"""Identify inefficient patterns"""
inefficiencies = []
# Expected token budgets by complexity
budgets = {
'ultra-light': 800,
'light': 2000,
'medium': 5000,
'heavy': 20000,
'ultra-heavy': 50000
}
for m in metrics:
issues = []
# Check token budget overrun
expected_budget = budgets.get(m['complexity'], 5000)
if m['tokens_used'] > expected_budget * 1.3: # 30% over budget
issues.append(f"Token overrun: {m['tokens_used']} vs {expected_budget}")
# Check success rate
if not m['success']:
issues.append("Task failed")
# Check time performance (light tasks should be fast)
if m['complexity'] in ['ultra-light', 'light'] and m['time_ms'] > 10000:
issues.append(f"Slow execution: {m['time_ms']}ms for {m['complexity']} task")
if issues:
inefficiencies.append({
'timestamp': m['timestamp'],
'task_type': m['task_type'],
'complexity': m['complexity'],
'workflow_id': m['workflow_id'],
'issues': issues
})
return inefficiencies
def calculate_token_savings(self, metrics: List[Dict]) -> Dict:
"""Calculate token savings vs unlimited baseline"""
# Unlimited baseline estimates
baseline = {
'ultra-light': 1000,
'light': 2500,
'medium': 7500,
'heavy': 30000,
'ultra-heavy': 100000
}
total_actual = 0
total_baseline = 0
for m in metrics:
total_actual += m['tokens_used']
total_baseline += baseline.get(m['complexity'], 7500)
savings = total_baseline - total_actual
savings_percent = (savings / total_baseline * 100) if total_baseline > 0 else 0
return {
'total_actual': total_actual,
'total_baseline': total_baseline,
'total_savings': savings,
'savings_percent': savings_percent
}
def generate_report(self, period: str) -> str:
"""Generate comprehensive analysis report"""
metrics = self.filter_by_period(period)
if not metrics:
return "No metrics available for analysis"
report = []
report.append("=" * 80)
report.append(f"WORKFLOW METRICS ANALYSIS REPORT - Last {period}")
report.append("=" * 80)
report.append("")
# Overall statistics
report.append("## Overall Statistics")
report.append(f"Total Tasks: {len(metrics)}")
report.append(f"Success Rate: {sum(m['success'] for m in metrics) / len(metrics) * 100:.1f}%")
report.append(f"Avg Tokens: {statistics.mean(m['tokens_used'] for m in metrics):.0f}")
report.append(f"Avg Time: {statistics.mean(m['time_ms'] for m in metrics):.0f}ms")
report.append("")
# Token savings
savings = self.calculate_token_savings(metrics)
report.append("## Token Efficiency")
report.append(f"Actual Usage: {savings['total_actual']:,} tokens")
report.append(f"Unlimited Baseline: {savings['total_baseline']:,} tokens")
report.append(f"Total Savings: {savings['total_savings']:,} tokens ({savings['savings_percent']:.1f}%)")
report.append("")
# By task type
report.append("## Analysis by Task Type")
by_task = self.analyze_by_task_type(metrics)
for task_type, stats in sorted(by_task.items()):
report.append(f"\n### {task_type}")
report.append(f" Count: {stats['count']}")
report.append(f" Avg Tokens: {stats['avg_tokens']:.0f}")
report.append(f" Avg Time: {stats['avg_time_ms']:.0f}ms")
report.append(f" Success Rate: {stats['success_rate']:.1f}%")
report.append(f" Avg Files Read: {stats['avg_files_read']:.1f}")
report.append("")
# By complexity
report.append("## Analysis by Complexity")
by_complexity = self.analyze_by_complexity(metrics)
for complexity in ['ultra-light', 'light', 'medium', 'heavy', 'ultra-heavy']:
if complexity in by_complexity:
stats = by_complexity[complexity]
report.append(f"\n### {complexity}")
report.append(f" Count: {stats['count']}")
report.append(f" Avg Tokens: {stats['avg_tokens']:.0f}")
report.append(f" Success Rate: {stats['success_rate']:.1f}%")
report.append("")
# Best workflows
report.append("## Best Workflows per Task Type")
best = self.identify_best_workflows(metrics)
for task_type, workflow_id in sorted(best.items()):
report.append(f" {task_type}: {workflow_id}")
report.append("")
# Inefficiencies
inefficiencies = self.identify_inefficiencies(metrics)
if inefficiencies:
report.append("## Inefficiencies Detected")
report.append(f"Total Issues: {len(inefficiencies)}")
for issue in inefficiencies[:5]: # Show top 5
report.append(f"\n {issue['timestamp']}")
report.append(f" Task: {issue['task_type']} ({issue['complexity']})")
report.append(f" Workflow: {issue['workflow_id']}")
for problem in issue['issues']:
report.append(f" - {problem}")
report.append("")
report.append("=" * 80)
return "\n".join(report)
def main():
parser = argparse.ArgumentParser(description="Analyze workflow metrics")
parser.add_argument(
'--period',
choices=['week', 'month', 'all'],
default='week',
help='Analysis time period'
)
parser.add_argument(
'--task-type',
help='Filter by specific task type'
)
parser.add_argument(
'--output',
help='Output file (default: stdout)'
)
args = parser.parse_args()
# Find metrics file
metrics_file = Path('docs/memory/workflow_metrics.jsonl')
analyzer = WorkflowMetricsAnalyzer(metrics_file)
report = analyzer.generate_report(args.period)
if args.output:
with open(args.output, 'w') as f:
f.write(report)
print(f"Report written to {args.output}")
else:
print(report)
if __name__ == '__main__':
main()