深入浅出 InfluxDB — 写操作
前言
本文为深入浅出 InfluxDB 系列文章的第二篇,已理解 InfluxDB 基本术语的读者可以放心食用,源码讲解基于开源版 InfluxDB v1.8
交互模型分析
典型的 CS 模式,client/v2 包作为客户端使用 http 协议与数据库的 HTTP handler 进行交互实现数据的读写操作
文档上的请求示例如下
# 写数据
curl -XPOST "http://localhost:8086/write?db=mydb" \
-d 'cpu,host=server01,region=uswest load=42 1434055562000000000'
# 查询数据
curl -G "http://localhost:8086/query?pretty=true" --data-urlencode "db=mydb" \
--data-urlencode "q=SELECT * FROM cpu WHERE host='server01' AND time < now() - 1d"易知写操作请求的路径是 /write ,读操作请求路径为 /query
写操作
所有的 http handler 处理逻辑可以在 services/httpd 包下的 handler.go 中找到
func (h *Handler) serveWrite(database string, retentionPolicy string, w http.ResponseWriter, r *http.Request, user meta.User) {
// ...
body := r.Body
var bs []byte
// ..
// bytes.Buffer 读取客户端发送的格式化好的数据行描述
buf := bytes.NewBuffer(bs)
_, err := buf.ReadFrom(body)
// 解析格式化数据行反序列化成数据点
points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
// 写入数据点
if err := h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points); influxdb.IsClientError(err) {
h.httpError(w, err.Error(), http.StatusBadRequest)
return
}
// 错误处理并返回写请求的结果
// ...
}反序列化得到的数据点切片的基本类型为 Point 接口,其中定义了很多 getter 和 setter 接口方法,可以很方便地获取或者迭代数据点中的 key tag field timestamp checksum 等
PointsWriter 是 Handler 内部的匿名接口,由API服务端初始化的时候注入
type Handler struct {
mux *pat.PatternServeMux
// ...
PointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
}
}
// Server端初始化HTTP服务
func (s *Server) appendHTTPDService(c httpd.Config) {
// ...
srv := httpd.NewService(c)
srv.Handler.MetaClient = s.MetaClient
authorizer := meta.NewQueryAuthorizer(s.MetaClient)
// ...
srv.Handler.QueryExecutor = s.QueryExecutor
srv.Handler.PointsWriter = s.PointsWriter // *coordinator.PointsWriter
ss := storage.NewStore(s.TSDBStore, s.MetaClient)
srv.Handler.Store = ss
if s.config.HTTPD.FluxEnabled {
srv.Handler.Controller = control.NewController(s.MetaClient, reads.NewReader(ss), authorizer, c.AuthEnabled, s.Logger)
}
s.Services = append(s.Services, srv)
}找到真正实现这个接口的 PointsWriter 结构体,实际调用的是 WritePointsPrivileged 这个更通用的方法
func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
// 数据的组织层级自顶向下是 database - retention policy - shard - series
// 如果没有指定写入哪个保留策略目录下
if retentionPolicy == "" {
// 检查最顶层的数据库名是否存在
db := w.MetaClient.Database(database)
if db == nil {
return influxdb.ErrDatabaseNotFound(database)
}
// 不指定则写入系统默认创建的保留策略目录
retentionPolicy = db.DefaultRetentionPolicy
}
// 将数据点进行时间戳检查,接着哈希分组找到它们应该插入到的 shard
shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})
if err != nil {
return err
}
// 把分好组的数据点按同一shard的点开启一个协程批量写入,ch作为接收写入错误的回调通道
ch := make(chan error, len(shardMappings.Points))
for shardID, points := range shardMappings.Points {
go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
// 写入到 shard 中
err := w.writeToShard(shard, database, retentionPolicy, points)
// shard 可能已经被删除或等待被删除
if err == tsdb.ErrShardDeletion {
err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}
}
ch <- err
}(shardMappings.Shards[shardID], database, retentionPolicy, points)
}
// 尝试先订阅写请求,可用于PointsWriter的优雅关闭,防止丢数据
var ok, dropped int64
pts := &WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points}
// 获取读锁,如果 PointsWriter 正在 close,能保证 close 先进行
// 所有 ch 如果都是 nil,则 select 运行 default 分支
w.mu.RLock()
// 默认为1个订阅者
for _, ch := range w.subPoints {
// 每个订阅通道容量默认为1
select {
case ch <- pts:
ok++
default:
dropped++
}
}
w.mu.RUnlock()
// 错误处理,有的数据点在分配 shard 过程中被丢弃
if err == nil && len(shardMappings.Dropped) > 0 {
err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)}
}
timeout := time.NewTimer(w.WriteTimeout)
defer timeout.Stop()
// 写入点要进行超时控制,在一定时间范围内接收每一个shard的写入结果
// 有一组 shard 写入错误就将错误立即返回
for range shardMappings.Points {
select {
case <-w.closing:
return ErrWriteFailed
case <-timeout.C:
atomic.AddInt64(&w.stats.WriteTimeout, 1)
return ErrTimeout
case err := <-ch:
if err != nil {
return err
}
}
}
return err
}继续跟踪 WriteToShard接口,具体的实现在 tsdb 包下
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
s.mu.RLock()
select {
case <-s.closing:
s.mu.RUnlock()
return ErrStoreClosed
default:
}
sh := s.shards[shardID]
if sh == nil {
s.mu.RUnlock()
return ErrShardNotFound
}
// 每个 shard 会自带一个写屏障,以便对删除和写入进行同步,保证在删除前写入操作能够全部结束
epoch := s.epochs[shardID]
s.mu.RUnlock()
// 得到写屏障中的拦截器
guards, gen := epoch.StartWrite()
defer epoch.EndWrite(gen)
// 如果有删除操作对此次写入有影响,则在删除完成前写入操作必须等待
for _, guard := range guards {
if guard.Matches(points) {
guard.Wait()
}
}
return sh.WritePoints(points)
}
func (s *Shard) WritePoints(points []models.Point) error {
s.mu.RLock()
defer s.mu.RUnlock()
// 确保当前 shard 是可读可写的
engine, err := s.engineNoLock()
if err != nil {
return err
}
var writeError error
// 对写入数据进行清洗,去除非法字符
// 检查 fields 是否一致
// 如果有不一致的返回 diff 后新增的 fields
points, fieldsToCreate, err := s.validateSeriesAndFields(points)
// 错误处理...
// 新的 fields 需要在 measurement 中创建,并更新 index 索引
if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
return err
}
// 存储引擎写入数据点
if err := engine.WritePoints(points); err != nil {
return fmt.Errorf("engine: %s", err)
}
return writeError
}目前为止还没有到具体的数据落盘,依然是在内存中,接着往下挖,可以看到数据被写入了两次,一次写入 Cache,另一次写入 WAL,图示和具体代码如下:

