withParts=append(withParts,fmt.Sprintf("%d AS contains_error_t%d",step.ContainsError,i+1))
withParts=append(withParts,fmt.Sprintf("('%s','%s') AS step%d",step.ServiceName,step.SpanName,i+1))
}
// Build SELECT fields for each step time
selectFields:=[]string{"trace_id"}
fori:=0;i<numSteps;i++{
selectFields=append(selectFields,fmt.Sprintf("minIf(timestamp, resource_string_service$$name = step%d.1 AND name = step%d.2) AS t%d_time",i+1,i+1,i+1))
}
// Build WHERE conditions
whereConditions:=[]string{"timestamp BETWEEN start_ts AND end_ts"}
orConditions:=[]string{}
fori,step:=rangesteps{
condition:=fmt.Sprintf("(resource_string_service$$name = step%d.1 AND name = step%d.2 AND (contains_error_t%d = 0 OR has_error = true) %s)",
i+1,i+1,i+1,step.Clause)
orConditions=append(orConditions,condition)
}
iflen(orConditions)>0{
whereConditions=append(whereConditions,fmt.Sprintf("(%s)",strings.Join(orConditions," OR ")))
fmt.Sprintf("minIf(timestamp, resource_string_service$$name = step%d.1 AND name = step%d.2) + toIntervalNanosecond(minIf(duration_nano, resource_string_service$$name = step%d.1 AND name = step%d.2)) AS t%d_time",i+1,i+1,i+1,i+1,i+1))
}else{
funnelSelectFields=append(funnelSelectFields,
fmt.Sprintf("minIf(timestamp, resource_string_service$$name = step%d.1 AND name = step%d.2) AS t%d_time",i+1,i+1,i+1))
}
funnelSelectFields=append(funnelSelectFields,
fmt.Sprintf("toUInt8(anyIf(has_error, resource_string_service$$name = step%d.1 AND name = step%d.2)) AS s%d_error",i+1,i+1,i+1))
}
// Build WHERE conditions
whereConditions:=[]string{"timestamp BETWEEN start_ts AND end_ts"}
orConditions:=[]string{}
fori,step:=rangesteps{
condition:=fmt.Sprintf("(resource_string_service$$name = step%d.1 AND name = step%d.2 AND (contains_error_t%d = 0 OR has_error = true) %s)",
i+1,i+1,i+1,step.Clause)
orConditions=append(orConditions,condition)
}
iflen(orConditions)>0{
whereConditions=append(whereConditions,fmt.Sprintf("(%s)",strings.Join(orConditions," OR ")))
}
// Build HAVING conditions for temporal ordering
havingConditions:=[]string{"t1_time > 0"}
// Build conversion count fields
conversionFields:=[]string{"count(DISTINCT trace_id) AS total_s1_spans"}
// For each subsequent step, add conversion counts with proper temporal conditions
fori:=1;i<numSteps;i++{
// Build condition for this step (all previous steps must be in order)
// Return a fallback query for invalid step ranges
return`SELECT 0 AS conversion_rate, 0 AS avg_rate, 0 AS errors, 0 AS avg_duration, 0 AS latency;`
}
// Convert to 0-based indices
startIdx:=int(stepStart-1)
endIdx:=int(stepEnd-1)
// Build WITH clause
withParts:=[]string{
fmt.Sprintf("toDateTime64(%d / 1e9, 9) AS start_ts",startTs),
fmt.Sprintf("toDateTime64(%d / 1e9, 9) AS end_ts",endTs),
fmt.Sprintf("(%d - %d) / 1e9 AS time_window_sec",endTs,startTs),
}
// Add contains_error and step definitions for all steps
fori,step:=rangesteps{
withParts=append(withParts,fmt.Sprintf("%d AS contains_error_t%d",step.ContainsError,i+1))
withParts=append(withParts,fmt.Sprintf("('%s','%s') AS step%d",step.ServiceName,step.SpanName,i+1))
}
// Build funnel CTE select fields
funnelSelectFields:=[]string{"trace_id"}
fori:=0;i<numSteps;i++{
// Check if latency_pointer is 'end' for this step
ifsteps[i].LatencyPointer=="end"{
funnelSelectFields=append(funnelSelectFields,
fmt.Sprintf("minIf(timestamp, resource_string_service$$name = step%d.1 AND name = step%d.2) + toIntervalNanosecond(minIf(duration_nano, resource_string_service$$name = step%d.1 AND name = step%d.2)) AS t%d_time",i+1,i+1,i+1,i+1,i+1))
}else{
funnelSelectFields=append(funnelSelectFields,
fmt.Sprintf("minIf(timestamp, resource_string_service$$name = step%d.1 AND name = step%d.2) AS t%d_time",i+1,i+1,i+1))
}
funnelSelectFields=append(funnelSelectFields,
fmt.Sprintf("toUInt8(anyIf(has_error, resource_string_service$$name = step%d.1 AND name = step%d.2)) AS s%d_error",i+1,i+1,i+1))
}
// Build WHERE conditions
whereConditions:=[]string{"timestamp BETWEEN start_ts AND end_ts"}
orConditions:=[]string{}
fori,step:=rangesteps{
condition:=fmt.Sprintf("(resource_string_service$$name = step%d.1 AND name = step%d.2 AND (contains_error_t%d = 0 OR has_error = true) %s)",
i+1,i+1,i+1,step.Clause)
orConditions=append(orConditions,condition)
}
iflen(orConditions)>0{
whereConditions=append(whereConditions,fmt.Sprintf("(%s)",strings.Join(orConditions," OR ")))
// Build time expressions based on latency pointers
t1TimeExpr:="minIf(timestamp, resource_string_service$$name = step1.1 AND name = step1.2)"
iflatencyPointerT1=="end"{
t1TimeExpr="minIf(timestamp, resource_string_service$$name = step1.1 AND name = step1.2) + toIntervalNanosecond(minIf(duration_nano, resource_string_service$$name = step1.1 AND name = step1.2))"
}
t2TimeExpr:="minIf(timestamp, resource_string_service$$name = step2.1 AND name = step2.2)"
iflatencyPointerT2=="end"{
t2TimeExpr="minIf(timestamp, resource_string_service$$name = step2.1 AND name = step2.2) + toIntervalNanosecond(minIf(duration_nano, resource_string_service$$name = step2.1 AND name = step2.2))"
// Build time expressions based on latency pointers
t1TimeExpr:="minIf(timestamp, resource_string_service$$name = step1.1 AND name = step1.2)"
iflatencyPointerT1=="end"{
t1TimeExpr="minIf(timestamp, resource_string_service$$name = step1.1 AND name = step1.2) + toIntervalNanosecond(minIf(duration_nano, resource_string_service$$name = step1.1 AND name = step1.2))"
}
t2TimeExpr:="minIf(timestamp, resource_string_service$$name = step2.1 AND name = step2.2)"
iflatencyPointerT2=="end"{
t2TimeExpr="minIf(timestamp, resource_string_service$$name = step2.1 AND name = step2.2) + toIntervalNanosecond(minIf(duration_nano, resource_string_service$$name = step2.1 AND name = step2.2))"