返回文章列表
面试考点
等待型并发安全Map (WaitableMap)
等待型并发安全Map (WaitableMap)
KKiana
2025年9月15日24 次阅读

等待型并发安全Map (WaitableMap)
题目要求
实现一个 map:
- 面向高并发
- 只存在插入和查询操作 O(1)
- 查询时,若 key 存在,直接返回 val;若 key 不存在,阻塞直到 key val 对被放入后,获取 val 返回
- 不能有死锁或者 panic 风险。
设计思路
为了满足题目要求,我设计了一个线程安全的并发Map,具有以下特点:
- 线程安全:使用互斥锁保护共享数据访问
- 等待机制:使用通道实现等待/通知机制
- 超时控制:使用context实现灵活的超时和取消功能
- 高效通知:通过关闭通道一次性通知所有等待者
核心数据结构
type MyConCurrectMap struct {
sync.Mutex
Mp map[string]interface{} // 存储键值对
Waiters map[string]*MyChan // 存储等待特定key的通道
}
type MyChan struct {
sync.Once
ch chan struct{}
}Mp:存储实际的键值对Waiters:存储等待特定key的通道MyChan:封装了一个只能关闭一次的通道,避免重复关闭导致panic
关键实现分析
1. 安全关闭通道
使用sync.Once确保通道只被关闭一次,避免panic:
func (m *MyChan) Close() {
m.Do(func() {
close(m.ch)
})
}2. Put操作
当插入一个键值对时,同时通知所有等待这个key的goroutine:
func (m *MyConCurrectMap) Put(key string, value interface{}) {
m.Lock()
defer m.Unlock()
// 存储键值对
m.Mp[key] = value
// 检查是否有goroutine在等待这个key
ch, ok := m.Waiters[key]
if !ok {
return
}
// 关闭通道通知所有等待者
ch.Close()
}3. Get操作
当key不存在时,创建一个等待通道并阻塞等待:
func (m *MyConCurrectMap) Get(ctx context.Context, key string) (interface{}, error) {
m.Lock()
// 检查key是否已存在
v, ok := m.Mp[key]
if ok {
m.Unlock()
return v, nil
}
// 获取或创建等待通道
ch, ok := m.Waiters[key]
if !ok {
ch = NewMyChan()
m.Waiters[key] = ch
}
m.Unlock()
// 等待结果或上下文取消
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ch.Chan():
// 通道被关闭,说明key已被设置,重新获取值
m.Lock()
v, ok := m.Mp[key]
m.Unlock()
if !ok {
// 理论上不应该发生,因为Put会先设置值再关闭通道
return nil, &TimeoutError{"key not found after notification"}
}
return v, nil
}
}4. 超时控制
使用context实现灵活的超时控制:
func (m *MyConCurrectMap) GetWithTimeout(key string, maxWaitingDuration time.Duration) (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), maxWaitingDuration)
defer cancel()
return m.Get(ctx, key)
}完整代码
MyChan实现 (core/mychan.go)
package core
import (
"sync"
)
// MyChan 结构体封装了一个只能关闭一次的通道
type MyChan struct {
sync.Once
ch chan struct{}
}
// NewMyChan 创建一个新的MyChan实例
func NewMyChan() *MyChan {
return &MyChan{
ch: make(chan struct{}),
}
}
// Close 安全地关闭通道,确保只关闭一次
func (m *MyChan) Close() {
m.Do(func() {
close(m.ch)
})
}
// Chan 返回底层通道,用于接收操作
func (m *MyChan) Chan() <-chan struct{} {
return m.ch
}并发Map实现 (core/concurrentmap.go)
package core
import (
"context"
"sync"
"time"
)
// MyConCurrectMap 实现了一个线程安全的并发Map
// 支持查询不存在的key时等待直到key被设置
type MyConCurrectMap struct {
sync.Mutex
Mp map[string]interface{} // 导出字段
Waiters map[string]*MyChan // 导出字段
}
// NewMyConCurrectMap 创建一个新的并发安全Map
func NewMyConCurrectMap() *MyConCurrectMap {
return &MyConCurrectMap{
Mp: make(map[string]interface{}),
Waiters: make(map[string]*MyChan),
}
}
// Put 向Map中添加或更新键值对
// 如果有goroutine正在等待这个key,会通知它们
func (m *MyConCurrectMap) Put(key string, value interface{}) {
m.Lock()
defer m.Unlock()
// 存储键值对
m.Mp[key] = value
// 检查是否有goroutine在等待这个key
ch, ok := m.Waiters[key]
if !ok {
return
}
// 关闭通道通知所有等待者
ch.Close()
}
// Get 获取指定key的值
// 如果key不存在,会等待直到key被设置或上下文被取消
func (m *MyConCurrectMap) Get(ctx context.Context, key string) (interface{}, error) {
m.Lock()
// 检查key是否已存在
v, ok := m.Mp[key]
if ok {
m.Unlock()
return v, nil
}
// 获取或创建等待通道
ch, ok := m.Waiters[key]
if !ok {
ch = NewMyChan()
m.Waiters[key] = ch
}
m.Unlock()
// 等待结果或上下文取消
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ch.Chan():
// 通道被关闭,说明key已被设置,重新获取值
m.Lock()
v, ok := m.Mp[key]
m.Unlock()
if !ok {
// 理论上不应该发生,因为Put会先设置值再关闭通道
return nil, &TimeoutError{"key not found after notification"}
}
return v, nil
}
}
// GetWithTimeout 是Get方法的便捷包装,使用超时参数创建上下文
func (m *MyConCurrectMap) GetWithTimeout(key string, maxWaitingDuration time.Duration) (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), maxWaitingDuration)
defer cancel()
return m.Get(ctx, key)
}
// TimeoutError 定义超时错误类型
type TimeoutError struct {
Msg string // 导出字段
}
func (e *TimeoutError) Error() string {
return e.Msg
}
// ErrTimeout 表示等待超时错误
var ErrTimeout = &TimeoutError{"operation timed out"}主程序 (main.go)
package main
import (
"context"
"fmt"
"sync"
"time"
"concurrentmap/core"
)
func main() {
// 创建一个新的并发Map
m := core.NewMyConCurrectMap()
fmt.Println("=== 并发Map演示 ===")
// 添加一些初始值
m.Put("key1", "value1")
m.Put("key2", 100)
m.Put("key3", true)
// 获取已存在的值
ctx := context.Background()
val1, _ := m.Get(ctx, "key1")
val2, _ := m.Get(ctx, "key2")
val3, _ := m.Get(ctx, "key3")
fmt.Printf("key1 = %v\n", val1)
fmt.Printf("key2 = %v\n", val2)
fmt.Printf("key3 = %v\n", val3)
// 演示等待功能
var wg sync.WaitGroup
wg.Add(1)
// 在另一个goroutine中等待一个不存在的key
go func() {
defer wg.Done()
fmt.Println("\n等待key4被设置...")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
start := time.Now()
val, err := m.Get(ctx, "key4")
elapsed := time.Since(start)
if err != nil {
fmt.Printf("等待key4失败: %v (等待了 %v)\n", err, elapsed)
} else {
fmt.Printf("key4 = %v (等待了 %v)\n", val, elapsed)
}
}()
// 短暂延迟后设置key4
fmt.Println("1秒后设置key4...")
time.Sleep(time.Second)
m.Put("key4", "delayed_value")
// 等待goroutine完成
wg.Wait()
// 演示超时功能
fmt.Println("\n演示超时功能...")
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
start := time.Now()
_, err := m.Get(ctx, "nonexistent_key")
elapsed := time.Since(start)
if err != nil {
fmt.Printf("等待超时: %v (等待了 %v)\n", err, elapsed)
}
// 演示GetWithTimeout便捷方法
fmt.Println("\n使用GetWithTimeout便捷方法...")
val, err := m.GetWithTimeout("key1", time.Millisecond*10)
if err != nil {
fmt.Printf("获取key1失败: %v\n", err)
} else {
fmt.Printf("key1 = %v\n", val)
}
_, err = m.GetWithTimeout("nonexistent_key", time.Millisecond*100)
if err != nil {
fmt.Printf("获取nonexistent_key失败: %v\n", err)
}
}单元测试
我们实现了基本的测试套件,覆盖了并发Map的核心功能。以下是主要测试用例:
基本操作测试
func TestMyConCurrectMap_BasicOperations(t *testing.T) {
m := core.NewMyConCurrectMap()
// 测试基本的Put和Get操作
m.Put("key1", "value1")
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
val, err := m.Get(ctx, "key1")
if err != nil {
t.Errorf("Get返回了错误: %v", err)
}
if val != "value1" {
t.Errorf("期望值为 'value1', 实际值为 %v", val)
}
// 测试不存在的key
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
_, err = m.Get(ctx, "nonexistent")
if err == nil || err.Error() != context.DeadlineExceeded.Error() {
t.Errorf("对于不存在的key,期望超时错误,实际得到: %v", err)
}
// 测试更新已存在的key
m.Put("key1", "updated_value")
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
val, err = m.Get(ctx, "key1")
if err != nil {
t.Errorf("Get返回了错误: %v", err)
}
if val != "updated_value" {
t.Errorf("期望值为 'updated_value', 实际值为 %v", val)
}
// 测试nil值
m.Put("nil_key", nil)
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
val, err = m.Get(ctx, "nil_key")
if err != nil {
t.Errorf("Get返回了错误: %v", err)
}
if val != nil {
t.Errorf("期望值为 nil, 实际值为 %v", val)
}
}等待值测试
func TestMyConCurrectMap_WaitForValue(t *testing.T) {
m := core.NewMyConCurrectMap()
// 启动一个goroutine在短暂延迟后添加key
go func() {
time.Sleep(time.Millisecond * 50)
m.Put("delayed_key", "delayed_value")
}()
// 等待key被添加
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
defer cancel()
start := time.Now()
val, err := m.Get(ctx, "delayed_key")
elapsed := time.Since(start)
if err != nil {
t.Errorf("Get返回了错误: %v", err)
}
if val != "delayed_value" {
t.Errorf("期望值为 'delayed_value', 实际值为 %v", val)
}
if elapsed < time.Millisecond*50 {
t.Errorf("Get返回太快,期望至少等待50ms,实际等待 %v", elapsed)
}
}超时测试
func TestMyConCurrectMap_Timeout(t *testing.T) {
m := core.NewMyConCurrectMap()
// 测试超时情况
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
start := time.Now()
_, err := m.Get(ctx, "timeout_key")
elapsed := time.Since(start)
if err == nil || err.Error() != context.DeadlineExceeded.Error() {
t.Errorf("期望超时错误,实际得到: %v", err)
}
if elapsed < time.Millisecond*50 {
t.Errorf("超时太快,期望至少等待50ms,实际等待 %v", elapsed)
}
}测试覆盖的场景
我们的测试套件覆盖了以下基本场景:
-
基本操作测试:测试基本的Put和Get操作,包括更新已存在的key和nil值处理
- 测试原因:验证Map的基本功能是否正常工作,确保能正确存储和检索值,这是所有其他功能的基础
- 边缘情况:测试nil值的处理,确保Map能正确处理特殊值
-
等待值测试:测试当key不存在时,Get会等待直到key被设置
- 测试原因:这是并发Map的核心功能,验证阻塞等待机制是否正常工作
- 重要性:确保Get操作在key不存在时能正确阻塞,并在key被设置后立即返回
-
超时测试:测试当key不存在且超时时,Get会返回超时错误
- 测试原因:验证超时机制是否正常工作,避免无限期阻塞
- 安全性:确保系统在异常情况下能够正常恢复,不会导致资源泄漏
性能与测试结果
基本功能测试全部通过,表明实现满足了题目的基本要求:
- 能够正确存储和检索键值对
- 当key不存在时,Get操作会阻塞等待
- 支持超时控制,避免无限期阻塞
这些测试确保了并发Map的核心功能正常工作,为后续扩展提供了坚实基础。
面试考点
评论 (0)
后参与评论
// 暂无评论,来说点什么吧