signoz/pkg/query-service/transition/v3_to_v5_resp.go

443 lines
10 KiB
Go

package transition
import (
"encoding/json"
"fmt"
"sort"
"strings"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
v5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
func ConvertV3ResponseToV5(v3Response *v3.QueryRangeResponse, requestType v5.RequestType) (*v5.QueryRangeResponse, error) {
if v3Response == nil {
return nil, fmt.Errorf("v3 response is nil")
}
v5Response := &v5.QueryRangeResponse{
Type: requestType,
}
switch requestType {
case v5.RequestTypeTimeSeries:
data, err := convertToTimeSeriesData(v3Response.Result)
if err != nil {
return nil, err
}
v5Response.Data = data
case v5.RequestTypeScalar:
data, err := convertToScalarData(v3Response.Result)
if err != nil {
return nil, err
}
v5Response.Data = data
case v5.RequestTypeRaw:
data, err := convertToRawData(v3Response.Result)
if err != nil {
return nil, err
}
v5Response.Data = data
default:
return nil, fmt.Errorf("unsupported request type: %v", requestType)
}
return v5Response, nil
}
func convertToTimeSeriesData(v3Results []*v3.Result) ([]*v5.TimeSeriesData, error) {
v5Data := []*v5.TimeSeriesData{}
for _, result := range v3Results {
if result == nil {
continue
}
tsData := &v5.TimeSeriesData{
QueryName: result.QueryName,
Aggregations: []*v5.AggregationBucket{},
}
if len(result.Series) > 0 {
bucket := &v5.AggregationBucket{
Index: 0,
Alias: "",
Series: convertSeries(result.Series),
}
tsData.Aggregations = append(tsData.Aggregations, bucket)
}
v5Data = append(v5Data, tsData)
}
return v5Data, nil
}
func convertSeries(v3Series []*v3.Series) []*v5.TimeSeries {
v5Series := []*v5.TimeSeries{}
for _, series := range v3Series {
if series == nil {
continue
}
v5TimeSeries := &v5.TimeSeries{
Labels: convertLabels(series.Labels),
Values: convertPoints(series.Points),
}
v5Series = append(v5Series, v5TimeSeries)
}
return v5Series
}
func convertLabels(v3Labels map[string]string) []*v5.Label {
v5Labels := []*v5.Label{}
keys := make([]string, 0, len(v3Labels))
for k := range v3Labels {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
v5Labels = append(v5Labels, &v5.Label{
Key: telemetrytypes.TelemetryFieldKey{
Name: key,
},
Value: v3Labels[key],
})
}
return v5Labels
}
func convertPoints(v3Points []v3.Point) []*v5.TimeSeriesValue {
v5Values := []*v5.TimeSeriesValue{}
for _, point := range v3Points {
v5Values = append(v5Values, &v5.TimeSeriesValue{
Timestamp: point.Timestamp,
Value: point.Value,
})
}
return v5Values
}
func convertToScalarData(v3Results []*v3.Result) (*v5.ScalarData, error) {
scalarData := &v5.ScalarData{
Columns: []*v5.ColumnDescriptor{},
Data: [][]any{},
}
for _, result := range v3Results {
if result == nil || result.Table == nil {
continue
}
for _, col := range result.Table.Columns {
columnType := v5.ColumnTypeGroup
if col.IsValueColumn {
columnType = v5.ColumnTypeAggregation
}
scalarData.Columns = append(scalarData.Columns, &v5.ColumnDescriptor{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: col.Name,
},
QueryName: col.QueryName,
AggregationIndex: 0,
Type: columnType,
})
}
for _, row := range result.Table.Rows {
rowData := []any{}
for _, col := range result.Table.Columns {
if val, ok := row.Data[col.Name]; ok {
rowData = append(rowData, val)
} else {
rowData = append(rowData, nil)
}
}
scalarData.Data = append(scalarData.Data, rowData)
}
}
return scalarData, nil
}
func convertToRawData(v3Results []*v3.Result) ([]*v5.RawData, error) {
v5Data := []*v5.RawData{}
for _, result := range v3Results {
if result == nil {
continue
}
rawData := &v5.RawData{
QueryName: result.QueryName,
NextCursor: "",
Rows: []*v5.RawRow{},
}
for _, row := range result.List {
if row == nil {
continue
}
dataMap := make(map[string]*any)
for k, v := range row.Data {
val := v
dataMap[k] = &val
}
rawData.Rows = append(rawData.Rows, &v5.RawRow{
Timestamp: row.Timestamp,
Data: dataMap,
})
}
v5Data = append(v5Data, rawData)
}
return v5Data, nil
}
func LogV5Response(response *v5.QueryRangeResponse, logger func(string)) {
if response == nil {
logger("Response: nil")
return
}
logger(fmt.Sprintf("[%s] Meta{rows:%d bytes:%d ms:%d}",
response.Type, response.Meta.RowsScanned, response.Meta.BytesScanned, response.Meta.DurationMS))
switch response.Type {
case v5.RequestTypeTimeSeries:
logTimeSeriesDataCompact(response.Data, logger)
case v5.RequestTypeScalar:
logScalarDataCompact(response.Data, logger)
case v5.RequestTypeRaw:
logRawDataCompact(response.Data, logger)
default:
logger(fmt.Sprintf("Unknown response type: %v", response.Type))
}
}
func logTimeSeriesDataCompact(data any, logger func(string)) {
tsData, ok := data.([]*v5.TimeSeriesData)
if !ok {
logger("ERROR: Failed to cast data to TimeSeriesData")
return
}
sort.Slice(tsData, func(i, j int) bool {
return tsData[i].QueryName < tsData[j].QueryName
})
for _, ts := range tsData {
allSeries := flattenSeries(ts.Aggregations)
sort.Slice(allSeries, func(i, j int) bool {
return createLabelSignature(allSeries[i].Labels) < createLabelSignature(allSeries[j].Labels)
})
for _, series := range allSeries {
labels := []string{}
for _, label := range series.Labels {
labels = append(labels, fmt.Sprintf("%s:%v", label.Key.Name, label.Value))
}
labelStr := strings.Join(labels, ",")
values := make([]*v5.TimeSeriesValue, len(series.Values))
copy(values, series.Values)
sort.Slice(values, func(i, j int) bool {
return values[i].Timestamp < values[j].Timestamp
})
valueStrs := []string{}
for _, val := range values {
relTime := val.Timestamp
if len(values) > 0 && values[0].Timestamp > 0 {
relTime = (val.Timestamp - values[0].Timestamp) / 1000 // Convert to seconds
}
valueStrs = append(valueStrs, fmt.Sprintf("%d:%.2f", relTime, val.Value))
}
logger(fmt.Sprintf("%s {%s} [%s]", ts.QueryName, labelStr, strings.Join(valueStrs, " ")))
}
}
}
func createLabelSignature(labels []*v5.Label) string {
parts := []string{}
for _, label := range labels {
parts = append(parts, fmt.Sprintf("%s=%v", label.Key.Name, label.Value))
}
sort.Strings(parts)
return strings.Join(parts, ",")
}
func logScalarDataCompact(data any, logger func(string)) {
scalar, ok := data.(*v5.ScalarData)
if !ok {
logger("ERROR: Failed to cast data to ScalarData")
return
}
colNames := []string{}
for _, col := range scalar.Columns {
colNames = append(colNames, col.Name)
}
logger(fmt.Sprintf("SCALAR [%s]", strings.Join(colNames, "|")))
for i, row := range scalar.Data {
rowVals := []string{}
for _, val := range row {
rowVals = append(rowVals, fmt.Sprintf("%v", val))
}
logger(fmt.Sprintf(" %d: [%s]", i, strings.Join(rowVals, "|")))
}
}
func flattenSeries(buckets []*v5.AggregationBucket) []*v5.TimeSeries {
var allSeries []*v5.TimeSeries
for _, bucket := range buckets {
allSeries = append(allSeries, bucket.Series...)
}
return allSeries
}
func logRawDataCompact(data any, logger func(string)) {
rawData, ok := data.([]*v5.RawData)
if !ok {
logger("ERROR: Failed to cast data to RawData")
return
}
sort.Slice(rawData, func(i, j int) bool {
return rawData[i].QueryName < rawData[j].QueryName
})
for _, rd := range rawData {
logger(fmt.Sprintf("RAW %s (rows:%d cursor:%s)", rd.QueryName, len(rd.Rows), rd.NextCursor))
rows := make([]*v5.RawRow, len(rd.Rows))
copy(rows, rd.Rows)
sort.Slice(rows, func(i, j int) bool {
return rows[i].Timestamp.Before(rows[j].Timestamp)
})
allFields := make(map[string]bool)
for _, row := range rows {
for k := range row.Data {
allFields[k] = true
}
}
fieldNames := []string{}
for k := range allFields {
fieldNames = append(fieldNames, k)
}
sort.Strings(fieldNames)
logger(fmt.Sprintf(" Fields: [%s]", strings.Join(fieldNames, "|")))
for i, row := range rows {
vals := []string{}
for _, field := range fieldNames {
if val, exists := row.Data[field]; exists && val != nil {
vals = append(vals, fmt.Sprintf("%v", *val))
} else {
vals = append(vals, "-")
}
}
tsStr := row.Timestamp.Format("15:04:05")
logger(fmt.Sprintf(" %d@%s: [%s]", i, tsStr, strings.Join(vals, "|")))
}
}
}
func LogV5ResponseJSON(response *v5.QueryRangeResponse, logger func(string)) {
sortedResponse := sortV5ResponseForLogging(response)
jsonBytes, err := json.MarshalIndent(sortedResponse, "", " ")
if err != nil {
logger(fmt.Sprintf("ERROR: Failed to marshal response: %v", err))
return
}
logger(string(jsonBytes))
}
func sortV5ResponseForLogging(response *v5.QueryRangeResponse) *v5.QueryRangeResponse {
if response == nil {
return nil
}
responseCopy := &v5.QueryRangeResponse{
Type: response.Type,
Meta: response.Meta,
}
switch response.Type {
case v5.RequestTypeTimeSeries:
if tsData, ok := response.Data.([]*v5.TimeSeriesData); ok {
sortedData := make([]*v5.TimeSeriesData, len(tsData))
for i, ts := range tsData {
sortedData[i] = &v5.TimeSeriesData{
QueryName: ts.QueryName,
Aggregations: make([]*v5.AggregationBucket, len(ts.Aggregations)),
}
for j, bucket := range ts.Aggregations {
sortedBucket := &v5.AggregationBucket{
Index: bucket.Index,
Alias: bucket.Alias,
Series: make([]*v5.TimeSeries, len(bucket.Series)),
}
for k, series := range bucket.Series {
sortedSeries := &v5.TimeSeries{
Labels: series.Labels,
Values: make([]*v5.TimeSeriesValue, len(series.Values)),
}
copy(sortedSeries.Values, series.Values)
sort.Slice(sortedSeries.Values, func(i, j int) bool {
return sortedSeries.Values[i].Timestamp < sortedSeries.Values[j].Timestamp
})
sortedBucket.Series[k] = sortedSeries
}
sort.Slice(sortedBucket.Series, func(i, j int) bool {
return createLabelSignature(sortedBucket.Series[i].Labels) <
createLabelSignature(sortedBucket.Series[j].Labels)
})
sortedData[i].Aggregations[j] = sortedBucket
}
}
sort.Slice(sortedData, func(i, j int) bool {
return sortedData[i].QueryName < sortedData[j].QueryName
})
responseCopy.Data = sortedData
}
default:
responseCopy.Data = response.Data
}
return responseCopy
}