245 lines
7.7 KiB
Go

package analyticsstatsreporter
import (
"context"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/analytics"
"github.com/SigNoz/signoz/pkg/analytics/segmentanalytics"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/statsreporter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/SigNoz/signoz/pkg/version"
)
// provider is the analytics-backed implementation of statsreporter.StatsReporter.
// It periodically gathers per-organization stats and forwards them to an
// analytics backend.
type provider struct {
	// settings holds the scoped provider settings; the logger comes from here.
	settings factory.ScopedProviderSettings
	// config supplies the reporting interval and the identity-collection switch.
	config statsreporter.Config

	// telemetryStore is queried directly for telemetry counts and last-seen times.
	// TODO(srikanthcvv): move this to the querier layer.
	telemetryStore telemetrystore.TelemetryStore
	// collectors contribute stats from across the codebase.
	collectors []statsreporter.StatsCollector

	// orgGetter lists the organizations that stats are reported for.
	orgGetter organization.Getter
	// userGetter lists the users of an organization for identity reporting.
	userGetter user.Getter
	// analytics is the backend that stats and identities are sent to.
	analytics analytics.Analytics

	// build describes the running binary (version, branch, hash, variant).
	build version.Build
	// deployment describes where the binary runs (mode, platform, OS, arch).
	deployment version.Deployment

	// stopC is closed by Stop to end the reporting loop in Start.
	stopC chan struct{}
}
// NewFactory returns a provider factory registered under the name "analytics".
// The dependencies given here are captured and forwarded to New every time the
// factory builds a stats reporter.
func NewFactory(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, userGetter user.Getter, build version.Build, analyticsConfig analytics.Config) factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config] {
	newReporter := func(ctx context.Context, settings factory.ProviderSettings, config statsreporter.Config) (statsreporter.StatsReporter, error) {
		return New(ctx, settings, config, telemetryStore, collectors, orgGetter, userGetter, build, analyticsConfig)
	}

	return factory.NewProviderFactory(factory.MustNewName("analytics"), newReporter)
}
// New constructs the analytics stats reporter.
//
// It scopes the provider settings to this package, and it captures the current
// deployment information. It also creates the analytics client through
// segmentanalytics.New with analyticsConfig. If that client cannot be created,
// the error is returned unchanged and no reporter is built.
func New(
	ctx context.Context,
	providerSettings factory.ProviderSettings,
	config statsreporter.Config,
	telemetryStore telemetrystore.TelemetryStore,
	collectors []statsreporter.StatsCollector,
	orgGetter organization.Getter,
	userGetter user.Getter,
	build version.Build,
	analyticsConfig analytics.Config,
) (statsreporter.StatsReporter, error) {
	scopedSettings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/statsreporter/analyticsstatsreporter")
	currentDeployment := version.NewDeployment()

	// Named analyticsClient (not analytics) so the imported package is not shadowed.
	analyticsClient, err := segmentanalytics.New(ctx, providerSettings, analyticsConfig)
	if err != nil {
		return nil, err
	}

	reporter := &provider{
		settings:       scopedSettings,
		config:         config,
		telemetryStore: telemetryStore,
		collectors:     collectors,
		orgGetter:      orgGetter,
		userGetter:     userGetter,
		analytics:      analyticsClient,
		build:          build,
		deployment:     currentDeployment,
		stopC:          make(chan struct{}),
	}

	return reporter, nil
}
// Start runs the reporting loop.
//
// It first starts the analytics backend in a separate goroutine and only logs
// a failure there. It then blocks, calling Report once per config.Interval,
// until Stop closes stopC. At that point it returns nil. Report failures are
// logged and do not stop the loop.
//
// NOTE(review): the loop does not watch ctx.Done(), so only Stop ends it.
// Also, time.NewTicker panics for a non-positive Interval.
func (provider *provider) Start(ctx context.Context) error {
	go func() {
		startErr := provider.analytics.Start(ctx)
		if startErr == nil {
			return
		}
		provider.settings.Logger().ErrorContext(ctx, "failed to start analytics", "error", startErr)
	}()

	reportTicker := time.NewTicker(provider.config.Interval)
	defer reportTicker.Stop()

	for {
		select {
		case <-reportTicker.C:
			if reportErr := provider.Report(ctx); reportErr != nil {
				provider.settings.Logger().WarnContext(ctx, "failed to report stats", "error", reportErr)
			}
		case <-provider.stopC:
			return nil
		}
	}
}
// Report sends one round of stats for every organization returned by
// orgGetter.ListByOwnedKeyRange.
//
// For each organization, it merges the collected stats with build, deployment
// and organization metadata. It then identifies the organization as a group and
// tracks a "Stats Reported" event with the same stats. When
// config.Collect.Identities is set, every user of the organization is also
// identified.
//
// Only a failure to list organizations is returned. Organizations with no
// collected stats, and failures to list users, are logged and skipped.
func (provider *provider) Report(ctx context.Context) error {
	orgs, err := provider.orgGetter.ListByOwnedKeyRange(ctx)
	if err != nil {
		return err
	}

	for _, org := range orgs {
		orgStats := provider.collectOrg(ctx, org.ID)
		if len(orgStats) == 0 {
			provider.settings.Logger().WarnContext(ctx, "no stats collected", "org_id", org.ID)
			continue
		}

		// Build, deployment and organization metadata is attached to every report.
		metadata := map[string]any{
			"build.version":       provider.build.Version(),
			"build.branch":        provider.build.Branch(),
			"build.hash":          provider.build.Hash(),
			"build.variant":       provider.build.Variant(),
			"deployment.mode":     provider.deployment.Mode(),
			"deployment.platform": provider.deployment.Platform(),
			"deployment.os":       provider.deployment.OS(),
			"deployment.arch":     provider.deployment.Arch(),
			"display_name":        org.DisplayName,
			"name":                org.Name,
			"created_at":          org.CreatedAt,
			"alias":               org.Alias,
		}
		for key, value := range metadata {
			orgStats[key] = value
		}

		groupID := org.ID.String()
		provider.settings.Logger().DebugContext(ctx, "reporting stats", "stats", orgStats)
		provider.analytics.IdentifyGroup(ctx, groupID, orgStats)
		provider.analytics.TrackGroup(ctx, groupID, "Stats Reported", orgStats)

		if !provider.config.Collect.Identities {
			continue
		}

		members, err := provider.userGetter.ListByOrgID(ctx, org.ID)
		if err != nil {
			provider.settings.Logger().WarnContext(ctx, "failed to list users", "error", err, "org_id", org.ID)
			continue
		}

		// Named member (not user) so the imported user package is not shadowed.
		for _, member := range members {
			provider.analytics.IdentifyUser(ctx, groupID, member.ID.String(), types.NewTraitsFromUser(member))
		}
	}

	return nil
}
// Stop ends the reporting loop and then flushes one final report, so the
// latest stats are sent before the analytics backend is stopped. Failures of
// either step are logged, not returned, and Stop always returns nil.
//
// Stop closes stopC, and closing an already-closed channel panics. Stop must
// therefore be called at most once.
func (provider *provider) Stop(ctx context.Context) error {
	close(provider.stopC)

	err := provider.Report(ctx)
	if err != nil {
		provider.settings.Logger().WarnContext(ctx, "failed to report stats", "error", err)
	}

	err = provider.analytics.Stop(ctx)
	if err != nil {
		provider.settings.Logger().ErrorContext(ctx, "failed to stop analytics", "error", err)
	}

	return nil
}
// collectOrg gathers the stats reported for a single organization.
//
// It runs every registered collector concurrently for orgID and merges the
// results. When two collectors return the same key, the one that finishes last
// wins. It then adds telemetry counts and last-observed timestamps read directly
// from ClickHouse. Those queries are not filtered by organization.
//
// Everything is best-effort. A failing collector is logged and skipped, and a
// failing telemetry query just omits its keys. The returned map is never nil.
func (provider *provider) collectOrg(ctx context.Context, orgID valuer.UUID) map[string]any {
	stats := provider.collectFromCollectors(ctx, orgID)
	provider.collectTelemetryStats(ctx, stats)
	return stats
}

