From e7a5266cd3d3433f97508db0c7fa6a94f857fdf4 Mon Sep 17 00:00:00 2001 From: vikrantgupta25 Date: Thu, 31 Jul 2025 01:41:55 +0530 Subject: [PATCH] feat(telemetry/meter): added metadata setup for meter --- pkg/apis/fields/api.go | 3 + pkg/querier/querier.go | 24 ++-- pkg/querier/signozquerier/provider.go | 2 + pkg/telemetrymetadata/metadata.go | 163 ++++++++++++++++++++++++ pkg/telemetrymetadata/metadata_test.go | 3 + pkg/telemetrymeter/stmt_builder_test.go | 2 +- 6 files changed, 186 insertions(+), 11 deletions(-) diff --git a/pkg/apis/fields/api.go b/pkg/apis/fields/api.go index 671670f7e630..f36346960b8f 100644 --- a/pkg/apis/fields/api.go +++ b/pkg/apis/fields/api.go @@ -9,6 +9,7 @@ import ( "github.com/SigNoz/signoz/pkg/http/render" "github.com/SigNoz/signoz/pkg/telemetrylogs" "github.com/SigNoz/signoz/pkg/telemetrymetadata" + "github.com/SigNoz/signoz/pkg/telemetrymeter" "github.com/SigNoz/signoz/pkg/telemetrymetrics" "github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrytraces" @@ -33,6 +34,8 @@ func NewAPI( telemetrytraces.SpanIndexV3TableName, telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName, + telemetrymeter.DBName, + telemetrymeter.AttributesMetadataTableName, telemetrylogs.DBName, telemetrylogs.LogsV2TableName, telemetrylogs.TagAttributesV2TableName, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 2c404230de94..4749df5a2ce0 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -171,17 +171,21 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype event.MetricsUsed = true event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != "" event.GroupByApplied = len(spec.GroupBy) > 0 - if spec.StepInterval.Seconds() == 0 { - spec.StepInterval = qbtypes.Step{ - Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)), - } - } - if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) { - spec.StepInterval = qbtypes.Step{ - Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)), - } - } + if spec.Signal == telemetrytypes.SignalMeter { + spec.StepInterval = qbtypes.Step{Duration: time.Hour * 24} + } else { + if spec.StepInterval.Seconds() == 0 { + spec.StepInterval = qbtypes.Step{ + Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)), + } + } + if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) { + spec.StepInterval = qbtypes.Step{ + Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)), + } + } + } req.CompositeQuery.Queries[idx].Spec = spec } } else if query.Type == qbtypes.QueryTypePromQL { diff --git a/pkg/querier/signozquerier/provider.go b/pkg/querier/signozquerier/provider.go index 59c2b80fb1c0..7b528699c4bf 100644 --- a/pkg/querier/signozquerier/provider.go +++ b/pkg/querier/signozquerier/provider.go @@ -53,6 +53,8 @@ func newProvider( telemetrytraces.SpanIndexV3TableName, telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName, + telemetrymeter.DBName, + telemetrymeter.AttributesMetadataTableName, telemetrylogs.DBName, telemetrylogs.LogsV2TableName, telemetrylogs.TagAttributesV2TableName, diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go index 4fd86e29f146..37123f73a2e8 100644 --- a/pkg/telemetrymetadata/metadata.go +++ b/pkg/telemetrymetadata/metadata.go @@ -25,6 +25,7 @@ var ( ErrFailedToGetLogsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get logs keys") ErrFailedToGetTblStatement = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get tbl statement") ErrFailedToGetMetricsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get metrics keys") + ErrFailedToGetMeterKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter keys") ErrFailedToGetRelatedValues = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get related values") ) @@ -36,6 +37,8 @@ type telemetryMetaStore struct { indexV3TblName string metricsDBName string metricsFieldsTblName string + meterDBName string + meterFieldsTblName string logsDBName string logsFieldsTblName string logsV2TblName string @@ -54,6 +57,8 @@ func NewTelemetryMetaStore( indexV3TblName string, metricsDBName string, metricsFieldsTblName string, + meterDBName string, + meterFieldsTblName string, logsDBName string, logsV2TblName string, logsFieldsTblName string, @@ -70,6 +75,8 @@ func NewTelemetryMetaStore( indexV3TblName: indexV3TblName, metricsDBName: metricsDBName, metricsFieldsTblName: metricsFieldsTblName, + meterDBName: meterDBName, + meterFieldsTblName: meterFieldsTblName, logsDBName: logsDBName, logsV2TblName: logsV2TblName, logsFieldsTblName: logsFieldsTblName, @@ -550,6 +557,96 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto return keys, nil } +// getMetricsKeys returns the keys from the metrics that match the field selection criteria +func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, error) { + if len(fieldKeySelectors) == 0 { + return nil, nil + } + + sb := sqlbuilder. + Select("attr_name as name", "attr_type as field_context", "attr_datatype as field_data_type", ` + CASE + WHEN attr_type = 'resource' THEN 1 + WHEN attr_type = 'scope' THEN 2 + WHEN attr_type = 'point' THEN 3 + ELSE 4 + END as priority`). + From(t.meterDBName + "." + t.meterFieldsTblName) + + var limit int + + conds := []string{} + for _, fieldKeySelector := range fieldKeySelectors { + fieldConds := []string{} + if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact { + fieldConds = append(fieldConds, sb.E("attr_name", fieldKeySelector.Name)) + } else { + fieldConds = append(fieldConds, sb.Like("attr_name", "%"+fieldKeySelector.Name+"%")) + } + fieldConds = append(fieldConds, sb.NotLike("attr_name", "\\_\\_%")) + + // note: type and datatype do not have much significance in metrics + + // if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified { + // fieldConds = append(fieldConds, sb.E("attr_type", fieldKeySelector.FieldContext.TagType())) + // } + + // if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified { + // fieldConds = append(fieldConds, sb.E("attr_datatype", fieldKeySelector.FieldDataType.TagDataType())) + // } + + if fieldKeySelector.MetricContext != nil { + fieldConds = append(fieldConds, sb.E("metric_name", fieldKeySelector.MetricContext.MetricName)) + } + + conds = append(conds, sb.And(fieldConds...)) + limit += fieldKeySelector.Limit + } + sb.Where(sb.Or(conds...)) + + if limit == 0 { + limit = 1000 + } + + mainSb := sqlbuilder.Select("name", "field_context", "field_data_type", "max(priority) as priority") + mainSb.From(mainSb.BuilderAs(sb, "sub_query")) + mainSb.GroupBy("name", "field_context", "field_data_type") + mainSb.OrderBy("priority") + mainSb.Limit(limit) + + query, args := mainSb.BuildWithFlavor(sqlbuilder.ClickHouse) + + rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...) + if err != nil { + return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error()) + } + defer rows.Close() + + keys := []*telemetrytypes.TelemetryFieldKey{} + for rows.Next() { + var name string + var fieldContext telemetrytypes.FieldContext + var fieldDataType telemetrytypes.FieldDataType + var priority uint8 + err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority) + if err != nil { + return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error()) + } + keys = append(keys, &telemetrytypes.TelemetryFieldKey{ + Name: name, + Signal: telemetrytypes.SignalMetrics, + FieldContext: fieldContext, + FieldDataType: fieldDataType, + }) + } + + if rows.Err() != nil { + return nil, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error()) + } + + return keys, nil +} + func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, error) { var keys []*telemetrytypes.TelemetryFieldKey var err error @@ -566,6 +663,8 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele keys, err = t.getLogsKeys(ctx, selectors) case telemetrytypes.SignalMetrics: keys, err = t.getMetricsKeys(ctx, selectors) + case telemetrytypes.SignalMeter: + keys, err = t.getMeterKeys(ctx, selectors) case telemetrytypes.SignalUnspecified: // get traces keys tracesKeys, err := t.getTracesKeys(ctx, selectors) @@ -952,6 +1051,68 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu return values, nil } +func (t *telemetryMetaStore) getMeterFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, error) { + sb := sqlbuilder. + Select("DISTINCT attr_string_value"). + From(t.meterDBName + "." + t.meterFieldsTblName) + + if fieldValueSelector.Name != "" { + sb.Where(sb.E("attr_name", fieldValueSelector.Name)) + } + + if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified { + sb.Where(sb.E("attr_type", fieldValueSelector.FieldContext.TagType())) + } + + if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified { + sb.Where(sb.E("attr_datatype", fieldValueSelector.FieldDataType.TagDataType())) + } + + if fieldValueSelector.MetricContext != nil { + sb.Where(sb.E("metric_name", fieldValueSelector.MetricContext.MetricName)) + } + + if fieldValueSelector.StartUnixMilli > 0 { + sb.Where(sb.GE("last_reported_unix_milli", fieldValueSelector.StartUnixMilli)) + } + + if fieldValueSelector.EndUnixMilli > 0 { + sb.Where(sb.LE("first_reported_unix_milli", fieldValueSelector.EndUnixMilli)) + } + + if fieldValueSelector.Value != "" { + if fieldValueSelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact { + sb.Where(sb.E("attr_string_value", fieldValueSelector.Value)) + } else { + sb.Where(sb.Like("attr_string_value", "%"+fieldValueSelector.Value+"%")) + } + } + + if fieldValueSelector.Limit > 0 { + sb.Limit(fieldValueSelector.Limit) + } else { + sb.Limit(50) + } + + query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse) + + rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...) + if err != nil { + return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error()) + } + defer rows.Close() + + values := &telemetrytypes.TelemetryFieldValues{} + for rows.Next() { + var stringValue string + if err := rows.Scan(&stringValue); err != nil { + return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error()) + } + values.StringValues = append(values.StringValues, stringValue) + } + return values, nil +} + func populateAllUnspecifiedValues(allUnspecifiedValues *telemetrytypes.TelemetryFieldValues, mapOfValues map[any]bool, mapOfRelatedValues map[any]bool, values *telemetrytypes.TelemetryFieldValues) { for _, value := range values.StringValues { if _, ok := mapOfValues[value]; !ok { @@ -984,6 +1145,8 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto values, err = t.getLogFieldValues(ctx, fieldValueSelector) case telemetrytypes.SignalMetrics: values, err = t.getMetricFieldValues(ctx, fieldValueSelector) + case telemetrytypes.SignalMeter: + values, err = t.getMeterFieldValues(ctx, fieldValueSelector) case telemetrytypes.SignalUnspecified: mapOfValues := make(map[any]bool) mapOfRelatedValues := make(map[any]bool) diff --git a/pkg/telemetrymetadata/metadata_test.go b/pkg/telemetrymetadata/metadata_test.go index 078f74891529..728fa3ecd420 100644 --- a/pkg/telemetrymetadata/metadata_test.go +++ b/pkg/telemetrymetadata/metadata_test.go @@ -8,6 +8,7 @@ import ( "github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest" "github.com/SigNoz/signoz/pkg/telemetrylogs" + "github.com/SigNoz/signoz/pkg/telemetrymeter" "github.com/SigNoz/signoz/pkg/telemetrymetrics" "github.com/SigNoz/signoz/pkg/telemetrystore" "github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest" @@ -42,6 +43,8 @@ func TestGetKeys(t *testing.T) { telemetrytraces.SpanIndexV3TableName, telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName, + telemetrymeter.DBName, + telemetrymeter.AttributesMetadataTableName, telemetrylogs.DBName, telemetrylogs.LogsV2TableName, telemetrylogs.TagAttributesV2TableName, diff --git a/pkg/telemetrymeter/stmt_builder_test.go b/pkg/telemetrymeter/stmt_builder_test.go index 02ec8e743997..aa4d833157d9 100644 --- a/pkg/telemetrymeter/stmt_builder_test.go +++ b/pkg/telemetrymeter/stmt_builder_test.go @@ -93,7 +93,7 @@ func TestStatementBuilder(t *testing.T) { requestType: qbtypes.RequestTypeTimeSeries, query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{ Signal: telemetrytypes.SignalMetrics, - StepInterval: qbtypes.Step{Duration: 30 * time.Second}, + StepInterval: qbtypes.Step{Duration: 24 * time.Hour}, Aggregations: []qbtypes.MetricAggregation{ { MetricName: "signoz_calls_total",