From cd34c7b2dd8f646e073700c3a8c0d21ff629897e Mon Sep 17 00:00:00 2001 From: M09Ic Date: Thu, 30 May 2024 23:33:57 +0800 Subject: [PATCH] fix checkpool bar thread safe bug --- internal/ihttp/client.go | 11 ++--- internal/option.go | 10 ++--- internal/pool/brutepool.go | 13 +++--- internal/pool/checkpool.go | 90 +++++++++++++++++++++----------------- internal/pool/config.go | 1 + internal/pool/pool.go | 3 +- spray.go | 1 + 7 files changed, 68 insertions(+), 61 deletions(-) diff --git a/internal/ihttp/client.go b/internal/ihttp/client.go index 0c00336..a2fa8f4 100644 --- a/internal/ihttp/client.go +++ b/internal/ihttp/client.go @@ -48,8 +48,8 @@ func NewClient(config *ClientConfig) *Client { MaxConnsPerHost: config.Thread * 3 / 2, MaxIdleConnDuration: config.Timeout, //MaxConnWaitTimeout: time.Duration(timeout) * time.Second, - //ReadTimeout: time.Duration(timeout) * time.Second, - //WriteTimeout: time.Duration(timeout) * time.Second, + //ReadTimeout: config.Timeout * time.Second, + //WriteTimeout: config.Timeout * time.Second, ReadBufferSize: 16384, // 16k MaxResponseBodySize: int(DefaultMaxBodySize), NoDefaultUserAgentHeader: true, @@ -68,9 +68,10 @@ func NewClient(config *ClientConfig) *Client { Renegotiation: tls.RenegotiateOnceAsClient, InsecureSkipVerify: true, }, - MaxConnsPerHost: config.Thread * 3 / 2, - IdleConnTimeout: config.Timeout, - ReadBufferSize: 16384, // 16k + TLSHandshakeTimeout: config.Timeout, + MaxConnsPerHost: config.Thread * 3 / 2, + IdleConnTimeout: config.Timeout, + ReadBufferSize: 16384, // 16k }, Timeout: config.Timeout, CheckRedirect: func(req *http.Request, via []*http.Request) error { diff --git a/internal/option.go b/internal/option.go index b374d3a..46eb2f8 100644 --- a/internal/option.go +++ b/internal/option.go @@ -142,7 +142,7 @@ type MiscOptions struct { Mod string `short:"m" long:"mod" default:"path" choice:"path" choice:"host" description:"String, path/host spray" config:"mod"` Client string `short:"C" long:"client" default:"auto" choice:"fast" choice:"standard" choice:"auto" description:"String, Client type" config:"client"` Deadline int `long:"deadline" default:"999999" description:"Int, deadline (seconds)" config:"deadline"` // todo 总的超时时间,适配云函数的deadline - Timeout int `long:"timeout" default:"5" description:"Int, timeout with request (seconds)" config:"timeout"` + Timeout int `short:"T" long:"timeout" default:"5" description:"Int, timeout with request (seconds)" config:"timeout"` PoolSize int `short:"P" long:"pool" default:"5" description:"Int, Pool size" config:"pool"` Threads int `short:"t" long:"thread" default:"20" description:"Int, number of threads per pool" config:"thread"` Debug bool `long:"debug" description:"Bool, output debug info" config:"debug"` @@ -169,9 +169,9 @@ func (opt *Option) PrepareRunner() (*Runner, error) { Offset: opt.Offset, Total: opt.Limit, taskCh: make(chan *Task), - outputCh: make(chan *pkg.Baseline, 100), + outputCh: make(chan *pkg.Baseline, 256), outwg: &sync.WaitGroup{}, - fuzzyCh: make(chan *pkg.Baseline, 100), + fuzzyCh: make(chan *pkg.Baseline, 256), Fuzzy: opt.Fuzzy, Force: opt.Force, CheckOnly: opt.CheckOnly, @@ -443,10 +443,6 @@ func (opt *Option) PrepareRunner() (*Runner, error) { // 根据不同的输入类型生成任务 if len(opt.URL) == 1 { - //u, err := fixUrl(opt.URL[0]) - //if err != nil { - // return nil, err - //} go func() { opt.GenerateTasks(tasks, opt.URL[0], ports) close(tasks) diff --git a/internal/pool/brutepool.go b/internal/pool/brutepool.go index e3777d6..a9e4562 100644 --- a/internal/pool/brutepool.go +++ b/internal/pool/brutepool.go @@ -50,6 +50,7 @@ func NewBrutePool(ctx context.Context, config *Config) (*BrutePool, error) { }), additionCh: make(chan *Unit, config.Thread), closeCh: make(chan struct{}), + processCh: make(chan *pkg.Baseline, config.Thread), wg: sync.WaitGroup{}, }, base: u.Scheme + "://" + u.Host, @@ -58,7 +59,6 @@ func NewBrutePool(ctx context.Context, config *Config) (*BrutePool, error) { scopeurls: make(map[string]struct{}), uniques: make(map[uint16]struct{}), - handlerCh: make(chan *pkg.Baseline, config.Thread), checkCh: make(chan struct{}, config.Thread), initwg: sync.WaitGroup{}, limiter: rate.NewLimiter(rate.Limit(config.RateLimit), 1), @@ -91,8 +91,7 @@ type BrutePool struct { reqPool *ants.PoolWithFunc scopePool *ants.PoolWithFunc - handlerCh chan *pkg.Baseline // 待处理的baseline - checkCh chan struct{} // 独立的check管道, 防止与redirect/crawl冲突 + checkCh chan struct{} // 独立的check管道, 防止与redirect/crawl冲突 closed bool wordOffset int failedCount int32 @@ -406,7 +405,7 @@ func (pool *BrutePool) Invoke(v interface{}) { case parsers.WordSource: // 异步进行性能消耗较大的深度对比 - pool.handlerCh <- bl + pool.processCh <- bl if int(pool.Statistor.ReqTotal)%pool.CheckPeriod == 0 { pool.doCheck() } else if pool.failedCount%pool.ErrPeriod == 0 { @@ -416,9 +415,9 @@ func (pool *BrutePool) Invoke(v interface{}) { pool.Bar.Done() case parsers.RedirectSource: bl.FrontURL = unit.frontUrl - pool.handlerCh <- bl + pool.processCh <- bl default: - pool.handlerCh <- bl + pool.processCh <- bl } } @@ -454,7 +453,7 @@ func (pool *BrutePool) NoScopeInvoke(v interface{}) { } func (pool *BrutePool) Handler() { - for bl := range pool.handlerCh { + for bl := range pool.processCh { if bl.IsValid { pool.addFuzzyBaseline(bl) } diff --git a/internal/pool/checkpool.go b/internal/pool/checkpool.go index 5cb0998..45aa58a 100644 --- a/internal/pool/checkpool.go +++ b/internal/pool/checkpool.go @@ -2,13 +2,11 @@ package pool import ( "context" - "errors" "github.com/chainreactors/logs" "github.com/chainreactors/parsers" "github.com/chainreactors/spray/internal/ihttp" "github.com/chainreactors/spray/pkg" "github.com/panjf2000/ants/v2" - "github.com/valyala/fasthttp" "net/url" "strings" "sync" @@ -18,6 +16,7 @@ import ( // 类似httpx的无状态, 无scope, 无并发池的检测模式 func NewCheckPool(ctx context.Context, config *Config) (*CheckPool, error) { pctx, cancel := context.WithCancel(ctx) + config.ClientType = ihttp.STANDARD pool := &CheckPool{ &BasePool{ Config: config, @@ -31,14 +30,16 @@ func NewCheckPool(ctx context.Context, config *Config) (*CheckPool, error) { ProxyAddr: config.ProxyAddr, }), wg: sync.WaitGroup{}, - additionCh: make(chan *Unit, 100), + additionCh: make(chan *Unit, 1024), closeCh: make(chan struct{}), + processCh: make(chan *pkg.Baseline, config.Thread), }, } pool.Headers = map[string]string{"Connection": "close"} p, _ := ants.NewPoolWithFunc(config.Thread, pool.Invoke) pool.BasePool.Pool = p + go pool.Handler() return pool, nil } @@ -100,21 +101,32 @@ Loop: } func (pool *CheckPool) Invoke(v interface{}) { + defer func() { + pool.reqCount++ + pool.wg.Done() + }() + unit := v.(*Unit) req, err := pool.genReq(unit.path) if err != nil { - logs.Log.Error(err.Error()) + logs.Log.Debug(err.Error()) + bl := &pkg.Baseline{ + SprayResult: &parsers.SprayResult{ + UrlString: unit.path, + IsValid: false, + ErrString: err.Error(), + Reason: pkg.ErrUrlError.Error(), + ReqDepth: unit.depth, + }, + } + pool.processCh <- bl + return } req.SetHeaders(pool.Headers) start := time.Now() var bl *pkg.Baseline resp, reqerr := pool.client.Do(pool.ctx, req) - if pool.ClientType == ihttp.FAST { - defer fasthttp.ReleaseResponse(resp.FastResponse) - defer fasthttp.ReleaseRequest(req.FastRequest) - } - - if reqerr != nil && !errors.Is(reqerr, fasthttp.ErrBodyTooLarge) { + if reqerr != nil { pool.failedCount++ bl = &pkg.Baseline{ SprayResult: &parsers.SprayResult{ @@ -125,13 +137,8 @@ func (pool *CheckPool) Invoke(v interface{}) { ReqDepth: unit.depth, }, } - - if strings.Contains(reqerr.Error(), "timed out") || strings.Contains(reqerr.Error(), "actively refused") { - - } else { - pool.doUpgrade(bl) - } - + logs.Log.Debugf("%s, %s", unit.path, reqerr.Error()) + pool.doUpgrade(bl) } else { bl = pkg.NewBaseline(req.URI(), req.Host(), resp) bl.Collect() @@ -139,32 +146,33 @@ func (pool *CheckPool) Invoke(v interface{}) { bl.ReqDepth = unit.depth bl.Source = unit.source bl.Spended = time.Since(start).Milliseconds() - - // 手动处理重定向 - if bl.IsValid { - if bl.RedirectURL != "" { - pool.doRedirect(bl, unit.depth) - pool.putToFuzzy(bl) - } else if bl.Status == 400 { - pool.doUpgrade(bl) - pool.putToFuzzy(bl) - } else { - params := map[string]interface{}{ - "current": bl, - } - if pool.MatchExpr == nil || pkg.CompareWithExpr(pool.MatchExpr, params) { - pool.putToOutput(bl) - } - } - } - - if bl.Source == parsers.CheckSource { - pool.Bar.Done() - } - pool.reqCount++ - pool.wg.Done() + pool.processCh <- bl } +func (pool *CheckPool) Handler() { + for bl := range pool.processCh { + if bl.IsValid { + if bl.RedirectURL != "" { + pool.doRedirect(bl, bl.ReqDepth) + pool.putToFuzzy(bl) + } else if bl.Status == 400 { + pool.doUpgrade(bl) + pool.putToFuzzy(bl) + } else { + params := map[string]interface{}{ + "current": bl, + } + if pool.MatchExpr != nil && pkg.CompareWithExpr(pool.MatchExpr, params) { + bl.IsValid = true + } + } + } + if bl.Source == parsers.CheckSource { + pool.Bar.Done() + } + pool.putToOutput(bl) + } +} func (pool *CheckPool) doRedirect(bl *pkg.Baseline, depth int) { if depth >= MaxRedirect { return diff --git a/internal/pool/config.go b/internal/pool/config.go index bbf2460..ee79bb7 100644 --- a/internal/pool/config.go +++ b/internal/pool/config.go @@ -27,6 +27,7 @@ type Config struct { Thread int Wordlist []string Timeout int + ProcessCh chan *pkg.Baseline OutputCh chan *pkg.Baseline FuzzyCh chan *pkg.Baseline OutLocker *sync.WaitGroup diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 6cfefe8..ebbf385 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -20,9 +20,10 @@ type BasePool struct { Pool *ants.PoolWithFunc Bar *pkg.Bar Worder *words.Worder + Cancel context.CancelFunc client *ihttp.Client ctx context.Context - Cancel context.CancelFunc + processCh chan *pkg.Baseline // 待处理的baseline dir string reqCount int failedCount int diff --git a/spray.go b/spray.go index 71b68c7..c57a737 100644 --- a/spray.go +++ b/spray.go @@ -5,6 +5,7 @@ import ( "github.com/chainreactors/spray/cmd" "github.com/gookit/config/v2" "github.com/gookit/config/v2/yaml" + //_ "net/http/pprof" ) func init() {