V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
roseduan
V2EX  ›  程序员

LotusDB 设计与实现—3 内存 memtable

  •  
  •   roseduan ·
    roseduan · 2022-04-23 12:35:38 +08:00 · 1247 次点击
    这是一个创建于 1003 天前的主题,其中的信息可能已经有所发展或是发生改变。

    LotusDB 是一个全新的 KV 存储引擎,Github 地址:https://github.com/flower-corp/lotusdb,希望大家多多支持呀,点个 star 或者参与进来!

    顾名思义,memtable 是内存中维护的组件,在 LSM Tree 存储模型中,memtable 相当于一块内存 buffer ,数据写入到 WAL 后,然后在 memtable 中更新。memtable 的数据积累到一定的阈值之后,批量 Flush 到磁盘,这样做的目的是延迟写磁盘,并且将随机的 IO 写入转换为批量的顺序 IO ,这也是 LSM 存储模型的核心思路。 design-overview.png 内存中的 memtable 一般会有多个,一个 memtable 写满之后,会转为不可变的 memtable ,不可变的 memtable 不能接收新的写入,并且等待被后台线程 Flush 持久化到磁盘。

    LotusDB 的一个 Column Family 结构体中,维护了最新的 memtable ,记为 activeMem ,以及不可变的 memtable 数组,即 immuMems 。

    Column Family 这个命名借鉴于 rocksdb ,表示一个 key/value 的命名空间,可以理解为表的概念。

    从 Column Family 的结构体定义中可以查看:

    // ColumnFamily is a namespace of keys and values.
    // Each key/value pair in LotusDB is associated with exactly one Column Family.
    // If no Column Family is specified, key-value pair is associated with Column Family "cf_default".
    // Column Families provide a way to logically partition the database.
    type ColumnFamily struct {
    	// Active memtable for writing.
    	activeMem *memtable
    	// Immutable memtables, waiting to be flushed to disk.
    	immuMems []*memtable
    	// Value Log(Put value into value log according to options ValueThreshold).
    	vlog *valueLog
    	// Store keys and meta info.
    	indexer index.Indexer
    	// When the active memtable is full, send it to the flushChn, see listenAndFlush.
    	flushChn  chan *memtable
    	flushLock sync.RWMutex // guarantee flush and compaction exclusive.
    	
        // ...
    }
    

    当有数据写入时,需要判断当前 memtable 是否已满,如果已满,则将 memtable 存放到 immuMems 这个 slice 里面,然后将 memtable 通过 channel 发送,由 channel 的另一端进行 flush 工作。发送到 channel 之后,需要创建一个新的 memtable 做为 activeMem 。

    大致的逻辑如下代码:

    func (cf *ColumnFamily) waitWritesMemSpace(size uint32) error {
        // check whether memtable is full
    	if !cf.activeMem.isFull(size) {
    		return nil
    	}
    
    	timer := time.NewTimer(cf.opts.MemSpaceWaitTimeout)
    	defer timer.Stop()
    	select {
    	case cf.flushChn <- cf.activeMem:
    		cf.immuMems = append(cf.immuMems, cf.activeMem)
    		// open a new active memtable.
            
            // ...
            
    		if table, err := openMemtable(memOpts); err != nil {
    			return err
    		} else {
    			cf.activeMem = table
    		}
    	case <-timer.C:
    		return ErrWaitMemSpaceTimeout
    	}
    	return nil
    }
    

    如果当前 memtable 容量和数量都达到了最大值, 不可变的 memtable 还来不及 flush ,这时候新的写入需要阻塞等待,等待的超时时间可以配置,默认是 100ms 。 但实际上,这种情况发生的概率较低,只有在 memtable 阈值小、数量少,并且有大量写入的情况下才有可能发生,如果在写入的过程中遇到了类似 wait memtable space timeout 的错误,建议调大 memtable 的阈值,或者增加超时时间的配置。

    需要注意在 memtable 中,如果是删除数据,那么实际上也是添加记录,并不会真正去执行删除,只是添加的记录加上了一个特殊的标记,一般称为墓碑值。 具体在 LotusDB 里面,添加记录操作的是 LogEntry 这个结构体,所以在里面加上了一个 Type 字段,标识这个 LogEntry 数据的类型,然后再添加到 memtable 中:

    // put new writes to memtable.
    func (mt *memtable) put(key []byte, value []byte, deleted bool, opts WriteOptions) error {
    	entry := &logfile.LogEntry{
    		Key:   key,
    		Value: value,
    	}
    	if opts.ExpiredAt > 0 {
    		entry.ExpiredAt = opts.ExpiredAt
    	}
    	if deleted {
    		entry.Type = logfile.TypeDelete
    	}
        
        // ....
    }
    

    在 channel 的另一端( listenAndFlush 方法中),启动了一个 goroutine 进行监听,如果有新的 memtable 数据进来,则会开始 flush 操作。LotusDB 的 memtable 的具体数据结构采用的是跳表,所以就取出对应的跳表的迭代器,从头开始遍历跳表中的数据。 这里需要注意判断,如果 memtable 中的 key 被标记为删除或已过期的话,也需要记录一下,否则,则说明是有效的 key/value 数据,那么便会将数据追加写到 value log 中,获取到对应数据的索引信息,即文件 id 和位置偏移 offset 。

    for iter.SeekToFirst(); iter.Valid(); iter.Next() {
        key := iter.Key()
        node := &index.IndexerNode{Key: key}
        mv := decodeMemValue(iter.Value())
    
        // delete invalid keys from indexer.
        if mv.typ == byte(logfile.TypeDelete) || (mv.expiredAt != 0 && mv.expiredAt <= time.Now().Unix()) {
            deletedKeys = append(deletedKeys, key)
        } else {
            // wiret data into value log.
            valuePos, esize, err := cf.vlog.Write(&logfile.LogEntry{
                Key:       key,
                Value:     mv.value,
                ExpiredAt: mv.expiredAt,
            })
            if err != nil {
                return err
            }
            node.Meta = &index.IndexerMeta{
                Fid:       valuePos.Fid,
                Offset:    valuePos.Offset,
                EntrySize: esize,
            }
            nodes = append(nodes, node)
        }
    }
    

    遍历完 memtable 中的数据之后,再调用索引提供的方法,针对无效的数据进行批量删除,针对有效的数据进行批量添加,完成之后需要调用 Sync 方法持久化写入的内容,保证数据的一致性。

    func (cf *ColumnFamily) flushUpdateIndex(nodes []*index.IndexerNode, keys [][]byte) error {
    	cf.flushLock.Lock()
    	defer cf.flushLock.Unlock()
    	// must put and delete in batch.
    	writeOpts := index.WriteOptions{SendDiscard: true}
    	if _, err := cf.indexer.PutBatch(nodes, writeOpts); err != nil {
    		return err
    	}
    	if len(keys) > 0 {
    		if err := cf.indexer.DeleteBatch(keys, writeOpts); err != nil {
    			return err
    		}
    	}
    	// must fsync before delete wal.
    	if err := cf.indexer.Sync(); err != nil {
    		return err
    	}
    	return nil
    }
    

    Flush 完成之后,便会将 immuMems 这个数组中的 memtable 删除掉,给后续新的 memtable 空出位置。

    memtable 相关的配置项:

    ColumnFamilyOptions:

    MemtableSize:一个 memtable 所占内存空间的阈值,默认 64MB

    MemtableNums:最多可存在的 memtable 的数量,默认 5

    MemSpaceWaitTimeout:等待 memtable 空闲空间的超时,默认 100ms


    LotusDB Github 地址:https://github.com/flower-corp/lotusdb

    3 条回复    2022-06-02 08:25:42 +08:00
    wellsc
        1
    wellsc  
       2022-04-23 13:46:53 +08:00
    👍🏻
    polythene
        2
    polythene  
       2022-06-01 18:34:02 +08:00
    value log 啥时候 compact 呢?
    roseduan
        3
    roseduan  
    OP
       2022-06-02 08:25:42 +08:00
    @polythene 后台线程自动进行的
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3009 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 31ms · UTC 12:32 · PVG 20:32 · LAX 04:32 · JFK 07:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.