mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-24 02:46:27 +00:00
feat(telemetrymeter): metadata changes and aggregate attribute changes
This commit is contained in:
parent
712fa3e041
commit
240ce72c9a
@ -35,7 +35,7 @@ func NewAPI(
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.AttributesMetadataTableName,
|
||||
telemetrymeter.SamplesV4Agg1dTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
|
||||
@ -54,7 +54,7 @@ func newProvider(
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.AttributesMetadataTableName,
|
||||
telemetrymeter.SamplesV4Agg1dTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
|
||||
@ -64,6 +64,8 @@ const (
|
||||
signozTraceLocalTableName = "signoz_index_v2"
|
||||
signozMetricDBName = "signoz_metrics"
|
||||
signozMetadataDbName = "signoz_metadata"
|
||||
signozMeterDBName = "signoz_meter"
|
||||
signozMeterSamplesName = "samples_agg_1d"
|
||||
|
||||
signozSampleLocalTableName = "samples_v4"
|
||||
signozSampleTableName = "distributed_samples_v4"
|
||||
@ -2741,6 +2743,54 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, org
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
|
||||
var response v3.AggregateAttributeResponse
|
||||
// Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db
|
||||
query := fmt.Sprintf(
|
||||
`SELECT metric_name,type,temporality,is_monotonic
|
||||
FROM %s.%s
|
||||
WHERE metric_name ILIKE $1
|
||||
GROUP BY metric_name,type,temporality,is_monotonic`,
|
||||
signozMeterDBName, signozMeterSamplesName)
|
||||
|
||||
if req.Limit != 0 {
|
||||
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
|
||||
}
|
||||
|
||||
rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
|
||||
if err != nil {
|
||||
zap.L().Error("Error while querying meter names", zap.Error(err))
|
||||
return nil, fmt.Errorf("error while executing meter name query: %s", err.Error())
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var name string
|
||||
var typ string
|
||||
var temporality string
|
||||
var isMonotonic bool
|
||||
if err := rows.Scan(&name, &typ, &temporality, &isMonotonic); err != nil {
|
||||
return nil, fmt.Errorf("error while scanning meter name: %s", err.Error())
|
||||
}
|
||||
|
||||
// Non-monotonic cumulative sums are treated as gauges
|
||||
if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
|
||||
typ = "Gauge"
|
||||
}
|
||||
|
||||
// unlike traces/logs `tag`/`resource` type, the `Type` will be metric type
|
||||
key := v3.AttributeKey{
|
||||
Key: name,
|
||||
DataType: v3.AttributeKeyDataTypeFloat64,
|
||||
Type: v3.AttributeKeyType(typ),
|
||||
IsColumn: true,
|
||||
}
|
||||
response.AttributeKeys = append(response.AttributeKeys, key)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
|
||||
|
||||
var query string
|
||||
|
||||
@ -4213,6 +4213,8 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
|
||||
switch req.DataSource {
|
||||
case v3.DataSourceMetrics:
|
||||
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), orgID, req, false)
|
||||
case v3.DataSourceMeter:
|
||||
response, err = aH.reader.GetMeterAggregateAttributes(r.Context(), orgID, req)
|
||||
case v3.DataSourceLogs:
|
||||
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
|
||||
case v3.DataSourceTraces:
|
||||
|
||||
@ -50,6 +50,7 @@ type Reader interface {
|
||||
|
||||
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
|
||||
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
|
||||
GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
|
||||
GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
|
||||
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
|
||||
|
||||
|
||||
@ -22,11 +22,12 @@ const (
|
||||
DataSourceTraces DataSource = "traces"
|
||||
DataSourceLogs DataSource = "logs"
|
||||
DataSourceMetrics DataSource = "metrics"
|
||||
DataSourceMeter DataSource = "meter"
|
||||
)
|
||||
|
||||
func (d DataSource) Validate() error {
|
||||
switch d {
|
||||
case DataSourceTraces, DataSourceLogs, DataSourceMetrics:
|
||||
case DataSourceTraces, DataSourceLogs, DataSourceMetrics, DataSourceMeter:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("invalid data source: %s", d)
|
||||
|
||||
@ -557,25 +557,15 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
// getMetricsKeys returns the keys from the metrics that match the field selection criteria
|
||||
// getMeterKeys returns the keys from the meter metrics that match the field selection criteria
|
||||
func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, error) {
|
||||
if len(fieldKeySelectors) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
sb := sqlbuilder.
|
||||
Select("attr_name as name", "attr_type as field_context", "attr_datatype as field_data_type", `
|
||||
CASE
|
||||
WHEN attr_type = 'resource' THEN 1
|
||||
WHEN attr_type = 'scope' THEN 2
|
||||
WHEN attr_type = 'point' THEN 3
|
||||
ELSE 4
|
||||
END as priority`).
|
||||
From(t.meterDBName + "." + t.meterFieldsTblName)
|
||||
|
||||
var limit int
|
||||
|
||||
sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name").From(t.meterDBName + "." + t.meterFieldsTblName)
|
||||
conds := []string{}
|
||||
var limit int
|
||||
for _, fieldKeySelector := range fieldKeySelectors {
|
||||
fieldConds := []string{}
|
||||
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
|
||||
@ -585,16 +575,6 @@ func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors
|
||||
}
|
||||
fieldConds = append(fieldConds, sb.NotLike("attr_name", "\\_\\_%"))
|
||||
|
||||
// note: type and datatype do not have much significance in metrics
|
||||
|
||||
// if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified {
|
||||
// fieldConds = append(fieldConds, sb.E("attr_type", fieldKeySelector.FieldContext.TagType()))
|
||||
// }
|
||||
|
||||
// if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
|
||||
// fieldConds = append(fieldConds, sb.E("attr_datatype", fieldKeySelector.FieldDataType.TagDataType()))
|
||||
// }
|
||||
|
||||
if fieldKeySelector.MetricContext != nil {
|
||||
fieldConds = append(fieldConds, sb.E("metric_name", fieldKeySelector.MetricContext.MetricName))
|
||||
}
|
||||
@ -603,18 +583,12 @@ func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors
|
||||
limit += fieldKeySelector.Limit
|
||||
}
|
||||
sb.Where(sb.Or(conds...))
|
||||
|
||||
if limit == 0 {
|
||||
limit = 1000
|
||||
}
|
||||
|
||||
mainSb := sqlbuilder.Select("name", "field_context", "field_data_type", "max(priority) as priority")
|
||||
mainSb.From(mainSb.BuilderAs(sb, "sub_query"))
|
||||
mainSb.GroupBy("name", "field_context", "field_data_type")
|
||||
mainSb.OrderBy("priority")
|
||||
mainSb.Limit(limit)
|
||||
|
||||
query, args := mainSb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
sb.Limit(limit)
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
@ -625,18 +599,13 @@ func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors
|
||||
keys := []*telemetrytypes.TelemetryFieldKey{}
|
||||
for rows.Next() {
|
||||
var name string
|
||||
var fieldContext telemetrytypes.FieldContext
|
||||
var fieldDataType telemetrytypes.FieldDataType
|
||||
var priority uint8
|
||||
err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority)
|
||||
err = rows.Scan(&name)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
|
||||
}
|
||||
keys = append(keys, &telemetrytypes.TelemetryFieldKey{
|
||||
Name: name,
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
FieldContext: fieldContext,
|
||||
FieldDataType: fieldDataType,
|
||||
Name: name,
|
||||
Signal: telemetrytypes.SignalMeter,
|
||||
})
|
||||
}
|
||||
|
||||
@ -645,6 +614,7 @@ func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, error) {
|
||||
@ -1052,25 +1022,13 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) getMeterFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, error) {
|
||||
sb := sqlbuilder.
|
||||
Select("DISTINCT attr_string_value").
|
||||
sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeysAndValues(labels, 'String')) AS attr, attr.2 AS attr_string_value").
|
||||
From(t.meterDBName + "." + t.meterFieldsTblName)
|
||||
|
||||
if fieldValueSelector.Name != "" {
|
||||
sb.Where(sb.E("attr_name", fieldValueSelector.Name))
|
||||
}
|
||||
|
||||
if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified {
|
||||
sb.Where(sb.E("attr_type", fieldValueSelector.FieldContext.TagType()))
|
||||
}
|
||||
|
||||
if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
|
||||
sb.Where(sb.E("attr_datatype", fieldValueSelector.FieldDataType.TagDataType()))
|
||||
}
|
||||
|
||||
if fieldValueSelector.MetricContext != nil {
|
||||
sb.Where(sb.E("metric_name", fieldValueSelector.MetricContext.MetricName))
|
||||
sb.Where(sb.E("attr.1", fieldValueSelector.Name))
|
||||
}
|
||||
sb.Where(sb.NotLike("attr.1", "\\_\\_%"))
|
||||
|
||||
if fieldValueSelector.Value != "" {
|
||||
if fieldValueSelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
|
||||
@ -1079,6 +1037,7 @@ func (t *telemetryMetaStore) getMeterFieldValues(ctx context.Context, fieldValue
|
||||
sb.Where(sb.Like("attr_string_value", "%"+fieldValueSelector.Value+"%"))
|
||||
}
|
||||
}
|
||||
sb.Where(sb.NE("attr_string_value", ""))
|
||||
|
||||
if fieldValueSelector.Limit > 0 {
|
||||
sb.Limit(fieldValueSelector.Limit)
|
||||
@ -1096,8 +1055,9 @@ func (t *telemetryMetaStore) getMeterFieldValues(ctx context.Context, fieldValue
|
||||
|
||||
values := &telemetrytypes.TelemetryFieldValues{}
|
||||
for rows.Next() {
|
||||
var attribute []any
|
||||
var stringValue string
|
||||
if err := rows.Scan(&stringValue); err != nil {
|
||||
if err := rows.Scan(&attribute, &stringValue); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
|
||||
}
|
||||
values.StringValues = append(values.StringValues, stringValue)
|
||||
@ -1187,11 +1147,11 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
|
||||
}
|
||||
|
||||
result := make(map[string]metrictypes.Temporality)
|
||||
metricsTemporality, err := t.fetchMetricsTemporality(ctx, t.metricsDBName, t.metricsFieldsTblName, metricNames...)
|
||||
metricsTemporality, err := t.fetchMetricsTemporality(ctx, metricNames...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
meterMetricsTemporality, err := t.fetchMetricsTemporality(ctx, t.meterDBName, t.meterFieldsTblName, metricNames...)
|
||||
meterMetricsTemporality, err := t.fetchMeterMetricsTemporality(ctx, metricNames...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1212,7 +1172,7 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, database string, table string, metricNames ...string) (map[string]metrictypes.Temporality, error) {
|
||||
func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
|
||||
result := make(map[string]metrictypes.Temporality)
|
||||
|
||||
// Build query to fetch temporality for all metrics
|
||||
@ -1222,7 +1182,7 @@ func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, databa
|
||||
sb := sqlbuilder.Select(
|
||||
"metric_name",
|
||||
"argMax(temporality, last_reported_unix_milli) as temporality",
|
||||
).From(database + "." + table)
|
||||
).From(t.metricsDBName + "." + t.metricsFieldsTblName)
|
||||
|
||||
// Filter by metric names (in the temporality column due to data mix-up)
|
||||
sb.Where(sb.In("metric_name", metricNames))
|
||||
@ -1266,3 +1226,54 @@ func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, databa
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) fetchMeterMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
|
||||
result := make(map[string]metrictypes.Temporality)
|
||||
|
||||
sb := sqlbuilder.Select(
|
||||
"metric_name",
|
||||
"argMax(temporality, unix_milli) as temporality",
|
||||
).From(t.meterDBName + "." + t.meterFieldsTblName)
|
||||
|
||||
// Filter by metric names (in the temporality column due to data mix-up)
|
||||
sb.Where(sb.In("metric_name", metricNames))
|
||||
|
||||
// Group by metric name to get one temporality per metric
|
||||
sb.GroupBy("metric_name")
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
t.logger.DebugContext(ctx, "fetching meter metrics temporality", "query", query, "args", args)
|
||||
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch meter metric temporality")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Process results
|
||||
for rows.Next() {
|
||||
var metricName, temporalityStr string
|
||||
if err := rows.Scan(&metricName, &temporalityStr); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
|
||||
}
|
||||
|
||||
// Convert string to Temporality type
|
||||
var temporality metrictypes.Temporality
|
||||
switch temporalityStr {
|
||||
case "Delta":
|
||||
temporality = metrictypes.Delta
|
||||
case "Cumulative":
|
||||
temporality = metrictypes.Cumulative
|
||||
case "Unspecified":
|
||||
temporality = metrictypes.Unspecified
|
||||
default:
|
||||
// Unknown or empty temporality
|
||||
temporality = metrictypes.Unknown
|
||||
}
|
||||
|
||||
result[metricName] = temporality
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@ -44,7 +44,7 @@ func TestGetKeys(t *testing.T) {
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.AttributesMetadataTableName,
|
||||
telemetrymeter.SamplesV4Agg1dTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
|
||||
@ -5,13 +5,11 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
DBName = "signoz_meter"
|
||||
SamplesTableName = "distributed_samples"
|
||||
SamplesLocalTableName = "samples"
|
||||
SamplesV4Agg1dTableName = "distributed_samples_agg_1d"
|
||||
SamplesV4Agg1dLocalTableName = "samples_agg_1d"
|
||||
AttributesMetadataTableName = "distributed_metadata"
|
||||
AttributesMetadataLocalTableName = "metadata"
|
||||
DBName = "signoz_meter"
|
||||
SamplesTableName = "distributed_samples"
|
||||
SamplesLocalTableName = "samples"
|
||||
SamplesV4Agg1dTableName = "distributed_samples_agg_1d"
|
||||
SamplesV4Agg1dLocalTableName = "samples_agg_1d"
|
||||
)
|
||||
|
||||
func AggregationColumnForSamplesTable(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user