SuperClaude/scripts/analyze_workflow_metrics.py
kazuki a4ffe52724 refactor: consolidate PM Agent optimization and pending changes
PM Agent optimization (already committed separately):
- superclaude/commands/pm.md: 1652→14 lines
- superclaude/agents/pm-agent.md: 735→429 lines
- docs/agents/pm-agent-guide.md: new guide file

Other pending changes:
- setup: framework_docs, mcp, logger, remove ui.py
- superclaude: __main__, cli/app, cli/commands/install
- tests: test_ui updates
- scripts: workflow metrics analysis tools
- docs/memory: session state updates

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 04:54:31 +09:00

332 lines · 12 KiB · Python · Executable File

#!/usr/bin/env python3
"""
Workflow Metrics Analysis Script

Analyzes workflow_metrics.jsonl for continuous optimization and A/B testing.

Usage:
    python scripts/analyze_workflow_metrics.py --period week
    python scripts/analyze_workflow_metrics.py --period month
    python scripts/analyze_workflow_metrics.py --task-type bug_fix
"""
import json
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from collections import defaultdict
import statistics
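
# Each line of workflow_metrics.jsonl is expected to be a JSON object carrying
# at least the fields this script reads: timestamp (naive ISO 8601 string, since
# datetime.fromisoformat() is compared against datetime.now()), task_type,
# complexity, workflow_id, tokens_used, time_ms, success, and optionally
# files_read. Illustrative record (field names come from the code below; the
# values are made up):
# {"timestamp": "2025-10-17T04:54:31", "task_type": "bug_fix",
#  "complexity": "light", "workflow_id": "wf-direct", "tokens_used": 1800,
#  "time_ms": 4200, "success": true, "files_read": 3}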


class WorkflowMetricsAnalyzer:
    """Analyze workflow metrics for optimization"""

    def __init__(self, metrics_file: Path):
        self.metrics_file = metrics_file
        self.metrics: List[Dict] = []
        self._load_metrics()

    def _load_metrics(self):
        """Load metrics from JSONL file"""
        if not self.metrics_file.exists():
            print(f"Warning: {self.metrics_file} not found")
            return

        with open(self.metrics_file, 'r') as f:
            for line in f:
                if line.strip():
                    self.metrics.append(json.loads(line))

        print(f"Loaded {len(self.metrics)} metric records")

    def filter_by_period(self, period: str) -> List[Dict]:
        """Filter metrics by time period"""
        now = datetime.now()

        if period == "week":
            cutoff = now - timedelta(days=7)
        elif period == "month":
            cutoff = now - timedelta(days=30)
        elif period == "all":
            return self.metrics
        else:
            raise ValueError(f"Invalid period: {period}")

        filtered = [
            m for m in self.metrics
            if datetime.fromisoformat(m['timestamp']) >= cutoff
        ]

        print(f"Filtered to {len(filtered)} records in last {period}")
        return filtered

    def analyze_by_task_type(self, metrics: List[Dict]) -> Dict:
        """Analyze metrics grouped by task type"""
        by_task = defaultdict(list)
        for m in metrics:
            by_task[m['task_type']].append(m)

        results = {}
        for task_type, task_metrics in by_task.items():
            results[task_type] = {
                'count': len(task_metrics),
                'avg_tokens': statistics.mean(m['tokens_used'] for m in task_metrics),
                'avg_time_ms': statistics.mean(m['time_ms'] for m in task_metrics),
                'success_rate': sum(m['success'] for m in task_metrics) / len(task_metrics) * 100,
                'avg_files_read': statistics.mean(m.get('files_read', 0) for m in task_metrics),
            }
        return results

    def analyze_by_complexity(self, metrics: List[Dict]) -> Dict:
        """Analyze metrics grouped by complexity level"""
        by_complexity = defaultdict(list)
        for m in metrics:
            by_complexity[m['complexity']].append(m)

        results = {}
        for complexity, comp_metrics in by_complexity.items():
            results[complexity] = {
                'count': len(comp_metrics),
                'avg_tokens': statistics.mean(m['tokens_used'] for m in comp_metrics),
                'avg_time_ms': statistics.mean(m['time_ms'] for m in comp_metrics),
                'success_rate': sum(m['success'] for m in comp_metrics) / len(comp_metrics) * 100,
            }
        return results

    def analyze_by_workflow(self, metrics: List[Dict]) -> Dict:
        """Analyze metrics grouped by workflow variant"""
        by_workflow = defaultdict(list)
        for m in metrics:
            by_workflow[m['workflow_id']].append(m)

        results = {}
        for workflow_id, wf_metrics in by_workflow.items():
            results[workflow_id] = {
                'count': len(wf_metrics),
                'avg_tokens': statistics.mean(m['tokens_used'] for m in wf_metrics),
                'median_tokens': statistics.median(m['tokens_used'] for m in wf_metrics),
                'avg_time_ms': statistics.mean(m['time_ms'] for m in wf_metrics),
                'success_rate': sum(m['success'] for m in wf_metrics) / len(wf_metrics) * 100,
            }
        return results

    def identify_best_workflows(self, metrics: List[Dict]) -> Dict[str, str]:
        """Identify best workflow for each task type"""
        by_task_workflow = defaultdict(lambda: defaultdict(list))
        for m in metrics:
            by_task_workflow[m['task_type']][m['workflow_id']].append(m)

        best_workflows = {}
        for task_type, workflows in by_task_workflow.items():
            best_workflow = None
            best_score = float('inf')

            for workflow_id, wf_metrics in workflows.items():
                # Score = avg_tokens (lower is better)
                avg_tokens = statistics.mean(m['tokens_used'] for m in wf_metrics)
                success_rate = sum(m['success'] for m in wf_metrics) / len(wf_metrics)

                # Only consider if success rate >= 95%
                if success_rate >= 0.95 and avg_tokens < best_score:
                    best_score = avg_tokens
                    best_workflow = workflow_id

            if best_workflow:
                best_workflows[task_type] = best_workflow

        return best_workflows

    def identify_inefficiencies(self, metrics: List[Dict]) -> List[Dict]:
        """Identify inefficient patterns"""
        inefficiencies = []

        # Expected token budgets by complexity
        budgets = {
            'ultra-light': 800,
            'light': 2000,
            'medium': 5000,
            'heavy': 20000,
            'ultra-heavy': 50000
        }

        for m in metrics:
            issues = []

            # Check token budget overrun
            expected_budget = budgets.get(m['complexity'], 5000)
            if m['tokens_used'] > expected_budget * 1.3:  # 30% over budget
                issues.append(f"Token overrun: {m['tokens_used']} vs {expected_budget}")

            # Check success rate
            if not m['success']:
                issues.append("Task failed")

            # Check time performance (light tasks should be fast)
            if m['complexity'] in ['ultra-light', 'light'] and m['time_ms'] > 10000:
                issues.append(f"Slow execution: {m['time_ms']}ms for {m['complexity']} task")

            if issues:
                inefficiencies.append({
                    'timestamp': m['timestamp'],
                    'task_type': m['task_type'],
                    'complexity': m['complexity'],
                    'workflow_id': m['workflow_id'],
                    'issues': issues
                })

        return inefficiencies

    def calculate_token_savings(self, metrics: List[Dict]) -> Dict:
        """Calculate token savings vs unlimited baseline"""
        # Unlimited baseline estimates
        baseline = {
            'ultra-light': 1000,
            'light': 2500,
            'medium': 7500,
            'heavy': 30000,
            'ultra-heavy': 100000
        }

        total_actual = 0
        total_baseline = 0
        for m in metrics:
            total_actual += m['tokens_used']
            total_baseline += baseline.get(m['complexity'], 7500)

        savings = total_baseline - total_actual
        savings_percent = (savings / total_baseline * 100) if total_baseline > 0 else 0

        return {
            'total_actual': total_actual,
            'total_baseline': total_baseline,
            'total_savings': savings,
            'savings_percent': savings_percent
        }

    def generate_report(self, period: str, task_type: Optional[str] = None) -> str:
        """Generate comprehensive analysis report"""
        metrics = self.filter_by_period(period)

        # Optionally restrict the report to a single task type (--task-type)
        if task_type:
            metrics = [m for m in metrics if m['task_type'] == task_type]
            print(f"Filtered to {len(metrics)} records for task type {task_type}")

        if not metrics:
            return "No metrics available for analysis"

        report = []
        report.append("=" * 80)
        report.append(f"WORKFLOW METRICS ANALYSIS REPORT - Last {period}")
        report.append("=" * 80)
        report.append("")

        # Overall statistics
        report.append("## Overall Statistics")
        report.append(f"Total Tasks: {len(metrics)}")
        report.append(f"Success Rate: {sum(m['success'] for m in metrics) / len(metrics) * 100:.1f}%")
        report.append(f"Avg Tokens: {statistics.mean(m['tokens_used'] for m in metrics):.0f}")
        report.append(f"Avg Time: {statistics.mean(m['time_ms'] for m in metrics):.0f}ms")
        report.append("")

        # Token savings
        savings = self.calculate_token_savings(metrics)
        report.append("## Token Efficiency")
        report.append(f"Actual Usage: {savings['total_actual']:,} tokens")
        report.append(f"Unlimited Baseline: {savings['total_baseline']:,} tokens")
        report.append(f"Total Savings: {savings['total_savings']:,} tokens ({savings['savings_percent']:.1f}%)")
        report.append("")

        # By task type
        report.append("## Analysis by Task Type")
        by_task = self.analyze_by_task_type(metrics)
        for task_type, stats in sorted(by_task.items()):
            report.append(f"\n### {task_type}")
            report.append(f"  Count: {stats['count']}")
            report.append(f"  Avg Tokens: {stats['avg_tokens']:.0f}")
            report.append(f"  Avg Time: {stats['avg_time_ms']:.0f}ms")
            report.append(f"  Success Rate: {stats['success_rate']:.1f}%")
            report.append(f"  Avg Files Read: {stats['avg_files_read']:.1f}")
        report.append("")

        # By complexity
        report.append("## Analysis by Complexity")
        by_complexity = self.analyze_by_complexity(metrics)
        for complexity in ['ultra-light', 'light', 'medium', 'heavy', 'ultra-heavy']:
            if complexity in by_complexity:
                stats = by_complexity[complexity]
                report.append(f"\n### {complexity}")
                report.append(f"  Count: {stats['count']}")
                report.append(f"  Avg Tokens: {stats['avg_tokens']:.0f}")
                report.append(f"  Success Rate: {stats['success_rate']:.1f}%")
        report.append("")

        # Best workflows
        report.append("## Best Workflows per Task Type")
        best = self.identify_best_workflows(metrics)
        for task_type, workflow_id in sorted(best.items()):
            report.append(f"  {task_type}: {workflow_id}")
        report.append("")

        # Inefficiencies
        inefficiencies = self.identify_inefficiencies(metrics)
        if inefficiencies:
            report.append("## Inefficiencies Detected")
            report.append(f"Total Issues: {len(inefficiencies)}")
            for issue in inefficiencies[:5]:  # Show top 5
                report.append(f"\n  {issue['timestamp']}")
                report.append(f"  Task: {issue['task_type']} ({issue['complexity']})")
                report.append(f"  Workflow: {issue['workflow_id']}")
                for problem in issue['issues']:
                    report.append(f"    - {problem}")
            report.append("")

        report.append("=" * 80)
        return "\n".join(report)


def main():
    parser = argparse.ArgumentParser(description="Analyze workflow metrics")
    parser.add_argument(
        '--period',
        choices=['week', 'month', 'all'],
        default='week',
        help='Analysis time period'
    )
    parser.add_argument(
        '--task-type',
        help='Filter by specific task type'
    )
    parser.add_argument(
        '--output',
        help='Output file (default: stdout)'
    )
    args = parser.parse_args()

    # Find metrics file (path is relative to the current working directory;
    # the usage examples in the module docstring assume the repository root)
    metrics_file = Path('docs/memory/workflow_metrics.jsonl')
    analyzer = WorkflowMetricsAnalyzer(metrics_file)
    report = analyzer.generate_report(args.period, task_type=args.task_type)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(report)
        print(f"Report written to {args.output}")
    else:
        print(report)


if __name__ == '__main__':
    main()
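
For reference, a minimal sketch of the producer side, assuming a workflow run appends one JSON object per line with the fields this analyzer reads; the helper name, logging location, and example values are hypothetical, not part of the SuperClaude codebase:

    import json
    from datetime import datetime
    from pathlib import Path

    def log_workflow_metric(task_type: str, complexity: str, workflow_id: str,
                            tokens_used: int, time_ms: int, success: bool,
                            files_read: int = 0,
                            path: Path = Path('docs/memory/workflow_metrics.jsonl')) -> None:
        """Append one JSONL record in the shape analyze_workflow_metrics.py expects."""
        record = {
            'timestamp': datetime.now().isoformat(),  # naive local time, matching the period filter
            'task_type': task_type,
            'complexity': complexity,
            'workflow_id': workflow_id,
            'tokens_used': tokens_used,
            'time_ms': time_ms,
            'success': success,
            'files_read': files_read,
        }
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'a') as f:
            f.write(json.dumps(record) + '\n')

    # Example (hypothetical values):
    # log_workflow_metric('bug_fix', 'light', 'wf-direct', 1800, 4200, True, files_read=3)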