signoz/pkg/querier/promql_query.go
Srikanth Chekuri 85f04e4bae
chore: add querier HTTP API endpoint and bucket cache implementation (#8178)
* chore: update types
1. add partial bool to indicate if the value covers the partial interval
2. add optional unit if present (ex: duration_nano, metrics with units)
3. use pointers wherever necessary
4. add format options for request and remove redundant name in query envelope

* chore: fix some gaps
1. make the range as [start, end)
2. provide the logs statement builder with the body column
3. skip the body filter on resource filter statement builder
4. remove unnecessary agg expr rewriter in metrics
5. add ability to skip full text in where clause visitor

* chore: add API endpoint for new query range

* chore: add bucket cache implementation

* chore: add fingerprinting impl and add bucket cache to querier

* chore: add provider factory
2025-06-10 12:56:28 +00:00

134 lines
3.2 KiB
Go

package querier
import (
"context"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbv5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/prometheus/prometheus/promql"
)
// promqlQuery is a PromQL query bound to a time range and a request type.
// It is evaluated through the Prometheus engine held in promEngine.
type promqlQuery struct {
// promEngine supplies the PromQL engine and the storage used by Execute.
promEngine prometheus.Prometheus
// query carries the PromQL expression, the query name and the step.
query qbv5.PromQuery
// tr is the window reported by Window and evaluated by Execute.
tr qbv5.TimeRange
// requestType is copied into the Type field of the result.
requestType qbv5.RequestType
}
// Compile-time assertion that *promqlQuery satisfies qbv5.Query.
var _ qbv5.Query = (*promqlQuery)(nil)
// newPromqlQuery builds a promqlQuery that evaluates query over tr using
// promEngine and labels its result with requestType.
func newPromqlQuery(
	promEngine prometheus.Prometheus,
	query qbv5.PromQuery,
	tr qbv5.TimeRange,
	requestType qbv5.RequestType,
) *promqlQuery {
	return &promqlQuery{
		promEngine:  promEngine,
		query:       query,
		tr:          tr,
		requestType: requestType,
	}
}
// Fingerprint returns a string identifying the query: the literal "promql",
// the PromQL expression and the step duration, joined by "&". The time range
// is not part of the fingerprint.
func (q *promqlQuery) Fingerprint() string {
	expr := q.query.Query
	step := q.query.Step.Duration.String()
	return strings.Join([]string{"promql", expr, step}, "&")
}
// Window reports the query's time range as the pair (From, To).
func (q *promqlQuery) Window() (uint64, uint64) {
	window := q.tr
	return window.From, window.To
}
// Execute runs the PromQL expression as a range query over the query's
// window and converts the resulting matrix into a time-series result.
//
// The window bounds are converted with querybuilder.ToNanoSecs before being
// handed to the engine. For every series in the matrix, the metric labels and
// the float samples (v.Floats) are copied into the result; all series are
// returned in a single aggregation bucket.
//
// Errors:
//   - invalid input when the engine cannot create the range query,
//   - canceled or timeout when execution reports the corresponding error,
//   - internal for storage failures, any other execution error, or when the
//     matrix cannot be obtained from the result.
func (q *promqlQuery) Execute(ctx context.Context) (*qbv5.Result, error) {
	start := int64(querybuilder.ToNanoSecs(q.tr.From))
	end := int64(querybuilder.ToNanoSecs(q.tr.To))

	qry, err := q.promEngine.Engine().NewRangeQuery(
		ctx,
		q.promEngine.Storage(),
		nil,
		q.query.Query,
		time.Unix(0, start),
		time.Unix(0, end),
		q.query.Step.Duration,
	)
	if err != nil {
		// Include the engine's reason so the caller can see what is wrong
		// with the expression instead of only being told it is invalid.
		return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query %q: %v", q.query.Query, err)
	}
	// Registered before Exec so the query is closed on every return path,
	// including the execution-error returns below.
	defer qry.Close()

	res := qry.Exec(ctx)
	if res.Err != nil {
		var (
			eqc promql.ErrQueryCanceled
			eqt promql.ErrQueryTimeout
			es  promql.ErrStorage
		)
		switch {
		case errors.As(res.Err, &eqc):
			return nil, errors.Newf(errors.TypeCanceled, errors.CodeCanceled, "query canceled")
		case errors.As(res.Err, &eqt):
			return nil, errors.Newf(errors.TypeTimeout, errors.CodeTimeout, "query timeout")
		case errors.As(res.Err, &es):
			return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "query execution error: %v", res.Err)
		case errors.Is(res.Err, context.Canceled):
			return nil, errors.Newf(errors.TypeCanceled, errors.CodeCanceled, "query canceled")
		default:
			return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "query execution error: %v", res.Err)
		}
	}

	matrix, promErr := res.Matrix()
	if promErr != nil {
		return nil, errors.WrapInternalf(promErr, errors.CodeInternal, "error getting matrix from promql query %q", q.query.Query)
	}

	var series []*qbv5.TimeSeries
	for _, v := range matrix {
		labelMap := v.Metric.Copy().Map()
		lbls := make([]*qbv5.Label, 0, len(labelMap))
		for name, value := range labelMap {
			lbls = append(lbls, &qbv5.Label{
				Key:   telemetrytypes.TelemetryFieldKey{Name: name},
				Value: value,
			})
		}

		// Pre-size for the known sample count, but keep Values nil when the
		// series has no float samples, as a plain append would.
		var values []*qbv5.TimeSeriesValue
		if n := len(v.Floats); n > 0 {
			values = make([]*qbv5.TimeSeriesValue, 0, n)
		}
		for idx := range v.Floats {
			values = append(values, &qbv5.TimeSeriesValue{
				Timestamp: v.Floats[idx].T,
				Value:     v.Floats[idx].F,
			})
		}

		series = append(series, &qbv5.TimeSeries{
			Labels: lbls,
			Values: values,
		})
	}

	// The second return value is intentionally dropped; only warnings are
	// reported. NOTE(review): presumably it holds info-level annotations —
	// confirm against the Prometheus annotations API.
	warnings, _ := res.Warnings.AsStrings(q.query.Query, 10, 0)

	return &qbv5.Result{
		Type: q.requestType,
		Value: &qbv5.TimeSeriesData{
			QueryName: q.query.Name,
			Aggregations: []*qbv5.AggregationBucket{
				{
					Series: series,
				},
			},
		},
		Warnings: warnings,
		// TODO: map promql stats?
	}, nil
}