feat(telemetrymeter): introduce source for query

This commit is contained in:
vikrantgupta25 2025-08-04 02:49:09 +05:30
parent 8370deaf35
commit e522817df5
No known key found for this signature in database
GPG Key ID: F8440BDE36411E79
12 changed files with 74 additions and 33 deletions

View File

@ -12,6 +12,7 @@ import (
func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, error) {
var req telemetrytypes.FieldKeySelector
var signal telemetrytypes.Signal
var source telemetrytypes.Source
var err error
signalStr := r.URL.Query().Get("signal")
@ -21,6 +22,13 @@ func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, er
signal = telemetrytypes.SignalUnspecified
}
sourceStr := r.URL.Query().Get("source")
if sourceStr != "" {
source = telemetrytypes.Source{String: valuer.NewString(sourceStr)}
} else {
source = telemetrytypes.SourceUnspecified
}
if r.URL.Query().Get("limit") != "" {
limit, err := strconv.Atoi(r.URL.Query().Get("limit"))
if err != nil {
@ -76,6 +84,7 @@ func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, er
StartUnixMilli: startUnixMilli,
EndUnixMilli: endUnixMilli,
Signal: signal,
Source: source,
Name: name,
FieldContext: fieldContext,
FieldDataType: fieldDataType,

View File

@ -62,6 +62,9 @@ func (q *builderQuery[T]) Fingerprint() string {
// Add signal type
parts = append(parts, fmt.Sprintf("signal=%s", q.spec.Signal.StringValue()))
// Add source type
parts = append(parts, fmt.Sprintf("source=%s", q.spec.Source.StringValue()))
// Add step interval if present
parts = append(parts, fmt.Sprintf("step=%s", q.spec.StepInterval.String()))

View File

@ -172,7 +172,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
event.GroupByApplied = len(spec.GroupBy) > 0
if strings.HasPrefix(spec.Aggregations[0].MetricName, "signoz.meter") {
if spec.Source == telemetrytypes.SourceMeter {
spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMeter(req.Start, req.End))}
} else {
if spec.StepInterval.Seconds() == 0 {
@ -274,7 +274,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
var bq *builderQuery[qbtypes.MetricAggregation]
if strings.HasPrefix(spec.Aggregations[0].MetricName, "signoz.meter") {
if spec.Source == telemetrytypes.SourceMeter {
bq = newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
} else {
bq = newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)

View File

@ -2740,16 +2740,10 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, org
response.AttributeKeys = append(response.AttributeKeys, key)
}
meterAggregateAttributes, err := r.getMeterAggregateAttributes(ctx, orgID, req)
if err != nil {
return nil, err
}
response.AttributeKeys = append(response.AttributeKeys, meterAggregateAttributes.AttributeKeys...)
return &response, nil
}
func (r *ClickHouseReader) getMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
var response v3.AggregateAttributeResponse
// Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db
query := fmt.Sprintf(
@ -2834,16 +2828,10 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
response.AttributeKeys = append(response.AttributeKeys, key)
}
meterKeys, err := r.getMeterAttributeKeys(ctx, req)
if err != nil {
return nil, err
}
response.AttributeKeys = append(response.AttributeKeys, meterKeys.AttributeKeys...)
return &response, nil
}
func (r *ClickHouseReader) getMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
var query string
var err error
var rows driver.Rows

View File

@ -4217,6 +4217,8 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
case v3.DataSourceTraces:
response, err = aH.reader.GetTraceAggregateAttributes(r.Context(), req)
case v3.DataSourceMeter:
response, err = aH.reader.GetMeterAggregateAttributes(r.Context(), orgID, req)
default:
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
return
@ -4266,6 +4268,8 @@ func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.R
switch req.DataSource {
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req)
case v3.DataSourceMeter:
response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req)
case v3.DataSourceLogs:
response, err = aH.reader.GetLogAttributeKeys(r.Context(), req)
case v3.DataSourceTraces:

View File

@ -50,7 +50,9 @@ type Reader interface {
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
// Returns `MetricStatus` for latest received metric among `metricNames`. Useful for status calculations

View File

@ -22,11 +22,12 @@ const (
DataSourceTraces DataSource = "traces"
DataSourceLogs DataSource = "logs"
DataSourceMetrics DataSource = "metrics"
DataSourceMeter DataSource = "meter"
)
func (d DataSource) Validate() error {
switch d {
case DataSourceTraces, DataSourceLogs, DataSourceMetrics:
case DataSourceTraces, DataSourceLogs, DataSourceMetrics, DataSourceMeter:
return nil
default:
return fmt.Errorf("invalid data source: %s", d)

View File

@ -67,7 +67,7 @@ func RecommendedStepIntervalForMeter(start, end uint64) uint64 {
step := (end - start) / RecommendedNumberOfPoints / 1e9
// for meter queries the minimum step interval allowed is 1 day as this is our granularity
// for meter queries the minimum step interval allowed is 1 hour as this is our granularity
if step < 3600 {
return 3600
}

View File

@ -555,12 +555,6 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto
return nil, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
}
meterKeys, err := t.getMeterKeys(ctx, fieldKeySelectors)
if err != nil {
return nil, err
}
keys = append(keys, meterKeys...)
return keys, nil
}
@ -639,7 +633,11 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
case telemetrytypes.SignalLogs:
keys, err = t.getLogsKeys(ctx, selectors)
case telemetrytypes.SignalMetrics:
keys, err = t.getMetricsKeys(ctx, selectors)
if fieldKeySelector.Source == telemetrytypes.SourceMeter {
keys, err = t.getMeterKeys(ctx, selectors)
} else {
keys, err = t.getMetricsKeys(ctx, selectors)
}
case telemetrytypes.SignalUnspecified:
// get traces keys
tracesKeys, err := t.getTracesKeys(ctx, selectors)
@ -661,6 +659,14 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
return nil, err
}
keys = append(keys, metricsKeys...)
// get meter metrics keys
meterSourceMetricsKeys, err := t.getMeterKeys(ctx, selectors)
if err != nil {
return nil, err
}
keys = append(keys, meterSourceMetricsKeys...)
}
if err != nil {
return nil, err
@ -679,6 +685,7 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
logsSelectors := []*telemetrytypes.FieldKeySelector{}
tracesSelectors := []*telemetrytypes.FieldKeySelector{}
metricsSelectors := []*telemetrytypes.FieldKeySelector{}
meterSourceMetricsSelectors := []*telemetrytypes.FieldKeySelector{}
for _, fieldKeySelector := range fieldKeySelectors {
switch fieldKeySelector.Signal {
@ -687,11 +694,16 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
case telemetrytypes.SignalTraces:
tracesSelectors = append(tracesSelectors, fieldKeySelector)
case telemetrytypes.SignalMetrics:
metricsSelectors = append(metricsSelectors, fieldKeySelector)
if fieldKeySelector.Source == telemetrytypes.SourceMeter {
meterSourceMetricsSelectors = append(meterSourceMetricsSelectors, fieldKeySelector)
} else {
metricsSelectors = append(metricsSelectors, fieldKeySelector)
}
case telemetrytypes.SignalUnspecified:
logsSelectors = append(logsSelectors, fieldKeySelector)
tracesSelectors = append(tracesSelectors, fieldKeySelector)
metricsSelectors = append(metricsSelectors, fieldKeySelector)
meterSourceMetricsSelectors = append(meterSourceMetricsSelectors, fieldKeySelector)
}
}
@ -708,6 +720,11 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
return nil, err
}
meterSourceMetricsKeys, err := t.getMeterKeys(ctx, meterSourceMetricsSelectors)
if err != nil {
return nil, err
}
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
for _, key := range logsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
@ -718,6 +735,9 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
for _, key := range metricsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
for _, key := range meterSourceMetricsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
return mapOfKeys, nil
}
@ -1024,12 +1044,6 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu
values.StringValues = append(values.StringValues, stringValue)
}
meterFieldValues, err := t.getMeterFieldValues(ctx, fieldValueSelector)
if err != nil {
return nil, err
}
values.StringValues = append(values.StringValues, meterFieldValues.StringValues...)
return values, nil
}
@ -1108,7 +1122,11 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
case telemetrytypes.SignalLogs:
values, err = t.getLogFieldValues(ctx, fieldValueSelector)
case telemetrytypes.SignalMetrics:
values, err = t.getMetricFieldValues(ctx, fieldValueSelector)
if fieldValueSelector.Source == telemetrytypes.SourceMeter {
values, err = t.getMeterFieldValues(ctx, fieldValueSelector)
} else {
values, err = t.getMetricFieldValues(ctx, fieldValueSelector)
}
case telemetrytypes.SignalUnspecified:
mapOfValues := make(map[any]bool)
mapOfRelatedValues := make(map[any]bool)

View File

@ -14,6 +14,9 @@ type QueryBuilderQuery[T any] struct {
// signal to query
Signal telemetrytypes.Signal `json:"signal,omitempty"`
// source for query
Source telemetrytypes.Source `json:"source,omitempty"`
// we want to support multiple aggregations
// currently supported: []Aggregation, []MetricAggregation
Aggregations []T `json:"aggregations,omitempty"`

View File

@ -124,6 +124,7 @@ type FieldKeySelector struct {
StartUnixMilli int64 `json:"startUnixMilli"`
EndUnixMilli int64 `json:"endUnixMilli"`
Signal Signal `json:"signal"`
Source Source `json:"source"`
FieldContext FieldContext `json:"fieldContext"`
FieldDataType FieldDataType `json:"fieldDataType"`
Name string `json:"name"`

View File

@ -0,0 +1,12 @@
package telemetrytypes
import "github.com/SigNoz/signoz/pkg/valuer"
type Source struct {
valuer.String
}
var (
SourceMeter = Source{valuer.NewString("meter")}
SourceUnspecified = Source{valuer.NewString("")}
)