返回文章列表
面试考点

等待型并发安全Map (WaitableMap)

等待型并发安全Map (WaitableMap)

KKiana
2025年9月15日24 次阅读

等待型并发安全Map (WaitableMap)

题目要求

实现一个 map:

  1. 面向高并发
  2. 只存在插入和查询操作 O(1)
  3. 查询时,若 key 存在,直接返回 val;若 key 不存在,阻塞直到 key val 对被放入后,获取 val 返回
  4. 不能有死锁或者 panic 风险。

设计思路

为了满足题目要求,我设计了一个线程安全的并发Map,具有以下特点:

  1. 线程安全:使用互斥锁保护共享数据访问
  2. 等待机制:使用通道实现等待/通知机制
  3. 超时控制:使用context实现灵活的超时和取消功能
  4. 高效通知:通过关闭通道一次性通知所有等待者

核心数据结构

type MyConCurrectMap struct {
    sync.Mutex
    Mp      map[string]interface{} // 存储键值对
    Waiters map[string]*MyChan     // 存储等待特定key的通道
}

type MyChan struct {
    sync.Once
    ch chan struct{}
}
  • Mp:存储实际的键值对
  • Waiters:存储等待特定key的通道
  • MyChan:封装了一个只能关闭一次的通道,避免重复关闭导致panic

关键实现分析

1. 安全关闭通道

使用sync.Once确保通道只被关闭一次,避免panic:

func (m *MyChan) Close() {
    m.Do(func() {
        close(m.ch)
    })
}

2. Put操作

当插入一个键值对时,同时通知所有等待这个key的goroutine:

func (m *MyConCurrectMap) Put(key string, value interface{}) {
    m.Lock()
    defer m.Unlock()
    
    // 存储键值对
    m.Mp[key] = value
    
    // 检查是否有goroutine在等待这个key
    ch, ok := m.Waiters[key]
    if !ok {
        return
    }
    
    // 关闭通道通知所有等待者
    ch.Close()
}

3. Get操作

当key不存在时,创建一个等待通道并阻塞等待:

func (m *MyConCurrectMap) Get(ctx context.Context, key string) (interface{}, error) {
    m.Lock()
    // 检查key是否已存在
    v, ok := m.Mp[key]
    if ok {
        m.Unlock()
        return v, nil
    }
    
    // 获取或创建等待通道
    ch, ok := m.Waiters[key]
    if !ok {
        ch = NewMyChan()
        m.Waiters[key] = ch
    }
    m.Unlock()
    
    // 等待结果或上下文取消
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-ch.Chan():
        // 通道被关闭,说明key已被设置,重新获取值
        m.Lock()
        v, ok := m.Mp[key]
        m.Unlock()
        if !ok {
            // 理论上不应该发生,因为Put会先设置值再关闭通道
            return nil, &TimeoutError{"key not found after notification"}
        }
        return v, nil
    }
}

4. 超时控制

使用context实现灵活的超时控制:

func (m *MyConCurrectMap) GetWithTimeout(key string, maxWaitingDuration time.Duration) (interface{}, error) {
    ctx, cancel := context.WithTimeout(context.Background(), maxWaitingDuration)
    defer cancel()
    return m.Get(ctx, key)
}

完整代码

MyChan实现 (core/mychan.go)

package core

import (
    "sync"
)

// MyChan 结构体封装了一个只能关闭一次的通道
type MyChan struct {
    sync.Once
    ch chan struct{}
}

// NewMyChan 创建一个新的MyChan实例
func NewMyChan() *MyChan {
    return &MyChan{
        ch: make(chan struct{}),
    }
}

// Close 安全地关闭通道,确保只关闭一次
func (m *MyChan) Close() {
    m.Do(func() {
        close(m.ch)
    })
}

// Chan 返回底层通道,用于接收操作
func (m *MyChan) Chan() <-chan struct{} {
    return m.ch
}

并发Map实现 (core/concurrentmap.go)

package core

import (
    "context"
    "sync"
    "time"
)

// MyConCurrectMap 实现了一个线程安全的并发Map
// 支持查询不存在的key时等待直到key被设置
type MyConCurrectMap struct {
    sync.Mutex
    Mp      map[string]interface{} // 导出字段
    Waiters map[string]*MyChan     // 导出字段
}

// NewMyConCurrectMap 创建一个新的并发安全Map
func NewMyConCurrectMap() *MyConCurrectMap {
    return &MyConCurrectMap{
        Mp:      make(map[string]interface{}),
        Waiters: make(map[string]*MyChan),
    }
}

