refactor: consolidate PM Agent optimization and pending changes

PM Agent optimization (already committed separately):
- superclaude/commands/pm.md: 1652→14 lines
- superclaude/agents/pm-agent.md: 735→429 lines
- docs/agents/pm-agent-guide.md: new guide file

Other pending changes:
- setup: framework_docs, mcp, logger, remove ui.py
- superclaude: __main__, cli/app, cli/commands/install
- tests: test_ui updates
- scripts: workflow metrics analysis tools
- docs/memory: session state updates

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
kazuki
2025-10-17 04:54:31 +09:00
parent d168278879
commit a4ffe52724
13 changed files with 1298 additions and 1247 deletions

309
scripts/ab_test_workflows.py Executable file
View File

@@ -0,0 +1,309 @@
#!/usr/bin/env python3
"""
A/B Testing Framework for Workflow Variants
Compares two workflow variants with statistical significance testing.
Usage:
python scripts/ab_test_workflows.py \\
--variant-a progressive_v3_layer2 \\
--variant-b experimental_eager_layer3 \\
--metric tokens_used
"""
import json
import argparse
from pathlib import Path
from typing import Dict, List, Tuple
import statistics
from scipy import stats
class ABTestAnalyzer:
"""A/B testing framework for workflow optimization"""
def __init__(self, metrics_file: Path):
self.metrics_file = metrics_file
self.metrics: List[Dict] = []
self._load_metrics()
def _load_metrics(self):
"""Load metrics from JSONL file"""
if not self.metrics_file.exists():
print(f"Error: {self.metrics_file} not found")
return
with open(self.metrics_file, 'r') as f:
for line in f:
if line.strip():
self.metrics.append(json.loads(line))
def get_variant_metrics(self, workflow_id: str) -> List[Dict]:
"""Get all metrics for a specific workflow variant"""
return [m for m in self.metrics if m['workflow_id'] == workflow_id]
def extract_metric_values(self, metrics: List[Dict], metric: str) -> List[float]:
"""Extract specific metric values from metrics list"""
values = []
for m in metrics:
if metric in m:
value = m[metric]
# Handle boolean metrics
if isinstance(value, bool):
value = 1.0 if value else 0.0
values.append(float(value))
return values
def calculate_statistics(self, values: List[float]) -> Dict:
"""Calculate statistical measures"""
if not values:
return {
'count': 0,
'mean': 0,
'median': 0,
'stdev': 0,
'min': 0,
'max': 0
}
return {
'count': len(values),
'mean': statistics.mean(values),
'median': statistics.median(values),
'stdev': statistics.stdev(values) if len(values) > 1 else 0,
'min': min(values),
'max': max(values)
}
def perform_ttest(
self,
variant_a_values: List[float],
variant_b_values: List[float]
) -> Tuple[float, float]:
"""
Perform independent t-test between two variants.
Returns:
(t_statistic, p_value)
"""
if len(variant_a_values) < 2 or len(variant_b_values) < 2:
return 0.0, 1.0 # Not enough data
t_stat, p_value = stats.ttest_ind(variant_a_values, variant_b_values)
return t_stat, p_value
def determine_winner(
self,
variant_a_stats: Dict,
variant_b_stats: Dict,
p_value: float,
metric: str,
lower_is_better: bool = True
) -> str:
"""
Determine winning variant based on statistics.
Args:
variant_a_stats: Statistics for variant A
variant_b_stats: Statistics for variant B
p_value: Statistical significance (p-value)
metric: Metric being compared
lower_is_better: True if lower values are better (e.g., tokens_used)
Returns:
Winner description
"""
# Require statistical significance (p < 0.05)
if p_value >= 0.05:
return "No significant difference (p ≥ 0.05)"
# Require minimum sample size (20 trials per variant)
if variant_a_stats['count'] < 20 or variant_b_stats['count'] < 20:
return f"Insufficient data (need 20 trials, have {variant_a_stats['count']}/{variant_b_stats['count']})"
# Compare means
a_mean = variant_a_stats['mean']
b_mean = variant_b_stats['mean']
if lower_is_better:
if a_mean < b_mean:
improvement = ((b_mean - a_mean) / b_mean) * 100
return f"Variant A wins ({improvement:.1f}% better)"
else:
improvement = ((a_mean - b_mean) / a_mean) * 100
return f"Variant B wins ({improvement:.1f}% better)"
else:
if a_mean > b_mean:
improvement = ((a_mean - b_mean) / b_mean) * 100
return f"Variant A wins ({improvement:.1f}% better)"
else:
improvement = ((b_mean - a_mean) / a_mean) * 100
return f"Variant B wins ({improvement:.1f}% better)"
def generate_recommendation(
self,
winner: str,
variant_a_stats: Dict,
variant_b_stats: Dict,
p_value: float
) -> str:
"""Generate actionable recommendation"""
if "No significant difference" in winner:
return "⚖️ Keep current workflow (no improvement detected)"
if "Insufficient data" in winner:
return "📊 Continue testing (need more trials)"
if "Variant A wins" in winner:
return "✅ Keep Variant A as standard (statistically better)"
if "Variant B wins" in winner:
if variant_b_stats['mean'] > variant_a_stats['mean'] * 0.8: # At least 20% better
return "🚀 Promote Variant B to standard (significant improvement)"
else:
return "⚠️ Marginal improvement - continue testing before promotion"
return "🤔 Manual review recommended"
def compare_variants(
self,
variant_a_id: str,
variant_b_id: str,
metric: str = 'tokens_used',
lower_is_better: bool = True
) -> str:
"""
Compare two workflow variants on a specific metric.
Args:
variant_a_id: Workflow ID for variant A
variant_b_id: Workflow ID for variant B
metric: Metric to compare (default: tokens_used)
lower_is_better: True if lower values are better
Returns:
Comparison report
"""
# Get metrics for each variant
variant_a_metrics = self.get_variant_metrics(variant_a_id)
variant_b_metrics = self.get_variant_metrics(variant_b_id)
if not variant_a_metrics:
return f"Error: No data for variant A ({variant_a_id})"
if not variant_b_metrics:
return f"Error: No data for variant B ({variant_b_id})"
# Extract metric values
a_values = self.extract_metric_values(variant_a_metrics, metric)
b_values = self.extract_metric_values(variant_b_metrics, metric)
# Calculate statistics
a_stats = self.calculate_statistics(a_values)
b_stats = self.calculate_statistics(b_values)
# Perform t-test
t_stat, p_value = self.perform_ttest(a_values, b_values)
# Determine winner
winner = self.determine_winner(a_stats, b_stats, p_value, metric, lower_is_better)
# Generate recommendation
recommendation = self.generate_recommendation(winner, a_stats, b_stats, p_value)
# Format report
report = []
report.append("=" * 80)
report.append("A/B TEST COMPARISON REPORT")
report.append("=" * 80)
report.append("")
report.append(f"Metric: {metric}")
report.append(f"Better: {'Lower' if lower_is_better else 'Higher'} values")
report.append("")
report.append(f"## Variant A: {variant_a_id}")
report.append(f" Trials: {a_stats['count']}")
report.append(f" Mean: {a_stats['mean']:.2f}")
report.append(f" Median: {a_stats['median']:.2f}")
report.append(f" Std Dev: {a_stats['stdev']:.2f}")
report.append(f" Range: {a_stats['min']:.2f} - {a_stats['max']:.2f}")
report.append("")
report.append(f"## Variant B: {variant_b_id}")
report.append(f" Trials: {b_stats['count']}")
report.append(f" Mean: {b_stats['mean']:.2f}")
report.append(f" Median: {b_stats['median']:.2f}")
report.append(f" Std Dev: {b_stats['stdev']:.2f}")
report.append(f" Range: {b_stats['min']:.2f} - {b_stats['max']:.2f}")
report.append("")
report.append("## Statistical Significance")
report.append(f" t-statistic: {t_stat:.4f}")
report.append(f" p-value: {p_value:.4f}")
if p_value < 0.01:
report.append(" Significance: *** (p < 0.01) - Highly significant")
elif p_value < 0.05:
report.append(" Significance: ** (p < 0.05) - Significant")
elif p_value < 0.10:
report.append(" Significance: * (p < 0.10) - Marginally significant")
else:
report.append(" Significance: n.s. (p ≥ 0.10) - Not significant")
report.append("")
report.append(f"## Result: {winner}")
report.append(f"## Recommendation: {recommendation}")
report.append("")
report.append("=" * 80)
return "\n".join(report)
def main():
parser = argparse.ArgumentParser(description="A/B test workflow variants")
parser.add_argument(
'--variant-a',
required=True,
help='Workflow ID for variant A'
)
parser.add_argument(
'--variant-b',
required=True,
help='Workflow ID for variant B'
)
parser.add_argument(
'--metric',
default='tokens_used',
help='Metric to compare (default: tokens_used)'
)
parser.add_argument(
'--higher-is-better',
action='store_true',
help='Higher values are better (default: lower is better)'
)
parser.add_argument(
'--output',
help='Output file (default: stdout)'
)
args = parser.parse_args()
# Find metrics file
metrics_file = Path('docs/memory/workflow_metrics.jsonl')
analyzer = ABTestAnalyzer(metrics_file)
report = analyzer.compare_variants(
args.variant_a,
args.variant_b,
args.metric,
lower_is_better=not args.higher_is_better
)
if args.output:
with open(args.output, 'w') as f:
f.write(report)
print(f"Report written to {args.output}")
else:
print(report)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,331 @@
#!/usr/bin/env python3
"""
Workflow Metrics Analysis Script
Analyzes workflow_metrics.jsonl for continuous optimization and A/B testing.
Usage:
python scripts/analyze_workflow_metrics.py --period week
python scripts/analyze_workflow_metrics.py --period month
python scripts/analyze_workflow_metrics.py --task-type bug_fix
"""
import json
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from collections import defaultdict
import statistics
class WorkflowMetricsAnalyzer:
"""Analyze workflow metrics for optimization"""
def __init__(self, metrics_file: Path):
self.metrics_file = metrics_file
self.metrics: List[Dict] = []
self._load_metrics()
def _load_metrics(self):
"""Load metrics from JSONL file"""
if not self.metrics_file.exists():
print(f"Warning: {self.metrics_file} not found")
return
with open(self.metrics_file, 'r') as f:
for line in f:
if line.strip():
self.metrics.append(json.loads(line))
print(f"Loaded {len(self.metrics)} metric records")
def filter_by_period(self, period: str) -> List[Dict]:
"""Filter metrics by time period"""
now = datetime.now()
if period == "week":
cutoff = now - timedelta(days=7)
elif period == "month":
cutoff = now - timedelta(days=30)
elif period == "all":
return self.metrics
else:
raise ValueError(f"Invalid period: {period}")
filtered = [
m for m in self.metrics
if datetime.fromisoformat(m['timestamp']) >= cutoff
]
print(f"Filtered to {len(filtered)} records in last {period}")
return filtered
def analyze_by_task_type(self, metrics: List[Dict]) -> Dict:
"""Analyze metrics grouped by task type"""
by_task = defaultdict(list)
for m in metrics:
by_task[m['task_type']].append(m)
results = {}
for task_type, task_metrics in by_task.items():
results[task_type] = {
'count': len(task_metrics),
'avg_tokens': statistics.mean(m['tokens_used'] for m in task_metrics),
'avg_time_ms': statistics.mean(m['time_ms'] for m in task_metrics),
'success_rate': sum(m['success'] for m in task_metrics) / len(task_metrics) * 100,
'avg_files_read': statistics.mean(m.get('files_read', 0) for m in task_metrics),
}
return results
def analyze_by_complexity(self, metrics: List[Dict]) -> Dict:
"""Analyze metrics grouped by complexity level"""
by_complexity = defaultdict(list)
for m in metrics:
by_complexity[m['complexity']].append(m)
results = {}
for complexity, comp_metrics in by_complexity.items():
results[complexity] = {
'count': len(comp_metrics),
'avg_tokens': statistics.mean(m['tokens_used'] for m in comp_metrics),
'avg_time_ms': statistics.mean(m['time_ms'] for m in comp_metrics),
'success_rate': sum(m['success'] for m in comp_metrics) / len(comp_metrics) * 100,
}
return results
def analyze_by_workflow(self, metrics: List[Dict]) -> Dict:
"""Analyze metrics grouped by workflow variant"""
by_workflow = defaultdict(list)
for m in metrics:
by_workflow[m['workflow_id']].append(m)
results = {}
for workflow_id, wf_metrics in by_workflow.items():
results[workflow_id] = {
'count': len(wf_metrics),
'avg_tokens': statistics.mean(m['tokens_used'] for m in wf_metrics),
'median_tokens': statistics.median(m['tokens_used'] for m in wf_metrics),
'avg_time_ms': statistics.mean(m['time_ms'] for m in wf_metrics),
'success_rate': sum(m['success'] for m in wf_metrics) / len(wf_metrics) * 100,
}
return results
def identify_best_workflows(self, metrics: List[Dict]) -> Dict[str, str]:
"""Identify best workflow for each task type"""
by_task_workflow = defaultdict(lambda: defaultdict(list))
for m in metrics:
by_task_workflow[m['task_type']][m['workflow_id']].append(m)
best_workflows = {}
for task_type, workflows in by_task_workflow.items():
best_workflow = None
best_score = float('inf')
for workflow_id, wf_metrics in workflows.items():
# Score = avg_tokens (lower is better)
avg_tokens = statistics.mean(m['tokens_used'] for m in wf_metrics)
success_rate = sum(m['success'] for m in wf_metrics) / len(wf_metrics)
# Only consider if success rate >= 95%
if success_rate >= 0.95:
if avg_tokens < best_score:
best_score = avg_tokens
best_workflow = workflow_id
if best_workflow:
best_workflows[task_type] = best_workflow
return best_workflows
def identify_inefficiencies(self, metrics: List[Dict]) -> List[Dict]:
"""Identify inefficient patterns"""
inefficiencies = []
# Expected token budgets by complexity
budgets = {
'ultra-light': 800,
'light': 2000,
'medium': 5000,
'heavy': 20000,
'ultra-heavy': 50000
}
for m in metrics:
issues = []
# Check token budget overrun
expected_budget = budgets.get(m['complexity'], 5000)
if m['tokens_used'] > expected_budget * 1.3: # 30% over budget
issues.append(f"Token overrun: {m['tokens_used']} vs {expected_budget}")
# Check success rate
if not m['success']:
issues.append("Task failed")
# Check time performance (light tasks should be fast)
if m['complexity'] in ['ultra-light', 'light'] and m['time_ms'] > 10000:
issues.append(f"Slow execution: {m['time_ms']}ms for {m['complexity']} task")
if issues:
inefficiencies.append({
'timestamp': m['timestamp'],
'task_type': m['task_type'],
'complexity': m['complexity'],
'workflow_id': m['workflow_id'],
'issues': issues
})
return inefficiencies
def calculate_token_savings(self, metrics: List[Dict]) -> Dict:
"""Calculate token savings vs unlimited baseline"""
# Unlimited baseline estimates
baseline = {
'ultra-light': 1000,
'light': 2500,
'medium': 7500,
'heavy': 30000,
'ultra-heavy': 100000
}
total_actual = 0
total_baseline = 0
for m in metrics:
total_actual += m['tokens_used']
total_baseline += baseline.get(m['complexity'], 7500)
savings = total_baseline - total_actual
savings_percent = (savings / total_baseline * 100) if total_baseline > 0 else 0
return {
'total_actual': total_actual,
'total_baseline': total_baseline,
'total_savings': savings,
'savings_percent': savings_percent
}
def generate_report(self, period: str) -> str:
"""Generate comprehensive analysis report"""
metrics = self.filter_by_period(period)
if not metrics:
return "No metrics available for analysis"
report = []
report.append("=" * 80)
report.append(f"WORKFLOW METRICS ANALYSIS REPORT - Last {period}")
report.append("=" * 80)
report.append("")
# Overall statistics
report.append("## Overall Statistics")
report.append(f"Total Tasks: {len(metrics)}")
report.append(f"Success Rate: {sum(m['success'] for m in metrics) / len(metrics) * 100:.1f}%")
report.append(f"Avg Tokens: {statistics.mean(m['tokens_used'] for m in metrics):.0f}")
report.append(f"Avg Time: {statistics.mean(m['time_ms'] for m in metrics):.0f}ms")
report.append("")
# Token savings
savings = self.calculate_token_savings(metrics)
report.append("## Token Efficiency")
report.append(f"Actual Usage: {savings['total_actual']:,} tokens")
report.append(f"Unlimited Baseline: {savings['total_baseline']:,} tokens")
report.append(f"Total Savings: {savings['total_savings']:,} tokens ({savings['savings_percent']:.1f}%)")
report.append("")
# By task type
report.append("## Analysis by Task Type")
by_task = self.analyze_by_task_type(metrics)
for task_type, stats in sorted(by_task.items()):
report.append(f"\n### {task_type}")
report.append(f" Count: {stats['count']}")
report.append(f" Avg Tokens: {stats['avg_tokens']:.0f}")
report.append(f" Avg Time: {stats['avg_time_ms']:.0f}ms")
report.append(f" Success Rate: {stats['success_rate']:.1f}%")
report.append(f" Avg Files Read: {stats['avg_files_read']:.1f}")
report.append("")
# By complexity
report.append("## Analysis by Complexity")
by_complexity = self.analyze_by_complexity(metrics)
for complexity in ['ultra-light', 'light', 'medium', 'heavy', 'ultra-heavy']:
if complexity in by_complexity:
stats = by_complexity[complexity]
report.append(f"\n### {complexity}")
report.append(f" Count: {stats['count']}")
report.append(f" Avg Tokens: {stats['avg_tokens']:.0f}")
report.append(f" Success Rate: {stats['success_rate']:.1f}%")
report.append("")
# Best workflows
report.append("## Best Workflows per Task Type")
best = self.identify_best_workflows(metrics)
for task_type, workflow_id in sorted(best.items()):
report.append(f" {task_type}: {workflow_id}")
report.append("")
# Inefficiencies
inefficiencies = self.identify_inefficiencies(metrics)
if inefficiencies:
report.append("## Inefficiencies Detected")
report.append(f"Total Issues: {len(inefficiencies)}")
for issue in inefficiencies[:5]: # Show top 5
report.append(f"\n {issue['timestamp']}")
report.append(f" Task: {issue['task_type']} ({issue['complexity']})")
report.append(f" Workflow: {issue['workflow_id']}")
for problem in issue['issues']:
report.append(f" - {problem}")
report.append("")
report.append("=" * 80)
return "\n".join(report)
def main():
parser = argparse.ArgumentParser(description="Analyze workflow metrics")
parser.add_argument(
'--period',
choices=['week', 'month', 'all'],
default='week',
help='Analysis time period'
)
parser.add_argument(
'--task-type',
help='Filter by specific task type'
)
parser.add_argument(
'--output',
help='Output file (default: stdout)'
)
args = parser.parse_args()
# Find metrics file
metrics_file = Path('docs/memory/workflow_metrics.jsonl')
analyzer = WorkflowMetricsAnalyzer(metrics_file)
report = analyzer.generate_report(args.period)
if args.output:
with open(args.output, 'w') as f:
f.write(report)
print(f"Report written to {args.output}")
else:
print(report)
if __name__ == '__main__':
main()