fix: check ch version (#8778)

Check the clickhouse version, before the setting secondary_indices_enable_bulk_filtering is used.
This commit is contained in:
Nityananda Gohain 2025-08-13 14:57:25 +05:30 committed by GitHub
parent 802ce6de01
commit dac46d82ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 40 additions and 19 deletions

View File

@ -136,7 +136,14 @@ func NewSQLMigrationProviderFactories(
func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] { func NewTelemetryStoreProviderFactories() factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] {
return factory.MustNewNamedMap( return factory.MustNewNamedMap(
clickhousetelemetrystore.NewFactory(telemetrystorehook.NewSettingsFactory(), telemetrystorehook.NewLoggingFactory()), clickhousetelemetrystore.NewFactory(
telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return telemetrystorehook.NewSettingsFactory(s)
}),
telemetrystore.TelemetryStoreHookFactoryFunc(func(s string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return telemetrystorehook.NewLoggingFactory()
}),
),
) )
} }

View File

@ -16,22 +16,13 @@ type provider struct {
hooks []telemetrystore.TelemetryStoreHook hooks []telemetrystore.TelemetryStoreHook
} }
func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] { func NewFactory(hookFactories ...telemetrystore.TelemetryStoreHookFactoryFunc) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("clickhouse"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStore, error) { return factory.NewProviderFactory(factory.MustNewName("clickhouse"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStore, error) {
// we want to fail fast so we have hook registration errors before creating the telemetry store return New(ctx, providerSettings, config, hookFactories...)
hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
for i, hookFactory := range hookFactories {
hook, err := hookFactory.New(ctx, providerSettings, config)
if err != nil {
return nil, err
}
hooks[i] = hook
}
return New(ctx, providerSettings, config, hooks...)
}) })
} }
func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) { func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hookFactories ...telemetrystore.TelemetryStoreHookFactoryFunc) (telemetrystore.TelemetryStore, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore") settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/telemetrystore/clickhousetelemetrystore")
options, err := clickhouse.ParseDSN(config.Clickhouse.DSN) options, err := clickhouse.ParseDSN(config.Clickhouse.DSN)
@ -47,6 +38,20 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
return nil, err return nil, err
} }
var version string
if err := chConn.QueryRow(ctx, "SELECT version()").Scan(&version); err != nil {
return nil, err
}
hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
for i, hookFactory := range hookFactories {
hook, err := hookFactory(version).New(ctx, providerSettings, config)
if err != nil {
return nil, err
}
hooks[i] = hook
}
return &provider{ return &provider{
settings: settings, settings: settings,
clickHouseConn: chConn, clickHouseConn: chConn,

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/factory"
) )
type TelemetryStore interface { type TelemetryStore interface {
@ -19,6 +20,8 @@ type TelemetryStoreHook interface {
AfterQuery(ctx context.Context, event *QueryEvent) AfterQuery(ctx context.Context, event *QueryEvent)
} }
type TelemetryStoreHookFactoryFunc func(string) factory.ProviderFactory[TelemetryStoreHook, Config]
func WrapBeforeQuery(hooks []TelemetryStoreHook, ctx context.Context, event *QueryEvent) context.Context { func WrapBeforeQuery(hooks []TelemetryStoreHook, ctx context.Context, event *QueryEvent) context.Context {
for _, hook := range hooks { for _, hook := range hooks {
ctx = hook.BeforeQuery(ctx, event) ctx = hook.BeforeQuery(ctx, event)

View File

@ -3,6 +3,7 @@ package telemetrystorehook
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"strings"
"github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/factory" "github.com/SigNoz/signoz/pkg/factory"
@ -11,15 +12,19 @@ import (
) )
type provider struct { type provider struct {
clickHouseVersion string
settings telemetrystore.QuerySettings settings telemetrystore.QuerySettings
} }
func NewSettingsFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] { func NewSettingsFactory(version string) factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] {
return factory.NewProviderFactory(factory.MustNewName("settings"), NewSettings) return factory.NewProviderFactory(factory.MustNewName("settings"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) {
return NewSettings(ctx, providerSettings, config, version)
})
} }
func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) { func NewSettings(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, version string) (telemetrystore.TelemetryStoreHook, error) {
return &provider{ return &provider{
clickHouseVersion: version,
settings: config.Clickhouse.QuerySettings, settings: config.Clickhouse.QuerySettings,
}, nil }, nil
} }
@ -75,7 +80,8 @@ func (h *provider) BeforeQuery(ctx context.Context, _ *telemetrystore.QueryEvent
settings["result_overflow_mode"] = ctx.Value("result_overflow_mode") settings["result_overflow_mode"] = ctx.Value("result_overflow_mode")
} }
if !h.settings.SecondaryIndicesEnableBulkFiltering { // ClickHouse version check is added since this setting is not support on version below 25.5
if strings.HasPrefix(h.clickHouseVersion, "25") && !h.settings.SecondaryIndicesEnableBulkFiltering {
// TODO(srikanthccv): enable it when the "Cannot read all data" issue is fixed // TODO(srikanthccv): enable it when the "Cannot read all data" issue is fixed
// https://github.com/ClickHouse/ClickHouse/issues/82283 // https://github.com/ClickHouse/ClickHouse/issues/82283
settings["secondary_indices_enable_bulk_filtering"] = false settings["secondary_indices_enable_bulk_filtering"] = false