深入浅出 InfluxDB — 从索引到数据块

深入浅出 InfluxDB — 从索引到数据块

前言

上一篇文章主要介绍了存储引擎的主要组件以及它们之间的关系,本文在读者理解了存储引擎的大致工作原理的基础上,继续从基本概念开始着手介绍 influxdb 是如何对硬盘中的数据进行索引的,以及查询语句是如何通过索引快速找到对应的数据并且返回的。

从 B+ 树到 LSM Tree

构建游标

用户使用 influxQL 可以对 influxDB 进行查询操作,influxQL 是一种类 sql 的查询语言,能够让习惯了使用 sql 查询数据的开发人员也能快速使用 influxDB 进行开发。SELECT 语句会经过词法分析,语法分析并最终提取出 measurement field tagSet 和时序条件等选项。根据存储引擎的官方设计文档的描述,查询语句在执行前会事先构造一系列的游标 (Cursor) ,根据查询语句中涉及到的每个 field 都创建一个游标,不同域的数据在上层都会被抽象成 Value 接口类型进行表达来屏蔽不同类型的数据的差异,通过条件表达式中的条件对个别游标对应的 field 进行有条件的迭代,通过交并集合运算得出查询结果的最小集,最后再将多个游标上的数据按索引进行合并得到我们想要的结果。简单来说,与关系型数据库不同的是,时序数据库更像一种 KV 存储,最终表面看起来是行的视图实际上是由多列的数据合并而成。

数据列是通过 Series key 和时间戳来进行索引的,FROM 可以帮助我们找到对应的measurment,WHERE 中时间相关的条件表达式可以帮助我们找到对应的 retention policy 和在时间范围内的 shard,从 CacheFileStore 两个数据源中拉取数据,FileStore 接口类型为快速查找索引文件以及TSM文件提供便利并屏蔽掉底层查找的细节。每一个游标需要迭代的数据一部分来自于 Cache 映射查找,另一部分则是来自 FileStore,但在构造游标这个阶段,只有 Cache 中的 Value 集合被取出,而 TSM file中的 Value 还没有进行读取,只是通过 mmap 索引文件找到数据点所在的一系列的 Block 号。

// 使用自动生成技术来解决不同类型游标的构造,以 int 类型为例
func (e *Engine) buildIntegerCursor(ctx context.Context, measurement, seriesKey, field string, opt query.IteratorOptions) integerCursor {
	key := SeriesFieldKeyBytes(seriesKey, field)
	// 直接通过 series key 拿到 []Value
	cacheValues := e.Cache.Values(key)
	// 从索引中取出含有数据点的 block 号
	keyCursor := e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending)
	return newIntegerCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
}

// 由 e.KeyCursor(..) 调用
func newKeyCursor(ctx context.Context, fs *FileStore, key []byte, t int64, ascending bool) *KeyCursor {
	c := &KeyCursor{
		key:       key,
		seeks:     fs.locations(key, t, ascending),
		ctx:       ctx,
		col:       metrics.GroupFromContext(ctx),
		ascending: ascending,
	}

	if ascending {
		sort.Sort(ascLocations(c.seeks))
	} else {
		sort.Sort(descLocations(c.seeks))
	}

	// 对目标 TSM 文件标记引用
	for _, f := range c.seeks {
		f.r.Ref()
	}

	c.seek(t)
	return c
}

