diff --git a/conf/example.yaml b/conf/example.yaml
index 1dfbec121aef..dff8a2f69112 100644
--- a/conf/example.yaml
+++ b/conf/example.yaml
@@ -121,6 +121,7 @@ telemetrystore:
timeout_before_checking_execution_speed: 0
max_bytes_to_read: 0
max_result_rows: 0
+ ignore_data_skipping_indices: ""
##################### Prometheus #####################
prometheus:
diff --git a/frontend/src/container/LogsExplorerList/InfinityTableView/TableRow.tsx b/frontend/src/container/LogsExplorerList/InfinityTableView/TableRow.tsx
index 5977df8730f5..ddb68aa88fb8 100644
--- a/frontend/src/container/LogsExplorerList/InfinityTableView/TableRow.tsx
+++ b/frontend/src/container/LogsExplorerList/InfinityTableView/TableRow.tsx
@@ -73,6 +73,8 @@ export default function TableRow({
{tableColumns.map((column) => {
if (!column.render) return
Empty | ;
+ if (!column.key) return null;
+
const element: ColumnTypeRender> = column.render(
log[column.key as keyof Record],
log,
@@ -97,6 +99,7 @@ export default function TableRow({
fontSize={fontSize}
columnKey={column.key as string}
onClick={handleShowLogDetails}
+ className={column.key as string}
>
{cloneElement(children, props)}
diff --git a/frontend/src/container/LogsExplorerList/InfinityTableView/index.tsx b/frontend/src/container/LogsExplorerList/InfinityTableView/index.tsx
index 9a75359c8414..91b2508afdae 100644
--- a/frontend/src/container/LogsExplorerList/InfinityTableView/index.tsx
+++ b/frontend/src/container/LogsExplorerList/InfinityTableView/index.tsx
@@ -136,7 +136,7 @@ const InfinityTable = forwardRef(
key={column.key}
fontSize={tableViewProps?.fontSize}
// eslint-disable-next-line react/jsx-props-no-spreading
- {...(isDragColumn && { className: 'dragHandler' })}
+ {...(isDragColumn && { className: `dragHandler ${column.key}` })}
columnKey={column.key as string}
>
{(column.title as string).replace(/^\w/, (c) => c.toUpperCase())}
diff --git a/frontend/src/container/LogsExplorerList/utils.ts b/frontend/src/container/LogsExplorerList/utils.ts
index 90032dfcdbe2..3cb62c39bf80 100644
--- a/frontend/src/container/LogsExplorerList/utils.ts
+++ b/frontend/src/container/LogsExplorerList/utils.ts
@@ -1,4 +1,5 @@
import { TelemetryFieldKey } from 'api/v5/v5';
+import { isEmpty } from 'lodash-es';
import { IField } from 'types/api/logs/fields';
import {
IBuilderQuery,
@@ -8,11 +9,13 @@ import {
export const convertKeysToColumnFields = (
keys: TelemetryFieldKey[],
): IField[] =>
- keys.map((item) => ({
- dataType: item.fieldDataType ?? '',
- name: item.name,
- type: item.fieldContext ?? '',
- }));
+ keys
+ .filter((item) => !isEmpty(item.name))
+ .map((item) => ({
+ dataType: item.fieldDataType ?? '',
+ name: item.name,
+ type: item.fieldContext ?? '',
+ }));
/**
* Determines if a query represents a trace-to-logs navigation
* by checking for the presence of a trace_id filter.
diff --git a/frontend/src/container/OptionsMenu/useOptionsMenu.ts b/frontend/src/container/OptionsMenu/useOptionsMenu.ts
index 9fd41231f0ed..5432ebab2b04 100644
--- a/frontend/src/container/OptionsMenu/useOptionsMenu.ts
+++ b/frontend/src/container/OptionsMenu/useOptionsMenu.ts
@@ -6,6 +6,7 @@ import { useGetQueryKeySuggestions } from 'hooks/querySuggestions/useGetQueryKey
import useDebounce from 'hooks/useDebounce';
import { useNotifications } from 'hooks/useNotifications';
import useUrlQueryData from 'hooks/useUrlQueryData';
+import { has } from 'lodash-es';
import { AllTraceFilterKeyValue } from 'pages/TracesExplorer/Filter/filterUtils';
import { usePreferenceContext } from 'providers/preferences/context/PreferenceContextProvider';
import { useCallback, useEffect, useMemo, useState } from 'react';
@@ -452,7 +453,9 @@ const useOptionsMenu = ({
() => ({
addColumn: {
isFetching: isSearchedAttributesFetchingV5,
- value: preferences?.columns || defaultOptionsQuery.selectColumns,
+ value:
+ preferences?.columns.filter((item) => has(item, 'name')) ||
+ defaultOptionsQuery.selectColumns.filter((item) => has(item, 'name')),
options: optionsFromAttributeKeys || [],
onFocus: handleFocus,
onBlur: handleBlur,
diff --git a/frontend/src/container/TopNav/DateTimeSelectionV2/index.tsx b/frontend/src/container/TopNav/DateTimeSelectionV2/index.tsx
index d17dde4ba266..c5a48b14022a 100644
--- a/frontend/src/container/TopNav/DateTimeSelectionV2/index.tsx
+++ b/frontend/src/container/TopNav/DateTimeSelectionV2/index.tsx
@@ -372,7 +372,7 @@ function DateTimeSelection({
})),
},
};
- return JSON.stringify(updatedCompositeQuery);
+ return encodeURIComponent(JSON.stringify(updatedCompositeQuery));
}, [currentQuery]);
const onSelectHandler = useCallback(
diff --git a/frontend/src/lib/getChartData.ts b/frontend/src/lib/getChartData.ts
index 3742487366bc..cd0ebcebf857 100644
--- a/frontend/src/lib/getChartData.ts
+++ b/frontend/src/lib/getChartData.ts
@@ -17,9 +17,9 @@ const getChartData = ({
// eslint-disable-next-line sonarjs/cognitive-complexity
} => {
const uniqueTimeLabels = new Set();
- queryData.forEach((data) => {
- data.queryData.forEach((query) => {
- query.values.forEach((value) => {
+ queryData?.forEach((data) => {
+ data.queryData?.forEach((query) => {
+ query.values?.forEach((value) => {
uniqueTimeLabels.add(value[0]);
});
});
@@ -27,8 +27,8 @@ const getChartData = ({
const labels = Array.from(uniqueTimeLabels).sort((a, b) => a - b);
- const response = queryData.map(
- ({ queryData, query: queryG, legend: legendG }) =>
+ const response =
+ queryData?.map(({ queryData, query: queryG, legend: legendG }) =>
queryData.map((e) => {
const { values = [], metric, legend, queryName } = e || {};
const labelNames = getLabelName(
@@ -61,7 +61,7 @@ const getChartData = ({
second: filledDataValues.map((e) => e.second || 0),
};
}),
- );
+ ) || [];
const modifiedData = response
.flat()
diff --git a/pkg/apis/fields/api.go b/pkg/apis/fields/api.go
index 788982ac1271..0d4b20fef879 100644
--- a/pkg/apis/fields/api.go
+++ b/pkg/apis/fields/api.go
@@ -9,6 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
+ "github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
@@ -33,6 +34,8 @@ func NewAPI(
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.AttributesMetadataTableName,
+ telemetrymeter.DBName,
+ telemetrymeter.SamplesAgg1dTableName,
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,
diff --git a/pkg/apis/fields/parse.go b/pkg/apis/fields/parse.go
index 114be5a51669..913b48b1502a 100644
--- a/pkg/apis/fields/parse.go
+++ b/pkg/apis/fields/parse.go
@@ -12,6 +12,7 @@ import (
func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, error) {
var req telemetrytypes.FieldKeySelector
var signal telemetrytypes.Signal
+ var source telemetrytypes.Source
var err error
signalStr := r.URL.Query().Get("signal")
@@ -21,6 +22,13 @@ func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, er
signal = telemetrytypes.SignalUnspecified
}
+ sourceStr := r.URL.Query().Get("source")
+ if sourceStr != "" {
+ source = telemetrytypes.Source{String: valuer.NewString(sourceStr)}
+ } else {
+ source = telemetrytypes.SourceUnspecified
+ }
+
if r.URL.Query().Get("limit") != "" {
limit, err := strconv.Atoi(r.URL.Query().Get("limit"))
if err != nil {
@@ -76,6 +84,7 @@ func parseFieldKeyRequest(r *http.Request) (*telemetrytypes.FieldKeySelector, er
StartUnixMilli: startUnixMilli,
EndUnixMilli: endUnixMilli,
Signal: signal,
+ Source: source,
Name: name,
FieldContext: fieldContext,
FieldDataType: fieldDataType,
diff --git a/pkg/querier/builder_query.go b/pkg/querier/builder_query.go
index 3b1e4166daf3..aaea96fd1881 100644
--- a/pkg/querier/builder_query.go
+++ b/pkg/querier/builder_query.go
@@ -62,6 +62,9 @@ func (q *builderQuery[T]) Fingerprint() string {
// Add signal type
parts = append(parts, fmt.Sprintf("signal=%s", q.spec.Signal.StringValue()))
+ // Add source type
+ parts = append(parts, fmt.Sprintf("source=%s", q.spec.Source.StringValue()))
+
// Add step interval if present
parts = append(parts, fmt.Sprintf("step=%s", q.spec.StepInterval.String()))
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 9077f9eb4ad8..6f34e6460781 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -31,6 +31,7 @@ type querier struct {
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation]
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation]
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
+ meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation]
bucketCache BucketCache
}
@@ -44,6 +45,7 @@ func New(
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
logStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
metricStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
+ meterStmtBuilder qbtypes.StatementBuilder[qbtypes.MetricAggregation],
bucketCache BucketCache,
) *querier {
querierSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querier")
@@ -55,6 +57,7 @@ func New(
traceStmtBuilder: traceStmtBuilder,
logStmtBuilder: logStmtBuilder,
metricStmtBuilder: metricStmtBuilder,
+ meterStmtBuilder: meterStmtBuilder,
bucketCache: bucketCache,
}
}
@@ -168,17 +171,21 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
event.MetricsUsed = true
event.FilterApplied = spec.Filter != nil && spec.Filter.Expression != ""
event.GroupByApplied = len(spec.GroupBy) > 0
- if spec.StepInterval.Seconds() == 0 {
- spec.StepInterval = qbtypes.Step{
- Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
- }
- }
- if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
- spec.StepInterval = qbtypes.Step{
- Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)),
- }
- }
+ if spec.Source == telemetrytypes.SourceMeter {
+ spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMeter(req.Start, req.End))}
+ } else {
+ if spec.StepInterval.Seconds() == 0 {
+ spec.StepInterval = qbtypes.Step{
+ Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End)),
+ }
+ }
+ if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
+ spec.StepInterval = qbtypes.Step{
+ Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)),
+ }
+ }
+ }
req.CompositeQuery.Queries[idx].Spec = spec
}
} else if query.Type == qbtypes.QueryTypePromQL {
@@ -265,7 +272,14 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
}
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
- bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
+ var bq *builderQuery[qbtypes.MetricAggregation]
+
+ if spec.Source == telemetrytypes.SourceMeter {
+ bq = newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
+ } else {
+ bq = newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
+ }
+
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
default:
@@ -529,6 +543,9 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
specCopy := qt.spec.Copy()
specCopy.ShiftBy = extractShiftFromBuilderQuery(specCopy)
adjustedTimeRange := adjustTimeRangeForShift(specCopy, timeRange, qt.kind)
+ if qt.spec.Source == telemetrytypes.SourceMeter {
+ return newBuilderQuery(q.telemetryStore, q.meterStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
+ }
return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, specCopy, adjustedTimeRange, qt.kind, qt.variables)
default:
diff --git a/pkg/querier/signozquerier/provider.go b/pkg/querier/signozquerier/provider.go
index c9801880c40d..9d4cb1dbdae4 100644
--- a/pkg/querier/signozquerier/provider.go
+++ b/pkg/querier/signozquerier/provider.go
@@ -11,6 +11,7 @@ import (
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
+ "github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrytraces"
@@ -52,6 +53,8 @@ func newProvider(
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.AttributesMetadataTableName,
+ telemetrymeter.DBName,
+ telemetrymeter.SamplesAgg1dTableName,
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,
@@ -122,6 +125,14 @@ func newProvider(
metricConditionBuilder,
)
+ // Create meter statement builder
+ meterStmtBuilder := telemetrymeter.NewMeterQueryStatementBuilder(
+ settings,
+ telemetryMetadataStore,
+ metricFieldMapper,
+ metricConditionBuilder,
+ )
+
// Create bucket cache
bucketCache := querier.NewBucketCache(
settings,
@@ -139,6 +150,7 @@ func newProvider(
traceStmtBuilder,
logStmtBuilder,
metricStmtBuilder,
+ meterStmtBuilder,
bucketCache,
), nil
}
diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go
index 744a2711b65c..84426b538212 100644
--- a/pkg/query-service/app/clickhouseReader/reader.go
+++ b/pkg/query-service/app/clickhouseReader/reader.go
@@ -64,6 +64,8 @@ const (
signozTraceLocalTableName = "signoz_index_v2"
signozMetricDBName = "signoz_metrics"
signozMetadataDbName = "signoz_metadata"
+ signozMeterDBName = "signoz_meter"
+ signozMeterSamplesName = "samples_agg_1d"
signozSampleLocalTableName = "samples_v4"
signozSampleTableName = "distributed_samples_v4"
@@ -2741,8 +2743,55 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, org
return &response, nil
}
-func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
+func (r *ClickHouseReader) GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) {
+ var response v3.AggregateAttributeResponse
+ // Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db
+ query := fmt.Sprintf(
+ `SELECT metric_name,type,temporality,is_monotonic
+ FROM %s.%s
+ WHERE metric_name ILIKE $1
+ GROUP BY metric_name,type,temporality,is_monotonic`,
+ signozMeterDBName, signozMeterSamplesName)
+ if req.Limit != 0 {
+ query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
+ }
+
+ rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
+ if err != nil {
+ zap.L().Error("Error while querying meter names", zap.Error(err))
+ return nil, fmt.Errorf("error while executing meter name query: %s", err.Error())
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var name string
+ var typ string
+ var temporality string
+ var isMonotonic bool
+ if err := rows.Scan(&name, &typ, &temporality, &isMonotonic); err != nil {
+ return nil, fmt.Errorf("error while scanning meter name: %s", err.Error())
+ }
+
+ // Non-monotonic cumulative sums are treated as gauges
+ if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
+ typ = "Gauge"
+ }
+
+ // unlike traces/logs `tag`/`resource` type, the `Type` will be metric type
+ key := v3.AttributeKey{
+ Key: name,
+ DataType: v3.AttributeKeyDataTypeFloat64,
+ Type: v3.AttributeKeyType(typ),
+ IsColumn: true,
+ }
+ response.AttributeKeys = append(response.AttributeKeys, key)
+ }
+
+ return &response, nil
+}
+
+func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
var query string
var err error
var rows driver.Rows
@@ -2782,6 +2831,41 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
return &response, nil
}
+func (r *ClickHouseReader) GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
+ var query string
+ var err error
+ var rows driver.Rows
+ var response v3.FilterAttributeKeyResponse
+
+ // skips the internal attributes i.e attributes starting with __
+ query = fmt.Sprintf("SELECT DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name FROM %s.%s WHERE metric_name=$1 AND attr_name ILIKE $2 AND attr_name NOT LIKE '\\_\\_%%'", signozMeterDBName, signozMeterSamplesName)
+ if req.Limit != 0 {
+ query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
+ }
+ rows, err = r.db.Query(ctx, query, req.AggregateAttribute, fmt.Sprintf("%%%s%%", req.SearchText))
+ if err != nil {
+ zap.L().Error("Error while executing query", zap.Error(err))
+ return nil, fmt.Errorf("error while executing query: %s", err.Error())
+ }
+ defer rows.Close()
+
+ var attributeKey string
+ for rows.Next() {
+ if err := rows.Scan(&attributeKey); err != nil {
+ return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
+ }
+ key := v3.AttributeKey{
+ Key: attributeKey,
+ DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72.
+ Type: v3.AttributeKeyTypeTag,
+ IsColumn: false,
+ }
+ response.AttributeKeys = append(response.AttributeKeys, key)
+ }
+
+ return &response, nil
+}
+
func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
var query string
diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index 1b580a0f5141..cff6c4731b2c 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -4218,6 +4218,8 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
case v3.DataSourceTraces:
response, err = aH.reader.GetTraceAggregateAttributes(r.Context(), req)
+ case v3.DataSourceMeter:
+ response, err = aH.reader.GetMeterAggregateAttributes(r.Context(), orgID, req)
default:
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid data source")}, nil)
return
@@ -4267,6 +4269,8 @@ func (aH *APIHandler) autoCompleteAttributeKeys(w http.ResponseWriter, r *http.R
switch req.DataSource {
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAttributeKeys(r.Context(), req)
+ case v3.DataSourceMeter:
+ response, err = aH.reader.GetMeterAttributeKeys(r.Context(), req)
case v3.DataSourceLogs:
response, err = aH.reader.GetLogAttributeKeys(r.Context(), req)
case v3.DataSourceTraces:
diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go
index e0e957927a51..d5740941637a 100644
--- a/pkg/query-service/app/parser.go
+++ b/pkg/query-service/app/parser.go
@@ -484,7 +484,7 @@ func parseAggregateAttributeRequest(r *http.Request) (*v3.AggregateAttributeRequ
limit = 50
}
- if dataSource != v3.DataSourceMetrics {
+ if dataSource != v3.DataSourceMetrics && dataSource != v3.DataSourceMeter {
if err := aggregateOperator.Validate(); err != nil {
return nil, err
}
@@ -604,7 +604,7 @@ func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequ
return nil, err
}
- if dataSource != v3.DataSourceMetrics {
+ if dataSource != v3.DataSourceMetrics && dataSource != v3.DataSourceMeter {
if err := aggregateOperator.Validate(); err != nil {
return nil, err
}
diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go
index b378c94a0283..183dfa4ae84d 100644
--- a/pkg/query-service/interfaces/interface.go
+++ b/pkg/query-service/interfaces/interface.go
@@ -50,7 +50,9 @@ type Reader interface {
FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error)
GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
+ GetMeterAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
+ GetMeterAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
// Returns `MetricStatus` for latest received metric among `metricNames`. Useful for status calculations
diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go
index c989cb9218a3..519ea56c5325 100644
--- a/pkg/query-service/model/v3/v3.go
+++ b/pkg/query-service/model/v3/v3.go
@@ -22,11 +22,12 @@ const (
DataSourceTraces DataSource = "traces"
DataSourceLogs DataSource = "logs"
DataSourceMetrics DataSource = "metrics"
+ DataSourceMeter DataSource = "meter"
)
func (d DataSource) Validate() error {
switch d {
- case DataSourceTraces, DataSourceLogs, DataSourceMetrics:
+ case DataSourceTraces, DataSourceLogs, DataSourceMetrics, DataSourceMeter:
return nil
default:
return fmt.Errorf("invalid data source: %s", d)
diff --git a/pkg/querybuilder/time.go b/pkg/querybuilder/time.go
index a85db77dfc72..afb0d8f5984e 100644
--- a/pkg/querybuilder/time.go
+++ b/pkg/querybuilder/time.go
@@ -63,6 +63,32 @@ func MinAllowedStepInterval(start, end uint64) uint64 {
return step - step%5
}
+func RecommendedStepIntervalForMeter(start, end uint64) uint64 {
+ start = ToNanoSecs(start)
+ end = ToNanoSecs(end)
+
+ step := (end - start) / RecommendedNumberOfPoints / 1e9
+
+ // for meter queries the minimum step interval allowed is 1 hour as this is our granularity
+ if step < 3600 {
+ return 3600
+ }
+
+ // return the nearest lower multiple of 3600 ( 1 hour )
+ recommended := step - step%3600
+
+ // if the time range is greater than 1 month set the step interval to be multiple of 1 day
+ if end-start >= uint64(30*24*time.Hour.Nanoseconds()) {
+ if recommended < 86400 {
+ recommended = 86400
+ } else {
+ recommended = uint64(math.Round(float64(recommended)/86400)) * 86400
+ }
+ }
+
+ return recommended
+}
+
func RecommendedStepIntervalForMetric(start, end uint64) uint64 {
start = ToNanoSecs(start)
end = ToNanoSecs(end)
diff --git a/pkg/signoz/provider.go b/pkg/signoz/provider.go
index d24882f47404..88585ad467d0 100644
--- a/pkg/signoz/provider.go
+++ b/pkg/signoz/provider.go
@@ -130,6 +130,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewUpdateOrgDomainFactory(sqlstore, sqlschema),
sqlmigration.NewAddFactorIndexesFactory(sqlstore, sqlschema),
sqlmigration.NewQueryBuilderV5MigrationFactory(sqlstore, telemetryStore),
+ sqlmigration.NewAddMeterQuickFiltersFactory(sqlstore, sqlschema),
)
}
diff --git a/pkg/sqlmigration/047_add_meter_quickfilters.go b/pkg/sqlmigration/047_add_meter_quickfilters.go
new file mode 100644
index 000000000000..bef43988c110
--- /dev/null
+++ b/pkg/sqlmigration/047_add_meter_quickfilters.go
@@ -0,0 +1,137 @@
+package sqlmigration
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "time"
+
+ "github.com/SigNoz/signoz/pkg/errors"
+ "github.com/SigNoz/signoz/pkg/factory"
+ "github.com/SigNoz/signoz/pkg/sqlschema"
+ "github.com/SigNoz/signoz/pkg/sqlstore"
+ "github.com/SigNoz/signoz/pkg/valuer"
+ "github.com/uptrace/bun"
+ "github.com/uptrace/bun/migrate"
+)
+
+type addMeterQuickFilters struct {
+ sqlstore sqlstore.SQLStore
+ sqlschema sqlschema.SQLSchema
+}
+
+func NewAddMeterQuickFiltersFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
+ return factory.NewProviderFactory(factory.MustNewName("add_meter_quick_filters"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) {
+ return newAddMeterQuickFilters(ctx, providerSettings, config, sqlstore, sqlschema)
+ })
+}
+
+func newAddMeterQuickFilters(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) (SQLMigration, error) {
+ return &addMeterQuickFilters{
+ sqlstore: sqlstore,
+ sqlschema: sqlschema,
+ }, nil
+}
+
+func (migration *addMeterQuickFilters) Register(migrations *migrate.Migrations) error {
+ if err := migrations.Register(migration.Up, migration.Down); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (migration *addMeterQuickFilters) Up(ctx context.Context, db *bun.DB) error {
+ meterFilters := []map[string]interface{}{
+ {"key": "deployment.environment", "dataType": "float64", "type": "Sum"},
+ {"key": "service.name", "dataType": "float64", "type": "Sum"},
+ {"key": "host.name", "dataType": "float64", "type": "Sum"},
+ }
+
+ meterJSON, err := json.Marshal(meterFilters)
+ if err != nil {
+ return errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal meter filters")
+ }
+
+ type signal struct {
+ valuer.String
+ }
+
+ type identifiable struct {
+ ID valuer.UUID `json:"id" bun:"id,pk,type:text"`
+ }
+
+ type timeAuditable struct {
+ CreatedAt time.Time `bun:"created_at" json:"createdAt"`
+ UpdatedAt time.Time `bun:"updated_at" json:"updatedAt"`
+ }
+
+ type quickFilterType struct {
+ bun.BaseModel `bun:"table:quick_filter"`
+ identifiable
+ OrgID valuer.UUID `bun:"org_id,type:text,notnull"`
+ Filter string `bun:"filter,type:text,notnull"`
+ Signal signal `bun:"signal,type:text,notnull"`
+ timeAuditable
+ }
+
+ tx, err := db.BeginTx(ctx, nil)
+ if err != nil {
+ return err
+ }
+
+ defer func() {
+ _ = tx.Rollback()
+ }()
+
+ var orgIDs []string
+ err = tx.NewSelect().
+ Table("organizations").
+ Column("id").
+ Scan(ctx, &orgIDs)
+ if err != nil && err != sql.ErrNoRows {
+ return err
+ }
+
+ var meterFiltersToInsert []quickFilterType
+ for _, orgIDStr := range orgIDs {
+ orgID, err := valuer.NewUUID(orgIDStr)
+ if err != nil {
+ return err
+ }
+
+ meterFiltersToInsert = append(meterFiltersToInsert, quickFilterType{
+ identifiable: identifiable{
+ ID: valuer.GenerateUUID(),
+ },
+ OrgID: orgID,
+ Filter: string(meterJSON),
+ Signal: signal{valuer.NewString("meter")},
+ timeAuditable: timeAuditable{
+ CreatedAt: time.Now(),
+ UpdatedAt: time.Now(),
+ },
+ })
+ }
+
+ if len(meterFiltersToInsert) > 0 {
+ _, err = tx.NewInsert().
+ Model(&meterFiltersToInsert).
+ On("CONFLICT (org_id, signal) DO UPDATE").
+ Set("filter = EXCLUDED.filter, updated_at = EXCLUDED.updated_at").
+ Exec(ctx)
+ if err != nil {
+ return err
+ }
+ }
+
+ if err := tx.Commit(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (migration *addMeterQuickFilters) Down(ctx context.Context, db *bun.DB) error {
+ return nil
+}
diff --git a/pkg/telemetrymetadata/metadata.go b/pkg/telemetrymetadata/metadata.go
index aa61cc96edbf..f0a97555e5dd 100644
--- a/pkg/telemetrymetadata/metadata.go
+++ b/pkg/telemetrymetadata/metadata.go
@@ -25,6 +25,8 @@ var (
ErrFailedToGetLogsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get logs keys")
ErrFailedToGetTblStatement = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get tbl statement")
ErrFailedToGetMetricsKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get metrics keys")
+ ErrFailedToGetMeterKeys = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter keys")
+ ErrFailedToGetMeterValues = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get meter values")
ErrFailedToGetRelatedValues = errors.Newf(errors.TypeInternal, errors.CodeInternal, "failed to get related values")
)
@@ -36,6 +38,8 @@ type telemetryMetaStore struct {
indexV3TblName string
metricsDBName string
metricsFieldsTblName string
+ meterDBName string
+ meterFieldsTblName string
logsDBName string
logsFieldsTblName string
logsV2TblName string
@@ -58,6 +62,8 @@ func NewTelemetryMetaStore(
indexV3TblName string,
metricsDBName string,
metricsFieldsTblName string,
+ meterDBName string,
+ meterFieldsTblName string,
logsDBName string,
logsV2TblName string,
logsFieldsTblName string,
@@ -74,6 +80,8 @@ func NewTelemetryMetaStore(
indexV3TblName: indexV3TblName,
metricsDBName: metricsDBName,
metricsFieldsTblName: metricsFieldsTblName,
+ meterDBName: meterDBName,
+ meterFieldsTblName: meterFieldsTblName,
logsDBName: logsDBName,
logsV2TblName: logsV2TblName,
logsFieldsTblName: logsFieldsTblName,
@@ -598,6 +606,76 @@ func (t *telemetryMetaStore) getMetricsKeys(ctx context.Context, fieldKeySelecto
return keys, complete, nil
}
+// getMeterKeys returns the keys from the meter metrics that match the field selection criteria
+func (t *telemetryMetaStore) getMeterSourceMetricKeys(ctx context.Context, fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
+ if len(fieldKeySelectors) == 0 {
+ return nil, true, nil
+ }
+
+ sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeys(labels)) as attr_name").From(t.meterDBName + "." + t.meterFieldsTblName)
+ conds := []string{}
+ var limit int
+ for _, fieldKeySelector := range fieldKeySelectors {
+ fieldConds := []string{}
+ if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
+ fieldConds = append(fieldConds, sb.E("attr_name", fieldKeySelector.Name))
+ } else {
+ fieldConds = append(fieldConds, sb.Like("attr_name", "%"+fieldKeySelector.Name+"%"))
+ }
+ fieldConds = append(fieldConds, sb.NotLike("attr_name", "\\_\\_%"))
+
+ if fieldKeySelector.MetricContext != nil {
+ fieldConds = append(fieldConds, sb.E("metric_name", fieldKeySelector.MetricContext.MetricName))
+ }
+
+ conds = append(conds, sb.And(fieldConds...))
+ limit += fieldKeySelector.Limit
+ }
+ sb.Where(sb.Or(conds...))
+ if limit == 0 {
+ limit = 1000
+ }
+
+ sb.Limit(limit)
+ query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+
+ rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
+ if err != nil {
+ return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
+ }
+ defer rows.Close()
+
+ keys := []*telemetrytypes.TelemetryFieldKey{}
+ rowCount := 0
+ for rows.Next() {
+ rowCount++
+ // reached the limit, we know there are more results
+ if rowCount > limit {
+ break
+ }
+
+ var name string
+ err = rows.Scan(&name)
+ if err != nil {
+ return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
+ }
+ keys = append(keys, &telemetrytypes.TelemetryFieldKey{
+ Name: name,
+ Signal: telemetrytypes.SignalMetrics,
+ })
+ }
+
+ if rows.Err() != nil {
+ return nil, false, errors.Wrapf(rows.Err(), errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterKeys.Error())
+ }
+
+ // hit the limit?
+ complete := rowCount <= limit
+
+ return keys, complete, nil
+
+}
+
func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *telemetrytypes.FieldKeySelector) (map[string][]*telemetrytypes.TelemetryFieldKey, bool, error) {
var keys []*telemetrytypes.TelemetryFieldKey
var complete bool = true
@@ -614,7 +692,11 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
case telemetrytypes.SignalLogs:
keys, complete, err = t.getLogsKeys(ctx, selectors)
case telemetrytypes.SignalMetrics:
- keys, complete, err = t.getMetricsKeys(ctx, selectors)
+ if fieldKeySelector.Source == telemetrytypes.SourceMeter {
+ keys, complete, err = t.getMeterSourceMetricKeys(ctx, selectors)
+ } else {
+ keys, complete, err = t.getMetricsKeys(ctx, selectors)
+ }
case telemetrytypes.SignalUnspecified:
// get traces keys
tracesKeys, tracesComplete, err := t.getTracesKeys(ctx, selectors)
@@ -637,7 +719,6 @@ func (t *telemetryMetaStore) GetKeys(ctx context.Context, fieldKeySelector *tele
}
keys = append(keys, metricsKeys...)
- // Complete only if all signals are complete
complete = tracesComplete && logsComplete && metricsComplete
}
if err != nil {
@@ -657,6 +738,7 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
logsSelectors := []*telemetrytypes.FieldKeySelector{}
tracesSelectors := []*telemetrytypes.FieldKeySelector{}
metricsSelectors := []*telemetrytypes.FieldKeySelector{}
+ meterSourceMetricsSelectors := []*telemetrytypes.FieldKeySelector{}
for _, fieldKeySelector := range fieldKeySelectors {
switch fieldKeySelector.Signal {
@@ -665,7 +747,11 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
case telemetrytypes.SignalTraces:
tracesSelectors = append(tracesSelectors, fieldKeySelector)
case telemetrytypes.SignalMetrics:
- metricsSelectors = append(metricsSelectors, fieldKeySelector)
+ if fieldKeySelector.Source == telemetrytypes.SourceMeter {
+ meterSourceMetricsSelectors = append(meterSourceMetricsSelectors, fieldKeySelector)
+ } else {
+ metricsSelectors = append(metricsSelectors, fieldKeySelector)
+ }
case telemetrytypes.SignalUnspecified:
logsSelectors = append(logsSelectors, fieldKeySelector)
tracesSelectors = append(tracesSelectors, fieldKeySelector)
@@ -686,6 +772,10 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
return nil, false, err
}
+ meterSourceMetricsKeys, _, err := t.getMeterSourceMetricKeys(ctx, meterSourceMetricsSelectors)
+ if err != nil {
+ return nil, false, err
+ }
// Complete only if all queries are complete
complete := logsComplete && tracesComplete && metricsComplete
@@ -699,6 +789,9 @@ func (t *telemetryMetaStore) GetKeysMulti(ctx context.Context, fieldKeySelectors
for _, key := range metricsKeys {
mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
}
+ for _, key := range meterSourceMetricsKeys {
+ mapOfKeys[key.Name] = append(mapOfKeys[key.Name], key)
+ }
return mapOfKeys, complete, nil
}
@@ -1062,6 +1155,61 @@ func (t *telemetryMetaStore) getMetricFieldValues(ctx context.Context, fieldValu
return values, complete, nil
}
+func (t *telemetryMetaStore) getMeterSourceMetricFieldValues(ctx context.Context, fieldValueSelector *telemetrytypes.FieldValueSelector) (*telemetrytypes.TelemetryFieldValues, bool, error) {
+ sb := sqlbuilder.Select("DISTINCT arrayJoin(JSONExtractKeysAndValues(labels, 'String')) AS attr").
+ From(t.meterDBName + "." + t.meterFieldsTblName)
+
+ if fieldValueSelector.Name != "" {
+ sb.Where(sb.E("attr.1", fieldValueSelector.Name))
+ }
+ sb.Where(sb.NotLike("attr.1", "\\_\\_%"))
+
+ if fieldValueSelector.Value != "" {
+ if fieldValueSelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
+ sb.Where(sb.E("attr.2", fieldValueSelector.Value))
+ } else {
+ sb.Where(sb.Like("attr.2", "%"+fieldValueSelector.Value+"%"))
+ }
+ }
+ sb.Where(sb.NE("attr.2", ""))
+
+ limit := fieldValueSelector.Limit
+ if limit == 0 {
+ limit = 50
+ }
+ // query one extra to check if we hit the limit
+ sb.Limit(limit + 1)
+
+ query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+ rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
+ if err != nil {
+ return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterValues.Error())
+ }
+ defer rows.Close()
+
+ values := &telemetrytypes.TelemetryFieldValues{}
+ rowCount := 0
+ for rows.Next() {
+ rowCount++
+ // reached the limit, we know there are more results
+ if rowCount > limit {
+ break
+ }
+
+ var attribute []string
+ if err := rows.Scan(&attribute); err != nil {
+ return nil, false, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, ErrFailedToGetMeterValues.Error())
+ }
+ if len(attribute) > 1 {
+ values.StringValues = append(values.StringValues, attribute[1])
+ }
+ }
+
+ // hit the limit?
+ complete := rowCount <= limit
+ return values, complete, nil
+}
+
func populateAllUnspecifiedValues(allUnspecifiedValues *telemetrytypes.TelemetryFieldValues, mapOfValues map[any]bool, mapOfRelatedValues map[any]bool, values *telemetrytypes.TelemetryFieldValues, limit int) bool {
complete := true
totalCount := len(mapOfValues) + len(mapOfRelatedValues)
@@ -1122,7 +1270,11 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
case telemetrytypes.SignalLogs:
values, complete, err = t.getLogFieldValues(ctx, fieldValueSelector)
case telemetrytypes.SignalMetrics:
- values, complete, err = t.getMetricFieldValues(ctx, fieldValueSelector)
+ if fieldValueSelector.Source == telemetrytypes.SourceMeter {
+ values, complete, err = t.getMeterSourceMetricFieldValues(ctx, fieldValueSelector)
+ } else {
+ values, complete, err = t.getMetricFieldValues(ctx, fieldValueSelector)
+ }
case telemetrytypes.SignalUnspecified:
mapOfValues := make(map[any]bool)
mapOfRelatedValues := make(map[any]bool)
@@ -1178,6 +1330,33 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
return make(map[string]metrictypes.Temporality), nil
}
+ result := make(map[string]metrictypes.Temporality)
+ metricsTemporality, err := t.fetchMetricsTemporality(ctx, metricNames...)
+ if err != nil {
+ return nil, err
+ }
+ meterMetricsTemporality, err := t.fetchMeterSourceMetricsTemporality(ctx, metricNames...)
+ if err != nil {
+ return nil, err
+ }
+
+ // For metrics not found in the database, set to Unknown
+ for _, metricName := range metricNames {
+ if temporality, exists := metricsTemporality[metricName]; exists {
+ result[metricName] = temporality
+ continue
+ }
+ if temporality, exists := meterMetricsTemporality[metricName]; exists {
+ result[metricName] = temporality
+ continue
+ }
+ result[metricName] = metrictypes.Unknown
+ }
+
+ return result, nil
+}
+
+func (t *telemetryMetaStore) fetchMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
// Build query to fetch temporality for all metrics
@@ -1229,11 +1408,55 @@ func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, metricNa
result[metricName] = temporality
}
- // For metrics not found in the database, set to Unknown
- for _, metricName := range metricNames {
- if _, exists := result[metricName]; !exists {
- result[metricName] = metrictypes.Unknown
+ return result, nil
+}
+
+func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporality(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, error) {
+ result := make(map[string]metrictypes.Temporality)
+
+ sb := sqlbuilder.Select(
+ "metric_name",
+ "argMax(temporality, unix_milli) as temporality",
+ ).From(t.meterDBName + "." + t.meterFieldsTblName)
+
+ // Filter by metric names (in the temporality column due to data mix-up)
+ sb.Where(sb.In("metric_name", metricNames))
+
+ // Group by metric name to get one temporality per metric
+ sb.GroupBy("metric_name")
+
+ query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+
+ t.logger.DebugContext(ctx, "fetching meter metrics temporality", "query", query, "args", args)
+
+ rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
+ if err != nil {
+ return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch meter metric temporality")
+ }
+ defer rows.Close()
+
+ // Process results
+ for rows.Next() {
+ var metricName, temporalityStr string
+ if err := rows.Scan(&metricName, &temporalityStr); err != nil {
+ return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
}
+
+ // Convert string to Temporality type
+ var temporality metrictypes.Temporality
+ switch temporalityStr {
+ case "Delta":
+ temporality = metrictypes.Delta
+ case "Cumulative":
+ temporality = metrictypes.Cumulative
+ case "Unspecified":
+ temporality = metrictypes.Unspecified
+ default:
+ // Unknown or empty temporality
+ temporality = metrictypes.Unknown
+ }
+
+ result[metricName] = temporality
}
return result, nil
diff --git a/pkg/telemetrymetadata/metadata_test.go b/pkg/telemetrymetadata/metadata_test.go
index 7b1cc7c18e79..a2b9442e304a 100644
--- a/pkg/telemetrymetadata/metadata_test.go
+++ b/pkg/telemetrymetadata/metadata_test.go
@@ -8,6 +8,7 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
+ "github.com/SigNoz/signoz/pkg/telemetrymeter"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
@@ -42,6 +43,8 @@ func TestGetKeys(t *testing.T) {
telemetrytraces.SpanIndexV3TableName,
telemetrymetrics.DBName,
telemetrymetrics.AttributesMetadataTableName,
+ telemetrymeter.DBName,
+ telemetrymeter.SamplesAgg1dTableName,
telemetrylogs.DBName,
telemetrylogs.LogsV2TableName,
telemetrylogs.TagAttributesV2TableName,
diff --git a/pkg/telemetrymeter/statement_builder.go b/pkg/telemetrymeter/statement_builder.go
new file mode 100644
index 000000000000..2a6949fe8c2a
--- /dev/null
+++ b/pkg/telemetrymeter/statement_builder.go
@@ -0,0 +1,365 @@
+package telemetrymeter
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+
+ "github.com/SigNoz/signoz/pkg/factory"
+ "github.com/SigNoz/signoz/pkg/querybuilder"
+ "github.com/SigNoz/signoz/pkg/telemetrymetrics"
+ "github.com/SigNoz/signoz/pkg/types/metrictypes"
+ qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+ "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+ "github.com/huandu/go-sqlbuilder"
+)
+
+type meterQueryStatementBuilder struct {
+ logger *slog.Logger
+ metadataStore telemetrytypes.MetadataStore
+ fm qbtypes.FieldMapper
+ cb qbtypes.ConditionBuilder
+ metricsStatementBuilder *telemetrymetrics.MetricQueryStatementBuilder
+}
+
+var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*meterQueryStatementBuilder)(nil)
+
+func NewMeterQueryStatementBuilder(
+ settings factory.ProviderSettings,
+ metadataStore telemetrytypes.MetadataStore,
+ fieldMapper qbtypes.FieldMapper,
+ conditionBuilder qbtypes.ConditionBuilder,
+) *meterQueryStatementBuilder {
+ metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymeter")
+ metricsStatementBuilder := telemetrymetrics.NewMetricQueryStatementBuilder(settings, metadataStore, fieldMapper, conditionBuilder)
+
+ return &meterQueryStatementBuilder{
+ logger: metricsSettings.Logger(),
+ metadataStore: metadataStore,
+ fm: fieldMapper,
+ cb: conditionBuilder,
+ metricsStatementBuilder: metricsStatementBuilder,
+ }
+}
+
+func (b *meterQueryStatementBuilder) Build(
+ ctx context.Context,
+ start uint64,
+ end uint64,
+ _ qbtypes.RequestType,
+ query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+ variables map[string]qbtypes.VariableItem,
+) (*qbtypes.Statement, error) {
+ keySelectors := telemetrymetrics.GetKeySelectors(query)
+ keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
+ if err != nil {
+ return nil, err
+ }
+
+ start, end = querybuilder.AdjustedMetricTimeRange(start, end, uint64(query.StepInterval.Seconds()), query)
+
+ return b.buildPipelineStatement(ctx, start, end, query, keys, variables)
+}
+
+func (b *meterQueryStatementBuilder) buildPipelineStatement(
+ ctx context.Context,
+ start, end uint64,
+ query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+ keys map[string][]*telemetrytypes.TelemetryFieldKey,
+ variables map[string]qbtypes.VariableItem,
+) (*qbtypes.Statement, error) {
+ var (
+ cteFragments []string
+ cteArgs [][]any
+ )
+
+ if b.metricsStatementBuilder.CanShortCircuitDelta(query) {
+ // spatial_aggregation_cte directly for certain delta queries
+ frag, args := b.buildTemporalAggDeltaFastPath(ctx, start, end, query, keys, variables)
+ if frag != "" {
+ cteFragments = append(cteFragments, frag)
+ cteArgs = append(cteArgs, args)
+ }
+ } else {
+ // temporal_aggregation_cte
+ if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, variables); err != nil {
+ return nil, err
+ } else if frag != "" {
+ cteFragments = append(cteFragments, frag)
+ cteArgs = append(cteArgs, args)
+ }
+
+ // spatial_aggregation_cte
+ frag, args := b.buildSpatialAggregationCTE(ctx, start, end, query, keys)
+ if frag != "" {
+ cteFragments = append(cteFragments, frag)
+ cteArgs = append(cteArgs, args)
+ }
+ }
+
+ // final SELECT
+ return b.metricsStatementBuilder.BuildFinalSelect(cteFragments, cteArgs, query)
+}
+
+func (b *meterQueryStatementBuilder) buildTemporalAggDeltaFastPath(
+ ctx context.Context,
+ start, end uint64,
+ query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+ keys map[string][]*telemetrytypes.TelemetryFieldKey,
+ variables map[string]qbtypes.VariableItem,
+) (string, []any) {
+ var filterWhere *querybuilder.PreparedWhereClause
+ var err error
+ stepSec := int64(query.StepInterval.Seconds())
+
+ sb := sqlbuilder.NewSelectBuilder()
+
+ sb.SelectMore(fmt.Sprintf(
+ "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
+ stepSec,
+ ))
+ for _, g := range query.GroupBy {
+ col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
+ if err != nil {
+ return "", []any{}
+ }
+ sb.SelectMore(col)
+ }
+
+ tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+ aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+ if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
+ aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
+ }
+
+ sb.SelectMore(fmt.Sprintf("%s AS value", aggCol))
+ sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
+ sb.Where(
+ sb.In("metric_name", query.Aggregations[0].MetricName),
+ sb.GTE("unix_milli", start),
+ sb.LT("unix_milli", end),
+ )
+ if query.Filter != nil && query.Filter.Expression != "" {
+ filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
+ FieldMapper: b.fm,
+ ConditionBuilder: b.cb,
+ FieldKeys: keys,
+ FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
+ Variables: variables,
+ })
+ if err != nil {
+ return "", []any{}
+ }
+ }
+ if filterWhere != nil {
+ sb.AddWhereClause(filterWhere.WhereClause)
+ }
+
+ if query.Aggregations[0].Temporality != metrictypes.Unknown {
+ sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
+ }
+ sb.GroupBy("ts")
+ sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
+
+ q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+ return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
+}
+
+func (b *meterQueryStatementBuilder) buildTemporalAggregationCTE(
+ ctx context.Context,
+ start, end uint64,
+ query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+ keys map[string][]*telemetrytypes.TelemetryFieldKey,
+ variables map[string]qbtypes.VariableItem,
+) (string, []any, error) {
+ if query.Aggregations[0].Temporality == metrictypes.Delta {
+ return b.buildTemporalAggDelta(ctx, start, end, query, keys, variables)
+ }
+ return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, keys, variables)
+}
+
+func (b *meterQueryStatementBuilder) buildTemporalAggDelta(
+ ctx context.Context,
+ start, end uint64,
+ query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+ keys map[string][]*telemetrytypes.TelemetryFieldKey,
+ variables map[string]qbtypes.VariableItem,
+) (string, []any, error) {
+ var filterWhere *querybuilder.PreparedWhereClause
+ var err error
+
+ stepSec := int64(query.StepInterval.Seconds())
+ sb := sqlbuilder.NewSelectBuilder()
+
+ sb.Select("fingerprint")
+ sb.SelectMore(fmt.Sprintf(
+ "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
+ stepSec,
+ ))
+
+ for _, g := range query.GroupBy {
+ col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
+ if err != nil {
+ return "", nil, err
+ }
+ sb.SelectMore(col)
+ }
+
+ tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+ aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality,
+ query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+ if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
+ aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
+ }
+
+ sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
+
+ sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
+ sb.Where(
+ sb.In("metric_name", query.Aggregations[0].MetricName),
+ sb.GTE("unix_milli", start),
+ sb.LT("unix_milli", end),
+ )
+
+ if query.Filter != nil && query.Filter.Expression != "" {
+ filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
+ FieldMapper: b.fm,
+ ConditionBuilder: b.cb,
+ FieldKeys: keys,
+ FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
+ Variables: variables,
+ })
+ if err != nil {
+ return "", nil, err
+ }
+ }
+ if filterWhere != nil {
+ sb.AddWhereClause(filterWhere.WhereClause)
+ }
+
+ if query.Aggregations[0].Temporality != metrictypes.Unknown {
+ sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
+ }
+
+ sb.GroupBy("fingerprint", "ts")
+ sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
+ sb.OrderBy("fingerprint", "ts")
+
+ q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+ return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
+}
+
+func (b *meterQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
+ ctx context.Context,
+ start, end uint64,
+ query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+ keys map[string][]*telemetrytypes.TelemetryFieldKey,
+ variables map[string]qbtypes.VariableItem,
+) (string, []any, error) {
+ var filterWhere *querybuilder.PreparedWhereClause
+ var err error
+ stepSec := int64(query.StepInterval.Seconds())
+
+ baseSb := sqlbuilder.NewSelectBuilder()
+ baseSb.Select("fingerprint")
+ baseSb.SelectMore(fmt.Sprintf(
+ "toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
+ stepSec,
+ ))
+ for _, g := range query.GroupBy {
+ col, err := b.fm.ColumnExpressionFor(ctx, &g.TelemetryFieldKey, keys)
+ if err != nil {
+ return "", nil, err
+ }
+ baseSb.SelectMore(col)
+ }
+
+ tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+ aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+ baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
+
+ baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
+ baseSb.Where(
+ baseSb.In("metric_name", query.Aggregations[0].MetricName),
+ baseSb.GTE("unix_milli", start),
+ baseSb.LT("unix_milli", end),
+ )
+ if query.Filter != nil && query.Filter.Expression != "" {
+ filterWhere, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
+ FieldMapper: b.fm,
+ ConditionBuilder: b.cb,
+ FieldKeys: keys,
+ FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
+ Variables: variables,
+ })
+ if err != nil {
+ return "", nil, err
+ }
+ }
+ if filterWhere != nil {
+ baseSb.AddWhereClause(filterWhere.WhereClause)
+ }
+
+ if query.Aggregations[0].Temporality != metrictypes.Unknown {
+ baseSb.Where(baseSb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
+ }
+ baseSb.GroupBy("fingerprint", "ts")
+ baseSb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
+ baseSb.OrderBy("fingerprint", "ts")
+
+ innerQuery, innerArgs := baseSb.BuildWithFlavor(sqlbuilder.ClickHouse)
+
+ switch query.Aggregations[0].TimeAggregation {
+ case metrictypes.TimeAggregationRate:
+ rateExpr := fmt.Sprintf(telemetrymetrics.RateWithoutNegative, start, start)
+ wrapped := sqlbuilder.NewSelectBuilder()
+ wrapped.Select("ts")
+ for _, g := range query.GroupBy {
+ wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+ }
+ wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", rateExpr))
+ wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
+ q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
+ return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
+
+ case metrictypes.TimeAggregationIncrease:
+ incExpr := fmt.Sprintf(telemetrymetrics.IncreaseWithoutNegative, start, start)
+ wrapped := sqlbuilder.NewSelectBuilder()
+ wrapped.Select("ts")
+ for _, g := range query.GroupBy {
+ wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+ }
+ wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", incExpr))
+ wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
+ q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
+ return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
+ default:
+ return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerArgs, nil
+ }
+}
+
+func (b *meterQueryStatementBuilder) buildSpatialAggregationCTE(
+ _ context.Context,
+ _ uint64,
+ _ uint64,
+ query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
+ _ map[string][]*telemetrytypes.TelemetryFieldKey,
+) (string, []any) {
+ sb := sqlbuilder.NewSelectBuilder()
+
+ sb.Select("ts")
+ for _, g := range query.GroupBy {
+ sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
+ }
+ sb.SelectMore(fmt.Sprintf("%s(per_series_value) AS value", query.Aggregations[0].SpaceAggregation.StringValue()))
+ sb.From("__temporal_aggregation_cte")
+ sb.Where(sb.EQ("isNaN(per_series_value)", 0))
+ if query.Aggregations[0].ValueFilter != nil {
+ sb.Where(sb.EQ("per_series_value", query.Aggregations[0].ValueFilter.Value))
+ }
+ sb.GroupBy("ts")
+ sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
+
+ q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
+ return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
+}
diff --git a/pkg/telemetrymeter/stmt_builder_test.go b/pkg/telemetrymeter/stmt_builder_test.go
new file mode 100644
index 000000000000..62f4124d6eed
--- /dev/null
+++ b/pkg/telemetrymeter/stmt_builder_test.go
@@ -0,0 +1,191 @@
+package telemetrymeter
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
+ "github.com/SigNoz/signoz/pkg/telemetrymetrics"
+ "github.com/SigNoz/signoz/pkg/types/metrictypes"
+ qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
+ "github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+ "github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
+ "github.com/stretchr/testify/require"
+)
+
+func TestStatementBuilder(t *testing.T) {
+ cases := []struct {
+ name string
+ requestType qbtypes.RequestType
+ query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
+ expected qbtypes.Statement
+ expectedErr error
+ }{
+ {
+ name: "test_cumulative_rate_sum",
+ requestType: qbtypes.RequestTypeTimeSeries,
+ query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
+ Signal: telemetrytypes.SignalMetrics,
+ StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
+ Aggregations: []qbtypes.MetricAggregation{
+ {
+ MetricName: "signoz_calls_total",
+ Type: metrictypes.SumType,
+ Temporality: metrictypes.Cumulative,
+ TimeAggregation: metrictypes.TimeAggregationRate,
+ SpaceAggregation: metrictypes.SpaceAggregationSum,
+ },
+ },
+ Filter: &qbtypes.Filter{
+ Expression: "service.name = 'cartservice'",
+ },
+ Limit: 10,
+ GroupBy: []qbtypes.GroupByKey{
+ {
+ TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+ Name: "service.name",
+ },
+ },
+ },
+ },
+ expected: qbtypes.Statement{
+ Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747785600000))) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747785600000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
+ Args: []any{"signoz_calls_total", uint64(1747785600000), uint64(1747983420000), "cartservice", "cumulative", 0},
+ },
+ expectedErr: nil,
+ },
+ {
+ name: "test_delta_rate_sum",
+ requestType: qbtypes.RequestTypeTimeSeries,
+ query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
+ Signal: telemetrytypes.SignalMetrics,
+ StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
+ Aggregations: []qbtypes.MetricAggregation{
+ {
+ MetricName: "signoz_calls_total",
+ Type: metrictypes.SumType,
+ Temporality: metrictypes.Delta,
+ TimeAggregation: metrictypes.TimeAggregationRate,
+ SpaceAggregation: metrictypes.SpaceAggregationSum,
+ },
+ },
+ Filter: &qbtypes.Filter{
+ Expression: "service.name = 'cartservice'",
+ },
+ Limit: 10,
+ GroupBy: []qbtypes.GroupByKey{
+ {
+ TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+ Name: "service.name",
+ },
+ },
+ },
+ },
+ expected: qbtypes.Statement{
+ Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
+ Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta"},
+ },
+ expectedErr: nil,
+ },
+ {
+ name: "test_delta_rate_avg",
+ requestType: qbtypes.RequestTypeTimeSeries,
+ query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
+ Signal: telemetrytypes.SignalMetrics,
+ StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
+ Aggregations: []qbtypes.MetricAggregation{
+ {
+ MetricName: "signoz_calls_total",
+ Type: metrictypes.SumType,
+ Temporality: metrictypes.Delta,
+ TimeAggregation: metrictypes.TimeAggregationRate,
+ SpaceAggregation: metrictypes.SpaceAggregationAvg,
+ },
+ },
+ Filter: &qbtypes.Filter{
+ Expression: "service.name = 'cartservice'",
+ },
+ Limit: 10,
+ GroupBy: []qbtypes.GroupByKey{
+ {
+ TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+ Name: "service.name",
+ },
+ },
+ },
+ },
+ expected: qbtypes.Statement{
+ Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte",
+ Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta", 0},
+ },
+ expectedErr: nil,
+ },
+ {
+ name: "test_gauge_avg_sum",
+ requestType: qbtypes.RequestTypeTimeSeries,
+ query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
+ Signal: telemetrytypes.SignalMetrics,
+ StepInterval: qbtypes.Step{Duration: 24 * time.Hour},
+ Aggregations: []qbtypes.MetricAggregation{
+ {
+ MetricName: "system.memory.usage",
+ Type: metrictypes.GaugeType,
+ Temporality: metrictypes.Unspecified,
+ TimeAggregation: metrictypes.TimeAggregationAvg,
+ SpaceAggregation: metrictypes.SpaceAggregationSum,
+ },
+ },
+ Filter: &qbtypes.Filter{
+ Expression: "host.name = 'big-data-node-1'",
+ },
+ Limit: 10,
+ GroupBy: []qbtypes.GroupByKey{
+ {
+ TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+ Name: "host.name",
+ },
+ },
+ },
+ },
+ expected: qbtypes.Statement{
+ Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, avg(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte",
+ Args: []any{"system.memory.usage", uint64(1747872000000), uint64(1747983420000), "big-data-node-1", "unspecified", 0},
+ },
+ expectedErr: nil,
+ },
+ }
+
+ fm := telemetrymetrics.NewFieldMapper()
+ cb := telemetrymetrics.NewConditionBuilder(fm)
+ mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
+ keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json")
+ if err != nil {
+ t.Fatalf("failed to load field keys: %v", err)
+ }
+ mockMetadataStore.KeysMap = keys
+
+ statementBuilder := NewMeterQueryStatementBuilder(
+ instrumentationtest.New().ToProviderSettings(),
+ mockMetadataStore,
+ fm,
+ cb,
+ )
+
+ for _, c := range cases {
+ t.Run(c.name, func(t *testing.T) {
+
+ q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
+
+ if c.expectedErr != nil {
+ require.Error(t, err)
+ require.Contains(t, err.Error(), c.expectedErr.Error())
+ } else {
+ require.NoError(t, err)
+ require.Equal(t, c.expected.Query, q.Query)
+ require.Equal(t, c.expected.Args, q.Args)
+ require.Equal(t, c.expected.Warnings, q.Warnings)
+ }
+ })
+ }
+}
diff --git a/pkg/telemetrymeter/tables.go b/pkg/telemetrymeter/tables.go
new file mode 100644
index 000000000000..6bc91e9f5425
--- /dev/null
+++ b/pkg/telemetrymeter/tables.go
@@ -0,0 +1,194 @@
+package telemetrymeter
+
+import (
+ "time"
+
+ "github.com/SigNoz/signoz/pkg/types/metrictypes"
+)
+
+const (
+ DBName = "signoz_meter"
+ SamplesTableName = "distributed_samples"
+ SamplesLocalTableName = "samples"
+ SamplesAgg1dTableName = "distributed_samples_agg_1d"
+ SamplesAgg1dLocalTableName = "samples_agg_1d"
+)
+
+var (
+ oneMonthInMilliseconds = uint64(time.Hour.Milliseconds() * 24 * 30)
+
+ // when the query requests for almost 1 day, but not exactly 1 day, we need to add an offset to the end time
+ // to make sure that we are using the correct table
+ // this is because the start gets adjusted to the nearest step interval and uses the 5m table for 4m step interval
+ // leading to time series that doesn't best represent the rate of change
+ offsetBucket = uint64(1 * time.Hour.Milliseconds())
+)
+
+// start and end are in milliseconds
+// we have two tables for samples
+// 1. distributed_samples
+// 2. distributed_samples_agg_1d - for queries with time range above or equal to 30 days
+// if the `timeAggregation` is `count_distinct` we can't use the aggregated tables because they don't support it
+func WhichSamplesTableToUse(
+ start, end uint64,
+ metricType metrictypes.Type,
+ timeAggregation metrictypes.TimeAggregation,
+ tableHints *metrictypes.MetricTableHints,
+) string {
+
+ // if we have a hint for the table, we need to use it
+ // the hint will be used to override the default table selection logic
+ if tableHints != nil {
+ if tableHints.SamplesTableName != "" {
+ return tableHints.SamplesTableName
+ }
+ }
+
+ // if the time aggregation is count_distinct, we need to use the distributed_samples table
+ // because the aggregated tables don't support count_distinct
+ if timeAggregation == metrictypes.TimeAggregationCountDistinct {
+ return SamplesTableName
+ }
+
+ if end-start < oneMonthInMilliseconds+offsetBucket {
+ return SamplesTableName
+ }
+ return SamplesAgg1dTableName
+
+}
+
+func AggregationColumnForSamplesTable(
+ start, end uint64,
+ metricType metrictypes.Type,
+ temporality metrictypes.Temporality,
+ timeAggregation metrictypes.TimeAggregation,
+ tableHints *metrictypes.MetricTableHints,
+) string {
+ tableName := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
+ var aggregationColumn string
+ switch temporality {
+ case metrictypes.Delta:
+ switch tableName {
+ case SamplesTableName:
+ switch timeAggregation {
+ case metrictypes.TimeAggregationLatest:
+ aggregationColumn = "anyLast(value)"
+ case metrictypes.TimeAggregationSum:
+ aggregationColumn = "sum(value)"
+ case metrictypes.TimeAggregationAvg:
+ aggregationColumn = "avg(value)"
+ case metrictypes.TimeAggregationMin:
+ aggregationColumn = "min(value)"
+ case metrictypes.TimeAggregationMax:
+ aggregationColumn = "max(value)"
+ case metrictypes.TimeAggregationCount:
+ aggregationColumn = "count(value)"
+ case metrictypes.TimeAggregationCountDistinct:
+ aggregationColumn = "countDistinct(value)"
+ case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+ aggregationColumn = "sum(value)"
+ }
+ case SamplesAgg1dTableName:
+ switch timeAggregation {
+ case metrictypes.TimeAggregationLatest:
+ aggregationColumn = "anyLast(last)"
+ case metrictypes.TimeAggregationSum:
+ aggregationColumn = "sum(sum)"
+ case metrictypes.TimeAggregationAvg:
+ aggregationColumn = "sum(sum) / sum(count)"
+ case metrictypes.TimeAggregationMin:
+ aggregationColumn = "min(min)"
+ case metrictypes.TimeAggregationMax:
+ aggregationColumn = "max(max)"
+ case metrictypes.TimeAggregationCount:
+ aggregationColumn = "sum(count)"
+ // count_distinct is not supported in aggregated tables
+ case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+ aggregationColumn = "sum(sum)"
+ }
+ }
+ case metrictypes.Cumulative:
+ switch tableName {
+ case SamplesTableName:
+ switch timeAggregation {
+ case metrictypes.TimeAggregationLatest:
+ aggregationColumn = "anyLast(value)"
+ case metrictypes.TimeAggregationSum:
+ aggregationColumn = "sum(value)"
+ case metrictypes.TimeAggregationAvg:
+ aggregationColumn = "avg(value)"
+ case metrictypes.TimeAggregationMin:
+ aggregationColumn = "min(value)"
+ case metrictypes.TimeAggregationMax:
+ aggregationColumn = "max(value)"
+ case metrictypes.TimeAggregationCount:
+ aggregationColumn = "count(value)"
+ case metrictypes.TimeAggregationCountDistinct:
+ aggregationColumn = "countDistinct(value)"
+ case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+ aggregationColumn = "max(value)"
+ }
+ case SamplesAgg1dTableName:
+ switch timeAggregation {
+ case metrictypes.TimeAggregationLatest:
+ aggregationColumn = "anyLast(last)"
+ case metrictypes.TimeAggregationSum:
+ aggregationColumn = "sum(sum)"
+ case metrictypes.TimeAggregationAvg:
+ aggregationColumn = "sum(sum) / sum(count)"
+ case metrictypes.TimeAggregationMin:
+ aggregationColumn = "min(min)"
+ case metrictypes.TimeAggregationMax:
+ aggregationColumn = "max(max)"
+ case metrictypes.TimeAggregationCount:
+ aggregationColumn = "sum(count)"
+ // count_distinct is not supported in aggregated tables
+ case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // only these two options give meaningful results
+ aggregationColumn = "max(max)"
+ }
+ }
+
+ case metrictypes.Unspecified:
+ switch tableName {
+ case SamplesTableName:
+ switch timeAggregation {
+ case metrictypes.TimeAggregationLatest:
+ aggregationColumn = "anyLast(value)"
+ case metrictypes.TimeAggregationSum:
+ aggregationColumn = "sum(value)"
+ case metrictypes.TimeAggregationAvg:
+ aggregationColumn = "avg(value)"
+ case metrictypes.TimeAggregationMin:
+ aggregationColumn = "min(value)"
+ case metrictypes.TimeAggregationMax:
+ aggregationColumn = "max(value)"
+ case metrictypes.TimeAggregationCount:
+ aggregationColumn = "count(value)"
+ case metrictypes.TimeAggregationCountDistinct:
+ aggregationColumn = "countDistinct(value)"
+ case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
+ aggregationColumn = "sum(value)"
+ }
+ case SamplesAgg1dTableName:
+ switch timeAggregation {
+ case metrictypes.TimeAggregationLatest:
+ aggregationColumn = "anyLast(last)"
+ case metrictypes.TimeAggregationSum:
+ aggregationColumn = "sum(sum)"
+ case metrictypes.TimeAggregationAvg:
+ aggregationColumn = "sum(sum) / sum(count)"
+ case metrictypes.TimeAggregationMin:
+ aggregationColumn = "min(min)"
+ case metrictypes.TimeAggregationMax:
+ aggregationColumn = "max(max)"
+ case metrictypes.TimeAggregationCount:
+ aggregationColumn = "sum(count)"
+ // count_distinct is not supported in aggregated tables
+ case metrictypes.TimeAggregationRate, metrictypes.TimeAggregationIncrease: // ideally, this should never happen
+ aggregationColumn = "sum(sum)"
+ }
+ }
+
+ }
+ return aggregationColumn
+}
diff --git a/pkg/telemetrymeter/testdata/keys_map.json b/pkg/telemetrymeter/testdata/keys_map.json
new file mode 100644
index 000000000000..d1c27a3bf460
--- /dev/null
+++ b/pkg/telemetrymeter/testdata/keys_map.json
@@ -0,0 +1,34 @@
+{
+ "service.name": [
+ {
+ "name": "service.name",
+ "fieldContext": "resource",
+ "fieldDataType": "string",
+ "signal": "metrics"
+ }
+ ],
+ "http.request.method": [
+ {
+ "name": "http.request.method",
+ "fieldContext": "attribute",
+ "fieldDataType": "string",
+ "signal": "metrics"
+ }
+ ],
+ "http.response.status_code": [
+ {
+ "name": "http.response.status_code",
+ "fieldContext": "attribute",
+ "fieldDataType": "int",
+ "signal": "metrics"
+ }
+ ],
+ "host.name": [
+ {
+ "name": "host.name",
+ "fieldContext": "resource",
+ "fieldDataType": "string",
+ "signal": "metrics"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/pkg/telemetrymetrics/statement_builder.go b/pkg/telemetrymetrics/statement_builder.go
index 13dfbd1c1f1e..839e15757824 100644
--- a/pkg/telemetrymetrics/statement_builder.go
+++ b/pkg/telemetrymetrics/statement_builder.go
@@ -19,23 +19,23 @@ const (
IncreaseWithoutNegative = `If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, ((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window)) * (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(%d))) OVER rate_window))`
)
-type metricQueryStatementBuilder struct {
+type MetricQueryStatementBuilder struct {
logger *slog.Logger
metadataStore telemetrytypes.MetadataStore
fm qbtypes.FieldMapper
cb qbtypes.ConditionBuilder
}
-var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*metricQueryStatementBuilder)(nil)
+var _ qbtypes.StatementBuilder[qbtypes.MetricAggregation] = (*MetricQueryStatementBuilder)(nil)
func NewMetricQueryStatementBuilder(
settings factory.ProviderSettings,
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
-) *metricQueryStatementBuilder {
+) *MetricQueryStatementBuilder {
metricsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrymetrics")
- return &metricQueryStatementBuilder{
+ return &MetricQueryStatementBuilder{
logger: metricsSettings.Logger(),
metadataStore: metadataStore,
fm: fieldMapper,
@@ -43,7 +43,7 @@ func NewMetricQueryStatementBuilder(
}
}
-func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector {
+func GetKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) []*telemetrytypes.FieldKeySelector {
var keySelectors []*telemetrytypes.FieldKeySelector
if query.Filter != nil && query.Filter.Expression != "" {
whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
@@ -72,7 +72,7 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation])
return keySelectors
}
-func (b *metricQueryStatementBuilder) Build(
+func (b *MetricQueryStatementBuilder) Build(
ctx context.Context,
start uint64,
end uint64,
@@ -80,7 +80,7 @@ func (b *metricQueryStatementBuilder) Build(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
- keySelectors := getKeySelectors(query)
+ keySelectors := GetKeySelectors(query)
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)
if err != nil {
return nil, err
@@ -113,7 +113,7 @@ func (b *metricQueryStatementBuilder) Build(
// we can directly use the quantilesDDMerge function
//
// all of this is true only for delta metrics
-func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool {
+func (b *MetricQueryStatementBuilder) CanShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool {
if q.Aggregations[0].Temporality != metrictypes.Delta {
return false
}
@@ -139,7 +139,7 @@ func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilde
return false
}
-func (b *metricQueryStatementBuilder) buildPipelineStatement(
+func (b *MetricQueryStatementBuilder) buildPipelineStatement(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@@ -200,7 +200,7 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
return nil, err
}
- if b.canShortCircuitDelta(query) {
+ if b.CanShortCircuitDelta(query) {
// spatial_aggregation_cte directly for certain delta queries
frag, args := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
if frag != "" {
@@ -230,10 +230,10 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
query.GroupBy = origGroupBy
// final SELECT
- return b.buildFinalSelect(cteFragments, cteArgs, query)
+ return b.BuildFinalSelect(cteFragments, cteArgs, query)
}
-func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
+func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
@@ -281,7 +281,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
-func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
+func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@@ -344,7 +344,7 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
return fmt.Sprintf("(%s) AS filtered_time_series", q), args, nil
}
-func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(
+func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@@ -358,7 +358,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
-func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
+func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@@ -401,7 +401,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
}
-func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
+func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
@@ -466,7 +466,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
}
}
-func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
+func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
_ context.Context,
_ uint64,
_ uint64,
@@ -492,7 +492,7 @@ func (b *metricQueryStatementBuilder) buildSpatialAggregationCTE(
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
}
-func (b *metricQueryStatementBuilder) buildFinalSelect(
+func (b *MetricQueryStatementBuilder) BuildFinalSelect(
cteFragments []string,
cteArgs [][]any,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
diff --git a/pkg/telemetrystore/config.go b/pkg/telemetrystore/config.go
index 120740b2631c..fc7c050a01fe 100644
--- a/pkg/telemetrystore/config.go
+++ b/pkg/telemetrystore/config.go
@@ -40,11 +40,12 @@ type ClickhouseConfig struct {
}
type QuerySettings struct {
- MaxExecutionTime int `mapstructure:"max_execution_time"`
- MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"`
- TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"`
- MaxBytesToRead int `mapstructure:"max_bytes_to_read"`
- MaxResultRows int `mapstructure:"max_result_rows"`
+ MaxExecutionTime int `mapstructure:"max_execution_time"`
+ MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"`
+ TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"`
+ MaxBytesToRead int `mapstructure:"max_bytes_to_read"`
+ MaxResultRows int `mapstructure:"max_result_rows"`
+ IgnoreDataSkippingIndices string `mapstructure:"ignore_data_skipping_indices"`
}
func NewConfigFactory() factory.ConfigFactory {
diff --git a/pkg/telemetrystore/telemetrystorehook/settings.go b/pkg/telemetrystore/telemetrystorehook/settings.go
index af0ef93a3127..d317b942f058 100644
--- a/pkg/telemetrystore/telemetrystorehook/settings.go
+++ b/pkg/telemetrystore/telemetrystorehook/settings.go
@@ -53,6 +53,10 @@ func (h *provider) BeforeQuery(ctx context.Context, _ *telemetrystore.QueryEvent
settings["timeout_before_checking_execution_speed"] = h.settings.TimeoutBeforeCheckingExecutionSpeed
}
+ if h.settings.IgnoreDataSkippingIndices != "" {
+ settings["ignore_data_skipping_indices"] = h.settings.IgnoreDataSkippingIndices
+ }
+
if ctx.Value("clickhouse_max_threads") != nil {
if maxThreads, ok := ctx.Value("clickhouse_max_threads").(int); ok {
settings["max_threads"] = maxThreads
diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go
index d3a7b83331ea..659c7cf604f2 100644
--- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go
+++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_query.go
@@ -14,6 +14,9 @@ type QueryBuilderQuery[T any] struct {
// signal to query
Signal telemetrytypes.Signal `json:"signal,omitempty"`
+ // source for query
+ Source telemetrytypes.Source `json:"source,omitempty"`
+
// we want to support multiple aggregations
// currently supported: []Aggregation, []MetricAggregation
Aggregations []T `json:"aggregations,omitempty"`
diff --git a/pkg/types/quickfiltertypes/filter.go b/pkg/types/quickfiltertypes/filter.go
index ac436a451b3b..b86d2cb1929f 100644
--- a/pkg/types/quickfiltertypes/filter.go
+++ b/pkg/types/quickfiltertypes/filter.go
@@ -35,6 +35,7 @@ var (
SignalLogs = Signal{valuer.NewString("logs")}
SignalApiMonitoring = Signal{valuer.NewString("api_monitoring")}
SignalExceptions = Signal{valuer.NewString("exceptions")}
+ SignalMeter = Signal{valuer.NewString("meter")}
)
// NewSignal creates a Signal from a string
@@ -48,6 +49,8 @@ func NewSignal(s string) (Signal, error) {
return SignalApiMonitoring, nil
case "exceptions":
return SignalExceptions, nil
+ case "meter":
+ return SignalMeter, nil
default:
return Signal{}, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid signal: %s", s)
}
@@ -178,6 +181,12 @@ func NewDefaultQuickFilter(orgID valuer.UUID) ([]*StorableQuickFilter, error) {
{"key": "k8s.pod.name", "dataType": "string", "type": "resource"},
}
+ meterFilters := []map[string]interface{}{
+ {"key": "deployment.environment", "dataType": "float64", "type": "Sum"},
+ {"key": "service.name", "dataType": "float64", "type": "Sum"},
+ {"key": "host.name", "dataType": "float64", "type": "Sum"},
+ }
+
tracesJSON, err := json.Marshal(tracesFilters)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal traces filters")
@@ -190,12 +199,19 @@ func NewDefaultQuickFilter(orgID valuer.UUID) ([]*StorableQuickFilter, error) {
apiMonitoringJSON, err := json.Marshal(apiMonitoringFilters)
if err != nil {
- return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal Api Monitoring filters")
+ return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal api monitoring filters")
}
+
exceptionsJSON, err := json.Marshal(exceptionsFilters)
if err != nil {
- return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal Exceptions filters")
+ return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal exceptions filters")
}
+
+ meterJSON, err := json.Marshal(meterFilters)
+ if err != nil {
+ return nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal meter filters")
+ }
+
timeRightNow := time.Now()
return []*StorableQuickFilter{
@@ -247,5 +263,17 @@ func NewDefaultQuickFilter(orgID valuer.UUID) ([]*StorableQuickFilter, error) {
UpdatedAt: timeRightNow,
},
},
+ {
+ Identifiable: types.Identifiable{
+ ID: valuer.GenerateUUID(),
+ },
+ OrgID: orgID,
+ Filter: string(meterJSON),
+ Signal: SignalMeter,
+ TimeAuditable: types.TimeAuditable{
+ CreatedAt: timeRightNow,
+ UpdatedAt: timeRightNow,
+ },
+ },
}, nil
}
diff --git a/pkg/types/telemetrytypes/field.go b/pkg/types/telemetrytypes/field.go
index 206586db4bfa..398fc19854a3 100644
--- a/pkg/types/telemetrytypes/field.go
+++ b/pkg/types/telemetrytypes/field.go
@@ -121,6 +121,7 @@ type FieldKeySelector struct {
StartUnixMilli int64 `json:"startUnixMilli"`
EndUnixMilli int64 `json:"endUnixMilli"`
Signal Signal `json:"signal"`
+ Source Source `json:"source"`
FieldContext FieldContext `json:"fieldContext"`
FieldDataType FieldDataType `json:"fieldDataType"`
Name string `json:"name"`
diff --git a/pkg/types/telemetrytypes/source.go b/pkg/types/telemetrytypes/source.go
new file mode 100644
index 000000000000..247a9a5a5b3c
--- /dev/null
+++ b/pkg/types/telemetrytypes/source.go
@@ -0,0 +1,12 @@
+package telemetrytypes
+
+import "github.com/SigNoz/signoz/pkg/valuer"
+
+type Source struct {
+ valuer.String
+}
+
+var (
+ SourceMeter = Source{valuer.NewString("meter")}
+ SourceUnspecified = Source{valuer.NewString("")}
+)