1550 lines
51 KiB
Go

package telemetrymetadata
import (
"context"
"fmt"
"log/slog"
"slices"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
"golang.org/x/exp/maps"
)
var (
ErrFailedToGetTracesKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get traces keys")
ErrFailedToGetLogsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get logs keys")
ErrFailedToGetTblStatement = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get tbl statement")
ErrFailedToGetMetricsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get metrics keys")
ErrFailedToGetMeterKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter keys")
ErrFailedToGetMeterValues = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter values")
ErrFailedToGetRelatedValues = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get related values")
)
type telemetryMetaStore struct {
logger *slog.Logger
telemetrystore telemetrystore.TelemetryStore
tracesDBName string
tracesFieldsTblName string
spanAttributesKeysTblName string
indexV3TblName string
metricsDBName string
metricsFieldsTblName string
meterDBName string
meterFieldsTblName string
logsDBName string
logsFieldsTblName string
logAttributeKeysTblName string
logResourceKeysTblName string
logsV2TblName string
relatedMetadataDBName string
relatedMetadataTblName string
fm qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
}
func escapeForLike(s string) string {
return strings.ReplaceAll(strings.ReplaceAll(s, `_`, `\_`), `%`, `\%`)
}
func NewTelemetryMetaStore(
settings factory.ProviderSettings,
telemetrystore telemetrystore.TelemetryStore,
tracesDBName string,
tracesFieldsTblName string,
spanAttributesKeysTblName string,
indexV3TblName string,
metricsDBName string,
metricsFieldsTblName string,
meterDBName string,
meterFieldsTblName string,
logsDBName string,
logsV2TblName string,
logsFieldsTblName string,
logAttributeKeysTblName string,
logResourceKeysTblName string,
relatedMetadataDBName string,
relatedMetadataTblName string,
) telemetrytypes.MetadataStore {
metadataSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetadata")
t := &telemetryMetaStore{
logger: metadataSettings.Logger(),
telemetrystore: telemetrystore,
tracesDBName: tracesDBName,
tracesFieldsTblName: tracesFieldsTblName,
spanAttributesKeysTblName: spanAttributesKeysTblName,
indexV3TblName: indexV3TblName,
metricsDBName: metricsDBName,
metricsFieldsTblName: metricsFieldsTblName,
meterDBName: meterDBName,
meterFieldsTblName: meterFieldsTblName,
logsDBName: logsDBName,
logsV2TblName: logsV2TblName,
logsFieldsTblName: logsFieldsTblName,
logAttributeKeysTblName: logAttributeKeysTblName,
logResourceKeysTblName: logResourceKeysTblName,
relatedMetadataDBName: relatedMetadataDBName,
relatedMetadataTblName: relatedMetadataTblName,
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
t.fm = fm
t.conditionBuilder = conditionBuilder
return t
}
// tracesTblStatementToFieldKeys returns materialised attribute/resource/scope keys from the traces table
func (t *telemetryMetaStore) tracesTblStatementToFieldKeys(ctx context.Context) ([]*telemetrytypes.TelemetryFieldKey, error) {
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", t.tracesDBName, t.indexV3TblName)
statements := []telemetrytypes.ShowCreateTableStatement{}
err := t.telemetrystore.ClickhouseDB().Select(ctx, &statements, query)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTblStatement.Error())
}
materialisedKeys, err := ExtractFieldKeysFromTblStatement(statements[0].Statement)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTracesKeys.Error())
}
for idx := range materialisedKeys {
materialisedKeys[idx].Signal = telemetrytypes.SignalTraces
}
return materialisedKeys, nil
}
// getTracesKeys returns the keys from the spans that match the field selection criteria
func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
if len(fieldKeySelectors) == 0 {
return nil, true, nil
}
// pre-fetch the materialised keys from the traces table
matKeys, err := t.tracesTblStatementToFieldKeys(ctx)
if err != nil {
return nil, false, err
}
mapOfKeys := make(map[string]*telemetrytypes.TelemetryFieldKey)
for _, key := range matKeys {
mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
}
sb := sqlbuilder.Select(
"tagKey AS tag_key",
"tagType AS tag_type",
"dataType AS tag_data_type",
`CASE
// WHEN tagType = 'spanfield' THEN 1
WHEN tagType = 'resource' THEN 2
// WHEN tagType = 'scope' THEN 3
WHEN tagType = 'tag' THEN 4
ELSE 5
END as priority`,
).From(t.tracesDBName + "." + t.spanAttributesKeysTblName)
var limit int
searchTexts := []string{}
dataTypes := []telemetrytypes.FieldDataType{}
conds := []string{}
for _, fieldKeySelector := range fieldKeySelectors {
// TODO(srikanthccv): support time filtering for span attribute keys
// if fieldKeySelector.StartUnixMilli != 0 {
// conds = append(conds, sb.GE("unix_milli", fieldKeySelector.StartUnixMilli))
// }
// if fieldKeySelector.EndUnixMilli != 0 {
// conds = append(conds, sb.LE("unix_milli", fieldKeySelector.EndUnixMilli))
// }
// key part of the selector
fieldKeyConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldKeyConds = append(fieldKeyConds, sb.E("tagKey", fieldKeySelector.Name))
} else {
fieldKeyConds = append(fieldKeyConds, sb.ILike("tagKey", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
}
searchTexts = append(searchTexts, fieldKeySelector.Name)
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
dataTypes = append(dataTypes, fieldKeySelector.FieldDataType)
}
// now look at the field context
// we don't write most of intrinsic fields to keys table
// for this reason we don't want to apply tagType if the field context
// is not attribute or resource attribute
if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified &&
(fieldKeySelector.FieldContext == telemetrytypes.FieldContextAttribute ||
fieldKeySelector.FieldContext == telemetrytypes.FieldContextResource) {
fieldKeyConds = append(fieldKeyConds, sb.E("tagType", fieldKeySelector.FieldContext.TagType()))
}
// now look at the field data type
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
fieldKeyConds = append(fieldKeyConds, sb.E("dataType", fieldKeySelector.FieldDataType.TagDataType()))
}
conds = append(conds, sb.And(fieldKeyConds...))
limit += fieldKeySelector.Limit
}
// the span_attribute_keys has historically pushed the top level column as attributes
sb.Where(sb.Or(conds...)).Where("isColumn = false")
sb.GroupBy("tagKey", "tagType", "dataType")
if limit == 0 {
limit = 1000
}
mainSb := sqlbuilder.Select("tag_key", "tag_type", "tag_data_type", "max(priority) as priority")
mainSb.From(mainSb.BuilderAs(sb, "sub_query"))
mainSb.GroupBy("tag_key", "tag_type", "tag_data_type")
mainSb.OrderBy("priority")
// query one extra to check if we hit the limit
mainSb.Limit(limit + 1)
query, args := mainSb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTracesKeys.Error())
}
defer rows.Close()
keys := []*telemetrytypes.TelemetryFieldKey{}
rowCount := 0
for rows.Next() {
rowCount++
// reached the limit, we know there are more results
if rowCount > limit {
break
}
var name string
var fieldContext telemetrytypes.FieldContext
var fieldDataType telemetrytypes.FieldDataType
var priority uint8
err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTracesKeys.Error())
}
key, ok := mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()]
// if there is no materialised column, create a key with the field context and data type
if !ok {
key = &telemetrytypes.TelemetryFieldKey{
Name: name,
Signal: telemetrytypes.SignalTraces,
FieldContext: fieldContext,
FieldDataType: fieldDataType,
}
}
keys = append(keys, key)
mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()] = key
}
if rows.Err() != nil {
return nil, false, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTracesKeys.Error())
}
// hit the limit? (only counting DB results)
complete := rowCount <= limit
staticKeys := []string{"isRoot", "isEntryPoint"}
staticKeys = append(staticKeys, maps.Keys(telemetrytraces.IntrinsicFields)...)
staticKeys = append(staticKeys, maps.Keys(telemetrytraces.CalculatedFields)...)
// Add matching intrinsic and matching calculated fields
// These don't count towards the limit
for _, key := range staticKeys {
found := false
for _, v := range searchTexts {
if v == "" || strings.Contains(key, v) {
found = true
break
}
}
// skip the keys that don't match data type
if field, exists := telemetrytraces.IntrinsicFields[key]; exists {
if len(dataTypes) > 0 &&
slices.Index(dataTypes, field.FieldDataType) == -1 &&
field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
continue
}
}
if field, exists := telemetrytraces.CalculatedFields[key]; exists {
if len(dataTypes) > 0 &&
slices.Index(dataTypes, field.FieldDataType) == -1 &&
field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
continue
}
}
if found {
if field, exists := telemetrytraces.IntrinsicFields[key]; exists {
if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
keys = append(keys, &field)
}
continue
}
if field, exists := telemetrytraces.CalculatedFields[key]; exists {
if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
keys = append(keys, &field)
}
continue
}
keys = append(keys, &telemetrytypes.TelemetryFieldKey{
Name: key,
FieldContext: telemetrytypes.FieldContextSpan,
Signal: telemetrytypes.SignalTraces,
})
}
}
return keys, complete, nil
}
// logsTblStatementToFieldKeys returns materialised attribute/resource/scope keys from the logs table
func (t *telemetryMetaStore) logsTblStatementToFieldKeys(ctx context.Context) ([]*telemetrytypes.TelemetryFieldKey, error) {
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", t.logsDBName, t.logsV2TblName)
statements := []telemetrytypes.ShowCreateTableStatement{}
err := t.telemetrystore.ClickhouseDB().Select(ctx, &statements, query)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetTblStatement.Error())
}
materialisedKeys, err := ExtractFieldKeysFromTblStatement(statements[0].Statement)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
for idx := range materialisedKeys {
materialisedKeys[idx].Signal = telemetrytypes.SignalLogs
}
return materialisedKeys, nil
}
// getLogsKeys returns the keys from the spans that match the field selection criteria
func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
if len(fieldKeySelectors) == 0 {
return nil, true, nil
}
// pre-fetch the materialised keys from the logs table
matKeys, err := t.logsTblStatementToFieldKeys(ctx)
if err != nil {
return nil, false, err
}
mapOfKeys := make(map[string]*telemetrytypes.TelemetryFieldKey)
for _, key := range matKeys {
mapOfKeys[key.Name+";"+key.FieldContext.StringValue()+";"+key.FieldDataType.StringValue()] = key
}
// queries for both attribute and resource keys tables
var queries []string
var allArgs []any
// tables to query based on field selectors
queryAttributeTable := false
queryResourceTable := false
for _, selector := range fieldKeySelectors {
if selector.FieldContext == telemetrytypes.FieldContextUnspecified {
// unspecified context, query both tables
queryAttributeTable = true
queryResourceTable = true
break
} else if selector.FieldContext == telemetrytypes.FieldContextAttribute {
queryAttributeTable = true
} else if selector.FieldContext == telemetrytypes.FieldContextResource {
queryResourceTable = true
}
}
tablesToQuery := []struct {
fieldContext telemetrytypes.FieldContext
shouldQuery bool
}{
{telemetrytypes.FieldContextAttribute, queryAttributeTable},
{telemetrytypes.FieldContextResource, queryResourceTable},
}
for _, table := range tablesToQuery {
if !table.shouldQuery {
continue
}
fieldContext := table.fieldContext
// table name based on field context
var tblName string
if fieldContext == telemetrytypes.FieldContextAttribute {
tblName = t.logsDBName + "." + t.logAttributeKeysTblName
} else {
tblName = t.logsDBName + "." + t.logResourceKeysTblName
}
sb := sqlbuilder.Select(
"name AS tag_key",
fmt.Sprintf("'%s' AS tag_type", fieldContext.TagType()),
"lower(datatype) AS tag_data_type", // in logs, we had some historical data with capital and small case
fmt.Sprintf(`%d AS priority`, getPriorityForContext(fieldContext)),
).From(tblName)
var limit int
conds := []string{}
for _, fieldKeySelector := range fieldKeySelectors {
// Include this selector if:
// 1. It has unspecified context (matches all tables)
// 2. Its context matches the current table's context
if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified &&
fieldKeySelector.FieldContext != fieldContext {
continue
}
// key part of the selector
fieldKeyConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldKeyConds = append(fieldKeyConds, sb.E("name", fieldKeySelector.Name))
} else {
fieldKeyConds = append(fieldKeyConds, sb.ILike("name", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
}
// now look at the field data type
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
fieldKeyConds = append(fieldKeyConds, sb.E("datatype", fieldKeySelector.FieldDataType.TagDataType()))
}
if len(fieldKeyConds) > 0 {
conds = append(conds, sb.And(fieldKeyConds...))
}
limit += fieldKeySelector.Limit
}
if len(conds) > 0 {
sb.Where(sb.Or(conds...))
}
sb.GroupBy("name", "datatype")
if limit == 0 {
limit = 1000
}
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
queries = append(queries, query)
allArgs = append(allArgs, args...)
}
if len(queries) == 0 {
// No matching contexts, return empty result
return []*telemetrytypes.TelemetryFieldKey{}, true, nil
}
// Combine queries with UNION ALL
var limit int
for _, fieldKeySelector := range fieldKeySelectors {
limit += fieldKeySelector.Limit
}
if limit == 0 {
limit = 1000
}
mainQuery := fmt.Sprintf(`
SELECT tag_key, tag_type, tag_data_type, max(priority) as priority
FROM (
%s
) AS combined_results
GROUP BY tag_key, tag_type, tag_data_type
ORDER BY priority
LIMIT %d
`, strings.Join(queries, " UNION ALL "), limit+1)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, mainQuery, allArgs...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
defer rows.Close()
keys := []*telemetrytypes.TelemetryFieldKey{}
rowCount := 0
searchTexts := []string{}
dataTypes := []telemetrytypes.FieldDataType{}
// Collect search texts and data types for static field matching
for _, fieldKeySelector := range fieldKeySelectors {
searchTexts = append(searchTexts, fieldKeySelector.Name)
if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
dataTypes = append(dataTypes, fieldKeySelector.FieldDataType)
}
}
for rows.Next() {
rowCount++
// reached the limit, we know there are more results
if rowCount > limit {
break
}
var name string
var fieldContext telemetrytypes.FieldContext
var fieldDataType telemetrytypes.FieldDataType
var priority uint8
err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
key, ok := mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()]
// if there is no materialised column, create a key with the field context and data type
if !ok {
key = &telemetrytypes.TelemetryFieldKey{
Name: name,
Signal: telemetrytypes.SignalLogs,
FieldContext: fieldContext,
FieldDataType: fieldDataType,
}
}
keys = append(keys, key)
mapOfKeys[name+";"+fieldContext.StringValue()+";"+fieldDataType.StringValue()] = key
}
if rows.Err() != nil {
return nil, false, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
// hit the limit? (only counting DB results)
complete := rowCount <= limit
staticKeys := []string{}
staticKeys = append(staticKeys, maps.Keys(telemetrylogs.IntrinsicFields)...)
// Add matching intrinsic and matching calculated fields
// These don't count towards the limit
for _, key := range staticKeys {
found := false
for _, v := range searchTexts {
if v == "" || strings.Contains(key, v) {
found = true
break
}
}
// skip the keys that don't match data type
if field, exists := telemetrylogs.IntrinsicFields[key]; exists {
if len(dataTypes) > 0 &&
slices.Index(dataTypes, field.FieldDataType) == -1 &&
field.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
continue
}
}
if found {
if field, exists := telemetrylogs.IntrinsicFields[key]; exists {
if _, added := mapOfKeys[field.Name+";"+field.FieldContext.StringValue()+";"+field.FieldDataType.StringValue()]; !added {
keys = append(keys, &field)
}
continue
}
keys = append(keys, &telemetrytypes.TelemetryFieldKey{
Name: key,
FieldContext: telemetrytypes.FieldContextLog,
Signal: telemetrytypes.SignalLogs,
})
}
}
return keys, complete, nil
}
func getPriorityForContext(ctx telemetrytypes.FieldContext) int {
switch ctx {
case telemetrytypes.FieldContextLog:
return 1
case telemetrytypes.FieldContextResource:
return 2
case telemetrytypes.FieldContextScope:
return 3
case telemetrytypes.FieldContextAttribute:
return 4
default:
return 5
}
}
// getMetricsKeys returns the keys from the metrics that match the field selection criteria
func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
if len(fieldKeySelectors) == 0 {
return nil, true, nil
}
sb := sqlbuilder.
Select("attr_name as name", "attr_type as field_context", "attr_datatype as field_data_type", `
CASE
WHEN attr_type = 'resource' THEN 1
WHEN attr_type = 'scope' THEN 2
WHEN attr_type = 'point' THEN 3
ELSE 4
END as priority`).
From(t.metricsDBName + "." + t.metricsFieldsTblName)
var limit int
conds := []string{}
for _, fieldKeySelector := range fieldKeySelectors {
fieldConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldConds = append(fieldConds, sb.E("attr_name", fieldKeySelector.Name))
} else {
fieldConds = append(fieldConds, sb.ILike("attr_name", "%"+escapeForLike(fieldKeySelector.Name)+"%"))
}
fieldConds = append(fieldConds, sb.NotLike("attr_name", "\\_\\_%"))
// note: type and datatype do not have much significance in metrics
// if fieldKeySelector.FieldContext != telemetrytypes.FieldContextUnspecified {
// fieldConds = append(fieldConds, sb.E("attr_type", fieldKeySelector.FieldContext.TagType()))
// }
// if fieldKeySelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
// fieldConds = append(fieldConds, sb.E("attr_datatype", fieldKeySelector.FieldDataType.TagDataType()))
// }
if fieldKeySelector.MetricContext != nil {
fieldConds = append(fieldConds, sb.E("metric_name", fieldKeySelector.MetricContext.MetricName))
}
conds = append(conds, sb.And(fieldConds...))
limit += fieldKeySelector.Limit
}
sb.Where(sb.Or(conds...))
sb.GroupBy("name", "field_context", "field_data_type")
if limit == 0 {
limit = 1000
}
mainSb := sqlbuilder.Select("name", "field_context", "field_data_type", "max(priority) as priority")
mainSb.From(mainSb.BuilderAs(sb, "sub_query"))
mainSb.GroupBy("name", "field_context", "field_data_type")
mainSb.OrderBy("priority")
// query one extra to check if we hit the limit
mainSb.Limit(limit + 1)
query, args := mainSb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
}
defer rows.Close()
keys := []*telemetrytypes.TelemetryFieldKey{}
rowCount := 0
for rows.Next() {
rowCount++
// reached the limit, we know there are more results
if rowCount > limit {
break
}
var name string
var fieldContext telemetrytypes.FieldContext
var fieldDataType telemetrytypes.FieldDataType
var priority uint8
err = rows.Scan(&name, &fieldContext, &fieldDataType, &priority)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
}
keys = append(keys, &telemetrytypes.TelemetryFieldKey{
Name: name,
Signal: telemetrytypes.SignalMetrics,
FieldContext: fieldContext,
FieldDataType: fieldDataType,
})
}
if rows.Err() != nil {
return nil, false, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
}
// hit the limit?
complete := rowCount <= limit
return keys, complete, nil
}
// getMeterKeys returns the keys from the meter metrics that match the field selection criteria
func (t *telemetryMetaStore) getMeterSourceMetricKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
if len(fieldKeySelectors) == 0 {
return nil, true, nil
}
sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name").From(t.meterDBName + "." + t.meterFieldsTblName)
conds := []string{}
var limit int
for _, fieldKeySelector := range fieldKeySelectors {
fieldConds := []string{}
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
fieldConds = append(fieldConds, sb.E("attr_name", fieldKeySelector.Name))
} else {
fieldConds = append(fieldConds, sb.Like("attr_name", "%"+fieldKeySelector.Name+"%"))
}
fieldConds = append(fieldConds, sb.NotLike("attr_name", "\\_\\_%"))
if fieldKeySelector.MetricContext != nil {
fieldConds = append(fieldConds, sb.E("metric_name", fieldKeySelector.MetricContext.MetricName))
}
conds = append(conds, sb.And(fieldConds...))
limit += fieldKeySelector.Limit
}
sb.Where(sb.Or(conds...))
if limit == 0 {
limit = 1000
}
sb.Limit(limit)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
}
defer rows.Close()
keys := []*telemetrytypes.TelemetryFieldKey{}
rowCount := 0
for rows.Next() {
rowCount++
// reached the limit, we know there are more results
if rowCount > limit {
break
}
var name string
err = rows.Scan(&name)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
}
keys = append(keys, &telemetrytypes.TelemetryFieldKey{
Name: name,
Signal: telemetrytypes.SignalMetrics,
})
}
if rows.Err() != nil {
return nil, false, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
}
// hit the limit?
complete := rowCount <= limit
return keys, complete, nil
}
func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, bool, error) {
var keys []*telemetrytypes.TelemetryFieldKey
var complete bool = true
var err error
selectors := []*telemetrytypes.FieldKeySelector{}
if fieldKeySelector != nil {
selectors = []*telemetrytypes.FieldKeySelector{fieldKeySelector}
}
switch fieldKeySelector.Signal {
case telemetrytypes.SignalTraces:
keys, complete, err = t.getTracesKeys(ctx, selectors)
case telemetrytypes.SignalLogs:
keys, complete, err = t.getLogsKeys(ctx, selectors)
case telemetrytypes.SignalMetrics:
if fieldKeySelector.Source == telemetrytypes.SourceMeter {
keys, complete, err = t.getMeterSourceMetricKeys(ctx, selectors)
} else {
keys, complete, err = t.getMetricsKeys(ctx, selectors)
}
case telemetrytypes.SignalUnspecified:
// get traces keys
tracesKeys, tracesComplete, err := t.getTracesKeys(ctx, selectors)
if err != nil {
return nil, false, err
}
keys = append(keys, tracesKeys...)
// get logs keys
logsKeys, logsComplete, err := t.getLogsKeys(ctx, selectors)
if err != nil {
return nil, false, err
}
keys = append(keys, logsKeys...)
// get metrics keys
metricsKeys, metricsComplete, err := t.getMetricsKeys(ctx, selectors)
if err != nil {
return nil, false, err
}
keys = append(keys, metricsKeys...)
complete = tracesComplete && logsComplete && metricsComplete
}
if err != nil {
return nil, false, err
}
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
for _, key := range keys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
return mapOfKeys, complete, nil
}
func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, bool, error) {
logsSelectors := []*telemetrytypes.FieldKeySelector{}
tracesSelectors := []*telemetrytypes.FieldKeySelector{}
metricsSelectors := []*telemetrytypes.FieldKeySelector{}
meterSourceMetricsSelectors := []*telemetrytypes.FieldKeySelector{}
for _, fieldKeySelector := range fieldKeySelectors {
switch fieldKeySelector.Signal {
case telemetrytypes.SignalLogs:
logsSelectors = append(logsSelectors, fieldKeySelector)
case telemetrytypes.SignalTraces:
tracesSelectors = append(tracesSelectors, fieldKeySelector)
case telemetrytypes.SignalMetrics:
if fieldKeySelector.Source == telemetrytypes.SourceMeter {
meterSourceMetricsSelectors = append(meterSourceMetricsSelectors, fieldKeySelector)
} else {
metricsSelectors = append(metricsSelectors, fieldKeySelector)
}
case telemetrytypes.SignalUnspecified:
logsSelectors = append(logsSelectors, fieldKeySelector)
tracesSelectors = append(tracesSelectors, fieldKeySelector)
metricsSelectors = append(metricsSelectors, fieldKeySelector)
}
}
logsKeys, logsComplete, err := t.getLogsKeys(ctx, logsSelectors)
if err != nil {
return nil, false, err
}
tracesKeys, tracesComplete, err := t.getTracesKeys(ctx, tracesSelectors)
if err != nil {
return nil, false, err
}
metricsKeys, metricsComplete, err := t.getMetricsKeys(ctx, metricsSelectors)
if err != nil {
return nil, false, err
}
meterSourceMetricsKeys, _, err := t.getMeterSourceMetricKeys(ctx, meterSourceMetricsSelectors)
if err != nil {
return nil, false, err
}
// Complete only if all queries are complete
complete := logsComplete && tracesComplete && metricsComplete
mapOfKeys := make(map[string][]*telemetrytypes.TelemetryFieldKey)
for _, key := range logsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
for _, key := range tracesKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
for _, key := range metricsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
for _, key := range meterSourceMetricsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
return mapOfKeys, complete, nil
}
func (t *telemetryMetaStore) GetKey(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, error) {
keys, _, err := t.GetKeys(ctx, fieldKeySelector)
if err != nil {
return nil, err
}
return keys[fieldKeySelector.Name], nil
}
func (t *telemetryMetaStore) getRelatedValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) ([]string, bool, error) {
// nothing to return as "related" value if there is nothing to filter on
if fieldValueSelector.ExistingQuery == "" {
return nil, true, nil
}
key := &telemetrytypes.TelemetryFieldKey{
Name: fieldValueSelector.Name,
Signal: fieldValueSelector.Signal,
FieldContext: fieldValueSelector.FieldContext,
FieldDataType: fieldValueSelector.FieldDataType,
}
selectColumn, err := t.fm.FieldFor(ctx, key)
if err != nil {
// we don't have a explicit column to select from the related metadata table
// so we will select either from resource_attributes or attributes table
// in that order
resourceColumn, _ := t.fm.FieldFor(ctx, &telemetrytypes.TelemetryFieldKey{
Name: key.Name,
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
})
attributeColumn, _ := t.fm.FieldFor(ctx, &telemetrytypes.TelemetryFieldKey{
Name: key.Name,
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
})
selectColumn = fmt.Sprintf("if(notEmpty(%s), %s, %s)", resourceColumn, resourceColumn, attributeColumn)
}
sb := sqlbuilder.Select("DISTINCT " + selectColumn).From(t.relatedMetadataDBName + "." + t.relatedMetadataTblName)
if len(fieldValueSelector.ExistingQuery) != 0 {
keySelectors := querybuilder.QueryStringToKeysSelectors(fieldValueSelector.ExistingQuery)
for _, keySelector := range keySelectors {
keySelector.Signal = fieldValueSelector.Signal
}
keys, _, err := t.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, false, err
}
whereClause, err := querybuilder.PrepareWhereClause(fieldValueSelector.ExistingQuery, querybuilder.FilterExprVisitorOpts{
Logger: t.logger,
FieldMapper: t.fm,
ConditionBuilder: t.conditionBuilder,
FieldKeys: keys,
})
if err == nil {
sb.AddWhereClause(whereClause.WhereClause)
} else {
t.logger.WarnContext(ctx, "error parsing existing query for related values", "error", err)
}
}
if fieldValueSelector.StartUnixMilli != 0 {
sb.Where(sb.GE("unix_milli", fieldValueSelector.StartUnixMilli))
}
if fieldValueSelector.EndUnixMilli != 0 {
sb.Where(sb.LE("unix_milli", fieldValueSelector.EndUnixMilli))
}
limit := fieldValueSelector.Limit
if limit == 0 {
limit = 50
}
// query one extra to check if we hit the limit
sb.Limit(limit + 1)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
t.logger.DebugContext(ctx, "query for related values", "query", query, "args", args)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, ErrFailedToGetRelatedValues
}
defer rows.Close()
var attributeValues []string
rowCount := 0
for rows.Next() {
rowCount++
// reached the limit, we know there are more results
if rowCount > limit {
break
}
var value string
if err := rows.Scan(&value); err != nil {
return nil, false, ErrFailedToGetRelatedValues
}
if value != "" {
attributeValues = append(attributeValues, value)
}
}
// hit the limit?
complete := rowCount <= limit
return attributeValues, complete, nil
}
func (t *telemetryMetaStore) GetRelatedValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) ([]string, bool, error) {
return t.getRelatedValues(ctx, fieldValueSelector)
}
func (t *telemetryMetaStore) getSpanFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
// build the query to get the keys from the spans that match the field selection criteria
limit := fieldValueSelector.Limit
if limit == 0 {
limit = 50
}
sb := sqlbuilder.Select("DISTINCT string_value, number_value").From(t.tracesDBName + "." + t.tracesFieldsTblName)
if fieldValueSelector.Name != "" {
sb.Where(sb.E("tag_key", fieldValueSelector.Name))
}
// now look at the field context
if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified {
sb.Where(sb.E("tag_type", fieldValueSelector.FieldContext.TagType()))
}
// now look at the field data type
if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
sb.Where(sb.E("tag_data_type", fieldValueSelector.FieldDataType.TagDataType()))
}
if fieldValueSelector.Value != "" {
if fieldValueSelector.FieldDataType == telemetrytypes.FieldDataTypeString {
sb.Where(sb.ILike("string_value", "%"+escapeForLike(fieldValueSelector.Value)+"%"))
} else if fieldValueSelector.FieldDataType == telemetrytypes.FieldDataTypeNumber {
sb.Where(sb.IsNotNull("number_value"))
sb.Where(sb.ILike("toString(number_value)", "%"+escapeForLike(fieldValueSelector.Value)+"%"))
} else if fieldValueSelector.FieldDataType == telemetrytypes.FieldDataTypeUnspecified {
// or b/w string and number
sb.Where(sb.Or(
sb.ILike("string_value", "%"+escapeForLike(fieldValueSelector.Value)+"%"),
sb.ILike("toString(number_value)", "%"+escapeForLike(fieldValueSelector.Value)+"%"),
))
}
}
// query one extra to check if we hit the limit
sb.Limit(limit + 1)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
defer rows.Close()
values := &telemetrytypes.TelemetryFieldValues{}
seen := make(map[string]bool)
rowCount := 0
totalCount := 0 // Track total unique values
for rows.Next() {
rowCount++
var stringValue string
var numberValue float64
if err := rows.Scan(&stringValue, &numberValue); err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
// Only add values if we haven't hit the limit yet
if totalCount < limit {
if _, ok := seen[stringValue]; !ok && stringValue != "" {
values.StringValues = append(values.StringValues, stringValue)
seen[stringValue] = true
totalCount++
}
if _, ok := seen[fmt.Sprintf("%f", numberValue)]; !ok && numberValue != 0 && totalCount < limit {
values.NumberValues = append(values.NumberValues, numberValue)
seen[fmt.Sprintf("%f", numberValue)] = true
totalCount++
}
}
}
// hit the limit?
complete := rowCount <= limit
return values, complete, nil
}
func (t *telemetryMetaStore) getLogFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
// build the query to get the keys from the spans that match the field selection criteria
limit := fieldValueSelector.Limit
if limit == 0 {
limit = 50
}
sb := sqlbuilder.Select("DISTINCT string_value, number_value").From(t.logsDBName + "." + t.logsFieldsTblName)
if fieldValueSelector.Name != "" {
sb.Where(sb.E("tag_key", fieldValueSelector.Name))
}
if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified {
sb.Where(sb.E("tag_type", fieldValueSelector.FieldContext.TagType()))
}
if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
sb.Where(sb.E("tag_data_type", fieldValueSelector.FieldDataType.TagDataType()))
}
if fieldValueSelector.Value != "" {
if fieldValueSelector.FieldDataType == telemetrytypes.FieldDataTypeString {
sb.Where(sb.ILike("string_value", "%"+escapeForLike(fieldValueSelector.Value)+"%"))
} else if fieldValueSelector.FieldDataType == telemetrytypes.FieldDataTypeNumber {
sb.Where(sb.IsNotNull("number_value"))
sb.Where(sb.ILike("toString(number_value)", "%"+escapeForLike(fieldValueSelector.Value)+"%"))
} else if fieldValueSelector.FieldDataType == telemetrytypes.FieldDataTypeUnspecified {
// or b/w string and number
sb.Where(sb.Or(
sb.ILike("string_value", "%"+escapeForLike(fieldValueSelector.Value)+"%"),
sb.ILike("toString(number_value)", "%"+escapeForLike(fieldValueSelector.Value)+"%"),
))
}
}
// query one extra to check if we hit the limit
sb.Limit(limit + 1)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
defer rows.Close()
values := &telemetrytypes.TelemetryFieldValues{}
seen := make(map[string]bool)
rowCount := 0
totalCount := 0 // Track total unique values
for rows.Next() {
rowCount++
var stringValue string
var numberValue float64
if err := rows.Scan(&stringValue, &numberValue); err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetLogsKeys.Error())
}
// Only add values if we haven't hit the limit yet
if totalCount < limit {
if _, ok := seen[stringValue]; !ok && stringValue != "" {
values.StringValues = append(values.StringValues, stringValue)
seen[stringValue] = true
totalCount++
}
if _, ok := seen[fmt.Sprintf("%f", numberValue)]; !ok && numberValue != 0 && totalCount < limit {
values.NumberValues = append(values.NumberValues, numberValue)
seen[fmt.Sprintf("%f", numberValue)] = true
totalCount++
}
}
}
// hit the limit?
complete := rowCount <= limit
return values, complete, nil
}
// getMetricFieldValues returns field values and whether the result is complete
func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
sb := sqlbuilder.
Select("DISTINCT attr_string_value").
From(t.metricsDBName + "." + t.metricsFieldsTblName)
if fieldValueSelector.Name != "" {
sb.Where(sb.E("attr_name", fieldValueSelector.Name))
}
if fieldValueSelector.FieldContext != telemetrytypes.FieldContextUnspecified {
sb.Where(sb.E("attr_type", fieldValueSelector.FieldContext.TagType()))
}
if fieldValueSelector.FieldDataType != telemetrytypes.FieldDataTypeUnspecified {
sb.Where(sb.E("attr_datatype", fieldValueSelector.FieldDataType.TagDataType()))
}
if fieldValueSelector.MetricContext != nil {
sb.Where(sb.E("metric_name", fieldValueSelector.MetricContext.MetricName))
}
if fieldValueSelector.StartUnixMilli > 0 {
sb.Where(sb.GE("last_reported_unix_milli", fieldValueSelector.StartUnixMilli))
}
if fieldValueSelector.EndUnixMilli > 0 {
sb.Where(sb.LE("first_reported_unix_milli", fieldValueSelector.EndUnixMilli))
}
if fieldValueSelector.Value != "" {
if fieldValueSelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
sb.Where(sb.E("attr_string_value", fieldValueSelector.Value))
} else {
sb.Where(sb.ILike("attr_string_value", "%"+escapeForLike(fieldValueSelector.Value)+"%"))
}
}
limit := fieldValueSelector.Limit
if limit == 0 {
limit = 50
}
// query one extra to check if we hit the limit
sb.Limit(limit + 1)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
}
defer rows.Close()
values := &telemetrytypes.TelemetryFieldValues{}
rowCount := 0
for rows.Next() {
rowCount++
// reached the limit, we know there are more results
if rowCount > limit {
break
}
var stringValue string
if err := rows.Scan(&stringValue); err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
}
values.StringValues = append(values.StringValues, stringValue)
}
// hit the limit?
complete := rowCount <= limit
return values, complete, nil
}
func (t *telemetryMetaStore) getMeterSourceMetricFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeysAndValues(labels, 'String')) AS attr").
From(t.meterDBName + "." + t.meterFieldsTblName)
if fieldValueSelector.Name != "" {
sb.Where(sb.E("attr.1", fieldValueSelector.Name))
}
sb.Where(sb.NotLike("attr.1", "\\_\\_%"))
if fieldValueSelector.Value != "" {
if fieldValueSelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
sb.Where(sb.E("attr.2", fieldValueSelector.Value))
} else {
sb.Where(sb.Like("attr.2", "%"+fieldValueSelector.Value+"%"))
}
}
sb.Where(sb.NE("attr.2", ""))
limit := fieldValueSelector.Limit
if limit == 0 {
limit = 50
}
// query one extra to check if we hit the limit
sb.Limit(limit + 1)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterValues.Error())
}
defer rows.Close()
values := &telemetrytypes.TelemetryFieldValues{}
rowCount := 0
for rows.Next() {
rowCount++
// reached the limit, we know there are more results
if rowCount > limit {
break
}
var attribute []string
if err := rows.Scan(&attribute); err != nil {
return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterValues.Error())
}
if len(attribute) > 1 {
values.StringValues = append(values.StringValues, attribute[1])
}
}
// hit the limit?
complete := rowCount <= limit
return values, complete, nil
}
func populateAllUnspecifiedValues(allUnspecifiedValues *telemetrytypes.TelemetryFieldValues, mapOfValues map[any]bool, mapOfRelatedValues map[any]bool, values *telemetrytypes.TelemetryFieldValues, limit int) bool {
complete := true
totalCount := len(mapOfValues) + len(mapOfRelatedValues)
for _, value := range values.StringValues {
if totalCount >= limit {
complete = false
break
}
if _, ok := mapOfValues[value]; !ok {
mapOfValues[value] = true
allUnspecifiedValues.StringValues = append(allUnspecifiedValues.StringValues, value)
totalCount++
}
}
for _, value := range values.NumberValues {
if totalCount >= limit {
complete = false
break
}
if _, ok := mapOfValues[value]; !ok {
mapOfValues[value] = true
allUnspecifiedValues.NumberValues = append(allUnspecifiedValues.NumberValues, value)
totalCount++
}
}
for _, value := range values.RelatedValues {
if totalCount >= limit {
complete = false
break
}
if _, ok := mapOfRelatedValues[value]; !ok {
mapOfRelatedValues[value] = true
allUnspecifiedValues.RelatedValues = append(allUnspecifiedValues.RelatedValues, value)
totalCount++
}
}
return complete
}
// GetAllValues returns all values and whether the result is complete
func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
values := &telemetrytypes.TelemetryFieldValues{}
var complete bool = true
var err error
limit := fieldValueSelector.Limit
if limit == 0 {
limit = 50
}
switch fieldValueSelector.Signal {
case telemetrytypes.SignalTraces:
values, complete, err = t.getSpanFieldValues(ctx, fieldValueSelector)
case telemetrytypes.SignalLogs:
values, complete, err = t.getLogFieldValues(ctx, fieldValueSelector)
case telemetrytypes.SignalMetrics:
if fieldValueSelector.Source == telemetrytypes.SourceMeter {
values, complete, err = t.getMeterSourceMetricFieldValues(ctx, fieldValueSelector)
} else {
values, complete, err = t.getMetricFieldValues(ctx, fieldValueSelector)
}
case telemetrytypes.SignalUnspecified:
mapOfValues := make(map[any]bool)
mapOfRelatedValues := make(map[any]bool)
allUnspecifiedValues := &telemetrytypes.TelemetryFieldValues{}
tracesValues, tracesComplete, err := t.getSpanFieldValues(ctx, fieldValueSelector)
if err == nil {
populateComplete := populateAllUnspecifiedValues(allUnspecifiedValues, mapOfValues, mapOfRelatedValues, tracesValues, limit)
complete = complete && tracesComplete && populateComplete
}
logsValues, logsComplete, err := t.getLogFieldValues(ctx, fieldValueSelector)
if err == nil {
populateComplete := populateAllUnspecifiedValues(allUnspecifiedValues, mapOfValues, mapOfRelatedValues, logsValues, limit)
complete = complete && logsComplete && populateComplete
}
metricsValues, metricsComplete, err := t.getMetricFieldValues(ctx, fieldValueSelector)
if err == nil {
populateComplete := populateAllUnspecifiedValues(allUnspecifiedValues, mapOfValues, mapOfRelatedValues, metricsValues, limit)
complete = complete && metricsComplete && populateComplete
}
values = allUnspecifiedValues
}
if err != nil {
return nil, false, err
}
return values, complete, nil
}
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, metricName string) (metrictypes.Temporality, error) {
if metricName == "" {
return metrictypes.Unknown, errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric name cannot be empty")
}
temporalityMap, err := t.FetchTemporalityMulti(ctx, metricName)
if err != nil {
return metrictypes.Unknown, err
}
temporality, ok := temporalityMap[metricName]
if !ok {
return metrictypes.Unknown, nil
}
return temporality, nil
}
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
if len(metricNames) == 0 {
return make(map[string]metrictypes.Temporality), nil
}
result := make(map[string]metrictypes.Temporality)
metricsTemporality, err := t.fetchMetricsTemporality(ctx, metricNames...)
if err != nil {
return nil, err
}
// TODO: return error after table migration are run
meterMetricsTemporality, _ := t.fetchMeterSourceMetricsTemporality(ctx, metricNames...)
// For metrics not found in the database, set to Unknown
for _, metricName := range metricNames {
if temporality, exists := metricsTemporality[metricName]; exists {
result[metricName] = temporality
continue
}
if temporality, exists := meterMetricsTemporality[metricName]; exists {
result[metricName] = temporality
continue
}
result[metricName] = metrictypes.Unknown
}
return result, nil
}
func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
// Build query to fetch temporality for all metrics
// We use attr_string_value where attr_name = '__temporality__'
// Note: The columns are mixed in the current data - temporality column contains metric_name
// and metric_name column contains temporality value, so we use the correct mapping
sb := sqlbuilder.Select(
"metric_name",
"argMax(temporality, last_reported_unix_milli) as temporality",
).From(t.metricsDBName + "." + t.metricsFieldsTblName)
// Filter by metric names (in the temporality column due to data mix-up)
sb.Where(sb.In("metric_name", metricNames))
// Group by metric name to get one temporality per metric
sb.GroupBy("metric_name")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
t.logger.DebugContext(ctx, "fetching metric temporality", "query", query, "args", args)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch metric temporality")
}
defer rows.Close()
// Process results
for rows.Next() {
var metricName, temporalityStr string
if err := rows.Scan(&metricName, &temporalityStr); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
}
// Convert string to Temporality type
var temporality metrictypes.Temporality
switch temporalityStr {
case "Delta":
temporality = metrictypes.Delta
case "Cumulative":
temporality = metrictypes.Cumulative
case "Unspecified":
temporality = metrictypes.Unspecified
default:
// Unknown or empty temporality
temporality = metrictypes.Unknown
}
result[metricName] = temporality
}
return result, nil
}
func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
sb := sqlbuilder.Select(
"metric_name",
"argMax(temporality, unix_milli) as temporality",
).From(t.meterDBName + "." + t.meterFieldsTblName)
// Filter by metric names (in the temporality column due to data mix-up)
sb.Where(sb.In("metric_name", metricNames))
// Group by metric name to get one temporality per metric
sb.GroupBy("metric_name")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
t.logger.DebugContext(ctx, "fetching meter metrics temporality", "query", query, "args", args)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch meter metric temporality")
}
defer rows.Close()
// Process results
for rows.Next() {
var metricName, temporalityStr string
if err := rows.Scan(&metricName, &temporalityStr); err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
}
// Convert string to Temporality type
var temporality metrictypes.Temporality
switch temporalityStr {
case "Delta":
temporality = metrictypes.Delta
case "Cumulative":
temporality = metrictypes.Cumulative
case "Unspecified":
temporality = metrictypes.Unspecified
default:
// Unknown or empty temporality
temporality = metrictypes.Unknown
}
result[metricName] = temporality
}
return result, nil
}