mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-19 08:26:06 +00:00
* chore: update types 1. add partial bool to indicate if the value covers the partial interval 2. add optional unit if present (ex: duration_nano, metrics with units) 3. use pointers wherever necessary 4. add format options for request and remove redundant name in query envelope * chore: fix some gaps 1. make the range as [start, end) 2. provide the logs statement builder with the body column 3. skip the body filter on resource filter statement builder 4. remove unnecessary agg expr rewriter in metrics 5. add ability to skip full text in where clause visitor * chore: add API endpoint for new query range * chore: add bucket cache implementation * chore: add fingerprinting impl and add bucket cache to querier * chore: add provider factory
134 lines
3.2 KiB
Go
134 lines
3.2 KiB
Go
package querier
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/SigNoz/signoz/pkg/errors"
|
|
"github.com/SigNoz/signoz/pkg/prometheus"
|
|
"github.com/SigNoz/signoz/pkg/querybuilder"
|
|
qbv5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
|
"github.com/prometheus/prometheus/promql"
|
|
)
|
|
|
|
type promqlQuery struct {
|
|
promEngine prometheus.Prometheus
|
|
query qbv5.PromQuery
|
|
tr qbv5.TimeRange
|
|
requestType qbv5.RequestType
|
|
}
|
|
|
|
var _ qbv5.Query = (*promqlQuery)(nil)
|
|
|
|
func newPromqlQuery(
|
|
promEngine prometheus.Prometheus,
|
|
query qbv5.PromQuery,
|
|
tr qbv5.TimeRange,
|
|
requestType qbv5.RequestType,
|
|
) *promqlQuery {
|
|
return &promqlQuery{promEngine, query, tr, requestType}
|
|
}
|
|
|
|
func (q *promqlQuery) Fingerprint() string {
|
|
parts := []string{
|
|
"promql",
|
|
q.query.Query,
|
|
q.query.Step.Duration.String(),
|
|
}
|
|
|
|
return strings.Join(parts, "&")
|
|
}
|
|
|
|
func (q *promqlQuery) Window() (uint64, uint64) {
|
|
return q.tr.From, q.tr.To
|
|
}
|
|
|
|
func (q *promqlQuery) Execute(ctx context.Context) (*qbv5.Result, error) {
|
|
|
|
start := int64(querybuilder.ToNanoSecs(q.tr.From))
|
|
end := int64(querybuilder.ToNanoSecs(q.tr.To))
|
|
|
|
qry, err := q.promEngine.Engine().NewRangeQuery(
|
|
ctx,
|
|
q.promEngine.Storage(),
|
|
nil,
|
|
q.query.Query,
|
|
time.Unix(0, start),
|
|
time.Unix(0, end),
|
|
q.query.Step.Duration,
|
|
)
|
|
|
|
if err != nil {
|
|
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query %q", q.query.Query)
|
|
}
|
|
|
|
res := qry.Exec(ctx)
|
|
if res.Err != nil {
|
|
var eqc promql.ErrQueryCanceled
|
|
var eqt promql.ErrQueryTimeout
|
|
var es promql.ErrStorage
|
|
switch {
|
|
case errors.As(res.Err, &eqc):
|
|
return nil, errors.Newf(errors.TypeCanceled, errors.CodeCanceled, "query canceled")
|
|
case errors.As(res.Err, &eqt):
|
|
return nil, errors.Newf(errors.TypeTimeout, errors.CodeTimeout, "query timeout")
|
|
case errors.As(res.Err, &es):
|
|
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "query execution error: %v", res.Err)
|
|
}
|
|
|
|
if errors.Is(res.Err, context.Canceled) {
|
|
return nil, errors.Newf(errors.TypeCanceled, errors.CodeCanceled, "query canceled")
|
|
}
|
|
|
|
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "query execution error: %v", res.Err)
|
|
}
|
|
|
|
defer qry.Close()
|
|
|
|
matrix, promErr := res.Matrix()
|
|
if promErr != nil {
|
|
return nil, errors.WrapInternalf(promErr, errors.CodeInternal, "error getting matrix from promql query %q", q.query.Query)
|
|
}
|
|
|
|
var series []*qbv5.TimeSeries
|
|
for _, v := range matrix {
|
|
var s qbv5.TimeSeries
|
|
lbls := make([]*qbv5.Label, 0, len(v.Metric))
|
|
for name, value := range v.Metric.Copy().Map() {
|
|
lbls = append(lbls, &qbv5.Label{
|
|
Key: telemetrytypes.TelemetryFieldKey{Name: name},
|
|
Value: value,
|
|
})
|
|
}
|
|
|
|
s.Labels = lbls
|
|
|
|
for idx := range v.Floats {
|
|
p := v.Floats[idx]
|
|
s.Values = append(s.Values, &qbv5.TimeSeriesValue{
|
|
Timestamp: p.T,
|
|
Value: p.F,
|
|
})
|
|
}
|
|
series = append(series, &s)
|
|
}
|
|
|
|
warnings, _ := res.Warnings.AsStrings(q.query.Query, 10, 0)
|
|
|
|
return &qbv5.Result{
|
|
Type: q.requestType,
|
|
Value: &qbv5.TimeSeriesData{
|
|
QueryName: q.query.Name,
|
|
Aggregations: []*qbv5.AggregationBucket{
|
|
{
|
|
Series: series,
|
|
},
|
|
},
|
|
},
|
|
Warnings: warnings,
|
|
// TODO: map promql stats?
|
|
}, nil
|
|
}
|