深入浅出 InfluxDB — 存储引擎

深入浅出 InfluxDB — 存储引擎

前言

上一篇文章主要介绍了数据点是如何从客户端写入 InfluxDB 的,初次从源码层面接触到了 InfluxDB 存储引擎的几个主要结构,详细介绍了数据从 database 直到 WAL 的落盘过程。本文将在前一篇的基础上继续逐一详细介绍 InfluxDB 存储引擎内部的组件及其运作原理。

初识整体架构

下图展示是存储引擎包含的几个主要的组件,需要注意的是 WALTSM 的本质是文件,只不过 WAL 是对快速写入进行优化的只进行追加写的数据文件,TSM 是对快速读取进行优化的只读数据文件。在它们的上层的其他组件都是存在于内存中,由运行时构建。

WAL 的内部是一组 segment files,使用编号进行命名,追加写入压缩的 series,当前写入的文件超过 10M 后会关闭当前文件并新建新文件接收写入的数据,图示如下:

数据一致性

有的读者可能看到这里就有疑问了,为什么写的文件和读的文件不一致?作为数据库,这是否意味着数据的一致性被破坏?在官方设计文档中,它们对数据的一致性问题是这样解答的:牺牲数据的强一致性来换取更高性能的读写。这样设计直接导致的结果是读取的数据可能并不是最新的数据,但是这丝毫不会影响我们使用时序数据库,通常查询这些数据的时候,我们会更加关注大量的点在时间线上的趋势变化,因此每次可能丢失的少量点并不会影响我们对于整体趋势的判断。况且查询的时间间隔通常小于数据采样写入的时间间隔,这一“天然时延”存在于时序数据库普遍的使用场景中,在控制好写入负载的前提下数据的一致性是可以得到基本保证的。

这样的设计带来的优势也十分明显,不需要文件间隙锁来处理对同一个文件的读写冲突,代码实现也更加简单,用于写入的文件和用于读取的文件在IO层面上互不影响,但依然要维护一个一致的数据视图,这就引出了第一个组件 Cache

Cache

Cache 是 WAL 在内存中的全量备份,内部存储的数据结构是一个基于固定大小 slice 的简单哈希环,在只有写入和删除的请求的情况下,这些数据都会被写入 Cache 和 WAL中,从而导致 Cache 会占用越来越多的内存,因此存储引擎会使用两个参数对 Cache 的大小进行限制,分别为 cache-snapshot-memory-sizecache-max-memory-size ,前者规定了开始将 cache 转化为 TSM Files 的最低阈值,后者规定了 cache 内存用量的最大值,即如果达到该值则数据库会开始拒绝写入。具体实现上会开启协程定时对 cache 的大小进行检查:

func (e *Engine) compactCache() {
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		e.mu.RLock()
		quit := e.snapDone
		e.mu.RUnlock()

		select {
		case <-quit:
			return
		// 每秒钟检查一次
		case <-t.C:
			// ...
			// 检查是否满足快照条件
			if e.ShouldCompactCache(time.Now()) {
				start := time.Now()
				err := e.WriteSnapshot() // 对 cache 的快照写入 TSM files
				// 错误处理
			}
		}
	}
}

func (e *Engine) WriteSnapshot() (err error) {
	started := time.Now()

	closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) {
		e.mu.Lock()
		defer e.mu.Unlock()

		if e.WALEnabled {
			// 关闭当前 WAL 的 segment file 并打开新文件
			if err = e.WAL.CloseSegment(); err != nil {
				return
			}
			// 返回已关闭的 segment
			segments, err = e.WAL.ClosedSegments()
			if err != nil {
				return
			}
		}
		// 对 cache 进行快照,本质上是创建新环并和已有旧哈希环交换指针
		snapshot, err = e.Cache.Snapshot()
		if err != nil {
			return
		}

		return
	}()

	if err != nil {
		return err
	}
	dedup := time.Now()
	// 在写入前需要对 snapshot 进行去重和稳定排序
	snapshot.Deduplicate()

	return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
}

