From f1ce93171c3014f43162323d767a348dbd35c04e Mon Sep 17 00:00:00 2001
From: Vikrant Gupta
Date: Thu, 7 Aug 2025 16:50:37 +0530
Subject: [PATCH] feat(telemetrymeter): add support for telemetry meter (#8667)

* feat(telemetry/meter): added base setup for telemetry meter signal
* feat(telemetry/meter): added metadata setup for meter
* feat(telemetry/meter): fix stmnt builder tests
* feat(telemetry/meter): test query range API fixes
* feat(telemetry/meter): improve error messages
* feat(telemetrymeter): step interval improvements
* feat(telemetrymeter): metadata changes and aggregate attribute changes
* feat(telemetrymeter): metadata changes and aggregate attribute changes
* feat(telemetrymeter): deprecate the signal and use aggregation instead
* feat(telemetrymeter): deprecate the signal and use aggregation instead
* feat(telemetrymeter): deprecate the signal and use aggregation instead
* feat(telemetrymeter): cleanup the types
* feat(telemetrymeter): introduce source for query
* feat(telemetrymeter): better naming for source in metadata
* feat(telemetrymeter): added quick filters for meter explorer
* feat(telemetrymeter): incorporate the new changes to stmnt builder
* feat(telemetrymeter): add the statement builder for the ranged cache queries
* feat(telemetrymeter): use meter aggregate keys
* feat(telemetrymeter): use meter aggregate keys
* feat(telemetrymeter): remove meter from complete bools
* feat(telemetrymeter): remove meter from complete bools
* feat(telemetrymeter): update the quick filters to use meter
---
 pkg/apis/fields/api.go                     |   3 +
 pkg/apis/fields/parse.go                   |   9 +
 pkg/querier/builder_query.go               |   3 +
 pkg/querier/querier.go                     |  39 +-
 pkg/querier/signozquerier/provider.go      |  12 +
 .../app/clickhouseReader/reader.go         |  86 ++++-
 pkg/query-service/app/http_handler.go      |   4 +
 pkg/query-service/app/parser.go            |   4 +-
 pkg/query-service/interfaces/interface.go  |   2 +
 pkg/query-service/model/v3/v3.go           |   3 +-
 pkg/querybuilder/time.go                   |  26 ++
 pkg/signoz/provider.go                     |   1 +
 .../047_add_meter_quickfilters.go          | 137 +++++++
 pkg/telemetrymetadata/metadata.go          | 239 +++++++++++-
 pkg/telemetrymetadata/metadata_test.go     |   3 +
 pkg/telemetrymeter/statement_builder.go    | 365 ++++++++++++++++++
 pkg/telemetrymeter/stmt_builder_test.go    | 191 +++++++++
 pkg/telemetrymeter/tables.go               | 194 ++++++++++
 pkg/telemetrymeter/testdata/keys_map.json  |  34 ++
 pkg/telemetrymetrics/statement_builder.go  |  36 +-
 .../querybuildertypesv5/builder_query.go   |   3 +
 pkg/types/quickfiltertypes/filter.go       |  32 +-
 pkg/types/telemetrytypes/field.go          |   1 +
 pkg/types/telemetrytypes/source.go         |  12 +
 24 files changed, 1396 insertions(+), 43 deletions(-)
 create mode 100644 pkg/sqlmigration/047_add_meter_quickfilters.go
 create mode 100644 pkg/telemetrymeter/statement_builder.go
 create mode 100644 pkg/telemetrymeter/stmt_builder_test.go
 create mode 100644 pkg/telemetrymeter/tables.go
 create mode 100644 pkg/telemetrymeter/testdata/keys_map.json
 create mode 100644 pkg/types/telemetrytypes/source.go

diff --git a/pkg/apis/fields/api.go b/pkg/apis/fields/api.go
index 788982ac1271..0d4b20fef879 100644
--- a/pkg/apis/fields/api.go
+++ b/pkg/apis/fields/api.go
@@ -9,6 +9,7 @@ import (
 	"github.com/SigNoz/signoz/pkg/http/render"
 	"github.com/SigNoz/signoz/pkg/telemetrylogs"
 	"github.com/SigNoz/signoz/pkg/telemetrymetadata"
+	"github.com/SigNoz/signoz/pkg/telemetrymeter"
 	"github.com/SigNoz/signoz/pkg/telemetrymetrics"
 	"github.com/SigNoz/signoz/pkg/telemetrystore"
 	"github.com/SigNoz/signoz/pkg/telemetrytraces"
@@ -33,6 +34,8 @@ func NewAPI(
telemetrytraces.SpanIndexV3TableName, telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName, + telemetrymeter.DBName, + telemetrymeter.SamplesAgg1dTableName, telemetrylogs.DBName, telemetrylogs.LogsV2TableName, telemetrylogs.TagAttributesV2TableName, diff --git a/pkg/apis/fields/parse.go b/pkg/apis/fields/parse.go index 114be5a51669..913b48b1502a 100644 --- a/pkg/apis/fields/parse.go +++ b/pkg/apis/fields/parse.go @@ -12,6 +12,7 @@ import ( func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, error) { var req telemetrytypes.FieldKeySelector var signal telemetrytypes.Signal + var source telemetrytypes.Source var err error signalStr := r.URL.Query().Get("signal") @@ -21,6 +22,13 @@ func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, er signal = telemetrytypes.SignalUnspecified } + sourceStr := r.URL.Query().Get("source") + if sourceStr != "" { + source = telemetrytypes.Source{String: valuer.NewString(sourceStr)} + } else { + source = telemetrytypes.SourceUnspecified + } + if r.URL.Query().Get("limit") != "" { limit, err := strconv.Atoi(r.URL.Query().Get("limit")) if err != nil { @@ -76,6 +84,7 @@ func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, er StartUnixMilli: startUnixMilli, EndUnixMilli: endUnixMilli, Signal: signal, + Source: source, Name: name, FieldContext: fieldContext, FieldDataType: fieldDataType, diff --git a/pkg/querier/builder_query.go b/pkg/querier/builder_query.go index 3b1e4166daf3..aaea96fd1881 100644 --- a/pkg/querier/builder_query.go +++ b/pkg/querier/builder_query.go @@ -62,6 +62,9 @@ func (q *builderQuery[T]) Fingerprint() string { // Add signal type parts = append(parts, fmt.Sprintf("signal=%s", q.spec.Signal.StringValue())) + // Add source type + parts = append(parts, fmt.Sprintf("source=%s", q.spec.Source.StringValue())) + // Add step interval if present parts = append(parts, fmt.Sprintf("step=%s", q.spec.StepInterval.String())) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 9077f9eb4ad8..6f34e6460781 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -31,6 +31,7 @@ type querier struct { traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation] logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation] metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation] + meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation] bucketCache BucketCache } @@ -44,6 +45,7 @@ func New( traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation], logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation], metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation], + meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation], bucketCache BucketCache, ) *querier { querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier") @@ -55,6 +57,7 @@ func New( traceStmtBuilder: traceStmtBuilder, logStmtBuilder: logStmtBuilder, metricStmtBuilder: metricStmtBuilder, + meterStmtBuilder: meterStmtBuilder, bucketCache: bucketCache, } } @@ -168,17 +171,21 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype event.MetricsUsed = true event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != "" event.GroupByApplied = len(spec.GroupBy) > 0 - if spec.StepInterval.Seconds() == 0 { - spec.StepInterval = qbtypes.Step{ - Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, 
req.End)), - } - } - if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) { - spec.StepInterval = qbtypes.Step{ - Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)), - } - } + if spec.Source == telemetrytypes.SourceMeter { + spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMeter(req.Start, req.End))} + } else { + if spec.StepInterval.Seconds() == 0 { + spec.StepInterval = qbtypes.Step{ + Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)), + } + } + if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) { + spec.StepInterval = qbtypes.Step{ + Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)), + } + } + } req.CompositeQuery.Queries[idx].Spec = spec } } else if query.Type == qbtypes.QueryTypePromQL { @@ -265,7 +272,14 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype } spec.ShiftBy = extractShiftFromBuilderQuery(spec) timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType) - bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars) + var bq *builderQuery[qbtypes.MetricAggregation] + + if spec.Source == telemetrytypes.SourceMeter { + bq = newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars) + } else { + bq = newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars) + } + queries[spec.Name] = bq steps[spec.Name] = spec.StepInterval default: @@ -529,6 +543,9 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp specCopy := qt.spec.Copy() specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy) adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind) + if qt.spec.Source == telemetrytypes.SourceMeter { + return newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables) + } return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables) default: diff --git a/pkg/querier/signozquerier/provider.go b/pkg/querier/signozquerier/provider.go index c9801880c40d..9d4cb1dbdae4 100644 --- a/pkg/querier/signozquerier/provider.go +++ b/pkg/querier/signozquerier/provider.go @@ -11,6 +11,7 @@ import ( "github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter" "github.com/SigNoz/signoz/pkg/telemetrylogs" "github.com/SigNoz/signoz/pkg/telemetrymetadata" + "github.com/SigNoz/signoz/pkg/telemetrymeter" "github.com/SigNoz/signoz/pkg/telemetrymetrics" "github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrytraces" @@ -52,6 +53,8 @@ func newProvider( telemetrytraces.SpanIndexV3TableName, telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName, + telemetrymeter.DBName, + telemetrymeter.SamplesAgg1dTableName, telemetrylogs.DBName, telemetrylogs.LogsV2TableName, telemetrylogs.TagAttributesV2TableName, @@ -122,6 +125,14 @@ func newProvider( metricConditionBuilder, ) + // Create meter statement builder + meterStmtBuilder := telemetrymeter.NewMeterQueryStatementBuilder( + settings, + telemetryMetadataStore, + metricFieldMapper, + metricConditionBuilder, + ) + // Create bucket cache bucketCache := 
querier.NewBucketCache(
 		settings,
@@ -139,6 +150,7 @@ func newProvider(
 		traceStmtBuilder,
 		logStmtBuilder,
 		metricStmtBuilder,
+		meterStmtBuilder,
 		bucketCache,
 	), nil
 }
diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go
index 744a2711b65c..84426b538212 100644
--- a/pkg/query-service/app/clickhouseReader/reader.go
+++ b/pkg/query-service/app/clickhouseReader/reader.go
@@ -64,6 +64,8 @@ const (
 	signozTraceLocalTableName = "signoz_index_v2"
 	signozMetricDBName        = "signoz_metrics"
 	signozMetadataDbName      = "signoz_metadata"
+	signozMeterDBName         = "signoz_meter"
+	signozMeterSamplesName    = "samples_agg_1d"
 	signozSampleLocalTableName = "samples_v4"
 	signozSampleTableName      = "distributed_samples_v4"
@@ -2741,8 +2743,55 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, org
 	return &response, nil
 }

-func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
+func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
+	var response v3.AggregateAttributeResponse
+	// Query all relevant meter metric names from the meter samples table, but leave metadata retrieval to cache/db
+	query := fmt.Sprintf(
+		`SELECT metric_name,type,temporality,is_monotonic
+		FROM %s.%s
+		WHERE metric_name ILIKE $1
+		GROUP BY metric_name,type,temporality,is_monotonic`,
+		signozMeterDBName, signozMeterSamplesName)
+	if req.Limit != 0 {
+		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
+	}
+
+	rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
+	if err != nil {
+		zap.L().Error("Error while querying meter names", zap.Error(err))
+		return nil, fmt.Errorf("error while executing meter name query: %s", err.Error())
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var name string
+		var typ string
+		var temporality string
+		var isMonotonic bool
+		if err := rows.Scan(&name, &typ, &temporality, &isMonotonic); err != nil {
+			return nil, fmt.Errorf("error while scanning meter name: %s", err.Error())
+		}
+
+		// Non-monotonic cumulative sums are treated as gauges
+		if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
+			typ = "Gauge"
+		}
+
+		// unlike the traces/logs `tag`/`resource` types, `Type` here carries the metric type
+		key := v3.AttributeKey{
+			Key:      name,
+			DataType: v3.AttributeKeyDataTypeFloat64,
+			Type:     v3.AttributeKeyType(typ),
+			IsColumn: true,
+		}
+		response.AttributeKeys = append(response.AttributeKeys, key)
+	}
+
+	return &response, nil
+}
+
+func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
 	var query string
 	var err error
 	var rows driver.Rows
@@ -2782,6 +2831,41 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
 	return &response, nil
 }

+func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
+	var query string
+	var err error
+	var rows driver.Rows
+	var response v3.FilterAttributeKeyResponse
+
+	// skip internal attributes, i.e. attributes starting with __
+	query = fmt.Sprintf("SELECT DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name FROM %s.%s WHERE metric_name=$1 AND attr_name ILIKE $2 AND attr_name NOT LIKE '\\_\\_%%'", signozMeterDBName, signozMeterSamplesName)
+	if req.Limit != 0 {
+		query
= query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + rows, err = r.db.Query(ctx, query, req.AggregateAttribute, fmt.Sprintf("%%%s%%", req.SearchText)) + if err != nil { + zap.L().Error("Error while executing query", zap.Error(err)) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var attributeKey string + for rows.Next() { + if err := rows.Scan(&attributeKey); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + key := v3.AttributeKey{ + Key: attributeKey, + DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72. + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + } + response.AttributeKeys = append(response.AttributeKeys, key) + } + + return &response, nil +} + func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { var query string diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 1b580a0f5141..cff6c4731b2c 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -4218,6 +4218,8 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r * response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req) case v3.DataSourceTraces: response, err = aH.reader.GetTraceAggregateAttributes(r.Context(), req) + case v3.DataSourceMeter: + response, err = aH.reader.GetMeterAggregateAttributes(r.Context(), orgID, req) default: RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil) return @@ -4267,6 +4269,8 @@ func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.R switch req.DataSource { case v3.DataSourceMetrics: response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req) + case v3.DataSourceMeter: + response, err = aH.reader.GetMeterAttributeKeys(r.Context(), req) case v3.DataSourceLogs: response, err = aH.reader.GetLogAttributeKeys(r.Context(), req) case v3.DataSourceTraces: diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index e0e957927a51..d5740941637a 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -484,7 +484,7 @@ func parseAggregateAttributeRequest(r *http.Request) (*v3.AggregateAttributeRequ limit = 50 } - if dataSource != v3.DataSourceMetrics { + if dataSource != v3.DataSourceMetrics && dataSource != v3.DataSourceMeter { if err := aggregateOperator.Validate(); err != nil { return nil, err } @@ -604,7 +604,7 @@ func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequ return nil, err } - if dataSource != v3.DataSourceMetrics { + if dataSource != v3.DataSourceMetrics && dataSource != v3.DataSourceMeter { if err := aggregateOperator.Validate(); err != nil { return nil, err } diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index b378c94a0283..183dfa4ae84d 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -50,7 +50,9 @@ type Reader interface { FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) + 
GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) + GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) // Returns `MetricStatus` for latest received metric among `metricNames`. Useful for status calculations diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index c989cb9218a3..519ea56c5325 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -22,11 +22,12 @@ const ( DataSourceTraces DataSource = "traces" DataSourceLogs DataSource = "logs" DataSourceMetrics DataSource = "metrics" + DataSourceMeter DataSource = "meter" ) func (d DataSource) Validate() error { switch d { - case DataSourceTraces, DataSourceLogs, DataSourceMetrics: + case DataSourceTraces, DataSourceLogs, DataSourceMetrics, DataSourceMeter: return nil default: return fmt.Errorf("invalid data source: %s", d) diff --git a/pkg/querybuilder/time.go b/pkg/querybuilder/time.go index a85db77dfc72..afb0d8f5984e 100644 --- a/pkg/querybuilder/time.go +++ b/pkg/querybuilder/time.go @@ -63,6 +63,32 @@ func MinAllowedStepInterval(start, end uint64) uint64 { return step - step%5 } +func RecommendedStepIntervalForMeter(start, end uint64) uint64 { + start = ToNanoSecs(start) + end = ToNanoSecs(end) + + step := (end - start) / RecommendedNumberOfPoints / 1e9 + + // for meter queries the minimum step interval allowed is 1 hour as this is our granularity + if step < 3600 { + return 3600 + } + + // return the nearest lower multiple of 3600 ( 1 hour ) + recommended := step - step%3600 + + // if the time range is greater than 1 month set the step interval to be multiple of 1 day + if end-start >= uint64(30*24*time.Hour.Nanoseconds()) { + if recommended < 86400 { + recommended = 86400 + } else { + recommended = uint64(math.Round(float64(recommended)/86400)) * 86400 + } + } + + return recommended +} + func RecommendedStepIntervalForMetric(start, end uint64) uint64 { start = ToNanoSecs(start) end = ToNanoSecs(end) diff --git a/pkg/signoz/provider.go b/pkg/signoz/provider.go index d24882f47404..88585ad467d0 100644 --- a/pkg/signoz/provider.go +++ b/pkg/signoz/provider.go @@ -130,6 +130,7 @@ func NewSQLMigrationProviderFactories( sqlmigration.NewUpdateOrgDomainFactory(sqlstore, sqlschema), sqlmigration.NewAddFactorIndexesFactory(sqlstore, sqlschema), sqlmigration.NewQueryBuilderV5MigrationFactory(sqlstore, telemetryStore), + sqlmigration.NewAddMeterQuickFiltersFactory(sqlstore, sqlschema), ) } diff --git a/pkg/sqlmigration/047_add_meter_quickfilters.go b/pkg/sqlmigration/047_add_meter_quickfilters.go new file mode 100644 index 000000000000..bef43988c110 --- /dev/null +++ b/pkg/sqlmigration/047_add_meter_quickfilters.go @@ -0,0 +1,137 @@ +package sqlmigration + +import ( + "context" + "database/sql" + "encoding/json" + "time" + + "github.com/SigNoz/signoz/pkg/errors" + "github.com/SigNoz/signoz/pkg/factory" + "github.com/SigNoz/signoz/pkg/sqlschema" + "github.com/SigNoz/signoz/pkg/sqlstore" + "github.com/SigNoz/signoz/pkg/valuer" + "github.com/uptrace/bun" + "github.com/uptrace/bun/migrate" +) + +type addMeterQuickFilters struct { + sqlstore sqlstore.SQLStore + sqlschema sqlschema.SQLSchema +} 
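+
+// For reference, Up below seeds one quick_filter row per organization, with
+// signal = "meter" and the filter column holding the marshalled meterFilters
+// slice, roughly:
+//
+//	[
+//	  {"key": "deployment.environment", "dataType": "float64", "type": "Sum"},
+//	  {"key": "service.name", "dataType": "float64", "type": "Sum"},
+//	  {"key": "host.name", "dataType": "float64", "type": "Sum"}
+//	]
+//
+// Re-running the migration upserts the row via ON CONFLICT (org_id, signal).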
+ +func NewAddMeterQuickFiltersFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] { + return factory.NewProviderFactory(factory.MustNewName("add_meter_quick_filters"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) { + return newAddMeterQuickFilters(ctx, providerSettings, config, sqlstore, sqlschema) + }) +} + +func newAddMeterQuickFilters(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) (SQLMigration, error) { + return &addMeterQuickFilters{ + sqlstore: sqlstore, + sqlschema: sqlschema, + }, nil +} + +func (migration *addMeterQuickFilters) Register(migrations *migrate.Migrations) error { + if err := migrations.Register(migration.Up, migration.Down); err != nil { + return err + } + + return nil +} + +func (migration *addMeterQuickFilters) Up(ctx context.Context, db *bun.DB) error { + meterFilters := []map[string]interface{}{ + {"key": "deployment.environment", "dataType": "float64", "type": "Sum"}, + {"key": "service.name", "dataType": "float64", "type": "Sum"}, + {"key": "host.name", "dataType": "float64", "type": "Sum"}, + } + + meterJSON, err := json.Marshal(meterFilters) + if err != nil { + return errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal meter filters") + } + + type signal struct { + valuer.String + } + + type identifiable struct { + ID valuer.UUID `json:"id" bun:"id,pk,type:text"` + } + + type timeAuditable struct { + CreatedAt time.Time `bun:"created_at" json:"createdAt"` + UpdatedAt time.Time `bun:"updated_at" json:"updatedAt"` + } + + type quickFilterType struct { + bun.BaseModel `bun:"table:quick_filter"` + identifiable + OrgID valuer.UUID `bun:"org_id,type:text,notnull"` + Filter string `bun:"filter,type:text,notnull"` + Signal signal `bun:"signal,type:text,notnull"` + timeAuditable + } + + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + defer func() { + _ = tx.Rollback() + }() + + var orgIDs []string + err = tx.NewSelect(). + Table("organizations"). + Column("id"). + Scan(ctx, &orgIDs) + if err != nil && err != sql.ErrNoRows { + return err + } + + var meterFiltersToInsert []quickFilterType + for _, orgIDStr := range orgIDs { + orgID, err := valuer.NewUUID(orgIDStr) + if err != nil { + return err + } + + meterFiltersToInsert = append(meterFiltersToInsert, quickFilterType{ + identifiable: identifiable{ + ID: valuer.GenerateUUID(), + }, + OrgID: orgID, + Filter: string(meterJSON), + Signal: signal{valuer.NewString("meter")}, + timeAuditable: timeAuditable{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }, + }) + } + + if len(meterFiltersToInsert) > 0 { + _, err = tx.NewInsert(). + Model(&meterFiltersToInsert). + On("CONFLICT (org_id, signal) DO UPDATE"). + Set("filter = EXCLUDED.filter, updated_at = EXCLUDED.updated_at"). 
+			Exec(ctx)
+		if err != nil {
+			return err
+		}
+	}
+
+	if err := tx.Commit(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (migration *addMeterQuickFilters) Down(ctx context.Context, db *bun.DB) error {
+	return nil
+}
diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go
index d06c1746008c..fdaf0d84d040 100644
--- a/pkg/telemetrymetadata/metadata.go
+++ b/pkg/telemetrymetadata/metadata.go
@@ -25,6 +25,8 @@ var (
 	ErrFailedToGetLogsKeys      = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get logs keys")
 	ErrFailedToGetTblStatement  = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get tbl statement")
 	ErrFailedToGetMetricsKeys   = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get metrics keys")
+	ErrFailedToGetMeterKeys     = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter keys")
+	ErrFailedToGetMeterValues   = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter values")
 	ErrFailedToGetRelatedValues = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get related values")
 )
@@ -36,6 +38,8 @@ type telemetryMetaStore struct {
 	indexV3TblName       string
 	metricsDBName        string
 	metricsFieldsTblName string
+	meterDBName          string
+	meterFieldsTblName   string
 	logsDBName           string
 	logsFieldsTblName    string
 	logsV2TblName        string
@@ -58,6 +62,8 @@ func NewTelemetryMetaStore(
 	indexV3TblName string,
 	metricsDBName string,
 	metricsFieldsTblName string,
+	meterDBName string,
+	meterFieldsTblName string,
 	logsDBName string,
 	logsV2TblName string,
 	logsFieldsTblName string,
@@ -74,6 +80,8 @@
 		indexV3TblName:       indexV3TblName,
 		metricsDBName:        metricsDBName,
 		metricsFieldsTblName: metricsFieldsTblName,
+		meterDBName:          meterDBName,
+		meterFieldsTblName:   meterFieldsTblName,
 		logsDBName:           logsDBName,
 		logsV2TblName:        logsV2TblName,
 		logsFieldsTblName:    logsFieldsTblName,
@@ -598,6 +606,76 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto
 	return keys, complete, nil
 }

+// getMeterSourceMetricKeys returns the meter metric keys that match the field selection criteria
+func (t *telemetryMetaStore) getMeterSourceMetricKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
+	if len(fieldKeySelectors) == 0 {
+		return nil, true, nil
+	}
+
+	sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name").From(t.meterDBName + "." + t.meterFieldsTblName)
+	conds := []string{}
+	var limit int
+	for _, fieldKeySelector := range fieldKeySelectors {
+		fieldConds := []string{}
+		if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
+			fieldConds = append(fieldConds, sb.E("attr_name", fieldKeySelector.Name))
+		} else {
+			fieldConds = append(fieldConds, sb.Like("attr_name", "%"+fieldKeySelector.Name+"%"))
+		}
+		fieldConds = append(fieldConds, sb.NotLike("attr_name", "\\_\\_%"))
+
+		if fieldKeySelector.MetricContext != nil {
+			fieldConds = append(fieldConds, sb.E("metric_name", fieldKeySelector.MetricContext.MetricName))
+		}
+
+		conds = append(conds, sb.And(fieldConds...))
+		limit += fieldKeySelector.Limit
+	}
+	sb.Where(sb.Or(conds...))
+	if limit == 0 {
+		limit = 1000
+	}
+
+	// query one extra row so the rowCount > limit check below can detect a truncated result
+	sb.Limit(limit + 1)
+	query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+
+	rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
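+	// The statement built above comes out roughly as
+	//   SELECT DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name
+	//   FROM signoz_meter.distributed_samples_agg_1d
+	//   WHERE (attr_name LIKE ? AND attr_name NOT LIKE '\_\_%') OR ... LIMIT <n+1>
+	// assuming the store is wired with telemetrymeter.DBName and
+	// telemetrymeter.SamplesAgg1dTableName, as in signozquerier/provider.go.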
+ if err != nil { + return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error()) + } + defer rows.Close() + + keys := []*telemetrytypes.TelemetryFieldKey{} + rowCount := 0 + for rows.Next() { + rowCount++ + // reached the limit, we know there are more results + if rowCount > limit { + break + } + + var name string + err = rows.Scan(&name) + if err != nil { + return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error()) + } + keys = append(keys, &telemetrytypes.TelemetryFieldKey{ + Name: name, + Signal: telemetrytypes.SignalMetrics, + }) + } + + if rows.Err() != nil { + return nil, false, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error()) + } + + // hit the limit? + complete := rowCount <= limit + + return keys, complete, nil + +} + func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, bool, error) { var keys []*telemetrytypes.TelemetryFieldKey var complete bool = true @@ -614,7 +692,11 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele case telemetrytypes.SignalLogs: keys, complete, err = t.getLogsKeys(ctx, selectors) case telemetrytypes.SignalMetrics: - keys, complete, err = t.getMetricsKeys(ctx, selectors) + if fieldKeySelector.Source == telemetrytypes.SourceMeter { + keys, complete, err = t.getMeterSourceMetricKeys(ctx, selectors) + } else { + keys, complete, err = t.getMetricsKeys(ctx, selectors) + } case telemetrytypes.SignalUnspecified: // get traces keys tracesKeys, tracesComplete, err := t.getTracesKeys(ctx, selectors) @@ -637,7 +719,6 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele } keys = append(keys, metricsKeys...) 
- // Complete only if all signals are complete complete = tracesComplete && logsComplete && metricsComplete } if err != nil { @@ -657,6 +738,7 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors logsSelectors := []*telemetrytypes.FieldKeySelector{} tracesSelectors := []*telemetrytypes.FieldKeySelector{} metricsSelectors := []*telemetrytypes.FieldKeySelector{} + meterSourceMetricsSelectors := []*telemetrytypes.FieldKeySelector{} for _, fieldKeySelector := range fieldKeySelectors { switch fieldKeySelector.Signal { @@ -665,7 +747,11 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors case telemetrytypes.SignalTraces: tracesSelectors = append(tracesSelectors, fieldKeySelector) case telemetrytypes.SignalMetrics: - metricsSelectors = append(metricsSelectors, fieldKeySelector) + if fieldKeySelector.Source == telemetrytypes.SourceMeter { + meterSourceMetricsSelectors = append(meterSourceMetricsSelectors, fieldKeySelector) + } else { + metricsSelectors = append(metricsSelectors, fieldKeySelector) + } case telemetrytypes.SignalUnspecified: logsSelectors = append(logsSelectors, fieldKeySelector) tracesSelectors = append(tracesSelectors, fieldKeySelector) @@ -686,6 +772,10 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors return nil, false, err } + meterSourceMetricsKeys, _, err := t.getMeterSourceMetricKeys(ctx, meterSourceMetricsSelectors) + if err != nil { + return nil, false, err + } // Complete only if all queries are complete complete := logsComplete && tracesComplete && metricsComplete @@ -699,6 +789,9 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors for _, key := range metricsKeys { mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key) } + for _, key := range meterSourceMetricsKeys { + mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key) + } return mapOfKeys, complete, nil } @@ -1062,6 +1155,61 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu return values, complete, nil } +func (t *telemetryMetaStore) getMeterSourceMetricFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) { + sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeysAndValues(labels, 'String')) AS attr"). + From(t.meterDBName + "." + t.meterFieldsTblName) + + if fieldValueSelector.Name != "" { + sb.Where(sb.E("attr.1", fieldValueSelector.Name)) + } + sb.Where(sb.NotLike("attr.1", "\\_\\_%")) + + if fieldValueSelector.Value != "" { + if fieldValueSelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact { + sb.Where(sb.E("attr.2", fieldValueSelector.Value)) + } else { + sb.Where(sb.Like("attr.2", "%"+fieldValueSelector.Value+"%")) + } + } + sb.Where(sb.NE("attr.2", "")) + + limit := fieldValueSelector.Limit + if limit == 0 { + limit = 50 + } + // query one extra to check if we hit the limit + sb.Limit(limit + 1) + + query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...) 
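+	// The generated lookup is roughly
+	//   SELECT DISTINCT arrayJoin(JSONExtractKeysAndValues(labels, 'String')) AS attr
+	//   FROM signoz_meter.distributed_samples_agg_1d
+	//   WHERE attr.1 = ? AND attr.1 NOT LIKE '\_\_%' AND attr.2 <> '' LIMIT <n+1>
+	// where attr.1 is the label key and attr.2 its value; one extra row is
+	// requested so a truncated result can be detected below.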
+ if err != nil { + return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterValues.Error()) + } + defer rows.Close() + + values := &telemetrytypes.TelemetryFieldValues{} + rowCount := 0 + for rows.Next() { + rowCount++ + // reached the limit, we know there are more results + if rowCount > limit { + break + } + + var attribute []string + if err := rows.Scan(&attribute); err != nil { + return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterValues.Error()) + } + if len(attribute) > 1 { + values.StringValues = append(values.StringValues, attribute[1]) + } + } + + // hit the limit? + complete := rowCount <= limit + return values, complete, nil +} + func populateAllUnspecifiedValues(allUnspecifiedValues *telemetrytypes.TelemetryFieldValues, mapOfValues map[any]bool, mapOfRelatedValues map[any]bool, values *telemetrytypes.TelemetryFieldValues, limit int) bool { complete := true totalCount := len(mapOfValues) + len(mapOfRelatedValues) @@ -1122,7 +1270,11 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto case telemetrytypes.SignalLogs: values, complete, err = t.getLogFieldValues(ctx, fieldValueSelector) case telemetrytypes.SignalMetrics: - values, complete, err = t.getMetricFieldValues(ctx, fieldValueSelector) + if fieldValueSelector.Source == telemetrytypes.SourceMeter { + values, complete, err = t.getMeterSourceMetricFieldValues(ctx, fieldValueSelector) + } else { + values, complete, err = t.getMetricFieldValues(ctx, fieldValueSelector) + } case telemetrytypes.SignalUnspecified: mapOfValues := make(map[any]bool) mapOfRelatedValues := make(map[any]bool) @@ -1178,6 +1330,33 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa return make(map[string]metrictypes.Temporality), nil } + result := make(map[string]metrictypes.Temporality) + metricsTemporality, err := t.fetchMetricsTemporality(ctx, metricNames...) + if err != nil { + return nil, err + } + meterMetricsTemporality, err := t.fetchMeterSourceMetricsTemporality(ctx, metricNames...) + if err != nil { + return nil, err + } + + // For metrics not found in the database, set to Unknown + for _, metricName := range metricNames { + if temporality, exists := metricsTemporality[metricName]; exists { + result[metricName] = temporality + continue + } + if temporality, exists := meterMetricsTemporality[metricName]; exists { + result[metricName] = temporality + continue + } + result[metricName] = metrictypes.Unknown + } + + return result, nil +} + +func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) { result := make(map[string]metrictypes.Temporality) // Build query to fetch temporality for all metrics @@ -1229,11 +1408,55 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa result[metricName] = temporality } - // For metrics not found in the database, set to Unknown - for _, metricName := range metricNames { - if _, exists := result[metricName]; !exists { - result[metricName] = metrictypes.Unknown + return result, nil +} + +func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) { + result := make(map[string]metrictypes.Temporality) + + sb := sqlbuilder.Select( + "metric_name", + "argMax(temporality, unix_milli) as temporality", + ).From(t.meterDBName + "." 
+ t.meterFieldsTblName)
+
+	// Filter by the requested metric names
+	sb.Where(sb.In("metric_name", metricNames))
+
+	// Group by metric name to get one temporality per metric
+	sb.GroupBy("metric_name")
+
+	query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+
+	t.logger.DebugContext(ctx, "fetching meter metrics temporality", "query", query, "args", args)
+
+	rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
+	if err != nil {
+		return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch meter metric temporality")
+	}
+	defer rows.Close()
+
+	// Process results
+	for rows.Next() {
+		var metricName, temporalityStr string
+		if err := rows.Scan(&metricName, &temporalityStr); err != nil {
+			return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
+		}
+
+		// Convert string to Temporality type
+		var temporality metrictypes.Temporality
+		switch temporalityStr {
+		case "Delta":
+			temporality = metrictypes.Delta
+		case "Cumulative":
+			temporality = metrictypes.Cumulative
+		case "Unspecified":
+			temporality = metrictypes.Unspecified
+		default:
+			// Unknown or empty temporality
+			temporality = metrictypes.Unknown
+		}
+
+		result[metricName] = temporality
 	}

 	return result, nil
diff --git a/pkg/telemetrymetadata/metadata_test.go b/pkg/telemetrymetadata/metadata_test.go
index 7b1cc7c18e79..a2b9442e304a 100644
--- a/pkg/telemetrymetadata/metadata_test.go
+++ b/pkg/telemetrymetadata/metadata_test.go
@@ -8,6 +8,7 @@ import (
 	"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
 	"github.com/SigNoz/signoz/pkg/telemetrylogs"
+	"github.com/SigNoz/signoz/pkg/telemetrymeter"
 	"github.com/SigNoz/signoz/pkg/telemetrymetrics"
 	"github.com/SigNoz/signoz/pkg/telemetrystore"
 	"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
@@ -42,6 +43,8 @@ func TestGetKeys(t *testing.T) {
 	telemetrytraces.SpanIndexV3TableName,
 	telemetrymetrics.DBName,
 	telemetrymetrics.AttributesMetadataTableName,
+	telemetrymeter.DBName,
+	telemetrymeter.SamplesAgg1dTableName,
 	telemetrylogs.DBName,
 	telemetrylogs.LogsV2TableName,
 	telemetrylogs.TagAttributesV2TableName,
diff --git a/pkg/telemetrymeter/statement_builder.go b/pkg/telemetrymeter/statement_builder.go
new file mode 100644
index 000000000000..2a6949fe8c2a
--- /dev/null
+++ b/pkg/telemetrymeter/statement_builder.go
@@ -0,0 +1,365 @@
+package telemetrymeter
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+
+	"github.com/SigNoz/signoz/pkg/factory"
+	"github.com/SigNoz/signoz/pkg/querybuilder"
+	"github.com/SigNoz/signoz/pkg/telemetrymetrics"
+	"github.com/SigNoz/signoz/pkg/types/metrictypes"
+	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+	"github.com/huandu/go-sqlbuilder"
+)
+
+type meterQueryStatementBuilder struct {
+	logger                  *slog.Logger
+	metadataStore           telemetrytypes.MetadataStore
+	fm                      qbtypes.FieldMapper
+	cb                      qbtypes.ConditionBuilder
+	metricsStatementBuilder *telemetrymetrics.MetricQueryStatementBuilder
+}
+
+var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*meterQueryStatementBuilder)(nil)
+
+func NewMeterQueryStatementBuilder(
+	settings factory.ProviderSettings,
+	metadataStore telemetrytypes.MetadataStore,
+	fieldMapper qbtypes.FieldMapper,
+	conditionBuilder qbtypes.ConditionBuilder,
+) *meterQueryStatementBuilder {
+	metricsSettings := factory.NewScopedProviderSettings(settings,
"github.com/SigNoz/signoz/pkg/telemetrymeter") + metricsStatementBuilder := telemetrymetrics.NewMetricQueryStatementBuilder(settings, metadataStore, fieldMapper, conditionBuilder) + + return &meterQueryStatementBuilder{ + logger: metricsSettings.Logger(), + metadataStore: metadataStore, + fm: fieldMapper, + cb: conditionBuilder, + metricsStatementBuilder: metricsStatementBuilder, + } +} + +func (b *meterQueryStatementBuilder) Build( + ctx context.Context, + start uint64, + end uint64, + _ qbtypes.RequestType, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + variables map[string]qbtypes.VariableItem, +) (*qbtypes.Statement, error) { + keySelectors := telemetrymetrics.GetKeySelectors(query) + keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors) + if err != nil { + return nil, err + } + + start, end = querybuilder.AdjustedMetricTimeRange(start, end, uint64(query.StepInterval.Seconds()), query) + + return b.buildPipelineStatement(ctx, start, end, query, keys, variables) +} + +func (b *meterQueryStatementBuilder) buildPipelineStatement( + ctx context.Context, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, + variables map[string]qbtypes.VariableItem, +) (*qbtypes.Statement, error) { + var ( + cteFragments []string + cteArgs [][]any + ) + + if b.metricsStatementBuilder.CanShortCircuitDelta(query) { + // spatial_aggregation_cte directly for certain delta queries + frag, args := b.buildTemporalAggDeltaFastPath(ctx, start, end, query, keys, variables) + if frag != "" { + cteFragments = append(cteFragments, frag) + cteArgs = append(cteArgs, args) + } + } else { + // temporal_aggregation_cte + if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, variables); err != nil { + return nil, err + } else if frag != "" { + cteFragments = append(cteFragments, frag) + cteArgs = append(cteArgs, args) + } + + // spatial_aggregation_cte + frag, args := b.buildSpatialAggregationCTE(ctx, start, end, query, keys) + if frag != "" { + cteFragments = append(cteFragments, frag) + cteArgs = append(cteArgs, args) + } + } + + // final SELECT + return b.metricsStatementBuilder.BuildFinalSelect(cteFragments, cteArgs, query) +} + +func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath( + ctx context.Context, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, + variables map[string]qbtypes.VariableItem, +) (string, []any) { + var filterWhere *querybuilder.PreparedWhereClause + var err error + stepSec := int64(query.StepInterval.Seconds()) + + sb := sqlbuilder.NewSelectBuilder() + + sb.SelectMore(fmt.Sprintf( + "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts", + stepSec, + )) + for _, g := range query.GroupBy { + col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys) + if err != nil { + return "", []any{} + } + sb.SelectMore(col) + } + + tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate { + aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec) + } + + sb.SelectMore(fmt.Sprintf("%s AS 
value", aggCol)) + sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl)) + sb.Where( + sb.In("metric_name", query.Aggregations[0].MetricName), + sb.GTE("unix_milli", start), + sb.LT("unix_milli", end), + ) + if query.Filter != nil && query.Filter.Expression != "" { + filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ + FieldMapper: b.fm, + ConditionBuilder: b.cb, + FieldKeys: keys, + FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"}, + Variables: variables, + }) + if err != nil { + return "", []any{} + } + } + if filterWhere != nil { + sb.AddWhereClause(filterWhere.WhereClause) + } + + if query.Aggregations[0].Temporality != metrictypes.Unknown { + sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue())) + } + sb.GroupBy("ts") + sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...) + + q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args +} + +func (b *meterQueryStatementBuilder) buildTemporalAggregationCTE( + ctx context.Context, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, + variables map[string]qbtypes.VariableItem, +) (string, []any, error) { + if query.Aggregations[0].Temporality == metrictypes.Delta { + return b.buildTemporalAggDelta(ctx, start, end, query, keys, variables) + } + return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, keys, variables) +} + +func (b *meterQueryStatementBuilder) buildTemporalAggDelta( + ctx context.Context, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, + variables map[string]qbtypes.VariableItem, +) (string, []any, error) { + var filterWhere *querybuilder.PreparedWhereClause + var err error + + stepSec := int64(query.StepInterval.Seconds()) + sb := sqlbuilder.NewSelectBuilder() + + sb.Select("fingerprint") + sb.SelectMore(fmt.Sprintf( + "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts", + stepSec, + )) + + for _, g := range query.GroupBy { + col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys) + if err != nil { + return "", nil, err + } + sb.SelectMore(col) + } + + tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, + query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate { + aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec) + } + + sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol)) + + sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl)) + sb.Where( + sb.In("metric_name", query.Aggregations[0].MetricName), + sb.GTE("unix_milli", start), + sb.LT("unix_milli", end), + ) + + if query.Filter != nil && query.Filter.Expression != "" { + filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ + FieldMapper: b.fm, + ConditionBuilder: b.cb, + FieldKeys: keys, + FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"}, + Variables: variables, + }) + if err != nil { + return "", nil, err + } + } + if filterWhere != nil { + sb.AddWhereClause(filterWhere.WhereClause) + } + + 
if query.Aggregations[0].Temporality != metrictypes.Unknown { + sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue())) + } + + sb.GroupBy("fingerprint", "ts") + sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...) + sb.OrderBy("fingerprint", "ts") + + q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil +} + +func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified( + ctx context.Context, + start, end uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + keys map[string][]*telemetrytypes.TelemetryFieldKey, + variables map[string]qbtypes.VariableItem, +) (string, []any, error) { + var filterWhere *querybuilder.PreparedWhereClause + var err error + stepSec := int64(query.StepInterval.Seconds()) + + baseSb := sqlbuilder.NewSelectBuilder() + baseSb.Select("fingerprint") + baseSb.SelectMore(fmt.Sprintf( + "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts", + stepSec, + )) + for _, g := range query.GroupBy { + col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys) + if err != nil { + return "", nil, err + } + baseSb.SelectMore(col) + } + + tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints) + baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol)) + + baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl)) + baseSb.Where( + baseSb.In("metric_name", query.Aggregations[0].MetricName), + baseSb.GTE("unix_milli", start), + baseSb.LT("unix_milli", end), + ) + if query.Filter != nil && query.Filter.Expression != "" { + filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{ + FieldMapper: b.fm, + ConditionBuilder: b.cb, + FieldKeys: keys, + FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"}, + Variables: variables, + }) + if err != nil { + return "", nil, err + } + } + if filterWhere != nil { + baseSb.AddWhereClause(filterWhere.WhereClause) + } + + if query.Aggregations[0].Temporality != metrictypes.Unknown { + baseSb.Where(baseSb.ILike("temporality", query.Aggregations[0].Temporality.StringValue())) + } + baseSb.GroupBy("fingerprint", "ts") + baseSb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...) + baseSb.OrderBy("fingerprint", "ts") + + innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse) + + switch query.Aggregations[0].TimeAggregation { + case metrictypes.TimeAggregationRate: + rateExpr := fmt.Sprintf(telemetrymetrics.RateWithoutNegative, start, start) + wrapped := sqlbuilder.NewSelectBuilder() + wrapped.Select("ts") + for _, g := range query.GroupBy { + wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name)) + } + wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", rateExpr)) + wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery)) + q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...) 
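+	// telemetrymetrics.RateWithoutNegative turns the per-interval totals into a
+	// per-second rate using lagInFrame over rate_window; when a series value
+	// drops (an apparent counter reset) the previous point is treated as 0 so
+	// the computed rate never goes negative (see the expected SQL in
+	// stmt_builder_test.go).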
+ return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil + + case metrictypes.TimeAggregationIncrease: + incExpr := fmt.Sprintf(telemetrymetrics.IncreaseWithoutNegative, start, start) + wrapped := sqlbuilder.NewSelectBuilder() + wrapped.Select("ts") + for _, g := range query.GroupBy { + wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name)) + } + wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", incExpr)) + wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery)) + q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...) + return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil + default: + return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerArgs, nil + } +} + +func (b *meterQueryStatementBuilder) buildSpatialAggregationCTE( + _ context.Context, + _ uint64, + _ uint64, + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], + _ map[string][]*telemetrytypes.TelemetryFieldKey, +) (string, []any) { + sb := sqlbuilder.NewSelectBuilder() + + sb.Select("ts") + for _, g := range query.GroupBy { + sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name)) + } + sb.SelectMore(fmt.Sprintf("%s(per_series_value) AS value", query.Aggregations[0].SpaceAggregation.StringValue())) + sb.From("__temporal_aggregation_cte") + sb.Where(sb.EQ("isNaN(per_series_value)", 0)) + if query.Aggregations[0].ValueFilter != nil { + sb.Where(sb.EQ("per_series_value", query.Aggregations[0].ValueFilter.Value)) + } + sb.GroupBy("ts") + sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...) + + q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args +} diff --git a/pkg/telemetrymeter/stmt_builder_test.go b/pkg/telemetrymeter/stmt_builder_test.go new file mode 100644 index 000000000000..62f4124d6eed --- /dev/null +++ b/pkg/telemetrymeter/stmt_builder_test.go @@ -0,0 +1,191 @@ +package telemetrymeter + +import ( + "context" + "testing" + "time" + + "github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest" + "github.com/SigNoz/signoz/pkg/telemetrymetrics" + "github.com/SigNoz/signoz/pkg/types/metrictypes" + qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest" + "github.com/stretchr/testify/require" +) + +func TestStatementBuilder(t *testing.T) { + cases := []struct { + name string + requestType qbtypes.RequestType + query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation] + expected qbtypes.Statement + expectedErr error + }{ + { + name: "test_cumulative_rate_sum", + requestType: qbtypes.RequestTypeTimeSeries, + query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Signal: telemetrytypes.SignalMetrics, + StepInterval: qbtypes.Step{Duration: 24 * time.Hour}, + Aggregations: []qbtypes.MetricAggregation{ + { + MetricName: "signoz_calls_total", + Type: metrictypes.SumType, + Temporality: metrictypes.Cumulative, + TimeAggregation: metrictypes.TimeAggregationRate, + SpaceAggregation: metrictypes.SpaceAggregationSum, + }, + }, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'cartservice'", + }, + Limit: 10, + GroupBy: []qbtypes.GroupByKey{ + { + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "service.name", + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH __temporal_aggregation_cte AS (SELECT ts, 
`service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747785600000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747785600000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte", + Args: []any{"signoz_calls_total", uint64(1747785600000), uint64(1747983420000), "cartservice", "cumulative", 0}, + }, + expectedErr: nil, + }, + { + name: "test_delta_rate_sum", + requestType: qbtypes.RequestTypeTimeSeries, + query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Signal: telemetrytypes.SignalMetrics, + StepInterval: qbtypes.Step{Duration: 24 * time.Hour}, + Aggregations: []qbtypes.MetricAggregation{ + { + MetricName: "signoz_calls_total", + Type: metrictypes.SumType, + Temporality: metrictypes.Delta, + TimeAggregation: metrictypes.TimeAggregationRate, + SpaceAggregation: metrictypes.SpaceAggregationSum, + }, + }, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'cartservice'", + }, + Limit: 10, + GroupBy: []qbtypes.GroupByKey{ + { + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "service.name", + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) 
GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte", + Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta"}, + }, + expectedErr: nil, + }, + { + name: "test_delta_rate_avg", + requestType: qbtypes.RequestTypeTimeSeries, + query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Signal: telemetrytypes.SignalMetrics, + StepInterval: qbtypes.Step{Duration: 24 * time.Hour}, + Aggregations: []qbtypes.MetricAggregation{ + { + MetricName: "signoz_calls_total", + Type: metrictypes.SumType, + Temporality: metrictypes.Delta, + TimeAggregation: metrictypes.TimeAggregationRate, + SpaceAggregation: metrictypes.SpaceAggregationAvg, + }, + }, + Filter: &qbtypes.Filter{ + Expression: "service.name = 'cartservice'", + }, + Limit: 10, + GroupBy: []qbtypes.GroupByKey{ + { + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "service.name", + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte", + Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta", 0}, + }, + expectedErr: nil, + }, + { + name: "test_gauge_avg_sum", + requestType: qbtypes.RequestTypeTimeSeries, + query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ + Signal: telemetrytypes.SignalMetrics, + StepInterval: qbtypes.Step{Duration: 24 * time.Hour}, + Aggregations: []qbtypes.MetricAggregation{ + { + MetricName: "system.memory.usage", + Type: metrictypes.GaugeType, + Temporality: metrictypes.Unspecified, + TimeAggregation: metrictypes.TimeAggregationAvg, + SpaceAggregation: metrictypes.SpaceAggregationSum, + }, + }, + Filter: &qbtypes.Filter{ + Expression: "host.name = 'big-data-node-1'", + }, + Limit: 10, + GroupBy: []qbtypes.GroupByKey{ + { + TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{ + Name: "host.name", + }, + }, + }, + }, + expected: qbtypes.Statement{ + Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, avg(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? 
+				Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, avg(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte",
+				Args: []any{"system.memory.usage", uint64(1747872000000), uint64(1747983420000), "big-data-node-1", "unspecified", 0},
+			},
+			expectedErr: nil,
+		},
+	}
+
+	fm := telemetrymetrics.NewFieldMapper()
+	cb := telemetrymetrics.NewConditionBuilder(fm)
+	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
+	keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json")
+	if err != nil {
+		t.Fatalf("failed to load field keys: %v", err)
+	}
+	mockMetadataStore.KeysMap = keys
+
+	statementBuilder := NewMeterQueryStatementBuilder(
+		instrumentationtest.New().ToProviderSettings(),
+		mockMetadataStore,
+		fm,
+		cb,
+	)
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+
+			q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
+
+			if c.expectedErr != nil {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), c.expectedErr.Error())
+			} else {
+				require.NoError(t, err)
+				require.Equal(t, c.expected.Query, q.Query)
+				require.Equal(t, c.expected.Args, q.Args)
+				require.Equal(t, c.expected.Warnings, q.Warnings)
+			}
+		})
+	}
+}
diff --git a/pkg/telemetrymeter/tables.go b/pkg/telemetrymeter/tables.go
new file mode 100644
index 000000000000..6bc91e9f5425
--- /dev/null
+++ b/pkg/telemetrymeter/tables.go
@@ -0,0 +1,194 @@
+package telemetrymeter
+
+import (
+	"time"
+
+	"github.com/SigNoz/signoz/pkg/types/metrictypes"
+)
+
+const (
+	DBName                     = "signoz_meter"
+	SamplesTableName           = "distributed_samples"
+	SamplesLocalTableName      = "samples"
+	SamplesAgg1dTableName      = "distributed_samples_agg_1d"
+	SamplesAgg1dLocalTableName = "samples_agg_1d"
+)
+
+var (
+	oneMonthInMilliseconds = uint64(time.Hour.Milliseconds() * 24 * 30)
+
+	// when the queried range is close to the 30-day table-switch threshold, but not exactly at it,
+	// we add an offset to the boundary check to make sure that we are using the correct table:
+	// the start gets adjusted to the nearest step interval, which could otherwise push a range that
+	// is just under the threshold onto the 1-day aggregated table, producing time series that
+	// don't best represent the rate of change
+	offsetBucket = uint64(1 * time.Hour.Milliseconds())
+)
+
+// start and end are in milliseconds
+// we have two tables for samples
+// 1. distributed_samples - raw samples, used for time ranges shorter than 30 days
+// 2. distributed_samples_agg_1d - daily aggregates, used for time ranges of 30 days or more
+// if the `timeAggregation` is `count_distinct`, we can't use the aggregated table because it doesn't support it
+func WhichSamplesTableToUse(
+	start, end uint64,
+	metricType metrictypes.Type,
+	timeAggregation metrictypes.TimeAggregation,
+	tableHints *metrictypes.MetricTableHints,
+) string {
+
+	// if we have a hint for the table, we need to use it
+	// the hint will be used to override the default table selection logic
+	if tableHints != nil {
+		if tableHints.SamplesTableName != "" {
+			return tableHints.SamplesTableName
+		}
+	}
+
+	// if the time aggregation is count_distinct, we need to use the distributed_samples table
+	// because the aggregated tables don't support count_distinct
+	if timeAggregation == metrictypes.TimeAggregationCountDistinct {
+		return SamplesTableName
+	}
+
+	if end-start < oneMonthInMilliseconds+offsetBucket {
+		return SamplesTableName
+	}
+	return SamplesAgg1dTableName
+}
+
+func AggregationColumnForSamplesTable(
+	start, end uint64,
+	metricType metrictypes.Type,
+	temporality metrictypes.Temporality,
+	timeAggregation metrictypes.TimeAggregation,
+	tableHints *metrictypes.MetricTableHints,
+) string {
+	tableName := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
+	var aggregationColumn string
+	switch temporality {
+	case metrictypes.Delta:
+		switch tableName {
+		case SamplesTableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(value)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(value)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "avg(value)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(value)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(value)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "count(value)"
+			case metrictypes.TimeAggregationCountDistinct:
+				aggregationColumn = "countDistinct(value)"
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+				aggregationColumn = "sum(value)"
+			}
+		case SamplesAgg1dTableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(last)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(sum)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "sum(sum) / sum(count)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(min)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(max)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "sum(count)"
+			// count_distinct is not supported in aggregated tables
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+				aggregationColumn = "sum(sum)"
+			}
+		}
+	case metrictypes.Cumulative:
+		switch tableName {
+		case SamplesTableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(value)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(value)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "avg(value)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(value)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(value)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "count(value)"
+			case metrictypes.TimeAggregationCountDistinct:
+				aggregationColumn = "countDistinct(value)"
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+				aggregationColumn = "max(value)"
+			}
+		case SamplesAgg1dTableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(last)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(sum)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "sum(sum) / sum(count)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(min)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(max)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "sum(count)"
+			// count_distinct is not supported in aggregated tables
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+				aggregationColumn = "max(max)"
+			}
+		}
+	case metrictypes.Unspecified:
+		switch tableName {
+		case SamplesTableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(value)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(value)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "avg(value)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(value)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(value)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "count(value)"
+			case metrictypes.TimeAggregationCountDistinct:
+				aggregationColumn = "countDistinct(value)"
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
+				aggregationColumn = "sum(value)"
+			}
+		case SamplesAgg1dTableName:
+			switch timeAggregation {
+			case metrictypes.TimeAggregationLatest:
+				aggregationColumn = "anyLast(last)"
+			case metrictypes.TimeAggregationSum:
+				aggregationColumn = "sum(sum)"
+			case metrictypes.TimeAggregationAvg:
+				aggregationColumn = "sum(sum) / sum(count)"
+			case metrictypes.TimeAggregationMin:
+				aggregationColumn = "min(min)"
+			case metrictypes.TimeAggregationMax:
+				aggregationColumn = "max(max)"
+			case metrictypes.TimeAggregationCount:
+				aggregationColumn = "sum(count)"
+			// count_distinct is not supported in aggregated tables
+			case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
+				aggregationColumn = "sum(sum)"
+			}
+		}
+	}
+	return aggregationColumn
+}
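A quick usage sketch of the two helpers above (illustrative only; the 31-day window and metric parameters are assumed values, not taken from the diff):

package main

import (
	"fmt"
	"time"

	"github.com/SigNoz/signoz/pkg/telemetrymeter"
	"github.com/SigNoz/signoz/pkg/types/metrictypes"
)

func main() {
	end := uint64(time.Now().UnixMilli())
	start := end - uint64((31 * 24 * time.Hour).Milliseconds()) // 31 days: past the 30d + 1h boundary

	// ranges at or above the ~30-day threshold resolve to the 1-day aggregate table
	fmt.Println(telemetrymeter.WhichSamplesTableToUse(start, end,
		metrictypes.SumType, metrictypes.TimeAggregationSum, nil)) // distributed_samples_agg_1d

	// on the aggregate table, a delta sum reads the pre-aggregated "sum" column
	fmt.Println(telemetrymeter.AggregationColumnForSamplesTable(start, end,
		metrictypes.SumType, metrictypes.Delta, metrictypes.TimeAggregationSum, nil)) // sum(sum)

	// count_distinct always forces the raw samples table, whatever the range
	fmt.Println(telemetrymeter.WhichSamplesTableToUse(start, end,
		metrictypes.SumType, metrictypes.TimeAggregationCountDistinct, nil)) // distributed_samples
}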
aggregationColumn = "countDistinct(value)" + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "max(value)" + } + case SamplesAgg1dTableName: + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(last)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(min)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(max)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results + aggregationColumn = "max(max)" + } + } + + case metrictypes.Unspecified: + switch tableName { + case SamplesTableName: + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(value)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(value)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "avg(value)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(value)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(value)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "count(value)" + case metrictypes.TimeAggregationCountDistinct: + aggregationColumn = "countDistinct(value)" + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen + aggregationColumn = "sum(value)" + } + case SamplesAgg1dTableName: + switch timeAggregation { + case metrictypes.TimeAggregationLatest: + aggregationColumn = "anyLast(last)" + case metrictypes.TimeAggregationSum: + aggregationColumn = "sum(sum)" + case metrictypes.TimeAggregationAvg: + aggregationColumn = "sum(sum) / sum(count)" + case metrictypes.TimeAggregationMin: + aggregationColumn = "min(min)" + case metrictypes.TimeAggregationMax: + aggregationColumn = "max(max)" + case metrictypes.TimeAggregationCount: + aggregationColumn = "sum(count)" + // count_distinct is not supported in aggregated tables + case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen + aggregationColumn = "sum(sum)" + } + } + + } + return aggregationColumn +} diff --git a/pkg/telemetrymeter/testdata/keys_map.json b/pkg/telemetrymeter/testdata/keys_map.json new file mode 100644 index 000000000000..d1c27a3bf460 --- /dev/null +++ b/pkg/telemetrymeter/testdata/keys_map.json @@ -0,0 +1,34 @@ +{ + "service.name": [ + { + "name": "service.name", + "fieldContext": "resource", + "fieldDataType": "string", + "signal": "metrics" + } + ], + "http.request.method": [ + { + "name": "http.request.method", + "fieldContext": "attribute", + "fieldDataType": "string", + "signal": "metrics" + } + ], + "http.response.status_code": [ + { + "name": "http.response.status_code", + "fieldContext": "attribute", + "fieldDataType": "int", + "signal": "metrics" + } + ], + "host.name": [ + { + "name": "host.name", + "fieldContext": "resource", + "fieldDataType": "string", + "signal": "metrics" + } + ] +} \ No newline at end of file diff --git a/pkg/telemetrymetrics/statement_builder.go b/pkg/telemetrymetrics/statement_builder.go index 13dfbd1c1f1e..839e15757824 100644 --- 
a/pkg/telemetrymetrics/statement_builder.go +++ b/pkg/telemetrymetrics/statement_builder.go @@ -19,23 +19,23 @@ const ( IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))` ) -type metricQueryStatementBuilder struct { +type MetricQueryStatementBuilder struct { logger *slog.Logger metadataStore telemetrytypes.MetadataStore fm qbtypes.FieldMapper cb qbtypes.ConditionBuilder } -var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*metricQueryStatementBuilder)(nil) +var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*MetricQueryStatementBuilder)(nil) func NewMetricQueryStatementBuilder( settings factory.ProviderSettings, metadataStore telemetrytypes.MetadataStore, fieldMapper qbtypes.FieldMapper, conditionBuilder qbtypes.ConditionBuilder, -) *metricQueryStatementBuilder { +) *MetricQueryStatementBuilder { metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetrics") - return &metricQueryStatementBuilder{ + return &MetricQueryStatementBuilder{ logger: metricsSettings.Logger(), metadataStore: metadataStore, fm: fieldMapper, @@ -43,7 +43,7 @@ func NewMetricQueryStatementBuilder( } } -func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector { +func GetKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector { var keySelectors []*telemetrytypes.FieldKeySelector if query.Filter != nil && query.Filter.Expression != "" { whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression) @@ -72,7 +72,7 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) return keySelectors } -func (b *metricQueryStatementBuilder) Build( +func (b *MetricQueryStatementBuilder) Build( ctx context.Context, start uint64, end uint64, @@ -80,7 +80,7 @@ func (b *metricQueryStatementBuilder) Build( query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], variables map[string]qbtypes.VariableItem, ) (*qbtypes.Statement, error) { - keySelectors := getKeySelectors(query) + keySelectors := GetKeySelectors(query) keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors) if err != nil { return nil, err @@ -113,7 +113,7 @@ func (b *metricQueryStatementBuilder) Build( // we can directly use the quantilesDDMerge function // // all of this is true only for delta metrics -func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool { +func (b *MetricQueryStatementBuilder) CanShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool { if q.Aggregations[0].Temporality != metrictypes.Delta { return false } @@ -139,7 +139,7 @@ func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilde return false } -func (b *metricQueryStatementBuilder) buildPipelineStatement( +func (b *MetricQueryStatementBuilder) buildPipelineStatement( ctx context.Context, start, end uint64, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation], @@ -200,7 +200,7 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement( return nil, err } - if b.canShortCircuitDelta(query) { + if b.CanShortCircuitDelta(query) 
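These renames exist so the statement-builder internals are importable outside pkg/telemetrymetrics; the most obvious consumer is the new meter builder. A minimal sketch of that reuse (the wrapper below is hypothetical, and tagging selectors with the meter source is an assumption about how pkg/telemetrymeter scopes its metadata lookups):

package example

import (
	"github.com/SigNoz/signoz/pkg/telemetrymetrics"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)

// meterKeySelectors derives the metadata key selectors for a query via the
// now-exported GetKeySelectors, then marks each selector with the meter
// source (assumption, see lead-in) so lookups are scoped to meter metadata.
func meterKeySelectors(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector {
	selectors := telemetrymetrics.GetKeySelectors(q)
	for _, sel := range selectors {
		sel.Source = telemetrytypes.SourceMeter
	}
	return selectors
}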
diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go
index d3a7b83331ea..659c7cf604f2 100644
--- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go
+++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go
@@ -14,6 +14,9 @@ type QueryBuilderQuery[T any] struct {
 	// signal to query
 	Signal telemetrytypes.Signal `json:"signal,omitempty"`
 
+	// source for query
+	Source telemetrytypes.Source `json:"source,omitempty"`
+
 	// we want to support multiple aggregations
 	// currently supported: []Aggregation, []MetricAggregation
 	Aggregations []T `json:"aggregations,omitempty"`
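With the new field in place, a meter query is an ordinary builder query tagged with the meter source; a sketch with illustrative values:

package example

import (
	"time"

	"github.com/SigNoz/signoz/pkg/types/metrictypes"
	qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)

func meterQueryExample() qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation] {
	return qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
		Signal: telemetrytypes.SignalMetrics,
		Source: telemetrytypes.SourceMeter, // serialized as "source":"meter"
		// a day-sized step lines up with the 1-day meter aggregate table
		StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
		Aggregations: []qbtypes.MetricAggregation{{
			MetricName:       "signoz_calls_total", // illustrative metric
			Type:             metrictypes.SumType,
			Temporality:      metrictypes.Delta,
			TimeAggregation:  metrictypes.TimeAggregationRate,
			SpaceAggregation: metrictypes.SpaceAggregationSum,
		}},
	}
}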
`json:"aggregations,omitempty"` diff --git a/pkg/types/quickfiltertypes/filter.go b/pkg/types/quickfiltertypes/filter.go index ac436a451b3b..b86d2cb1929f 100644 --- a/pkg/types/quickfiltertypes/filter.go +++ b/pkg/types/quickfiltertypes/filter.go @@ -35,6 +35,7 @@ var ( SignalLogs = Signal{valuer.NewString("logs")} SignalApiMonitoring = Signal{valuer.NewString("api_monitoring")} SignalExceptions = Signal{valuer.NewString("exceptions")} + SignalMeter = Signal{valuer.NewString("meter")} ) // NewSignal creates a Signal from a string @@ -48,6 +49,8 @@ func NewSignal(s string) (Signal, error) { return SignalApiMonitoring, nil case "exceptions": return SignalExceptions, nil + case "meter": + return SignalMeter, nil default: return Signal{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid signal: %s", s) } @@ -178,6 +181,12 @@ func NewDefaultQuickFilter(orgID valuer.UUID) ([]*StorableQuickFilter, error) { {"key": "k8s.pod.name", "dataType": "string", "type": "resource"}, } + meterFilters := []map[string]interface{}{ + {"key": "deployment.environment", "dataType": "float64", "type": "Sum"}, + {"key": "service.name", "dataType": "float64", "type": "Sum"}, + {"key": "host.name", "dataType": "float64", "type": "Sum"}, + } + tracesJSON, err := json.Marshal(tracesFilters) if err != nil { return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal traces filters") @@ -190,12 +199,19 @@ func NewDefaultQuickFilter(orgID valuer.UUID) ([]*StorableQuickFilter, error) { apiMonitoringJSON, err := json.Marshal(apiMonitoringFilters) if err != nil { - return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal Api Monitoring filters") + return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal api monitoring filters") } + exceptionsJSON, err := json.Marshal(exceptionsFilters) if err != nil { - return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal Exceptions filters") + return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal exceptions filters") } + + meterJSON, err := json.Marshal(meterFilters) + if err != nil { + return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal meter filters") + } + timeRightNow := time.Now() return []*StorableQuickFilter{ @@ -247,5 +263,17 @@ func NewDefaultQuickFilter(orgID valuer.UUID) ([]*StorableQuickFilter, error) { UpdatedAt: timeRightNow, }, }, + { + Identifiable: types.Identifiable{ + ID: valuer.GenerateUUID(), + }, + OrgID: orgID, + Filter: string(meterJSON), + Signal: SignalMeter, + TimeAuditable: types.TimeAuditable{ + CreatedAt: timeRightNow, + UpdatedAt: timeRightNow, + }, + }, }, nil } diff --git a/pkg/types/telemetrytypes/field.go b/pkg/types/telemetrytypes/field.go index 206586db4bfa..398fc19854a3 100644 --- a/pkg/types/telemetrytypes/field.go +++ b/pkg/types/telemetrytypes/field.go @@ -121,6 +121,7 @@ type FieldKeySelector struct { StartUnixMilli int64 `json:"startUnixMilli"` EndUnixMilli int64 `json:"endUnixMilli"` Signal Signal `json:"signal"` + Source Source `json:"source"` FieldContext FieldContext `json:"fieldContext"` FieldDataType FieldDataType `json:"fieldDataType"` Name string `json:"name"` diff --git a/pkg/types/telemetrytypes/source.go b/pkg/types/telemetrytypes/source.go new file mode 100644 index 000000000000..247a9a5a5b3c --- /dev/null +++ b/pkg/types/telemetrytypes/source.go @@ -0,0 +1,12 @@ +package telemetrytypes + +import 
"github.com/SigNoz/signoz/pkg/valuer" + +type Source struct { + valuer.String +} + +var ( + SourceMeter = Source{valuer.NewString("meter")} + SourceUnspecified = Source{valuer.NewString("")} +)