diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index b275c27f6ea4..82c53c6fa34d 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -203,17 +203,6 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT) &opAmpModel.AllAgents, agentConfMgr, signoz.Instrumentation, ) - orgs, err := apiHandler.Signoz.Modules.OrgGetter.ListByOwnedKeyRange(context.Background()) - if err != nil { - return nil, err - } - for _, org := range orgs { - errorList := reader.PreloadMetricsMetadata(context.Background(), org.ID) - for _, er := range errorList { - zap.L().Error("failed to preload metrics metadata", zap.Error(er)) - } - } - return s, nil } diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 0e8515c67d4c..4f5266c51975 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2300,41 +2300,24 @@ func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *mode func (r *ClickHouseReader) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) { metricNameToTemporality := make(map[string]map[v3.Temporality]bool) - var metricNamesToQuery []string - for _, metricName := range metricNames { - updatedMetadata, cacheErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName) - if cacheErr != nil { - zap.L().Info("Error in getting metrics cached metadata", zap.Error(cacheErr)) - } - if metadata, exist := updatedMetadata[metricName]; exist { - if _, exists := metricNameToTemporality[metricName]; !exists { - metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) - } - metricNameToTemporality[metricName][metadata.Temporality] = true - } else { - metricNamesToQuery = append(metricNamesToQuery, metricName) - } + + // Batch fetch all metadata at once + metadataMap, apiErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...) + if apiErr != nil { + zap.L().Warn("Failed to fetch updated metrics metadata", zap.Error(apiErr)) + return nil, apiErr } - query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day) - - rows, err := r.db.Query(ctx, query, metricNames) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var metricName, temporality string - err := rows.Scan(&metricName, &temporality) - if err != nil { - return nil, err + for metricName, metadata := range metadataMap { + if metadata == nil { + continue } - if _, ok := metricNameToTemporality[metricName]; !ok { + if _, exists := metricNameToTemporality[metricName]; !exists { metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) } - metricNameToTemporality[metricName][v3.Temporality(temporality)] = true + metricNameToTemporality[metricName][metadata.Temporality] = true } + return metricNameToTemporality, nil } @@ -2673,67 +2656,77 @@ func (r *ClickHouseReader) QueryDashboardVars(ctx context.Context, query string) } func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) { - - var query string - var err error - var rows driver.Rows var response v3.AggregateAttributeResponse normalized := true if constants.IsDotMetricsEnabled { normalized = false } - query = fmt.Sprintf("SELECT metric_name, type, is_monotonic, temporality FROM %s.%s WHERE metric_name ILIKE $1 and __normalized = $2 GROUP BY metric_name, type, is_monotonic, temporality", signozMetricDBName, signozTSTableNameV41Day) + // Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db + query := fmt.Sprintf( + `SELECT DISTINCT metric_name + FROM %s.%s + WHERE metric_name ILIKE $1 AND __normalized = $2`, + signozMetricDBName, signozTSTableNameV41Day) + if req.Limit != 0 { query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) } - rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), normalized) + rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), normalized) if err != nil { - zap.L().Error("Error while executing query", zap.Error(err)) - return nil, fmt.Errorf("error while executing query: %s", err.Error()) + zap.L().Error("Error while querying metric names", zap.Error(err)) + return nil, fmt.Errorf("error while executing metric name query: %s", err.Error()) } defer rows.Close() - seen := make(map[string]struct{}) - - var metricName, typ, temporality string - var isMonotonic bool + var metricNames []string for rows.Next() { - if err := rows.Scan(&metricName, &typ, &isMonotonic, &temporality); err != nil { - return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + var name string + if err := rows.Scan(&name); err != nil { + return nil, fmt.Errorf("error while scanning metric name: %s", err.Error()) } - - if skipSignozMetrics && strings.HasPrefix(metricName, "signoz") { + if skipSignozMetrics && strings.HasPrefix(name, "signoz") { continue } + metricNames = append(metricNames, name) + } - metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName) - if apiError != nil { - zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError)) - } - if updatedMetadata, exist := metadata[metricName]; exist { - typ = string(updatedMetadata.MetricType) - isMonotonic = updatedMetadata.IsMonotonic - temporality = string(updatedMetadata.Temporality) - } + if len(metricNames) == 0 { + return &response, nil + } + + // Get all metadata in one shot + metadataMap, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...) + if apiError != nil { + return &response, fmt.Errorf("error getting updated metrics metadata: %s", apiError.Error()) + } + + seen := make(map[string]struct{}) + for _, name := range metricNames { + metadata := metadataMap[name] + + typ := string(metadata.MetricType) + temporality := string(metadata.Temporality) + isMonotonic := metadata.IsMonotonic // Non-monotonic cumulative sums are treated as gauges if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) { typ = "Gauge" } + // unlike traces/logs `tag`/`resource` type, the `Type` will be metric type key := v3.AttributeKey{ - Key: metricName, + Key: name, DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyType(typ), IsColumn: true, } - // remove duplicates - if _, ok := seen[metricName+typ]; ok { + + if _, ok := seen[name+typ]; ok { continue } - seen[metricName+typ] = struct{}{} + seen[name+typ] = struct{}{} response.AttributeKeys = append(response.AttributeKeys, key) } @@ -2825,72 +2818,69 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.U unixMilli := common.PastDayRoundOff() - // Note: metric metadata should be accessible regardless of the time range selection - // our standard retention period is 30 days, so we are querying the table v4_1_day to reduce the - // amount of data scanned - query := fmt.Sprintf("SELECT temporality, description, type, unit, is_monotonic from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 GROUP BY temporality, description, type, unit, is_monotonic", signozMetricDBName, signozTSTableNameV41Day) - rows, err := r.db.Query(ctx, query, metricName, unixMilli) - if err != nil { - zap.L().Error("Error while fetching metric metadata", zap.Error(err)) - return nil, fmt.Errorf("error while fetching metric metadata: %s", err.Error()) - } - defer rows.Close() - - var deltaExists, isMonotonic bool - var temporality, description, metricType, unit string - for rows.Next() { - if err := rows.Scan(&temporality, &description, &metricType, &unit, &isMonotonic); err != nil { - return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) - } - if temporality == string(v3.Delta) { - deltaExists = true - } - } - metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName) + // 1. Fetch metadata from cache/db using unified function + metadataMap, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName) if apiError != nil { zap.L().Error("Error in getting metric cached metadata", zap.Error(apiError)) - } - if updatedMetadata, exist := metadata[metricName]; exist { - metricType = string(updatedMetadata.MetricType) - temporality = string(updatedMetadata.Temporality) - if temporality == string(v3.Delta) { - deltaExists = true - } - isMonotonic = updatedMetadata.IsMonotonic - if updatedMetadata.Description != "" { - description = updatedMetadata.Description - } - if updatedMetadata.Unit != "" { - unit = updatedMetadata.Unit - } + return nil, fmt.Errorf("error fetching metric metadata: %s", apiError.Err.Error()) } - query = fmt.Sprintf("SELECT JSONExtractString(labels, 'le') as le from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $3 GROUP BY le ORDER BY le", signozMetricDBName, signozTSTableNameV41Day) - rows, err = r.db.Query(ctx, query, metricName, unixMilli, serviceName) - if err != nil { - zap.L().Error("Error while executing query", zap.Error(err)) - return nil, fmt.Errorf("error while executing query: %s", err.Error()) - } - defer rows.Close() + // Defaults in case metadata is not found + var ( + deltaExists bool + isMonotonic bool + temporality string + description string + metricType string + unit string + ) + metadata, ok := metadataMap[metricName] + if !ok { + return nil, fmt.Errorf("metric metadata not found: %s", metricName) + } + + metricType = string(metadata.MetricType) + temporality = string(metadata.Temporality) + isMonotonic = metadata.IsMonotonic + description = metadata.Description + unit = metadata.Unit + + if temporality == string(v3.Delta) { + deltaExists = true + } + // 2. Only for Histograms, get `le` buckets var leFloat64 []float64 - for rows.Next() { - var leStr string - if err := rows.Scan(&leStr); err != nil { - return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) - } - le, err := strconv.ParseFloat(leStr, 64) - // ignore the error and continue if the value is not a float - // ideally this should not happen but we have seen ClickHouse - // returning empty string for some values + if metricType == string(v3.MetricTypeHistogram) { + query := fmt.Sprintf(` + SELECT JSONExtractString(labels, 'le') AS le + FROM %s.%s + WHERE metric_name = $1 + AND unix_milli >= $2 + AND type = 'Histogram' + AND JSONExtractString(labels, 'service_name') = $3 + GROUP BY le + ORDER BY le`, signozMetricDBName, signozTSTableNameV41Day) + + rows, err := r.db.Query(ctx, query, metricName, unixMilli, serviceName) if err != nil { - zap.L().Error("error while parsing le value", zap.Error(err)) - continue + zap.L().Error("Error while querying histogram buckets", zap.Error(err)) + return nil, fmt.Errorf("error while querying histogram buckets: %s", err.Error()) } - if math.IsInf(le, 0) { - continue + defer rows.Close() + + for rows.Next() { + var leStr string + if err := rows.Scan(&leStr); err != nil { + return nil, fmt.Errorf("error while scanning le: %s", err.Error()) + } + le, err := strconv.ParseFloat(leStr, 64) + if err != nil || math.IsInf(le, 0) { + zap.L().Error("Invalid 'le' bucket value", zap.String("value", leStr), zap.Error(err)) + continue + } + leFloat64 = append(leFloat64, le) } - leFloat64 = append(leFloat64, le) } return &v3.MetricMetadataResponse{ @@ -5811,7 +5801,7 @@ VALUES ( ?, ?, ?, ?, ?, ?, ?);`, signozMetricDBName, signozUpdatedMetricsMetadat if err != nil { return &model.ApiError{Typ: "ClickHouseError", Err: err} } - err = r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+req.MetricName, req, -1) + err = r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+req.MetricName, req, 0) if err != nil { return &model.ApiError{Typ: "CachingErr", Err: err} } @@ -5852,33 +5842,11 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam return hasLE, nil } -func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context, orgID valuer.UUID) []error { - var allMetricsMetadata []model.UpdateMetricsMetadata - var errorList []error - // Fetch all rows from ClickHouse - query := fmt.Sprintf(`SELECT metric_name, type, description , temporality, is_monotonic, unit - FROM %s.%s;`, signozMetricDBName, signozUpdatedMetricsMetadataTable) - valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) - err := r.db.Select(valueCtx, &allMetricsMetadata, query) - if err != nil { - errorList = append(errorList, err) - return errorList - } - for _, m := range allMetricsMetadata { - err := r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, &m, -1) - if err != nil { - errorList = append(errorList, err) - } - } - - return errorList -} - func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) { cachedMetadata := make(map[string]*model.UpdateMetricsMetadata) var missingMetrics []string - // First, try retrieving each metric from cache. + // 1. Try cache for _, metricName := range metricNames { metadata := new(model.UpdateMetricsMetadata) cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName @@ -5890,10 +5858,10 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID } } - // If there are any metrics missing in the cache, query them from the database. + // 2. Try updated_metrics_metadata table + var stillMissing []string if len(missingMetrics) > 0 { - // Join the missing metric names; ensure proper quoting if needed. - metricList := "'" + strings.Join(metricNames, "', '") + "'" + metricList := "'" + strings.Join(missingMetrics, "', '") + "'" query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit FROM %s.%s WHERE metric_name IN (%s);`, signozMetricDBName, signozUpdatedMetricsMetadataTable, metricList) @@ -5905,6 +5873,7 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID } defer rows.Close() + found := make(map[string]struct{}) for rows.Next() { metadata := new(model.UpdateMetricsMetadata) if err := rows.Scan( @@ -5918,15 +5887,57 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metrics metadata: %v", err)} } - // Cache the result for future requests. cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName - if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, -1); cacheErr != nil { + if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, 0); cacheErr != nil { zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr)) } cachedMetadata[metadata.MetricName] = metadata + found[metadata.MetricName] = struct{}{} + } + + // Determine which metrics are still missing + for _, m := range missingMetrics { + if _, ok := found[m]; !ok { + stillMissing = append(stillMissing, m) + } } } + // 3. Fallback: Try time_series_v4_1week table + if len(stillMissing) > 0 { + metricList := "'" + strings.Join(stillMissing, "', '") + "'" + query := fmt.Sprintf(`SELECT DISTINCT metric_name, type, description, temporality, is_monotonic, unit + FROM %s.%s + WHERE metric_name IN (%s)`, signozMetricDBName, signozTSTableNameV4, metricList) + valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) + rows, err := r.db.Query(valueCtx, query) + if err != nil { + return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying time_series_v4 to get metrics metadata: %v", err)} + } + defer rows.Close() + for rows.Next() { + metadata := new(model.UpdateMetricsMetadata) + if err := rows.Scan( + &metadata.MetricName, + &metadata.MetricType, + &metadata.Description, + &metadata.Temporality, + &metadata.IsMonotonic, + &metadata.Unit, + ); err != nil { + return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning fallback metadata: %v", err)} + } + + cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName + if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, 0); cacheErr != nil { + zap.L().Error("Failed to cache fallback metadata", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr)) + } + cachedMetadata[metadata.MetricName] = metadata + } + if rows.Err() != nil { + return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning fallback metadata: %v", err)} + } + } return cachedMetadata, nil } diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index 872dde7b1bf3..98117c956a6c 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -213,17 +213,6 @@ func (q *querier) runBuilderQuery( return } - if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode { - metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, orgID, builderQuery.AggregateAttribute.Key) - if apiError != nil { - zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError)) - } - if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist { - builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType) - builderQuery.Temporality = updatedMetadata.Temporality - } - } - // What is happening here? // We are only caching the graph panel queries. A non-existant cache key means that the query is not cached. // If the query is not cached, we execute the query and return the result without caching it. diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index 3ee940e32234..7312fc46da9f 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -214,17 +214,6 @@ func (q *querier) runBuilderQuery( return } - if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode { - metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, orgID, builderQuery.AggregateAttribute.Key) - if apiError != nil { - zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError)) - } - if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist { - builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType) - builderQuery.Temporality = updatedMetadata.Temporality - } - } - // What is happening here? // We are only caching the graph panel queries. A non-existant cache key means that the query is not cached. // If the query is not cached, we execute the query and return the result without caching it. diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index ec588e79a23a..0afe28c0c719 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -164,18 +164,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT) agentConfMgr, signoz.Instrumentation, ) - - orgs, err := apiHandler.Signoz.Modules.OrgGetter.ListByOwnedKeyRange(context.Background()) - if err != nil { - return nil, err - } - for _, org := range orgs { - errorList := reader.PreloadMetricsMetadata(context.Background(), org.ID) - for _, er := range errorList { - zap.L().Error("failed to preload metrics metadata", zap.Error(er)) - } - } - + return s, nil } diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index b3551638d98b..fdfc0321bbc2 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -2,7 +2,6 @@ package rules import ( "context" - "fmt" "strings" "testing" "time" @@ -1224,7 +1223,6 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { for idx, c := range cases { rows := cmock.NewRows(cols, c.values) - telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error")) // We are testing the eval logic after the query is run // so we don't care about the query string here queryString := "SELECT any" @@ -1322,7 +1320,6 @@ func TestThresholdRuleNoData(t *testing.T) { for idx, c := range cases { rows := cmock.NewRows(cols, c.values) - telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error")) // We are testing the eval logic after the query is run // so we don't care about the query string here