mirror of https://github.com/SigNoz/signoz.git
synced 2025-12-18 16:07:10 +00:00
chore: removed comparison block
This commit is contained in:
parent d437998750
commit 1d9b457af6
@@ -12,7 +12,6 @@ import (
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/SigNoz/signoz/pkg/prometheus"
@@ -386,142 +385,6 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc
}

func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
	// Call servicesOptimized here and print the result for comparison with results of this function.

	if r.indexTable == "" {
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
	}

	topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil)
	if apiErr != nil {
		return nil, apiErr
	}

	serviceItems := []model.ServiceItem{}
	var wg sync.WaitGroup
	// limit the number of concurrent queries to not overload the clickhouse server
	sem := make(chan struct{}, 10)
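	// The buffered channel acts as a counting semaphore: a send blocks once
	// 10 tokens are in flight, and each goroutine frees its slot via <-sem.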
	var mtx sync.RWMutex

	for svc, ops := range *topLevelOps {
		sem <- struct{}{}
		wg.Add(1)
		go func(svc string, ops []string) {
			defer wg.Done()
			defer func() { <-sem }()
			var serviceItem model.ServiceItem
			var numErrors uint64

			// Even if the total number of operations within the time range is low
			// while the number of top-level operations is high, we want to warn
			// the user about a possible issue with the instrumentation
			serviceItem.DataWarning = model.DataWarning{
				TopLevelOps: (*topLevelOps)[svc],
			}

			// default max_query_size = 262144
			// Assuming the average size of an item in `ops` is 50 bytes,
			// we could fit 262144/50 = 5242 items in the `ops` array.
			// Although we could make it as big as ~5k, we cap the number of
			// items in the `ops` array at 1500.
			ops = ops[:int(math.Min(1500, float64(len(ops))))]

			query := fmt.Sprintf(
				`SELECT
					quantile(0.99)(duration_nano) as p99,
					avg(duration_nano) as avgDuration,
					count(*) as numCalls
				FROM %s.%s
				WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`,
				r.TraceDB, r.traceTableName,
			)
			errorQuery := fmt.Sprintf(
				`SELECT
					count(*) as numErrors
				FROM %s.%s
				WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`,
				r.TraceDB, r.traceTableName,
			)

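			// clickhouse.Named binds Go values to the @serviceName, @names,
			// @start and @end placeholders in the queries above; the []string
			// bound to @names feeds the `name In @names` clause.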
			args := []interface{}{}
			args = append(args,
				clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)),
				clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)),
				clickhouse.Named("serviceName", svc),
				clickhouse.Named("names", ops),
			)

			resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, svc, *queryParams.Start, *queryParams.End)
			if err != nil {
				zap.L().Error("Error in processing sql query", zap.Error(err))
				return
			}
			query += `
				AND (
					resource_fingerprint GLOBAL IN ` +
				resourceSubQuery +
				`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`

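			// ts_bucket_start is coarser than timestamp, so the lower bound is
			// widened by 1800s (30 min) to keep spans whose bucket starts just
			// before the query window.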
			args = append(args,
				clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)),
				clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)),
			)

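			// ScanStruct maps the p99, avgDuration and numCalls columns onto
			// the matching fields of model.ServiceItem.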
			err = r.db.QueryRow(
				ctx,
				query,
				args...,
			).ScanStruct(&serviceItem)

			if serviceItem.NumCalls == 0 {
				return
			}

			if err != nil {
				zap.L().Error("Error in processing sql query", zap.Error(err))
				return
			}

			errorQuery += `
				AND (
					resource_fingerprint GLOBAL IN ` +
				resourceSubQuery +
				`) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket`

			err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors)
			if err != nil {
				zap.L().Error("Error in processing sql query", zap.Error(err))
				return
			}

			serviceItem.ServiceName = svc
			serviceItem.NumErrors = numErrors
			mtx.Lock()
			serviceItems = append(serviceItems, serviceItem)
			mtx.Unlock()
		}(svc, ops)
	}
	wg.Wait()

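	// CallRate is calls per unit of Period and ErrorRate a percentage, e.g.
	// 12000 calls with 300 errors over a Period of 600 give 20.0 and 2.5.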
	for idx := range serviceItems {
		serviceItems[idx].CallRate = float64(serviceItems[idx].NumCalls) / float64(queryParams.Period)
		serviceItems[idx].ErrorRate = float64(serviceItems[idx].NumErrors) * 100 / float64(serviceItems[idx].NumCalls)
	}

	optimizedResult, optimizedErr := r.GetServicesOptimized(ctx, queryParams)
	if optimizedErr != nil {
		fmt.Printf("Error from servicesOptimized = %v\n", optimizedErr)
	} else {
		fmt.Printf("servicesOptimized result = %v\n", optimizedResult)
	}

	fmt.Printf("Actual serviceItems = %v\n", serviceItems)

	return &serviceItems, nil
}

func (r *ClickHouseReader) GetServicesOptimized(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) {
	if r.indexTable == "" {
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
	}
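The deleted GetServices body fans out one goroutine per service, bounded by the channel semaphore, and serializes appends to the shared slice with a mutex. A minimal self-contained sketch of that pattern, with a hypothetical fetchStats standing in for the ClickHouse queries:

package main

import (
	"fmt"
	"sync"
)

// fetchStats stands in for the per-service ClickHouse queries; it is a
// hypothetical placeholder, not part of the SigNoz codebase.
func fetchStats(svc string) string {
	return fmt.Sprintf("stats(%s)", svc)
}

func main() {
	services := []string{"frontend", "cartservice", "checkout"}

	var wg sync.WaitGroup
	sem := make(chan struct{}, 10) // at most 10 goroutines run queries at once
	var mtx sync.Mutex
	results := []string{}

	for _, svc := range services {
		sem <- struct{}{} // acquire a slot; blocks when 10 are in flight
		wg.Add(1)
		go func(svc string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot on exit

			r := fetchStats(svc)

			mtx.Lock() // append to the shared slice is not goroutine-safe
			results = append(results, r)
			mtx.Unlock()
		}(svc)
	}
	wg.Wait()

	fmt.Println(results)
}

The send on sem blocks once 10 workers are active, so at most 10 queries hit ClickHouse at a time regardless of how many services the loop visits.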