// 返回符合含有点的 block 号
func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
	var cache []IndexEntry
	locations := make([]*location, 0, len(f.files))
	for _, fd := range f.files {
		minTime, maxTime := fd.TimeRange()

		// 跳过不符合时序条件的 tsm 文件
		if ascending && maxTime < t {
			continue
		} else if !ascending && minTime > t {
			continue
		}
		tombstones := fd.TombstoneRange(key)

		// 寻找文件中可能有目标 field 点的数据块
		// 涉及对 mmap 索引文件进行二分查找
		// 返回含有 key 的数据块在文件中的偏移和size
		entries := fd.ReadEntries(key, &cache)

	LOOP:
		for i := 0; i < len(entries); i++ {
			ie := entries[i]

			// 跳过已经删除的块
			for _, t := range tombstones {
				if t.Min <= ie.MinTime && t.Max >= ie.MaxTime {
					continue LOOP
				}
			}

			// 跳过不符合条件的块
			if ascending && ie.MaxTime < t {
				continue
			} else if !ascending && ie.MinTime > t {
				continue
			}

			location := &location{
				r:     fd,
				entry: ie,
			}

			if ascending {
				// 升序游标则将前面的时间设为已读
				location.readMin = math.MinInt64
				location.readMax = t - 1
			} else {
				// 降序游标同理
				location.readMin = t + 1
				location.readMax = math.MaxInt64
			}
			// 返回文件以及目标点集的块
			locations = append(locations, location)
		}
	}
	return locations
}

游标本身也是一种接口类型,接口方法清晰简单,具有极强的通用性。由基础的游标接口类型可以非常简单地扩展成各种各样的游标以满足查询的需求,同时提供了多种不同的迭代器支持对数据进行各种聚合,变换,有序操作。

type cursor interface {
	close() error
	next() (t int64, v interface{})
}

// 缓冲游标,专用于对某些域应用某个 sql 函数如 min、max
type bufCursor struct {
	cur cursor
	buf struct {
		key    int64
		value  interface{}
		filled bool
	}
	ascending bool
}

func newBufCursor(cur cursor, ascending bool) *bufCursor {
	return &bufCursor{cur: cur, ascending: ascending}
}

// 对某个int类型的域的值进行迭代的普通游标,其他类型同样使用自动生成批量生成
type integerCursor interface {
	cursor
	nextInteger() (t int64, v int64)
}

func newIntegerCursor(seek int64, ascending bool, cacheValues Values, tsmKeyCursor *KeyCursor) integerCursor {
	if ascending {
		return newIntegerAscendingCursor(seek, cacheValues, tsmKeyCursor)
	}
	return newIntegerDescendingCursor(seek, cacheValues, tsmKeyCursor)
}

// 具体的实现integerCursor接口的结构体
type integerAscendingCursor struct {
	cache struct {
		values Values
		pos    int
	}

	tsm struct {
		values    []IntegerValue
		pos       int
		keyCursor *KeyCursor
	}
}

func newIntegerAscendingCursor(seek int64, cacheValues Values, tsmKeyCursor *KeyCursor) *integerAscendingCursor {
	c := &integerAscendingCursor{}

	c.cache.values = cacheValues
	c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
		return c.cache.values[i].UnixNano() >= seek
	})

	c.tsm.keyCursor = tsmKeyCursor
	// 读取 block
	c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values)
	c.tsm.pos = sort.Search(len(c.tsm.values), func(i int) bool {
		return c.tsm.values[i].UnixNano() >= seek
	})

	return c
}

func (c *integerAscendingCursor) nextTSM() {
	c.tsm.pos++
	if c.tsm.pos >= len(c.tsm.values) {
		c.tsm.keyCursor.Next()
		c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerBlock(&c.tsm.values)
		if len(c.tsm.values) == 0 {
			return
		}
		c.tsm.pos = 0
	}
}

可以看到读取 block 的重要方法 ReadIntegerBlock ,这时才真正发生迭代读取磁盘上 block 的操作,再仔细看看这个方法

func (c *KeyCursor) ReadIntegerBlock(buf *[]IntegerValue) ([]IntegerValue, error) {

	first := c.current[0]
	*buf = (*buf)[:0]
	var values IntegerValues
	values, err := first.r.ReadIntegerBlockAt(&first.entry, buf)

	// ...
}

ReadIntegerBlockAt 通过传入的偏移量在文件中寻找相应的 block,这里的文件实际上是访问 mmap 内存映射的文件,这种方式能够让我们以更好的性能像访问内存中一个 byte 切片那样访问一个文件,写的时候也只需要写入这段切片,等操作系统自动刷回或者手动刷回即可。对 mmap 映射内存的访问交给了 mmapAccessor 来完成,简单看一下它的结构和使用方法

