chore(2354): added preloaded metrics metadata at first api call

This commit is contained in:
aniket 2025-06-12 23:19:53 +05:30
parent ea8edaf646
commit eb6d361241
3 changed files with 153 additions and 181 deletions

View File

@ -2315,41 +2315,25 @@ func (r *ClickHouseReader) GetTotalLogs(ctx context.Context) (uint64, error) {
func (r *ClickHouseReader) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) {
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
var metricNamesToQuery []string
for _, metricName := range metricNames {
updatedMetadata, cacheErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
if cacheErr != nil {
zap.L().Info("Error in getting metrics cached metadata", zap.Error(cacheErr))
}
if metadata, exist := updatedMetadata[metricName]; exist {
if _, exists := metricNameToTemporality[metricName]; !exists {
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
}
metricNameToTemporality[metricName][metadata.Temporality] = true
} else {
metricNamesToQuery = append(metricNamesToQuery, metricName)
}
// Batch fetch all metadata at once
metadataMap, apiErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...)
if apiErr != nil {
zap.L().Warn("Failed to fetch updated metrics metadata", zap.Error(apiErr))
// best-effort return, not failing outright
return metricNameToTemporality, nil
}
query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day)
rows, err := r.db.Query(ctx, query, metricNames)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var metricName, temporality string
err := rows.Scan(&metricName, &temporality)
if err != nil {
return nil, err
for metricName, metadata := range metadataMap {
if metadata == nil {
continue
}
if _, ok := metricNameToTemporality[metricName]; !ok {
if _, exists := metricNameToTemporality[metricName]; !exists {
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
}
metricNameToTemporality[metricName][v3.Temporality(temporality)] = true
metricNameToTemporality[metricName][metadata.Temporality] = true
}
return metricNameToTemporality, nil
}
@ -3004,67 +2988,80 @@ func (r *ClickHouseReader) QueryDashboardVars(ctx context.Context, query string)
}
func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) {
var query string
var err error
var rows driver.Rows
var response v3.AggregateAttributeResponse
normalized := true
if constants.IsDotMetricsEnabled {
normalized = false
}
query = fmt.Sprintf("SELECT metric_name, type, is_monotonic, temporality FROM %s.%s WHERE metric_name ILIKE $1 and __normalized = $2 GROUP BY metric_name, type, is_monotonic, temporality", signozMetricDBName, signozTSTableNameV41Day)
// Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db
query := fmt.Sprintf(
`SELECT DISTINCT metric_name
FROM %s.%s
WHERE metric_name ILIKE $1 AND __normalized = $2`,
signozMetricDBName, signozTSTableNameV41Day)
if req.Limit != 0 {
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
}
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), normalized)
rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), normalized)
if err != nil {
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
zap.L().Error("Error while querying metric names", zap.Error(err))
return nil, fmt.Errorf("error while executing metric name query: %s", err.Error())
}
defer rows.Close()
seen := make(map[string]struct{})
var metricName, typ, temporality string
var isMonotonic bool
var metricNames []string
for rows.Next() {
if err := rows.Scan(&metricName, &typ, &isMonotonic, &temporality); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
var name string
if err := rows.Scan(&name); err != nil {
return nil, fmt.Errorf("error while scanning metric name: %s", err.Error())
}
if skipSignozMetrics && strings.HasPrefix(name, "signoz") {
continue
}
metricNames = append(metricNames, name)
}
if skipSignozMetrics && strings.HasPrefix(metricName, "signoz") {
if len(metricNames) == 0 {
return &response, nil
}
// Get all metadata in one shot
metadataMap, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...)
if apiError != nil {
return &response, fmt.Errorf("error getting updated metrics metadata: %s", apiError.Error())
}
seen := make(map[string]struct{})
for _, name := range metricNames {
metadata, ok := metadataMap[name]
if !ok {
continue
}
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[metricName]; exist {
typ = string(updatedMetadata.MetricType)
isMonotonic = updatedMetadata.IsMonotonic
temporality = string(updatedMetadata.Temporality)
}
typ := string(metadata.MetricType)
temporality := string(metadata.Temporality)
isMonotonic := metadata.IsMonotonic
// Non-monotonic cumulative sums are treated as gauges
if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
typ = "Gauge"
}
// unlike traces/logs `tag`/`resource` type, the `Type` will be metric type
key := v3.AttributeKey{
Key: metricName,
Key: name,
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType(typ),
IsColumn: true,
}
// remove duplicates
if _, ok := seen[metricName+typ]; ok {
if _, ok := seen[name+typ]; ok {
continue
}
seen[metricName+typ] = struct{}{}
seen[name+typ] = struct{}{}
response.AttributeKeys = append(response.AttributeKeys, key)
}
@ -3156,72 +3153,67 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.U
unixMilli := common.PastDayRoundOff()
// Note: metric metadata should be accessible regardless of the time range selection
// our standard retention period is 30 days, so we are querying the table v4_1_day to reduce the
// amount of data scanned
query := fmt.Sprintf("SELECT temporality, description, type, unit, is_monotonic from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 GROUP BY temporality, description, type, unit, is_monotonic", signozMetricDBName, signozTSTableNameV41Day)
rows, err := r.db.Query(ctx, query, metricName, unixMilli)
if err != nil {
zap.L().Error("Error while fetching metric metadata", zap.Error(err))
return nil, fmt.Errorf("error while fetching metric metadata: %s", err.Error())
}
defer rows.Close()
var deltaExists, isMonotonic bool
var temporality, description, metricType, unit string
for rows.Next() {
if err := rows.Scan(&temporality, &description, &metricType, &unit, &isMonotonic); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
if temporality == string(v3.Delta) {
deltaExists = true
}
}
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
// 1. Fetch metadata from cache/db using unified function
metadataMap, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
if apiError != nil {
zap.L().Error("Error in getting metric cached metadata", zap.Error(apiError))
return nil, fmt.Errorf("error fetching metric metadata: %s", apiError.Err.Error())
}
if updatedMetadata, exist := metadata[metricName]; exist {
metricType = string(updatedMetadata.MetricType)
temporality = string(updatedMetadata.Temporality)
// Defaults in case metadata is not found
var (
deltaExists bool
isMonotonic bool
temporality string
description string
metricType string
unit string
)
if metadata, exists := metadataMap[metricName]; exists {
metricType = string(metadata.MetricType)
temporality = string(metadata.Temporality)
isMonotonic = metadata.IsMonotonic
description = metadata.Description
unit = metadata.Unit
if temporality == string(v3.Delta) {
deltaExists = true
}
isMonotonic = updatedMetadata.IsMonotonic
if updatedMetadata.Description != "" {
description = updatedMetadata.Description
}
if updatedMetadata.Unit != "" {
unit = updatedMetadata.Unit
}
}
query = fmt.Sprintf("SELECT JSONExtractString(labels, 'le') as le from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $3 GROUP BY le ORDER BY le", signozMetricDBName, signozTSTableNameV41Day)
rows, err = r.db.Query(ctx, query, metricName, unixMilli, serviceName)
if err != nil {
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
// 2. Only for Histograms, get `le` buckets
var leFloat64 []float64
for rows.Next() {
var leStr string
if err := rows.Scan(&leStr); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
le, err := strconv.ParseFloat(leStr, 64)
// ignore the error and continue if the value is not a float
// ideally this should not happen but we have seen ClickHouse
// returning empty string for some values
if metricType == string(v3.MetricTypeHistogram) {
query := fmt.Sprintf(`
SELECT JSONExtractString(labels, 'le') AS le
FROM %s.%s
WHERE metric_name = $1
AND unix_milli >= $2
AND type = 'Histogram'
AND JSONExtractString(labels, 'service_name') = $3
GROUP BY le
ORDER BY le`, signozMetricDBName, signozTSTableNameV41Day)
rows, err := r.db.Query(ctx, query, metricName, unixMilli, serviceName)
if err != nil {
zap.L().Error("error while parsing le value", zap.Error(err))
continue
zap.L().Error("Error while querying histogram buckets", zap.Error(err))
return nil, fmt.Errorf("error while querying histogram buckets: %s", err.Error())
}
if math.IsInf(le, 0) {
continue
defer rows.Close()
for rows.Next() {
var leStr string
if err := rows.Scan(&leStr); err != nil {
return nil, fmt.Errorf("error while scanning le: %s", err.Error())
}
le, err := strconv.ParseFloat(leStr, 64)
if err != nil || math.IsInf(le, 0) {
zap.L().Error("Invalid 'le' bucket value", zap.String("value", leStr), zap.Error(err))
continue
}
leFloat64 = append(leFloat64, le)
}
leFloat64 = append(leFloat64, le)
}
return &v3.MetricMetadataResponse{
@ -6233,68 +6225,26 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
return hasLE, nil
}
// PreloadMetricsMetadata reads every row of the updated-metrics-metadata table
// and writes each row into the cache under
// constants.UpdatedMetricsMetadataCachePrefix + metric name, scoped to orgID.
//
// It returns the names of the metrics whose cache write failed (nil when every
// write succeeded). A non-nil *model.ApiError is returned only when the
// ClickHouse query itself fails; individual cache failures are collected, not
// treated as fatal.
func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context, orgID valuer.UUID) ([]string, *model.ApiError) {
	var allMetricsMetadata []model.UpdateMetricsMetadata

	// Fetch all rows from ClickHouse
	query := fmt.Sprintf(`SELECT metric_name, type, description , temporality, is_monotonic, unit
FROM %s.%s;`, signozMetricDBName, signozUpdatedMetricsMetadataTable)

	// The thread limit is attached to the context under a plain string key.
	// NOTE(review): presumably the ClickHouse wrapper looks this key up by its
	// string value — confirm before changing the key type.
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	if err := r.db.Select(valueCtx, &allMetricsMetadata, query); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: fmt.Errorf("error in getting updated metadata: %w", err)}
	}

	var errorMetricsList []string
	for i := range allMetricsMetadata {
		// Use the address of the slice element rather than of a range variable:
		// before Go 1.22 the range variable is shared by all iterations, so a
		// cache that retains the pointer would see every key alias the last row.
		m := &allMetricsMetadata[i]
		// TTL argument -1 is kept as in the original call.
		// NOTE(review): presumably means "no expiry" — confirm against the cache API.
		if err := r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, m, -1); err != nil {
			errorMetricsList = append(errorMetricsList, m.MetricName)
		}
	}
	return errorMetricsList, nil
}
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
var missingMetrics []string
preCacheLoaded := new(model.CacheLoaded)
err := r.cache.Get(ctx, orgID, constants.METRICS_UPDATED_METADATA_CACHE_LOADED_KEY, preCacheLoaded, false)
if err != nil {
*preCacheLoaded = false
zap.L().Warn("Failed to get cached metrics updated metadata", zap.Error(err))
}
if !*preCacheLoaded {
preLoadErrorList, apiErr := r.PreloadMetricsMetadata(ctx, orgID)
if apiErr != nil {
return nil, apiErr
}
if preLoadErrorList != nil && len(preLoadErrorList) > 0 {
missingMetrics = append(missingMetrics, preLoadErrorList...)
}
*preCacheLoaded = true
err := r.cache.Set(ctx, orgID, constants.METRICS_UPDATED_METADATA_CACHE_LOADED_KEY, preCacheLoaded, -1)
if err != nil {
zap.L().Warn("Failed to set cached metrics updated metadata", zap.Error(err))
}
}
// First, try retrieving each metric from cache.
// 1. Try cache
for _, metricName := range metricNames {
metadata := new(model.UpdateMetricsMetadata)
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName
err := r.cache.Get(ctx, orgID, cacheKey, metadata, true)
if err == nil {
cachedMetadata[metricName] = metadata
} else if !errorsV2.Ast(err, errorsV2.TypeNotFound) {
} else {
missingMetrics = append(missingMetrics, metricName)
}
}
// 2. Try updated_metrics_metadata table
var stillMissing []string
if len(missingMetrics) > 0 {
// Join the missing metric names; ensure proper quoting if needed.
metricList := "'" + strings.Join(metricNames, "', '") + "'"
metricList := "'" + strings.Join(missingMetrics, "', '") + "'"
query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit
FROM %s.%s
WHERE metric_name IN (%s);`, signozMetricDBName, signozUpdatedMetricsMetadataTable, metricList)
@ -6306,6 +6256,7 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
}
defer rows.Close()
found := make(map[string]struct{})
for rows.Next() {
metadata := new(model.UpdateMetricsMetadata)
if err := rows.Scan(
@ -6319,12 +6270,55 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metrics metadata: %v", err)}
}
// Cache the result for future requests.
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, -1); cacheErr != nil {
zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
}
cachedMetadata[metadata.MetricName] = metadata
found[metadata.MetricName] = struct{}{}
}
// Determine which metrics are still missing
for _, m := range missingMetrics {
if _, ok := found[m]; !ok {
stillMissing = append(stillMissing, m)
}
}
}
// 3. Fallback: try the time_series_v4 table (signozTSTableNameV4)
if len(stillMissing) > 0 {
metricList := "'" + strings.Join(stillMissing, "', '") + "'"
query := fmt.Sprintf(`SELECT DISTINCT metric_name, type, description, temporality, is_monotonic, unit
FROM %s.%s
WHERE metric_name IN (%s)`, signozMetricDBName, signozTSTableNameV4, metricList)
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
rows, err := r.db.Query(valueCtx, query)
if err != nil {
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying time_series_v4 to get metrics metadata: %v", err)}
}
defer rows.Close()
for rows.Next() {
metadata := new(model.UpdateMetricsMetadata)
if err := rows.Scan(
&metadata.MetricName,
&metadata.MetricType,
&metadata.Description,
&metadata.Temporality,
&metadata.IsMonotonic,
&metadata.Unit,
); err != nil {
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning fallback metadata: %v", err)}
}
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, -1); cacheErr != nil {
zap.L().Error("Failed to cache fallback metadata", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
}
cachedMetadata[metadata.MetricName] = metadata
}
if rows.Err() != nil {
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning fallback metadata: %v", err)}
}
}
return cachedMetadata, nil

View File

@ -213,17 +213,6 @@ func (q *querier) runBuilderQuery(
return
}
if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, orgID, builderQuery.AggregateAttribute.Key)
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist {
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType)
builderQuery.Temporality = updatedMetadata.Temporality
}
}
// What is happening here?
// We are only caching the graph panel queries. A non-existent cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.

View File

@ -214,17 +214,6 @@ func (q *querier) runBuilderQuery(
return
}
if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, orgID, builderQuery.AggregateAttribute.Key)
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist {
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType)
builderQuery.Temporality = updatedMetadata.Temporality
}
}
// What is happening here?
// We are only caching the graph panel queries. A non-existent cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.