Use global pipes for capturing, experimental unified rendering

This commit is contained in:
Manuel Bua 2020-08-01 21:44:14 +02:00
parent 29576f9ced
commit 63cfd354b9
7 changed files with 123 additions and 177 deletions

View File

@ -13,38 +13,36 @@ import (
"time"
)
var (
RefreshHz = 4.
RefreshMillis = int64((1. / RefreshHz) * 1000.)
)
// global output refresh rate
const RefreshHz = 8
// Encapsulates progress tracking.
type IProgress interface {
InitProgressbar(hostCount int64, templateCount int, requestCount int64)
AddToTotal(delta int64)
Update()
render()
Drop(count int64)
Wait()
StartStdCapture()
StopStdCapture()
}
type Progress struct {
progress *mpb.Progress
bar *mpb.Bar
total int64
initialTotal int64
totalMutex *sync.Mutex
captureData *captureData
stdCaptureMutex *sync.Mutex
stdout *strings.Builder
stderr *strings.Builder
colorizer aurora.Aurora
renderChan chan time.Time
renderMutex *sync.Mutex
renderTime time.Time
firstTimeOutput bool
progress *mpb.Progress
bar *mpb.Bar
total int64
initialTotal int64
totalMutex *sync.Mutex
colorizer aurora.Aurora
// stdio capture and rendering
renderChan chan time.Time
captureData *captureData
stdCaptureMutex *sync.Mutex
stdOut *strings.Builder
stdErr *strings.Builder
stdStopRenderEvent chan bool
stdRenderEvent *time.Ticker
stdRenderWaitGroup *sync.WaitGroup
}
// Creates and returns a new progress tracking object.
@ -53,6 +51,8 @@ func NewProgress(noColor bool, active bool) IProgress {
return &NoOpProgress{}
}
refreshMillis := int64(1. / float64(RefreshHz) * 1000.)
renderChan := make(chan time.Time)
p := &Progress{
progress: mpb.New(
@ -60,15 +60,17 @@ func NewProgress(noColor bool, active bool) IProgress {
mpb.PopCompletedMode(),
mpb.WithManualRefresh(renderChan),
),
totalMutex: &sync.Mutex{},
stdCaptureMutex: &sync.Mutex{},
stdout: &strings.Builder{},
stderr: &strings.Builder{},
colorizer: aurora.NewAurora(!noColor),
renderChan: renderChan,
renderMutex: &sync.Mutex{},
renderTime: time.Now(),
firstTimeOutput: true,
totalMutex: &sync.Mutex{},
colorizer: aurora.NewAurora(!noColor),
// stdio capture and rendering
renderChan: renderChan,
stdCaptureMutex: &sync.Mutex{},
stdOut: &strings.Builder{},
stdErr: &strings.Builder{},
stdStopRenderEvent: make(chan bool),
stdRenderEvent: time.NewTicker(time.Millisecond * time.Duration(refreshMillis)),
stdRenderWaitGroup: &sync.WaitGroup{},
}
return p
}
@ -90,6 +92,12 @@ func (p *Progress) InitProgressbar(hostCount int64, templateCount int, requestCo
pluralize(hostCount, "host", "hosts"))
p.bar = p.setupProgressbar("["+barName+"]", requestCount, 0)
// creates r/w pipes and divert stdout+stderr writers to them and start capturing their output
p.captureData = startCapture(p.stdCaptureMutex, p.stdOut, p.stdErr)
// starts rendering the captured stdout+stderr data
p.renderStdData()
}
// Update total progress request count
@ -103,7 +111,6 @@ func (p *Progress) AddToTotal(delta int64) {
// Update progress tracking information and increments the request counter by one unit.
func (p *Progress) Update() {
p.bar.Increment()
p.render()
}
// Drops the specified number of requests from the progress bar total.
@ -111,7 +118,6 @@ func (p *Progress) Update() {
func (p *Progress) Drop(count int64) {
// mimic dropping by incrementing the completed requests
p.bar.IncrInt64(count)
p.render()
}
// Ensures that a progress bar's total count is up-to-date if during an enumeration there were uncompleted requests and
@ -126,56 +132,60 @@ func (p *Progress) Wait() {
p.totalMutex.Unlock()
p.progress.Wait()
// close the writers and wait for the EOF condition
stopCapture(p.captureData)
// stop the renderer and wait for it
p.stdStopRenderEvent <- true
p.stdRenderWaitGroup.Wait()
// drain any stdout/stderr data
p.drainStringBuilderTo(p.stdout, os.Stdout)
p.drainStringBuilderTo(p.stderr, os.Stderr)
p.drainStringBuilderTo(p.stdOut, os.Stdout)
p.drainStringBuilderTo(p.stdErr, os.Stderr)
}
// Starts capturing stdout and stderr instead of producing visual output that may interfere with the progress bars.
func (p *Progress) StartStdCapture() {
p.stdCaptureMutex.Lock()
p.captureData = startStdCapture()
}
func (p *Progress) renderStdData() {
// trigger a render event
p.renderChan <- time.Now()
gologger.Infof("Waiting for your terminal to settle..")
time.Sleep(time.Millisecond * 250)
// Stops capturing stdout and stderr and store both output to be shown later.
func (p *Progress) StopStdCapture() {
stopStdCapture(p.captureData)
p.stdout.Write(p.captureData.DataStdOut.Bytes())
p.stderr.Write(p.captureData.DataStdErr.Bytes())
count := 0
p.stdRenderWaitGroup.Add(1)
go func(waitGroup *sync.WaitGroup) {
for {
select {
case <-p.stdStopRenderEvent:
waitGroup.Done()
return
case _ = <-p.stdRenderEvent.C:
p.stdCaptureMutex.Lock()
{
hasStdout := p.stdOut.Len() > 0
hasStderr := p.stdErr.Len() > 0
hasOutput := hasStdout || hasStderr
p.renderMutex.Lock()
{
hasStdout := p.stdout.Len() > 0
hasStderr := p.stderr.Len() > 0
hasOutput := hasStdout || hasStderr
if hasOutput {
count++
stdout := p.captureData.backupStdout
stderr := p.captureData.backupStderr
if hasOutput {
if p.firstTimeOutput {
// trigger a render event
p.renderChan <- time.Now()
gologger.Infof("Waiting for your terminal to settle..")
// no way to sync to it? :(
time.Sleep(time.Millisecond * 250)
p.firstTimeOutput = false
}
// go back one line and clean it all
fmt.Fprint(stderr, "\u001b[1A\u001b[2K")
p.drainStringBuilderTo(p.stdOut, stdout)
p.drainStringBuilderTo(p.stdErr, stderr)
if can, now := p.canRender(); can {
// go back one line and clean it all
fmt.Fprint(os.Stderr, "\u001b[1A\u001b[2K")
p.drainStringBuilderTo(p.stdout, os.Stdout)
p.drainStringBuilderTo(p.stderr, os.Stderr)
// make space for the progressbar to render itself
fmt.Fprintln(stderr, "")
}
// make space for the progressbar to render itself
fmt.Fprintln(os.Stderr, "")
// always trigger a render event to try ensure it's visible even with fast output
p.renderChan <- now
p.renderTime = now
// always trigger a render event to try ensure it's visible even with fast output
p.renderChan <- time.Now()
}
p.stdCaptureMutex.Unlock()
}
}
}
p.renderMutex.Unlock()
p.stdCaptureMutex.Unlock()
}(p.stdRenderWaitGroup)
}
// Creates and returns a progress bar.
@ -203,23 +213,6 @@ func (p *Progress) setupProgressbar(name string, total int64, priority int) *mpb
)
}
func (p *Progress) render() {
p.renderMutex.Lock()
if can, now := p.canRender(); can {
p.renderChan <- now
p.renderTime = now
}
p.renderMutex.Unlock()
}
func (p *Progress) canRender() (bool, time.Time) {
now := time.Now()
if now.Sub(p.renderTime).Milliseconds() >= RefreshMillis {
return true, now
}
return false, now
}
func pluralize(count int64, singular, plural string) string {
if count > 1 {
return plural

View File

@ -5,8 +5,5 @@ type NoOpProgress struct{}
func (p *NoOpProgress) InitProgressbar(hostCount int64, templateCount int, requestCount int64) {}
func (p *NoOpProgress) AddToTotal(delta int64) {}
func (p *NoOpProgress) Update() {}
func (p *NoOpProgress) render() {}
func (p *NoOpProgress) Drop(count int64) {}
func (p *NoOpProgress) Wait() {}
func (p *NoOpProgress) StartStdCapture() {}
func (p *NoOpProgress) StopStdCapture() {}

View File

@ -2,28 +2,25 @@ package progress
/**
Inspired by the https://github.com/PumpkinSeed/cage module
*/
*/
import (
"bytes"
"bufio"
"github.com/projectdiscovery/gologger"
"io"
"os"
"strings"
"sync"
)
type captureData struct {
backupStdout *os.File
writerStdout *os.File
backupStderr *os.File
writerStderr *os.File
DataStdOut *bytes.Buffer
DataStdErr *bytes.Buffer
outStdout chan []byte
outStderr chan []byte
backupStdout *os.File
writerStdout *os.File
backupStderr *os.File
writerStderr *os.File
waitFinishRead *sync.WaitGroup
}
func startStdCapture() *captureData {
func startCapture(writeMutex *sync.Mutex, stdout *strings.Builder, stderr *strings.Builder) *captureData {
rStdout, wStdout, errStdout := os.Pipe()
if errStdout != nil {
panic(errStdout)
@ -41,54 +38,51 @@ func startStdCapture() *captureData {
backupStderr: os.Stderr,
writerStderr: wStderr,
outStdout: make(chan []byte),
outStderr: make(chan []byte),
DataStdOut: &bytes.Buffer{},
DataStdErr: &bytes.Buffer{},
waitFinishRead: &sync.WaitGroup{},
}
os.Stdout = c.writerStdout
os.Stderr = c.writerStderr
stdCopy := func(out chan<- []byte, reader *os.File) {
var buffer bytes.Buffer
_, _ = io.Copy(&buffer, reader)
if buffer.Len() > 0 {
out <- buffer.Bytes()
stdCopy := func(builder *strings.Builder, reader *os.File, waitGroup *sync.WaitGroup) {
r := bufio.NewReader(reader)
buf := make([]byte, 0, 4*1024)
for {
n, err := r.Read(buf[:cap(buf)])
buf = buf[:n]
if n == 0 {
if err == nil {
continue
}
if err == io.EOF {
waitGroup.Done()
break
}
waitGroup.Done()
gologger.Fatalf("stdcapture error: %s", err)
}
if err != nil && err != io.EOF {
waitGroup.Done()
gologger.Fatalf("stdcapture error: %s", err)
}
writeMutex.Lock()
builder.Write(buf)
writeMutex.Unlock()
}
close(out)
}
go stdCopy(c.outStdout, rStdout)
go stdCopy(c.outStderr, rStderr)
c.waitFinishRead.Add(2)
go stdCopy(stdout, rStdout, c.waitFinishRead)
go stdCopy(stderr, rStderr, c.waitFinishRead)
return c
}
func stopStdCapture(c *captureData) {
func stopCapture(c *captureData) {
_ = c.writerStdout.Close()
_ = c.writerStderr.Close()
var wg sync.WaitGroup
stdRead := func(in <-chan []byte, outData *bytes.Buffer) {
defer wg.Done()
for {
out, more := <-in
if more {
outData.Write(out)
} else {
return
}
}
}
wg.Add(2)
go stdRead(c.outStdout, c.DataStdOut)
go stdRead(c.outStderr, c.DataStdErr)
wg.Wait()
c.waitFinishRead.Wait()
os.Stdout = c.backupStdout
os.Stderr = c.backupStderr

View File

@ -336,7 +336,7 @@ func (r *Runner) RunEnumeration() {
gologger.Errorf("Could not find any valid input URLs.")
} else if totalRequests > 0 || hasWorkflows {
// track global progress
// tracks global progress and captures stdout/stderr until p.Wait finishes
p.InitProgressbar(r.inputCount, templateCount, totalRequests)
for _, match := range allTemplates {
@ -384,9 +384,7 @@ func (r *Runner) processTemplateWithList(p progress.IProgress, template *templat
if template.Info.Severity != "" {
message += " [" + template.Info.Severity + "]"
}
p.StartStdCapture()
gologger.Infof("%s\n", message)
p.StopStdCapture()
var writer *bufio.Writer
if r.output != nil {
@ -432,9 +430,7 @@ func (r *Runner) processTemplateWithList(p progress.IProgress, template *templat
}
if err != nil {
p.Drop(request.(*requests.BulkHTTPRequest).GetRequestCount())
p.StartStdCapture()
gologger.Warningf("Could not create http client: %s\n", err)
p.StopStdCapture()
return false
}
@ -462,9 +458,7 @@ func (r *Runner) processTemplateWithList(p progress.IProgress, template *templat
globalresult.Or(result.GotResults)
}
if result.Error != nil {
p.StartStdCapture()
gologger.Warningf("Could not execute step: %s\n", result.Error)
p.StopStdCapture()
}
<-r.limiter
}(text)
@ -489,9 +483,7 @@ func (r *Runner) ProcessWorkflowWithList(p progress.IProgress, workflow *workflo
defer wg.Done()
if err := r.ProcessWorkflow(p, workflow, text); err != nil {
p.StartStdCapture()
gologger.Warningf("Could not run workflow for %s: %s\n", text, err)
p.StopStdCapture()
}
<-r.limiter
}(text)
@ -522,9 +514,7 @@ func (r *Runner) ProcessWorkflow(p progress.IProgress, workflow *workflows.Workf
// Check if the template is an absolute path or relative path.
// If the path is absolute, use it. Otherwise,
if r.isRelative(value) {
p.StartStdCapture()
newPath, err := r.resolvePath(value)
p.StopStdCapture()
if err != nil {
newPath, err = r.resolvePathWithBaseFolder(filepath.Dir(workflow.GetPath()), value)
if err != nil {
@ -629,9 +619,7 @@ func (r *Runner) ProcessWorkflow(p progress.IProgress, workflow *workflows.Workf
_, err := script.RunContext(context.Background())
if err != nil {
p.StartStdCapture()
gologger.Errorf("Could not execute workflow '%s': %s\n", workflow.ID, err)
p.StopStdCapture()
return err
}
return nil

View File

@ -94,10 +94,8 @@ func (e *DNSExecuter) ExecuteDNS(p progress.IProgress, URL string) (result Resul
}
if e.debug {
p.StartStdCapture()
gologger.Infof("Dumped DNS request for %s (%s)\n\n", URL, e.template.ID)
fmt.Fprintf(os.Stderr, "%s\n", compiledRequest.String())
p.StopStdCapture()
}
// Send the request to the target servers
@ -110,15 +108,11 @@ func (e *DNSExecuter) ExecuteDNS(p progress.IProgress, URL string) (result Resul
p.Update()
p.StartStdCapture()
gologger.Verbosef("Sent DNS request to %s\n", "dns-request", URL)
p.StopStdCapture()
if e.debug {
p.StartStdCapture()
gologger.Infof("Dumped DNS response for %s (%s)\n\n", URL, e.template.ID)
fmt.Fprintf(os.Stderr, "%s\n", resp.String())
p.StopStdCapture()
}
matcherCondition := e.dnsRequest.GetMatchersCondition()
@ -133,9 +127,7 @@ func (e *DNSExecuter) ExecuteDNS(p progress.IProgress, URL string) (result Resul
// If the matcher has matched, and its an OR
// write the first output then move to next matcher.
if matcherCondition == matchers.ORCondition && len(e.dnsRequest.Extractors) == 0 {
p.StartStdCapture()
e.writeOutputDNS(domain, matcher, nil)
p.StopStdCapture()
result.GotResults = true
}
}
@ -155,9 +147,7 @@ func (e *DNSExecuter) ExecuteDNS(p progress.IProgress, URL string) (result Resul
// Write a final string of output if matcher type is
// AND or if we have extractors for the mechanism too.
if len(e.dnsRequest.Extractors) > 0 || matcherCondition == matchers.ANDCondition {
p.StartStdCapture()
e.writeOutputDNS(domain, nil, extractorResults)
p.StopStdCapture()
}
return

View File

@ -146,9 +146,7 @@ func (e *HTTPExecuter) ExecuteHTTP(p progress.IProgress, URL string) (result Res
remaining--
}
p.StartStdCapture()
gologger.Verbosef("Sent HTTP request to %s\n", "http-request", URL)
p.StopStdCapture()
return
}
@ -162,10 +160,8 @@ func (e *HTTPExecuter) handleHTTP(p progress.IProgress, URL string, request *req
if err != nil {
return errors.Wrap(err, "could not make http request")
}
p.StartStdCapture()
gologger.Infof("Dumped HTTP request for %s (%s)\n\n", URL, e.template.ID)
fmt.Fprintf(os.Stderr, "%s", string(dumpedRequest))
p.StopStdCapture()
}
resp, err := e.httpClient.Do(req)
if err != nil {
@ -180,10 +176,8 @@ func (e *HTTPExecuter) handleHTTP(p progress.IProgress, URL string, request *req
if err != nil {
return errors.Wrap(err, "could not dump http response")
}
p.StartStdCapture()
gologger.Infof("Dumped HTTP response for %s (%s)\n\n", URL, e.template.ID)
fmt.Fprintf(os.Stderr, "%s\n", string(dumpedResponse))
p.StopStdCapture()
}
data, err := ioutil.ReadAll(resp.Body)
@ -220,9 +214,7 @@ func (e *HTTPExecuter) handleHTTP(p progress.IProgress, URL string, request *req
result.Matches[matcher.Name] = nil
// probably redundant but ensures we snapshot current payload values when matchers are valid
result.Meta = request.Meta
p.StartStdCapture()
e.writeOutputHTTP(request, resp, body, matcher, nil)
p.StopStdCapture()
result.GotResults = true
}
}
@ -249,9 +241,7 @@ func (e *HTTPExecuter) handleHTTP(p progress.IProgress, URL string, request *req
// Write a final string of output if matcher type is
// AND or if we have extractors for the mechanism too.
if len(outputExtractorResults) > 0 || matcherCondition == matchers.ANDCondition {
p.StartStdCapture()
e.writeOutputHTTP(request, resp, body, nil, outputExtractorResults)
p.StopStdCapture()
result.GotResults = true
}

View File

@ -67,16 +67,12 @@ func (n *NucleiVar) Call(args ...tengo.Object) (ret tengo.Object, err error) {
httpExecuter, err := executer.NewHTTPExecuter(template.HTTPOptions)
if err != nil {
p.Drop(request.GetRequestCount())
p.StartStdCapture()
gologger.Warningf("Could not compile request for template '%s': %s\n", template.HTTPOptions.Template.ID, err)
p.StopStdCapture()
continue
}
result := httpExecuter.ExecuteHTTP(p, n.URL)
if result.Error != nil {
p.StartStdCapture()
gologger.Warningf("Could not send request for template '%s': %s\n", template.HTTPOptions.Template.ID, result.Error)
p.StopStdCapture()
continue
}
@ -94,9 +90,7 @@ func (n *NucleiVar) Call(args ...tengo.Object) (ret tengo.Object, err error) {
dnsExecuter := executer.NewDNSExecuter(template.DNSOptions)
result := dnsExecuter.ExecuteDNS(p, n.URL)
if result.Error != nil {
p.StartStdCapture()
gologger.Warningf("Could not compile request for template '%s': %s\n", template.HTTPOptions.Template.ID, result.Error)
p.StopStdCapture()
continue
}