Skip to main content

Queue

·297 words·2 mins
😈long && 😻liang
Author
😈long && 😻liang
A IT worker with PHP/GO as the main technology stack

需求分析
#

实现一个包含16个缓冲队列的数组,使用ID取模的方式路由消息到不同队列,并等待所有队列处理完成的解决方案。

代码示例
#

package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	queueCount = 16      // 队列数量
	bufferSize = 100     // 每个队列的缓冲区大小
	messageCount = 1000  // 要发送的消息总数
)

func main() {
	// 1. 创建16个缓冲队列
	queues := make([]chan int, queueCount)
	for i := range queues {
		queues[i] = make(chan int, bufferSize)
	}

	// 2. 创建WaitGroup用于等待所有消费者完成
	var wg sync.WaitGroup
	wg.Add(queueCount)

	// 3. 为每个队列启动一个消费者goroutine
	for i, queue := range queues {
		go func(queueID int, ch <-chan int) {
			defer wg.Done()
			processQueue(queueID, ch)
		}(i, queue)
	}

	// 4. 生产者发送消息(使用ID取模路由)
	start := time.Now()
	for id := 0; id < messageCount; id++ {
		queueIndex := id % queueCount  // 取模决定路由到哪个队列
		queues[queueIndex] <- id      // 发送到对应队列
	}

	// 5. 关闭所有队列(通知消费者处理完成)
	for _, queue := range queues {
		close(queue)
	}

	// 6. 等待所有消费者完成
	wg.Wait()

	fmt.Printf("All queues processed. Time elapsed: %v\n", time.Since(start))
}

// 处理队列中的消息
func processQueue(queueID int, ch <-chan int) {
	count := 0
	for msg := range ch {
		// 模拟处理消息(这里简单打印)
		if count%100 == 0 { // 每100条打印一次,避免输出太多
			fmt.Printf("Queue %d processing message %d\n", queueID, msg)
		}
		count++
		time.Sleep(10 * time.Millisecond) // 模拟处理耗时
	}
	fmt.Printf("Queue %d processed %d messages\n", queueID, count)
}

代码解析
#

  1. 队列创建
queues := make([]chan int, queueCount)
for i := range queues {
    queues[i] = make(chan int, bufferSize)
}
  • 创建包含16个元素的通道数组。
  • 每个通道都是缓冲通道,容量为100。
  1. 消费者goroutine
// 通道未关闭时:for range会持续接收新消息,包括其他goroutine新插入的消息
// 通道关闭后:for range会处理完已缓冲的消息后退出,不会接收关闭后新发送的消息(会panic)
// 遍历是阻塞的:如果没有新消息,for range会阻塞等待
for i, queue := range queues {
    // chan T:双向通道 (可读可写)
    // <-chan T:只读通道 (接收操作)
    // chan<- T:只写通道 (发送操作)
    go func(queueID int, ch <-chan int) {
        defer wg.Done()
        processQueue(queueID, ch)
    }(i, queue)
}
  • 为每个队列启动一个消费者goroutine。
  • 使用sync.WaitGroup来等待所有消费者完成。
  • processQueue函数会持续从通道接收并处理消息。
  1. 消息路由
for id := 0; id < messageCount; id++ {
    queueIndex := id % queueCount  // 取模决定路由
    queues[queueIndex] <- id      // 发送到对应队列
}
  • 使用ID取模(id % 16)的方式均匀分布消息到不同队列。
  • 这种路由方式确保相同ID总是路由到同一队列。
  1. 优雅关闭
for _, queue := range queues {
    close(queue)
}
  • 关闭所有队列通道,通知消费者处理完剩余消息后退出。
  • 这是优雅终止消费者的标准模式。
  1. 等待完成
wg.Wait()
  • 阻塞主goroutine,直到所有消费者完成工作。