func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {
	// ...
	// 创建一个过多个 TSM file,由 Compactor 将快照 cache 分片并发写入
	newFiles, err := e.Compactor.WriteSnapshot(snapshot) // 
	// 更新 FileStore
	if err := e.FileStore.Replace(nil, newFiles); err != nil {
		// 如果出错就将新文件删除
		for _, file := range newFiles {
			err := os.Remove(file)
		}
		return err
	}
	// 快照已落盘,清理 cache 和 WAL 释放空间
	e.Cache.ClearSnapshot(true)
	e.WAL.Remove(closedFiles)
	return nil
}

Compactor

随着时间的推移,写入快照的次数越来越多,如果每次至少会产生一个 TSM file,数据文件夹中的文件也会越来越分散,就可能会出现读性能的急剧下降,如果每一次的查询都需要对多个文件同时进行查找,一个进程会占用过多的文件描述符,而且会导致 IO 的不堪重负。因此 Compactor 除了负责将快照写入外,还需要将不同的快照的 TSM 文件进行合并,同样是开一个协程定期执行合并,代码如下:

func (e *Engine) compact(wg *sync.WaitGroup) {
	t := time.NewTicker(time.Second)
	defer t.Stop()

	for {
		e.mu.RLock()
		quit := e.done
		e.mu.RUnlock()

		select {
		case <-quit:
			return

		case <-t.C:

			// 逐层寻找相邻的 generation 并将它们进行分组
			level1Groups := e.CompactionPlan.PlanLevel(1)
			level2Groups := e.CompactionPlan.PlanLevel(2)
			level3Groups := e.CompactionPlan.PlanLevel(3)
			level4Groups := e.CompactionPlan.Plan(e.LastModified())
			atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))

			// 不需要全量合并,尝试进行查询优化
			if len(level4Groups) == 0 {
				level4Groups = e.CompactionPlan.PlanOptimize()
			}

			// 设定每一层级用于合并的队列的长度
			e.scheduler.setDepth(1, len(level1Groups))
			e.scheduler.setDepth(2, len(level2Groups))
			e.scheduler.setDepth(3, len(level3Groups))
			e.scheduler.setDepth(4, len(level4Groups))

			// 由调度器进行判断哪个 level 的合并操作是可以执行的
			if level, runnable := e.scheduler.next(); runnable {
				switch level {
				case 1:
					if e.compactHiPriorityLevel(level1Groups[0], 1, false, wg) {
						level1Groups = level1Groups[1:]
					}
				case 2:
					if e.compactHiPriorityLevel(level2Groups[0], 2, false, wg) {
						level2Groups = level2Groups[1:]
					}
				case 3:
					if e.compactLoPriorityLevel(level3Groups[0], 3, true, wg) {
						level3Groups = level3Groups[1:]
					}
				case 4:
					if e.compactFull(level4Groups[0], wg) {
						level4Groups = level4Groups[1:]
					}
				}
			}

			// 每次只取分组中队头的那一组进行合并,其他的需要手动解除避免锁定
			e.CompactionPlan.Release(level1Groups)
			e.CompactionPlan.Release(level2Groups)
			e.CompactionPlan.Release(level3Groups)
			e.CompactionPlan.Release(level4Groups)
		}
	}
}

这时出现了 4 个Level,分别代表不同优先级的四组 TSM file,compactor 通过 fileStore 来获取 TSM 文件的详情,写入的快照默认先放在 Level 1,文件名会以 [generation]-[序列号seq] 的方式命名,以 seq 作为分 level 的依据,generation 会在Cache哈希环进行分片时即对 series 进行分组的时候进行自增,相邻 generation 的文件合并为一个并选择较大的 generation 作为新文件的 generation,seq 只有在超过最大的文件大小最大能索引的数据块大小时才能自增,seq 的自增相当于将文件写入下一个 level,seq 的level在1~4的等级越高,代表着调度器会采取不同的措施进行合并以达到更高的压缩比和生成更优化的索引。

