diff --git a/pkg/apis/fields/api.go b/pkg/apis/fields/api.go
index f36346960b8f..25a0362b3232 100644
--- a/pkg/apis/fields/api.go
+++ b/pkg/apis/fields/api.go
@@ -35,7 +35,7 @@ func NewAPI(
 		telemetrymetrics.DBName,
 		telemetrymetrics.AttributesMetadataTableName,
 		telemetrymeter.DBName,
-		telemetrymeter.AttributesMetadataTableName,
+		telemetrymeter.SamplesV4Agg1dTableName,
 		telemetrylogs.DBName,
 		telemetrylogs.LogsV2TableName,
 		telemetrylogs.TagAttributesV2TableName,
diff --git a/pkg/querier/signozquerier/provider.go b/pkg/querier/signozquerier/provider.go
index 84e2293a3cf7..755547119d7d 100644
--- a/pkg/querier/signozquerier/provider.go
+++ b/pkg/querier/signozquerier/provider.go
@@ -54,7 +54,7 @@ func newProvider(
 		telemetrymetrics.DBName,
 		telemetrymetrics.AttributesMetadataTableName,
 		telemetrymeter.DBName,
-		telemetrymeter.AttributesMetadataTableName,
+		telemetrymeter.SamplesV4Agg1dTableName,
 		telemetrylogs.DBName,
 		telemetrylogs.LogsV2TableName,
 		telemetrylogs.TagAttributesV2TableName,
diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go
index 744a2711b65c..5ed350562c50 100644
--- a/pkg/query-service/app/clickhouseReader/reader.go
+++ b/pkg/query-service/app/clickhouseReader/reader.go
@@ -64,6 +64,8 @@ const (
 	signozTraceLocalTableName  = "signoz_index_v2"
 	signozMetricDBName         = "signoz_metrics"
 	signozMetadataDbName       = "signoz_metadata"
+	signozMeterDBName          = "signoz_meter"
+	signozMeterSamplesName     = "samples_agg_1d"
 	signozSampleLocalTableName = "samples_v4"
 	signozSampleTableName      = "distributed_samples_v4"
 
@@ -2741,6 +2743,54 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, org
 	return &response, nil
 }
 
+func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
+	var response v3.AggregateAttributeResponse
+	// Query all relevant meter metric names from the meter samples table; metadata retrieval stays with cache/db
+	query := fmt.Sprintf(
+		`SELECT metric_name,type,temporality,is_monotonic
+		FROM %s.%s
+		WHERE metric_name ILIKE $1
+		GROUP BY metric_name,type,temporality,is_monotonic`,
+		signozMeterDBName, signozMeterSamplesName)
+
+	if req.Limit != 0 {
+		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
+	}
+
+	rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
+	if err != nil {
+		zap.L().Error("Error while querying meter names", zap.Error(err))
+		return nil, fmt.Errorf("error while executing meter name query: %s", err.Error())
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var name string
+		var typ string
+		var temporality string
+		var isMonotonic bool
+		if err := rows.Scan(&name, &typ, &temporality, &isMonotonic); err != nil {
+			return nil, fmt.Errorf("error while scanning meter name: %s", err.Error())
+		}
+
+		// Non-monotonic cumulative sums are treated as gauges
+		if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
+			typ = "Gauge"
+		}
+
+		// unlike the `tag`/`resource` types used for traces/logs, `Type` here is the metric type
+		key := v3.AttributeKey{
+			Key:      name,
+			DataType: v3.AttributeKeyDataTypeFloat64,
+			Type:     v3.AttributeKeyType(typ),
+			IsColumn: true,
+		}
+		response.AttributeKeys = append(response.AttributeKeys, key)
+	}
+
+	return &response, nil
+}
+
 func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
 
 	var query string
diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index 69be0b7f18fb..690db39d9483 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -4213,6 +4213,8 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
 	switch req.DataSource {
 	case v3.DataSourceMetrics:
 		response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), orgID, req, false)
+	case v3.DataSourceMeter:
+		response, err = aH.reader.GetMeterAggregateAttributes(r.Context(), orgID, req)
 	case v3.DataSourceLogs:
 		response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
 	case v3.DataSourceTraces:
diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go
index b378c94a0283..037fb769309e 100644
--- a/pkg/query-service/interfaces/interface.go
+++ b/pkg/query-service/interfaces/interface.go
@@ -50,6 +50,7 @@ type Reader interface {
 
 	FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
 	GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
+	GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
 	GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
 	GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
 
diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go
index 67bbf65af586..cb1abd38818b 100644
--- a/pkg/query-service/model/v3/v3.go
+++ b/pkg/query-service/model/v3/v3.go
@@ -22,11 +22,12 @@ const (
 	DataSourceTraces  DataSource = "traces"
 	DataSourceLogs    DataSource = "logs"
 	DataSourceMetrics DataSource = "metrics"
+	DataSourceMeter   DataSource = "meter"
 )
 
 func (d DataSource) Validate() error {
 	switch d {
-	case DataSourceTraces, DataSourceLogs, DataSourceMetrics:
+	case DataSourceTraces, DataSourceLogs, DataSourceMetrics, DataSourceMeter:
 		return nil
 	default:
 		return fmt.Errorf("invalid data source: %s", d)
diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go
index 9f0e3f6bf69a..3ace8db4e2d3 100644
--- a/pkg/telemetrymetadata/metadata.go
+++ b/pkg/telemetrymetadata/metadata.go
@@ -557,25 +557,15 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto
 	return keys, nil
 }
 
-// getMetricsKeys returns the keys from the metrics that match the field selection criteria
+// getMeterKeys returns the keys from the meter metrics that match the field selection criteria
 func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, error) {
 	if len(fieldKeySelectors) == 0 {
 		return nil, nil
 	}
 
-	sb := sqlbuilder.
-		Select("attr_name as name", "attr_type as field_context", "attr_datatype as field_data_type", `
-		CASE
-			WHEN attr_type = 'resource' THEN 1
-			WHEN attr_type = 'scope' THEN 2
-			WHEN attr_type = 'point' THEN 3
-			ELSE 4
-		END as priority`).
-		From(t.meterDBName + "." + t.meterFieldsTblName)
-
-	var limit int
-
+	sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name").From(t.meterDBName + "." + t.meterFieldsTblName)
 	conds := []string{}
+	var limit int
 	for _, fieldKeySelector := range fieldKeySelectors {
 		fieldConds := []string{}
 		if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
@@ -585,16 +575,6 @@ func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors
 		}
 		fieldConds = append(fieldConds, sb.NotLike("attr_name", "\\_\\_%"))
 
-		// note: type and datatype do not have much significance in metrics
-
-		// if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified {
-		// 	fieldConds = append(fieldConds, sb.E("attr_type", fieldKeySelector.FieldContext.TagType()))
-		// }
-
-		// if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
-		// 	fieldConds = append(fieldConds, sb.E("attr_datatype", fieldKeySelector.FieldDataType.TagDataType()))
-		// }
-
 		if fieldKeySelector.MetricContext != nil {
 			fieldConds = append(fieldConds, sb.E("metric_name", fieldKeySelector.MetricContext.MetricName))
 		}
@@ -603,18 +583,12 @@ func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors
 		limit += fieldKeySelector.Limit
 	}
 	sb.Where(sb.Or(conds...))
-
 	if limit == 0 {
 		limit = 1000
 	}
 
-	mainSb := sqlbuilder.Select("name", "field_context", "field_data_type", "max(priority) as priority")
-	mainSb.From(mainSb.BuilderAs(sb, "sub_query"))
-	mainSb.GroupBy("name", "field_context", "field_data_type")
-	mainSb.OrderBy("priority")
-	mainSb.Limit(limit)
-
-	query, args := mainSb.BuildWithFlavor(sqlbuilder.ClickHouse)
+	sb.Limit(limit)
+	query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
 
 	rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
 	if err != nil {
@@ -625,18 +599,13 @@ func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors
 	keys := []*telemetrytypes.TelemetryFieldKey{}
 	for rows.Next() {
 		var name string
-		var fieldContext telemetrytypes.FieldContext
-		var fieldDataType telemetrytypes.FieldDataType
-		var priority uint8
-		err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority)
+		err = rows.Scan(&name)
 		if err != nil {
 			return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
 		}
 		keys = append(keys, &telemetrytypes.TelemetryFieldKey{
-			Name:          name,
-			Signal:        telemetrytypes.SignalMetrics,
-			FieldContext:  fieldContext,
-			FieldDataType: fieldDataType,
+			Name:   name,
+			Signal: telemetrytypes.SignalMeter,
 		})
 	}
 
@@ -645,6 +614,7 @@ func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors
 	}
 
 	return keys, nil
+
 }
 
 func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, error) {
@@ -1052,25 +1022,13 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu
 }
 
 func (t *telemetryMetaStore) getMeterFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, error) {
-	sb := sqlbuilder.
-		Select("DISTINCT attr_string_value").
+	sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeysAndValues(labels, 'String')) AS attr, attr.2 AS attr_string_value").
 		From(t.meterDBName + "." + t.meterFieldsTblName)
 
 	if fieldValueSelector.Name != "" {
-		sb.Where(sb.E("attr_name", fieldValueSelector.Name))
-	}
-
-	if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified {
-		sb.Where(sb.E("attr_type", fieldValueSelector.FieldContext.TagType()))
-	}
-
-	if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
-		sb.Where(sb.E("attr_datatype", fieldValueSelector.FieldDataType.TagDataType()))
-	}
-
-	if fieldValueSelector.MetricContext != nil {
-		sb.Where(sb.E("metric_name", fieldValueSelector.MetricContext.MetricName))
+		sb.Where(sb.E("attr.1", fieldValueSelector.Name))
 	}
+	sb.Where(sb.NotLike("attr.1", "\\_\\_%"))
 
 	if fieldValueSelector.Value != "" {
 		if fieldValueSelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
@@ -1079,6 +1037,7 @@ func (t *telemetryMetaStore) getMeterFieldValues(ctx context.Context, fieldValue
 			sb.Where(sb.Like("attr_string_value", "%"+fieldValueSelector.Value+"%"))
 		}
 	}
+	sb.Where(sb.NE("attr_string_value", ""))
 
 	if fieldValueSelector.Limit > 0 {
 		sb.Limit(fieldValueSelector.Limit)
@@ -1096,8 +1055,9 @@ func (t *telemetryMetaStore) getMeterFieldValues(ctx context.Context, fieldValue
 
 	values := &telemetrytypes.TelemetryFieldValues{}
 	for rows.Next() {
+		var attribute []any
 		var stringValue string
-		if err := rows.Scan(&stringValue); err != nil {
+		if err := rows.Scan(&attribute, &stringValue); err != nil {
 			return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
 		}
 		values.StringValues = append(values.StringValues, stringValue)
@@ -1187,11 +1147,11 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
 	}
 
 	result := make(map[string]metrictypes.Temporality)
-	metricsTemporality, err := t.fetchMetricsTemporality(ctx, t.metricsDBName, t.metricsFieldsTblName, metricNames...)
+	metricsTemporality, err := t.fetchMetricsTemporality(ctx, metricNames...)
 	if err != nil {
 		return nil, err
 	}
-	meterMetricsTemporality, err := t.fetchMetricsTemporality(ctx, t.meterDBName, t.meterFieldsTblName, metricNames...)
+	meterMetricsTemporality, err := t.fetchMeterMetricsTemporality(ctx, metricNames...)
 	if err != nil {
 		return nil, err
 	}
@@ -1212,7 +1172,7 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
 	return result, nil
 }
 
-func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, database string, table string, metricNames ...string) (map[string]metrictypes.Temporality, error) {
+func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
 	result := make(map[string]metrictypes.Temporality)
 
 	// Build query to fetch temporality for all metrics
@@ -1222,7 +1182,7 @@ func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, databa
 	sb := sqlbuilder.Select(
 		"metric_name",
 		"argMax(temporality, last_reported_unix_milli) as temporality",
-	).From(database + "." + table)
+	).From(t.metricsDBName + "." + t.metricsFieldsTblName)
 
 	// Filter by metric names (in the temporality column due to data mix-up)
 	sb.Where(sb.In("metric_name", metricNames))
@@ -1266,3 +1226,54 @@
 
 	return result, nil
 }
+
+func (t *telemetryMetaStore) fetchMeterMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
+	result := make(map[string]metrictypes.Temporality)
+
+	sb := sqlbuilder.Select(
+		"metric_name",
+		"argMax(temporality, unix_milli) as temporality",
+	).From(t.meterDBName + "." + t.meterFieldsTblName)
+
+	// Filter by the requested metric names
+	sb.Where(sb.In("metric_name", metricNames))
+
+	// Group by metric name to get one temporality per metric
+	sb.GroupBy("metric_name")
+
+	query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+
+	t.logger.DebugContext(ctx, "fetching meter metrics temporality", "query", query, "args", args)
+
+	rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
+	if err != nil {
+		return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch meter metric temporality")
+	}
+	defer rows.Close()
+
+	// Process results
+	for rows.Next() {
+		var metricName, temporalityStr string
+		if err := rows.Scan(&metricName, &temporalityStr); err != nil {
+			return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
+		}
+
+		// Convert string to Temporality type
+		var temporality metrictypes.Temporality
+		switch temporalityStr {
+		case "Delta":
+			temporality = metrictypes.Delta
+		case "Cumulative":
+			temporality = metrictypes.Cumulative
+		case "Unspecified":
+			temporality = metrictypes.Unspecified
+		default:
+			// Unknown or empty temporality
+			temporality = metrictypes.Unknown
+		}
+
+		result[metricName] = temporality
+	}
+
+	return result, nil
+}
diff --git a/pkg/telemetrymetadata/metadata_test.go b/pkg/telemetrymetadata/metadata_test.go
index 728fa3ecd420..19b791db0bb6 100644
--- a/pkg/telemetrymetadata/metadata_test.go
+++ b/pkg/telemetrymetadata/metadata_test.go
@@ -44,7 +44,7 @@ func TestGetKeys(t *testing.T) {
 		telemetrymetrics.DBName,
 		telemetrymetrics.AttributesMetadataTableName,
 		telemetrymeter.DBName,
-		telemetrymeter.AttributesMetadataTableName,
+		telemetrymeter.SamplesV4Agg1dTableName,
 		telemetrylogs.DBName,
 		telemetrylogs.LogsV2TableName,
 		telemetrylogs.TagAttributesV2TableName,
diff --git a/pkg/telemetrymeter/tables.go b/pkg/telemetrymeter/tables.go
index ee58a5077da2..7f040f64f1d4 100644
--- a/pkg/telemetrymeter/tables.go
+++ b/pkg/telemetrymeter/tables.go
@@ -5,13 +5,11 @@ import (
 )
 
 const (
-	DBName                           = "signoz_meter"
-	SamplesTableName                 = "distributed_samples"
-	SamplesLocalTableName            = "samples"
-	SamplesV4Agg1dTableName          = "distributed_samples_agg_1d"
-	SamplesV4Agg1dLocalTableName     = "samples_agg_1d"
-	AttributesMetadataTableName      = "distributed_metadata"
-	AttributesMetadataLocalTableName = "metadata"
+	DBName                       = "signoz_meter"
+	SamplesTableName             = "distributed_samples"
+	SamplesLocalTableName        = "samples"
+	SamplesV4Agg1dTableName      = "distributed_samples_agg_1d"
+	SamplesV4Agg1dLocalTableName = "samples_agg_1d"
 )
 
 func AggregationColumnForSamplesTable(
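
Reviewer note (illustrative, not part of the patch): after this change, meter key/value autocomplete is derived directly from the labels JSON column of signoz_meter.distributed_samples_agg_1d, which is presumably why the distributed_metadata table constants are dropped from pkg/telemetrymeter/tables.go. Below is a minimal, self-contained Go sketch of the query shape getMeterKeys now builds, assuming the upstream github.com/huandu/go-sqlbuilder package; the search term and metric name are hypothetical placeholders.

package main

import (
	"fmt"

	"github.com/huandu/go-sqlbuilder"
)

func main() {
	// Distinct label keys come straight from the JSON labels column of the
	// 1-day aggregate meter samples table (no separate metadata table).
	sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name").
		From("signoz_meter.distributed_samples_agg_1d")

	// One field-key selector: fuzzy key match, internal "__*" labels excluded,
	// optionally scoped to a single meter metric (both literals are hypothetical).
	fieldConds := []string{
		sb.Like("attr_name", "%host%"),
		sb.NotLike("attr_name", "\\_\\_%"),
		sb.E("metric_name", "example.meter.metric"),
	}
	sb.Where(sb.Or(sb.And(fieldConds...)))
	sb.Limit(1000) // default limit when the selectors do not set one

	query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
	fmt.Println(query)
	fmt.Println(args)
}

Printing the built statement makes it easy to eyeball the generated ClickHouse SQL and bound arguments while reviewing, without wiring up a real telemetrystore connection.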