Deep Dive into InfluxDB - Storage Engine

Introduction

The previous article covered how data points are written from clients into InfluxDB. There we touched on several main structures of the InfluxDB storage engine at the source-code level for the first time, and traced a write from the database layer all the way down to the WAL on disk. Building on that, this article describes each component inside the InfluxDB storage engine and how it works.

Understanding the Overall Architecture

The following diagram shows the main components of the storage engine. Note that WAL and TSM are both essentially files: the WAL is an append-only data file optimized for fast writes, while a TSM file is a read-only data file optimized for fast reads. The components above them exist only in memory and are built at runtime.

Internally, the WAL consists of a group of numbered segment files, and each write appends compressed series data to the current segment. When the segment being written exceeds 10MB, it is closed and a new segment file is created to receive incoming data, as shown below:

Data Consistency

Some readers may wonder at this point: why are the files being written not the same as the files being read? For a database, doesn't that break data consistency? The official design document answers the question directly: strong consistency is sacrificed in exchange for higher read and write performance. The direct consequence of this design is that a read may not return the very latest data, but this rarely affects how time-series databases are used. When querying such data we usually care about the trend of a large number of points over time, so a few points missing from any single query do not change our judgment of the overall trend. Moreover, query intervals are typically shorter than the sampling (write) intervals, so this “natural delay” already exists in common time-series workloads. With proper control of the write load, data consistency can largely be guaranteed.

The advantages of this design are equally clear: no gap locks on files are needed to resolve read-write conflicts on the same file, the code is simpler, and the files used for writing and reading do not interfere with each other at the IO level. The engine still has to maintain a consistent view of the data, however, which brings us to the first component: the Cache.

Cache

The Cache is a full in-memory copy of the WAL. Its internal data structure is a simple hash ring built on a fixed-size slice. When there are only write and delete requests, the data is written to both the Cache and the WAL, so the Cache occupies more and more memory. The storage engine therefore limits the size of the Cache with two parameters: cache-snapshot-memory-size and cache-max-memory-size. The former is the threshold at which the engine starts converting the cache into TSM files; the latter is the maximum amount of memory the cache may use. Once that maximum is reached, the database starts rejecting writes. Specifically, a goroutine is started to check the cache size periodically:

func (e *Engine) compactCache() {
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		e.mu.RLock()
		quit := e.snapDone
		e.mu.RUnlock()

		select {
		case <-quit:
			return
		// Check once per second
		case <-t.C:
			// ...
			// Check if snapshot conditions are met
			if e.ShouldCompactCache(time.Now()) {
				start := time.Now()
				err := e.WriteSnapshot() // Write cache snapshot to TSM files
				// Error handling
			}
		}
	}
}

func (e *Engine) WriteSnapshot() (err error) {
	started := time.Now()

	closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) {
		e.mu.Lock()
		defer e.mu.Unlock()

		if e.WALEnabled {
			// Close current WAL segment file and open new file
			if err = e.WAL.CloseSegment(); err != nil {
				return
			}
			// Return closed segments
			segments, err = e.WAL.ClosedSegments()
			if err != nil {
				return
			}
		}
		// Take snapshot of cache, essentially creating new ring and swapping pointers with existing old hash ring
		snapshot, err = e.Cache.Snapshot()
		if err != nil {
			return
		}

		return
	}()

	if err != nil {
		return err
	}
	dedup := time.Now()
	// Deduplicate and stable sort the snapshot before writing
	snapshot.Deduplicate()

	return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
}

func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {
	// ...
	// Create multiple TSM files, with Compactor writing snapshot cache shards concurrently
	newFiles, err := e.Compactor.WriteSnapshot(snapshot)
	if err != nil {
		return err
	}
	// Update FileStore
	if err := e.FileStore.Replace(nil, newFiles); err != nil {
		// If an error occurs, delete the new files (best effort)
		for _, file := range newFiles {
			if rmErr := os.Remove(file); rmErr != nil {
				log.Info("Unable to remove file", zap.String("path", file), zap.Error(rmErr))
			}
		}
		return err
	}
	// Snapshot has been written to disk, clean cache and WAL to release space
	e.Cache.ClearSnapshot(true)
	e.WAL.Remove(closedFiles)
	return nil
}

Compactor

Over time, as more snapshots are written and each one produces at least one TSM file, the files in the data folder become increasingly fragmented, which can cause a sharp drop in read performance. If every query has to search many files at once, a single process holds too many file descriptors and overloads IO. So besides writing snapshots, the Compactor is also responsible for merging the TSM files produced by different snapshots. A goroutine runs the merge periodically, with code as follows:

