diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 5004d36ab107..497100e5738e 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -79,6 +79,9 @@ type Server struct { // feature flags featureLookup baseint.FeatureLookup + // Usage manager + usageManager *usage.Manager + unavailableChannel chan healthcheck.Status } @@ -199,6 +202,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { ruleManager: rm, serverOptions: serverOptions, unavailableChannel: make(chan healthcheck.Status), + usageManager: usageManager, } httpServer, err := s.createPublicServer(apiHandler) @@ -560,6 +564,9 @@ func (s *Server) Stop() error { s.ruleManager.Stop() } + // stop usage manager + s.usageManager.Stop() + return nil } diff --git a/ee/query-service/main.go b/ee/query-service/main.go index 08a807c8618d..ffd439cd32d8 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -144,6 +144,7 @@ func main() { logger.Info("Received HealthCheck status: ", zap.Int("status", int(status))) case <-signalsChannel: logger.Fatal("Received OS Interrupt Signal ... ") + server.Stop() } } } diff --git a/ee/query-service/usage/manager.go b/ee/query-service/usage/manager.go index 4f361d465146..335eae814387 100644 --- a/ee/query-service/usage/manager.go +++ b/ee/query-service/usage/manager.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/go-co-op/gocron" "github.com/google/uuid" "github.com/jmoiron/sqlx" @@ -28,9 +29,6 @@ const ( ) var ( - // send usage every 24 hour - uploadFrequency = 24 * time.Hour - locker = stateUnlocked ) @@ -39,12 +37,7 @@ type Manager struct { licenseRepo *license.Repo - // end the usage routine, this is important to gracefully - // stopping usage reporting and protect in-consistent updates - done chan struct{} - - // terminated waits for the UsageExporter go routine to end - terminated chan struct{} + scheduler *gocron.Scheduler } func New(dbType string, db *sqlx.DB, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) { @@ -53,6 +46,7 @@ func New(dbType string, db *sqlx.DB, licenseRepo *license.Repo, clickhouseConn c // repository: repo, clickhouseConn: clickhouseConn, licenseRepo: licenseRepo, + scheduler: gocron.NewScheduler(time.UTC).Every(1).Day().At("00:00"), // send usage every at 00:00 UTC } return m, nil } @@ -64,37 +58,30 @@ func (lm *Manager) Start() error { return fmt.Errorf("usage exporter is locked") } - go lm.UsageExporter(context.Background()) + _, err := lm.scheduler.Do(func() { lm.UploadUsage() }) + if err != nil { + return err + } + + // upload usage once when starting the service + lm.UploadUsage() + + lm.scheduler.StartAsync() return nil } - -func (lm *Manager) UsageExporter(ctx context.Context) { - defer close(lm.terminated) - - uploadTicker := time.NewTicker(uploadFrequency) - defer uploadTicker.Stop() - - for { - select { - case <-lm.done: - return - case <-uploadTicker.C: - lm.UploadUsage(ctx) - } - } -} - -func (lm *Manager) UploadUsage(ctx context.Context) error { +func (lm *Manager) UploadUsage() { + ctx := context.Background() // check if license is present or not - license, err := lm.licenseRepo.GetActiveLicense(context.Background()) + license, err := lm.licenseRepo.GetActiveLicense(ctx) if err != nil { - return fmt.Errorf("failed to get active license") + zap.S().Errorf("failed to get active license: %v", zap.Error(err)) + return } if license == nil { // we will not start the usage reporting if license is not present. zap.S().Info("no license present, skipping usage reporting") - return nil + return } usages := []model.UsageDB{} @@ -120,7 +107,8 @@ func (lm *Manager) UploadUsage(ctx context.Context) error { dbusages := []model.UsageDB{} err := lm.clickhouseConn.Select(ctx, &dbusages, fmt.Sprintf(query, db, db), time.Now().Add(-(24 * time.Hour))) if err != nil && !strings.Contains(err.Error(), "doesn't exist") { - return err + zap.S().Errorf("failed to get usage from clickhouse: %v", zap.Error(err)) + return } for _, u := range dbusages { u.Type = db @@ -130,7 +118,7 @@ func (lm *Manager) UploadUsage(ctx context.Context) error { if len(usages) <= 0 { zap.S().Info("no snapshots to upload, skipping.") - return nil + return } zap.S().Info("uploading usage data") @@ -139,13 +127,15 @@ func (lm *Manager) UploadUsage(ctx context.Context) error { for _, usage := range usages { usageDataBytes, err := encryption.Decrypt([]byte(usage.ExporterID[:32]), []byte(usage.Data)) if err != nil { - return err + zap.S().Errorf("error while decrypting usage data: %v", zap.Error(err)) + return } usageData := model.Usage{} err = json.Unmarshal(usageDataBytes, &usageData) if err != nil { - return err + zap.S().Errorf("error while unmarshalling usage data: %v", zap.Error(err)) + return } usageData.CollectorID = usage.CollectorID @@ -160,20 +150,16 @@ func (lm *Manager) UploadUsage(ctx context.Context) error { LicenseKey: key, Usage: usagesPayload, } - err = lm.UploadUsageWithExponentalBackOff(ctx, payload) - if err != nil { - return err - } - return nil + lm.UploadUsageWithExponentalBackOff(ctx, payload) } -func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload model.UsagePayload) error { +func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload model.UsagePayload) { for i := 1; i <= MaxRetries; i++ { apiErr := licenseserver.SendUsage(ctx, payload) if apiErr != nil && i == MaxRetries { zap.S().Errorf("retries stopped : %v", zap.Error(apiErr)) // not returning error here since it is captured in the failed count - return nil + return } else if apiErr != nil { // sleeping for exponential backoff sleepDuration := RetryInterval * time.Duration(i) @@ -183,11 +169,14 @@ func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload break } } - return nil } func (lm *Manager) Stop() { - close(lm.done) + lm.scheduler.Stop() + + zap.S().Debug("sending usage data before shutting down") + // send usage before shutting down + lm.UploadUsage() + atomic.StoreUint32(&locker, stateUnlocked) - <-lm.terminated } diff --git a/go.mod b/go.mod index 59ef611a4c0b..9a9151338635 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230517094211-cd3f3f0aea85 github.com/coreos/go-oidc/v3 v3.4.0 github.com/dustin/go-humanize v1.0.0 + github.com/go-co-op/gocron v1.30.1 github.com/go-kit/log v0.2.1 github.com/go-redis/redis/v8 v8.11.5 github.com/go-redis/redismock/v8 v8.11.5 @@ -93,6 +94,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect go.opentelemetry.io/collector/featuregate v0.70.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect diff --git a/go.sum b/go.sum index e9f289de7e93..923ab20a20eb 100644 --- a/go.sum +++ b/go.sum @@ -201,6 +201,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-co-op/gocron v1.30.1 h1:tjWUvJl5KrcwpkEkSXFSQFr4F9h5SfV/m4+RX0cV2fs= +github.com/go-co-op/gocron v1.30.1/go.mod h1:39f6KNSGVOU1LO/ZOoZfcSxwlsJDQOKSu8erN0SH48Y= github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= @@ -606,11 +608,14 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= +github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U= github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/russellhaering/gosaml2 v0.8.0 h1:rm1Gc09/UoEsKGTSFvg8VCHJLY3wrP4BWjC+1ov0qCo= @@ -711,6 +716,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=