diff --git a/pkg/apis/fields/parse.go b/pkg/apis/fields/parse.go index 114be5a51669..913b48b1502a 100644 --- a/pkg/apis/fields/parse.go +++ b/pkg/apis/fields/parse.go @@ -12,6 +12,7 @@ import ( func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, error) { var req telemetrytypes.FieldKeySelector var signal telemetrytypes.Signal + var source telemetrytypes.Source var err error signalStr := r.URL.Query().Get("signal") @@ -21,6 +22,13 @@ func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, er signal = telemetrytypes.SignalUnspecified } + sourceStr := r.URL.Query().Get("source") + if sourceStr != "" { + source = telemetrytypes.Source{String: valuer.NewString(sourceStr)} + } else { + source = telemetrytypes.SourceUnspecified + } + if r.URL.Query().Get("limit") != "" { limit, err := strconv.Atoi(r.URL.Query().Get("limit")) if err != nil { @@ -76,6 +84,7 @@ func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, er StartUnixMilli: startUnixMilli, EndUnixMilli: endUnixMilli, Signal: signal, + Source: source, Name: name, FieldContext: fieldContext, FieldDataType: fieldDataType, diff --git a/pkg/querier/builder_query.go b/pkg/querier/builder_query.go index 756dbd318b7b..1f7d025a293c 100644 --- a/pkg/querier/builder_query.go +++ b/pkg/querier/builder_query.go @@ -62,6 +62,9 @@ func (q *builderQuery[T]) Fingerprint() string { // Add signal type parts = append(parts, fmt.Sprintf("signal=%s", q.spec.Signal.StringValue())) + // Add source type + parts = append(parts, fmt.Sprintf("source=%s", q.spec.Source.StringValue())) + // Add step interval if present parts = append(parts, fmt.Sprintf("step=%s", q.spec.StepInterval.String())) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 95fe304f9256..9d544950b221 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -172,7 +172,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != "" event.GroupByApplied = len(spec.GroupBy) > 0 - if strings.HasPrefix(spec.Aggregations[0].MetricName, "signoz.meter") { + if spec.Source == telemetrytypes.SourceMeter { spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMeter(req.Start, req.End))} } else { if spec.StepInterval.Seconds() == 0 { @@ -274,7 +274,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType) var bq *builderQuery[qbtypes.MetricAggregation] - if strings.HasPrefix(spec.Aggregations[0].MetricName, "signoz.meter") { + if spec.Source == telemetrytypes.SourceMeter { bq = newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars) } else { bq = newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 1476ba96e514..84426b538212 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2740,16 +2740,10 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, org response.AttributeKeys = append(response.AttributeKeys, key) } - meterAggregateAttributes, err := r.getMeterAggregateAttributes(ctx, orgID, req) - if err != nil { - return nil, err - } - - response.AttributeKeys = append(response.AttributeKeys, meterAggregateAttributes.AttributeKeys...) return &response, nil } -func (r *ClickHouseReader) getMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) { +func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) { var response v3.AggregateAttributeResponse // Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db query := fmt.Sprintf( @@ -2834,16 +2828,10 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F response.AttributeKeys = append(response.AttributeKeys, key) } - meterKeys, err := r.getMeterAttributeKeys(ctx, req) - if err != nil { - return nil, err - } - - response.AttributeKeys = append(response.AttributeKeys, meterKeys.AttributeKeys...) return &response, nil } -func (r *ClickHouseReader) getMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { +func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { var query string var err error var rows driver.Rows diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 69be0b7f18fb..02e8b9f3b2c2 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -4217,6 +4217,8 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r * response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req) case v3.DataSourceTraces: response, err = aH.reader.GetTraceAggregateAttributes(r.Context(), req) + case v3.DataSourceMeter: + response, err = aH.reader.GetMeterAggregateAttributes(r.Context(), orgID, req) default: RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil) return @@ -4266,6 +4268,8 @@ func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.R switch req.DataSource { case v3.DataSourceMetrics: response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req) + case v3.DataSourceMeter: + response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req) case v3.DataSourceLogs: response, err = aH.reader.GetLogAttributeKeys(r.Context(), req) case v3.DataSourceTraces: diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index b378c94a0283..183dfa4ae84d 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -50,7 +50,9 @@ type Reader interface { FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) + GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) + GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) // Returns `MetricStatus` for latest received metric among `metricNames`. Useful for status calculations diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 67bbf65af586..cb1abd38818b 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -22,11 +22,12 @@ const ( DataSourceTraces DataSource = "traces" DataSourceLogs DataSource = "logs" DataSourceMetrics DataSource = "metrics" + DataSourceMeter DataSource = "meter" ) func (d DataSource) Validate() error { switch d { - case DataSourceTraces, DataSourceLogs, DataSourceMetrics: + case DataSourceTraces, DataSourceLogs, DataSourceMetrics, DataSourceMeter: return nil default: return fmt.Errorf("invalid data source: %s", d) diff --git a/pkg/querybuilder/time.go b/pkg/querybuilder/time.go index cb6c18a0b778..8e78cb5525d6 100644 --- a/pkg/querybuilder/time.go +++ b/pkg/querybuilder/time.go @@ -67,7 +67,7 @@ func RecommendedStepIntervalForMeter(start, end uint64) uint64 { step := (end - start) / RecommendedNumberOfPoints / 1e9 - // for meter queries the minimum step interval allowed is 1 day as this is our granularity + // for meter queries the minimum step interval allowed is 1 hour as this is our granularity if step < 3600 { return 3600 } diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go index 8a04b333422d..e7b9518d3c54 100644 --- a/pkg/telemetrymetadata/metadata.go +++ b/pkg/telemetrymetadata/metadata.go @@ -555,12 +555,6 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto return nil, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error()) } - meterKeys, err := t.getMeterKeys(ctx, fieldKeySelectors) - if err != nil { - return nil, err - } - - keys = append(keys, meterKeys...) return keys, nil } @@ -639,7 +633,11 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele case telemetrytypes.SignalLogs: keys, err = t.getLogsKeys(ctx, selectors) case telemetrytypes.SignalMetrics: - keys, err = t.getMetricsKeys(ctx, selectors) + if fieldKeySelector.Source == telemetrytypes.SourceMeter { + keys, err = t.getMeterKeys(ctx, selectors) + } else { + keys, err = t.getMetricsKeys(ctx, selectors) + } case telemetrytypes.SignalUnspecified: // get traces keys tracesKeys, err := t.getTracesKeys(ctx, selectors) @@ -661,6 +659,14 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele return nil, err } keys = append(keys, metricsKeys...) + + // get meter metrics keys + meterSourceMetricsKeys, err := t.getMeterKeys(ctx, selectors) + if err != nil { + return nil, err + } + + keys = append(keys, meterSourceMetricsKeys...) } if err != nil { return nil, err @@ -679,6 +685,7 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors logsSelectors := []*telemetrytypes.FieldKeySelector{} tracesSelectors := []*telemetrytypes.FieldKeySelector{} metricsSelectors := []*telemetrytypes.FieldKeySelector{} + meterSourceMetricsSelectors := []*telemetrytypes.FieldKeySelector{} for _, fieldKeySelector := range fieldKeySelectors { switch fieldKeySelector.Signal { @@ -687,11 +694,16 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors case telemetrytypes.SignalTraces: tracesSelectors = append(tracesSelectors, fieldKeySelector) case telemetrytypes.SignalMetrics: - metricsSelectors = append(metricsSelectors, fieldKeySelector) + if fieldKeySelector.Source == telemetrytypes.SourceMeter { + meterSourceMetricsSelectors = append(meterSourceMetricsSelectors, fieldKeySelector) + } else { + metricsSelectors = append(metricsSelectors, fieldKeySelector) + } case telemetrytypes.SignalUnspecified: logsSelectors = append(logsSelectors, fieldKeySelector) tracesSelectors = append(tracesSelectors, fieldKeySelector) metricsSelectors = append(metricsSelectors, fieldKeySelector) + meterSourceMetricsSelectors = append(meterSourceMetricsSelectors, fieldKeySelector) } } @@ -708,6 +720,11 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors return nil, err } + meterSourceMetricsKeys, err := t.getMeterKeys(ctx, meterSourceMetricsSelectors) + if err != nil { + return nil, err + } + mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey) for _, key := range logsKeys { mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key) @@ -718,6 +735,9 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors for _, key := range metricsKeys { mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key) } + for _, key := range meterSourceMetricsKeys { + mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key) + } return mapOfKeys, nil } @@ -1024,12 +1044,6 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu values.StringValues = append(values.StringValues, stringValue) } - meterFieldValues, err := t.getMeterFieldValues(ctx, fieldValueSelector) - if err != nil { - return nil, err - } - - values.StringValues = append(values.StringValues, meterFieldValues.StringValues...) return values, nil } @@ -1108,7 +1122,11 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto case telemetrytypes.SignalLogs: values, err = t.getLogFieldValues(ctx, fieldValueSelector) case telemetrytypes.SignalMetrics: - values, err = t.getMetricFieldValues(ctx, fieldValueSelector) + if fieldValueSelector.Source == telemetrytypes.SourceMeter { + values, err = t.getMeterFieldValues(ctx, fieldValueSelector) + } else { + values, err = t.getMetricFieldValues(ctx, fieldValueSelector) + } case telemetrytypes.SignalUnspecified: mapOfValues := make(map[any]bool) mapOfRelatedValues := make(map[any]bool) diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go index c81433d1ed02..50ce9423ad77 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go @@ -14,6 +14,9 @@ type QueryBuilderQuery[T any] struct { // signal to query Signal telemetrytypes.Signal `json:"signal,omitempty"` + // source for query + Source telemetrytypes.Source `json:"source,omitempty"` + // we want to support multiple aggregations // currently supported: []Aggregation, []MetricAggregation Aggregations []T `json:"aggregations,omitempty"` diff --git a/pkg/types/telemetrytypes/field.go b/pkg/types/telemetrytypes/field.go index 38928adcc952..256707da6221 100644 --- a/pkg/types/telemetrytypes/field.go +++ b/pkg/types/telemetrytypes/field.go @@ -124,6 +124,7 @@ type FieldKeySelector struct { StartUnixMilli int64 `json:"startUnixMilli"` EndUnixMilli int64 `json:"endUnixMilli"` Signal Signal `json:"signal"` + Source Source `json:"source"` FieldContext FieldContext `json:"fieldContext"` FieldDataType FieldDataType `json:"fieldDataType"` Name string `json:"name"` diff --git a/pkg/types/telemetrytypes/source.go b/pkg/types/telemetrytypes/source.go new file mode 100644 index 000000000000..247a9a5a5b3c --- /dev/null +++ b/pkg/types/telemetrytypes/source.go @@ -0,0 +1,12 @@ +package telemetrytypes + +import "github.com/SigNoz/signoz/pkg/valuer" + +type Source struct { + valuer.String +} + +var ( + SourceMeter = Source{valuer.NewString("meter")} + SourceUnspecified = Source{valuer.NewString("")} +)