需求分析 #
实现一个包含16个缓冲队列的数组,使用ID取模的方式路由消息到不同队列,并等待所有队列处理完成的解决方案。
代码示例 #
package main
import (
"fmt"
"sync"
"time"
)
const (
queueCount = 16 // 队列数量
bufferSize = 100 // 每个队列的缓冲区大小
messageCount = 1000 // 要发送的消息总数
)
func main() {
// 1. 创建16个缓冲队列
queues := make([]chan int, queueCount)
for i := range queues {
queues[i] = make(chan int, bufferSize)
}
// 2. 创建WaitGroup用于等待所有消费者完成
var wg sync.WaitGroup
wg.Add(queueCount)
// 3. 为每个队列启动一个消费者goroutine
for i, queue := range queues {
go func(queueID int, ch <-chan int) {
defer wg.Done()
processQueue(queueID, ch)
}(i, queue)
}
// 4. 生产者发送消息(使用ID取模路由)
start := time.Now()
for id := 0; id < messageCount; id++ {
queueIndex := id % queueCount // 取模决定路由到哪个队列
queues[queueIndex] <- id // 发送到对应队列
}
// 5. 关闭所有队列(通知消费者处理完成)
for _, queue := range queues {
close(queue)
}
// 6. 等待所有消费者完成
wg.Wait()
fmt.Printf("All queues processed. Time elapsed: %v\n", time.Since(start))
}
// 处理队列中的消息
func processQueue(queueID int, ch <-chan int) {
count := 0
for msg := range ch {
// 模拟处理消息(这里简单打印)
if count%100 == 0 { // 每100条打印一次,避免输出太多
fmt.Printf("Queue %d processing message %d\n", queueID, msg)
}
count++
time.Sleep(10 * time.Millisecond) // 模拟处理耗时
}
fmt.Printf("Queue %d processed %d messages\n", queueID, count)
}
代码解析 #
- 队列创建
queues := make([]chan int, queueCount)
for i := range queues {
queues[i] = make(chan int, bufferSize)
}
- 创建包含16个元素的通道数组。
- 每个通道都是缓冲通道,容量为100。
- 消费者goroutine
// 通道未关闭时:for range会持续接收新消息,包括其他goroutine新插入的消息
// 通道关闭后:for range会处理完已缓冲的消息后退出,不会接收关闭后新发送的消息(会panic)
// 遍历是阻塞的:如果没有新消息,for range会阻塞等待
for i, queue := range queues {
// chan T:双向通道 (可读可写)
// <-chan T:只读通道 (接收操作)
// chan<- T:只写通道 (发送操作)
go func(queueID int, ch <-chan int) {
defer wg.Done()
processQueue(queueID, ch)
}(i, queue)
}
- 为每个队列启动一个消费者goroutine。
- 使用
sync.WaitGroup
来等待所有消费者完成。 processQueue
函数会持续从通道接收并处理消息。
- 消息路由
for id := 0; id < messageCount; id++ {
queueIndex := id % queueCount // 取模决定路由
queues[queueIndex] <- id // 发送到对应队列
}
- 使用ID取模(
id % 16
)的方式均匀分布消息到不同队列。 - 这种路由方式确保相同ID总是路由到同一队列。
- 优雅关闭
for _, queue := range queues {
close(queue)
}
- 关闭所有队列通道,通知消费者处理完剩余消息后退出。
- 这是优雅终止消费者的标准模式。
- 等待完成
wg.Wait()
- 阻塞主goroutine,直到所有消费者完成工作。