feat(telemetrymeter): add support for telemetry meter (#8667)

* feat(telemetry/meter): added base setup for telemetry meter signal

* feat(telemetry/meter): added metadata setup for meter

* feat(telemetry/meter): fix stmnt builder tests

* feat(telemetry/meter): test query range API fixes

* feat(telemetry/meter): improve error messages

* feat(telemetrymeter): step interval improvements

* feat(telemetrymeter): metadata changes and aggregate attribute changes

* feat(telemetrymeter): metadata changes and aggregate attribute changes

* feat(telemetrymeter): deprecate the signal and use aggregation instead

* feat(telemetrymeter): deprecate the signal and use aggregation instead

* feat(telemetrymeter): deprecate the signal and use aggregation instead

* feat(telemetrymeter): cleanup the types

* feat(telemetrymeter): introduce source for query

* feat(telemetrymeter): better naming for source in metadata

* feat(telemetrymeter): added quick filters for meter explorer

* feat(telemetrymeter): incorporate the new changes to stmnt builder

* feat(telemetrymeter): add the statement builder for the ranged cache queries

* feat(telemetrymeter): use meter aggregate keys

* feat(telemetrymeter): use meter aggregate keys

* feat(telemetrymeter): remove meter from complete bools

* feat(telemetrymeter): remove meter from complete bools

* feat(telemetrymeter): update the quick filters to use meter
Vikrant Gupta 2025-08-07 16:50:37 +05:30 committed by GitHub
parent 92794389d6
commit f1ce93171c
24 changed files with 1396 additions and 43 deletions

View File

@@ -9,6 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
@@ -33,6 +34,8 @@ func NewAPI(
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.AttributesMetadataTableName,
telemetrymeter.DBName,
telemetrymeter.SamplesAgg1dTableName,
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,

View File

@@ -12,6 +12,7 @@ import (
func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, error) {
var req telemetrytypes.FieldKeySelector
var signal telemetrytypes.Signal
var source telemetrytypes.Source
var err error
signalStr := r.URL.Query().Get("signal")
@@ -21,6 +22,13 @@ func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, er
signal = telemetrytypes.SignalUnspecified
}
sourceStr := r.URL.Query().Get("source")
if sourceStr != "" {
source = telemetrytypes.Source{String: valuer.NewString(sourceStr)}
} else {
source = telemetrytypes.SourceUnspecified
}
if r.URL.Query().Get("limit") != "" { if r.URL.Query().Get("limit") != "" {
limit, err := strconv.Atoi(r.URL.Query().Get("limit")) limit, err := strconv.Atoi(r.URL.Query().Get("limit"))
if err != nil { if err != nil {
@@ -76,6 +84,7 @@ func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, er
StartUnixMilli: startUnixMilli,
EndUnixMilli: endUnixMilli,
Signal: signal,
Source: source,
Name: name,
FieldContext: fieldContext,
FieldDataType: fieldDataType,
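(Illustration, not part of the diff: the new source parameter rides alongside signal on the field-key metadata endpoint, so a UI request scoped to meter data would look roughly like the line below. The route prefix is an assumption; only the query parameters come from the code above.)

GET /api/v5/fields/keys?signal=metrics&source=meter&name=service&limit=50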

View File

@@ -62,6 +62,9 @@ func (q *builderQuery[T]) Fingerprint() string {
// Add signal type
parts = append(parts, fmt.Sprintf("signal=%s", q.spec.Signal.StringValue()))
// Add source type
parts = append(parts, fmt.Sprintf("source=%s", q.spec.Source.StringValue()))
// Add step interval if present
parts = append(parts, fmt.Sprintf("step=%s", q.spec.StepInterval.String()))

View File

@@ -31,6 +31,7 @@ type querier struct {
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
bucketCache BucketCache
}
@@ -44,6 +45,7 @@ func New(
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
bucketCache BucketCache,
) *querier {
querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
@@ -55,6 +57,7 @@ func New(
traceStmtBuilder: traceStmtBuilder,
logStmtBuilder: logStmtBuilder,
metricStmtBuilder: metricStmtBuilder,
meterStmtBuilder: meterStmtBuilder,
bucketCache: bucketCache,
}
}
@@ -168,17 +171,21 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
event.MetricsUsed = true
event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
event.GroupByApplied = len(spec.GroupBy) > 0
if spec.StepInterval.Seconds() == 0 {
spec.StepInterval = qbtypes.Step{
Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
}
}
if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
spec.StepInterval = qbtypes.Step{
Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)),
}
}
if spec.Source == telemetrytypes.SourceMeter {
spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMeter(req.Start, req.End))}
} else {
if spec.StepInterval.Seconds() == 0 {
spec.StepInterval = qbtypes.Step{
Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
}
}
if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
spec.StepInterval = qbtypes.Step{
Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)),
}
}
}
req.CompositeQuery.Queries[idx].Spec = spec
}
} else if query.Type == qbtypes.QueryTypePromQL {
@@ -265,7 +272,14 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
var bq *builderQuery[qbtypes.MetricAggregation]
if spec.Source == telemetrytypes.SourceMeter {
bq = newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
} else {
bq = newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
}
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
default:
@@ -529,6 +543,9 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
specCopy := qt.spec.Copy()
specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
if qt.spec.Source == telemetrytypes.SourceMeter {
return newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
}
return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
default:

View File

@@ -11,6 +11,7 @@ import (
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
@@ -52,6 +53,8 @@ func newProvider(
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.AttributesMetadataTableName,
telemetrymeter.DBName,
telemetrymeter.SamplesAgg1dTableName,
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,
@@ -122,6 +125,14 @@ func newProvider(
metricConditionBuilder,
)
// Create meter statement builder
meterStmtBuilder := telemetrymeter.NewMeterQueryStatementBuilder(
settings,
telemetryMetadataStore,
metricFieldMapper,
metricConditionBuilder,
)
// Create bucket cache
bucketCache := querier.NewBucketCache(
settings,
@@ -139,6 +150,7 @@ func newProvider(
traceStmtBuilder,
logStmtBuilder,
metricStmtBuilder,
meterStmtBuilder,
bucketCache,
), nil
}

View File

@@ -64,6 +64,8 @@ const (
signozTraceLocalTableName = "signoz_index_v2"
signozMetricDBName = "signoz_metrics"
signozMetadataDbName = "signoz_metadata"
signozMeterDBName = "signoz_meter"
signozMeterSamplesName = "samples_agg_1d"
signozSampleLocalTableName = "samples_v4" signozSampleLocalTableName = "samples_v4"
signozSampleTableName = "distributed_samples_v4" signozSampleTableName = "distributed_samples_v4"
@ -2741,8 +2743,55 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, org
return &response, nil return &response, nil
} }
func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
var response v3.AggregateAttributeResponse
// Query all relevant meter metric names from the 1-day samples aggregate table, but leave metadata retrieval to cache/db
query := fmt.Sprintf(
`SELECT metric_name,type,temporality,is_monotonic
FROM %s.%s
WHERE metric_name ILIKE $1
GROUP BY metric_name,type,temporality,is_monotonic`,
signozMeterDBName, signozMeterSamplesName)
if req.Limit != 0 {
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
}
rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
if err != nil {
zap.L().Error("Error while querying meter names", zap.Error(err))
return nil, fmt.Errorf("error while executing meter name query: %s", err.Error())
}
defer rows.Close()
for rows.Next() {
var name string
var typ string
var temporality string
var isMonotonic bool
if err := rows.Scan(&name, &typ, &temporality, &isMonotonic); err != nil {
return nil, fmt.Errorf("error while scanning meter name: %s", err.Error())
}
// Non-monotonic cumulative sums are treated as gauges
if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
typ = "Gauge"
}
// unlike the `tag`/`resource` types used for traces/logs, `Type` here carries the metric type
key := v3.AttributeKey{
Key: name,
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType(typ),
IsColumn: true,
}
response.AttributeKeys = append(response.AttributeKeys, key)
}
return &response, nil
}
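(Illustration, not part of the diff: with the $1 placeholder bound, a lookup for meter metrics matching "calls" with a limit of 10 effectively runs the following against ClickHouse.)

SELECT metric_name, type, temporality, is_monotonic
FROM signoz_meter.samples_agg_1d
WHERE metric_name ILIKE '%calls%'
GROUP BY metric_name, type, temporality, is_monotonic LIMIT 10;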
func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
var query string
var err error
var rows driver.Rows
@@ -2782,6 +2831,41 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
return &response, nil
}
func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
var query string
var err error
var rows driver.Rows
var response v3.FilterAttributeKeyResponse
// skip internal attributes, i.e. attributes starting with __
query = fmt.Sprintf("SELECT DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name FROM %s.%s WHERE metric_name=$1 AND attr_name ILIKE $2 AND attr_name NOT LIKE '\\_\\_%%'", signozMeterDBName, signozMeterSamplesName)
if req.Limit != 0 {
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
}
rows, err = r.db.Query(ctx, query, req.AggregateAttribute, fmt.Sprintf("%%%s%%", req.SearchText))
if err != nil {
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
var attributeKey string
for rows.Next() {
if err := rows.Scan(&attributeKey); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
key := v3.AttributeKey{
Key: attributeKey,
DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72.
Type: v3.AttributeKeyTypeTag,
IsColumn: false,
}
response.AttributeKeys = append(response.AttributeKeys, key)
}
return &response, nil
}
func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
var query string

View File

@@ -4218,6 +4218,8 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
case v3.DataSourceTraces:
response, err = aH.reader.GetTraceAggregateAttributes(r.Context(), req)
case v3.DataSourceMeter:
response, err = aH.reader.GetMeterAggregateAttributes(r.Context(), orgID, req)
default:
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
return
@@ -4267,6 +4269,8 @@ func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.R
switch req.DataSource {
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req)
case v3.DataSourceMeter:
response, err = aH.reader.GetMeterAttributeKeys(r.Context(), req)
case v3.DataSourceLogs:
response, err = aH.reader.GetLogAttributeKeys(r.Context(), req)
case v3.DataSourceTraces:

View File

@@ -484,7 +484,7 @@ func parseAggregateAttributeRequest(r *http.Request) (*v3.AggregateAttributeRequ
limit = 50
}
if dataSource != v3.DataSourceMetrics && dataSource != v3.DataSourceMeter {
if err := aggregateOperator.Validate(); err != nil {
return nil, err
}
@@ -604,7 +604,7 @@ func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequ
return nil, err
}
if dataSource != v3.DataSourceMetrics && dataSource != v3.DataSourceMeter {
if err := aggregateOperator.Validate(); err != nil {
return nil, err
}

View File

@@ -50,7 +50,9 @@ type Reader interface {
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
// Returns `MetricStatus` for latest received metric among `metricNames`. Useful for status calculations

View File

@@ -22,11 +22,12 @@ const (
DataSourceTraces DataSource = "traces"
DataSourceLogs DataSource = "logs"
DataSourceMetrics DataSource = "metrics"
DataSourceMeter DataSource = "meter"
)
func (d DataSource) Validate() error {
switch d {
case DataSourceTraces, DataSourceLogs, DataSourceMetrics, DataSourceMeter:
return nil
default:
return fmt.Errorf("invalid data source: %s", d)

View File

@@ -63,6 +63,32 @@ func MinAllowedStepInterval(start, end uint64) uint64 {
return step - step%5
}
func RecommendedStepIntervalForMeter(start, end uint64) uint64 {
start = ToNanoSecs(start)
end = ToNanoSecs(end)
step := (end - start) / RecommendedNumberOfPoints / 1e9
// for meter queries the minimum step interval allowed is 1 hour, as that is the table's granularity
if step < 3600 {
return 3600
}
// return the nearest lower multiple of 3600 (1 hour)
recommended := step - step%3600
// if the time range is greater than 1 month, round the step interval to a multiple of 1 day
if end-start >= uint64(30*24*time.Hour.Nanoseconds()) {
if recommended < 86400 {
recommended = 86400
} else {
recommended = uint64(math.Round(float64(recommended)/86400)) * 86400
}
}
return recommended
}
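(Illustration, not part of the diff: a self-contained sketch of the rounding rules above, with the raw step precomputed. For example, a 7-day window at an assumed 300-point target gives 604800/300 = 2016s, and a 45-day window gives 12960s; the 300-point figure is an assumption standing in for querybuilder.RecommendedNumberOfPoints.)

package main

import (
	"fmt"
	"math"
)

// roundMeterStep mirrors the rounding in RecommendedStepIntervalForMeter for a
// raw step (seconds) and a range length (days). Sketch only, not the library code.
func roundMeterStep(step uint64, rangeDays int) uint64 {
	if step < 3600 {
		return 3600 // 1h floor: the meter table's granularity
	}
	recommended := step - step%3600 // nearest lower hour multiple
	if rangeDays >= 30 {
		if recommended < 86400 {
			recommended = 86400 // ranges over a month snap to day multiples
		} else {
			recommended = uint64(math.Round(float64(recommended)/86400)) * 86400
		}
	}
	return recommended
}

func main() {
	fmt.Println(roundMeterStep(2016, 7))   // 3600: clamped up to the 1h floor
	fmt.Println(roundMeterStep(12960, 45)) // 86400: 10800s rounds up to a full day
}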
func RecommendedStepIntervalForMetric(start, end uint64) uint64 {
start = ToNanoSecs(start)
end = ToNanoSecs(end)

View File

@@ -130,6 +130,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewUpdateOrgDomainFactory(sqlstore, sqlschema),
sqlmigration.NewAddFactorIndexesFactory(sqlstore, sqlschema),
sqlmigration.NewQueryBuilderV5MigrationFactory(sqlstore, telemetryStore),
sqlmigration.NewAddMeterQuickFiltersFactory(sqlstore, sqlschema),
)
}

View File

@@ -0,0 +1,137 @@
package sqlmigration
import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type addMeterQuickFilters struct {
sqlstore sqlstore.SQLStore
sqlschema sqlschema.SQLSchema
}
func NewAddMeterQuickFiltersFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("add_meter_quick_filters"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) {
return newAddMeterQuickFilters(ctx, providerSettings, config, sqlstore, sqlschema)
})
}
func newAddMeterQuickFilters(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) (SQLMigration, error) {
return &addMeterQuickFilters{
sqlstore: sqlstore,
sqlschema: sqlschema,
}, nil
}
func (migration *addMeterQuickFilters) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addMeterQuickFilters) Up(ctx context.Context, db *bun.DB) error {
meterFilters := []map[string]interface{}{
{"key": "deployment.environment", "dataType": "float64", "type": "Sum"},
{"key": "service.name", "dataType": "float64", "type": "Sum"},
{"key": "host.name", "dataType": "float64", "type": "Sum"},
}
meterJSON, err := json.Marshal(meterFilters)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal meter filters")
}
type signal struct {
valuer.String
}
type identifiable struct {
ID valuer.UUID `json:"id" bun:"id,pk,type:text"`
}
type timeAuditable struct {
CreatedAt time.Time `bun:"created_at" json:"createdAt"`
UpdatedAt time.Time `bun:"updated_at" json:"updatedAt"`
}
type quickFilterType struct {
bun.BaseModel `bun:"table:quick_filter"`
identifiable
OrgID valuer.UUID `bun:"org_id,type:text,notnull"`
Filter string `bun:"filter,type:text,notnull"`
Signal signal `bun:"signal,type:text,notnull"`
timeAuditable
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
var orgIDs []string
err = tx.NewSelect().
Table("organizations").
Column("id").
Scan(ctx, &orgIDs)
if err != nil && err != sql.ErrNoRows {
return err
}
var meterFiltersToInsert []quickFilterType
for _, orgIDStr := range orgIDs {
orgID, err := valuer.NewUUID(orgIDStr)
if err != nil {
return err
}
meterFiltersToInsert = append(meterFiltersToInsert, quickFilterType{
identifiable: identifiable{
ID: valuer.GenerateUUID(),
},
OrgID: orgID,
Filter: string(meterJSON),
Signal: signal{valuer.NewString("meter")},
timeAuditable: timeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
})
}
if len(meterFiltersToInsert) > 0 {
_, err = tx.NewInsert().
Model(&meterFiltersToInsert).
On("CONFLICT (org_id, signal) DO UPDATE").
Set("filter = EXCLUDED.filter, updated_at = EXCLUDED.updated_at").
Exec(ctx)
if err != nil {
return err
}
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (migration *addMeterQuickFilters) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@@ -25,6 +25,8 @@ var (
ErrFailedToGetLogsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get logs keys")
ErrFailedToGetTblStatement = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get tbl statement")
ErrFailedToGetMetricsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get metrics keys")
ErrFailedToGetMeterKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter keys")
ErrFailedToGetMeterValues = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter values")
ErrFailedToGetRelatedValues = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get related values")
)
@@ -36,6 +38,8 @@ type telemetryMetaStore struct {
indexV3TblName string
metricsDBName string
metricsFieldsTblName string
meterDBName string
meterFieldsTblName string
logsDBName string
logsFieldsTblName string
logsV2TblName string
@@ -58,6 +62,8 @@ func NewTelemetryMetaStore(
indexV3TblName string,
metricsDBName string,
metricsFieldsTblName string,
meterDBName string,
meterFieldsTblName string,
logsDBName string,
logsV2TblName string,
logsFieldsTblName string,
@@ -74,6 +80,8 @@ func NewTelemetryMetaStore(
indexV3TblName: indexV3TblName,
metricsDBName: metricsDBName,
metricsFieldsTblName: metricsFieldsTblName,
meterDBName: meterDBName,
meterFieldsTblName: meterFieldsTblName,
logsDBName: logsDBName,
logsV2TblName: logsV2TblName,
logsFieldsTblName: logsFieldsTblName,
@@ -598,6 +606,76 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto
return keys, complete, nil
}
// getMeterSourceMetricKeys returns the keys from meter source metrics that match the field selection criteria
func (t *telemetryMetaStore) getMeterSourceMetricKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
if len(fieldKeySelectors) == 0 {
return nil, true, nil
}
sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name").From(t.meterDBName + "." + t.meterFieldsTblName)
conds := []string{}
var limit int
for _, fieldKeySelector := range fieldKeySelectors {
fieldConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldConds = append(fieldConds, sb.E("attr_name", fieldKeySelector.Name))
} else {
fieldConds = append(fieldConds, sb.Like("attr_name", "%"+fieldKeySelector.Name+"%"))
}
fieldConds = append(fieldConds, sb.NotLike("attr_name", "\\_\\_%"))
if fieldKeySelector.MetricContext != nil {
fieldConds = append(fieldConds, sb.E("metric_name", fieldKeySelector.MetricContext.MetricName))
}
conds = append(conds, sb.And(fieldConds...))
limit += fieldKeySelector.Limit
}
sb.Where(sb.Or(conds...))
if limit == 0 {
limit = 1000
}
// query one extra row so we can tell whether we hit the limit
sb.Limit(limit + 1)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
}
defer rows.Close()
keys := []*telemetrytypes.TelemetryFieldKey{}
rowCount := 0
for rows.Next() {
rowCount++
// reached the limit, we know there are more results
if rowCount > limit {
break
}
var name string
err = rows.Scan(&name)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
}
keys = append(keys, &telemetrytypes.TelemetryFieldKey{
Name: name,
Signal: telemetrytypes.SignalMetrics,
})
}
if rows.Err() != nil {
return nil, false, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
}
// hit the limit?
complete := rowCount <= limit
return keys, complete, nil
}
func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, bool, error) {
var keys []*telemetrytypes.TelemetryFieldKey
var complete bool = true
@@ -614,7 +692,11 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
case telemetrytypes.SignalLogs:
keys, complete, err = t.getLogsKeys(ctx, selectors)
case telemetrytypes.SignalMetrics:
if fieldKeySelector.Source == telemetrytypes.SourceMeter {
keys, complete, err = t.getMeterSourceMetricKeys(ctx, selectors)
} else {
keys, complete, err = t.getMetricsKeys(ctx, selectors)
}
case telemetrytypes.SignalUnspecified:
// get traces keys
tracesKeys, tracesComplete, err := t.getTracesKeys(ctx, selectors)
@@ -637,7 +719,6 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
}
keys = append(keys, metricsKeys...)
// Complete only if all signals are complete
complete = tracesComplete && logsComplete && metricsComplete complete = tracesComplete && logsComplete && metricsComplete
} }
if err != nil { if err != nil {
@ -657,6 +738,7 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
logsSelectors := []*telemetrytypes.FieldKeySelector{} logsSelectors := []*telemetrytypes.FieldKeySelector{}
tracesSelectors := []*telemetrytypes.FieldKeySelector{} tracesSelectors := []*telemetrytypes.FieldKeySelector{}
metricsSelectors := []*telemetrytypes.FieldKeySelector{} metricsSelectors := []*telemetrytypes.FieldKeySelector{}
meterSourceMetricsSelectors := []*telemetrytypes.FieldKeySelector{}
for _, fieldKeySelector := range fieldKeySelectors {
switch fieldKeySelector.Signal {
@@ -665,7 +747,11 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
case telemetrytypes.SignalTraces:
tracesSelectors = append(tracesSelectors, fieldKeySelector)
case telemetrytypes.SignalMetrics:
if fieldKeySelector.Source == telemetrytypes.SourceMeter {
meterSourceMetricsSelectors = append(meterSourceMetricsSelectors, fieldKeySelector)
} else {
metricsSelectors = append(metricsSelectors, fieldKeySelector)
}
case telemetrytypes.SignalUnspecified:
logsSelectors = append(logsSelectors, fieldKeySelector)
tracesSelectors = append(tracesSelectors, fieldKeySelector)
@@ -686,6 +772,10 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
return nil, false, err
}
meterSourceMetricsKeys, _, err := t.getMeterSourceMetricKeys(ctx, meterSourceMetricsSelectors)
if err != nil {
return nil, false, err
}
// Complete only if all queries are complete
complete := logsComplete && tracesComplete && metricsComplete
@@ -699,6 +789,9 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
for _, key := range metricsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
for _, key := range meterSourceMetricsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
return mapOfKeys, complete, nil
}
@@ -1062,6 +1155,61 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu
return values, complete, nil
}
func (t *telemetryMetaStore) getMeterSourceMetricFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeysAndValues(labels, 'String')) AS attr").
From(t.meterDBName + "." + t.meterFieldsTblName)
if fieldValueSelector.Name != "" {
sb.Where(sb.E("attr.1", fieldValueSelector.Name))
}
sb.Where(sb.NotLike("attr.1", "\\_\\_%"))
if fieldValueSelector.Value != "" {
if fieldValueSelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
sb.Where(sb.E("attr.2", fieldValueSelector.Value))
} else {
sb.Where(sb.Like("attr.2", "%"+fieldValueSelector.Value+"%"))
}
}
sb.Where(sb.NE("attr.2", ""))
limit := fieldValueSelector.Limit
if limit == 0 {
limit = 50
}
// query one extra to check if we hit the limit
sb.Limit(limit + 1)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterValues.Error())
}
defer rows.Close()
values := &telemetrytypes.TelemetryFieldValues{}
rowCount := 0
for rows.Next() {
rowCount++
// reached the limit, we know there are more results
if rowCount > limit {
break
}
var attribute []string
if err := rows.Scan(&attribute); err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterValues.Error())
}
if len(attribute) > 1 {
values.StringValues = append(values.StringValues, attribute[1])
}
}
// hit the limit?
complete := rowCount <= limit
return values, complete, nil
}
func populateAllUnspecifiedValues(allUnspecifiedValues *telemetrytypes.TelemetryFieldValues, mapOfValues map[any]bool, mapOfRelatedValues map[any]bool, values *telemetrytypes.TelemetryFieldValues, limit int) bool {
complete := true
totalCount := len(mapOfValues) + len(mapOfRelatedValues)
@@ -1122,7 +1270,11 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
case telemetrytypes.SignalLogs:
values, complete, err = t.getLogFieldValues(ctx, fieldValueSelector)
case telemetrytypes.SignalMetrics:
if fieldValueSelector.Source == telemetrytypes.SourceMeter {
values, complete, err = t.getMeterSourceMetricFieldValues(ctx, fieldValueSelector)
} else {
values, complete, err = t.getMetricFieldValues(ctx, fieldValueSelector)
}
case telemetrytypes.SignalUnspecified:
mapOfValues := make(map[any]bool)
mapOfRelatedValues := make(map[any]bool)
@@ -1178,6 +1330,33 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
return make(map[string]metrictypes.Temporality), nil
}
result := make(map[string]metrictypes.Temporality)
metricsTemporality, err := t.fetchMetricsTemporality(ctx, metricNames...)
if err != nil {
return nil, err
}
meterMetricsTemporality, err := t.fetchMeterSourceMetricsTemporality(ctx, metricNames...)
if err != nil {
return nil, err
}
// For metrics not found in the database, set to Unknown
for _, metricName := range metricNames {
if temporality, exists := metricsTemporality[metricName]; exists {
result[metricName] = temporality
continue
}
if temporality, exists := meterMetricsTemporality[metricName]; exists {
result[metricName] = temporality
continue
}
result[metricName] = metrictypes.Unknown
}
return result, nil
}
func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
// Build query to fetch temporality for all metrics
@@ -1229,11 +1408,55 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
result[metricName] = temporality
}
return result, nil
}
func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
sb := sqlbuilder.Select(
"metric_name",
"argMax(temporality, unix_milli) as temporality",
).From(t.meterDBName + "." + t.meterFieldsTblName)
// Filter by metric names
sb.Where(sb.In("metric_name", metricNames))
// Group by metric name to get one temporality per metric
sb.GroupBy("metric_name")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
t.logger.DebugContext(ctx, "fetching meter metrics temporality", "query", query, "args", args)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch meter metric temporality")
}
defer rows.Close()
// Process results
for rows.Next() {
var metricName, temporalityStr string
if err := rows.Scan(&metricName, &temporalityStr); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
}
// Convert string to Temporality type
var temporality metrictypes.Temporality
switch temporalityStr {
case "Delta":
temporality = metrictypes.Delta
case "Cumulative":
temporality = metrictypes.Cumulative
case "Unspecified":
temporality = metrictypes.Unspecified
default:
// Unknown or empty temporality
temporality = metrictypes.Unknown
}
result[metricName] = temporality
}
return result, nil
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics" "github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest" "github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
@@ -42,6 +43,8 @@ func TestGetKeys(t *testing.T) {
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.AttributesMetadataTableName,
telemetrymeter.DBName,
telemetrymeter.SamplesAgg1dTableName,
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,

View File

@@ -0,0 +1,365 @@
package telemetrymeter
import (
"context"
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
type meterQueryStatementBuilder struct {
logger *slog.Logger
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
metricsStatementBuilder *telemetrymetrics.MetricQueryStatementBuilder
}
var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*meterQueryStatementBuilder)(nil)
func NewMeterQueryStatementBuilder(
settings factory.ProviderSettings,
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
) *meterQueryStatementBuilder {
metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymeter")
metricsStatementBuilder := telemetrymetrics.NewMetricQueryStatementBuilder(settings, metadataStore, fieldMapper, conditionBuilder)
return &meterQueryStatementBuilder{
logger: metricsSettings.Logger(),
metadataStore: metadataStore,
fm: fieldMapper,
cb: conditionBuilder,
metricsStatementBuilder: metricsStatementBuilder,
}
}
func (b *meterQueryStatementBuilder) Build(
ctx context.Context,
start uint64,
end uint64,
_ qbtypes.RequestType,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
keySelectors := telemetrymetrics.GetKeySelectors(query)
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
}
start, end = querybuilder.AdjustedMetricTimeRange(start, end, uint64(query.StepInterval.Seconds()), query)
return b.buildPipelineStatement(ctx, start, end, query, keys, variables)
}
func (b *meterQueryStatementBuilder) buildPipelineStatement(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
var (
cteFragments []string
cteArgs [][]any
)
if b.metricsStatementBuilder.CanShortCircuitDelta(query) {
// build __spatial_aggregation_cte directly for delta queries that can skip per-series temporal aggregation
frag, args := b.buildTemporalAggDeltaFastPath(ctx, start, end, query, keys, variables)
if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
} else {
// temporal_aggregation_cte
if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, variables); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
// spatial_aggregation_cte
frag, args := b.buildSpatialAggregationCTE(ctx, start, end, query, keys)
if frag != "" {
cteFragments = append(cteFragments, frag)
cteArgs = append(cteArgs, args)
}
}
// final SELECT
return b.metricsStatementBuilder.BuildFinalSelect(cteFragments, cteArgs, query)
}
func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (string, []any) {
var filterWhere *querybuilder.PreparedWhereClause
var err error
stepSec := int64(query.StepInterval.Seconds())
sb := sqlbuilder.NewSelectBuilder()
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
if err != nil {
return "", []any{}
}
sb.SelectMore(col)
}
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
}
sb.SelectMore(fmt.Sprintf("%s AS value", aggCol))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
sb.LT("unix_milli", end),
)
if query.Filter != nil && query.Filter.Expression != "" {
filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
})
if err != nil {
return "", []any{}
}
}
if filterWhere != nil {
sb.AddWhereClause(filterWhere.WhereClause)
}
if query.Aggregations[0].Temporality != metrictypes.Unknown {
sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
}
sb.GroupBy("ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
func (b *meterQueryStatementBuilder) buildTemporalAggregationCTE(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (string, []any, error) {
if query.Aggregations[0].Temporality == metrictypes.Delta {
return b.buildTemporalAggDelta(ctx, start, end, query, keys, variables)
}
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, keys, variables)
}
func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (string, []any, error) {
var filterWhere *querybuilder.PreparedWhereClause
var err error
stepSec := int64(query.StepInterval.Seconds())
sb := sqlbuilder.NewSelectBuilder()
sb.Select("fingerprint")
sb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
if err != nil {
return "", nil, err
}
sb.SelectMore(col)
}
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality,
query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
}
sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
sb.LT("unix_milli", end),
)
if query.Filter != nil && query.Filter.Expression != "" {
filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
})
if err != nil {
return "", nil, err
}
}
if filterWhere != nil {
sb.AddWhereClause(filterWhere.WhereClause)
}
if query.Aggregations[0].Temporality != metrictypes.Unknown {
sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
}
sb.GroupBy("fingerprint", "ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
sb.OrderBy("fingerprint", "ts")
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
}
func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (string, []any, error) {
var filterWhere *querybuilder.PreparedWhereClause
var err error
stepSec := int64(query.StepInterval.Seconds())
baseSb := sqlbuilder.NewSelectBuilder()
baseSb.Select("fingerprint")
baseSb.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
if err != nil {
return "", nil, err
}
baseSb.SelectMore(col)
}
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
baseSb.Where(
baseSb.In("metric_name", query.Aggregations[0].MetricName),
baseSb.GTE("unix_milli", start),
baseSb.LT("unix_milli", end),
)
if query.Filter != nil && query.Filter.Expression != "" {
filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
})
if err != nil {
return "", nil, err
}
}
if filterWhere != nil {
baseSb.AddWhereClause(filterWhere.WhereClause)
}
if query.Aggregations[0].Temporality != metrictypes.Unknown {
baseSb.Where(baseSb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
}
baseSb.GroupBy("fingerprint", "ts")
baseSb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
baseSb.OrderBy("fingerprint", "ts")
innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse)
switch query.Aggregations[0].TimeAggregation {
case metrictypes.TimeAggregationRate:
rateExpr := fmt.Sprintf(telemetrymetrics.RateWithoutNegative, start, start)
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", rateExpr))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
case metrictypes.TimeAggregationIncrease:
incExpr := fmt.Sprintf(telemetrymetrics.IncreaseWithoutNegative, start, start)
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", incExpr))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
default:
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerArgs, nil
}
}
func (b *meterQueryStatementBuilder) buildSpatialAggregationCTE(
_ context.Context,
_ uint64,
_ uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
_ map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, []any) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ts")
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
sb.SelectMore(fmt.Sprintf("%s(per_series_value) AS value", query.Aggregations[0].SpaceAggregation.StringValue()))
sb.From("__temporal_aggregation_cte")
sb.Where(sb.EQ("isNaN(per_series_value)", 0))
if query.Aggregations[0].ValueFilter != nil {
sb.Where(sb.EQ("per_series_value", query.Aggregations[0].ValueFilter.Value))
}
sb.GroupBy("ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}

View File

@@ -0,0 +1,191 @@
package telemetrymeter
import (
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
)
func TestStatementBuilder(t *testing.T) {
cases := []struct {
name string
requestType qbtypes.RequestType
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
expected qbtypes.Statement
expectedErr error
}{
{
name: "test_cumulative_rate_sum",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Type: metrictypes.SumType,
Temporality: metrictypes.Cumulative,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747785600000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747785600000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"signoz_calls_total", uint64(1747785600000), uint64(1747983420000), "cartservice", "cumulative", 0},
},
expectedErr: nil,
},
{
name: "test_delta_rate_sum",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Type: metrictypes.SumType,
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta"},
},
expectedErr: nil,
},
{
name: "test_delta_rate_avg",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "signoz_calls_total",
Type: metrictypes.SumType,
Temporality: metrictypes.Delta,
TimeAggregation: metrictypes.TimeAggregationRate,
SpaceAggregation: metrictypes.SpaceAggregationAvg,
},
},
Filter: &qbtypes.Filter{
Expression: "service.name = 'cartservice'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta", 0},
},
expectedErr: nil,
},
{
name: "test_gauge_avg_sum",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
Aggregations: []qbtypes.MetricAggregation{
{
MetricName: "system.memory.usage",
Type: metrictypes.GaugeType,
Temporality: metrictypes.Unspecified,
TimeAggregation: metrictypes.TimeAggregationAvg,
SpaceAggregation: metrictypes.SpaceAggregationSum,
},
},
Filter: &qbtypes.Filter{
Expression: "host.name = 'big-data-node-1'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "host.name",
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, avg(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte",
Args: []any{"system.memory.usage", uint64(1747872000000), uint64(1747983420000), "big-data-node-1", "unspecified", 0},
},
expectedErr: nil,
},
}
fm := telemetrymetrics.NewFieldMapper()
cb := telemetrymetrics.NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json")
if err != nil {
t.Fatalf("failed to load field keys: %v", err)
}
mockMetadataStore.KeysMap = keys
statementBuilder := NewMeterQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
if c.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, c.expected.Query, q.Query)
require.Equal(t, c.expected.Args, q.Args)
require.Equal(t, c.expected.Warnings, q.Warnings)
}
})
}
}

View File

@ -0,0 +1,194 @@
package telemetrymeter
import (
"time"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
)
const (
DBName = "signoz_meter"
SamplesTableName = "distributed_samples"
SamplesLocalTableName = "samples"
SamplesAgg1dTableName = "distributed_samples_agg_1d"
SamplesAgg1dLocalTableName = "samples_agg_1d"
)
var (
oneMonthInMilliseconds = uint64(time.Hour.Milliseconds() * 24 * 30)
// when the query covers a range just under the 30-day boundary, add an offset to the end time
// to make sure that we keep selecting the raw samples table: the start gets adjusted to the
// nearest step interval, which could otherwise tip the range over the boundary and pick the
// aggregated table, producing a time series that doesn't best represent the rate of change
offsetBucket = uint64(1 * time.Hour.Milliseconds())
)
// start and end are in milliseconds
// we have two tables for samples
// 1. distributed_samples
// 2. distributed_samples_agg_1d - for queries with time range above or equal to 30 days
// if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it
func WhichSamplesTableToUse(
start, end uint64,
metricType metrictypes.Type,
timeAggregation metrictypes.TimeAggregation,
tableHints *metrictypes.MetricTableHints,
) string {
// if we have a hint for the table, we need to use it
// the hint will be used to override the default table selection logic
if tableHints != nil {
if tableHints.SamplesTableName != "" {
return tableHints.SamplesTableName
}
}
// if the time aggregation is count_distinct, we need to use the distributed_samples table
// because the aggregated tables don't support count_distinct
if timeAggregation == metrictypes.TimeAggregationCountDistinct {
return SamplesTableName
}
if end-start < oneMonthInMilliseconds+offsetBucket {
return SamplesTableName
}
return SamplesAgg1dTableName
}
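
A minimal sketch of the selection logic, assuming a 45-day window in unix milliseconds (function and constant names as defined above):

	end := uint64(time.Now().UnixMilli())
	start := end - uint64(45*24*time.Hour.Milliseconds())

	// ranges of roughly 30 days or more resolve to the 1-day aggregate table
	table := WhichSamplesTableToUse(start, end, metrictypes.SumType, metrictypes.TimeAggregationRate, nil)
	fmt.Println(table) // distributed_samples_agg_1d

	// count_distinct cannot be served by the aggregate table, regardless of range
	table = WhichSamplesTableToUse(start, end, metrictypes.SumType, metrictypes.TimeAggregationCountDistinct, nil)
	fmt.Println(table) // distributed_samples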
func AggregationColumnForSamplesTable(
start, end uint64,
metricType metrictypes.Type,
temporality metrictypes.Temporality,
timeAggregation metrictypes.TimeAggregation,
tableHints *metrictypes.MetricTableHints,
) string {
tableName := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
var aggregationColumn string
switch temporality {
case metrictypes.Delta:
switch tableName {
case SamplesTableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(value)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "avg(value)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(value)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(value)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "count(value)"
case metrictypes.TimeAggregationCountDistinct:
aggregationColumn = "countDistinct(value)"
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "sum(value)"
}
case SamplesAgg1dTableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(last)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(min)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(max)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "sum(sum)"
}
}
case metrictypes.Cumulative:
switch tableName {
case SamplesTableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(value)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "avg(value)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(value)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(value)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "count(value)"
case metrictypes.TimeAggregationCountDistinct:
aggregationColumn = "countDistinct(value)"
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "max(value)"
}
case SamplesAgg1dTableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(last)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(min)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(max)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
aggregationColumn = "max(max)"
}
}
case metrictypes.Unspecified:
switch tableName {
case SamplesTableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(value)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "avg(value)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(value)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(value)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "count(value)"
case metrictypes.TimeAggregationCountDistinct:
aggregationColumn = "countDistinct(value)"
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
aggregationColumn = "sum(value)"
}
case SamplesAgg1dTableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(last)"
case metrictypes.TimeAggregationSum:
aggregationColumn = "sum(sum)"
case metrictypes.TimeAggregationAvg:
aggregationColumn = "sum(sum) / sum(count)"
case metrictypes.TimeAggregationMin:
aggregationColumn = "min(min)"
case metrictypes.TimeAggregationMax:
aggregationColumn = "max(max)"
case metrictypes.TimeAggregationCount:
aggregationColumn = "sum(count)"
// count_distinct is not supported in aggregated tables
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
aggregationColumn = "sum(sum)"
}
}
}
return aggregationColumn
}
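
The column returned depends on which table the range selects; a short sketch for an average over a gauge (the timestamps are placeholder unix-millisecond values on either side of the 30-day boundary):

	// under 30 days: raw samples table, plain per-point average
	col := AggregationColumnForSamplesTable(uint64(1747872000000), uint64(1747983420000),
		metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationAvg, nil)
	fmt.Println(col) // avg(value)

	// 30 days or more: 1-day aggregate table, avg reconstructed from pre-aggregated sum and count
	col = AggregationColumnForSamplesTable(uint64(1745280000000), uint64(1747983420000),
		metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationAvg, nil)
	fmt.Println(col) // sum(sum) / sum(count)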

View File

@ -0,0 +1,34 @@
{
"service.name": [
{
"name": "service.name",
"fieldContext": "resource",
"fieldDataType": "string",
"signal": "metrics"
}
],
"http.request.method": [
{
"name": "http.request.method",
"fieldContext": "attribute",
"fieldDataType": "string",
"signal": "metrics"
}
],
"http.response.status_code": [
{
"name": "http.response.status_code",
"fieldContext": "attribute",
"fieldDataType": "int",
"signal": "metrics"
}
],
"host.name": [
{
"name": "host.name",
"fieldContext": "resource",
"fieldDataType": "string",
"signal": "metrics"
}
]
}

View File

@ -19,23 +19,23 @@ const (
	IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
)
-type metricQueryStatementBuilder struct {
+type MetricQueryStatementBuilder struct {
	logger        *slog.Logger
	metadataStore telemetrytypes.MetadataStore
	fm            qbtypes.FieldMapper
	cb            qbtypes.ConditionBuilder
}
-var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*metricQueryStatementBuilder)(nil)
+var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*MetricQueryStatementBuilder)(nil)
func NewMetricQueryStatementBuilder(
	settings factory.ProviderSettings,
	metadataStore telemetrytypes.MetadataStore,
	fieldMapper qbtypes.FieldMapper,
	conditionBuilder qbtypes.ConditionBuilder,
-) *metricQueryStatementBuilder {
+) *MetricQueryStatementBuilder {
	metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetrics")
-	return &metricQueryStatementBuilder{
+	return &MetricQueryStatementBuilder{
		logger:        metricsSettings.Logger(),
		metadataStore: metadataStore,
		fm:            fieldMapper,
@ -43,7 +43,7 @@ func NewMetricQueryStatementBuilder(
	}
}
-func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector {
+func GetKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector {
	var keySelectors []*telemetrytypes.FieldKeySelector
	if query.Filter != nil && query.Filter.Expression != "" {
		whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
@ -72,7 +72,7 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation])
	return keySelectors
}
-func (b *metricQueryStatementBuilder) Build(
+func (b *MetricQueryStatementBuilder) Build(
	ctx context.Context,
	start uint64,
	end uint64,
@ -80,7 +80,7 @@ func (b *metricQueryStatementBuilder) Build(
	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
	variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
-	keySelectors := getKeySelectors(query)
+	keySelectors := GetKeySelectors(query)
	keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
	if err != nil {
		return nil, err
@ -113,7 +113,7 @@ func (b *metricQueryStatementBuilder) Build(
// we can directly use the quantilesDDMerge function
//
// all of this is true only for delta metrics
-func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool {
+func (b *MetricQueryStatementBuilder) CanShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool {
	if q.Aggregations[0].Temporality != metrictypes.Delta {
		return false
	}
@ -139,7 +139,7 @@ func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilde
	return false
}
-func (b *metricQueryStatementBuilder) buildPipelineStatement(
+func (b *MetricQueryStatementBuilder) buildPipelineStatement(
	ctx context.Context,
	start, end uint64,
	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@ -200,7 +200,7 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
		return nil, err
	}
-	if b.canShortCircuitDelta(query) {
+	if b.CanShortCircuitDelta(query) {
		// spatial_aggregation_cte directly for certain delta queries
		frag, args := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
		if frag != "" {
@ -230,10 +230,10 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
	query.GroupBy = origGroupBy
	// final SELECT
-	return b.buildFinalSelect(cteFragments, cteArgs, query)
+	return b.BuildFinalSelect(cteFragments, cteArgs, query)
}
-func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
+func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
	start, end uint64,
	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
	timeSeriesCTE string,
@ -281,7 +281,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
	return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
-func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
+func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
	ctx context.Context,
	start, end uint64,
	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@ -344,7 +344,7 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
	return fmt.Sprintf("(%s) AS filtered_time_series", q), args, nil
}
-func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(
+func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
	ctx context.Context,
	start, end uint64,
	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@ -358,7 +358,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(
	return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
-func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
+func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
	_ context.Context,
	start, end uint64,
	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@ -401,7 +401,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
	return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
}
-func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
+func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
	_ context.Context,
	start, end uint64,
	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@ -466,7 +466,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
	}
}
-func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
+func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
	_ context.Context,
	_ uint64,
	_ uint64,
@ -492,7 +492,7 @@ func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
	return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
-func (b *metricQueryStatementBuilder) buildFinalSelect(
+func (b *MetricQueryStatementBuilder) BuildFinalSelect(
	cteFragments []string,
	cteArgs [][]any,
	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
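
With the builder type and these helpers exported, other packages such as telemetrymeter can reuse them; a minimal sketch of the wiring, mirroring the meter tests above (settings, metadataStore, ctx, startMs, endMs and query stand in for the caller's values):

	fm := telemetrymetrics.NewFieldMapper()
	cb := telemetrymetrics.NewConditionBuilder(fm)
	builder := telemetrymetrics.NewMetricQueryStatementBuilder(settings, metadataStore, fm, cb)
	stmt, err := builder.Build(ctx, startMs, endMs, qbtypes.RequestTypeTimeSeries, query, nil)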

View File

@ -14,6 +14,9 @@ type QueryBuilderQuery[T any] struct {
	// signal to query
	Signal telemetrytypes.Signal `json:"signal,omitempty"`
	// source for query
	Source telemetrytypes.Source `json:"source,omitempty"`
	// we want to support multiple aggregations
	// currently supported: []Aggregation, []MetricAggregation
	Aggregations []T `json:"aggregations,omitempty"`
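
A hedged example of a builder query carrying the new field (values are illustrative; that Source alone is what routes a query to the meter tables is an assumption):

	q := qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
		Signal: telemetrytypes.SignalMetrics,
		Source: telemetrytypes.SourceMeter, // assumption: marks the query as meter-backed
	}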

View File

@ -35,6 +35,7 @@ var (
	SignalLogs          = Signal{valuer.NewString("logs")}
	SignalApiMonitoring = Signal{valuer.NewString("api_monitoring")}
	SignalExceptions    = Signal{valuer.NewString("exceptions")}
	SignalMeter         = Signal{valuer.NewString("meter")}
)
// NewSignal creates a Signal from a string
@ -48,6 +49,8 @@ func NewSignal(s string) (Signal, error) {
		return SignalApiMonitoring, nil
	case "exceptions":
		return SignalExceptions, nil
	case "meter":
		return SignalMeter, nil
	default:
		return Signal{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid signal: %s", s)
	}
@ -178,6 +181,12 @@ func NewDefaultQuickFilter(orgID valuer.UUID) ([]*StorableQuickFilter, error) {
		{"key": "k8s.pod.name", "dataType": "string", "type": "resource"},
	}
	meterFilters := []map[string]interface{}{
		{"key": "deployment.environment", "dataType": "float64", "type": "Sum"},
		{"key": "service.name", "dataType": "float64", "type": "Sum"},
		{"key": "host.name", "dataType": "float64", "type": "Sum"},
	}
	tracesJSON, err := json.Marshal(tracesFilters)
	if err != nil {
		return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal traces filters")
@ -190,12 +199,19 @@ func NewDefaultQuickFilter(orgID valuer.UUID) ([]*StorableQuickFilter, error) {
	apiMonitoringJSON, err := json.Marshal(apiMonitoringFilters)
	if err != nil {
-		return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal Api Monitoring filters")
+		return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal api monitoring filters")
	}
	exceptionsJSON, err := json.Marshal(exceptionsFilters)
	if err != nil {
-		return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal Exceptions filters")
+		return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal exceptions filters")
	}
	meterJSON, err := json.Marshal(meterFilters)
	if err != nil {
		return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal meter filters")
	}
	timeRightNow := time.Now()
	return []*StorableQuickFilter{
@ -247,5 +263,17 @@ func NewDefaultQuickFilter(orgID valuer.UUID) ([]*StorableQuickFilter, error) {
			UpdatedAt: timeRightNow,
		},
	},
	{
		Identifiable: types.Identifiable{
			ID: valuer.GenerateUUID(),
		},
		OrgID:  orgID,
		Filter: string(meterJSON),
		Signal: SignalMeter,
		TimeAuditable: types.TimeAuditable{
			CreatedAt: timeRightNow,
			UpdatedAt: timeRightNow,
		},
	},
}, nil
}

View File

@ -121,6 +121,7 @@ type FieldKeySelector struct {
	StartUnixMilli int64         `json:"startUnixMilli"`
	EndUnixMilli   int64         `json:"endUnixMilli"`
	Signal         Signal        `json:"signal"`
	Source         Source        `json:"source"`
	FieldContext   FieldContext  `json:"fieldContext"`
	FieldDataType  FieldDataType `json:"fieldDataType"`
	Name           string        `json:"name"`

View File

@ -0,0 +1,12 @@
package telemetrytypes
import "github.com/SigNoz/signoz/pkg/valuer"
type Source struct {
valuer.String
}
var (
SourceMeter = Source{valuer.NewString("meter")}
SourceUnspecified = Source{valuer.NewString("")}
)
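
A short sketch of the two values in use when populating the FieldKeySelector shown earlier in this diff (field values are illustrative):

	sel := telemetrytypes.FieldKeySelector{
		Signal: telemetrytypes.SignalMetrics,
		Source: telemetrytypes.SourceMeter, // SourceUnspecified ("") preserves the existing, source-agnostic behaviour
		Name:   "service.name",
	}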