fix checkpool bar thread-safety bug

M09Ic 2024-05-30 23:33:57 +08:00
parent 4c8c00416b
commit cd34c7b2dd
7 changed files with 68 additions and 61 deletions

View File

@@ -48,8 +48,8 @@ func NewClient(config *ClientConfig) *Client {
MaxConnsPerHost: config.Thread * 3 / 2,
MaxIdleConnDuration: config.Timeout,
//MaxConnWaitTimeout: time.Duration(timeout) * time.Second,
//ReadTimeout: time.Duration(timeout) * time.Second,
//WriteTimeout: time.Duration(timeout) * time.Second,
//ReadTimeout: config.Timeout * time.Second,
//WriteTimeout: config.Timeout * time.Second,
ReadBufferSize: 16384, // 16k
MaxResponseBodySize: int(DefaultMaxBodySize),
NoDefaultUserAgentHeader: true,
@@ -68,6 +68,7 @@ func NewClient(config *ClientConfig) *Client {
Renegotiation: tls.RenegotiateOnceAsClient,
InsecureSkipVerify: true,
},
TLSHandshakeTimeout: config.Timeout,
MaxConnsPerHost: config.Thread * 3 / 2,
IdleConnTimeout: config.Timeout,
ReadBufferSize: 16384, // 16k

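The hunks above drop the hard-coded timeout comments and tie both transports to config.Timeout, and the standard transport gains a TLSHandshakeTimeout. A minimal sketch of that shared-timeout setup, assuming config.Timeout is a time.Duration (as the direct assignments to MaxIdleConnDuration and TLSHandshakeTimeout suggest) and using only fields visible in this diff; this is not the project's actual constructor:

package ihttp

import (
	"crypto/tls"
	"net/http"
	"time"

	"github.com/valyala/fasthttp"
)

type ClientConfig struct {
	Thread  int
	Timeout time.Duration // assumed to be a duration, per the assignments above
}

// newTransports builds a fasthttp client and a net/http transport that share
// one timeout value, mirroring the field set shown in the diff.
func newTransports(config ClientConfig) (*fasthttp.Client, *http.Transport) {
	fast := &fasthttp.Client{
		MaxConnsPerHost:          config.Thread * 3 / 2,
		MaxIdleConnDuration:      config.Timeout,
		ReadBufferSize:           16384, // 16k, as in the diff
		NoDefaultUserAgentHeader: true,
	}
	std := &http.Transport{
		TLSClientConfig: &tls.Config{
			Renegotiation:      tls.RenegotiateOnceAsClient,
			InsecureSkipVerify: true,
		},
		TLSHandshakeTimeout: config.Timeout,
		MaxConnsPerHost:     config.Thread * 3 / 2,
		IdleConnTimeout:     config.Timeout,
		ReadBufferSize:      16384, // 16k
	}
	return fast, std
}

Keeping both clients on the same duration means the single timeout option presumably governs idle, handshake, and per-request limits consistently, whichever client type is selected.
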
View File

@@ -142,7 +142,7 @@ type MiscOptions struct {
Mod string `short:"m" long:"mod" default:"path" choice:"path" choice:"host" description:"String, path/host spray" config:"mod"`
Client string `short:"C" long:"client" default:"auto" choice:"fast" choice:"standard" choice:"auto" description:"String, Client type" config:"client"`
Deadline int `long:"deadline" default:"999999" description:"Int, deadline (seconds)" config:"deadline"` // todo: overall timeout, to fit the deadline of serverless (cloud) functions
Timeout int `long:"timeout" default:"5" description:"Int, timeout with request (seconds)" config:"timeout"`
Timeout int `short:"T" long:"timeout" default:"5" description:"Int, timeout with request (seconds)" config:"timeout"`
PoolSize int `short:"P" long:"pool" default:"5" description:"Int, Pool size" config:"pool"`
Threads int `short:"t" long:"thread" default:"20" description:"Int, number of threads per pool" config:"thread"`
Debug bool `long:"debug" description:"Bool, output debug info" config:"debug"`
@@ -169,9 +169,9 @@ func (opt *Option) PrepareRunner() (*Runner, error) {
Offset: opt.Offset,
Total: opt.Limit,
taskCh: make(chan *Task),
outputCh: make(chan *pkg.Baseline, 100),
outputCh: make(chan *pkg.Baseline, 256),
outwg: &sync.WaitGroup{},
fuzzyCh: make(chan *pkg.Baseline, 100),
fuzzyCh: make(chan *pkg.Baseline, 256),
Fuzzy: opt.Fuzzy,
Force: opt.Force,
CheckOnly: opt.CheckOnly,
@@ -443,10 +443,6 @@ func (opt *Option) PrepareRunner() (*Runner, error) {
// generate tasks according to the type of input
if len(opt.URL) == 1 {
//u, err := fixUrl(opt.URL[0])
//if err != nil {
// return nil, err
//}
go func() {
opt.GenerateTasks(tasks, opt.URL[0], ports)
close(tasks)

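Besides the new -T alias for --timeout, the runner hunk grows the output buffers from 100 to 256. Those channels are paired with outwg so the writer can be drained before exit; a minimal sketch of that buffered-output pattern, with a hypothetical result type and main function (only the channel/WaitGroup shapes and the 256 buffer come from the diff):

package main

import (
	"fmt"
	"sync"
)

type result struct{ URL string }

func main() {
	outputCh := make(chan *result, 256) // buffered: producers rarely block on the writer
	outwg := &sync.WaitGroup{}

	done := make(chan struct{})
	go func() { // the single writer goroutine
		for r := range outputCh {
			fmt.Println(r.URL)
			outwg.Done() // one Done per result actually written
		}
		close(done)
	}()

	for i := 0; i < 10; i++ { // producers register each result before sending it
		outwg.Add(1)
		outputCh <- &result{URL: fmt.Sprintf("http://example.com/%d", i)}
	}

	outwg.Wait()    // every queued result has been written
	close(outputCh) // stop the writer
	<-done
}

The larger buffer only reduces back-pressure on workers during bursts; ordering and clean shutdown still come from the WaitGroup and the single writer.
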
View File

@@ -50,6 +50,7 @@ func NewBrutePool(ctx context.Context, config *Config) (*BrutePool, error) {
}),
additionCh: make(chan *Unit, config.Thread),
closeCh: make(chan struct{}),
processCh: make(chan *pkg.Baseline, config.Thread),
wg: sync.WaitGroup{},
},
base: u.Scheme + "://" + u.Host,
@@ -58,7 +59,6 @@ func NewBrutePool(ctx context.Context, config *Config) (*BrutePool, error) {
scopeurls: make(map[string]struct{}),
uniques: make(map[uint16]struct{}),
handlerCh: make(chan *pkg.Baseline, config.Thread),
checkCh: make(chan struct{}, config.Thread),
initwg: sync.WaitGroup{},
limiter: rate.NewLimiter(rate.Limit(config.RateLimit), 1),
@@ -91,7 +91,6 @@ type BrutePool struct {
reqPool *ants.PoolWithFunc
scopePool *ants.PoolWithFunc
handlerCh chan *pkg.Baseline // baselines waiting to be processed
checkCh chan struct{} // dedicated check channel, kept separate to avoid conflicts with redirect/crawl
closed bool
wordOffset int
@@ -406,7 +405,7 @@ func (pool *BrutePool) Invoke(v interface{}) {
case parsers.WordSource:
// run the performance-heavy deep comparison asynchronously
pool.handlerCh <- bl
pool.processCh <- bl
if int(pool.Statistor.ReqTotal)%pool.CheckPeriod == 0 {
pool.doCheck()
} else if pool.failedCount%pool.ErrPeriod == 0 {
@@ -416,9 +415,9 @@ func (pool *BrutePool) Invoke(v interface{}) {
pool.Bar.Done()
case parsers.RedirectSource:
bl.FrontURL = unit.frontUrl
pool.handlerCh <- bl
pool.processCh <- bl
default:
pool.handlerCh <- bl
pool.processCh <- bl
}
}
@@ -454,7 +453,7 @@ func (pool *BrutePool) NoScopeInvoke(v interface{}) {
}
func (pool *BrutePool) Handler() {
for bl := range pool.handlerCh {
for bl := range pool.processCh {
if bl.IsValid {
pool.addFuzzyBaseline(bl)
}

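In this file the pool-local handlerCh and the separate checkCh go away: every source now funnels into the shared processCh that BasePool owns (see the pool.go hunk further down), and Handler ranges over that channel. Both pools drive their workers through an ants function pool, with callers doing wg.Add(1) and reqPool.Invoke(unit); a minimal, self-contained sketch of that submission pattern with a simplified unit type (the real Unit also carries source, depth, and so on, and the real Invoke builds requests and pushes baselines to processCh):

package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

type unit struct {
	path string // the real Unit also carries source, depth, etc.
}

func main() {
	wg := sync.WaitGroup{}
	reqPool, _ := ants.NewPoolWithFunc(20, func(v interface{}) {
		defer wg.Done()
		u := v.(*unit) // mirrors Invoke(v interface{}) plus the *Unit assertion above
		fmt.Println("probing", u.path)
	})
	defer reqPool.Release()

	for _, p := range []string{"/admin", "/login", "/api"} {
		wg.Add(1)
		_ = reqPool.Invoke(&unit{path: p})
	}
	wg.Wait() // all submitted units finished before channels are closed
}
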
View File

@@ -2,13 +2,11 @@ package pool
import (
"context"
"errors"
"github.com/chainreactors/logs"
"github.com/chainreactors/parsers"
"github.com/chainreactors/spray/internal/ihttp"
"github.com/chainreactors/spray/pkg"
"github.com/panjf2000/ants/v2"
"github.com/valyala/fasthttp"
"net/url"
"strings"
"sync"
@@ -18,6 +16,7 @@ import (
// httpx-like check mode: stateless, no scope, no per-target concurrency pool
func NewCheckPool(ctx context.Context, config *Config) (*CheckPool, error) {
pctx, cancel := context.WithCancel(ctx)
config.ClientType = ihttp.STANDARD
pool := &CheckPool{
&BasePool{
Config: config,
@@ -31,14 +30,16 @@ func NewCheckPool(ctx context.Context, config *Config) (*CheckPool, error) {
ProxyAddr: config.ProxyAddr,
}),
wg: sync.WaitGroup{},
additionCh: make(chan *Unit, 100),
additionCh: make(chan *Unit, 1024),
closeCh: make(chan struct{}),
processCh: make(chan *pkg.Baseline, config.Thread),
},
}
pool.Headers = map[string]string{"Connection": "close"}
p, _ := ants.NewPoolWithFunc(config.Thread, pool.Invoke)
pool.BasePool.Pool = p
go pool.Handler()
return pool, nil
}
@@ -100,21 +101,32 @@ Loop:
}
func (pool *CheckPool) Invoke(v interface{}) {
defer func() {
pool.reqCount++
pool.wg.Done()
}()
unit := v.(*Unit)
req, err := pool.genReq(unit.path)
if err != nil {
logs.Log.Error(err.Error())
logs.Log.Debug(err.Error())
bl := &pkg.Baseline{
SprayResult: &parsers.SprayResult{
UrlString: unit.path,
IsValid: false,
ErrString: err.Error(),
Reason: pkg.ErrUrlError.Error(),
ReqDepth: unit.depth,
},
}
pool.processCh <- bl
return
}
req.SetHeaders(pool.Headers)
start := time.Now()
var bl *pkg.Baseline
resp, reqerr := pool.client.Do(pool.ctx, req)
if pool.ClientType == ihttp.FAST {
defer fasthttp.ReleaseResponse(resp.FastResponse)
defer fasthttp.ReleaseRequest(req.FastRequest)
}
if reqerr != nil && !errors.Is(reqerr, fasthttp.ErrBodyTooLarge) {
if reqerr != nil {
pool.failedCount++
bl = &pkg.Baseline{
SprayResult: &parsers.SprayResult{
@@ -125,13 +137,8 @@ func (pool *CheckPool) Invoke(v interface{}) {
ReqDepth: unit.depth,
},
}
if strings.Contains(reqerr.Error(), "timed out") || strings.Contains(reqerr.Error(), "actively refused") {
} else {
logs.Log.Debugf("%s, %s", unit.path, reqerr.Error())
pool.doUpgrade(bl)
}
} else {
bl = pkg.NewBaseline(req.URI(), req.Host(), resp)
bl.Collect()
@@ -139,11 +146,14 @@ func (pool *CheckPool) Invoke(v interface{}) {
bl.ReqDepth = unit.depth
bl.Source = unit.source
bl.Spended = time.Since(start).Milliseconds()
pool.processCh <- bl
}
// handle redirects manually
func (pool *CheckPool) Handler() {
for bl := range pool.processCh {
if bl.IsValid {
if bl.RedirectURL != "" {
pool.doRedirect(bl, unit.depth)
pool.doRedirect(bl, bl.ReqDepth)
pool.putToFuzzy(bl)
} else if bl.Status == 400 {
pool.doUpgrade(bl)
@@ -152,19 +162,17 @@ func (pool *CheckPool) Invoke(v interface{}) {
params := map[string]interface{}{
"current": bl,
}
if pool.MatchExpr == nil || pkg.CompareWithExpr(pool.MatchExpr, params) {
pool.putToOutput(bl)
if pool.MatchExpr != nil && pkg.CompareWithExpr(pool.MatchExpr, params) {
bl.IsValid = true
}
}
}
if bl.Source == parsers.CheckSource {
pool.Bar.Done()
}
pool.reqCount++
pool.wg.Done()
pool.putToOutput(bl)
}
}
func (pool *CheckPool) doRedirect(bl *pkg.Baseline, depth int) {
if depth >= MaxRedirect {
return

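This is the fix the commit title refers to: Invoke no longer touches Bar.Done, reqCount, or the output routines from worker goroutines; it only sends baselines to processCh, and the new Handler goroutine started in NewCheckPool is the single place where the bar, counters, redirects, and output are handled. A minimal sketch of that single-consumer pattern, with a hypothetical bar type standing in for pkg.Bar (only the channel name and the general flow come from the diff):

package main

import (
	"fmt"
	"sync"
)

type bar struct{ done int } // stand-in for pkg.Bar; not safe for concurrent use

func (b *bar) Done() { b.done++ }

type baseline struct{ URL string }

func main() {
	processCh := make(chan *baseline, 20)
	b := &bar{}
	count := 0

	var handled sync.WaitGroup
	handled.Add(1)
	go func() { // single consumer: the only goroutine that mutates bar/count
		defer handled.Done()
		for bl := range processCh {
			count++
			b.Done()
			fmt.Println("output:", bl.URL)
		}
	}()

	var workers sync.WaitGroup
	for i := 0; i < 8; i++ {
		workers.Add(1)
		go func(i int) { // workers only send; they never touch bar/count directly
			defer workers.Done()
			processCh <- &baseline{URL: fmt.Sprintf("http://target/%d", i)}
		}(i)
	}
	workers.Wait()
	close(processCh)
	handled.Wait()
	fmt.Println("handled:", count)
}

Because only one goroutine mutates the bar and the counters, no mutex is needed; the channel itself provides the ordering.
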
View File

@@ -27,6 +27,7 @@ type Config struct {
Thread int
Wordlist []string
Timeout int
ProcessCh chan *pkg.Baseline
OutputCh chan *pkg.Baseline
FuzzyCh chan *pkg.Baseline
OutLocker *sync.WaitGroup

View File

@@ -20,9 +20,10 @@ type BasePool struct {
Pool *ants.PoolWithFunc
Bar *pkg.Bar
Worder *words.Worder
Cancel context.CancelFunc
client *ihttp.Client
ctx context.Context
Cancel context.CancelFunc
processCh chan *pkg.Baseline // baselines waiting to be processed
dir string
reqCount int
failedCount int

View File

@@ -5,6 +5,7 @@ import (
"github.com/chainreactors/spray/cmd"
"github.com/gookit/config/v2"
"github.com/gookit/config/v2/yaml"
//_ "net/http/pprof"
)
func init() {
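
The commented-out blank import suggests optional pprof profiling support. For reference, a minimal sketch of how such a blank net/http/pprof import is usually activated (the localhost:6060 listener is the conventional choice, not something this diff adds):

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // blank import registers the /debug/pprof handlers on the default mux
)

func init() {
	go func() {
		// serve the profiler on a side port while the scanner runs
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()
}

Profiles can then be pulled with: go tool pprof http://localhost:6060/debug/pprof/profile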