mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-18 07:56:56 +00:00
chore(2354): added preloaded metrics metadata at first api call (#8229)
* chore(2354): added preloaded metrics metadata at first api call
This commit is contained in:
parent
8274ebfe37
commit
273452352d
@ -203,17 +203,6 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
|
|||||||
&opAmpModel.AllAgents, agentConfMgr, signoz.Instrumentation,
|
&opAmpModel.AllAgents, agentConfMgr, signoz.Instrumentation,
|
||||||
)
|
)
|
||||||
|
|
||||||
orgs, err := apiHandler.Signoz.Modules.OrgGetter.ListByOwnedKeyRange(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
for _, org := range orgs {
|
|
||||||
errorList := reader.PreloadMetricsMetadata(context.Background(), org.ID)
|
|
||||||
for _, er := range errorList {
|
|
||||||
zap.L().Error("failed to preload metrics metadata", zap.Error(er))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2300,41 +2300,24 @@ func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *mode
|
|||||||
|
|
||||||
func (r *ClickHouseReader) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) {
|
func (r *ClickHouseReader) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) {
|
||||||
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
|
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
|
||||||
var metricNamesToQuery []string
|
|
||||||
for _, metricName := range metricNames {
|
// Batch fetch all metadata at once
|
||||||
updatedMetadata, cacheErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
|
metadataMap, apiErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...)
|
||||||
if cacheErr != nil {
|
if apiErr != nil {
|
||||||
zap.L().Info("Error in getting metrics cached metadata", zap.Error(cacheErr))
|
zap.L().Warn("Failed to fetch updated metrics metadata", zap.Error(apiErr))
|
||||||
}
|
return nil, apiErr
|
||||||
if metadata, exist := updatedMetadata[metricName]; exist {
|
|
||||||
if _, exists := metricNameToTemporality[metricName]; !exists {
|
|
||||||
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
|
|
||||||
}
|
|
||||||
metricNameToTemporality[metricName][metadata.Temporality] = true
|
|
||||||
} else {
|
|
||||||
metricNamesToQuery = append(metricNamesToQuery, metricName)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day)
|
for metricName, metadata := range metadataMap {
|
||||||
|
if metadata == nil {
|
||||||
rows, err := r.db.Query(ctx, query, metricNames)
|
continue
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
for rows.Next() {
|
|
||||||
var metricName, temporality string
|
|
||||||
err := rows.Scan(&metricName, &temporality)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
if _, ok := metricNameToTemporality[metricName]; !ok {
|
if _, exists := metricNameToTemporality[metricName]; !exists {
|
||||||
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
|
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
|
||||||
}
|
}
|
||||||
metricNameToTemporality[metricName][v3.Temporality(temporality)] = true
|
metricNameToTemporality[metricName][metadata.Temporality] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
return metricNameToTemporality, nil
|
return metricNameToTemporality, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2673,67 +2656,77 @@ func (r *ClickHouseReader) QueryDashboardVars(ctx context.Context, query string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) {
|
func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) {
|
||||||
|
|
||||||
var query string
|
|
||||||
var err error
|
|
||||||
var rows driver.Rows
|
|
||||||
var response v3.AggregateAttributeResponse
|
var response v3.AggregateAttributeResponse
|
||||||
normalized := true
|
normalized := true
|
||||||
if constants.IsDotMetricsEnabled {
|
if constants.IsDotMetricsEnabled {
|
||||||
normalized = false
|
normalized = false
|
||||||
}
|
}
|
||||||
|
|
||||||
query = fmt.Sprintf("SELECT metric_name, type, is_monotonic, temporality FROM %s.%s WHERE metric_name ILIKE $1 and __normalized = $2 GROUP BY metric_name, type, is_monotonic, temporality", signozMetricDBName, signozTSTableNameV41Day)
|
// Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
`SELECT DISTINCT metric_name
|
||||||
|
FROM %s.%s
|
||||||
|
WHERE metric_name ILIKE $1 AND __normalized = $2`,
|
||||||
|
signozMetricDBName, signozTSTableNameV41Day)
|
||||||
|
|
||||||
if req.Limit != 0 {
|
if req.Limit != 0 {
|
||||||
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
|
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
|
||||||
}
|
}
|
||||||
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), normalized)
|
|
||||||
|
|
||||||
|
rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), normalized)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error("Error while executing query", zap.Error(err))
|
zap.L().Error("Error while querying metric names", zap.Error(err))
|
||||||
return nil, fmt.Errorf("error while executing query: %s", err.Error())
|
return nil, fmt.Errorf("error while executing metric name query: %s", err.Error())
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
seen := make(map[string]struct{})
|
var metricNames []string
|
||||||
|
|
||||||
var metricName, typ, temporality string
|
|
||||||
var isMonotonic bool
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
if err := rows.Scan(&metricName, &typ, &isMonotonic, &temporality); err != nil {
|
var name string
|
||||||
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
if err := rows.Scan(&name); err != nil {
|
||||||
|
return nil, fmt.Errorf("error while scanning metric name: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
if skipSignozMetrics && strings.HasPrefix(name, "signoz") {
|
||||||
if skipSignozMetrics && strings.HasPrefix(metricName, "signoz") {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
metricNames = append(metricNames, name)
|
||||||
|
}
|
||||||
|
|
||||||
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
|
if len(metricNames) == 0 {
|
||||||
if apiError != nil {
|
return &response, nil
|
||||||
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
|
}
|
||||||
}
|
|
||||||
if updatedMetadata, exist := metadata[metricName]; exist {
|
// Get all metadata in one shot
|
||||||
typ = string(updatedMetadata.MetricType)
|
metadataMap, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...)
|
||||||
isMonotonic = updatedMetadata.IsMonotonic
|
if apiError != nil {
|
||||||
temporality = string(updatedMetadata.Temporality)
|
return &response, fmt.Errorf("error getting updated metrics metadata: %s", apiError.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
seen := make(map[string]struct{})
|
||||||
|
for _, name := range metricNames {
|
||||||
|
metadata := metadataMap[name]
|
||||||
|
|
||||||
|
typ := string(metadata.MetricType)
|
||||||
|
temporality := string(metadata.Temporality)
|
||||||
|
isMonotonic := metadata.IsMonotonic
|
||||||
|
|
||||||
// Non-monotonic cumulative sums are treated as gauges
|
// Non-monotonic cumulative sums are treated as gauges
|
||||||
if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
|
if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
|
||||||
typ = "Gauge"
|
typ = "Gauge"
|
||||||
}
|
}
|
||||||
|
|
||||||
// unlike traces/logs `tag`/`resource` type, the `Type` will be metric type
|
// unlike traces/logs `tag`/`resource` type, the `Type` will be metric type
|
||||||
key := v3.AttributeKey{
|
key := v3.AttributeKey{
|
||||||
Key: metricName,
|
Key: name,
|
||||||
DataType: v3.AttributeKeyDataTypeFloat64,
|
DataType: v3.AttributeKeyDataTypeFloat64,
|
||||||
Type: v3.AttributeKeyType(typ),
|
Type: v3.AttributeKeyType(typ),
|
||||||
IsColumn: true,
|
IsColumn: true,
|
||||||
}
|
}
|
||||||
// remove duplicates
|
|
||||||
if _, ok := seen[metricName+typ]; ok {
|
if _, ok := seen[name+typ]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
seen[metricName+typ] = struct{}{}
|
seen[name+typ] = struct{}{}
|
||||||
response.AttributeKeys = append(response.AttributeKeys, key)
|
response.AttributeKeys = append(response.AttributeKeys, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2825,72 +2818,69 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.U
|
|||||||
|
|
||||||
unixMilli := common.PastDayRoundOff()
|
unixMilli := common.PastDayRoundOff()
|
||||||
|
|
||||||
// Note: metric metadata should be accessible regardless of the time range selection
|
// 1. Fetch metadata from cache/db using unified function
|
||||||
// our standard retention period is 30 days, so we are querying the table v4_1_day to reduce the
|
metadataMap, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
|
||||||
// amount of data scanned
|
|
||||||
query := fmt.Sprintf("SELECT temporality, description, type, unit, is_monotonic from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 GROUP BY temporality, description, type, unit, is_monotonic", signozMetricDBName, signozTSTableNameV41Day)
|
|
||||||
rows, err := r.db.Query(ctx, query, metricName, unixMilli)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Error while fetching metric metadata", zap.Error(err))
|
|
||||||
return nil, fmt.Errorf("error while fetching metric metadata: %s", err.Error())
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
var deltaExists, isMonotonic bool
|
|
||||||
var temporality, description, metricType, unit string
|
|
||||||
for rows.Next() {
|
|
||||||
if err := rows.Scan(&temporality, &description, &metricType, &unit, &isMonotonic); err != nil {
|
|
||||||
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
|
||||||
}
|
|
||||||
if temporality == string(v3.Delta) {
|
|
||||||
deltaExists = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
|
|
||||||
if apiError != nil {
|
if apiError != nil {
|
||||||
zap.L().Error("Error in getting metric cached metadata", zap.Error(apiError))
|
zap.L().Error("Error in getting metric cached metadata", zap.Error(apiError))
|
||||||
}
|
return nil, fmt.Errorf("error fetching metric metadata: %s", apiError.Err.Error())
|
||||||
if updatedMetadata, exist := metadata[metricName]; exist {
|
|
||||||
metricType = string(updatedMetadata.MetricType)
|
|
||||||
temporality = string(updatedMetadata.Temporality)
|
|
||||||
if temporality == string(v3.Delta) {
|
|
||||||
deltaExists = true
|
|
||||||
}
|
|
||||||
isMonotonic = updatedMetadata.IsMonotonic
|
|
||||||
if updatedMetadata.Description != "" {
|
|
||||||
description = updatedMetadata.Description
|
|
||||||
}
|
|
||||||
if updatedMetadata.Unit != "" {
|
|
||||||
unit = updatedMetadata.Unit
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
query = fmt.Sprintf("SELECT JSONExtractString(labels, 'le') as le from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $3 GROUP BY le ORDER BY le", signozMetricDBName, signozTSTableNameV41Day)
|
// Defaults in case metadata is not found
|
||||||
rows, err = r.db.Query(ctx, query, metricName, unixMilli, serviceName)
|
var (
|
||||||
if err != nil {
|
deltaExists bool
|
||||||
zap.L().Error("Error while executing query", zap.Error(err))
|
isMonotonic bool
|
||||||
return nil, fmt.Errorf("error while executing query: %s", err.Error())
|
temporality string
|
||||||
}
|
description string
|
||||||
defer rows.Close()
|
metricType string
|
||||||
|
unit string
|
||||||
|
)
|
||||||
|
|
||||||
|
metadata, ok := metadataMap[metricName]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("metric metadata not found: %s", metricName)
|
||||||
|
}
|
||||||
|
|
||||||
|
metricType = string(metadata.MetricType)
|
||||||
|
temporality = string(metadata.Temporality)
|
||||||
|
isMonotonic = metadata.IsMonotonic
|
||||||
|
description = metadata.Description
|
||||||
|
unit = metadata.Unit
|
||||||
|
|
||||||
|
if temporality == string(v3.Delta) {
|
||||||
|
deltaExists = true
|
||||||
|
}
|
||||||
|
// 2. Only for Histograms, get `le` buckets
|
||||||
var leFloat64 []float64
|
var leFloat64 []float64
|
||||||
for rows.Next() {
|
if metricType == string(v3.MetricTypeHistogram) {
|
||||||
var leStr string
|
query := fmt.Sprintf(`
|
||||||
if err := rows.Scan(&leStr); err != nil {
|
SELECT JSONExtractString(labels, 'le') AS le
|
||||||
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
FROM %s.%s
|
||||||
}
|
WHERE metric_name = $1
|
||||||
le, err := strconv.ParseFloat(leStr, 64)
|
AND unix_milli >= $2
|
||||||
// ignore the error and continue if the value is not a float
|
AND type = 'Histogram'
|
||||||
// ideally this should not happen but we have seen ClickHouse
|
AND JSONExtractString(labels, 'service_name') = $3
|
||||||
// returning empty string for some values
|
GROUP BY le
|
||||||
|
ORDER BY le`, signozMetricDBName, signozTSTableNameV41Day)
|
||||||
|
|
||||||
|
rows, err := r.db.Query(ctx, query, metricName, unixMilli, serviceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error("error while parsing le value", zap.Error(err))
|
zap.L().Error("Error while querying histogram buckets", zap.Error(err))
|
||||||
continue
|
return nil, fmt.Errorf("error while querying histogram buckets: %s", err.Error())
|
||||||
}
|
}
|
||||||
if math.IsInf(le, 0) {
|
defer rows.Close()
|
||||||
continue
|
|
||||||
|
for rows.Next() {
|
||||||
|
var leStr string
|
||||||
|
if err := rows.Scan(&leStr); err != nil {
|
||||||
|
return nil, fmt.Errorf("error while scanning le: %s", err.Error())
|
||||||
|
}
|
||||||
|
le, err := strconv.ParseFloat(leStr, 64)
|
||||||
|
if err != nil || math.IsInf(le, 0) {
|
||||||
|
zap.L().Error("Invalid 'le' bucket value", zap.String("value", leStr), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
leFloat64 = append(leFloat64, le)
|
||||||
}
|
}
|
||||||
leFloat64 = append(leFloat64, le)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &v3.MetricMetadataResponse{
|
return &v3.MetricMetadataResponse{
|
||||||
@ -5811,7 +5801,7 @@ VALUES ( ?, ?, ?, ?, ?, ?, ?);`, signozMetricDBName, signozUpdatedMetricsMetadat
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return &model.ApiError{Typ: "ClickHouseError", Err: err}
|
return &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||||
}
|
}
|
||||||
err = r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+req.MetricName, req, -1)
|
err = r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+req.MetricName, req, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &model.ApiError{Typ: "CachingErr", Err: err}
|
return &model.ApiError{Typ: "CachingErr", Err: err}
|
||||||
}
|
}
|
||||||
@ -5852,33 +5842,11 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
|
|||||||
return hasLE, nil
|
return hasLE, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context, orgID valuer.UUID) []error {
|
|
||||||
var allMetricsMetadata []model.UpdateMetricsMetadata
|
|
||||||
var errorList []error
|
|
||||||
// Fetch all rows from ClickHouse
|
|
||||||
query := fmt.Sprintf(`SELECT metric_name, type, description , temporality, is_monotonic, unit
|
|
||||||
FROM %s.%s;`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
|
|
||||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
|
||||||
err := r.db.Select(valueCtx, &allMetricsMetadata, query)
|
|
||||||
if err != nil {
|
|
||||||
errorList = append(errorList, err)
|
|
||||||
return errorList
|
|
||||||
}
|
|
||||||
for _, m := range allMetricsMetadata {
|
|
||||||
err := r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, &m, -1)
|
|
||||||
if err != nil {
|
|
||||||
errorList = append(errorList, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return errorList
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
|
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
|
||||||
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
|
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
|
||||||
var missingMetrics []string
|
var missingMetrics []string
|
||||||
|
|
||||||
// First, try retrieving each metric from cache.
|
// 1. Try cache
|
||||||
for _, metricName := range metricNames {
|
for _, metricName := range metricNames {
|
||||||
metadata := new(model.UpdateMetricsMetadata)
|
metadata := new(model.UpdateMetricsMetadata)
|
||||||
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName
|
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName
|
||||||
@ -5890,10 +5858,10 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there are any metrics missing in the cache, query them from the database.
|
// 2. Try updated_metrics_metadata table
|
||||||
|
var stillMissing []string
|
||||||
if len(missingMetrics) > 0 {
|
if len(missingMetrics) > 0 {
|
||||||
// Join the missing metric names; ensure proper quoting if needed.
|
metricList := "'" + strings.Join(missingMetrics, "', '") + "'"
|
||||||
metricList := "'" + strings.Join(metricNames, "', '") + "'"
|
|
||||||
query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit
|
query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit
|
||||||
FROM %s.%s
|
FROM %s.%s
|
||||||
WHERE metric_name IN (%s);`, signozMetricDBName, signozUpdatedMetricsMetadataTable, metricList)
|
WHERE metric_name IN (%s);`, signozMetricDBName, signozUpdatedMetricsMetadataTable, metricList)
|
||||||
@ -5905,6 +5873,7 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
|
|||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
|
found := make(map[string]struct{})
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
metadata := new(model.UpdateMetricsMetadata)
|
metadata := new(model.UpdateMetricsMetadata)
|
||||||
if err := rows.Scan(
|
if err := rows.Scan(
|
||||||
@ -5918,15 +5887,57 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
|
|||||||
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metrics metadata: %v", err)}
|
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metrics metadata: %v", err)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache the result for future requests.
|
|
||||||
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
|
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
|
||||||
if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, -1); cacheErr != nil {
|
if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, 0); cacheErr != nil {
|
||||||
zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
|
zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
|
||||||
}
|
}
|
||||||
cachedMetadata[metadata.MetricName] = metadata
|
cachedMetadata[metadata.MetricName] = metadata
|
||||||
|
found[metadata.MetricName] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine which metrics are still missing
|
||||||
|
for _, m := range missingMetrics {
|
||||||
|
if _, ok := found[m]; !ok {
|
||||||
|
stillMissing = append(stillMissing, m)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 3. Fallback: Try time_series_v4_1week table
|
||||||
|
if len(stillMissing) > 0 {
|
||||||
|
metricList := "'" + strings.Join(stillMissing, "', '") + "'"
|
||||||
|
query := fmt.Sprintf(`SELECT DISTINCT metric_name, type, description, temporality, is_monotonic, unit
|
||||||
|
FROM %s.%s
|
||||||
|
WHERE metric_name IN (%s)`, signozMetricDBName, signozTSTableNameV4, metricList)
|
||||||
|
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||||
|
rows, err := r.db.Query(valueCtx, query)
|
||||||
|
if err != nil {
|
||||||
|
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying time_series_v4 to get metrics metadata: %v", err)}
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
for rows.Next() {
|
||||||
|
metadata := new(model.UpdateMetricsMetadata)
|
||||||
|
if err := rows.Scan(
|
||||||
|
&metadata.MetricName,
|
||||||
|
&metadata.MetricType,
|
||||||
|
&metadata.Description,
|
||||||
|
&metadata.Temporality,
|
||||||
|
&metadata.IsMonotonic,
|
||||||
|
&metadata.Unit,
|
||||||
|
); err != nil {
|
||||||
|
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning fallback metadata: %v", err)}
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
|
||||||
|
if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, 0); cacheErr != nil {
|
||||||
|
zap.L().Error("Failed to cache fallback metadata", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
|
||||||
|
}
|
||||||
|
cachedMetadata[metadata.MetricName] = metadata
|
||||||
|
}
|
||||||
|
if rows.Err() != nil {
|
||||||
|
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning fallback metadata: %v", err)}
|
||||||
|
}
|
||||||
|
}
|
||||||
return cachedMetadata, nil
|
return cachedMetadata, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -213,17 +213,6 @@ func (q *querier) runBuilderQuery(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
|
|
||||||
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, orgID, builderQuery.AggregateAttribute.Key)
|
|
||||||
if apiError != nil {
|
|
||||||
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
|
|
||||||
}
|
|
||||||
if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist {
|
|
||||||
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType)
|
|
||||||
builderQuery.Temporality = updatedMetadata.Temporality
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// What is happening here?
|
// What is happening here?
|
||||||
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
|
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
|
||||||
// If the query is not cached, we execute the query and return the result without caching it.
|
// If the query is not cached, we execute the query and return the result without caching it.
|
||||||
|
|||||||
@ -214,17 +214,6 @@ func (q *querier) runBuilderQuery(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
|
|
||||||
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, orgID, builderQuery.AggregateAttribute.Key)
|
|
||||||
if apiError != nil {
|
|
||||||
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
|
|
||||||
}
|
|
||||||
if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist {
|
|
||||||
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType)
|
|
||||||
builderQuery.Temporality = updatedMetadata.Temporality
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// What is happening here?
|
// What is happening here?
|
||||||
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
|
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
|
||||||
// If the query is not cached, we execute the query and return the result without caching it.
|
// If the query is not cached, we execute the query and return the result without caching it.
|
||||||
|
|||||||
@ -164,18 +164,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz, jwt *authtypes.JWT)
|
|||||||
agentConfMgr,
|
agentConfMgr,
|
||||||
signoz.Instrumentation,
|
signoz.Instrumentation,
|
||||||
)
|
)
|
||||||
|
|
||||||
orgs, err := apiHandler.Signoz.Modules.OrgGetter.ListByOwnedKeyRange(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
for _, org := range orgs {
|
|
||||||
errorList := reader.PreloadMetricsMetadata(context.Background(), org.ID)
|
|
||||||
for _, er := range errorList {
|
|
||||||
zap.L().Error("failed to preload metrics metadata", zap.Error(er))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2,7 +2,6 @@ package rules
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -1224,7 +1223,6 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
|||||||
|
|
||||||
for idx, c := range cases {
|
for idx, c := range cases {
|
||||||
rows := cmock.NewRows(cols, c.values)
|
rows := cmock.NewRows(cols, c.values)
|
||||||
telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
|
|
||||||
// We are testing the eval logic after the query is run
|
// We are testing the eval logic after the query is run
|
||||||
// so we don't care about the query string here
|
// so we don't care about the query string here
|
||||||
queryString := "SELECT any"
|
queryString := "SELECT any"
|
||||||
@ -1322,7 +1320,6 @@ func TestThresholdRuleNoData(t *testing.T) {
|
|||||||
for idx, c := range cases {
|
for idx, c := range cases {
|
||||||
rows := cmock.NewRows(cols, c.values)
|
rows := cmock.NewRows(cols, c.values)
|
||||||
|
|
||||||
telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
|
|
||||||
|
|
||||||
// We are testing the eval logic after the query is run
|
// We are testing the eval logic after the query is run
|
||||||
// so we don't care about the query string here
|
// so we don't care about the query string here
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user