mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-24 02:46:27 +00:00
feat(telemetry/meter): added metadata setup for meter
This commit is contained in:
parent
71e17a760c
commit
e7a5266cd3
@ -9,6 +9,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrytraces"
|
||||
@ -33,6 +34,8 @@ func NewAPI(
|
||||
telemetrytraces.SpanIndexV3TableName,
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.AttributesMetadataTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
|
||||
@ -171,17 +171,21 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
event.MetricsUsed = true
|
||||
event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
|
||||
event.GroupByApplied = len(spec.GroupBy) > 0
|
||||
if spec.StepInterval.Seconds() == 0 {
|
||||
spec.StepInterval = qbtypes.Step{
|
||||
Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
|
||||
}
|
||||
}
|
||||
if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
|
||||
spec.StepInterval = qbtypes.Step{
|
||||
Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)),
|
||||
}
|
||||
}
|
||||
|
||||
if spec.Signal == telemetrytypes.SignalMeter {
|
||||
spec.StepInterval = qbtypes.Step{Duration: time.Hour * 24}
|
||||
} else {
|
||||
if spec.StepInterval.Seconds() == 0 {
|
||||
spec.StepInterval = qbtypes.Step{
|
||||
Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
|
||||
}
|
||||
}
|
||||
if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
|
||||
spec.StepInterval = qbtypes.Step{
|
||||
Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)),
|
||||
}
|
||||
}
|
||||
}
|
||||
req.CompositeQuery.Queries[idx].Spec = spec
|
||||
}
|
||||
} else if query.Type == qbtypes.QueryTypePromQL {
|
||||
|
||||
@ -53,6 +53,8 @@ func newProvider(
|
||||
telemetrytraces.SpanIndexV3TableName,
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.AttributesMetadataTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
|
||||
@ -25,6 +25,7 @@ var (
|
||||
ErrFailedToGetLogsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get logs keys")
|
||||
ErrFailedToGetTblStatement = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get tbl statement")
|
||||
ErrFailedToGetMetricsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get metrics keys")
|
||||
ErrFailedToGetMeterKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter keys")
|
||||
ErrFailedToGetRelatedValues = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get related values")
|
||||
)
|
||||
|
||||
@ -36,6 +37,8 @@ type telemetryMetaStore struct {
|
||||
indexV3TblName string
|
||||
metricsDBName string
|
||||
metricsFieldsTblName string
|
||||
meterDBName string
|
||||
meterFieldsTblName string
|
||||
logsDBName string
|
||||
logsFieldsTblName string
|
||||
logsV2TblName string
|
||||
@ -54,6 +57,8 @@ func NewTelemetryMetaStore(
|
||||
indexV3TblName string,
|
||||
metricsDBName string,
|
||||
metricsFieldsTblName string,
|
||||
meterDBName string,
|
||||
meterFieldsTblName string,
|
||||
logsDBName string,
|
||||
logsV2TblName string,
|
||||
logsFieldsTblName string,
|
||||
@ -70,6 +75,8 @@ func NewTelemetryMetaStore(
|
||||
indexV3TblName: indexV3TblName,
|
||||
metricsDBName: metricsDBName,
|
||||
metricsFieldsTblName: metricsFieldsTblName,
|
||||
meterDBName: meterDBName,
|
||||
meterFieldsTblName: meterFieldsTblName,
|
||||
logsDBName: logsDBName,
|
||||
logsV2TblName: logsV2TblName,
|
||||
logsFieldsTblName: logsFieldsTblName,
|
||||
@ -550,6 +557,96 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
// getMetricsKeys returns the keys from the metrics that match the field selection criteria
|
||||
func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, error) {
|
||||
if len(fieldKeySelectors) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
sb := sqlbuilder.
|
||||
Select("attr_name as name", "attr_type as field_context", "attr_datatype as field_data_type", `
|
||||
CASE
|
||||
WHEN attr_type = 'resource' THEN 1
|
||||
WHEN attr_type = 'scope' THEN 2
|
||||
WHEN attr_type = 'point' THEN 3
|
||||
ELSE 4
|
||||
END as priority`).
|
||||
From(t.meterDBName + "." + t.meterFieldsTblName)
|
||||
|
||||
var limit int
|
||||
|
||||
conds := []string{}
|
||||
for _, fieldKeySelector := range fieldKeySelectors {
|
||||
fieldConds := []string{}
|
||||
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
|
||||
fieldConds = append(fieldConds, sb.E("attr_name", fieldKeySelector.Name))
|
||||
} else {
|
||||
fieldConds = append(fieldConds, sb.Like("attr_name", "%"+fieldKeySelector.Name+"%"))
|
||||
}
|
||||
fieldConds = append(fieldConds, sb.NotLike("attr_name", "\\_\\_%"))
|
||||
|
||||
// note: type and datatype do not have much significance in metrics
|
||||
|
||||
// if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified {
|
||||
// fieldConds = append(fieldConds, sb.E("attr_type", fieldKeySelector.FieldContext.TagType()))
|
||||
// }
|
||||
|
||||
// if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
|
||||
// fieldConds = append(fieldConds, sb.E("attr_datatype", fieldKeySelector.FieldDataType.TagDataType()))
|
||||
// }
|
||||
|
||||
if fieldKeySelector.MetricContext != nil {
|
||||
fieldConds = append(fieldConds, sb.E("metric_name", fieldKeySelector.MetricContext.MetricName))
|
||||
}
|
||||
|
||||
conds = append(conds, sb.And(fieldConds...))
|
||||
limit += fieldKeySelector.Limit
|
||||
}
|
||||
sb.Where(sb.Or(conds...))
|
||||
|
||||
if limit == 0 {
|
||||
limit = 1000
|
||||
}
|
||||
|
||||
mainSb := sqlbuilder.Select("name", "field_context", "field_data_type", "max(priority) as priority")
|
||||
mainSb.From(mainSb.BuilderAs(sb, "sub_query"))
|
||||
mainSb.GroupBy("name", "field_context", "field_data_type")
|
||||
mainSb.OrderBy("priority")
|
||||
mainSb.Limit(limit)
|
||||
|
||||
query, args := mainSb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
keys := []*telemetrytypes.TelemetryFieldKey{}
|
||||
for rows.Next() {
|
||||
var name string
|
||||
var fieldContext telemetrytypes.FieldContext
|
||||
var fieldDataType telemetrytypes.FieldDataType
|
||||
var priority uint8
|
||||
err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
|
||||
}
|
||||
keys = append(keys, &telemetrytypes.TelemetryFieldKey{
|
||||
Name: name,
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
FieldContext: fieldContext,
|
||||
FieldDataType: fieldDataType,
|
||||
})
|
||||
}
|
||||
|
||||
if rows.Err() != nil {
|
||||
return nil, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, error) {
|
||||
var keys []*telemetrytypes.TelemetryFieldKey
|
||||
var err error
|
||||
@ -566,6 +663,8 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
|
||||
keys, err = t.getLogsKeys(ctx, selectors)
|
||||
case telemetrytypes.SignalMetrics:
|
||||
keys, err = t.getMetricsKeys(ctx, selectors)
|
||||
case telemetrytypes.SignalMeter:
|
||||
keys, err = t.getMeterKeys(ctx, selectors)
|
||||
case telemetrytypes.SignalUnspecified:
|
||||
// get traces keys
|
||||
tracesKeys, err := t.getTracesKeys(ctx, selectors)
|
||||
@ -952,6 +1051,68 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu
|
||||
return values, nil
|
||||
}
|
||||
|
||||
func (t *telemetryMetaStore) getMeterFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, error) {
|
||||
sb := sqlbuilder.
|
||||
Select("DISTINCT attr_string_value").
|
||||
From(t.meterDBName + "." + t.meterFieldsTblName)
|
||||
|
||||
if fieldValueSelector.Name != "" {
|
||||
sb.Where(sb.E("attr_name", fieldValueSelector.Name))
|
||||
}
|
||||
|
||||
if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified {
|
||||
sb.Where(sb.E("attr_type", fieldValueSelector.FieldContext.TagType()))
|
||||
}
|
||||
|
||||
if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
|
||||
sb.Where(sb.E("attr_datatype", fieldValueSelector.FieldDataType.TagDataType()))
|
||||
}
|
||||
|
||||
if fieldValueSelector.MetricContext != nil {
|
||||
sb.Where(sb.E("metric_name", fieldValueSelector.MetricContext.MetricName))
|
||||
}
|
||||
|
||||
if fieldValueSelector.StartUnixMilli > 0 {
|
||||
sb.Where(sb.GE("last_reported_unix_milli", fieldValueSelector.StartUnixMilli))
|
||||
}
|
||||
|
||||
if fieldValueSelector.EndUnixMilli > 0 {
|
||||
sb.Where(sb.LE("first_reported_unix_milli", fieldValueSelector.EndUnixMilli))
|
||||
}
|
||||
|
||||
if fieldValueSelector.Value != "" {
|
||||
if fieldValueSelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
|
||||
sb.Where(sb.E("attr_string_value", fieldValueSelector.Value))
|
||||
} else {
|
||||
sb.Where(sb.Like("attr_string_value", "%"+fieldValueSelector.Value+"%"))
|
||||
}
|
||||
}
|
||||
|
||||
if fieldValueSelector.Limit > 0 {
|
||||
sb.Limit(fieldValueSelector.Limit)
|
||||
} else {
|
||||
sb.Limit(50)
|
||||
}
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
values := &telemetrytypes.TelemetryFieldValues{}
|
||||
for rows.Next() {
|
||||
var stringValue string
|
||||
if err := rows.Scan(&stringValue); err != nil {
|
||||
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
|
||||
}
|
||||
values.StringValues = append(values.StringValues, stringValue)
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
func populateAllUnspecifiedValues(allUnspecifiedValues *telemetrytypes.TelemetryFieldValues, mapOfValues map[any]bool, mapOfRelatedValues map[any]bool, values *telemetrytypes.TelemetryFieldValues) {
|
||||
for _, value := range values.StringValues {
|
||||
if _, ok := mapOfValues[value]; !ok {
|
||||
@ -984,6 +1145,8 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
|
||||
values, err = t.getLogFieldValues(ctx, fieldValueSelector)
|
||||
case telemetrytypes.SignalMetrics:
|
||||
values, err = t.getMetricFieldValues(ctx, fieldValueSelector)
|
||||
case telemetrytypes.SignalMeter:
|
||||
values, err = t.getMeterFieldValues(ctx, fieldValueSelector)
|
||||
case telemetrytypes.SignalUnspecified:
|
||||
mapOfValues := make(map[any]bool)
|
||||
mapOfRelatedValues := make(map[any]bool)
|
||||
|
||||
@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymeter"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
|
||||
@ -42,6 +43,8 @@ func TestGetKeys(t *testing.T) {
|
||||
telemetrytraces.SpanIndexV3TableName,
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.AttributesMetadataTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
|
||||
@ -93,7 +93,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "signoz_calls_total",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user