mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-17 15:36:48 +00:00
fix: added necessary 0 numCalls handling
This commit is contained in:
parent
f2c3946101
commit
46e5b407f7
@ -12,7 +12,6 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||||
@ -385,131 +384,6 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc
|
|||||||
return resourceSubQuery, nil
|
return resourceSubQuery, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetServicesOG(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
|
|
||||||
|
|
||||||
if r.indexTable == "" {
|
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
|
|
||||||
}
|
|
||||||
|
|
||||||
topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil)
|
|
||||||
if apiErr != nil {
|
|
||||||
return nil, apiErr
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceItems := []model.ServiceItem{}
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
// limit the number of concurrent queries to not overload the clickhouse server
|
|
||||||
sem := make(chan struct{}, 10)
|
|
||||||
var mtx sync.RWMutex
|
|
||||||
|
|
||||||
for svc, ops := range *topLevelOps {
|
|
||||||
sem <- struct{}{}
|
|
||||||
wg.Add(1)
|
|
||||||
go func(svc string, ops []string) {
|
|
||||||
defer wg.Done()
|
|
||||||
defer func() { <-sem }()
|
|
||||||
var serviceItem model.ServiceItem
|
|
||||||
var numErrors uint64
|
|
||||||
|
|
||||||
// Even if the total number of operations within the time range is less and the all
|
|
||||||
// the top level operations are high, we want to warn to let user know the issue
|
|
||||||
// with the instrumentation
|
|
||||||
serviceItem.DataWarning = model.DataWarning{
|
|
||||||
TopLevelOps: (*topLevelOps)[svc],
|
|
||||||
}
|
|
||||||
|
|
||||||
// default max_query_size = 262144
|
|
||||||
// Let's assume the average size of the item in `ops` is 50 bytes
|
|
||||||
// We can have 262144/50 = 5242 items in the `ops` array
|
|
||||||
// Although we have make it as big as 5k, We cap the number of items
|
|
||||||
// in the `ops` array to 1500
|
|
||||||
|
|
||||||
ops = ops[:int(math.Min(1500, float64(len(ops))))]
|
|
||||||
|
|
||||||
query := fmt.Sprintf(
|
|
||||||
`SELECT
|
|
||||||
quantile(0.99)(duration_nano) as p99,
|
|
||||||
avg(duration_nano) as avgDuration,
|
|
||||||
count(*) as numCalls
|
|
||||||
FROM %s.%s
|
|
||||||
WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
|
|
||||||
r.TraceDB, r.traceTableName,
|
|
||||||
)
|
|
||||||
errorQuery := fmt.Sprintf(
|
|
||||||
`SELECT
|
|
||||||
count(*) as numErrors
|
|
||||||
FROM %s.%s
|
|
||||||
WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
|
|
||||||
r.TraceDB, r.traceTableName,
|
|
||||||
)
|
|
||||||
|
|
||||||
args := []interface{}{}
|
|
||||||
args = append(args,
|
|
||||||
clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
|
|
||||||
clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
|
|
||||||
clickhouse.Named("serviceName", svc),
|
|
||||||
clickhouse.Named("names", ops),
|
|
||||||
)
|
|
||||||
|
|
||||||
resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, svc, *queryParams.Start, *queryParams.End)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
query += `
|
|
||||||
AND (
|
|
||||||
resource_fingerprint GLOBAL IN ` +
|
|
||||||
resourceSubQuery +
|
|
||||||
`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`
|
|
||||||
|
|
||||||
args = append(args,
|
|
||||||
clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
|
|
||||||
clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
|
|
||||||
)
|
|
||||||
|
|
||||||
err = r.db.QueryRow(
|
|
||||||
ctx,
|
|
||||||
query,
|
|
||||||
args...,
|
|
||||||
).ScanStruct(&serviceItem)
|
|
||||||
|
|
||||||
if serviceItem.NumCalls == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
errorQuery += `
|
|
||||||
AND (
|
|
||||||
resource_fingerprint GLOBAL IN ` +
|
|
||||||
resourceSubQuery +
|
|
||||||
`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`
|
|
||||||
|
|
||||||
err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceItem.ServiceName = svc
|
|
||||||
serviceItem.NumErrors = numErrors
|
|
||||||
mtx.Lock()
|
|
||||||
serviceItems = append(serviceItems, serviceItem)
|
|
||||||
mtx.Unlock()
|
|
||||||
}(svc, ops)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
for idx := range serviceItems {
|
|
||||||
serviceItems[idx].CallRate = float64(serviceItems[idx].NumCalls) / float64(queryParams.Period)
|
|
||||||
serviceItems[idx].ErrorRate = float64(serviceItems[idx].NumErrors) * 100 / float64(serviceItems[idx].NumCalls)
|
|
||||||
}
|
|
||||||
return &serviceItems, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
|
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
|
||||||
if r.indexTable == "" {
|
if r.indexTable == "" {
|
||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
|
||||||
@ -626,68 +500,6 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
|
|||||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Remove this once we are sure the optimized query is working
|
|
||||||
OGResults, OGerror := r.GetServicesOG(ctx, queryParams)
|
|
||||||
if OGerror != nil {
|
|
||||||
fmt.Printf("Error from servicesOG = %v\n", OGerror)
|
|
||||||
} else {
|
|
||||||
// Helper function to compare service items from GetServicesOG and this function
|
|
||||||
compareServiceItems := func(items1, items2 []model.ServiceItem) []string {
|
|
||||||
// Returns a slice of string describing differences
|
|
||||||
diffs := []string{}
|
|
||||||
// Build maps for quick lookup by service name
|
|
||||||
map1 := make(map[string]model.ServiceItem)
|
|
||||||
map2 := make(map[string]model.ServiceItem)
|
|
||||||
for _, item := range items1 {
|
|
||||||
map1[item.ServiceName] = item
|
|
||||||
}
|
|
||||||
for _, item := range items2 {
|
|
||||||
map2[item.ServiceName] = item
|
|
||||||
}
|
|
||||||
// Check for services in items1 but not in items2
|
|
||||||
for name, item1 := range map1 {
|
|
||||||
if item2, ok := map2[name]; ok {
|
|
||||||
// Compare fields of interest
|
|
||||||
if item1.NumCalls != item2.NumCalls {
|
|
||||||
diffs = append(diffs,
|
|
||||||
fmt.Sprintf("Service %s: NumCalls differs (OG: %d, Optimized: %d)", name, item2.NumCalls, item1.NumCalls))
|
|
||||||
}
|
|
||||||
if item1.NumErrors != item2.NumErrors {
|
|
||||||
diffs = append(diffs,
|
|
||||||
fmt.Sprintf("Service %s: NumErrors differs (OG: %d, Optimized: %d)", name, item2.NumErrors, item1.NumErrors))
|
|
||||||
}
|
|
||||||
if item1.Percentile99 != item2.Percentile99 {
|
|
||||||
diffs = append(diffs,
|
|
||||||
fmt.Sprintf("Service %s: P99 differs (OG: %f, Optimized: %f)", name, item2.Percentile99/1e6, item1.Percentile99/1e6))
|
|
||||||
}
|
|
||||||
if item1.AvgDuration != item2.AvgDuration {
|
|
||||||
diffs = append(diffs,
|
|
||||||
fmt.Sprintf("Service %s: AvgDuration differs (OG: %f, Optimized: %f)", name, item2.AvgDuration, item1.AvgDuration))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
diffs = append(diffs, fmt.Sprintf("Service %s present in Optimized but missing in OG", name))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Check for services in items2 but not in items1
|
|
||||||
for name := range map2 {
|
|
||||||
if _, ok := map1[name]; !ok {
|
|
||||||
diffs = append(diffs, fmt.Sprintf("Service %s present in OG but missing in Optimized", name))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return diffs
|
|
||||||
}
|
|
||||||
|
|
||||||
// Example usage: compare the two results and print differences
|
|
||||||
ogItems := *OGResults
|
|
||||||
differences := compareServiceItems(serviceItems, ogItems)
|
|
||||||
if len(differences) > 0 {
|
|
||||||
fmt.Println("Differences between Optimized and OG service items:")
|
|
||||||
for _, diff := range differences {
|
|
||||||
fmt.Println(diff)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &serviceItems, nil
|
return &serviceItems, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user