Skip to main content

backend task system

·266 words·2 mins
😈long && 😻liang
Author
😈long && 😻liang
A IT worker with PHP/GO as the main technology stack

需求分析
#

实现一整套任务管理体系,方便在后台运行程序

代码示例
#

type Demo struct {
    wg     *sync.WaitGroup // 利用同步机制来控制主协程不退出
    
    // 信号量机制,方便监听来控制子协程
    ctx    context.Context
    cancel context.CancelFunc

    started int32 // 原子操作,用于标记是否启动,可以防止重复启动/关闭

    // 由于需要等待多线程完全处理完毕,所以无法直接对关闭信号量(有可能导致依赖信号量存在的功能失效),需要使用新的通道来控制子协程
    isCancel chan struct{}
}

type DemoChannel struct {
    recordId uint64
}

func NewDemo() *Demo {
    return &Demo{
        wg: &sync.WaitGroup{},
    }
}

func (s *Demo) Start(ctx context.Context) {
    if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
        g.Dump("already started")
        return
    }

    s.ctx, s.cancel = context.WithCancel(ctx)
    s.wg.Add(1)

    s.isCancel = make(chan struct{})

    go s.run()

    g.Dump("started")
}

func (s *Demo) Stop() {
    if !atomic.CompareAndSwapInt32(&s.started, 1, 0) {
        g.Dump("already stoped")
        return
    }
    g.Dump("start stoped")

    close(s.isCancel)

    s.wg.Wait()
    
    if s.ctx != nil {
        s.cancel()
    }

    g.Dump("stoped")
}

func (s *Demo) run() {
    defer func() {
        s.wg.Done()

        if r := recover(); r != nil {
            g.Dump("error")
        }
    }()

    for {
        select {
        case <-s.isCancel:
            g.Dump("exiting")
            return
        case <-s.ctx.Done():
            g.Dump("exiting")
            return
        default:
            // 模拟工作负载
            s.runLogic()
        }

        time.Sleep(time.Second)
    }
}

func (s *Demo) runLogic() {
    g.Dump("分发队列开始")
    // 内部开启多进程来进行事件的清洗
    var (
        wg         = sync.WaitGroup{}
        queueCount = 5
    )

    var (
        queues = make([]chan DemoChannel, queueCount)
    )

    wg.Add(queueCount)
    for queueId := range queues {
        queues[queueId] = make(chan DemoChannel, 20)
        go func(id int, ch <-chan DemoChannel) {
            g.Dump("清洗队列开始")
            // 数据清洗
            defer func() {
                g.Dump("清洗队列结束")
                wg.Done()
            }()

            for temp := range ch {
                // 调用clear
                g.Dump(temp.recordId)
                time.Sleep(time.Second)
            }

        }(queueId, queues[queueId])
    }

    // 原始数据整理并下发
    for i := 0; i <= 1000; i++ {
        var breakFlag bool
        select {
        case <-s.isCancel:
            g.Dump("分发队列停止")
            breakFlag = true
            break
        default:
        }

        if breakFlag {
            break
        }

        ii := i % 5

        queues[ii] <- DemoChannel{
            recordId: uint64(i),
            message:  model.V3ESMessage{},
        }
        time.Sleep(time.Second)
    }

    // 关闭队列
    for _, queue := range queues {
        close(queue)
    }

    wg.Wait()
}