// signoz/pkg/querier/postprocess_gaps.go
package querier

import (
	"fmt"
	"math"
	"sort"
	"strings"
	"time"

	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)

// fillGaps fills missing data points with zeros in time series data.
func (q *querier) fillGaps(results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]*qbtypes.Result {
	// Only fill gaps for time series data.
	if req.RequestType != qbtypes.RequestTypeTimeSeries {
		return results
	}
	// Use the step interval from the first query that sets one. The loop
	// label is required: a bare `break` inside a switch case exits only the
	// switch, so without it every query would be scanned and the last
	// query's step would win.
	var step int64 = 60000 // default to 1 minute, in milliseconds
stepSearch:
	for _, query := range req.CompositeQuery.Queries {
		switch spec := query.Spec.(type) {
		case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
			if spec.StepInterval.Duration > 0 {
				step = int64(spec.StepInterval.Duration) / int64(time.Millisecond)
				break stepSearch
			}
		case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
			if spec.StepInterval.Duration > 0 {
				step = int64(spec.StepInterval.Duration) / int64(time.Millisecond)
				break stepSearch
			}
		case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
			if spec.StepInterval.Duration > 0 {
				step = int64(spec.StepInterval.Duration) / int64(time.Millisecond)
				break stepSearch
			}
		}
	}
	startMs := int64(req.Start)
	endMs := int64(req.End)
	for name, result := range results {
		tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
		if !ok || tsData == nil {
			continue
		}
		// If there are no aggregations, create an empty one.
		if len(tsData.Aggregations) == 0 {
			tsData.Aggregations = []*qbtypes.AggregationBucket{
				{
					Index: 0,
					Series: []*qbtypes.TimeSeries{
						{
							Labels: []*qbtypes.Label{},
							Values: fillGapForSeries(nil, startMs, endMs, step),
						},
					},
				},
			}
			continue
		}
		// Fill gaps for each series.
		for _, agg := range tsData.Aggregations {
			if len(agg.Series) == 0 {
				// Create an empty series if none exist.
				agg.Series = []*qbtypes.TimeSeries{
					{
						Labels: []*qbtypes.Label{},
						Values: fillGapForSeries(nil, startMs, endMs, step),
					},
				}
			} else {
				// Fill gaps for the existing series.
				for _, series := range agg.Series {
					series.Values = fillGapForSeries(series.Values, startMs, endMs, step)
				}
			}
		}
		results[name] = result
	}
	return results
}

// fillGapForSeries fills gaps in a single time series.
func fillGapForSeries(values []*qbtypes.TimeSeriesValue, startMs, endMs, step int64) []*qbtypes.TimeSeriesValue {
	// Safeguard against an invalid step.
	if step <= 0 {
		step = 60000 // default to 1 minute
	}
	// Index the existing (non-partial) values by timestamp.
	valueMap := make(map[int64]float64)
	for _, v := range values {
		if v != nil && !v.Partial {
			valueMap[v.Timestamp] = v.Value
		}
	}
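	// The points were indexed by exact timestamp above, so the zero-filling
	// below assumes stored points sit on the same startMs + k*step grid.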
	// Generate all expected timestamps, zero-filling the missing ones.
	var filledValues []*qbtypes.TimeSeriesValue
	for ts := startMs; ts <= endMs; ts += step {
		value := 0.0
		if v, ok := valueMap[ts]; ok {
			value = v
		}
		filledValues = append(filledValues, &qbtypes.TimeSeriesValue{
			Timestamp: ts,
			Value:     value,
		})
	}
	return filledValues
}
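
// exampleFillGapForSeries is a minimal illustrative sketch (not called from
// the production path); the window, step, and values are arbitrary
// assumptions chosen for the example.
func exampleFillGapForSeries() []*qbtypes.TimeSeriesValue {
	in := []*qbtypes.TimeSeriesValue{
		{Timestamp: 60000, Value: 3},
		{Timestamp: 180000, Value: 7},
	}
	// Fill a [0ms, 240000ms] window at a 1-minute step: the result has five
	// points at ts 0, 60000, ..., 240000, and the timestamps missing from
	// `in` (0, 120000, 240000) come back with Value 0.
	return fillGapForSeries(in, 0, 240000, 60000)
}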

// formatScalarResultsAsTable formats scalar results as a table for UI display.
func (q *querier) formatScalarResultsAsTable(results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]any {
	if len(results) == 0 {
		return map[string]any{"table": &qbtypes.ScalarData{}}
	}
	// Convert all results to ScalarData first.
	for name, result := range results {
		if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
			// Convert TimeSeriesData to ScalarData.
			columns := []*qbtypes.ColumnDescriptor{}
			data := [][]any{}
			// Extract group columns from the labels of the first series.
			if len(tsData.Aggregations) > 0 && len(tsData.Aggregations[0].Series) > 0 {
				for _, label := range tsData.Aggregations[0].Series[0].Labels {
					col := &qbtypes.ColumnDescriptor{
						TelemetryFieldKey: label.Key,
						QueryName:         name,
						Type:              qbtypes.ColumnTypeGroup,
					}
					// Ensure Name is set.
					if col.Name == "" {
						col.Name = label.Key.Name
					}
					columns = append(columns, col)
				}
			}
			// Add aggregation columns.
			for _, agg := range tsData.Aggregations {
				col := &qbtypes.ColumnDescriptor{
					TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
						Name: agg.Alias,
					},
					QueryName:        name,
					AggregationIndex: int64(agg.Index),
					Meta:             agg.Meta,
					Type:             qbtypes.ColumnTypeAggregation,
				}
				if col.Name == "" {
					col.Name = fmt.Sprintf("__result_%d", agg.Index)
				}
				columns = append(columns, col)
			}
			// Convert series to rows; guard against an empty Aggregations
			// slice before indexing into it.
			if len(tsData.Aggregations) > 0 {
				for seriesIdx, series := range tsData.Aggregations[0].Series {
					row := make([]any, len(columns))
					colIdx := 0
					// Add group values.
					for _, label := range series.Labels {
						row[colIdx] = label.Value
						colIdx++
					}
					// Add aggregation values (the last value of each series).
					for _, agg := range tsData.Aggregations {
						if seriesIdx < len(agg.Series) && len(agg.Series[seriesIdx].Values) > 0 {
							value := agg.Series[seriesIdx].Values[len(agg.Series[seriesIdx].Values)-1].Value
							row[colIdx] = roundToTwoDecimal(value)
						} else {
							row[colIdx] = 0.0
						}
						colIdx++
					}
					data = append(data, row)
				}
			}
			results[name] = &qbtypes.Result{
				Value: &qbtypes.ScalarData{
					Columns: columns,
					Data:    data,
				},
			}
		}
	}
	// Check if we have a single result that already contains columns from
	// multiple queries. This happens when the SQL query already joins them.
	if len(results) == 1 {
		for queryName, result := range results {
			if scalarData, ok := result.Value.(*qbtypes.ScalarData); ok {
				// Check whether this result has columns from multiple queries.
				queryNamesInColumns := make(map[string]bool)
				for _, col := range scalarData.Columns {
					if col.Type == qbtypes.ColumnTypeAggregation && col.QueryName != "" {
						queryNamesInColumns[col.QueryName] = true
					}
				}
				// Debug: log what we found.
				if q.logger != nil {
					q.logger.Debug("Single result analysis",
						"queryNamesInColumns", queryNamesInColumns,
						"num_columns", len(scalarData.Columns),
						"num_rows", len(scalarData.Data))
				}
				// If we have columns from multiple queries, deduplicate the rows.
				if len(queryNamesInColumns) > 1 {
					if q.logger != nil {
						q.logger.Debug("Deduplicating scalar rows")
					}
					deduplicatedResult := q.deduplicateScalarRows(scalarData)
					// Return the deduplicated result under the original query name.
					return map[string]any{queryName: deduplicatedResult["table"]}
				}
			}
		}
	}
	// Now merge all ScalarData results.
	// First, collect all unique group columns.
	groupColumnMap := make(map[string]*qbtypes.ColumnDescriptor)
	groupColumnOrder := []string{}
	for _, result := range results {
		if scalarData, ok := result.Value.(*qbtypes.ScalarData); ok {
			for _, col := range scalarData.Columns {
				if col.Type == qbtypes.ColumnTypeGroup {
					if _, exists := groupColumnMap[col.Name]; !exists {
						groupColumnMap[col.Name] = col
						groupColumnOrder = append(groupColumnOrder, col.Name)
						if q.logger != nil {
							q.logger.Debug("Found group column", "name", col.Name)
						}
					}
				}
			}
		}
	}
	// Debug: log the group columns we found.
	if q.logger != nil {
		q.logger.Debug("Group columns collected",
			"groupColumnOrder", groupColumnOrder,
			"num_group_columns", len(groupColumnOrder))
	}
	// Build the final columns: group columns first, then each query's
	// aggregation columns in sorted query-name order.
	mergedColumns := []*qbtypes.ColumnDescriptor{}
	for _, colName := range groupColumnOrder {
		mergedColumns = append(mergedColumns, groupColumnMap[colName])
	}
	queryNames := []string{}
	for name := range results {
		queryNames = append(queryNames, name)
	}
	sort.Strings(queryNames)
	for _, queryName := range queryNames {
		result := results[queryName]
		if scalarData, ok := result.Value.(*qbtypes.ScalarData); ok {
			for _, col := range scalarData.Columns {
				if col.Type == qbtypes.ColumnTypeAggregation {
					newCol := &qbtypes.ColumnDescriptor{
						TelemetryFieldKey: col.TelemetryFieldKey,
						QueryName:         queryName,
						AggregationIndex:  col.AggregationIndex,
						Meta:              col.Meta,
						Type:              qbtypes.ColumnTypeAggregation,
					}
					mergedColumns = append(mergedColumns, newCol)
				}
			}
		}
	}
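	// At this point mergedColumns is laid out as [group columns..., first
	// query's aggregation columns..., next query's, ...] with queries in
	// sorted-name order; the row values below are written into this same layout.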
	// Build a map of unique rows keyed by their group values.
	rowMap := make(map[string][]any)
	// Debug: log the input data.
	if q.logger != nil {
		for _, queryName := range queryNames {
			if scalarData, ok := results[queryName].Value.(*qbtypes.ScalarData); ok {
				q.logger.Debug("Processing query result",
					"query", queryName,
					"num_columns", len(scalarData.Columns),
					"num_rows", len(scalarData.Data),
					"columns", func() []string {
						names := []string{}
						for _, col := range scalarData.Columns {
							names = append(names, fmt.Sprintf("%s(%s)", col.Name, col.Type))
						}
						return names
					}())
			}
		}
	}
	// Process each query's results.
	for _, queryName := range queryNames {
		result := results[queryName]
		if scalarData, ok := result.Value.(*qbtypes.ScalarData); ok {
			// Map the column indices.
			groupIndices := make(map[string]int)
			aggIndices := []int{}
			for i, col := range scalarData.Columns {
				if col.Type == qbtypes.ColumnTypeGroup {
					groupIndices[col.Name] = i
				} else if col.Type == qbtypes.ColumnTypeAggregation {
					aggIndices = append(aggIndices, i)
				}
			}
			// Process each row.
			for rowIdx, row := range scalarData.Data {
				// Build the key from group values in a consistent order.
				keyParts := make([]string, len(groupColumnOrder))
				for i, colName := range groupColumnOrder {
					if idx, ok := groupIndices[colName]; ok && idx < len(row) {
						// Convert the value to a string properly.
						switch v := row[idx].(type) {
						case string:
							keyParts[i] = v
						case *string:
							if v != nil {
								keyParts[i] = *v
							} else {
								keyParts[i] = "n/a"
							}
						default:
							keyParts[i] = fmt.Sprintf("%v", v)
						}
					} else {
						keyParts[i] = "n/a"
					}
				}
				// Debug the first few rows.
				if q.logger != nil && rowIdx < 3 {
					q.logger.Debug("Building key",
						"query", queryName,
						"rowIdx", rowIdx,
						"groupColumnOrder", groupColumnOrder,
						"groupIndices", groupIndices,
						"row", row,
						"keyParts", keyParts)
				}
				// Create a unique key by joining the parts with a delimiter.
				key := strings.Join(keyParts, "|")
				// Debug: log the key generation.
				if q.logger != nil {
					q.logger.Debug("Generated row key",
						"query", queryName,
						"key", key,
						"keyParts", strings.Join(keyParts, ","),
						"numKeyParts", len(keyParts),
						"firstRowValue", func() string {
							if len(row) > 0 {
								return fmt.Sprintf("%v", row[0])
							}
							return "empty"
						}())
				}
				// Initialize the row if needed.
				if _, exists := rowMap[key]; !exists {
					rowMap[key] = make([]any, len(mergedColumns))
					// Set group values, storing the actual value rather than
					// a pointer.
					for i, colName := range groupColumnOrder {
						if idx, ok := groupIndices[colName]; ok && idx < len(row) {
							switch v := row[idx].(type) {
							case *string:
								if v != nil {
									rowMap[key][i] = *v
								} else {
									rowMap[key][i] = "n/a"
								}
							default:
								rowMap[key][i] = v
							}
						} else {
							rowMap[key][i] = "n/a"
						}
					}
					// Initialize all aggregation values to "n/a".
					for i := len(groupColumnOrder); i < len(mergedColumns); i++ {
						rowMap[key][i] = "n/a"
					}
				}
				// Set the aggregation values contributed by this query. The
				// offset walks past the aggregation columns of every query
				// that precedes it in sorted order.
				aggStartIdx := len(groupColumnOrder)
				for _, queryName2 := range queryNames {
					if queryName2 == queryName {
						// Copy this query's aggregation values.
						for i, aggIdx := range aggIndices {
							if aggIdx < len(row) {
								rowMap[key][aggStartIdx+i] = row[aggIdx]
							}
						}
						break
					}
					// Skip over the columns of earlier queries.
					result2 := results[queryName2]
					if scalarData2, ok := result2.Value.(*qbtypes.ScalarData); ok {
						aggCount := 0
						for _, col := range scalarData2.Columns {
							if col.Type == qbtypes.ColumnTypeAggregation {
								aggCount++
							}
						}
						aggStartIdx += aggCount
					}
				}
			}
		}
	}
	// Convert the map to a slice.
	mergedData := [][]any{}
	for _, row := range rowMap {
		mergedData = append(mergedData, row)
	}
	// Sort rows by the first aggregation column (descending); "n/a" sorts last.
	if len(mergedColumns) > len(groupColumnOrder) {
		sort.SliceStable(mergedData, func(i, j int) bool {
			valI := mergedData[i][len(groupColumnOrder)]
			valJ := mergedData[j][len(groupColumnOrder)]
			// Handle n/a values.
			if valI == "n/a" {
				return false
			}
			if valJ == "n/a" {
				return true
			}
			// Compare numeric values.
			switch vI := valI.(type) {
			case float64:
				if vJ, ok := valJ.(float64); ok {
					return vI > vJ
				}
			case int64:
				if vJ, ok := valJ.(int64); ok {
					return vI > vJ
				}
			case int:
				if vJ, ok := valJ.(int); ok {
					return vI > vJ
				}
			}
			return false
		})
	}
	return map[string]any{
		"table": &qbtypes.ScalarData{
			Columns: mergedColumns,
			Data:    mergedData,
		},
	}
}
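
// A sketch of the merge performed above, with hypothetical query names,
// columns, and values:
//
//	query A rows: [service=checkout, count=42]
//	query B rows: [service=checkout, p99=1.25]
//
//	merged table:
//	  service  | A.count | B.p99
//	  checkout | 42      | 1.25
//
// Both inputs produce the group key "checkout", so they contribute to the
// same output row; cells a query never fills remain "n/a".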

// sortTableRows sorts the table rows based on the query order.
func sortTableRows(rows [][]any, columns []*qbtypes.ColumnDescriptor, req *qbtypes.QueryRangeRequest) {
	// Get the query names in order.
	var queryNames []string
	for _, query := range req.CompositeQuery.Queries {
		switch spec := query.Spec.(type) {
		case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
			queryNames = append(queryNames, spec.Name)
		case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
			queryNames = append(queryNames, spec.Name)
		case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
			queryNames = append(queryNames, spec.Name)
		}
	}
	// Map column indices by query name.
	columnIndices := make(map[string][]int)
	for i, col := range columns {
		if col.Type == qbtypes.ColumnTypeAggregation && col.QueryName != "" {
			columnIndices[col.QueryName] = append(columnIndices[col.QueryName], i)
		}
	}
	// Sort in reverse order of query names (stable sort), so the first
	// query's column dominates the final ordering.
	for i := len(queryNames) - 1; i >= 0; i-- {
		queryName := queryNames[i]
		indices, ok := columnIndices[queryName]
		if !ok || len(indices) == 0 {
			continue
		}
		// Use the first aggregation column for this query.
		colIdx := indices[0]
		sort.SliceStable(rows, func(a, b int) bool {
			valA := rows[a][colIdx]
			valB := rows[b][colIdx]
			// Handle n/a values.
			if valA == "n/a" {
				return false
			}
			if valB == "n/a" {
				return true
			}
			// Compare numeric values (descending by default).
			switch vA := valA.(type) {
			case float64:
				if vB, ok := valB.(float64); ok {
					return vA > vB
				}
			case int64:
				if vB, ok := valB.(int64); ok {
					return vA > vB
				}
			case int:
				if vB, ok := valB.(int); ok {
					return vA > vB
				}
			}
			return false
		})
	}
}
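
// Ordering sketch for sortTableRows, assuming hypothetical queries [A, B]:
// the loop sorts by B's first aggregation column and then, stably, by A's,
// so rows end up ordered by A descending with ties broken by B descending.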

// deduplicateScalarRows deduplicates rows in a ScalarData that already
// contains columns from multiple queries.
func (q *querier) deduplicateScalarRows(data *qbtypes.ScalarData) map[string]any {
	// First, identify the group columns.
	groupColumnIndices := []int{}
	for i, col := range data.Columns {
		if col.Type == qbtypes.ColumnTypeGroup {
			groupColumnIndices = append(groupColumnIndices, i)
		}
	}
	// Build a map to merge rows by group key.
	rowMap := make(map[string][]any)
	for _, row := range data.Data {
		// Build the key from group values.
		keyParts := make([]string, len(groupColumnIndices))
		for i, colIdx := range groupColumnIndices {
			if colIdx < len(row) {
				// Convert the value to a string properly.
				switch v := row[colIdx].(type) {
				case string:
					keyParts[i] = v
				case *string:
					if v != nil {
						keyParts[i] = *v
					} else {
						keyParts[i] = "n/a"
					}
				default:
					keyParts[i] = fmt.Sprintf("%v", v)
				}
			} else {
				keyParts[i] = "n/a"
			}
		}
		key := strings.Join(keyParts, "|")
		if existingRow, exists := rowMap[key]; exists {
			// Merge this row into the existing one, replacing "n/a"
			// placeholders with actual values.
			for i, val := range row {
				if existingRow[i] == "n/a" && val != "n/a" {
					existingRow[i] = val
				}
			}
		} else {
			// First time seeing this key: store a copy of the row.
			rowCopy := make([]any, len(row))
			copy(rowCopy, row)
			rowMap[key] = rowCopy
		}
	}
	// Convert the map back to a slice.
	mergedData := make([][]any, 0, len(rowMap))
	for _, row := range rowMap {
		mergedData = append(mergedData, row)
	}
	// Sort by the first aggregation column, if there is one.
	firstAggCol := -1
	for i, col := range data.Columns {
		if col.Type == qbtypes.ColumnTypeAggregation {
			firstAggCol = i
			break
		}
	}
	if firstAggCol >= 0 {
		sort.SliceStable(mergedData, func(i, j int) bool {
			valI := mergedData[i][firstAggCol]
			valJ := mergedData[j][firstAggCol]
			// Handle n/a values.
			if valI == "n/a" {
				return false
			}
			if valJ == "n/a" {
				return true
			}
			// Compare numeric values.
			switch vI := valI.(type) {
			case float64:
				if vJ, ok := valJ.(float64); ok {
					return vI > vJ
				}
			case int64:
				if vJ, ok := valJ.(int64); ok {
					return vI > vJ
				}
			case int:
				if vJ, ok := valJ.(int); ok {
					return vI > vJ
				}
			}
			return false
		})
	}
	return map[string]any{
		"table": &qbtypes.ScalarData{
			Columns: data.Columns,
			Data:    mergedData,
		},
	}
}
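
// Illustrative example of the deduplication above (hypothetical values): the
// partial rows
//
//	["svc-a", 12.5, "n/a"]
//	["svc-a", "n/a", 3.2]
//
// share the group key "svc-a" and collapse into ["svc-a", 12.5, 3.2].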

// roundToTwoDecimal rounds a number to two decimal places, keeping extra
// precision for very small values.
func roundToTwoDecimal(number float64) float64 {
	// Treat very small numbers as zero.
	if math.Abs(number) < 0.000001 {
		return 0
	}
	// Determine the number of decimal places to round to: two by default,
	// more for values below 0.01 so they are not rounded away entirely.
	decimalPlaces := 2
	if math.Abs(number) < 0.01 {
		decimalPlaces = int(math.Ceil(-math.Log10(math.Abs(number)))) + 1
	}
	// Round to the determined number of decimal places.
	scale := math.Pow(10, float64(decimalPlaces))
	return math.Round(number*scale) / scale
}
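
// exampleRoundToTwoDecimal is a small illustrative sketch (not called from
// the production path); the inputs are arbitrary.
func exampleRoundToTwoDecimal() []float64 {
	return []float64{
		roundToTwoDecimal(3.14159),   // 3.14
		roundToTwoDecimal(0.004567),  // 0.0046 (extra precision below 0.01)
		roundToTwoDecimal(0.0000001), // 0 (below the 1e-6 cutoff)
	}
}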