fix: backedn query fixes

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
This commit is contained in:
Shivanshu Raj Shrivastava 2025-05-05 05:57:04 +05:30
parent 73e60f298d
commit 7e9e31a8c7
No known key found for this signature in database
GPG Key ID: D34D26C62AC3E9AE
3 changed files with 274 additions and 58 deletions

View File

@ -43,6 +43,7 @@ WITH
AND (contains_error_t1 = 0 OR has_error = true)
-- <<< INJECT AND clause_step1 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 2: first span
@ -58,6 +59,7 @@ WITH
AND (contains_error_t2 = 0 OR has_error = true)
-- <<< INJECT AND clause_step2 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Join steps to validate funnel ordering
@ -150,6 +152,7 @@ WITH
AND (contains_error_t1 = 0 OR has_error = true)
-- <<< INJECT AND clause_step1 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 2
@ -165,6 +168,7 @@ WITH
AND (contains_error_t2 = 0 OR has_error = true)
-- <<< INJECT AND clause_step2 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 3
@ -180,6 +184,7 @@ WITH
AND (contains_error_t3 = 0 OR has_error = true)
-- <<< INJECT AND clause_step3 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Join steps to validate funnel ordering
@ -278,6 +283,7 @@ WITH
AND (contains_error_t1 = 0 OR has_error = true)
-- <<< INJECT clause_step1 HERE IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 2
@ -294,6 +300,7 @@ WITH
AND (contains_error_t2 = 0 OR has_error = true)
-- <<< INJECT clause_step2 HERE IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Join steps
@ -302,29 +309,20 @@ WITH
s1.trace_id,
s1.first_time AS t1_time,
s2.first_time AS t2_time,
s1.has_error_flag AS t1_has_error,
s2.has_error_flag AS t2_has_error
s1.has_error_flag AS s1_has_error,
s2.has_error_flag AS s2_has_error
FROM step1 s1
INNER JOIN step2 s2 ON s1.trace_id = s2.trace_id
WHERE s2.first_time > s1.first_time
)
-- Final metrics
, final AS (
SELECT
trace_id,
abs(CAST(t2_time AS Decimal(20, 9)) - CAST(t1_time AS Decimal(20, 9))) AS duration_nanos,
t1_has_error,
t2_has_error
FROM joined
)
SELECT
round(count() / (SELECT count() FROM step1) * 100, 2) AS conversion_rate,
count() / time_window_sec AS avg_rate,
countIf(t1_has_error OR t2_has_error) AS errors,
avg(duration_nanos * 1000) AS avg_duration,
quantile(0.99)(duration_nanos * 1000) AS p99_latency
FROM final;`
round((count(DISTINCT trace_id) * 100.0) / (SELECT count(DISTINCT trace_id) FROM step1), 2) AS conversion_rate,
count(DISTINCT trace_id) / time_window_sec AS avg_rate,
greatest(countIf(s1_has_error), countIf(s2_has_error)) AS errors,
avg(abs(CAST(t2_time AS Decimal(20, 9)) - CAST(t1_time AS Decimal(20, 9))) * 1000) AS avg_duration,
quantile(0.99)(abs(CAST(t2_time AS Decimal(20, 9)) - CAST(t1_time AS Decimal(20, 9))) * 1000) AS p99_latency
FROM joined;`
query := fmt.Sprintf(queryTemplate,
containsErrorT1,
@ -408,6 +406,7 @@ WITH
AND (contains_error_t1 = 0 OR has_error = true)
-- <<< INJECT AND clause_step1 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 2
@ -424,6 +423,7 @@ WITH
AND (contains_error_t2 = 0 OR has_error = true)
-- <<< INJECT AND clause_step2 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 3
@ -440,32 +440,206 @@ WITH
AND (contains_error_t3 = 0 OR has_error = true)
-- <<< INJECT AND clause_step3 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Join steps and apply ordering
, joined AS (
-- Join T1 and T2
, joined_t2 AS (
SELECT
s1.trace_id,
s1.first_time AS t1_time,
s2.first_time AS t2_time,
s3.first_time AS t3_time,
s1.has_error_flag AS t1_has_error,
s2.has_error_flag AS t2_has_error,
s3.has_error_flag AS t3_has_error
s1.has_error_flag AS s1_has_error,
s2.has_error_flag AS s2_has_error
FROM step1 s1
INNER JOIN step2 s2 ON s1.trace_id = s2.trace_id
INNER JOIN step3 s3 ON s1.trace_id = s3.trace_id
WHERE s2.first_time > s1.first_time AND s3.first_time > s2.first_time
WHERE s2.first_time > s1.first_time
)
-- Join with T3 for complete funnel
, joined_t3 AS (
SELECT
j2.trace_id,
j2.t1_time,
j2.t2_time,
s3.first_time AS t3_time,
j2.s1_has_error,
j2.s2_has_error,
s3.has_error_flag AS s3_has_error
FROM joined_t2 j2
INNER JOIN step3 s3 ON j2.trace_id = s3.trace_id
WHERE s3.first_time > j2.t2_time
)
-- Final metrics
SELECT
round(count() / (SELECT count() FROM step1) * 100, 2) AS conversion_rate,
count() / time_window_sec AS avg_rate,
countIf(t1_has_error OR t2_has_error OR t3_has_error) AS errors,
round((count(DISTINCT joined_t3.trace_id) * 100.0) / (SELECT count(DISTINCT trace_id) FROM step1), 2) AS conversion_rate,
count(DISTINCT joined_t3.trace_id) / time_window_sec AS avg_rate,
greatest(countIf(s1_has_error), countIf(s2_has_error), countIf(s3_has_error)) AS errors,
avg(abs(CAST(t3_time AS Decimal(20, 9)) - CAST(t1_time AS Decimal(20, 9))) * 1000) AS avg_duration,
quantile(0.99)(abs(CAST(t3_time AS Decimal(20, 9)) - CAST(t1_time AS Decimal(20, 9))) * 1000) AS p99_latency
FROM joined;`
FROM joined_t3;`
query := fmt.Sprintf(queryTemplate,
containsErrorT1,
containsErrorT2,
containsErrorT3,
latencyPointerT1,
latencyPointerT2,
latencyPointerT3,
startTs,
endTs,
serviceNameT1,
spanNameT1,
serviceNameT2,
spanNameT2,
serviceNameT3,
spanNameT3,
)
if clauseStep1 != "" {
query = strings.Replace(query, "-- <<< INJECT AND clause_step1 IN GO IF NOT EMPTY >>>", "AND "+clauseStep1, 1)
} else {
query = strings.Replace(query, "-- <<< INJECT AND clause_step1 IN GO IF NOT EMPTY >>>", "", 1)
}
if clauseStep2 != "" {
query = strings.Replace(query, "-- <<< INJECT AND clause_step2 IN GO IF NOT EMPTY >>>", "AND "+clauseStep2, 1)
} else {
query = strings.Replace(query, "-- <<< INJECT AND clause_step2 IN GO IF NOT EMPTY >>>", "", 1)
}
if clauseStep3 != "" {
query = strings.Replace(query, "-- <<< INJECT AND clause_step3 IN GO IF NOT EMPTY >>>", "AND "+clauseStep3, 1)
} else {
query = strings.Replace(query, "-- <<< INJECT AND clause_step3 IN GO IF NOT EMPTY >>>", "", 1)
}
return query
}
func BuildFunnelStepFunnelOverviewQuery(
containsErrorT1 int,
containsErrorT2 int,
containsErrorT3 int,
latencyPointerT1 string,
latencyPointerT2 string,
latencyPointerT3 string,
startTs int64,
endTs int64,
serviceNameT1 string,
spanNameT1 string,
serviceNameT2 string,
spanNameT2 string,
serviceNameT3 string,
spanNameT3 string,
clauseStep1 string,
clauseStep2 string,
clauseStep3 string,
) string {
queryTemplate := `
WITH
%[1]d AS contains_error_t1,
%[2]d AS contains_error_t2,
%[3]d AS contains_error_t3,
'%[4]s' AS latency_pointer_t1,
'%[5]s' AS latency_pointer_t2,
'%[6]s' AS latency_pointer_t3,
toDateTime64(%[7]d / 1000000000, 9) AS start_ts,
toDateTime64(%[8]d / 1000000000, 9) AS end_ts,
(%[8]d - %[7]d) / 1000000000 AS time_window_sec,
'%[9]s' AS service_name_t1,
'%[10]s' AS span_name_t1,
'%[11]s' AS service_name_t2,
'%[12]s' AS span_name_t2,
'%[13]s' AS service_name_t3,
'%[14]s' AS span_name_t3
-- Step 1
, step1 AS (
SELECT
trace_id,
argMin(timestamp, timestamp) AS first_time,
argMin(has_error, timestamp) AS has_error_flag
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND serviceName = service_name_t1
AND name = span_name_t1
AND (contains_error_t1 = 0 OR has_error = true)
-- <<< INJECT AND clause_step1 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 2
, step2 AS (
SELECT
trace_id,
argMin(timestamp, timestamp) AS first_time,
argMin(has_error, timestamp) AS has_error_flag
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND serviceName = service_name_t2
AND name = span_name_t2
AND (contains_error_t2 = 0 OR has_error = true)
-- <<< INJECT AND clause_step2 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 3
, step3 AS (
SELECT
trace_id,
argMin(timestamp, timestamp) AS first_time,
argMin(has_error, timestamp) AS has_error_flag
FROM signoz_traces.signoz_index_v3
WHERE
timestamp BETWEEN start_ts AND end_ts
AND serviceName = service_name_t3
AND name = span_name_t3
AND (contains_error_t3 = 0 OR has_error = true)
-- <<< INJECT AND clause_step3 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Join T1 and T2
, joined_t2 AS (
SELECT
s1.trace_id,
s1.first_time AS t1_time,
s2.first_time AS t2_time,
s2.has_error_flag AS s2_has_error
FROM step1 s1
INNER JOIN step2 s2 ON s1.trace_id = s2.trace_id
WHERE s2.first_time > s1.first_time
)
-- Join with T3 for complete funnel
, joined_t3 AS (
SELECT
j2.trace_id,
j2.t2_time,
s3.first_time AS t3_time,
j2.s2_has_error,
s3.has_error_flag AS s3_has_error
FROM joined_t2 j2
INNER JOIN step3 s3 ON j2.trace_id = s3.trace_id
WHERE s3.first_time > j2.t2_time
)
SELECT
round((count(DISTINCT joined_t3.trace_id) * 100.0) / (SELECT count(DISTINCT trace_id) FROM joined_t2), 2) AS conversion_rate,
count(DISTINCT joined_t3.trace_id) / time_window_sec AS avg_rate,
greatest(countIf(s2_has_error), countIf(s3_has_error)) AS errors,
avg(abs(CAST(t3_time AS Decimal(20, 9)) - CAST(t2_time AS Decimal(20, 9))) * 1000) AS avg_duration,
quantile(0.99)(abs(CAST(t3_time AS Decimal(20, 9)) - CAST(t2_time AS Decimal(20, 9))) * 1000) AS p99_latency
FROM joined_t3;`
query := fmt.Sprintf(queryTemplate,
containsErrorT1,
@ -544,6 +718,7 @@ WITH
AND (contains_error_t1 = 0 OR has_error = true)
-- <<< INJECT AND clause_step1 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 2
@ -560,6 +735,7 @@ WITH
AND (contains_error_t2 = 0 OR has_error = true)
-- <<< INJECT AND clause_step2 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Join T1 and T2 and apply ordering
@ -655,6 +831,7 @@ WITH
AND (contains_error_t1 = 0 OR has_error = true)
-- <<< INJECT AND clause_step1 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 2
@ -671,6 +848,7 @@ WITH
AND (contains_error_t2 = 0 OR has_error = true)
-- <<< INJECT AND clause_step2 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Step 3
@ -687,6 +865,7 @@ WITH
AND (contains_error_t3 = 0 OR has_error = true)
-- <<< INJECT AND clause_step3 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
)
-- Join T1 and T2
@ -798,6 +977,7 @@ WITH
AND (contains_error_t1 = 0 OR has_error = true)
-- <<< INJECT AND clause_step1 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
), step1 AS (
SELECT s1.trace_id, s1.timestamp AS first_time
FROM signoz_traces.signoz_index_v3 s1
@ -815,6 +995,7 @@ WITH
AND (contains_error_t2 = 0 OR has_error = true)
-- <<< INJECT AND clause_step2 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
), step2 AS (
SELECT s2.trace_id, s2.timestamp AS first_time
FROM signoz_traces.signoz_index_v3 s2
@ -920,6 +1101,7 @@ WITH
AND (contains_error_t1 = 0 OR has_error = true)
-- <<< INJECT AND clause_step1 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
), step1 AS (
SELECT s1.trace_id, s1.timestamp AS first_time, s1.has_error AS has_error_flag
FROM signoz_traces.signoz_index_v3 s1
@ -937,6 +1119,7 @@ WITH
AND (contains_error_t2 = 0 OR has_error = true)
-- <<< INJECT AND clause_step2 IN GO IF NOT EMPTY >>>
GROUP BY trace_id
LIMIT 100000
), step2 AS (
SELECT s2.trace_id, s2.timestamp AS first_time, s2.has_error AS has_error_flag
FROM signoz_traces.signoz_index_v3 s2

View File

@ -22,7 +22,7 @@ func ValidateTraces(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRange)
if funnelSteps[1].HasErrors {
containsErrorT2 = 1
}
if funnelSteps[2].HasErrors {
if len(funnel.Steps) > 2 && funnelSteps[2].HasErrors {
containsErrorT3 = 1
}
@ -63,7 +63,7 @@ func ValidateTraces(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRange)
}, nil
}
func GetFunnelAnalytics(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRange, stepStart, stepEnd int64) (*v3.ClickHouseQuery, error) {
func GetFunnelAnalytics(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRange) (*v3.ClickHouseQuery, error) {
var query string
funnelSteps := funnel.Steps
@ -82,7 +82,7 @@ func GetFunnelAnalytics(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRa
if funnelSteps[1].HasErrors {
containsErrorT2 = 1
}
if funnelSteps[2].HasErrors {
if len(funnel.Steps) > 2 && funnelSteps[2].HasErrors {
containsErrorT3 = 1
}
@ -120,16 +120,14 @@ func GetFunnelAnalytics(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRa
//"db_operation = 'SELECT'", // clauseStep3
)
} else {
if stepStart != stepEnd {
stepStartOrder = int(stepStart) - 1
stepEndOrder = int(stepEnd) - 1
if funnelSteps[stepStartOrder].HasErrors {
containsErrorT1 = 1
}
if funnelSteps[stepEndOrder].HasErrors {
containsErrorT2 = 1
}
if funnelSteps[stepStartOrder].HasErrors {
containsErrorT1 = 1
}
if funnelSteps[stepEndOrder].HasErrors {
containsErrorT2 = 1
}
query = BuildTwoStepFunnelOverviewQuery(
containsErrorT1, // containsErrorT1
containsErrorT2, // containsErrorT2
@ -154,11 +152,23 @@ func GetFunnelStepAnalytics(funnel *tracefunnel.Funnel, timeRange tracefunnel.Ti
funnelSteps := funnel.Steps
containsErrorT1 := 0
containsErrorT2 := 0
containsErrorT3 := 0
latencyPointerT1 := "start"
latencyPointerT2 := "start"
latencyPointerT3 := "start"
stepStartOrder := 0
stepEndOrder := 1
if funnelSteps[0].HasErrors {
containsErrorT1 = 1
}
if funnelSteps[1].HasErrors {
containsErrorT2 = 1
}
if len(funnel.Steps) > 2 && funnelSteps[2].HasErrors {
containsErrorT3 = 1
}
if stepStart != stepEnd {
stepStartOrder = int(stepStart) - 1
stepEndOrder = int(stepEnd) - 1
@ -178,23 +188,46 @@ func GetFunnelStepAnalytics(funnel *tracefunnel.Funnel, timeRange tracefunnel.Ti
latencyPointerT1 = "end"
}
if funnelSteps[stepEndOrder].LatencyPointer != "" {
latencyPointerT1 = "end"
latencyPointerT2 = "end"
}
}
query = BuildTwoStepFunnelOverviewQuery(
containsErrorT1, // containsErrorT1
containsErrorT2, // containsErrorT2
latencyPointerT1,
latencyPointerT2,
timeRange.StartTime, // startTs
timeRange.EndTime, // endTs
funnelSteps[stepStartOrder].ServiceName, // serviceNameT1
funnelSteps[stepStartOrder].SpanName, // spanNameT1
funnelSteps[stepEndOrder].ServiceName, // serviceNameT1
funnelSteps[stepEndOrder].SpanName, // spanNameT2
"",
"",
)
if stepStart == 2 {
query = BuildFunnelStepFunnelOverviewQuery(
containsErrorT1, // containsErrorT1
containsErrorT2, // containsErrorT2
containsErrorT3,
latencyPointerT1,
latencyPointerT2,
latencyPointerT3,
timeRange.StartTime, // startTs
timeRange.EndTime, // endTs
funnelSteps[0].ServiceName,
funnelSteps[0].SpanName,
funnelSteps[stepStartOrder].ServiceName, // serviceNameT1
funnelSteps[stepStartOrder].SpanName, // spanNameT1
funnelSteps[stepEndOrder].ServiceName, // serviceNameT1
funnelSteps[stepEndOrder].SpanName, // spanNameT2
"",
"",
"",
)
} else {
query = BuildTwoStepFunnelOverviewQuery(
containsErrorT1, // containsErrorT1
containsErrorT2, // containsErrorT2
latencyPointerT1,
latencyPointerT2,
timeRange.StartTime, // startTs
timeRange.EndTime, // endTs
funnelSteps[stepStartOrder].ServiceName, // serviceNameT1
funnelSteps[stepStartOrder].SpanName, // spanNameT1
funnelSteps[stepEndOrder].ServiceName, // serviceNameT1
funnelSteps[stepEndOrder].SpanName, // spanNameT2
"",
"",
)
}
return &v3.ClickHouseQuery{Query: query}, nil
}

View File

@ -5217,7 +5217,7 @@ func (aH *APIHandler) handleFunnelAnalytics(w http.ResponseWriter, r *http.Reque
return
}
chq, err := tracefunnels.GetFunnelAnalytics(funnel, stepTransition.TimeRange, stepTransition.StepAOrder, stepTransition.StepBOrder)
chq, err := tracefunnels.GetFunnelAnalytics(funnel, stepTransition.TimeRange)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return