diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 2dab64b8a7f3..0fcaf751b325 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -12,7 +12,6 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/SigNoz/signoz/pkg/prometheus" @@ -386,142 +385,6 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc } func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) { - // Call servicesOptimized here and print the result for comparison with results of this function. - - if r.indexTable == "" { - return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} - } - - topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil) - if apiErr != nil { - return nil, apiErr - } - - serviceItems := []model.ServiceItem{} - var wg sync.WaitGroup - // limit the number of concurrent queries to not overload the clickhouse server - sem := make(chan struct{}, 10) - var mtx sync.RWMutex - - for svc, ops := range *topLevelOps { - sem <- struct{}{} - wg.Add(1) - go func(svc string, ops []string) { - defer wg.Done() - defer func() { <-sem }() - var serviceItem model.ServiceItem - var numErrors uint64 - - // Even if the total number of operations within the time range is less and the all - // the top level operations are high, we want to warn to let user know the issue - // with the instrumentation - serviceItem.DataWarning = model.DataWarning{ - TopLevelOps: (*topLevelOps)[svc], - } - - // default max_query_size = 262144 - // Let's assume the average size of the item in `ops` is 50 bytes - // We can have 262144/50 = 5242 items in the `ops` array - // Although we have make it as big as 5k, We cap the number of items - // in the `ops` array to 1500 - - ops = ops[:int(math.Min(1500, float64(len(ops))))] - - query := fmt.Sprintf( - `SELECT - quantile(0.99)(duration_nano) as p99, - avg(duration_nano) as avgDuration, - count(*) as numCalls - FROM %s.%s - WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`, - r.TraceDB, r.traceTableName, - ) - errorQuery := fmt.Sprintf( - `SELECT - count(*) as numErrors - FROM %s.%s - WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`, - r.TraceDB, r.traceTableName, - ) - - args := []interface{}{} - args = append(args, - clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), - clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)), - clickhouse.Named("serviceName", svc), - clickhouse.Named("names", ops), - ) - - resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, svc, *queryParams.Start, *queryParams.End) - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return - } - query += ` - AND ( - resource_fingerprint GLOBAL IN ` + - resourceSubQuery + - `) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket` - - args = append(args, - clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)), - clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)), - ) - - err = r.db.QueryRow( - ctx, - query, - args..., - ).ScanStruct(&serviceItem) - - if serviceItem.NumCalls == 0 { - return - } - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return - } - - errorQuery += ` - AND ( - resource_fingerprint GLOBAL IN ` + - resourceSubQuery + - `) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket` - - err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors) - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return - } - - serviceItem.ServiceName = svc - serviceItem.NumErrors = numErrors - mtx.Lock() - serviceItems = append(serviceItems, serviceItem) - mtx.Unlock() - }(svc, ops) - } - wg.Wait() - - for idx := range serviceItems { - serviceItems[idx].CallRate = float64(serviceItems[idx].NumCalls) / float64(queryParams.Period) - serviceItems[idx].ErrorRate = float64(serviceItems[idx].NumErrors) * 100 / float64(serviceItems[idx].NumCalls) - } - - optimizedResult, optimizedErr := r.GetServicesOptimized(ctx, queryParams) - if optimizedErr != nil { - fmt.Printf("Error from servicesOptimized = %v\n", optimizedErr) - } else { - fmt.Printf("servicesOptimized result = %v\n", optimizedResult) - } - - fmt.Printf("Actual serviceItems = %v\n", serviceItems) - - return &serviceItems, nil -} - -func (r *ClickHouseReader) GetServicesOptimized(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) { if r.indexTable == "" { return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} }