func (e *Engine) compact(wg *sync.WaitGroup) {
	t := time.NewTicker(time.Second)
	defer t.Stop()

	for {
		e.mu.RLock()
		quit := e.done
		e.mu.RUnlock()

		select {
		case <-quit:
			return

		case <-t.C:

			// Find adjacent generations layer by layer and group them
			level1Groups := e.CompactionPlan.PlanLevel(1)
			level2Groups := e.CompactionPlan.PlanLevel(2)
			level3Groups := e.CompactionPlan.PlanLevel(3)
			level4Groups := e.CompactionPlan.Plan(e.LastModified())
			atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))

			// No need for full merge, try query optimization
			if len(level4Groups) == 0 {
				level4Groups = e.CompactionPlan.PlanOptimize()
			}

			// Set queue length for each level used for merging
			e.scheduler.setDepth(1, len(level1Groups))
			e.scheduler.setDepth(2, len(level2Groups))
			e.scheduler.setDepth(3, len(level3Groups))
			e.scheduler.setDepth(4, len(level4Groups))

			// Scheduler determines which level merge operations can be executed
			if level, runnable := e.scheduler.next(); runnable {
				switch level {
				case 1:
					if e.compactHiPriorityLevel(level1Groups[0], 1, false, wg) {
						level1Groups = level1Groups[1:]
					}
				case 2:
					if e.compactHiPriorityLevel(level2Groups[0], 2, false, wg) {
						level2Groups = level2Groups[1:]
					}
				case 3:
					if e.compactLoPriorityLevel(level3Groups[0], 3, true, wg) {
						level3Groups = level3Groups[1:]
					}
				case 4:
					if e.compactFull(level4Groups[0], wg) {
						level4Groups = level4Groups[1:]
					}
				}
			}

			// Only take the head group from each grouping for merging, others need manual release to avoid locking
			e.CompactionPlan.Release(level1Groups)
			e.CompactionPlan.Release(level2Groups)
			e.CompactionPlan.Release(level3Groups)
			e.CompactionPlan.Release(level4Groups)
		}
	}
}

At this point four levels appear, representing four groups of TSM files with different priorities. The Compactor obtains TSM file details through the FileStore. Newly written snapshots are placed in Level 1 by default. Files are named [generation]-[seq], where seq is a sequence number, and seq is the basis for assigning a level. The generation increments when the Cache hash ring is sharded, i.e., when series are grouped. Files of adjacent generations are merged into one, and the larger generation is used as the new file's generation. Seq only increments when the maximum file size or the maximum number of indexable data blocks is exceeded, and incrementing seq is equivalent to writing the file to the next level. At each level (1 to 4) the scheduler applies a different merge strategy, with higher levels aiming for higher compression ratios and more optimized indexes.

// Divide level according to tsm file seq
func (t *tsmGeneration) level() int {
	_, seq, _ := t.parseFileName(t.files[0].Path)
	if seq < 4 {
		return seq
	}
	// If seq exceeds 4, treat as L4
	return 4
}

L1 and L2 files are high-priority files for reads, meaning most queries are likely to read them. When merging them, the Compactor therefore tries to avoid CPU-heavy operations such as decompressing or reorganizing data blocks. The merging process is shown in the following diagram:

Shard

Because of data retention policies, the data in TSM files cannot grow without bound, so a fast deletion mechanism is required. The continuous time range covered by a retention policy (RP) is divided into individual shards. For example, if the retention policy keeps one week of data, each shard contains 1 day of data, and the shards are arranged in time order. If there is only one RP, a shard can be deleted directly as a whole. If multiple RPs exist, the shard can be attached to the matching RPs.

Bonus: Fixed-Length Hash Ring

If we set aside whether this structure is a ring and consider only hashing, the first structure that comes to mind is Go's built-in map. When many keys push the load factor past its limit, the map grows automatically: it allocates new buckets and gradually migrates data from the old buckets to the new ones. A custom hash structure, however, has the following benefits:

  • Ability to define your own hash function
  • More flexible control over space usage

Being able to define your own hash function means that Go types which do not support the == and != operators can also be used as keys of a mapping structure. But I believe the fundamental reason the InfluxDB source code implements the cache with a custom hash structure is to distribute all entries evenly and spread out the computational load. So it simply wraps another layer of custom mapping around the built-in map, while still ensuring that a specific entry can be found quickly by its series key. Turning to the implementation, the first problem to solve is how to use a hash function to turn a []byte key into an array index. The source code uses the 64-bit xxhash algorithm to quickly compute an 8-byte digest of the []byte, then takes the digest modulo the number of partitions to determine the slot in the array. Within the slot, the key is converted to a string and used as the key of a built-in map that maps to entries. The implementation is as follows:

const partitions = 16

type ring struct {
	keysHint int64 // Count keys
	// Underlying array of the ring
	partitions []*partition
}

type partition struct {
	mu    sync.RWMutex
	store map[string]*entry
}

// Construct hash ring; n must be a power of 2 and, per the check below, at most partitions (16)
func newring(n int) (*ring, error) {
	if n <= 0 || n > partitions {
		return nil, fmt.Errorf("invalid number of partitions: %d", n)
	}

	r := ring{
		partitions: make([]*partition, n),
	}

	for i := 0; i < len(r.partitions); i++ {
		r.partitions[i] = &partition{
			store: make(map[string]*entry),
		}
	}
	return &r, nil
}

// Specific implementation of cache quick sharding
func (r *ring) split(n int) []storer {
	var keys int
	storers := make([]storer, n)
	for i := 0; i < n; i++ {
		storers[i], _ = newring(len(r.partitions))
	}

	for i, p := range r.partitions {
		r := storers[i%n].(*ring)
		r.partitions[i] = p
		keys += len(p.store)
	}
	return storers
}

// Quickly find the group where the key is located
func (r *ring) getPartition(key []byte) *partition {
	return r.partitions[int(xxhash.Sum64(key)%uint64(len(r.partitions)))]
}

// For the same key, first find the group, then find the entry in the group
func (r *ring) entry(key []byte) *entry {
	return r.getPartition(key).entry(key)
}

// Normal mapping
func (p *partition) entry(key []byte) *entry {
	p.mu.RLock()
	e := p.store[string(key)]
	p.mu.RUnlock()
	return e
}

The simple diagram below illustrates this mapping relationship, which will help in understanding the query process later: