216 lines
5.7 KiB
Go
Raw Normal View History

2024-07-26 11:50:02 +05:30
package kafka
import (
"fmt"
2024-08-27 18:27:44 +05:30
"strings"
2024-08-27 14:04:00 +05:30
"go.signoz.io/signoz/pkg/query-service/common"
2024-07-26 11:50:02 +05:30
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// defaultStepInterval is the Step value placed on every QueryRangeParamsV3
// built in this file.
// NOTE(review): presumably expressed in seconds — confirm against the v3 model.
var defaultStepInterval = int64(60)
func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) {
2024-07-26 13:02:45 +05:30
// ToDo: propagate this through APIs when there are different handlers
queueType := kafkaQueue
2024-07-26 11:50:02 +05:30
var cq *v3.CompositeQuery
2024-07-26 15:23:31 +05:30
chq, err := buildClickHouseQuery(messagingQueue, queueType, queryContext)
if err != nil {
return nil, err
2024-07-26 11:50:02 +05:30
}
2024-07-26 15:23:31 +05:30
cq, err = buildCompositeQuery(chq, queryContext)
2024-07-26 11:50:02 +05:30
queryRangeParams := &v3.QueryRangeParamsV3{
Start: messagingQueue.Start,
End: messagingQueue.End,
Step: defaultStepInterval,
CompositeQuery: cq,
Version: "v4",
FormatForWeb: true,
}
return queryRangeParams, nil
}
2024-08-07 13:51:00 +05:30
func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start
end := messagingQueue.End
consumerGroup, ok := messagingQueue.Variables["consumer_group"]
if !ok {
return nil, fmt.Errorf("consumer_group not found in the request")
}
2024-08-27 18:27:44 +05:30
partitionID, ok := messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("partition not found in the request")
}
query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, partitionID, queueType)
2024-08-07 13:51:00 +05:30
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
2024-08-27 18:27:44 +05:30
// formatstring joins str with ", " and returns the result with its first and
// last byte removed. When the joined text is two bytes long or shorter the
// empty string is returned.
// NOTE(review): presumably meant to strip one outer quote/bracket pair from
// the joined list — confirm with callers.
func formatstring(str []string) string {
	const separator = ", "

	merged := strings.Join(str, separator)
	size := len(merged)
	if size > 2 {
		return merged[1 : size-1]
	}
	return ""
}
2024-08-26 17:56:03 +05:30
2024-08-27 18:27:44 +05:30
func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
bq := make(map[string]*v3.BuilderQuery)
queryName := fmt.Sprintf("latency")
chq := &v3.BuilderQuery{
QueryName: queryName,
StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "kafka_consumer_fetch_latency_avg",
},
AggregateOperator: v3.AggregateOperatorAvg,
Temporality: v3.Unspecified,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationAvg,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
2024-08-26 17:56:03 +05:30
},
2024-08-27 18:27:44 +05:30
Operator: v3.FilterOperatorIn,
Value: attributeCache.ServiceName,
},
{
Key: v3.AttributeKey{
Key: "client_id",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
2024-08-26 17:56:03 +05:30
},
2024-08-27 18:27:44 +05:30
Operator: v3.FilterOperatorIn,
Value: attributeCache.ClientID,
},
{
Key: v3.AttributeKey{
Key: "service_instance_id",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
2024-08-26 17:56:03 +05:30
},
2024-08-27 18:27:44 +05:30
Operator: v3.FilterOperatorIn,
Value: attributeCache.ServiceInstanceID,
2024-08-26 17:56:03 +05:30
},
},
2024-08-27 18:27:44 +05:30
},
Expression: queryName,
ReduceTo: v3.ReduceToOperatorAvg,
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
{
Key: "client_id",
2024-08-26 17:56:03 +05:30
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
2024-08-27 18:27:44 +05:30
{
Key: "service_instance_id",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
2024-08-26 17:56:03 +05:30
},
2024-08-27 18:27:44 +05:30
},
2024-08-07 13:51:00 +05:30
}
2024-08-27 18:27:44 +05:30
bq[queryName] = chq
2024-08-26 17:56:03 +05:30
return bq, nil
2024-08-07 13:51:00 +05:30
}
2024-08-27 18:27:44 +05:30
func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) {
2024-08-07 13:51:00 +05:30
queueType := kafkaQueue
2024-08-27 14:04:00 +05:30
unixMilliStart := messagingQueue.Start / 1000000
unixMilliEnd := messagingQueue.End / 1000000
2024-08-07 13:51:00 +05:30
var cq *v3.CompositeQuery
if queryContext == "throughput" {
chq, err := buildClickHouseQueryNetwork(messagingQueue, queueType)
if err != nil {
return nil, err
}
cq, err = buildCompositeQuery(chq, queryContext)
2024-08-26 17:56:03 +05:30
2024-08-07 13:51:00 +05:30
} else if queryContext == "fetch-latency" {
2024-08-27 14:04:00 +05:30
bhq, err := buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd, attributeCache)
2024-08-07 13:51:00 +05:30
if err != nil {
return nil, err
}
cq = &v3.CompositeQuery{
2024-08-26 17:56:03 +05:30
QueryType: v3.QueryTypeBuilder,
BuilderQueries: bhq,
PanelType: v3.PanelTypeTable,
2024-08-07 13:51:00 +05:30
}
}
queryRangeParams := &v3.QueryRangeParamsV3{
2024-08-27 14:04:00 +05:30
Start: unixMilliStart,
End: unixMilliEnd,
2024-08-07 13:51:00 +05:30
Step: defaultStepInterval,
CompositeQuery: cq,
Version: "v4",
FormatForWeb: true,
}
return queryRangeParams, nil
}
2024-07-26 15:23:31 +05:30
func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) {
2024-07-26 11:50:02 +05:30
start := messagingQueue.Start
end := messagingQueue.End
topic, ok := messagingQueue.Variables["topic"]
if !ok {
return nil, fmt.Errorf("invalid type for Topic")
}
partition, ok := messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
}
2024-07-26 15:23:31 +05:30
var query string
if queryContext == "producer" {
query = generateProducerSQL(start, end, topic, partition, queueType)
} else if queryContext == "consumer" {
consumerGroup, ok := messagingQueue.Variables["consumer_group"]
if !ok {
return nil, fmt.Errorf("invalid type for consumer group")
}
2024-07-31 17:55:13 +05:30
query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType)
2024-07-26 11:50:02 +05:30
}
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
2024-07-26 15:23:31 +05:30
func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) {
2024-07-26 11:50:02 +05:30
return &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
2024-07-26 15:23:31 +05:30
ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq},
2024-07-26 11:50:02 +05:30
PanelType: v3.PanelTypeTable,
}, nil
}