nuclei/pkg/core/executors.go
Nakul Bharti c4fa2c74c1
cache, goroutine and unbounded workers management (#6420)
* Enhance matcher compilation with caching for regex and DSL expressions to improve performance. Update template parsing to conditionally retain raw templates based on size constraints.

* Implement caching for regex and DSL expressions in extractors and matchers to enhance performance. Introduce a buffer pool in raw requests to reduce memory allocations. Update template cache management for improved efficiency.

* feat: improve concurrency to be bound

* refactor: replace fmt.Sprintf with fmt.Fprintf for improved performance in header handling

* feat: add regex matching tests and benchmarks for performance evaluation

* feat: add prefix check in regex extraction to optimize matching process

* feat: implement regex caching mechanism to enhance performance in extractors and matchers, along with tests and benchmarks for validation

* feat: add unit tests for template execution in the core engine, enhancing test coverage and reliability

* feat: enhance error handling in template execution and improve regex caching logic for better performance

* Implement caching for regex and DSL expressions in the cache package, replacing previous sync.Map usage. Add unit tests for cache functionality, including eviction by capacity and retrieval of cached items. Update extractors and matchers to utilize the new cache system for improved performance and memory efficiency.

* Add tests for SetCapacities in cache package to ensure cache behavior on capacity changes

- Implemented TestSetCapacities_NoRebuildOnZero to verify that setting capacities to zero does not clear existing caches.
- Added TestSetCapacities_BeforeFirstUse to confirm that initial cache settings are respected and not overridden by subsequent capacity changes.

* Refactor matchers and update load test generator to use io package

- Removed maxRegexScanBytes constant from match.go.
- Replaced ioutil with io package in load_test.go for NopCloser usage.
- Restored TestValidate_AllowsInlineMultiline in load_test.go to ensure inline validation functionality.

* Add cancellation support in template execution and enhance test coverage

- Updated executeTemplateWithTargets to respect context cancellation.
- Introduced fakeTargetProvider and slowExecuter for testing.
- Added Test_executeTemplateWithTargets_RespectsCancellation to validate cancellation behavior during template execution.
2025-09-15 23:48:02 +05:30

249 lines
7.6 KiB
Go

package core
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/projectdiscovery/nuclei/v3/pkg/input/provider"
"github.com/projectdiscovery/nuclei/v3/pkg/output"
"github.com/projectdiscovery/nuclei/v3/pkg/protocols/common/contextargs"
"github.com/projectdiscovery/nuclei/v3/pkg/scan"
"github.com/projectdiscovery/nuclei/v3/pkg/templates"
"github.com/projectdiscovery/nuclei/v3/pkg/templates/types"
generalTypes "github.com/projectdiscovery/nuclei/v3/pkg/types"
syncutil "github.com/projectdiscovery/utils/sync"
)
// Executors are low-level executors that deal with template execution on a target.
// executeAllSelfContained executes all self contained templates that do not use `target`.
//
// One goroutine is started per template and registered on sg; this function
// does not wait on sg itself, so waiting is left to the caller. results is
// flipped from false to true as soon as one template reports a match.
//
// When a callback is configured, every result event is handed to it and the
// template is counted as matched; otherwise the executer's own match status is
// used. Execution errors are logged as warnings in both modes.
func (e *Engine) executeAllSelfContained(ctx context.Context, alltemplates []*templates.Template, results *atomic.Bool, sg *sync.WaitGroup) {
	for _, v := range alltemplates {
		sg.Add(1)
		go func(template *templates.Template) {
			defer sg.Done()

			var (
				err   error
				match bool
			)
			scanCtx := scan.NewScanContext(ctx, contextargs.New(ctx))
			if e.Callback != nil {
				// Store the error in the outer err rather than in a variable
				// scoped to the if statement, so that a failure reaches the
				// warning below instead of being silently discarded.
				events, execErr := template.Executer.ExecuteWithResults(scanCtx)
				err = execErr
				if err == nil {
					for _, event := range events {
						e.Callback(event)
					}
				}
				// NOTE(review): callback mode reports a match unconditionally,
				// even without events or on error — confirm this is intended.
				match = true
			} else {
				match, err = template.Executer.Execute(scanCtx)
			}
			if err != nil {
				e.options.Logger.Warning().Msgf("[%s] Could not execute step (self-contained): %s\n", e.executerOpts.Colorizer.BrightBlue(template.ID), err)
			}
			results.CompareAndSwap(false, match)
		}(v)
	}
}
// executeTemplateWithTargets executes a given template on every target yielded
// by the input provider, using a bounded pool of worker goroutines.
//
// The number of workers is the size of the input pool for the template type,
// falling back to a single worker when no pool (or a non-positive size) is
// available. Every dispatched target is tracked as in-flight under its index
// in the resume configuration until a worker is done with it. The template is
// marked as completed only if the context was not cancelled, so an interrupted
// run is never recorded as finished. results is flipped from false to true on
// the first match.
func (e *Engine) executeTemplateWithTargets(ctx context.Context, template *templates.Template, target provider.InputProvider, results *atomic.Bool) {
	if e.workPool == nil {
		e.workPool = e.GetWorkPool()
	}

	// Bounded worker pool using input concurrency
	workerCount := 1
	if pool := e.workPool.InputPool(template.Type()); pool != nil && pool.Size > 0 {
		workerCount = pool.Size
	}

	// index numbers the targets that are handed to the workers.
	var index uint32

	// Fetch (or lazily create) the resume bookkeeping of this template.
	e.executerOpts.ResumeCfg.Lock()
	currentInfo, ok := e.executerOpts.ResumeCfg.Current[template.ID]
	if !ok {
		currentInfo = &generalTypes.ResumeInfo{}
		e.executerOpts.ResumeCfg.Current[template.ID] = currentInfo
	}
	if currentInfo.InFlight == nil {
		currentInfo.InFlight = make(map[uint32]struct{})
	}
	resumeFromInfo, ok := e.executerOpts.ResumeCfg.ResumeFrom[template.ID]
	if !ok {
		resumeFromInfo = &generalTypes.ResumeInfo{}
		e.executerOpts.ResumeCfg.ResumeFrom[template.ID] = resumeFromInfo
	}
	e.executerOpts.ResumeCfg.Unlock()

	// track progression
	cleanupInFlight := func(idx uint32) {
		currentInfo.Lock()
		delete(currentInfo.InFlight, idx)
		currentInfo.Unlock()
	}

	// task represents a single target execution unit
	type task struct {
		index uint32
		skip  bool
		value *contextargs.MetaInput
	}

	tasks := make(chan task)
	var workersWg sync.WaitGroup
	workersWg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go func() {
			defer workersWg.Done()
			for t := range tasks {
				// Run each task in its own function so the deferred cleanup
				// fires per task rather than when the worker exits.
				func() {
					defer cleanupInFlight(t.index)
					select {
					case <-ctx.Done():
						return
					default:
					}
					if t.skip {
						return
					}
					match, err := e.executeTemplateOnInput(ctx, template, t.value)
					if err != nil {
						e.options.Logger.Warning().Msgf("[%s] Could not execute step on %s: %s\n", e.executerOpts.Colorizer.BrightBlue(template.ID), t.value.Input, err)
					}
					results.CompareAndSwap(false, match)
				}()
			}
		}()
	}

	target.Iterate(func(scannedValue *contextargs.MetaInput) bool {
		select {
		case <-ctx.Done():
			return false // exit
		default:
		}

		// Best effort to track the host progression
		// skips indexes lower than the minimum in-flight at interruption time
		var skip bool
		if resumeFromInfo.Completed { // the template was completed
			e.options.Logger.Debug().Msgf("[%s] Skipping \"%s\": Resume - Template already completed", template.ID, scannedValue.Input)
			skip = true
		} else if index < resumeFromInfo.SkipUnder { // index lower than the sliding window (bulk-size)
			e.options.Logger.Debug().Msgf("[%s] Skipping \"%s\": Resume - Target already processed", template.ID, scannedValue.Input)
			skip = true
		} else if _, isInFlight := resumeFromInfo.InFlight[index]; isInFlight { // the target wasn't completed successfully
			e.options.Logger.Debug().Msgf("[%s] Repeating \"%s\": Resume - Target wasn't completed", template.ID, scannedValue.Input)
		}
		// Any other index (e.g. above the sliding window) is executed normally.

		// Skip if the host has had errors
		if e.executerOpts.HostErrorsCache != nil && e.executerOpts.HostErrorsCache.Check(e.executerOpts.ProtocolType.String(), contextargs.NewWithMetaInput(ctx, scannedValue)) {
			skipEvent := &output.ResultEvent{
				TemplateID:    template.ID,
				TemplatePath:  template.Path,
				Info:          template.Info,
				Type:          e.executerOpts.ProtocolType.String(),
				Host:          scannedValue.Input,
				MatcherStatus: false,
				Error:         "host was skipped as it was found unresponsive",
				Timestamp:     time.Now(),
			}
			if e.Callback != nil {
				e.Callback(skipEvent)
			} else if e.executerOpts.Output != nil {
				_ = e.executerOpts.Output.Write(skipEvent)
			}
			return true
		}

		// Register the target as in-flight only once it is certain to be
		// dispatched; the host-error path above returns without a worker ever
		// cleaning up, which would otherwise leave a stale in-flight entry.
		currentInfo.Lock()
		currentInfo.InFlight[index] = struct{}{}
		currentInfo.Unlock()

		select {
		case tasks <- task{index: index, skip: skip, value: scannedValue}:
		case <-ctx.Done():
			// No worker will pick this target up: undo the registration and
			// stop iterating instead of waiting for a free worker.
			cleanupInFlight(index)
			return false
		}
		index++
		return true
	})

	close(tasks)
	workersWg.Wait()

	// A cancelled run did not necessarily cover every target, so it must not
	// be recorded as completed.
	if ctx.Err() != nil {
		return
	}

	// on completion marks the template as completed
	currentInfo.Lock()
	currentInfo.Completed = true
	currentInfo.Unlock()
}
// executeTemplatesOnTarget runs every given template against one single target.
//
// Each template is executed in its own goroutine, admitted through the work
// pool's headless or default wait group depending on the template type. The
// function returns after the work pool has been waited on; it stops scheduling
// further templates as soon as ctx is done. results is flipped from false to
// true on the first match.
func (e *Engine) executeTemplatesOnTarget(ctx context.Context, alltemplates []*templates.Template, target *contextargs.MetaInput, results *atomic.Bool) {
	// The work pool carries separate wait groups for headless and
	// non-headless templates; the global wait group must not be used here.
	pool := e.GetWorkPool()
	defer pool.Wait()

	// runOne executes a single template on the input and releases its slot in group.
	runOne := func(current *templates.Template, input *contextargs.MetaInput, group *syncutil.AdaptiveWaitGroup) {
		defer group.Done()
		matched, execErr := e.executeTemplateOnInput(ctx, current, input)
		if execErr != nil {
			e.options.Logger.Warning().Msgf("[%s] Could not execute step on %s: %s\n", e.executerOpts.Colorizer.BrightBlue(current.ID), input.Input, execErr)
		}
		results.CompareAndSwap(false, matched)
	}

	for _, current := range alltemplates {
		select {
		case <-ctx.Done():
			return
		default:
		}

		// Resize checkpoint: a no-op when the configuration did not change.
		pool.RefreshWithConfig(e.GetWorkPoolConfig())

		var group *syncutil.AdaptiveWaitGroup
		switch current.Type() {
		case types.HeadlessProtocol:
			group = pool.Headless
		default:
			group = pool.Default
		}

		group.Add()
		go runOne(current, target, group)
	}
}
// executeTemplateOnInput runs one template against one input and reports
// whether it matched, together with any execution error.
//
// Workflow templates are delegated to executeWorkflow and never return an
// error. For all other templates, a configured callback receives every result
// event and the match status is whether at least one event was produced;
// without a callback the executer's own match status and error are returned.
func (e *Engine) executeTemplateOnInput(ctx context.Context, template *templates.Template, value *contextargs.MetaInput) (bool, error) {
	args := contextargs.New(ctx)
	args.MetaInput = value
	scanCtx := scan.NewScanContext(ctx, args)

	if template.Type() == types.WorkflowProtocol {
		return e.executeWorkflow(scanCtx, template.CompiledWorkflow), nil
	}

	// Without a callback the executer reports the match status itself.
	if e.Callback == nil {
		return template.Executer.Execute(scanCtx)
	}

	events, err := template.Executer.ExecuteWithResults(scanCtx)
	if err != nil {
		return false, err
	}
	for _, event := range events {
		e.Callback(event)
	}
	return len(events) > 0, nil
}