values := make(map[string][]Value, len(points))
// 映射 series的UUID -> 同一field的值集合
// []models.Point 到 values 的映射过程...
// 先尝试写入cache(快速写入内存,未落盘)
if err := e.Cache.WriteMulti(values); err != nil {
return err
}
// 写入 write-ahead log(涉及IO操作较慢,数据落盘)
if e.WALEnabled {
if _, err := e.WAL.WriteMulti(values); err != nil {
return err
}
}从前面 Engine 的 WritePoints 我们可以详细地看到写入同一个 shard 的点集是如何被一步步分成一个个 series 的,但在理解映射过程之前我们先要搞清楚 series 在数据库中的组织方式。理解 series 最直观的方式就是官方文档的图解:

根据图片可以直观看到标识 series 的key:Measurement Tag set 还有单个 Field key,这三者的组合作为 values 字典的 key,又因为 influxdb 中的每一个点通过 series 和时间戳进行唯一标识,因此每一个 Field key 的值还需要拷贝所在点的时间戳,单个 value 加上 timestamp 的组合即为 Value 接口所表达的数据。
中间映射过程的代码细节:
var (
keyBuf []byte
baseLen int
)
// 对于每一个点
for _, p := range points {
keyBuf = append(keyBuf[:0], p.Key()...) // measurement + tag set
keyBuf = append(keyBuf, keyFieldSeparator...) // measurement + tag set + 魔术分隔符
baseLen = len(keyBuf)
iter := p.FieldIterator()
t := p.Time().UnixNano()
for iter.Next() {
keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
// series UUID = measurement + tag set + 魔术分隔符 + 当前迭代到的 Field key
// 压缩前缀树用 series UUID 快速查找 Field 是否存在
// 不存在则将当前 field 的 type 写入前缀树
// ...
// 根据当前 field 类型构造 Value,包装成接口屏蔽底层类型
var v Value
switch iter.Type() {
case models.Float:
fv, err := iter.FloatValue()
v = NewFloatValue(t, fv) // 其他分支同理
case models.Integer:
case models.Unsigned:
case models.String:
case models.Boolean:
}
// 添加 value 至对应的 series
values[string(keyBuf)] = append(values[string(keyBuf)], v)
}
}cache 和 WAL 写入的都是转换好的 series,需要注意的是写入 cache 的 series 不排序,不压缩,直接将 values 字典以 KV 的形式写入内存常驻存储。而落盘写入 WAL 文件的 series 需要进行排序和压缩,在数据刷盘的同时又保证了数据能够以最快的速度在内存被快速读取。
下面具体来看看 WAL 在数据写入文件之前都做了哪些预处理:
func (l *WAL) WriteMulti(values map[string][]Value) (int, error) {
entry := &WriteWALEntry{
Values: values,
}
id, err := l.writeToLog(entry)
if err != nil {
return -1, err
}
return id, nil
}
func (l *WAL) writeToLog(entry WALEntry) (int, error) {
// bytesPool 用于复用 byte 切片减少内存分配,这个结构的设计后面会有详解
bytes := bytesPool.Get(entry.MarshalSize()) // 预计算需要的 size,尝试从池中获取
// 数据真正序列化成 []byte
b, err := entry.Encode(bytes)
if err != nil {
bytesPool.Put(bytes)
return -1, err
}
// 再申请另外一块 []byte 用于存放 snappy 压缩后的数据
encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))
compressed := snappy.Encode(encBuf, b)
bytesPool.Put(bytes) // []byte 回收内存池中
syncErr := make(chan error)
segID, err := func() (int, error) {
l.mu.Lock()
defer l.mu.Unlock()
// 确保文件处于打开状态
select {
case <-l.closing:
return -1, ErrWALClosed
default:
}
// 检查是否需要创建新的 segment file 用于存放数据
if err := l.rollSegment(); err != nil {
return -1, fmt.Errorf("error rolling WAL segment: %v", err)
}
// 为压缩数据附加上固定长度的操作类型头,通过 write 系统调用将压缩数据刷入磁盘文件
if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
return -1, fmt.Errorf("error writing WAL entry: %v", err)
}
select {
case l.syncWaiters <- syncErr:
default:
return -1, fmt.Errorf("error syncing wal")
}
l.scheduleSync()
l.lastWriteTime = time.Now().UTC()
return l.currentSegmentID, nil
}()
bytesPool.Put(encBuf)
if err != nil {
return segID, err
}
// 等待 fsync 完成返回
return segID, <-syncErr
}至此,一批数据最终被成功刷入磁盘,写操作的具体细节阐述完毕。
彩蛋:全局byte切片内存池
在大部分情况下,频繁分配大段连续的内存都会对 GC 造成较大的压力。InfluxDB 使用 bytespool 这个全局变量来管理 byte 切片的重用,具体的类型为 LimitedBytes,一般来说全局内存池的设计应该满足以下几点:
- 协程并发访问安全
- 手动管理,内部的元素不能被 GC 释放
- 获取元素遵循懒分配的原则,优先复用已有的元素,这样才能真正降低内存分配的压力
- 池中对象数量的最大值必须要有限制
type LimitedBytes struct {
maxSize int
pool chan []byte // 使用带缓冲的通道解决并发访问和复用元素的存储问题
}
// 构造函数返回的对象赋值给全局变量保证元素不会被GC释放
// 传入容量参数指定最大存储元素个数
func NewLimitedBytes(capacity int, maxSize int) *LimitedBytes {
return &LimitedBytes{
pool: make(chan []byte, capacity),
maxSize: maxSize,
}
}
func (p *LimitedBytes) Get(sz int) []byte {
var c []byte
select {
case c = <-p.pool: // 优先复用通道内已有的元素
default:
return make([]byte, sz) // 通道内没有复用对象,只能重新分配
}
// 复用切片容量小于需要的切片容量,只能重新分配
if cap(c) < sz {
return make([]byte, sz)
}
// 大于或等于需要的容量,进行裁剪返回
return c[:sz]
}
func (p *LimitedBytes) Put(c []byte) {
// 超过复用长度,切片交给 GC 管理
if cap(c) >= p.maxSize {
return
}
// 尝试放入通道,如果通道已满,则 c 切片由 GC 释放
select {
case p.pool <- c:
default:
}
}有意思的是超过复用长度的切片并不会得到复用,个人猜测这样设计的理由是为了防止大切片的存在,即便在构造的时候就已经对池中切片的数量进行了限制,但每个写操作向内存池申请切片指定的 size 是不可预知的,因此可能会在瞬时写入大量数据的时候导致内存池的占用瞬间飙升,如果不对这些大切片加以限制,就会导致内存的浪费。把容量大于最大值的大切片交给 GC 进行回收能够更好更充分地利用内存,而且便于数据库对自身内存用量进行统计分析。