mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-25 03:16:53 +00:00
feat(telemetrymeter): deprecate the signal and use aggregation instead
This commit is contained in:
parent
254c7f8396
commit
9158b25d4d
@ -35,7 +35,7 @@ func NewAPI(
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.SamplesV4Agg1dTableName,
|
||||
telemetrymeter.SamplesAgg1dTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
|
||||
@ -172,7 +172,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
|
||||
event.GroupByApplied = len(spec.GroupBy) > 0
|
||||
|
||||
if spec.Signal == telemetrytypes.SignalMeter {
|
||||
if strings.HasPrefix(spec.Aggregations[0].MetricName, "signoz.meter") {
|
||||
spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMeter(req.Start, req.End))}
|
||||
} else {
|
||||
if spec.StepInterval.Seconds() == 0 {
|
||||
@ -274,10 +274,9 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
|
||||
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
|
||||
var bq *builderQuery[qbtypes.MetricAggregation]
|
||||
|
||||
switch spec.Signal {
|
||||
case telemetrytypes.SignalMeter:
|
||||
if strings.HasPrefix(spec.Aggregations[0].MetricName, "signoz.meter") {
|
||||
bq = newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
|
||||
default:
|
||||
} else {
|
||||
bq = newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
|
||||
}
|
||||
|
||||
|
||||
@ -54,7 +54,7 @@ func newProvider(
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.SamplesV4Agg1dTableName,
|
||||
telemetrymeter.SamplesAgg1dTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
|
||||
@ -2740,10 +2740,16 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, org
|
||||
response.AttributeKeys = append(response.AttributeKeys, key)
|
||||
}
|
||||
|
||||
meterAggregateAttributes, err := r.getMeterAggregateAttributes(ctx, orgID, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response.AttributeKeys = append(response.AttributeKeys, meterAggregateAttributes.AttributeKeys...)
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
|
||||
func (r *ClickHouseReader) getMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
|
||||
var response v3.AggregateAttributeResponse
|
||||
// Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db
|
||||
query := fmt.Sprintf(
|
||||
@ -2792,7 +2798,6 @@ func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgI
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
|
||||
|
||||
var query string
|
||||
var err error
|
||||
var rows driver.Rows
|
||||
@ -2829,6 +2834,47 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
|
||||
response.AttributeKeys = append(response.AttributeKeys, key)
|
||||
}
|
||||
|
||||
meterKeys, err := r.getMeterAttributeKeys(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response.AttributeKeys = append(response.AttributeKeys, meterKeys.AttributeKeys...)
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) getMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
|
||||
var query string
|
||||
var err error
|
||||
var rows driver.Rows
|
||||
var response v3.FilterAttributeKeyResponse
|
||||
|
||||
// skips the internal attributes i.e attributes starting with __
|
||||
query = fmt.Sprintf("SELECT DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name FROM %s.%s WHERE metric_name=$1 AND attr_name ILIKE $2 AND attr_name NOT LIKE '\\_\\_%%'", signozMeterDBName, signozMeterSamplesName)
|
||||
if req.Limit != 0 {
|
||||
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
|
||||
}
|
||||
rows, err = r.db.Query(ctx, query, req.AggregateAttribute, fmt.Sprintf("%%%s%%", req.SearchText))
|
||||
if err != nil {
|
||||
zap.L().Error("Error while executing query", zap.Error(err))
|
||||
return nil, fmt.Errorf("error while executing query: %s", err.Error())
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var attributeKey string
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&attributeKey); err != nil {
|
||||
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
|
||||
}
|
||||
key := v3.AttributeKey{
|
||||
Key: attributeKey,
|
||||
DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72.
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
IsColumn: false,
|
||||
}
|
||||
response.AttributeKeys = append(response.AttributeKeys, key)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
|
||||
@ -4213,8 +4213,6 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
|
||||
switch req.DataSource {
|
||||
case v3.DataSourceMetrics:
|
||||
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), orgID, req, false)
|
||||
case v3.DataSourceMeter:
|
||||
response, err = aH.reader.GetMeterAggregateAttributes(r.Context(), orgID, req)
|
||||
case v3.DataSourceLogs:
|
||||
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
|
||||
case v3.DataSourceTraces:
|
||||
|
||||
@ -50,7 +50,6 @@ type Reader interface {
|
||||
|
||||
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
|
||||
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
|
||||
GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
|
||||
GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
|
||||
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
|
||||
|
||||
|
||||
@ -22,12 +22,11 @@ const (
|
||||
DataSourceTraces DataSource = "traces"
|
||||
DataSourceLogs DataSource = "logs"
|
||||
DataSourceMetrics DataSource = "metrics"
|
||||
DataSourceMeter DataSource = "meter"
|
||||
)
|
||||
|
||||
func (d DataSource) Validate() error {
|
||||
switch d {
|
||||
case DataSourceTraces, DataSourceLogs, DataSourceMetrics, DataSourceMeter:
|
||||
case DataSourceTraces, DataSourceLogs, DataSourceMetrics:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("invalid data source: %s", d)
|
||||
|
||||
@ -68,12 +68,21 @@ func RecommendedStepIntervalForMeter(start, end uint64) uint64 {
|
||||
step := (end - start) / RecommendedNumberOfPoints / 1e9
|
||||
|
||||
// for meter queries the minimum step interval allowed is 1 day as this is our granularity
|
||||
if step < 86400 {
|
||||
return 86400
|
||||
if step < 3600 {
|
||||
return 3600
|
||||
}
|
||||
|
||||
// return the nearest lower multiple of 86400 ( 1 day )
|
||||
recommended := step - step%86400
|
||||
// return the nearest lower multiple of 3600 ( 1 hour )
|
||||
recommended := step - step%3600
|
||||
|
||||
// if the time range is greater than 1 month set the step interval to be multiple of 1 day
|
||||
if end-start >= uint64(30*24*time.Hour.Nanoseconds()) {
|
||||
if recommended < 86400 {
|
||||
recommended = 86400
|
||||
} else {
|
||||
recommended = uint64(math.Round(float64(recommended)/86400)) * 86400
|
||||
}
|
||||
}
|
||||
|
||||
return recommended
|
||||
}
|
||||
|
||||
@ -555,6 +555,12 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto
|
||||
return nil, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMetricsKeys.Error())
|
||||
}
|
||||
|
||||
meterKeys, err := t.getMeterKeys(ctx, fieldKeySelectors)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
keys = append(keys, meterKeys...)
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
@ -606,7 +612,7 @@ func (t *telemetryMetaStore) getMeterKeys(ctx context.Context, fieldKeySelectors
|
||||
}
|
||||
keys = append(keys, &telemetrytypes.TelemetryFieldKey{
|
||||
Name: name,
|
||||
Signal: telemetrytypes.SignalMeter,
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
})
|
||||
}
|
||||
|
||||
@ -634,8 +640,6 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
|
||||
keys, err = t.getLogsKeys(ctx, selectors)
|
||||
case telemetrytypes.SignalMetrics:
|
||||
keys, err = t.getMetricsKeys(ctx, selectors)
|
||||
case telemetrytypes.SignalMeter:
|
||||
keys, err = t.getMeterKeys(ctx, selectors)
|
||||
case telemetrytypes.SignalUnspecified:
|
||||
// get traces keys
|
||||
tracesKeys, err := t.getTracesKeys(ctx, selectors)
|
||||
@ -1019,6 +1023,13 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu
|
||||
}
|
||||
values.StringValues = append(values.StringValues, stringValue)
|
||||
}
|
||||
|
||||
meterFieldValues, err := t.getMeterFieldValues(ctx, fieldValueSelector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
values.StringValues = append(values.StringValues, meterFieldValues.StringValues...)
|
||||
return values, nil
|
||||
}
|
||||
|
||||
@ -1098,8 +1109,6 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
|
||||
values, err = t.getLogFieldValues(ctx, fieldValueSelector)
|
||||
case telemetrytypes.SignalMetrics:
|
||||
values, err = t.getMetricFieldValues(ctx, fieldValueSelector)
|
||||
case telemetrytypes.SignalMeter:
|
||||
values, err = t.getMeterFieldValues(ctx, fieldValueSelector)
|
||||
case telemetrytypes.SignalUnspecified:
|
||||
mapOfValues := make(map[any]bool)
|
||||
mapOfRelatedValues := make(map[any]bool)
|
||||
|
||||
@ -44,7 +44,7 @@ func TestGetKeys(t *testing.T) {
|
||||
telemetrymetrics.DBName,
|
||||
telemetrymetrics.AttributesMetadataTableName,
|
||||
telemetrymeter.DBName,
|
||||
telemetrymeter.SamplesV4Agg1dTableName,
|
||||
telemetrymeter.SamplesAgg1dTableName,
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2TableName,
|
||||
telemetrylogs.TagAttributesV2TableName,
|
||||
|
||||
@ -56,7 +56,6 @@ func (b *meterQueryStatementBuilder) Build(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO[vikrantgupta25]: need to adjust this properly for meter metrics (scrape interval for 1D default so step interval should never be less than that!)
|
||||
start, end = querybuilder.AdjustedMetricTimeRange(start, end, uint64(query.StepInterval.Seconds()), query)
|
||||
|
||||
return b.buildPipelineStatement(ctx, start, end, query, keys, variables)
|
||||
@ -127,13 +126,14 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
|
||||
sb.SelectMore(col)
|
||||
}
|
||||
|
||||
aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
|
||||
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
|
||||
}
|
||||
|
||||
sb.SelectMore(fmt.Sprintf("%s AS value", aggCol))
|
||||
sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesV4Agg1dTableName))
|
||||
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
sb.Where(
|
||||
sb.In("metric_name", query.Aggregations[0].MetricName),
|
||||
sb.GTE("unix_milli", start),
|
||||
@ -205,7 +205,8 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
|
||||
sb.SelectMore(col)
|
||||
}
|
||||
|
||||
aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality,
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality,
|
||||
query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
|
||||
aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
|
||||
@ -213,7 +214,7 @@ func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
|
||||
|
||||
sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
|
||||
|
||||
sb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesV4Agg1dTableName))
|
||||
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
sb.Where(
|
||||
sb.In("metric_name", query.Aggregations[0].MetricName),
|
||||
sb.GTE("unix_milli", start),
|
||||
@ -273,10 +274,11 @@ func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
|
||||
baseSb.SelectMore(col)
|
||||
}
|
||||
|
||||
aggCol := AggregationColumnForSamplesTable(query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
|
||||
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
|
||||
|
||||
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, SamplesV4Agg1dTableName))
|
||||
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
|
||||
baseSb.Where(
|
||||
baseSb.In("metric_name", query.Aggregations[0].MetricName),
|
||||
baseSb.GTE("unix_milli", start),
|
||||
|
||||
@ -26,7 +26,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
name: "test_cumulative_rate_sum",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMeter,
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
@ -59,7 +59,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
name: "test_delta_rate_sum",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMeter,
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
@ -92,7 +92,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
name: "test_delta_rate_avg",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMeter,
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
@ -125,7 +125,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
name: "test_gauge_avg_sum",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMeter,
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
|
||||
@ -1,84 +1,192 @@
|
||||
package telemetrymeter
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/metrictypes"
|
||||
)
|
||||
|
||||
const (
|
||||
DBName = "signoz_meter"
|
||||
SamplesTableName = "distributed_samples"
|
||||
SamplesLocalTableName = "samples"
|
||||
SamplesV4Agg1dTableName = "distributed_samples_agg_1d"
|
||||
SamplesV4Agg1dLocalTableName = "samples_agg_1d"
|
||||
DBName = "signoz_meter"
|
||||
SamplesTableName = "distributed_samples"
|
||||
SamplesLocalTableName = "samples"
|
||||
SamplesAgg1dTableName = "distributed_samples_agg_1d"
|
||||
SamplesAgg1dLocalTableName = "samples_agg_1d"
|
||||
)
|
||||
|
||||
var (
|
||||
oneMonthInMilliseconds = uint64(time.Hour * 24 * 30)
|
||||
|
||||
// when the query requests for almost 1 day, but not exactly 1 day, we need to add an offset to the end time
|
||||
// to make sure that we are using the correct table
|
||||
// this is because the start gets adjusted to the nearest step interval and uses the 5m table for 4m step interval
|
||||
// leading to time series that doesn't best represent the rate of change
|
||||
offsetBucket = uint64(1 * time.Hour.Milliseconds())
|
||||
)
|
||||
|
||||
// start and end are in milliseconds
|
||||
// we have two tables for samples
|
||||
// 1. distributed_samples
|
||||
// 2. distributed_samples_v4_agg_1d - for queries with time range above or equal to 30 days
|
||||
// if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it
|
||||
func WhichSamplesTableToUse(
|
||||
start, end uint64,
|
||||
metricType metrictypes.Type,
|
||||
timeAggregation metrictypes.TimeAggregation,
|
||||
tableHints *metrictypes.MetricTableHints,
|
||||
) string {
|
||||
|
||||
// if we have a hint for the table, we need to use it
|
||||
// the hint will be used to override the default table selection logic
|
||||
if tableHints != nil {
|
||||
if tableHints.SamplesTableName != "" {
|
||||
return tableHints.SamplesTableName
|
||||
}
|
||||
}
|
||||
|
||||
// if the time aggregation is count_distinct, we need to use the distributed_samples table
|
||||
// because the aggregated tables don't support count_distinct
|
||||
if timeAggregation == metrictypes.TimeAggregationCountDistinct {
|
||||
return SamplesTableName
|
||||
}
|
||||
|
||||
if end-start < oneMonthInMilliseconds+offsetBucket {
|
||||
return SamplesTableName
|
||||
}
|
||||
return SamplesAgg1dTableName
|
||||
|
||||
}
|
||||
|
||||
func AggregationColumnForSamplesTable(
|
||||
start, end uint64,
|
||||
metricType metrictypes.Type,
|
||||
temporality metrictypes.Temporality,
|
||||
timeAggregation metrictypes.TimeAggregation,
|
||||
tableHints *metrictypes.MetricTableHints,
|
||||
) string {
|
||||
tableName := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
|
||||
var aggregationColumn string
|
||||
switch temporality {
|
||||
case metrictypes.Delta:
|
||||
// for delta metrics, we only support `RATE`/`INCREASE` both of which are sum
|
||||
// although it doesn't make sense to use anyLast, avg, min, max, count on delta metrics,
|
||||
// we are keeping it here to make sure that query will not be invalid
|
||||
switch timeAggregation {
|
||||
case metrictypes.TimeAggregationLatest:
|
||||
aggregationColumn = "anyLast(last)"
|
||||
case metrictypes.TimeAggregationSum:
|
||||
aggregationColumn = "sum(sum)"
|
||||
case metrictypes.TimeAggregationAvg:
|
||||
aggregationColumn = "sum(sum) / sum(count)"
|
||||
case metrictypes.TimeAggregationMin:
|
||||
aggregationColumn = "min(min)"
|
||||
case metrictypes.TimeAggregationMax:
|
||||
aggregationColumn = "max(max)"
|
||||
case metrictypes.TimeAggregationCount:
|
||||
aggregationColumn = "sum(count)"
|
||||
// count_distinct is not supported in aggregated tables
|
||||
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
|
||||
aggregationColumn = "sum(sum)"
|
||||
switch tableName {
|
||||
case SamplesTableName:
|
||||
switch timeAggregation {
|
||||
case metrictypes.TimeAggregationLatest:
|
||||
aggregationColumn = "anyLast(value)"
|
||||
case metrictypes.TimeAggregationSum:
|
||||
aggregationColumn = "sum(value)"
|
||||
case metrictypes.TimeAggregationAvg:
|
||||
aggregationColumn = "avg(value)"
|
||||
case metrictypes.TimeAggregationMin:
|
||||
aggregationColumn = "min(value)"
|
||||
case metrictypes.TimeAggregationMax:
|
||||
aggregationColumn = "max(value)"
|
||||
case metrictypes.TimeAggregationCount:
|
||||
aggregationColumn = "count(value)"
|
||||
case metrictypes.TimeAggregationCountDistinct:
|
||||
aggregationColumn = "countDistinct(value)"
|
||||
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
|
||||
aggregationColumn = "sum(value)"
|
||||
}
|
||||
case SamplesAgg1dTableName:
|
||||
switch timeAggregation {
|
||||
case metrictypes.TimeAggregationLatest:
|
||||
aggregationColumn = "anyLast(last)"
|
||||
case metrictypes.TimeAggregationSum:
|
||||
aggregationColumn = "sum(sum)"
|
||||
case metrictypes.TimeAggregationAvg:
|
||||
aggregationColumn = "sum(sum) / sum(count)"
|
||||
case metrictypes.TimeAggregationMin:
|
||||
aggregationColumn = "min(min)"
|
||||
case metrictypes.TimeAggregationMax:
|
||||
aggregationColumn = "max(max)"
|
||||
case metrictypes.TimeAggregationCount:
|
||||
aggregationColumn = "sum(count)"
|
||||
// count_distinct is not supported in aggregated tables
|
||||
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
|
||||
aggregationColumn = "sum(sum)"
|
||||
}
|
||||
}
|
||||
|
||||
case metrictypes.Cumulative:
|
||||
// for cumulative metrics, we only support `RATE`/`INCREASE`. The max value in window is
|
||||
// used to calculate the sum which is then divided by the window size to get the rate
|
||||
switch timeAggregation {
|
||||
case metrictypes.TimeAggregationLatest:
|
||||
aggregationColumn = "anyLast(last)"
|
||||
case metrictypes.TimeAggregationSum:
|
||||
aggregationColumn = "sum(sum)"
|
||||
case metrictypes.TimeAggregationAvg:
|
||||
aggregationColumn = "sum(sum) / sum(count)"
|
||||
case metrictypes.TimeAggregationMin:
|
||||
aggregationColumn = "min(min)"
|
||||
case metrictypes.TimeAggregationMax:
|
||||
aggregationColumn = "max(max)"
|
||||
case metrictypes.TimeAggregationCount:
|
||||
aggregationColumn = "sum(count)"
|
||||
// count_distinct is not supported in aggregated tables
|
||||
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
|
||||
aggregationColumn = "max(max)"
|
||||
switch tableName {
|
||||
case SamplesTableName:
|
||||
switch timeAggregation {
|
||||
case metrictypes.TimeAggregationLatest:
|
||||
aggregationColumn = "anyLast(value)"
|
||||
case metrictypes.TimeAggregationSum:
|
||||
aggregationColumn = "sum(value)"
|
||||
case metrictypes.TimeAggregationAvg:
|
||||
aggregationColumn = "avg(value)"
|
||||
case metrictypes.TimeAggregationMin:
|
||||
aggregationColumn = "min(value)"
|
||||
case metrictypes.TimeAggregationMax:
|
||||
aggregationColumn = "max(value)"
|
||||
case metrictypes.TimeAggregationCount:
|
||||
aggregationColumn = "count(value)"
|
||||
case metrictypes.TimeAggregationCountDistinct:
|
||||
aggregationColumn = "countDistinct(value)"
|
||||
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
|
||||
aggregationColumn = "max(value)"
|
||||
}
|
||||
case SamplesAgg1dTableName:
|
||||
switch timeAggregation {
|
||||
case metrictypes.TimeAggregationLatest:
|
||||
aggregationColumn = "anyLast(last)"
|
||||
case metrictypes.TimeAggregationSum:
|
||||
aggregationColumn = "sum(sum)"
|
||||
case metrictypes.TimeAggregationAvg:
|
||||
aggregationColumn = "sum(sum) / sum(count)"
|
||||
case metrictypes.TimeAggregationMin:
|
||||
aggregationColumn = "min(min)"
|
||||
case metrictypes.TimeAggregationMax:
|
||||
aggregationColumn = "max(max)"
|
||||
case metrictypes.TimeAggregationCount:
|
||||
aggregationColumn = "sum(count)"
|
||||
// count_distinct is not supported in aggregated tables
|
||||
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
|
||||
aggregationColumn = "max(max)"
|
||||
}
|
||||
}
|
||||
|
||||
case metrictypes.Unspecified:
|
||||
switch timeAggregation {
|
||||
case metrictypes.TimeAggregationLatest:
|
||||
aggregationColumn = "anyLast(last)"
|
||||
case metrictypes.TimeAggregationSum:
|
||||
aggregationColumn = "sum(sum)"
|
||||
case metrictypes.TimeAggregationAvg:
|
||||
aggregationColumn = "sum(sum) / sum(count)"
|
||||
case metrictypes.TimeAggregationMin:
|
||||
aggregationColumn = "min(min)"
|
||||
case metrictypes.TimeAggregationMax:
|
||||
aggregationColumn = "max(max)"
|
||||
case metrictypes.TimeAggregationCount:
|
||||
aggregationColumn = "sum(count)"
|
||||
// count_distinct is not supported in aggregated tables
|
||||
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
|
||||
aggregationColumn = "sum(sum)"
|
||||
switch tableName {
|
||||
case SamplesTableName:
|
||||
switch timeAggregation {
|
||||
case metrictypes.TimeAggregationLatest:
|
||||
aggregationColumn = "anyLast(value)"
|
||||
case metrictypes.TimeAggregationSum:
|
||||
aggregationColumn = "sum(value)"
|
||||
case metrictypes.TimeAggregationAvg:
|
||||
aggregationColumn = "avg(value)"
|
||||
case metrictypes.TimeAggregationMin:
|
||||
aggregationColumn = "min(value)"
|
||||
case metrictypes.TimeAggregationMax:
|
||||
aggregationColumn = "max(value)"
|
||||
case metrictypes.TimeAggregationCount:
|
||||
aggregationColumn = "count(value)"
|
||||
case metrictypes.TimeAggregationCountDistinct:
|
||||
aggregationColumn = "countDistinct(value)"
|
||||
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
|
||||
aggregationColumn = "sum(value)"
|
||||
}
|
||||
case SamplesAgg1dTableName:
|
||||
switch timeAggregation {
|
||||
case metrictypes.TimeAggregationLatest:
|
||||
aggregationColumn = "anyLast(last)"
|
||||
case metrictypes.TimeAggregationSum:
|
||||
aggregationColumn = "sum(sum)"
|
||||
case metrictypes.TimeAggregationAvg:
|
||||
aggregationColumn = "sum(sum) / sum(count)"
|
||||
case metrictypes.TimeAggregationMin:
|
||||
aggregationColumn = "min(min)"
|
||||
case metrictypes.TimeAggregationMax:
|
||||
aggregationColumn = "max(max)"
|
||||
case metrictypes.TimeAggregationCount:
|
||||
aggregationColumn = "sum(count)"
|
||||
// count_distinct is not supported in aggregated tables
|
||||
case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
|
||||
aggregationColumn = "sum(sum)"
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -62,12 +62,6 @@ func (q *QueryEnvelope) UnmarshalJSON(data []byte) error {
|
||||
return wrapUnmarshalError(err, "invalid metric builder query spec: %v", err)
|
||||
}
|
||||
q.Spec = spec
|
||||
case telemetrytypes.SignalMeter:
|
||||
var spec QueryBuilderQuery[MetricAggregation]
|
||||
if err := UnmarshalJSONWithContext(shadow.Spec, &spec, "query spec"); err != nil {
|
||||
return wrapUnmarshalError(err, "invalid meter builder query spec: %v", err)
|
||||
}
|
||||
q.Spec = spec
|
||||
default:
|
||||
return errors.NewInvalidInputf(
|
||||
errors.CodeInvalidInput,
|
||||
|
||||
@ -155,7 +155,6 @@ func (q *QueryBuilderQuery[T]) validateSignal() error {
|
||||
case telemetrytypes.SignalMetrics,
|
||||
telemetrytypes.SignalTraces,
|
||||
telemetrytypes.SignalLogs,
|
||||
telemetrytypes.SignalMeter,
|
||||
telemetrytypes.SignalUnspecified: // Empty is allowed for backward compatibility
|
||||
return nil
|
||||
default:
|
||||
|
||||
@ -10,6 +10,5 @@ var (
|
||||
SignalTraces = Signal{valuer.NewString("traces")}
|
||||
SignalLogs = Signal{valuer.NewString("logs")}
|
||||
SignalMetrics = Signal{valuer.NewString("metrics")}
|
||||
SignalMeter = Signal{valuer.NewString("meter")}
|
||||
SignalUnspecified = Signal{valuer.NewString("")}
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user