diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index c351caa01b0c..c8cadf67e4e5 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -12,7 +12,6 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/SigNoz/signoz/pkg/prometheus" @@ -385,131 +384,6 @@ func (r *ClickHouseReader) buildResourceSubQuery(tags []model.TagQueryParam, svc return resourceSubQuery, nil } -func (r *ClickHouseReader) GetServicesOG(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) { - - if r.indexTable == "" { - return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} - } - - topLevelOps, apiErr := r.GetTopLevelOperations(ctx, *queryParams.Start, *queryParams.End, nil) - if apiErr != nil { - return nil, apiErr - } - - serviceItems := []model.ServiceItem{} - var wg sync.WaitGroup - // limit the number of concurrent queries to not overload the clickhouse server - sem := make(chan struct{}, 10) - var mtx sync.RWMutex - - for svc, ops := range *topLevelOps { - sem <- struct{}{} - wg.Add(1) - go func(svc string, ops []string) { - defer wg.Done() - defer func() { <-sem }() - var serviceItem model.ServiceItem - var numErrors uint64 - - // Even if the total number of operations within the time range is less and the all - // the top level operations are high, we want to warn to let user know the issue - // with the instrumentation - serviceItem.DataWarning = model.DataWarning{ - TopLevelOps: (*topLevelOps)[svc], - } - - // default max_query_size = 262144 - // Let's assume the average size of the item in `ops` is 50 bytes - // We can have 262144/50 = 5242 items in the `ops` array - // Although we have make it as big as 5k, We cap the number of items - // in the `ops` array to 1500 - - ops = ops[:int(math.Min(1500, float64(len(ops))))] - - query := fmt.Sprintf( - `SELECT - quantile(0.99)(duration_nano) as p99, - avg(duration_nano) as avgDuration, - count(*) as numCalls - FROM %s.%s - WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`, - r.TraceDB, r.traceTableName, - ) - errorQuery := fmt.Sprintf( - `SELECT - count(*) as numErrors - FROM %s.%s - WHERE resource_string_service$$name = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`, - r.TraceDB, r.traceTableName, - ) - - args := []interface{}{} - args = append(args, - clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), - clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)), - clickhouse.Named("serviceName", svc), - clickhouse.Named("names", ops), - ) - - resourceSubQuery, err := r.buildResourceSubQuery(queryParams.Tags, svc, *queryParams.Start, *queryParams.End) - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return - } - query += ` - AND ( - resource_fingerprint GLOBAL IN ` + - resourceSubQuery + - `) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket` - - args = append(args, - clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)), - clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)), - ) - - err = r.db.QueryRow( - ctx, - query, - args..., - ).ScanStruct(&serviceItem) - - if serviceItem.NumCalls == 0 { - return - } - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return - } - - errorQuery += ` - AND ( - resource_fingerprint GLOBAL IN ` + - resourceSubQuery + - `) AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket` - - err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors) - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return - } - - serviceItem.ServiceName = svc - serviceItem.NumErrors = numErrors - mtx.Lock() - serviceItems = append(serviceItems, serviceItem) - mtx.Unlock() - }(svc, ops) - } - wg.Wait() - - for idx := range serviceItems { - serviceItems[idx].CallRate = float64(serviceItems[idx].NumCalls) / float64(queryParams.Period) - serviceItems[idx].ErrorRate = float64(serviceItems[idx].NumErrors) * 100 / float64(serviceItems[idx].NumCalls) - } - return &serviceItems, nil -} - func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError) { if r.indexTable == "" { return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} @@ -626,68 +500,6 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G return nil, &model.ApiError{Typ: model.ErrorExec, Err: err} } - // TODO: Remove this once we are sure the optimized query is working - OGResults, OGerror := r.GetServicesOG(ctx, queryParams) - if OGerror != nil { - fmt.Printf("Error from servicesOG = %v\n", OGerror) - } else { - // Helper function to compare service items from GetServicesOG and this function - compareServiceItems := func(items1, items2 []model.ServiceItem) []string { - // Returns a slice of string describing differences - diffs := []string{} - // Build maps for quick lookup by service name - map1 := make(map[string]model.ServiceItem) - map2 := make(map[string]model.ServiceItem) - for _, item := range items1 { - map1[item.ServiceName] = item - } - for _, item := range items2 { - map2[item.ServiceName] = item - } - // Check for services in items1 but not in items2 - for name, item1 := range map1 { - if item2, ok := map2[name]; ok { - // Compare fields of interest - if item1.NumCalls != item2.NumCalls { - diffs = append(diffs, - fmt.Sprintf("Service %s: NumCalls differs (OG: %d, Optimized: %d)", name, item2.NumCalls, item1.NumCalls)) - } - if item1.NumErrors != item2.NumErrors { - diffs = append(diffs, - fmt.Sprintf("Service %s: NumErrors differs (OG: %d, Optimized: %d)", name, item2.NumErrors, item1.NumErrors)) - } - if item1.Percentile99 != item2.Percentile99 { - diffs = append(diffs, - fmt.Sprintf("Service %s: P99 differs (OG: %f, Optimized: %f)", name, item2.Percentile99/1e6, item1.Percentile99/1e6)) - } - if item1.AvgDuration != item2.AvgDuration { - diffs = append(diffs, - fmt.Sprintf("Service %s: AvgDuration differs (OG: %f, Optimized: %f)", name, item2.AvgDuration, item1.AvgDuration)) - } - } else { - diffs = append(diffs, fmt.Sprintf("Service %s present in Optimized but missing in OG", name)) - } - } - // Check for services in items2 but not in items1 - for name := range map2 { - if _, ok := map1[name]; !ok { - diffs = append(diffs, fmt.Sprintf("Service %s present in OG but missing in Optimized", name)) - } - } - return diffs - } - - // Example usage: compare the two results and print differences - ogItems := *OGResults - differences := compareServiceItems(serviceItems, ogItems) - if len(differences) > 0 { - fmt.Println("Differences between Optimized and OG service items:") - for _, diff := range differences { - fmt.Println(diff) - } - } - } - return &serviceItems, nil }