// Put 向Map中添加或更新键值对
// 如果有goroutine正在等待这个key,会通知它们
func (m *MyConCurrectMap) Put(key string, value interface{}) {
    m.Lock()
    defer m.Unlock()
    
    // 存储键值对
    m.Mp[key] = value
    
    // 检查是否有goroutine在等待这个key
    ch, ok := m.Waiters[key]
    if !ok {
        return
    }
    
    // 关闭通道通知所有等待者
    ch.Close()
}

// Get 获取指定key的值
// 如果key不存在,会等待直到key被设置或上下文被取消
func (m *MyConCurrectMap) Get(ctx context.Context, key string) (interface{}, error) {
    m.Lock()
    // 检查key是否已存在
    v, ok := m.Mp[key]
    if ok {
        m.Unlock()
        return v, nil
    }
    
    // 获取或创建等待通道
    ch, ok := m.Waiters[key]
    if !ok {
        ch = NewMyChan()
        m.Waiters[key] = ch
    }
    m.Unlock()
    
    // 等待结果或上下文取消
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-ch.Chan():
        // 通道被关闭,说明key已被设置,重新获取值
        m.Lock()
        v, ok := m.Mp[key]
        m.Unlock()
        if !ok {
            // 理论上不应该发生,因为Put会先设置值再关闭通道
            return nil, &TimeoutError{"key not found after notification"}
        }
        return v, nil
    }
}

// GetWithTimeout 是Get方法的便捷包装,使用超时参数创建上下文
func (m *MyConCurrectMap) GetWithTimeout(key string, maxWaitingDuration time.Duration) (interface{}, error) {
    ctx, cancel := context.WithTimeout(context.Background(), maxWaitingDuration)
    defer cancel()
    return m.Get(ctx, key)
}

// TimeoutError 定义超时错误类型
type TimeoutError struct {
    Msg string // 导出字段
}

func (e *TimeoutError) Error() string {
    return e.Msg
}

// ErrTimeout 表示等待超时错误
var ErrTimeout = &TimeoutError{"operation timed out"}

主程序 (main.go)

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "concurrentmap/core"
)

func main() {
    // 创建一个新的并发Map
    m := core.NewMyConCurrectMap()

    fmt.Println("=== 并发Map演示 ===")
    
    // 添加一些初始值
    m.Put("key1", "value1")
    m.Put("key2", 100)
    m.Put("key3", true)
    
    // 获取已存在的值
    ctx := context.Background()
    val1, _ := m.Get(ctx, "key1")
    val2, _ := m.Get(ctx, "key2")
    val3, _ := m.Get(ctx, "key3")
    
    fmt.Printf("key1 = %v\n", val1)
    fmt.Printf("key2 = %v\n", val2)
    fmt.Printf("key3 = %v\n", val3)
    
    // 演示等待功能
    var wg sync.WaitGroup
    wg.Add(1)
    
    // 在另一个goroutine中等待一个不存在的key
    go func() {
        defer wg.Done()
        fmt.Println("\n等待key4被设置...")
        
        ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
        defer cancel()
        
        start := time.Now()
        val, err := m.Get(ctx, "key4")
        elapsed := time.Since(start)
        
        if err != nil {
            fmt.Printf("等待key4失败: %v (等待了 %v)\n", err, elapsed)
        } else {
            fmt.Printf("key4 = %v (等待了 %v)\n", val, elapsed)
        }
    }()
    
    // 短暂延迟后设置key4
    fmt.Println("1秒后设置key4...")
    time.Sleep(time.Second)
    m.Put("key4", "delayed_value")
    
    // 等待goroutine完成
    wg.Wait()
    
    // 演示超时功能
    fmt.Println("\n演示超时功能...")
    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
    defer cancel()
    
    start := time.Now()
    _, err := m.Get(ctx, "nonexistent_key")
    elapsed := time.Since(start)
    
    if err != nil {
        fmt.Printf("等待超时: %v (等待了 %v)\n", err, elapsed)
    }
    
    // 演示GetWithTimeout便捷方法
    fmt.Println("\n使用GetWithTimeout便捷方法...")
    val, err := m.GetWithTimeout("key1", time.Millisecond*10)
    if err != nil {
        fmt.Printf("获取key1失败: %v\n", err)
    } else {
        fmt.Printf("key1 = %v\n", val)
    }
    
    _, err = m.GetWithTimeout("nonexistent_key", time.Millisecond*100)
    if err != nil {
        fmt.Printf("获取nonexistent_key失败: %v\n", err)
    }
}

单元测试

我们实现了基本的测试套件,覆盖了并发Map的核心功能。以下是主要测试用例:

基本操作测试

func TestMyConCurrectMap_BasicOperations(t *testing.T) {
    m := core.NewMyConCurrectMap()
    
    // 测试基本的Put和Get操作
    m.Put("key1", "value1")
    
    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
    defer cancel()
    
    val, err := m.Get(ctx, "key1")
    if err != nil {
        t.Errorf("Get返回了错误: %v", err)
    }
    if val != "value1" {
        t.Errorf("期望值为 'value1', 实际值为 %v", val)
    }
    
    // 测试不存在的key
    ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
    defer cancel()
    
    _, err = m.Get(ctx, "nonexistent")
    if err == nil || err.Error() != context.DeadlineExceeded.Error() {
        t.Errorf("对于不存在的key,期望超时错误,实际得到: %v", err)
    }
    
    // 测试更新已存在的key
    m.Put("key1", "updated_value")
    ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
    defer cancel()
    
    val, err = m.Get(ctx, "key1")
    if err != nil {
        t.Errorf("Get返回了错误: %v", err)
    }
    if val != "updated_value" {
        t.Errorf("期望值为 'updated_value', 实际值为 %v", val)
    }
    
    // 测试nil值
    m.Put("nil_key", nil)
    ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
    defer cancel()
    
    val, err = m.Get(ctx, "nil_key")
    if err != nil {
        t.Errorf("Get返回了错误: %v", err)
    }
    if val != nil {
        t.Errorf("期望值为 nil, 实际值为 %v", val)
    }
}

等待值测试

func TestMyConCurrectMap_WaitForValue(t *testing.T) {
    m := core.NewMyConCurrectMap()
    
    // 启动一个goroutine在短暂延迟后添加key
    go func() {
        time.Sleep(time.Millisecond * 50)
        m.Put("delayed_key", "delayed_value")
    }()
    
    // 等待key被添加
    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
    defer cancel()
    
    start := time.Now()
    val, err := m.Get(ctx, "delayed_key")
    elapsed := time.Since(start)
    
    if err != nil {
        t.Errorf("Get返回了错误: %v", err)
    }
    if val != "delayed_value" {
        t.Errorf("期望值为 'delayed_value', 实际值为 %v", val)
    }
    if elapsed < time.Millisecond*50 {
        t.Errorf("Get返回太快,期望至少等待50ms,实际等待 %v", elapsed)
    }
}

超时测试

func TestMyConCurrectMap_Timeout(t *testing.T) {
    m := core.NewMyConCurrectMap()
    
    // 测试超时情况
    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
    defer cancel()
    
    start := time.Now()
    _, err := m.Get(ctx, "timeout_key")
    elapsed := time.Since(start)
    
    if err == nil || err.Error() != context.DeadlineExceeded.Error() {
        t.Errorf("期望超时错误,实际得到: %v", err)
    }
    if elapsed < time.Millisecond*50 {
        t.Errorf("超时太快,期望至少等待50ms,实际等待 %v", elapsed)
    }
}

测试覆盖的场景

我们的测试套件覆盖了以下基本场景:

  1. 基本操作测试:测试基本的Put和Get操作,包括更新已存在的key和nil值处理

    • 测试原因:验证Map的基本功能是否正常工作,确保能正确存储和检索值,这是所有其他功能的基础
    • 边缘情况:测试nil值的处理,确保Map能正确处理特殊值
  2. 等待值测试:测试当key不存在时,Get会等待直到key被设置

    • 测试原因:这是并发Map的核心功能,验证阻塞等待机制是否正常工作
    • 重要性:确保Get操作在key不存在时能正确阻塞,并在key被设置后立即返回
  3. 超时测试:测试当key不存在且超时时,Get会返回超时错误

    • 测试原因:验证超时机制是否正常工作,避免无限期阻塞
    • 安全性:确保系统在异常情况下能够正常恢复,不会导致资源泄漏

性能与测试结果

基本功能测试全部通过,表明实现满足了题目的基本要求:

  1. 能够正确存储和检索键值对
  2. 当key不存在时,Get操作会阻塞等待
  3. 支持超时控制,避免无限期阻塞

这些测试确保了并发Map的核心功能正常工作,为后续扩展提供了坚实基础。

面试考点

评论 (0)

后参与评论

// 暂无评论,来说点什么吧

等待型并发安全Map (WaitableMap) | 博客