diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 850baa578e74..c39f98997877 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2315,41 +2315,25 @@ func (r *ClickHouseReader) GetTotalLogs(ctx context.Context) (uint64, error) { func (r *ClickHouseReader) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) { metricNameToTemporality := make(map[string]map[v3.Temporality]bool) - var metricNamesToQuery []string - for _, metricName := range metricNames { - updatedMetadata, cacheErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName) - if cacheErr != nil { - zap.L().Info("Error in getting metrics cached metadata", zap.Error(cacheErr)) - } - if metadata, exist := updatedMetadata[metricName]; exist { - if _, exists := metricNameToTemporality[metricName]; !exists { - metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) - } - metricNameToTemporality[metricName][metadata.Temporality] = true - } else { - metricNamesToQuery = append(metricNamesToQuery, metricName) - } + + // Batch fetch all metadata at once + metadataMap, apiErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...) + if apiErr != nil { + zap.L().Warn("Failed to fetch updated metrics metadata", zap.Error(apiErr)) + // best-effort return, not failing outright + return metricNameToTemporality, nil } - query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day) - - rows, err := r.db.Query(ctx, query, metricNames) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var metricName, temporality string - err := rows.Scan(&metricName, &temporality) - if err != nil { - return nil, err + for metricName, metadata := range metadataMap { + if metadata == nil { + continue } - if _, ok := metricNameToTemporality[metricName]; !ok { + if _, exists := metricNameToTemporality[metricName]; !exists { metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) } - metricNameToTemporality[metricName][v3.Temporality(temporality)] = true + metricNameToTemporality[metricName][metadata.Temporality] = true } + return metricNameToTemporality, nil } @@ -3004,67 +2988,80 @@ func (r *ClickHouseReader) QueryDashboardVars(ctx context.Context, query string) } func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) { - - var query string - var err error - var rows driver.Rows var response v3.AggregateAttributeResponse normalized := true if constants.IsDotMetricsEnabled { normalized = false } - query = fmt.Sprintf("SELECT metric_name, type, is_monotonic, temporality FROM %s.%s WHERE metric_name ILIKE $1 and __normalized = $2 GROUP BY metric_name, type, is_monotonic, temporality", signozMetricDBName, signozTSTableNameV41Day) + // Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db + query := fmt.Sprintf( + `SELECT DISTINCT metric_name + FROM %s.%s + WHERE metric_name ILIKE $1 AND __normalized = $2`, + signozMetricDBName, signozTSTableNameV41Day) + if req.Limit != 0 { query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) } - rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), normalized) + rows, err := r.db.Query(ctx, query, 
fmt.Sprintf("%%%s%%", req.SearchText), normalized) if err != nil { - zap.L().Error("Error while executing query", zap.Error(err)) - return nil, fmt.Errorf("error while executing query: %s", err.Error()) + zap.L().Error("Error while querying metric names", zap.Error(err)) + return nil, fmt.Errorf("error while executing metric name query: %s", err.Error()) } defer rows.Close() - seen := make(map[string]struct{}) - - var metricName, typ, temporality string - var isMonotonic bool + var metricNames []string for rows.Next() { - if err := rows.Scan(&metricName, &typ, &isMonotonic, &temporality); err != nil { - return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + var name string + if err := rows.Scan(&name); err != nil { + return nil, fmt.Errorf("error while scanning metric name: %s", err.Error()) } + if skipSignozMetrics && strings.HasPrefix(name, "signoz") { + continue + } + metricNames = append(metricNames, name) + } - if skipSignozMetrics && strings.HasPrefix(metricName, "signoz") { + if len(metricNames) == 0 { + return &response, nil + } + + // Get all metadata in one shot + metadataMap, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...) + if apiError != nil { + return &response, fmt.Errorf("error getting updated metrics metadata: %s", apiError.Error()) + } + + seen := make(map[string]struct{}) + for _, name := range metricNames { + metadata, ok := metadataMap[name] + if !ok { continue } - metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName) - if apiError != nil { - zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError)) - } - if updatedMetadata, exist := metadata[metricName]; exist { - typ = string(updatedMetadata.MetricType) - isMonotonic = updatedMetadata.IsMonotonic - temporality = string(updatedMetadata.Temporality) - } + typ := string(metadata.MetricType) + temporality := string(metadata.Temporality) + isMonotonic := metadata.IsMonotonic // Non-monotonic cumulative sums are treated as gauges if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) { typ = "Gauge" } + // unlike traces/logs `tag`/`resource` type, the `Type` will be metric type key := v3.AttributeKey{ - Key: metricName, + Key: name, DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyType(typ), IsColumn: true, } - // remove duplicates - if _, ok := seen[metricName+typ]; ok { + + if _, ok := seen[name+typ]; ok { continue } - seen[metricName+typ] = struct{}{} + seen[name+typ] = struct{}{} response.AttributeKeys = append(response.AttributeKeys, key) } @@ -3156,72 +3153,67 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.U unixMilli := common.PastDayRoundOff() - // Note: metric metadata should be accessible regardless of the time range selection - // our standard retention period is 30 days, so we are querying the table v4_1_day to reduce the - // amount of data scanned - query := fmt.Sprintf("SELECT temporality, description, type, unit, is_monotonic from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 GROUP BY temporality, description, type, unit, is_monotonic", signozMetricDBName, signozTSTableNameV41Day) - rows, err := r.db.Query(ctx, query, metricName, unixMilli) - if err != nil { - zap.L().Error("Error while fetching metric metadata", zap.Error(err)) - return nil, fmt.Errorf("error while fetching metric metadata: %s", err.Error()) - } - defer rows.Close() - - var deltaExists, isMonotonic bool - var temporality, description, metricType, unit string - for rows.Next() { - if err := 
rows.Scan(&temporality, &description, &metricType, &unit, &isMonotonic); err != nil { - return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) - } - if temporality == string(v3.Delta) { - deltaExists = true - } - } - metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName) + // 1. Fetch metadata from cache/db using unified function + metadataMap, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName) if apiError != nil { zap.L().Error("Error in getting metric cached metadata", zap.Error(apiError)) + return nil, fmt.Errorf("error fetching metric metadata: %s", apiError.Err.Error()) } - if updatedMetadata, exist := metadata[metricName]; exist { - metricType = string(updatedMetadata.MetricType) - temporality = string(updatedMetadata.Temporality) + + // Defaults in case metadata is not found + var ( + deltaExists bool + isMonotonic bool + temporality string + description string + metricType string + unit string + ) + + if metadata, exists := metadataMap[metricName]; exists { + metricType = string(metadata.MetricType) + temporality = string(metadata.Temporality) + isMonotonic = metadata.IsMonotonic + description = metadata.Description + unit = metadata.Unit + if temporality == string(v3.Delta) { deltaExists = true } - isMonotonic = updatedMetadata.IsMonotonic - if updatedMetadata.Description != "" { - description = updatedMetadata.Description - } - if updatedMetadata.Unit != "" { - unit = updatedMetadata.Unit - } } - query = fmt.Sprintf("SELECT JSONExtractString(labels, 'le') as le from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $3 GROUP BY le ORDER BY le", signozMetricDBName, signozTSTableNameV41Day) - rows, err = r.db.Query(ctx, query, metricName, unixMilli, serviceName) - if err != nil { - zap.L().Error("Error while executing query", zap.Error(err)) - return nil, fmt.Errorf("error while executing query: %s", err.Error()) - } - defer rows.Close() - + // 2. 
Only for Histograms, get `le` buckets
 	var leFloat64 []float64
-	for rows.Next() {
-		var leStr string
-		if err := rows.Scan(&leStr); err != nil {
-			return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
-		}
-		le, err := strconv.ParseFloat(leStr, 64)
-		// ignore the error and continue if the value is not a float
-		// ideally this should not happen but we have seen ClickHouse
-		// returning empty string for some values
+	if metricType == string(v3.MetricTypeHistogram) {
+		query := fmt.Sprintf(`
+			SELECT JSONExtractString(labels, 'le') AS le
+			FROM %s.%s
+			WHERE metric_name = $1
+				AND unix_milli >= $2
+				AND type = 'Histogram'
+				AND JSONExtractString(labels, 'service_name') = $3
+			GROUP BY le
+			ORDER BY le`, signozMetricDBName, signozTSTableNameV41Day)
+
+		rows, err := r.db.Query(ctx, query, metricName, unixMilli, serviceName)
 		if err != nil {
-			zap.L().Error("error while parsing le value", zap.Error(err))
-			continue
+			zap.L().Error("Error while querying histogram buckets", zap.Error(err))
+			return nil, fmt.Errorf("error while querying histogram buckets: %s", err.Error())
 		}
-		if math.IsInf(le, 0) {
-			continue
+		defer rows.Close()
+
+		for rows.Next() {
+			var leStr string
+			if err := rows.Scan(&leStr); err != nil {
+				return nil, fmt.Errorf("error while scanning le: %s", err.Error())
+			}
+			le, err := strconv.ParseFloat(leStr, 64)
+			if err != nil {
+				zap.L().Error("Invalid 'le' bucket value", zap.String("value", leStr), zap.Error(err))
+				continue
+			}
+			// the +Inf bucket is present on every histogram; skip it without logging an error
+			if math.IsInf(le, 0) {
+				continue
+			}
+			leFloat64 = append(leFloat64, le)
 		}
-		leFloat64 = append(leFloat64, le)
 	}
 
 	return &v3.MetricMetadataResponse{
@@ -6233,68 +6225,26 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
 	return hasLE, nil
 }
 
-func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context, orgID valuer.UUID) ([]string, *model.ApiError) {
-	var allMetricsMetadata []model.UpdateMetricsMetadata
-	var errorMetricsList []string
-	// Fetch all rows from ClickHouse
-	query := fmt.Sprintf(`SELECT metric_name, type, description , temporality, is_monotonic, unit
-	FROM %s.%s;`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
-	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
-	err := r.db.Select(valueCtx, &allMetricsMetadata, query)
-	if err != nil {
-		return nil, &model.ApiError{Typ: "ClickHouseError", Err: fmt.Errorf("error in getting updated metadata: %v", err)}
-	}
-	for _, m := range allMetricsMetadata {
-		err := r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, &m, -1)
-		if err != nil {
-			errorMetricsList = append(errorMetricsList, m.MetricName)
-		}
-	}
-
-	return errorMetricsList, nil
-}
-
 func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
 	cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
 	var missingMetrics []string
 
-	preCacheLoaded := new(model.CacheLoaded)
-	err := r.cache.Get(ctx, orgID, constants.METRICS_UPDATED_METADATA_CACHE_LOADED_KEY, preCacheLoaded, false)
-	if err != nil {
-		*preCacheLoaded = false
-		zap.L().Warn("Failed to get cached metrics updated metadata", zap.Error(err))
-	}
-
-	if !*preCacheLoaded {
-		preLoadErrorList, apiErr := r.PreloadMetricsMetadata(ctx, orgID)
-		if apiErr != nil {
-			return nil, apiErr
-		}
-		if preLoadErrorList != nil && len(preLoadErrorList) > 0 {
-			missingMetrics = append(missingMetrics, preLoadErrorList...)
- } - *preCacheLoaded = true - err := r.cache.Set(ctx, orgID, constants.METRICS_UPDATED_METADATA_CACHE_LOADED_KEY, preCacheLoaded, -1) - if err != nil { - zap.L().Warn("Failed to set cached metrics updated metadata", zap.Error(err)) - } - } - - // First, try retrieving each metric from cache. + // 1. Try cache for _, metricName := range metricNames { metadata := new(model.UpdateMetricsMetadata) cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName err := r.cache.Get(ctx, orgID, cacheKey, metadata, true) if err == nil { cachedMetadata[metricName] = metadata - } else if !errorsV2.Ast(err, errorsV2.TypeNotFound) { + } else { missingMetrics = append(missingMetrics, metricName) } } + // 2. Try updated_metrics_metadata table + var stillMissing []string if len(missingMetrics) > 0 { - // Join the missing metric names; ensure proper quoting if needed. - metricList := "'" + strings.Join(metricNames, "', '") + "'" + metricList := "'" + strings.Join(missingMetrics, "', '") + "'" query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit FROM %s.%s WHERE metric_name IN (%s);`, signozMetricDBName, signozUpdatedMetricsMetadataTable, metricList) @@ -6306,6 +6256,7 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID } defer rows.Close() + found := make(map[string]struct{}) for rows.Next() { metadata := new(model.UpdateMetricsMetadata) if err := rows.Scan( @@ -6319,12 +6270,55 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metrics metadata: %v", err)} } - // Cache the result for future requests. cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, -1); cacheErr != nil { zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr)) } cachedMetadata[metadata.MetricName] = metadata + found[metadata.MetricName] = struct{}{} + } + + // Determine which metrics are still missing + for _, m := range missingMetrics { + if _, ok := found[m]; !ok { + stillMissing = append(stillMissing, m) + } + } + } + + // 3. 
Fallback: query the time_series_v4 table directly
+	if len(stillMissing) > 0 {
+		metricList := "'" + strings.Join(stillMissing, "', '") + "'"
+		query := fmt.Sprintf(`SELECT DISTINCT metric_name, type, description, temporality, is_monotonic, unit
+			FROM %s.%s
+			WHERE metric_name IN (%s)`, signozMetricDBName, signozTSTableNameV4, metricList)
+		valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+		rows, err := r.db.Query(valueCtx, query)
+		if err != nil {
+			return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying time_series_v4 to get metrics metadata: %v", err)}
+		}
+		defer rows.Close()
+		for rows.Next() {
+			metadata := new(model.UpdateMetricsMetadata)
+			if err := rows.Scan(
+				&metadata.MetricName,
+				&metadata.MetricType,
+				&metadata.Description,
+				&metadata.Temporality,
+				&metadata.IsMonotonic,
+				&metadata.Unit,
+			); err != nil {
+				return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning fallback metadata: %v", err)}
+			}
+
+			cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
+			if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, -1); cacheErr != nil {
+				zap.L().Error("Failed to cache fallback metadata", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
+			}
+			cachedMetadata[metadata.MetricName] = metadata
+		}
+		if err := rows.Err(); err != nil {
+			return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error iterating fallback metadata rows: %v", err)}
+		}
+	}
 
 	return cachedMetadata, nil
diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go
index 872dde7b1bf3..98117c956a6c 100644
--- a/pkg/query-service/app/querier/helper.go
+++ b/pkg/query-service/app/querier/helper.go
@@ -213,17 +213,6 @@ func (q *querier) runBuilderQuery(
 		return
 	}
 
-	if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
-		metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, orgID, builderQuery.AggregateAttribute.Key)
-		if apiError != nil {
-			zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
-		}
-		if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist {
-			builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType)
-			builderQuery.Temporality = updatedMetadata.Temporality
-		}
-	}
-
 	// What is happening here?
 	// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
 	// If the query is not cached, we execute the query and return the result without caching it.
diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go
index 3ee940e32234..7312fc46da9f 100644
--- a/pkg/query-service/app/querier/v2/helper.go
+++ b/pkg/query-service/app/querier/v2/helper.go
@@ -214,17 +214,6 @@ func (q *querier) runBuilderQuery(
 		return
 	}
 
-	if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
-		metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, orgID, builderQuery.AggregateAttribute.Key)
-		if apiError != nil {
-			zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
-		}
-		if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist {
-			builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType)
-			builderQuery.Temporality = updatedMetadata.Temporality
-		}
-	}
-
 	// What is happening here?
 	// We are only caching the graph panel queries. 
A non-existant cache key means that the query is not cached. // If the query is not cached, we execute the query and return the result without caching it.
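
For reviewers tracing the refactor: the sketch below models the three-tier lookup chain that the reworked GetUpdatedMetricsMetadata now implements (in-process cache, then the updated_metrics_metadata table, then time_series_v4 as a fallback), with each tier queried only for the names the previous tiers missed. It is a simplified, self-contained illustration: MetricMetadata, lookup, and the in-memory "layers" are hypothetical stand-ins for model.UpdateMetricsMetadata, the real cache, and the two batched ClickHouse queries; they are not the actual reader API.

package main

import "fmt"

// MetricMetadata is a simplified stand-in for model.UpdateMetricsMetadata.
type MetricMetadata struct {
	MetricName  string
	MetricType  string
	Temporality string
}

// lookup models one tier of the chain: it copies whatever the tier knows
// into out and returns the names it could not resolve.
func lookup(layer map[string]*MetricMetadata, names []string, out map[string]*MetricMetadata) []string {
	var missing []string
	for _, name := range names {
		if md, ok := layer[name]; ok {
			out[name] = md
		} else {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	// Hypothetical contents of the three tiers.
	cache := map[string]*MetricMetadata{
		"http_requests_total": {"http_requests_total", "Sum", "Cumulative"},
	}
	metadataTable := map[string]*MetricMetadata{
		"request_duration_bucket": {"request_duration_bucket", "Histogram", "Delta"},
	}
	timeSeriesTable := map[string]*MetricMetadata{
		"queue_depth": {"queue_depth", "Gauge", "Unspecified"},
	}

	result := make(map[string]*MetricMetadata)
	names := []string{"http_requests_total", "request_duration_bucket", "queue_depth", "unknown_metric"}

	// Each tier only sees what the previous tiers missed; in the real
	// reader, hits on the second and third tiers are also written back
	// into the cache via r.cache.Set.
	missing := lookup(cache, names, result)
	missing = lookup(metadataTable, missing, result)
	missing = lookup(timeSeriesTable, missing, result)

	for name, md := range result {
		fmt.Printf("%s: type=%s temporality=%s\n", name, md.MetricType, md.Temporality)
	}
	fmt.Println("still unresolved:", missing)
}

This batching is also why the per-metric metadata lookups could be deleted from both querier helpers above: callers such as FetchTemporality and GetMetricAggregateAttributes now resolve all metric names in one call (and at most two ClickHouse queries) instead of one round trip per metric.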