// 根据 tsm 文件的 seq 划分 level
func (t *tsmGeneration) level() int {
	_, seq, _ := t.parseFileName(t.files[0].Path)
	if seq < 4 {
		return seq
	}
	// seq 如果超过4则一律当作L4
	return 4
}

L1 和 L2 的文件属于高优先读的文件,意味着大量的查询大概率会读取这些文件,因此在合并的时候 Compactor 会尽量避免使用解压缩或者重组数据块等大量占用 CPU 的操作。合并图示如下:

Shard

数据淘汰策略的存在使得 TSM 文件中的数据不可能无休止地增长,必须要有一个快速删除的机制,通过数据淘汰策略指定的时间段,数据保留的这段连续时间划分出一个个 shard,例如数据淘汰策略保留一周的数据,那么每一个 shard 就是 1 天的数据并按序排列,如果只有一个 RP 策略,则直接删除整个 shard 即可,如果存在多个 RP 策略,则将 shard 挂到匹配的 RP 中即可

彩蛋:定长哈希环

如果不考虑这个结构是否为环,只考虑哈希,第一个联想到的结构就是内置的 map,如果 key 较多导致负载因子超过限定值,则 map 会进行自动扩容,预分配新桶,并将旧桶中的数据逐步迁移到新桶中。然而自定义哈希结构的好处在于:

  • 能够定义自己的哈希函数
  • 更灵活地控制空间的使用

能够定义自己的哈希函数意味着我们能够让 go 中不支持 == 或者 != 操作的类型也能够支持映射结构,但在 influxdb 的源码中自定义哈希结构实现 cache 的根本原因我认为是以便对所有 entry 进行均分,为了将计算量进行均摊,索性就在外面再包一层自定义的映射结构,且同时保证了能够通过 series key 快速找到某一 entry。回到具体的实现中,首先要解决的是如何使用哈希函数将 []byte 的键变成数组的下标索引,在源码的实现中主要使用了64位的 xxhash 算法快速对 []byte 计算出 8 字节的摘要值,接着对摘要值取模确定在数组中的具体槽位,在槽位中再转成 string 为 key 用内置的 map 映射到 entry,具体实现如下:

const partitions = 16

type ring struct {
	keysHint int64 // 对 key 进行计数
	// 环的底层数组
	partitions []*partition
}

type partition struct {
	mu    sync.RWMutex
	store map[string]*entry
}

// 构造哈希环,n必须是2的指数次幂,最大不超过256
func newring(n int) (*ring, error) {
	if n <= 0 || n > partitions {
		return nil, fmt.Errorf("invalid number of paritions: %d", n)
	}

	r := ring{
		partitions: make([]*partition, n),
	}

	for i := 0; i < len(r.partitions); i++ {
		r.partitions[i] = &partition{
			store: make(map[string]*entry),
		}
	}
	return &r, nil
}

// cache 的快速分片的具体实现
func (r *ring) split(n int) []storer {
	var keys int
	storers := make([]storer, n)
	for i := 0; i < n; i++ {
		storers[i], _ = newring(len(r.partitions))
	}

	for i, p := range r.partitions {
		r := storers[i%n].(*ring)
		r.partitions[i] = p
		keys += len(p.store)
	}
	return storers
}

// 快速找到 key 所在的分组
func (r *ring) getPartition(key []byte) *partition {
	return r.partitions[int(xxhash.Sum64(key)%uint64(len(r.partitions)))]
}

// 同一个 key 先找到分组,再找分组中的 entry
func (r *ring) entry(key []byte) *entry {
	return r.getPartition(key).entry(key)
}

// 正常的映射
func (p *partition) entry(key []byte) *entry {
	p.mu.RLock()
	e := p.store[string(key)]
	p.mu.RUnlock()
	return e
}

简单图解一下这个映射关系,对后面理解查询过程有一定的帮助,图示如下: