feat: usage exporter logic updated (#3211)

* feat: usage exporter logic updated

* fix: upload usage before shutdown

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
Nityananda Gohain 2023-08-01 10:19:56 +05:30 committed by GitHub
parent 1a3e46cecd
commit 348fa0ba5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 51 additions and 46 deletions

View File

@ -79,6 +79,9 @@ type Server struct {
// feature flags // feature flags
featureLookup baseint.FeatureLookup featureLookup baseint.FeatureLookup
// Usage manager
usageManager *usage.Manager
unavailableChannel chan healthcheck.Status unavailableChannel chan healthcheck.Status
} }
@ -199,6 +202,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
ruleManager: rm, ruleManager: rm,
serverOptions: serverOptions, serverOptions: serverOptions,
unavailableChannel: make(chan healthcheck.Status), unavailableChannel: make(chan healthcheck.Status),
usageManager: usageManager,
} }
httpServer, err := s.createPublicServer(apiHandler) httpServer, err := s.createPublicServer(apiHandler)
@ -560,6 +564,9 @@ func (s *Server) Stop() error {
s.ruleManager.Stop() s.ruleManager.Stop()
} }
// stop usage manager
s.usageManager.Stop()
return nil return nil
} }

View File

@ -144,6 +144,7 @@ func main() {
logger.Info("Received HealthCheck status: ", zap.Int("status", int(status))) logger.Info("Received HealthCheck status: ", zap.Int("status", int(status)))
case <-signalsChannel: case <-signalsChannel:
logger.Fatal("Received OS Interrupt Signal ... ") logger.Fatal("Received OS Interrupt Signal ... ")
server.Stop()
} }
} }
} }

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2"
"github.com/go-co-op/gocron"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
@ -28,9 +29,6 @@ const (
) )
var ( var (
// send usage every 24 hour
uploadFrequency = 24 * time.Hour
locker = stateUnlocked locker = stateUnlocked
) )
@ -39,12 +37,7 @@ type Manager struct {
licenseRepo *license.Repo licenseRepo *license.Repo
// end the usage routine, this is important to gracefully scheduler *gocron.Scheduler
// stopping usage reporting and protect in-consistent updates
done chan struct{}
// terminated waits for the UsageExporter go routine to end
terminated chan struct{}
} }
func New(dbType string, db *sqlx.DB, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) { func New(dbType string, db *sqlx.DB, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) {
@ -53,6 +46,7 @@ func New(dbType string, db *sqlx.DB, licenseRepo *license.Repo, clickhouseConn c
// repository: repo, // repository: repo,
clickhouseConn: clickhouseConn, clickhouseConn: clickhouseConn,
licenseRepo: licenseRepo, licenseRepo: licenseRepo,
scheduler: gocron.NewScheduler(time.UTC).Every(1).Day().At("00:00"), // send usage every at 00:00 UTC
} }
return m, nil return m, nil
} }
@ -64,37 +58,30 @@ func (lm *Manager) Start() error {
return fmt.Errorf("usage exporter is locked") return fmt.Errorf("usage exporter is locked")
} }
go lm.UsageExporter(context.Background()) _, err := lm.scheduler.Do(func() { lm.UploadUsage() })
if err != nil {
return err
}
// upload usage once when starting the service
lm.UploadUsage()
lm.scheduler.StartAsync()
return nil return nil
} }
func (lm *Manager) UploadUsage() {
func (lm *Manager) UsageExporter(ctx context.Context) { ctx := context.Background()
defer close(lm.terminated)
uploadTicker := time.NewTicker(uploadFrequency)
defer uploadTicker.Stop()
for {
select {
case <-lm.done:
return
case <-uploadTicker.C:
lm.UploadUsage(ctx)
}
}
}
func (lm *Manager) UploadUsage(ctx context.Context) error {
// check if license is present or not // check if license is present or not
license, err := lm.licenseRepo.GetActiveLicense(context.Background()) license, err := lm.licenseRepo.GetActiveLicense(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to get active license") zap.S().Errorf("failed to get active license: %v", zap.Error(err))
return
} }
if license == nil { if license == nil {
// we will not start the usage reporting if license is not present. // we will not start the usage reporting if license is not present.
zap.S().Info("no license present, skipping usage reporting") zap.S().Info("no license present, skipping usage reporting")
return nil return
} }
usages := []model.UsageDB{} usages := []model.UsageDB{}
@ -120,7 +107,8 @@ func (lm *Manager) UploadUsage(ctx context.Context) error {
dbusages := []model.UsageDB{} dbusages := []model.UsageDB{}
err := lm.clickhouseConn.Select(ctx, &dbusages, fmt.Sprintf(query, db, db), time.Now().Add(-(24 * time.Hour))) err := lm.clickhouseConn.Select(ctx, &dbusages, fmt.Sprintf(query, db, db), time.Now().Add(-(24 * time.Hour)))
if err != nil && !strings.Contains(err.Error(), "doesn't exist") { if err != nil && !strings.Contains(err.Error(), "doesn't exist") {
return err zap.S().Errorf("failed to get usage from clickhouse: %v", zap.Error(err))
return
} }
for _, u := range dbusages { for _, u := range dbusages {
u.Type = db u.Type = db
@ -130,7 +118,7 @@ func (lm *Manager) UploadUsage(ctx context.Context) error {
if len(usages) <= 0 { if len(usages) <= 0 {
zap.S().Info("no snapshots to upload, skipping.") zap.S().Info("no snapshots to upload, skipping.")
return nil return
} }
zap.S().Info("uploading usage data") zap.S().Info("uploading usage data")
@ -139,13 +127,15 @@ func (lm *Manager) UploadUsage(ctx context.Context) error {
for _, usage := range usages { for _, usage := range usages {
usageDataBytes, err := encryption.Decrypt([]byte(usage.ExporterID[:32]), []byte(usage.Data)) usageDataBytes, err := encryption.Decrypt([]byte(usage.ExporterID[:32]), []byte(usage.Data))
if err != nil { if err != nil {
return err zap.S().Errorf("error while decrypting usage data: %v", zap.Error(err))
return
} }
usageData := model.Usage{} usageData := model.Usage{}
err = json.Unmarshal(usageDataBytes, &usageData) err = json.Unmarshal(usageDataBytes, &usageData)
if err != nil { if err != nil {
return err zap.S().Errorf("error while unmarshalling usage data: %v", zap.Error(err))
return
} }
usageData.CollectorID = usage.CollectorID usageData.CollectorID = usage.CollectorID
@ -160,20 +150,16 @@ func (lm *Manager) UploadUsage(ctx context.Context) error {
LicenseKey: key, LicenseKey: key,
Usage: usagesPayload, Usage: usagesPayload,
} }
err = lm.UploadUsageWithExponentalBackOff(ctx, payload) lm.UploadUsageWithExponentalBackOff(ctx, payload)
if err != nil {
return err
}
return nil
} }
func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload model.UsagePayload) error { func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload model.UsagePayload) {
for i := 1; i <= MaxRetries; i++ { for i := 1; i <= MaxRetries; i++ {
apiErr := licenseserver.SendUsage(ctx, payload) apiErr := licenseserver.SendUsage(ctx, payload)
if apiErr != nil && i == MaxRetries { if apiErr != nil && i == MaxRetries {
zap.S().Errorf("retries stopped : %v", zap.Error(apiErr)) zap.S().Errorf("retries stopped : %v", zap.Error(apiErr))
// not returning error here since it is captured in the failed count // not returning error here since it is captured in the failed count
return nil return
} else if apiErr != nil { } else if apiErr != nil {
// sleeping for exponential backoff // sleeping for exponential backoff
sleepDuration := RetryInterval * time.Duration(i) sleepDuration := RetryInterval * time.Duration(i)
@ -183,11 +169,14 @@ func (lm *Manager) UploadUsageWithExponentalBackOff(ctx context.Context, payload
break break
} }
} }
return nil
} }
func (lm *Manager) Stop() { func (lm *Manager) Stop() {
close(lm.done) lm.scheduler.Stop()
zap.S().Debug("sending usage data before shutting down")
// send usage before shutting down
lm.UploadUsage()
atomic.StoreUint32(&locker, stateUnlocked) atomic.StoreUint32(&locker, stateUnlocked)
<-lm.terminated
} }

2
go.mod
View File

@ -10,6 +10,7 @@ require (
github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230517094211-cd3f3f0aea85 github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230517094211-cd3f3f0aea85
github.com/coreos/go-oidc/v3 v3.4.0 github.com/coreos/go-oidc/v3 v3.4.0
github.com/dustin/go-humanize v1.0.0 github.com/dustin/go-humanize v1.0.0
github.com/go-co-op/gocron v1.30.1
github.com/go-kit/log v0.2.1 github.com/go-kit/log v0.2.1
github.com/go-redis/redis/v8 v8.11.5 github.com/go-redis/redis/v8 v8.11.5
github.com/go-redis/redismock/v8 v8.11.5 github.com/go-redis/redismock/v8 v8.11.5
@ -93,6 +94,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
go.opentelemetry.io/collector/featuregate v0.70.0 // indirect go.opentelemetry.io/collector/featuregate v0.70.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect

8
go.sum
View File

@ -201,6 +201,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-co-op/gocron v1.30.1 h1:tjWUvJl5KrcwpkEkSXFSQFr4F9h5SfV/m4+RX0cV2fs=
github.com/go-co-op/gocron v1.30.1/go.mod h1:39f6KNSGVOU1LO/ZOoZfcSxwlsJDQOKSu8erN0SH48Y=
github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI=
@ -606,11 +608,14 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg=
github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o=
github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U= github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U=
github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/russellhaering/gosaml2 v0.8.0 h1:rm1Gc09/UoEsKGTSFvg8VCHJLY3wrP4BWjC+1ov0qCo= github.com/russellhaering/gosaml2 v0.8.0 h1:rm1Gc09/UoEsKGTSFvg8VCHJLY3wrP4BWjC+1ov0qCo=
@ -711,6 +716,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=