使用了不太优雅的办法解决并发调度与可能的goroutine泄露问题

This commit is contained in:
M09Ic 2023-01-16 17:30:54 +08:00
parent 3ff46e5e1f
commit 8756b7503e

View File

@ -51,6 +51,7 @@ func NewPool(ctx context.Context, config *pkg.Config) (*Pool, error) {
tempCh: make(chan *pkg.Baseline, config.Thread), tempCh: make(chan *pkg.Baseline, config.Thread),
checkCh: make(chan int), checkCh: make(chan int),
additionCh: make(chan *Unit, 100), additionCh: make(chan *Unit, 100),
closeCh: make(chan struct{}),
waiter: sync.WaitGroup{}, waiter: sync.WaitGroup{},
initwg: sync.WaitGroup{}, initwg: sync.WaitGroup{},
limiter: rate.NewLimiter(rate.Limit(config.RateLimit), 1), limiter: rate.NewLimiter(rate.Limit(config.RateLimit), 1),
@ -137,7 +138,9 @@ func NewPool(ctx context.Context, config *pkg.Config) (*Pool, error) {
} }
} }
} }
if !pool.closed {
pool.OutputCh <- bl pool.OutputCh <- bl
}
pool.waiter.Done() pool.waiter.Done()
} }
@ -161,6 +164,8 @@ type Pool struct {
tempCh chan *pkg.Baseline // 待处理的baseline tempCh chan *pkg.Baseline // 待处理的baseline
checkCh chan int // 独立的check管道 防止与redirect/crawl冲突 checkCh chan int // 独立的check管道 防止与redirect/crawl冲突
additionCh chan *Unit additionCh chan *Unit
closeCh chan struct{}
closed bool
wordOffset int wordOffset int
failedCount int failedCount int
isFailed bool isFailed bool
@ -231,22 +236,26 @@ func (pool *Pool) Run(ctx context.Context, offset, limit int) {
go pool.doCommonFile() go pool.doCommonFile()
} }
closeCh := make(chan struct{}) var done bool
var worderDone bool go func() {
wait := func() { for {
if !worderDone { if done {
worderDone = true
pool.waiter.Wait() pool.waiter.Wait()
close(closeCh) close(pool.closeCh)
return
} else {
time.Sleep(100)
} }
} }
}()
Loop: Loop:
for { for {
select { select {
case u, ok := <-pool.worder.C: case u, ok := <-pool.worder.C:
if !ok { if !ok {
go wait() done = true
continue continue
} }
pool.Statistor.End++ pool.Statistor.End++
@ -256,7 +265,7 @@ Loop:
} }
if pool.Statistor.End > limit { if pool.Statistor.End > limit {
go wait() done = true
continue continue
} }
@ -271,7 +280,7 @@ Loop:
pool.reqPool.Invoke(newUnitWithNumber(pool.safePath(pkg.RandPath()), source, pool.wordOffset)) pool.reqPool.Invoke(newUnitWithNumber(pool.safePath(pkg.RandPath()), source, pool.wordOffset))
} }
case unit, ok := <-pool.additionCh: case unit, ok := <-pool.additionCh:
if !ok { if !ok || pool.closed {
continue continue
} }
if _, ok := pool.urls[unit.path]; ok { if _, ok := pool.urls[unit.path]; ok {
@ -282,7 +291,7 @@ Loop:
unit.number = pool.wordOffset unit.number = pool.wordOffset
pool.reqPool.Invoke(unit) pool.reqPool.Invoke(unit)
} }
case <-closeCh: case <-pool.closeCh:
break Loop break Loop
case <-ctx.Done(): case <-ctx.Done():
break Loop break Loop
@ -290,7 +299,7 @@ Loop:
break Loop break Loop
} }
} }
pool.closed = true
pool.Close() pool.Close()
} }
@ -665,6 +674,11 @@ func (pool *Pool) doCheck() {
} }
func (pool *Pool) addAddition(u *Unit) { func (pool *Pool) addAddition(u *Unit) {
// 强行屏蔽报错, 防止goroutine泄露
defer func() {
if err := recover(); err != nil {
}
}()
pool.additionCh <- u pool.additionCh <- u
} }
@ -702,11 +716,11 @@ func (pool *Pool) recover() {
func (pool *Pool) Close() { func (pool *Pool) Close() {
for pool.analyzeDone { for pool.analyzeDone {
// 等待缓存的待处理任务完成
time.Sleep(time.Duration(100) * time.Millisecond) time.Sleep(time.Duration(100) * time.Millisecond)
} }
close(pool.additionCh) // 关闭addition管道
close(pool.additionCh) close(pool.checkCh) // 关闭check管道
close(pool.checkCh)
pool.Statistor.EndTime = time.Now().Unix() pool.Statistor.EndTime = time.Now().Unix()
pool.bar.Close() pool.bar.Close()
} }