lightweight adaptivity on workpool

This commit is contained in:
Mzack9999 2024-04-03 18:50:46 +02:00
parent a8d1393e96
commit 774db61655
6 changed files with 54 additions and 13 deletions

View File

@ -15,12 +15,11 @@ import (
syncutil "github.com/projectdiscovery/utils/sync" syncutil "github.com/projectdiscovery/utils/sync"
) )
const probeBulkSize = 50 var GlobalProbeBulkSize = 50
// initializeTemplatesHTTPInput initializes the http form of input // initializeTemplatesHTTPInput initializes the http form of input
// for any loaded http templates if input is in non-standard format. // for any loaded http templates if input is in non-standard format.
func (r *Runner) initializeTemplatesHTTPInput() (*hybrid.HybridMap, error) { func (r *Runner) initializeTemplatesHTTPInput() (*hybrid.HybridMap, error) {
hm, err := hybrid.New(hybrid.DefaultDiskOptions) hm, err := hybrid.New(hybrid.DefaultDiskOptions)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "could not create temporary input file") return nil, errors.Wrap(err, "could not create temporary input file")
@ -31,11 +30,6 @@ func (r *Runner) initializeTemplatesHTTPInput() (*hybrid.HybridMap, error) {
} }
gologger.Info().Msgf("Running httpx on input host") gologger.Info().Msgf("Running httpx on input host")
var bulkSize = probeBulkSize
if r.options.BulkSize > probeBulkSize {
bulkSize = r.options.BulkSize
}
httpxOptions := httpx.DefaultOptions httpxOptions := httpx.DefaultOptions
httpxOptions.RetryMax = r.options.Retries httpxOptions.RetryMax = r.options.Retries
httpxOptions.Timeout = time.Duration(r.options.Timeout) * time.Second httpxOptions.Timeout = time.Duration(r.options.Timeout) * time.Second
@ -45,7 +39,7 @@ func (r *Runner) initializeTemplatesHTTPInput() (*hybrid.HybridMap, error) {
} }
// Probe the non-standard URLs and store them in cache // Probe the non-standard URLs and store them in cache
swg, err := syncutil.New(syncutil.WithSize(bulkSize)) swg, err := syncutil.New(syncutil.WithSize(GlobalProbeBulkSize))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "could not create adaptive group") return nil, errors.Wrap(err, "could not create adaptive group")
} }
@ -55,6 +49,10 @@ func (r *Runner) initializeTemplatesHTTPInput() (*hybrid.HybridMap, error) {
return true return true
} }
if swg.Size != GlobalProbeBulkSize {
swg.Resize(GlobalProbeBulkSize)
}
swg.Add() swg.Add()
go func(input *contextargs.MetaInput) { go func(input *contextargs.MetaInput) {
defer swg.Done() defer swg.Done()

View File

@ -30,14 +30,19 @@ func New(options *types.Options) *Engine {
return engine return engine
} }
// GetWorkPool returns a workpool from options func (e *Engine) GetWorkPoolConfig() WorkPoolConfig {
func (e *Engine) GetWorkPool() *WorkPool { config := WorkPoolConfig{
return NewWorkPool(WorkPoolConfig{
InputConcurrency: e.options.BulkSize, InputConcurrency: e.options.BulkSize,
TypeConcurrency: e.options.TemplateThreads, TypeConcurrency: e.options.TemplateThreads,
HeadlessInputConcurrency: e.options.HeadlessBulkSize, HeadlessInputConcurrency: e.options.HeadlessBulkSize,
HeadlessTypeConcurrency: e.options.HeadlessTemplateThreads, HeadlessTypeConcurrency: e.options.HeadlessTemplateThreads,
}) }
return config
}
// GetWorkPool returns a workpool from options
func (e *Engine) GetWorkPool() *WorkPool {
return NewWorkPool(e.GetWorkPoolConfig())
} }
// SetExecuterOptions sets the executer options for the engine. This is required // SetExecuterOptions sets the executer options for the engine. This is required

View File

