需求分析 #
实现一整套任务管理体系,方便在后台运行程序
代码示例 #
type Demo struct {
wg *sync.WaitGroup // 利用同步机制来控制主协程不退出
// 信号量机制,方便监听来控制子协程
ctx context.Context
cancel context.CancelFunc
started int32 // 原子操作,用于标记是否启动,可以防止重复启动/关闭
// 由于需要等待多线程完全处理完毕,所以无法直接对关闭信号量(有可能导致依赖信号量存在的功能失效),需要使用新的通道来控制子协程
isCancel chan struct{}
}
type DemoChannel struct {
recordId uint64
}
func NewDemo() *Demo {
return &Demo{
wg: &sync.WaitGroup{},
}
}
func (s *Demo) Start(ctx context.Context) {
if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
g.Dump("already started")
return
}
s.ctx, s.cancel = context.WithCancel(ctx)
s.wg.Add(1)
s.isCancel = make(chan struct{})
go s.run()
g.Dump("started")
}
func (s *Demo) Stop() {
if !atomic.CompareAndSwapInt32(&s.started, 1, 0) {
g.Dump("already stoped")
return
}
g.Dump("start stoped")
close(s.isCancel)
s.wg.Wait()
if s.ctx != nil {
s.cancel()
}
g.Dump("stoped")
}
func (s *Demo) run() {
defer func() {
s.wg.Done()
if r := recover(); r != nil {
g.Dump("error")
}
}()
for {
select {
case <-s.isCancel:
g.Dump("exiting")
return
case <-s.ctx.Done():
g.Dump("exiting")
return
default:
// 模拟工作负载
s.runLogic()
}
time.Sleep(time.Second)
}
}
func (s *Demo) runLogic() {
g.Dump("分发队列开始")
// 内部开启多进程来进行事件的清洗
var (
wg = sync.WaitGroup{}
queueCount = 5
)
var (
queues = make([]chan DemoChannel, queueCount)
)
wg.Add(queueCount)
for queueId := range queues {
queues[queueId] = make(chan DemoChannel, 20)
go func(id int, ch <-chan DemoChannel) {
g.Dump("清洗队列开始")
// 数据清洗
defer func() {
g.Dump("清洗队列结束")
wg.Done()
}()
for temp := range ch {
// 调用clear
g.Dump(temp.recordId)
time.Sleep(time.Second)
}
}(queueId, queues[queueId])
}
// 原始数据整理并下发
for i := 0; i <= 1000; i++ {
var breakFlag bool
select {
case <-s.isCancel:
g.Dump("分发队列停止")
breakFlag = true
break
default:
}
if breakFlag {
break
}
ii := i % 5
queues[ii] <- DemoChannel{
recordId: uint64(i),
message: model.V3ESMessage{},
}
time.Sleep(time.Second)
}
// 关闭队列
for _, queue := range queues {
close(queue)
}
wg.Wait()
}