// Package app assembles and runs the query-service servers: the public HTTP
// API, the private HTTP API and the OpAMP server.
package app
import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	_ "net/http/pprof" // http profiler
	"time"

	"github.com/gorilla/handlers"
	"github.com/jmoiron/sqlx"
	"github.com/rs/cors"
	"github.com/soheilhy/cmux"
	"go.signoz.io/signoz/pkg/http/middleware"
	"go.signoz.io/signoz/pkg/query-service/agentConf"
	"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
	"go.signoz.io/signoz/pkg/query-service/app/cloudintegrations"
	"go.signoz.io/signoz/pkg/query-service/app/dashboards"
	"go.signoz.io/signoz/pkg/query-service/app/explorer"
	"go.signoz.io/signoz/pkg/query-service/app/integrations"
	"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
	"go.signoz.io/signoz/pkg/query-service/app/opamp"
	opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
	"go.signoz.io/signoz/pkg/query-service/app/preferences"
	"go.signoz.io/signoz/pkg/query-service/auth"
	"go.signoz.io/signoz/pkg/query-service/cache"
	"go.signoz.io/signoz/pkg/query-service/constants"
	"go.signoz.io/signoz/pkg/query-service/dao"
	"go.signoz.io/signoz/pkg/query-service/featureManager"
	"go.signoz.io/signoz/pkg/query-service/healthcheck"
	am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
	"go.signoz.io/signoz/pkg/query-service/interfaces"
	"go.signoz.io/signoz/pkg/query-service/model"
	pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine"
	"go.signoz.io/signoz/pkg/query-service/rules"
	"go.signoz.io/signoz/pkg/query-service/telemetry"
	"go.signoz.io/signoz/pkg/query-service/utils"
	"go.signoz.io/signoz/pkg/signoz"
	"go.signoz.io/signoz/pkg/types/authtypes"
	"go.signoz.io/signoz/pkg/web"
	"go.uber.org/zap"
)
type ServerOptions struct {
Config signoz.Config
PromConfigPath string
SkipTopLvlOpsPath string
HTTPHostPort string
PrivateHostPort string
// alert specific params
DisableRules bool
RuleRepoURL string
PreferSpanMetrics bool
CacheConfigPath string
FluxInterval string
FluxIntervalForTraceDetail string
Cluster string
UseLogsNewSchema bool
UseTraceNewSchema bool
SigNoz *signoz.SigNoz
Jwt *authtypes.JWT
2021-01-03 18:15:44 +05:30
}
// Server runs HTTP, Mux and a grpc server
type Server struct {
2021-05-22 19:51:56 +05:30
serverOptions *ServerOptions
ruleManager *rules.Manager
// public http router
httpConn net.Listener
httpServer *http.Server
// private http
privateConn net.Listener
privateHTTP *http.Server
opampServer *opamp.Server
2021-01-03 18:15:44 +05:30
unavailableChannel chan healthcheck.Status
}
// HealthCheckStatus returns the channel on which the server publishes
// healthcheck.Unavailable when one of its listeners stops. The channel is
// unbuffered, so a publisher blocks until a subscriber receives the status.
//
// The method uses a pointer receiver, like every other Server method, so
// calling it does not copy the Server struct.
func (s *Server) HealthCheckStatus() chan healthcheck.Status {
	return s.unavailableChannel
}
// NewServer creates and initializes Server
func NewServer(serverOptions *ServerOptions) (*Server, error) {
var err error
if err := dao.InitDao(serverOptions.SigNoz.SQLStore); err != nil {
return nil, err
}
if err := preferences.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
return nil, err
}
if err := dashboards.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
return nil, err
}
2021-01-09 19:10:34 +05:30
if err := explorer.InitWithDSN(serverOptions.SigNoz.SQLStore.SQLxDB()); err != nil {
return nil, err
}
// initiate feature manager
fm := featureManager.StartManager()
readerReady := make(chan bool)
fluxIntervalForTraceDetail, err := time.ParseDuration(serverOptions.FluxIntervalForTraceDetail)
if err != nil {
return nil, err
}
clickhouseReader := clickhouseReader.NewReader(
serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
serverOptions.PromConfigPath,
fm,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
fluxIntervalForTraceDetail,
serverOptions.SigNoz.Cache,
)
go clickhouseReader.Start(readerReady)
reader := clickhouseReader
skipConfig := &model.SkipConfig{}
if serverOptions.SkipTopLvlOpsPath != "" {
// read skip config
skipConfig, err = model.ReadSkipConfig(serverOptions.SkipTopLvlOpsPath)
if err != nil {
return nil, err
}
}
2024-09-24 10:22:52 +05:30
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
2021-01-03 18:15:44 +05:30
<-readerReady
2024-09-24 10:22:52 +05:30
rm, err := makeRulesManager(
serverOptions.PromConfigPath,
constants.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL, serverOptions.SigNoz.SQLStore.SQLxDB(), reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema)
if err != nil {
return nil, err
}
2023-09-17 10:40:45 +05:30
fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval)
if err != nil {
return nil, err
}
integrationsController, err := integrations.NewController(serverOptions.SigNoz.SQLStore)
if err != nil {
return nil, fmt.Errorf("couldn't create integrations controller: %w", err)
}
cloudIntegrationsController, err := cloudintegrations.NewController(serverOptions.SigNoz.SQLStore)
feat: aws integration: UI facing QS api for cloud account management (#6771) * feat: init app/cloud_integrations * feat: get API test started for cloudintegrations account lifecycle * feat: cloudintegrations: get controller started * feat: cloud integrations: add cloudintegrations.Controller to APIHandler and servers * feat: cloud integrations: get routes started * feat: cloud integrations: get accounts table schema started * feat: cloud integrations: get cloudProviderAccountsSQLRepository started * feat: cloud integrations: cloudProviderAccountsSQLRepository.listAccounts * feat: cloud integrations: http handler and controller plumbing for /generate-connection-url * feat: cloud integrations: cloudProviderAccountsSQLRepository.upsert * feat: cloud integrations: finish up with /generate-connection-url * feat: cloud integrations: add cloudProviderAccountsRepository.get * feat: cloud integrations: add API test expectation for being able to get account status * feat: cloud integrations: add http handler and controller method for getting account status * feat: cloud integrations: ensure unconnected accounts aren't included in list of connected accounts * feat: cloud integrations: add test expectation for agent check in request * feat: cloud integrations: agent check in API * feat: cloud integrations: ensure polling for status after agent check in works * feat: cloud integrations: ensure account included in connected account list after agent check in * feat: cloud integrations: add API expectation for updating account config * feat: cloud integrations: API for updating cloud account config * feat: cloud integrations: expectation for agent receiving latest config after account config update * feat: cloud integrations: expectation for disconnecting cloud accounts from UI * feat: cloud integrations: API for disconnecting cloud accounts * feat: cloud integrations: some cleanup * feat: cloud integrations: some more cleanup * feat: cloud integrations: repo: scope rows by cloud 
provider * feat: testutils: refactor out helper for creating a test sqlite DB * feat: cloud integrations: controller: add test validating regeneration of connection url * feat: cloud integrations: controller: validations for agent check ins * feat: cloud integrations: connected account response structure * feat: cloud integrations: API response account structure * feat: cloud integrations: some more cleanup * feat: cloud integrations: remove cloudProviderAccountsRepository.GetById * feat: cloud integrations: shouldn't be able to disconnect non-existent account * feat: cloud integrations: validate agents can't check in to cloud account with 2 signoz ids * feat: cloud integrations: ensure agents can't check in to cloud account with 2 signoz ids * feat: cloud integrations: remove stray import of ee/model in cloudintegrations controller
2025-01-10 18:43:35 +05:30
if err != nil {
return nil, fmt.Errorf("couldn't create cloud provider integrations controller: %w", err)
}
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
serverOptions.SigNoz.SQLStore.SQLxDB(), integrationsController.GetPipelinesForInstalledIntegrations,
)
if err != nil {
return nil, err
}
telemetry.GetInstance().SetReader(reader)
apiHandler, err := NewAPIHandler(APIHandlerOpts{
Reader: reader,
SkipConfig: skipConfig,
PreferSpanMetrics: serverOptions.PreferSpanMetrics,
AppDao: dao.DB(),
RuleManager: rm,
FeatureFlags: fm,
IntegrationsController: integrationsController,
feat: aws integration: UI facing QS api for cloud account management (#6771) * feat: init app/cloud_integrations * feat: get API test started for cloudintegrations account lifecycle * feat: cloudintegrations: get controller started * feat: cloud integrations: add cloudintegrations.Controller to APIHandler and servers * feat: cloud integrations: get routes started * feat: cloud integrations: get accounts table schema started * feat: cloud integrations: get cloudProviderAccountsSQLRepository started * feat: cloud integrations: cloudProviderAccountsSQLRepository.listAccounts * feat: cloud integrations: http handler and controller plumbing for /generate-connection-url * feat: cloud integrations: cloudProviderAccountsSQLRepository.upsert * feat: cloud integrations: finish up with /generate-connection-url * feat: cloud integrations: add cloudProviderAccountsRepository.get * feat: cloud integrations: add API test expectation for being able to get account status * feat: cloud integrations: add http handler and controller method for getting account status * feat: cloud integrations: ensure unconnected accounts aren't included in list of connected accounts * feat: cloud integrations: add test expectation for agent check in request * feat: cloud integrations: agent check in API * feat: cloud integrations: ensure polling for status after agent check in works * feat: cloud integrations: ensure account included in connected account list after agent check in * feat: cloud integrations: add API expectation for updating account config * feat: cloud integrations: API for updating cloud account config * feat: cloud integrations: expectation for agent receiving latest config after account config update * feat: cloud integrations: expectation for disconnecting cloud accounts from UI * feat: cloud integrations: API for disconnecting cloud accounts * feat: cloud integrations: some cleanup * feat: cloud integrations: some more cleanup * feat: cloud integrations: repo: scope rows by cloud 
provider * feat: testutils: refactor out helper for creating a test sqlite DB * feat: cloud integrations: controller: add test validating regeneration of connection url * feat: cloud integrations: controller: validations for agent check ins * feat: cloud integrations: connected account response structure * feat: cloud integrations: API response account structure * feat: cloud integrations: some more cleanup * feat: cloud integrations: remove cloudProviderAccountsRepository.GetById * feat: cloud integrations: shouldn't be able to disconnect non-existent account * feat: cloud integrations: validate agents can't check in to cloud account with 2 signoz ids * feat: cloud integrations: ensure agents can't check in to cloud account with 2 signoz ids * feat: cloud integrations: remove stray import of ee/model in cloudintegrations controller
2025-01-10 18:43:35 +05:30
CloudIntegrationsController: cloudIntegrationsController,
LogsParsingPipelineController: logParsingPipelineController,
2023-09-17 10:40:45 +05:30
Cache: c,
FluxInterval: fluxInterval,
2024-09-12 10:58:07 +05:30
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
JWT: serverOptions.Jwt,
})
if err != nil {
return nil, err
}
s := &Server{
// logger: logger,
// tracer: tracer,
ruleManager: rm,
serverOptions: serverOptions,
unavailableChannel: make(chan healthcheck.Status),
}
httpServer, err := s.createPublicServer(apiHandler, serverOptions.SigNoz.Web)
if err != nil {
return nil, err
}
s.httpServer = httpServer
privateServer, err := s.createPrivateServer(apiHandler)
if err != nil {
return nil, err
}
s.privateHTTP = privateServer
_, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB())
if err != nil {
return nil, err
}
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: serverOptions.SigNoz.SQLStore.SQLxDB(),
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
},
})
if err != nil {
return nil, err
}
s.opampServer = opamp.InitializeServer(
&opAmpModel.AllAgents, agentConfMgr,
)
return s, nil
}
// createPrivateServer assembles the *http.Server for the private routes.
//
// The router gets the auth, timeout, analytics and logging middleware (in
// that order), then the private routes, and is finally wrapped with CORS and
// response compression. The returned error is always nil.
func (s *Server) createPrivateServer(api *APIHandler) (*http.Server, error) {
	opts := s.serverOptions
	router := NewRouter()

	authMW := middleware.NewAuth(zap.L(), opts.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"})
	router.Use(authMW.Wrap)

	timeoutMW := middleware.NewTimeout(
		zap.L(),
		opts.Config.APIServer.Timeout.ExcludedRoutes,
		opts.Config.APIServer.Timeout.Default,
		opts.Config.APIServer.Timeout.Max,
	)
	router.Use(timeoutMW.Wrap)

	analyticsMW := middleware.NewAnalytics(zap.L())
	router.Use(analyticsMW.Wrap)

	loggingMW := middleware.NewLogging(zap.L(), opts.Config.APIServer.Logging.ExcludedRoutes)
	router.Use(loggingMW.Wrap)

	api.RegisterPrivateRoutes(router)

	corsPolicy := cors.New(cors.Options{
		//todo(amol): find out a way to add exact domain or
		// ip here for alert manager
		AllowedOrigins: []string{"*"},
		AllowedMethods: []string{"GET", "DELETE", "POST", "PUT", "PATCH"},
		AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "X-SIGNOZ-QUERY-ID", "Sec-WebSocket-Protocol"},
	})

	// CORS wraps the router; compression wraps the CORS handler.
	private := &http.Server{
		Handler: handlers.CompressHandler(corsPolicy.Handler(router)),
	}
	return private, nil
}
func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, error) {
2021-01-03 18:15:44 +05:30
r := NewRouter()
r.Use(middleware.NewAuth(zap.L(), s.serverOptions.Jwt, []string{"Authorization", "Sec-WebSocket-Protocol"}).Wrap)
r.Use(middleware.NewTimeout(zap.L(),
s.serverOptions.Config.APIServer.Timeout.ExcludedRoutes,
s.serverOptions.Config.APIServer.Timeout.Default,
s.serverOptions.Config.APIServer.Timeout.Max,
).Wrap)
r.Use(middleware.NewAnalytics(zap.L()).Wrap)
r.Use(middleware.NewLogging(zap.L(), s.serverOptions.Config.APIServer.Logging.ExcludedRoutes).Wrap)
2021-01-09 19:10:34 +05:30
// add auth middleware
getUserFromRequest := func(ctx context.Context) (*model.UserPayload, error) {
user, err := auth.GetUserFromReqContext(ctx)
if err != nil {
return nil, err
}
if user.User.OrgId == "" {
return nil, model.UnauthorizedError(errors.New("orgId is missing in the claims"))
}
return user, nil
}
am := NewAuthMiddleware(getUserFromRequest)
2023-02-15 23:49:03 +05:30
api.RegisterRoutes(r, am)
api.RegisterLogsRoutes(r, am)
api.RegisterIntegrationRoutes(r, am)
feat: aws integration: UI facing QS api for cloud account management (#6771) * feat: init app/cloud_integrations * feat: get API test started for cloudintegrations account lifecycle * feat: cloudintegrations: get controller started * feat: cloud integrations: add cloudintegrations.Controller to APIHandler and servers * feat: cloud integrations: get routes started * feat: cloud integrations: get accounts table schema started * feat: cloud integrations: get cloudProviderAccountsSQLRepository started * feat: cloud integrations: cloudProviderAccountsSQLRepository.listAccounts * feat: cloud integrations: http handler and controller plumbing for /generate-connection-url * feat: cloud integrations: cloudProviderAccountsSQLRepository.upsert * feat: cloud integrations: finish up with /generate-connection-url * feat: cloud integrations: add cloudProviderAccountsRepository.get * feat: cloud integrations: add API test expectation for being able to get account status * feat: cloud integrations: add http handler and controller method for getting account status * feat: cloud integrations: ensure unconnected accounts aren't included in list of connected accounts * feat: cloud integrations: add test expectation for agent check in request * feat: cloud integrations: agent check in API * feat: cloud integrations: ensure polling for status after agent check in works * feat: cloud integrations: ensure account included in connected account list after agent check in * feat: cloud integrations: add API expectation for updating account config * feat: cloud integrations: API for updating cloud account config * feat: cloud integrations: expectation for agent receiving latest config after account config update * feat: cloud integrations: expectation for disconnecting cloud accounts from UI * feat: cloud integrations: API for disconnecting cloud accounts * feat: cloud integrations: some cleanup * feat: cloud integrations: some more cleanup * feat: cloud integrations: repo: scope rows by cloud 
provider * feat: testutils: refactor out helper for creating a test sqlite DB * feat: cloud integrations: controller: add test validating regeneration of connection url * feat: cloud integrations: controller: validations for agent check ins * feat: cloud integrations: connected account response structure * feat: cloud integrations: API response account structure * feat: cloud integrations: some more cleanup * feat: cloud integrations: remove cloudProviderAccountsRepository.GetById * feat: cloud integrations: shouldn't be able to disconnect non-existent account * feat: cloud integrations: validate agents can't check in to cloud account with 2 signoz ids * feat: cloud integrations: ensure agents can't check in to cloud account with 2 signoz ids * feat: cloud integrations: remove stray import of ee/model in cloudintegrations controller
2025-01-10 18:43:35 +05:30
api.RegisterCloudIntegrationsRoutes(r, am)
api.RegisterQueryRangeV3Routes(r, am)
2024-10-10 17:02:46 +05:30
api.RegisterInfraMetricsRoutes(r, am)
api.RegisterWebSocketPaths(r, am)
api.RegisterQueryRangeV4Routes(r, am)
2024-07-26 11:50:02 +05:30
api.RegisterMessagingQueuesRoutes(r, am)
api.RegisterThirdPartyApiRoutes(r, am)
api.MetricExplorerRoutes(r, am)
2021-01-03 18:15:44 +05:30
2021-03-01 02:43:05 +05:30
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET", "DELETE", "POST", "PUT", "PATCH", "OPTIONS"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "cache-control", "X-SIGNOZ-QUERY-ID", "Sec-WebSocket-Protocol"},
2021-03-01 02:43:05 +05:30
})
2021-01-03 18:15:44 +05:30
2021-03-01 02:43:05 +05:30
handler := c.Handler(r)
2021-01-03 18:15:44 +05:30
handler = handlers.CompressHandler(handler)
err := web.AddToRouter(r)
if err != nil {
return nil, err
}
2021-01-03 18:15:44 +05:30
return &http.Server{
Handler: handler,
2021-05-22 19:51:56 +05:30
}, nil
2021-01-03 18:15:44 +05:30
}
// initListeners initialises listeners of the server
func (s *Server) initListeners() error {
// listen on public port
var err error
publicHostPort := s.serverOptions.HTTPHostPort
if publicHostPort == "" {
return fmt.Errorf("constants.HTTPHostPort is required")
2021-01-03 18:15:44 +05:30
}
s.httpConn, err = net.Listen("tcp", publicHostPort)
if err != nil {
return err
}
2021-01-03 18:15:44 +05:30
zap.L().Info(fmt.Sprintf("Query server started listening on %s...", s.serverOptions.HTTPHostPort))
2021-01-03 18:15:44 +05:30
// listen on private port to support internal services
privateHostPort := s.serverOptions.PrivateHostPort
2021-01-03 18:15:44 +05:30
if privateHostPort == "" {
return fmt.Errorf("constants.PrivateHostPort is required")
}
2021-01-03 18:15:44 +05:30
s.privateConn, err = net.Listen("tcp", privateHostPort)
if err != nil {
return err
}
zap.L().Info(fmt.Sprintf("Query server started listening on private port %s...", s.serverOptions.PrivateHostPort))
2021-01-03 18:15:44 +05:30
return nil
2021-01-03 18:15:44 +05:30
}
// Start listening on http and private http port concurrently
2021-01-03 18:15:44 +05:30
func (s *Server) Start() error {
// initiate rule manager first
if !s.serverOptions.DisableRules {
s.ruleManager.Start()
} else {
zap.L().Info("msg: Rules disabled as rules.disable is set to TRUE")
}
err := s.initListeners()
2021-01-03 18:15:44 +05:30
if err != nil {
return err
}
var httpPort int
if port, err := utils.GetPort(s.httpConn.Addr()); err == nil {
httpPort = port
}
go func() {
zap.L().Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.serverOptions.HTTPHostPort))
2021-01-03 18:15:44 +05:30
switch err := s.httpServer.Serve(s.httpConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:
// normal exit, nothing to do
default:
zap.L().Error("Could not start HTTP server", zap.Error(err))
2021-01-03 18:15:44 +05:30
}
s.unavailableChannel <- healthcheck.Unavailable
}()
go func() {
zap.L().Info("Starting pprof server", zap.String("addr", constants.DebugHttpPort))
2021-01-03 18:15:44 +05:30
err = http.ListenAndServe(constants.DebugHttpPort, nil)
if err != nil {
zap.L().Error("Could not start pprof server", zap.Error(err))
}
2021-01-03 18:15:44 +05:30
}()
var privatePort int
if port, err := utils.GetPort(s.privateConn.Addr()); err == nil {
privatePort = port
}
fmt.Println("starting private http")
go func() {
zap.L().Info("Starting Private HTTP server", zap.Int("port", privatePort), zap.String("addr", s.serverOptions.PrivateHostPort))
switch err := s.privateHTTP.Serve(s.privateConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:
// normal exit, nothing to do
zap.L().Info("private http server closed")
default:
zap.L().Error("Could not start private HTTP server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()
go func() {
zap.L().Info("Starting OpAmp Websocket server", zap.String("addr", constants.OpAmpWsEndpoint))
err := s.opampServer.Start(constants.OpAmpWsEndpoint)
if err != nil {
zap.L().Info("opamp ws server failed to start", zap.Error(err))
s.unavailableChannel <- healthcheck.Unavailable
}
}()
return nil
}
func (s *Server) Stop() error {
if s.httpServer != nil {
if err := s.httpServer.Shutdown(context.Background()); err != nil {
return err
}
}
if s.privateHTTP != nil {
if err := s.privateHTTP.Shutdown(context.Background()); err != nil {
return err
}
}
s.opampServer.Stop()
if s.ruleManager != nil {
s.ruleManager.Stop()
}
2021-01-03 18:15:44 +05:30
return nil
}
func makeRulesManager(
_,
alertManagerURL string,
ruleRepoURL string,
db *sqlx.DB,
ch interfaces.Reader,
2024-09-24 10:22:52 +05:30
cache cache.Cache,
disableRules bool,
2024-09-12 10:58:07 +05:30
fm interfaces.FeatureLookup,
useLogsNewSchema bool,
useTraceNewSchema bool) (*rules.Manager, error) {
// create engine
pqle, err := pqle.FromReader(ch)
if err != nil {
return nil, fmt.Errorf("failed to create pql engine : %v", err)
}
// notifier opts
notifierOpts := am.NotifierOptions{
QueueCapacity: 10000,
Timeout: 1 * time.Second,
AlertManagerURLs: []string{alertManagerURL},
}
// create manager opts
managerOpts := &rules.ManagerOptions{
NotifierOpts: notifierOpts,
PqlEngine: pqle,
RepoURL: ruleRepoURL,
DBConn: db,
Context: context.Background(),
Logger: zap.L(),
DisableRules: disableRules,
FeatureFlags: fm,
Reader: ch,
Cache: cache,
EvalDelay: constants.GetEvalDelay(),
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
}
// create Manager
manager, err := rules.NewManager(managerOpts)
if err != nil {
return nil, fmt.Errorf("rule manager error: %v", err)
}
zap.L().Info("rules manager is ready")
return manager, nil
}