@ -108,8 +108,10 @@ func (e *Engine) executeTemplateSpray(templatesList []*templates.Template, targe
wp := e.GetWorkPool() wp := e.GetWorkPool()
for _, template := range templatesList { for _, template := range templatesList {
templateType := template.Type() // resize check point - nop if there are no changes
wp.RefreshWithConfig(e.GetWorkPoolConfig())
templateType := template.Type()
var wg *syncutil.AdaptiveWaitGroup var wg *syncutil.AdaptiveWaitGroup
if templateType == types.HeadlessProtocol { if templateType == types.HeadlessProtocol {
wg = wp.Headless wg = wp.Headless

View File

@ -158,6 +158,9 @@ func (e *Engine) executeTemplatesOnTarget(alltemplates []*templates.Template, ta
wp := e.GetWorkPool() wp := e.GetWorkPool()
for _, tpl := range alltemplates { for _, tpl := range alltemplates {
// resize check point - nop if there are no changes
wp.RefreshWithConfig(e.GetWorkPoolConfig())
var sg *syncutil.AdaptiveWaitGroup var sg *syncutil.AdaptiveWaitGroup
if tpl.Type() == types.HeadlessProtocol { if tpl.Type() == types.HeadlessProtocol {
sg = wp.Headless sg = wp.Headless
@ -213,6 +216,9 @@ func (e *ChildExecuter) Close() *atomic.Bool {
func (e *ChildExecuter) Execute(template *templates.Template, value *contextargs.MetaInput) { func (e *ChildExecuter) Execute(template *templates.Template, value *contextargs.MetaInput) {
templateType := template.Type() templateType := template.Type()
// resize check point - nop if there are no changes
e.e.workPool.RefreshWithConfig(e.e.GetWorkPoolConfig())
var wg *syncutil.AdaptiveWaitGroup var wg *syncutil.AdaptiveWaitGroup
if templateType == types.HeadlessProtocol { if templateType == types.HeadlessProtocol {
wg = e.e.workPool.Headless wg = e.e.workPool.Headless

View File

@ -57,3 +57,28 @@ func (w *WorkPool) InputPool(templateType types.ProtocolType) *syncutil.Adaptive
swg, _ := syncutil.New(syncutil.WithSize(count)) swg, _ := syncutil.New(syncutil.WithSize(count))
return swg return swg
} }
func (w *WorkPool) RefreshWithConfig(config WorkPoolConfig) {
if w.config.TypeConcurrency != config.TypeConcurrency {
w.config.TypeConcurrency = config.TypeConcurrency
}
if w.config.HeadlessTypeConcurrency != config.HeadlessTypeConcurrency {
w.config.HeadlessTypeConcurrency = config.HeadlessTypeConcurrency
}
if w.config.InputConcurrency != config.InputConcurrency {
w.config.InputConcurrency = config.InputConcurrency
}
if w.config.HeadlessInputConcurrency != config.HeadlessInputConcurrency {
w.config.HeadlessInputConcurrency = config.HeadlessInputConcurrency
}
w.Refresh()
}
func (w *WorkPool) Refresh() {
if w.Default.Size != w.config.TypeConcurrency {
w.Default.Resize(w.config.TypeConcurrency)
}
if w.Headless.Size != w.config.HeadlessTypeConcurrency {
w.Headless.Resize(w.config.HeadlessTypeConcurrency)
}
}

View File

@ -100,6 +100,11 @@ func executeWithRuntime(runtime *goja.Runtime, p *goja.Program, args *ExecuteArg
// ExecuteProgram executes a compiled program with the default options. // ExecuteProgram executes a compiled program with the default options.
// it deligates if a particular program should run in a pooled or non-pooled runtime // it deligates if a particular program should run in a pooled or non-pooled runtime
func ExecuteProgram(p *goja.Program, args *ExecuteArgs, opts *ExecuteOptions) (result goja.Value, err error) { func ExecuteProgram(p *goja.Program, args *ExecuteArgs, opts *ExecuteOptions) (result goja.Value, err error) {
// resize check point
if pooljsc.Size != PoolingJsVmConcurrency {
pooljsc.Resize(PoolingJsVmConcurrency)
}
if opts.Source == nil { if opts.Source == nil {
// not-recommended anymore // not-recommended anymore
return executeWithoutPooling(p, args, opts) return executeWithoutPooling(p, args, opts)