Deep Dive into InfluxDB - Write Operations
Introduction
This article is the second in the Deep Dive InfluxDB series and assumes familiarity with basic InfluxDB terminology. The source code analysis is based on open-source InfluxDB v1.8.
Interaction Model Analysis
InfluxDB follows a typical client-server pattern: the client/v2 package acts as the client, talking to the database's HTTP handler over the HTTP protocol to perform read and write operations.
Example requests from the documentation:

# Write data
curl -XPOST "http://localhost:8086/write?db=mydb" \
  -d 'cpu,host=server01,region=uswest load=42 1434055562000000000'

# Query data
curl -G "http://localhost:8086/query?pretty=true" --data-urlencode "db=mydb" \
  --data-urlencode "q=SELECT * FROM cpu WHERE host='server01' AND time < now() - 1d"

As we can see, the write operation uses the request path /write, while the read operation uses /query.
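The body of the write request is InfluxDB line protocol: a measurement, a comma-separated tag set, a field set, and a nanosecond timestamp. As a sketch, the line above can be assembled in Go like this (the `lineProtocol` helper is mine, not part of InfluxDB; tags and fields are sorted by key here only to make the output deterministic):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// lineProtocol builds a single line-protocol entry:
// measurement,tag=value,... field=value,... timestamp
func lineProtocol(measurement string, tags, fields map[string]string, ts int64) string {
	tagKeys := make([]string, 0, len(tags))
	for k := range tags {
		tagKeys = append(tagKeys, k)
	}
	sort.Strings(tagKeys)

	var sb strings.Builder
	sb.WriteString(measurement)
	for _, k := range tagKeys {
		fmt.Fprintf(&sb, ",%s=%s", k, tags[k])
	}
	sb.WriteByte(' ')

	fieldKeys := make([]string, 0, len(fields))
	for k := range fields {
		fieldKeys = append(fieldKeys, k)
	}
	sort.Strings(fieldKeys)
	for i, k := range fieldKeys {
		if i > 0 {
			sb.WriteByte(',')
		}
		fmt.Fprintf(&sb, "%s=%s", k, fields[k])
	}
	fmt.Fprintf(&sb, " %d", ts)
	return sb.String()
}

func main() {
	// Reproduces the body of the curl example above
	fmt.Println(lineProtocol("cpu",
		map[string]string{"host": "server01", "region": "uswest"},
		map[string]string{"load": "42"},
		1434055562000000000))
}
```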
Write Operation
All HTTP handler processing logic can be found in handler.go under the services/httpd package.
func (h *Handler) serveWrite(database string, retentionPolicy string, w http.ResponseWriter, r *http.Request, user meta.User) {
    // ...
    body := r.Body
    var bs []byte
    // ...
    // bytes.Buffer reads the formatted data lines sent by the client
    buf := bytes.NewBuffer(bs)
    _, err := buf.ReadFrom(body)
    // Parse the formatted data lines and deserialize them into data points
    points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
    // Write the data points
    if err := h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points); influxdb.IsClientError(err) {
        h.httpError(w, err.Error(), http.StatusBadRequest)
        return
    }
    // Error handling and returning the write result
    // ...
}

The deserialized slice of data points has the interface type Point, which defines many getter and setter methods for conveniently reading or iterating over a point's key, tags, fields, timestamp, checksum, and so on.
PointsWriter is an anonymous interface field on Handler, injected during API server initialization.
type Handler struct {
    mux *pat.PatternServeMux
    // ...
    PointsWriter interface {
        WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
    }
}
// Server-side HTTP service initialization
func (s *Server) appendHTTPDService(c httpd.Config) {
    // ...
    srv := httpd.NewService(c)
    srv.Handler.MetaClient = s.MetaClient
    authorizer := meta.NewQueryAuthorizer(s.MetaClient)
    // ...
    srv.Handler.QueryExecutor = s.QueryExecutor
    srv.Handler.PointsWriter = s.PointsWriter // *coordinator.PointsWriter
    ss := storage.NewStore(s.TSDBStore, s.MetaClient)
    srv.Handler.Store = ss
    if s.config.HTTPD.FluxEnabled {
        srv.Handler.Controller = control.NewController(s.MetaClient, reads.NewReader(ss), authorizer, c.AuthEnabled, s.Logger)
    }
    s.Services = append(s.Services, srv)
}

The structure that actually implements this interface is coordinator.PointsWriter. Its WritePoints delegates to the more general method WritePointsPrivileged.
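This anonymous-interface injection pattern lets the handler declare only the narrow method set it needs, while the server wires in the concrete coordinator at startup. A self-contained sketch of the pattern (all names here are illustrative, not InfluxDB's):

```go
package main

import "fmt"

// Handler depends only on the interface it actually calls,
// not on the concrete coordinator type.
type Handler struct {
	PointsWriter interface {
		WritePoints(db string, points []string) error
	}
}

// coordWriter stands in for *coordinator.PointsWriter.
type coordWriter struct{ written int }

func (w *coordWriter) WritePoints(db string, points []string) error {
	w.written += len(points)
	return nil
}

func main() {
	h := &Handler{}
	w := &coordWriter{}
	h.PointsWriter = w // injection at server initialization
	_ = h.PointsWriter.WritePoints("mydb", []string{"cpu load=42"})
	fmt.Println(w.written)
}
```

Because the interface is declared at the point of use, the httpd package needs no import of the coordinator package, keeping the dependency direction one-way.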
func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
    // Data is organized top-down as: database - retention policy - shard - series
    // If no retention policy is specified for the write
    if retentionPolicy == "" {
        // Check that the top-level database exists
        db := w.MetaClient.Database(database)
        if db == nil {
            return influxdb.ErrDatabaseNotFound(database)
        }
        // If none is specified, write to the system-created default retention policy
        retentionPolicy = db.DefaultRetentionPolicy
    }
    // Validate point timestamps, then hash-group the points to find the shards they should be inserted into
    shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})
    if err != nil {
        return err
    }
    // Group points by shard and start one goroutine per shard to batch-write them.
    // ch is the callback channel that collects write errors.
    ch := make(chan error, len(shardMappings.Points))
    for shardID, points := range shardMappings.Points {
        go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
            // Write to the shard
            err := w.writeToShard(shard, database, retentionPolicy, points)
            // The shard may have been deleted or be pending deletion
            if err == tsdb.ErrShardDeletion {
                err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}
            }
            ch <- err
        }(shardMappings.Shards[shardID], database, retentionPolicy, points)
    }
    // Offer the write request to subscribers first; this helps PointsWriter shut down gracefully without losing data
    var ok, dropped int64
    pts := &WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points}
    // Take the read lock; if PointsWriter is closing, ensure the close happens first.
    // If a subscriber's channel cannot accept the batch, select falls through to the default branch.
    w.mu.RLock()
    for _, ch := range w.subPoints {
        // Each subscription channel is buffered
        select {
        case ch <- pts:
            ok++
        default:
            dropped++
        }
    }
    w.mu.RUnlock()
    // Error handling: some points were dropped during shard mapping
    if err == nil && len(shardMappings.Dropped) > 0 {
        err = tsdb.PartialWriteError{Reason: "points beyond retention policy", Dropped: len(shardMappings.Dropped)}
    }
    timeout := time.NewTimer(w.WriteTimeout)
    defer timeout.Stop()
    // Write timeout control: collect each shard's write result within the time limit.
    // Return immediately if any shard write fails.
    for range shardMappings.Points {
        select {
        case <-w.closing:
            return ErrWriteFailed
        case <-timeout.C:
            atomic.AddInt64(&w.stats.WriteTimeout, 1)
            return ErrTimeout
        case err := <-ch:
            if err != nil {
                return err
            }
        }
    }
    return err
}

Following WriteToShard further down the call chain, the concrete implementation lives in the tsdb package.
func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
    s.mu.RLock()
    select {
    case <-s.closing:
        s.mu.RUnlock()
        return ErrStoreClosed
    default:
    }
    sh := s.shards[shardID]
    if sh == nil {
        s.mu.RUnlock()
        return ErrShardNotFound
    }
    // Each shard has its own write barrier to synchronize deletes and writes,
    // ensuring all in-flight writes complete before a delete proceeds
    epoch := s.epochs[shardID]
    s.mu.RUnlock()
    // Obtain the guards from the write barrier
    guards, gen := epoch.StartWrite()
    defer epoch.EndWrite(gen)
    // If any delete operation overlaps this write, the write must wait until the delete completes
    for _, guard := range guards {
        if guard.Matches(points) {
            guard.Wait()
        }
    }
    return sh.WritePoints(points)
}

func (s *Shard) WritePoints(points []models.Point) error {
    s.mu.RLock()
    defer s.mu.RUnlock()
    // Make sure the current shard is readable and writable
    engine, err := s.engineNoLock()
    if err != nil {
        return err
    }
    var writeError error
    // Sanitize the incoming data and drop invalid characters;
    // check whether the fields are consistent and, if not, return the new fields after a diff
    points, fieldsToCreate, err := s.validateSeriesAndFields(points)
    // Error handling...
    // Create the new fields on the measurement and update the index
    if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
        return err
    }
    // The storage engine writes the data points
    if err := engine.WritePoints(points); err != nil {
        return fmt.Errorf("engine: %s", err)
    }
    return writeError
}

So far we still have not reached actual persistence; the data is only in memory. Digging one level deeper, we can see that each batch is written twice: once to the Cache and once to the WAL. The diagram and code are as follows:

values := make(map[string][]Value, len(points))
// Map series UUID -> the collection of values for the same field
// ([]models.Point to values mapping, shown below)
// First write to the cache (a fast in-memory write, not persisted to disk)
if err := e.Cache.WriteMulti(values); err != nil {
    return err
}
// Then write to the write-ahead log (involves I/O, slower, but durable on disk)
if e.WALEnabled {
    if _, err := e.WAL.WriteMulti(values); err != nil {
        return err
    }
}

From the Engine's WritePoints above we can see in detail how points within the same shard are progressively split into individual series. Before following that mapping, however, we need to understand how series are organized in the database. The most intuitive way is the diagram from the official documentation:

From the diagram we can see the three parts that identify a series: the measurement, the tag set, and a single field key. Their combination serves as the key of the values dictionary. Since each point in InfluxDB is uniquely identified by its series plus a timestamp, each field value also carries a copy of its point's timestamp; a single value together with its timestamp is what the Value interface represents.
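To make this concrete before looking at the real code: a point like `cpu,host=server01 load=42,mem=18` expands into one dictionary key per field. The sketch below assumes the separator constant is `"#!~#"`, which to my knowledge is tsm1's keyFieldSeparator in v1.x (verify against your source tree); the `seriesFieldKeys` helper is mine:

```go
package main

import "fmt"

// keyFieldSeparator mirrors the "magic separator" tsm1 places between
// the series key and the field key. Assumed value; check your version.
const keyFieldSeparator = "#!~#"

// seriesFieldKeys expands one point's series key and field keys into
// the map keys used by the values dictionary.
func seriesFieldKeys(seriesKey string, fieldKeys []string) []string {
	out := make([]string, 0, len(fieldKeys))
	for _, f := range fieldKeys {
		out = append(out, seriesKey+keyFieldSeparator+f)
	}
	return out
}

func main() {
	// One point with two fields yields two dictionary keys
	for _, k := range seriesFieldKeys("cpu,host=server01", []string{"load", "mem"}) {
		fmt.Println(k)
	}
}
```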
Code details of the intermediate mapping process:
var (
    keyBuf  []byte
    baseLen int
)
// For each point
for _, p := range points {
    keyBuf = append(keyBuf[:0], p.Key()...)       // measurement + tag set
    keyBuf = append(keyBuf, keyFieldSeparator...) // measurement + tag set + magic separator
    baseLen = len(keyBuf)
    iter := p.FieldIterator()
    t := p.Time().UnixNano()
    for iter.Next() {
        keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
        // series UUID = measurement + tag set + magic separator + the field key being iterated.
        // A compressed prefix tree uses the series UUID to quickly check whether the field exists;
        // if not, the current field's type is written into the prefix tree.
        // ...
        // Construct a Value matching the field's type, wrapped as an interface to hide the underlying type
        var v Value
        switch iter.Type() {
        case models.Float:
            fv, err := iter.FloatValue()
            v = NewFloatValue(t, fv)
        case models.Integer:
            // The remaining branches follow the same pattern
        case models.Unsigned:
        case models.String:
        case models.Boolean:
        }
        // Append the value to its series
        values[string(keyBuf)] = append(values[string(keyBuf)], v)
    }
}

Both the cache and the WAL receive the converted series. Note that series written to the cache are neither sorted nor compressed: the values dictionary is written directly into memory-resident storage in key-value form. Series persisted to WAL files, by contrast, are sorted and compressed, so that data can be read quickly from memory while still being flushed durably to disk.
Let’s specifically look at what preprocessing WAL does before writing data to files:
func (l *WAL) WriteMulti(values map[string][]Value) (int, error) {
    entry := &WriteWALEntry{
        Values: values,
    }
    id, err := l.writeToLog(entry)
    if err != nil {
        return -1, err
    }
    return id, nil
}

func (l *WAL) writeToLog(entry WALEntry) (int, error) {
    // bytesPool reuses byte slices to reduce allocations; its design is detailed later
    bytes := bytesPool.Get(entry.MarshalSize()) // pre-compute the needed size and try the pool first
    // Serialize the entry into the []byte
    b, err := entry.Encode(bytes)
    if err != nil {
        bytesPool.Put(bytes)
        return -1, err
    }
    // Take another []byte to hold the snappy-compressed data
    encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))
    compressed := snappy.Encode(encBuf, b)
    bytesPool.Put(bytes) // return the []byte to the pool
    syncErr := make(chan error)
    segID, err := func() (int, error) {
        l.mu.Lock()
        defer l.mu.Unlock()
        // Make sure the WAL is still open
        select {
        case <-l.closing:
            return -1, ErrWALClosed
        default:
        }
        // Check whether a new segment file is needed to store the data
        if err := l.rollSegment(); err != nil {
            return -1, fmt.Errorf("error rolling WAL segment: %v", err)
        }
        // Prepend a fixed-length entry-type header to the compressed data
        // and flush it to the segment file via the write system call
        if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
            return -1, fmt.Errorf("error writing WAL entry: %v", err)
        }
        select {
        case l.syncWaiters <- syncErr:
        default:
            return -1, fmt.Errorf("error syncing wal")
        }
        l.scheduleSync()
        l.lastWriteTime = time.Now().UTC()
        return l.currentSegmentID, nil
    }()
    bytesPool.Put(encBuf)
    if err != nil {
        return segID, err
    }
    // Wait for the fsync to complete, then return
    return segID, <-syncErr
}

At this point a batch of data has been flushed durably to disk, which completes our detailed walkthrough of the write path.
Bonus: Global Byte Slice Memory Pool
In most cases, frequently allocating large contiguous chunks of memory puts significant pressure on the GC. InfluxDB uses the global variable bytesPool to manage byte-slice reuse; its concrete type is LimitedBytes. In general, a global memory-pool design should satisfy the following points:
- Thread-safe under concurrent access
- Manually managed: pooled elements must not be reclaimed by the GC
- Lazy allocation on Get: prefer reusing existing elements, so allocation pressure is genuinely reduced
- The maximum number of objects in the pool must be bounded
type LimitedBytes struct {
    maxSize int
    pool    chan []byte // a buffered channel handles both concurrent access and storage of reusable elements
}

// The constructor's result is assigned to a global variable so pooled elements are never GC'd.
// The capacity parameter caps the number of stored elements.
func NewLimitedBytes(capacity int, maxSize int) *LimitedBytes {
    return &LimitedBytes{
        pool:    make(chan []byte, capacity),
        maxSize: maxSize,
    }
}

func (p *LimitedBytes) Get(sz int) []byte {
    var c []byte
    select {
    case c = <-p.pool: // prefer reusing an element already in the channel
    default:
        return make([]byte, sz) // nothing reusable in the channel, must allocate
    }
    // The reusable slice is smaller than required, must allocate
    if cap(c) < sz {
        return make([]byte, sz)
    }
    // Capacity is sufficient: trim and return
    return c[:sz]
}

func (p *LimitedBytes) Put(c []byte) {
    // Exceeds the reusable limit: leave the slice to the GC
    if cap(c) >= p.maxSize {
        return
    }
    // Try to put it into the channel; if the channel is full, the slice is released to the GC
    select {
    case p.pool <- c:
    default:
    }
}

Interestingly, slices exceeding the reusable limit are never pooled. My guess is that this design keeps oversized slices out of the pool: although the number of pooled slices is capped at construction time, the size each write requests from the pool is unpredictable, so the pool's memory footprint could spike during a burst of writes. Without this limit, large slices would linger in the pool and waste memory. Handing slices whose capacity exceeds the maximum back to the GC makes fuller use of memory and also makes it easier for the database to track and analyze its own memory usage.
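A condensed, runnable version of the pool, with a small demo showing the reuse path (the type is copied from the listing above; the demo in main is mine):

```go
package main

import "fmt"

// LimitedBytes is a bounded []byte pool backed by a buffered channel,
// in the style of InfluxDB's bytesPool.
type LimitedBytes struct {
	maxSize int
	pool    chan []byte
}

func NewLimitedBytes(capacity int, maxSize int) *LimitedBytes {
	return &LimitedBytes{pool: make(chan []byte, capacity), maxSize: maxSize}
}

func (p *LimitedBytes) Get(sz int) []byte {
	var c []byte
	select {
	case c = <-p.pool: // reuse if an element is available
	default:
		return make([]byte, sz)
	}
	if cap(c) < sz {
		return make([]byte, sz)
	}
	return c[:sz]
}

func (p *LimitedBytes) Put(c []byte) {
	if cap(c) >= p.maxSize {
		return // oversized slices are left to the GC
	}
	select {
	case p.pool <- c:
	default: // pool full: drop
	}
}

func main() {
	p := NewLimitedBytes(4, 1024)
	b := p.Get(64)
	p.Put(b)
	c := p.Get(32) // a smaller request reuses the same backing array
	fmt.Println(cap(c))
}
```

Note that Get never blocks: both the empty-pool and the too-small cases fall back to a fresh allocation, so callers on the write path are never stalled by the pool itself.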