修复程序并发调度失败的bug

This commit is contained in:
M09Ic 2023-01-09 11:55:27 +08:00
parent e3fc74e78d
commit c84440a662
2 changed files with 20 additions and 9 deletions

View File

@ -210,33 +210,41 @@ func (pool *Pool) Run(ctx context.Context, offset, limit int) {
go pool.doCommonFile()
}
go func() {
for {
closeCh := make(chan struct{})
//go func() {
// select {
// case <-worderDone:
// closeCh <- struct{}{}
// }
//}()
var worderDone bool
wait := func() {
if !worderDone {
worderDone = true
pool.wg.Wait()
pool.closeCh <- struct{}{}
close(closeCh)
}
}
}()
Loop:
for {
select {
case u, ok := <-pool.worder.C:
if !ok {
go wait()
continue
}
pool.Statistor.End++
if int(pool.reqCount) < offset {
if pool.reqCount < offset {
pool.reqCount++
continue
}
if pool.Statistor.End > limit {
go wait()
continue
}
if u == "" {
continue
}
pool.wg.Add(1)
pool.reqPool.Invoke(newUnit(u, WordSource))
case source := <-pool.checkCh:
@ -251,7 +259,7 @@ Loop:
continue
}
pool.reqPool.Invoke(unit)
case <-pool.closeCh:
case <-closeCh:
break Loop
case <-ctx.Done():
break Loop
@ -260,6 +268,7 @@ Loop:
}
}
pool.wg.Wait()
for pool.analyzeDone {
time.Sleep(time.Duration(100) * time.Millisecond)
}

View File

@ -35,6 +35,8 @@ func GetSourceName(s int) string {
return "rule"
case 10:
return "bak"
case 11:
return "common"
default:
return "unknown"
}