// collectFromCollectors runs all collectors in parallel for orgID and merges their stats.
func (provider *provider) collectFromCollectors(ctx context.Context, orgID valuer.UUID) map[string]any {
	var (
		wg  sync.WaitGroup
		mtx sync.Mutex
	)
	stats := make(map[string]any)

	wg.Add(len(provider.collectors))
	for _, collector := range provider.collectors {
		go func(collector statsreporter.StatsCollector) {
			defer wg.Done()

			collectorStats, err := collector.Collect(ctx, orgID)
			if err != nil {
				// org_id makes the failure attributable when several orgs are reported.
				provider.settings.Logger().ErrorContext(ctx, "failed to collect stats", "error", err, "org_id", orgID)
				return
			}

			// The goroutines write into the shared map, so merging is serialized.
			mtx.Lock()
			defer mtx.Unlock()
			for k, v := range collectorStats {
				stats[k] = v
			}
		}(collector)
	}
	wg.Wait()

	return stats
}

// collectTelemetryStats adds telemetry row counts and last-observed times to stats, skipping any query that fails.
func (provider *provider) collectTelemetryStats(ctx context.Context, stats map[string]any) {
	db := provider.telemetryStore.ClickhouseDB()

	countQueries := []struct {
		key   string
		query string
	}{
		{"telemetry.traces.count", "SELECT COUNT(*) FROM signoz_traces.distributed_signoz_index_v3"},
		{"telemetry.logs.count", "SELECT COUNT(*) FROM signoz_logs.distributed_logs_v2"},
		{"telemetry.metrics.count", "SELECT COUNT(*) FROM signoz_metrics.distributed_samples_v4"},
	}
	for _, q := range countQueries {
		var count uint64
		if err := db.QueryRow(ctx, q.query).Scan(&count); err == nil {
			stats[q.key] = count
		}
	}

	lastObservedQueries := []struct {
		timeKey string
		unixKey string
		query   string
	}{
		{
			"telemetry.traces.last_observed.time",
			"telemetry.traces.last_observed.time_unix",
			"SELECT max(timestamp) FROM signoz_traces.distributed_signoz_index_v3",
		},
		{
			"telemetry.logs.last_observed.time",
			"telemetry.logs.last_observed.time_unix",
			"SELECT fromUnixTimestamp64Nano(max(timestamp)) FROM signoz_logs.distributed_logs_v2",
		},
		{
			"telemetry.metrics.last_observed.time",
			"telemetry.metrics.last_observed.time_unix",
			"SELECT toDateTime(max(unix_milli) / 1000) FROM signoz_metrics.distributed_samples_v4",
		},
	}
	for _, q := range lastObservedQueries {
		var seenAt time.Time
		if err := db.QueryRow(ctx, q.query).Scan(&seenAt); err != nil {
			continue
		}
		// Presumably guards against max() over no rows, which ClickHouse returns as
		// the type's default (the Unix epoch) — such a value is not a real observation.
		if seenAt.Unix() == 0 {
			continue
		}
		stats[q.timeKey] = seenAt.UTC()
		stats[q.unixKey] = seenAt.Unix()
	}
}