type mmapAccessor struct {
	accessCount uint64 // 访问计数
	freeCount   uint64 // 释放前检查引用计数
	mmapWillNeed bool

	mu sync.RWMutex
	b  []byte	// 映射到内存的 block
	f  *os.File // 映射到内存的文件

	index *indirectIndex
}

// 存储引擎会为每一个 tsm 文件都创建 mmap 内存映射
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
	t := &TSMReader{}
	// 不定长函数参数自定义配置,插件式的配置设计可以减少导出API接口的改动
	for _, option := range options {
		option(t)
	}

	stat, err := f.Stat()
	if err != nil {
		return nil, err
	}
	t.size = stat.Size()
	t.lastModified = stat.ModTime().UnixNano()
	t.accessor = &mmapAccessor{
		f:            f,
		mmapWillNeed: t.madviseWillNeed,
	}

	index, err := t.accessor.init()
	if err != nil {
		return nil, err
	}

	t.index = index
	t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey)

	if err := t.applyTombstones(); err != nil {
		return nil, err
	}

	return t, nil
}

// 从 mmap 内存中读取块的通用方法
// first.r.ReadIntegerBlockAt 使用的是自动生成的读取 int 类型块的方法,减少类型转换的内存开销
func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
	m.incAccess()

	m.mu.RLock()
	defer m.mu.RUnlock()

	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
		return nil, ErrTSMClosed
	}
	// 根据 entry 中的偏移量读取 TSM 文件的块
	var err error
	values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
	if err != nil {
		return nil, err
	}

	return values, nil
}

至此每个游标拿到了各自列的值,那么这些值是如何组合在一起的呢?接着往下看

迭代器合并

在最开始创建 tagSet 的迭代器的时候,query.Iterator 切片会被返回,多个迭代器会合并成一个迭代器返回。合并的细节如下:

func (a Iterators) Merge(opt IteratorOptions) (Iterator, error) {
	// 是否为内置函数表达式
	call, ok := opt.Expr.(*influxql.Call)
	// 根据 opt 选项确定是否需要有序输出
	if !ok && opt.MergeSorted() {
		// 按name或者tag有序输出需要创建有序合并迭代器
		itr := NewSortedMergeIterator(a, opt)
		if itr != nil && opt.InterruptCh != nil {
			itr = NewInterruptIterator(itr, opt.InterruptCh)
		}
		return itr, nil
	}

	// 不需要有序输出,
	itr := NewMergeIterator(a, opt)
	if itr == nil {
		return nil, nil
	}

	// ...

	if !ok {
		return itr, nil
	}
}

本质上对 SELECT 语句的解析过程可以概括为编译原理中一个不断进行递归下降的过程,自顶向下分别为 shard- engine -tagSet - series key ,迭代器自底向上生成通过上述方式进行合并然后返回给它的上层,query.Iterator 是连接这一切的通用迭代器接口。

func buildCursor(ctx context.Context, stmt *influxql.SelectStatement, ic IteratorCreator, opt IteratorOptions) (Cursor, error) {
	// ...
	// Produce an iterator for every single call and create an iterator scanner
	// associated with it.
	scanners := make([]IteratorScanner, 0, len(valueMapper.calls))
	for call := range valueMapper.calls {
		driver := valueMapper.table[call]
		if driver.Type == influxql.Unknown {
			// The primary driver of this call is of unknown type, so skip this.
			continue
		}

		itr, err := buildFieldIterator(ctx, call, ic, stmt.Sources, opt, selector, stmt.Target != nil)
		if err != nil {
			for _, s := range scanners {
				s.Close()
			}
			return nil, err
		}

		keys := make([]influxql.VarRef, 0, len(auxKeys)+1)
		keys = append(keys, driver)
		keys = append(keys, auxKeys...)

		scanner := NewIteratorScanner(itr, keys, opt.FillValue)
		scanners = append(scanners, scanner)
	}

	if len(scanners) == 0 {
		return newNullCursor(fields), nil
	} else if len(scanners) == 1 {
		return newScannerCursor(scanners[0], fields, opt), nil
	}
	return newMultiScannerCursor(scanners, fields, opt), nil
}