SkillAgentSearch skills...

Mandodb

🤔 A minimize Time Series Database, written from scratch as a learning project. 从零开始实现一个 TSDB

Install / Use

/learn @chenjiandongx/Mandodb
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

mandodb

🤔 A minimize Time Series Database, written from scratch as a learning project.

<a href="https://github.com/chenjiandongx/mandodb/pulls"> <img src="https://img.shields.io/badge/contributions-welcome-brightgreen.svg?style=flat" alt="Contributions welcome"> </a> <a href="https://goreportcard.com/report/github.com/chenjiandongx/mandodb"> <img src="https://goreportcard.com/badge/github.com/chenjiandongx/mandodb" alt="Go Report Card"> </a> <a href="https://opensource.org/licenses/MIT"> <img src="https://img.shields.io/badge/License-MIT-brightgreen.svg" alt="MIT License"> </a> <a href="https://pkg.go.dev/github.com/chenjiandongx/mandodb"> <img src="https://godoc.org/github.com/chenjiandongx/mandodb?status.svg" alt="GoDoc"> </a>

时序数据库(TSDB: Time Series Database)大多数时候都是为了满足监控场景的需求,这里先介绍两个概念:

  • 数据点(Point): 时序数据的数据点是一个包含 (Timestamp:int64, Value:float64) 的二元组。
  • 时间线(Series): 不同标签(Label)的组合称为不同的时间线,如
series1: {"__name__": "netspeed", "host": "localhost", "iface": "eth0"}
series2: {"__name__": "netspeed", "host": "localhost", "iface": "eth1"}

Prometheus, InfluxDB, M3, TimescaleDB 都是时下流行的 TSDB。时序数据的压缩算法很大程度上决定了 TSDB 的性能,以上几个项目的实现都参考了 Facebook 2015 年发表的论文《Gorilla: A fast, scalable, in-memory time series database》 中提到的差值算法,该算法平均可以将 16 字节的数据点压缩成 1.37 字节,下文会介绍。

Who's mando?

Din Djarin, also known as "the Mandalorian" or simply "Mando," was a human male Mandalorian who worked as a famous bounty hunter during the New Republic Era.

<p align="center"><image src="./images/mando.png" width="620px"></p>

What's mandodb?

作为一名监控系统开发人员,自然要对时序数据库有所了解。mandodb 是我在研究过程中实现的一个最小化的 TSDB,从概念上来讲它还算不上是一个完整的 TSDB,因为它:

  • 没有实现自己的查询引擎(实现难度大)
  • 缺少磁盘归档文件 Compact 操作(天气好的话会实现)
  • 没有 WAL 作为灾备保证高可用(心情好的话会实现)

mandodb 主要受到了两个项目的启发。本项目仅限于学习用途,未经生产环境测试验证!

prometheus 的核心开发者 Fabian Reinartz 写了一篇文章 《Writing a Time Series Database from Scratch》 来介绍 prometheus TSDB 的演变过程,非常值得一读,强烈推荐。

📖 TOC

  • 💡 数据模型 & API
  • 🛠 配置选项
  • 🔖 用法示例
  • 🧮 Gorilla 差值算法
  • 📝 数据写入
  • 🖇 Mmap 内存映射
  • 📍 索引设计
  • 🗂 存储布局
  • ❓ FAQ

💡 数据模型 & API 文档

数据模型定义

// Point 表示一个数据点 (ts, value) 二元组
type Point struct {
	Ts    int64 // in seconds
	Value float64
}

// Label 代表一个标签组合
type Label struct {
	Name  string
	Value string
}

// Row 一行时序数据 包括数据点和标签组合
type Row struct {
	Metric string
	Labels LabelSet
	Point  Point
}

// LabelSet 表示 Label 组合
type LabelSet []Label

// LabelMatcher Label 匹配器 支持正则
type LabelMatcher struct {
	Name   string
	Value  string
	IsRegx bool
}

// LabelMatcherSet 表示 LabelMatcher 组合
type LabelMatcherSet []LabelMatcher

API

// InsertRows 写数据
InsertRows(rows []*Row) error 

// QueryRange 查询时序数据点
QueryRange(metric string, lms LabelMatcherSet, start, end int64) ([]MetricRet, error)

// QuerySeries 查询时序序列组合
QuerySeries(lms LabelMatcherSet, start, end int64) ([]map[string]string, error)

// QueryLabelValues 查询标签值
QueryLabelValues(label string, start, end int64) []string

🛠 配置选项

配置项在初始化 TSDB 的时候设置。

// WithMetaSerializerType 设置 Metadata 数据的序列化类型
// 目前只提供了 BinaryMetaSerializer
WithMetaSerializerType(t MetaSerializerType) Option 

