spray/core/pool/pool.go

73 lines
1.4 KiB
Go
Raw Normal View History

2024-02-10 18:23:50 +08:00
package pool
import (
"context"
"github.com/chainreactors/parsers"
"github.com/chainreactors/spray/core/baseline"
"github.com/chainreactors/spray/core/ihttp"
2024-02-10 18:23:50 +08:00
"github.com/chainreactors/spray/pkg"
"github.com/chainreactors/words"
"sync"
"sync/atomic"
2024-02-10 18:23:50 +08:00
)
type BasePool struct {
2024-02-10 18:23:50 +08:00
*Config
Statistor *pkg.Statistor
Bar *pkg.Bar
Worder *words.Worder
2024-05-30 23:33:57 +08:00
Cancel context.CancelFunc
2024-02-10 18:23:50 +08:00
client *ihttp.Client
ctx context.Context
processCh chan *baseline.Baseline // 待处理的baseline
2024-02-10 18:23:50 +08:00
dir string
reqCount int
failedCount int
additionCh chan *Unit
closeCh chan struct{}
2024-08-12 15:12:43 +08:00
wg *sync.WaitGroup
isFallback atomic.Bool
2024-02-10 18:23:50 +08:00
}
func (pool *BasePool) doRetry(bl *baseline.Baseline) {
if bl.Retry >= pool.RetryLimit {
2024-02-10 18:23:50 +08:00
return
}
pool.wg.Add(1)
go func() {
defer pool.wg.Done()
pool.addAddition(&Unit{
path: bl.Path,
2024-10-14 02:20:16 +08:00
parent: bl.Number,
host: bl.Host,
2024-02-10 18:23:50 +08:00
source: parsers.RetrySource,
2024-10-14 02:20:16 +08:00
from: bl.Source,
2024-02-10 18:23:50 +08:00
retry: bl.Retry + 1,
})
}()
}
func (pool *BasePool) addAddition(u *Unit) {
2024-02-10 18:23:50 +08:00
// 强行屏蔽报错, 防止goroutine泄露
pool.wg.Add(1)
defer func() {
if err := recover(); err != nil {
}
}()
pool.additionCh <- u
}
func (pool *BasePool) putToOutput(bl *baseline.Baseline) {
2024-07-01 19:51:16 +08:00
if bl.IsValid || bl.IsFuzzy {
bl.Collect()
}
2024-07-14 04:08:50 +08:00
pool.Outwg.Add(1)
2024-02-10 18:23:50 +08:00
pool.OutputCh <- bl
}
func (pool *BasePool) putToFuzzy(bl *baseline.Baseline) {
2024-07-14 04:08:50 +08:00
pool.Outwg.Add(1)
2024-02-10 18:23:50 +08:00
bl.IsFuzzy = true
pool.FuzzyCh <- bl
}