// WithMetaBytesCompressorType 设置字节数据的压缩算法
// 目前提供了
// * 不压缩: NoopBytesCompressor(默认)
// * ZSTD: ZstdBytesCompressor
// * Snappy: SnappyBytesCompressor
WithMetaBytesCompressorType(t BytesCompressorType) Option

// WithOnlyMemoryMode 设置是否默认只存储在内存中
// 默认为 false
WithOnlyMemoryMode(memoryMode bool) Option

// WithEnabledOutdated 设置是否支持乱序写入 此特性会增加资源开销 但会提升数据完整性
// 默认为 true
WithEnabledOutdated(outdated bool) Option

// WithMaxRowsPerSegment 设置单 Segment 最大允许存储的点数
// 默认为 19960412(夹杂私货 🐶)
WithMaxRowsPerSegment(n int64) Option

// WithDataPath 设置 Segment 持久化存储文件夹
// 默认为 "."
WithDataPath(d string) Option

// WithRetention 设置 Segment 持久化数据保存时长
// 默认为 7d
WithRetention(t time.Duration) Option

// WithWriteTimeout 设置写入超时阈值
// 默认为 30s
WithWriteTimeout(t time.Duration) Option

// WithLoggerConfig 设置日志配置项
// logger: github.com/chenjiandongx/logger
WithLoggerConfig(opt *logger.Options) Option

🔖 用法示例

package main

import (
	"fmt"
	"time"

	"github.com/chenjiandongx/mandodb"
)

func main() {
	store := mandodb.OpenTSDB(
		mandodb.WithOnlyMemoryMode(true),
		mandodb.WithWriteTimeout(10*time.Second),
	)
	defer store.Close()

	// 插入数据
	_ = store.InsertRows([]*mandodb.Row{
		{
			Metric: "cpu.busy",
			Labels: []mandodb.Label{
				{Name: "node", Value: "vm1"},
				{Name: "dc", Value: "gz-idc"},
			},
			Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
		},
		{
			Metric: "cpu.busy",
			Labels: []mandodb.Label{
				{Name: "node", Value: "vm2"},
				{Name: "dc", Value: "sz-idc"},
			},
			Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
		},
	})

	time.Sleep(time.Millisecond)

	// 时序数据查询
	data, _ := store.QueryRange("cpu.busy", nil, 1600000000, 1600000002)
	fmt.Printf("data: %+v\n", data)
	// output:
	// data: [{Labels:{__name__="cpu.busy", dc="gz-idc", node="vm1"} Points:[{Ts:1600000001 Value:0.1}]}]

	// 查询 Series
	// __name__ 是 metric 名称在 TSDB 中的 Label Key
	ser, _ := store.QuerySeries(
        mandodb.LabelMatcherSet{{Name: "__name__", Value: "cpu.busy"}}, 1600000000, 1600000002)
	for _, d := range ser {
		fmt.Printf("data: %+v\n", d)
	}
	// output:
	// data: map[__name__:cpu.busy dc:gz-idc node:vm1]
	// data: map[__name__:cpu.busy dc:sz-idc node:vm2]

	// 查询标签值
	lvs := store.QueryLabelValues("node", 1600000000, 1600000002)
	fmt.Printf("data: %+v\n", lvs)
	// output:
	// data: [vm1 vm2]
}

下面是我对这段时间学习内容的整理,尝试完整介绍如何从零开始实现一个小型的 TSDB。

<p align="center"><image src="./images/教我做事.png" width="320px"></p>

我本身并没有数据库开发的背景,某些描述可能并不那么准确,所以欢迎 ~~实名 diss~~ 指正。

🧮 Gorilla 差值算法

Gorilla 论文 4.1 小节介绍了压缩算法,先整体看一下压缩方案,T/V 是紧挨存储的,'0'/'10'/'11' 表示控制位。

Figure: Gorilla 压缩算法

<p align="center"><image src="./images/gorilla.png" width="600px"></p>

Timestamp DOD 压缩:

在时序的场景中,每个时序点都有一个对应的 Timestamp,一条时序序列中相邻数据点的间隔是有规律可循的。一般来讲,监控数据的采集都是会以固定的时间间隔进行的,所以就可以用差值来记录时间间隔,更进一步,我们可以用差值的差值来记录以此来减少存储空间。

t1: 1627401800; t2: 1627401810; t3: 1627401820; t4: 1627401830
--------------------------------------------------------------
// 差值:delta
t1: 1627401800; (t2-t1)d1: 10; (t3-t2)d2: 10; (t4-t3)d3: 10; 
--------------------------------------------------------------
// 差值的差值:delta of delta
t1: 1627401800; dod1: 0; dod2: 0; dod3: 0; 

实际环境中当然不可能每个间隔都这么均匀,由于网络延迟等其他原因,差值会有波动。

Value XOR 压缩:

Figure: IEEE 浮点数以及 XOR 计算结果

<p align="center"><image src="./images/float64.png" width="600px"></p>

当两个数据点数值值比较接近的话,通过异或操作计算出来的结果是比较相似的,利用这点就可以通过记录前置零和后置零个数以及数值部分来达到压缩空间的目的。

下面通过算法实现来介绍,代码来自项目 dgryski/go-tsz。代码完全按照论文中给出的步骤来实现。

// New 初始化 block 这里会将第一个原始时间戳写入到 block 中
func New(t0 uint32) *Series {
	s := Series{
		T0:      t0,
		leading: ^uint8(0),
	}

	s.bw.writeBits(uint64(t0), 32)
	return &s
}

// Push 负责写入时序数据
func (s *Series) Push(t uint32, v float64) {
	// ....
	// 如果是第一个数据点的话写入原始数据后直接返回
	if s.t == 0 {
		s.t = t
		s.val = v
		s.tDelta = t - s.T0 // 实际上这里为 0

		// The block header stores the starting time stamp, t-1(前一个时间戳),
		// which is aligned to a two hour window; the first time
		// stamp, t0, in the block is stored as a delta from t−1 in 14 bits.
        
		// 用 14 个 bit 写入时间戳差值
		s.bw.writeBits(uint64(s.tDelta), 14)
		// 原始数据点完整写入
		s.bw.writeBits(math.Float64bits(v), 64)
		return
	}

	tDelta := t - s.t
	dod := int32(tDelta - s.tDelta) // 计算差值的差值 Detla of Delta

	// 下面开始就处理非第一个数据点的情况了
	switch {
		// If D is zero, then store a single ‘0’ bit
		// 如果是零的话 那直接用 '0' 一个字节就可以直接表示
	case dod == 0:
		s.bw.writeBit(zero)

		//  If D is between [-63, 64], store ‘10’ followed by the value (7 bits)
	case -63 <= dod && dod <= 64:
		s.bw.writeBits(0x02, 2) // 控制位 '10'
		s.bw.writeBits(uint64(dod), 7) // 7bits 可以表示 [-63, 64] 的范围

		// If D is between [-255, 256], store ‘110’ followed by the value (9 bits)
	case -255 <= dod && dod <= 256:
		s.bw.writeBits(0x06, 3) // 控制位 '110'
		s.bw.writeBits(uint64(dod), 9)

		// if D is between [-2047, 2048], store ‘1110’ followed by the value (12 bits)
	case -2047 <= dod && dod <= 2048:
		s.bw.writeBits(0x0e, 4) // 控制位 '1110'
		s.bw.writeBits(uint64(dod), 12)

		// Otherwise store ‘1111’ followed by D using 32 bits
	default:
		s.bw.writeBits(0x0f, 4) // 其余情况控制位均用 '1111'
		s.bw.writeBits(uint64(dod), 32)
	}

	// 到这里 (T, V) 中的时间戳已经写入完毕了 接下来是写 V 部分

	// 先计算两个值的异或结果
	vDelta := math.Float64bits(v) ^ math.Float64bits(s.val)

	// If XOR with the previous is zero (same value), store single ‘0’ bit
	// 如果前后两个值相等的话 直接用 '0' 1 个 bit 就可以表示
	// 所以如果上报的时序数据是 1 或者 0 这种的话 占用的内存会非常少

	// zero = '0'; one = '1'
	if vDelta == 0 {
		s.bw.writeBit(zero)
	} else {    // 非 0 情况那就要把控制位置为 1
		s.bw.writeBit(one)

		// 计算前置 0 和后置 0
		leading := uint8(bits.LeadingZeros64(vDelta))
		trailing := uint8(bits.TrailingZeros64(vDelta))

		// clamp number of leading zeros to avoid overflow when encoding
		if leading >= 32 {
			leading = 31
		}

		// (Control bit ‘0’) If the block of meaningful bits
		// falls within the block of previous meaningful bits,
		// i.e., there are at least as many leading zeros and
		// as many trailing zeros as with the previous value,
		// use that information for the block position and
		// just store the meaningful XORed value.

		// 如果前置 0 不小于上一个值计算的异或结果的前置 0 且后置 0 也不小于上一个值计算的异或结果的后置 0
		if s.leading != ^uint8(0) && leading >= s.leading && trailing >= s.trailing { // => 控制位 '10'
			s.bw.writeBit(zero)
			// 记录异或值非零部分
			s.bw.writeBits(vDelta>>s.trailing, 64-int(s.leading)-int(s.trailing))
		} else { // => 控制位 '11'

			// (Control bit ‘1’) Store 
View on GitHub
GitHub Stars703
CategoryData
Updated23d ago
Forks77

Languages

Go

Security Score

100/100

Audited on Mar 15, 2026

No findings