
B+树是现代数据库系统中最重要的索引结构,几乎所有主流数据库(MySQL、PostgreSQL、Oracle等)都采用B+树作为主要索引实现。本文档将深入解析B+树的理论基础、设计原理、算法实现,并详细分析我们系统中的具体实现代码。
B树特点:
┌─────────────────────────────────────────────┐
│ 内部节点既存储键也存储值 │
│ ┌───┬───┬───┐ │
│ │ K1│V1 │K2│V2│K3│V3│ │
│ └─┬─┴───┴─┬─┴───┴─┬─┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ 子树 子树 子树 │
└─────────────────────────────────────────────┘
B+树特点:
┌─────────────────────────────────────────────┐
│ 内部节点只存储键和指针 │
│ ┌───┬───┬───┐ │
│ │ K1│PTR│K2│PTR│K3│PTR│ │
│ └─┬─┴───┴─┬─┴───┴─┬─┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ 子树 子树 子树 │
│ │
│ 叶子节点存储所有数据并形成链表 │
│ [K1,V1]→[K2,V2]→[K3,V3]→[K4,V4] │
└─────────────────────────────────────────────┘
| 特性 | B树 | B+树 | 影响 |
|---|---|---|---|
| 数据存储 | 所有节点 | 仅叶子节点 | B+树内部节点更小,缓存效率更高 |
| 范围查询 | 需要中序遍历 | 叶子节点链表 | B+树范围查询性能显著优于B树 |
| 内存利用 | 分散存储 | 集中存储 | B+树可以将内部节点完全缓存 |
| IO次数 | 可能较多 | 相对固定 | B+树IO次数更可预测 |
对于m阶B+树:
设:
- n = 总记录数
- m = B+树的阶数
- h = 树的高度
最坏情况下的高度:
h ≤ log⌈m/2⌉(n) + 1
最好情况下的容量:
n ≤ m^h
实际案例:
- 4阶B+树,3层高度,最多存储 4³ = 64条记录
- 100阶B+树,3层高度,最多存储 100³ = 1,000,000条记录
查询操作的IO次数:
- 点查询:O(log_m n) 次磁盘IO
- 范围查询:O(log_m n + k/m) 次磁盘IO,k为结果集大小
插入操作的IO次数:
- 普通插入:O(log_m n) 次磁盘IO
- 引起分裂:O(log_m n) 次磁盘IO + 分裂写入成本
删除操作的IO次数:
- 普通删除:O(log_m n) 次磁盘IO
- 引起合并:O(log_m n) 次磁盘IO + 合并重写成本
InnoDB聚集索引结构:
┌─────────────────────────────────────────────┐
│ Root Node │
│ [10] [20] [30] [40] │
│ │ │ │ │ │
├───┼────┼────┼────┼─────────────────────────┤
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌───┐┌───┐┌───┐┌───┐ │
│ │...││...││...││...│ │
│ └───┘└───┘└───┘└───┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │ 1,行│→│11,行│→│21,行│→│31,行│ │
│ │ 2,行│ │12,行│ │22,行│ │32,行│ │
│ │ ...│ │ ...│ │ ...│ │ ...│ │
│ │10,行│ │20,行│ │30,行│ │40,行│ │
│ └─────┘ └─────┘ └─────┘ └─────┘ │
└─────────────────────────────────────────────┘
叶子节点存储完整的行数据
InnoDB二级索引结构:
┌─────────────────────────────────────────────┐
│ Root Node │
│ ['Alice'] ['John'] ['Tom'] │
│ │ │ │ │
├─────┼────────┼───────┼─────────────────────┤
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │ ... │ │ ... │ │ ... │ │
│ └─────┘ └─────┘ └─────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────┐ ┌───────┐ ┌───────┐ │
│ │'Alice',│→│'John', │→│'Tom', │ │
│ │ 1 │ │ 15 │ │ 28 │ │
│ │'Amy', │ │'Json', │ │'Tony', │ │
│ │ 3 │ │ 18 │ │ 35 │ │
│ └───────┘ └───────┘ └───────┘ │
└─────────────────────────────────────────────┘
叶子节点存储索引键值和主键值
InnoDB页面结构(16KB):
┌─────────────────────────────────────────┐
│ File Header (38字节) │ ← 文件头部
├─────────────────────────────────────────┤
│ Page Header (56字节) │ ← 页面头部
├─────────────────────────────────────────┤
│ Infimum Record │ ← 最小伪记录
├─────────────────────────────────────────┤
│ User Records │ ← 用户记录
│ ┌─────┬─────┬─────┬─────┐ │
│ │Rec1 │Rec2 │Rec3 │Rec4 │ ←→ 记录通过指针相连
│ └──┬──┴─────┴─────┴─────┘ │
│ │ ↑ │
│ └──┘ │
├─────────────────────────────────────────┤
│ Free Space │ ← 空闲空间
├─────────────────────────────────────────┤
│ Page Directory │ ← 页面目录(槽)
│ ┌─────┬─────┬─────┬─────┐ │
│ │Slot1│Slot2│Slot3│Slot4│ │
│ └─────┴─────┴─────┴─────┘ │
├─────────────────────────────────────────┤
│ Supremum Record │ ← 最大伪记录
├─────────────────────────────────────────┤
│ File Trailer (8字节) │ ← 文件尾部
└─────────────────────────────────────────┘
InnoDB记录格式(Compact):
┌─────────────────────────────────────────┐
│ 变长字段长度列表 │ NULL标志位 │ 记录头信息 │ 列数据 │
├─────────────────┼───────────┼─────────┼──────┤
│ 1-2字节 * n字段 │ 1字节 │ 5字节 │ 变长 │
└─────────────────┴───────────┴─────────┴──────┘
记录头信息详解(5字节):
┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐
│预留位1│预留位2│delete│min_rec│n_owned│heap_no│record│next │
│ 1bit │ 1bit │ 1bit │ 1bit │ 4bit │10bit │ 3bit │16bit│
└──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┘
字段含义:
- delete_flag:标记记录是否被删除
- min_rec_flag:标记是否为非叶子节点的最小记录
- n_owned:该记录拥有的记录数(用于页面目录)
- heap_no:在页面堆中的位置
- record_type:记录类型(0=普通,1=内部节点,2=Infimum,3=Supremum)
- next_record:指向下一条记录的偏移量
-- 查询示例
SELECT id, name FROM users WHERE age BETWEEN 20 AND 30;
-- 如果存在索引:KEY idx_age_name (age, name)
-- MySQL可以只扫描索引,无需回表查询
-- 复合索引:KEY idx_abc (a, b, c)
-- 以下查询可以使用索引:
SELECT * FROM table WHERE a = 1; -- ✓
SELECT * FROM table WHERE a = 1 AND b = 2; -- ✓
SELECT * FROM table WHERE a = 1 AND b = 2 AND c = 3;-- ✓
SELECT * FROM table WHERE a = 1 AND c = 3; -- ✓ (只能用到a)
-- 以下查询无法使用索引:
SELECT * FROM table WHERE b = 2; -- ✗
SELECT * FROM table WHERE c = 3; -- ✗
SELECT * FROM table WHERE b = 2 AND c = 3; -- ✗
// 我们的实现
type InternalNode struct {
order int // B+树的阶数
keyCount int // 当前键的数量
keys [][]byte // 键数组
children []uint32 // 子节点页面ID数组
}
// MySQL InnoDB等价结构概念
struct page_cur_t {
page_t* page; // 页面指针
rec_t* rec; // 记录指针
ulint info_bits; // 信息位
// ... 更多字段
};
// 我们的实现
type LeafNode struct {
order int // B+树的阶数
keyCount int // 当前键的数量
keys [][]byte // 键数组
values [][]byte // 值数组
next uint32 // 下一个叶子节点页面ID
prev uint32 // 上一个叶子节点页面ID
}
// MySQL InnoDB的页面结构更复杂,包含:
// - Page Directory(页面目录)
// - Free Space Management(空闲空间管理)
// - Record Heap(记录堆)
// - 多种记录类型和格式
// 4KB页面的容量计算
const PageSize = 4096
const PageHeaderSize = 16
// 内部节点容量计算
func calculateInternalNodeCapacity() int {
availableSpace := PageSize - PageHeaderSize
// 每个键值对需要的空间:
// - 键长度:2字节
// - 键数据:平均8字节(假设)
// - 子节点指针:4字节
keyPairSize := 2 + 8 + 4 // = 14字节
// 加上一个额外的子节点指针
extraPointerSize := 4
maxKeys := (availableSpace - extraPointerSize) / keyPairSize
return maxKeys // 约285个键
}
// 叶子节点容量计算
func calculateLeafNodeCapacity() int {
availableSpace := PageSize - PageHeaderSize - 8 // 减去prev/next指针
// 每个键值对需要的空间:
// - 键长度:2字节
// - 值长度:2字节
// - 键数据:平均8字节
// - 值数据:平均64字节(假设)
keyValuePairSize := 2 + 2 + 8 + 64 // = 76字节
maxPairs := availableSpace / keyValuePairSize
return maxPairs // 约53个键值对
}
// 动态容量计算(考虑变长字段)
func (ln *LeafNode) calculateRealCapacity() int {
usedSpace := 0
for i := 0; i < ln.keyCount; i++ {
usedSpace += 4 // 长度字段
usedSpace += len(ln.keys[i])
usedSpace += len(ln.values[i])
}
availableSpace := PageSize - PageHeaderSize - 8 - usedSpace
return availableSpace
}
// 优化前:分散存储
type NodeV1 struct {
keys [][]byte // 指针数组,数据分散在堆上
values [][]byte // 指针数组,数据分散在堆上
}
// 优化后:紧凑存储
type NodeV2 struct {
data []byte // 紧凑的字节数组
keySlots []KeySlot // 键的位置信息
}
type KeySlot struct {
keyOffset uint16 // 键在data中的偏移
keyLength uint16 // 键的长度
valueOffset uint16 // 值在data中的偏移
valueLength uint16 // 值的长度
}
// 内存对齐优化
type AlignedNode struct {
// 将经常一起访问的字段放在一起
keyCount int // 4字节
order int // 4字节
// 指针字段放在一起(8字节对齐)
keys [][]byte // 24字节(切片头)
values [][]byte // 24字节(切片头)
// 小字段放在最后
nodeType uint8 // 1字节
_ [7]byte // 填充到8字节对齐
}
// 叶子节点中的二分查找
func (ln *LeafNode) binarySearch(key []byte) int {
left, right := 0, ln.keyCount
// 二分查找的核心循环
for left < right {
mid := (left + right) / 2
// 关键比较操作
cmp := bytes.Compare(key, ln.keys[mid])
if cmp <= 0 {
right = mid // key <= keys[mid],搜索左半部分
} else {
left = mid + 1 // key > keys[mid],搜索右半部分
}
}
return left // 返回插入位置
}
// 时间复杂度分析:
// - 最好情况:O(1) - 第一次比较就找到
// - 最坏情况:O(log n) - 需要log₂(n)次比较
// - 平均情况:O(log n)
func (in *InternalNode) findChildIndex(key []byte) int {
// 使用二分查找确定子节点索引
left, right := 0, in.keyCount
for left < right {
mid := (left + right) / 2
// 对于内部节点,比较逻辑略有不同
if bytes.Compare(key, in.keys[mid]) < 0 {
right = mid // key < keys[mid],走左子树
} else {
left = mid + 1 // key >= keys[mid],走右子树
}
}
return left // 返回子节点索引
}
// B+树查找的完整流程
func (tree *BPlusTree) searchPath(key []byte) ([]BTreeNode, []int) {
path := make([]BTreeNode, 0, tree.height)
indices := make([]int, 0, tree.height)
current := tree.root
// 从根节点向下查找
for current != nil {
path = append(path, current)
if current.IsLeaf() {
// 到达叶子节点,查找具体位置
index := current.(*LeafNode).binarySearch(key)
indices = append(indices, index)
break
} else {
// 内部节点,继续向下
index := current.(*InternalNode).findChildIndex(key)
indices = append(indices, index)
// 加载子节点(可能触发磁盘IO)
childPageID := current.(*InternalNode).children[index]
childNode, err := tree.pageManager.ReadPage(childPageID)
if err != nil {
return nil, nil
}
current = tree.deserializeNode(childNode)
}
}
return path, indices
}
func (ln *LeafNode) Insert(key []byte, value []byte) error {
// 1. 检查容量限制
if ln.keyCount >= ln.order {
return errors.New("叶子节点已满")
}
// 2. 找到插入位置
insertPos := ln.binarySearch(key)
// 3. 检查是否为重复键(更新操作)
if insertPos < ln.keyCount && bytes.Equal(ln.keys[insertPos], key) {
// 更新现有值
ln.values[insertPos] = make([]byte, len(value))
copy(ln.values[insertPos], value)
return nil
}
// 4. 扩展切片容量(如果需要)
if len(ln.keys) <= ln.keyCount {
ln.keys = append(ln.keys, nil)
ln.values = append(ln.values, nil)
}
// 5. 移动后续元素,为新元素腾出空间
// 这是插入操作中最耗时的部分:O(n)
copy(ln.keys[insertPos+1:], ln.keys[insertPos:ln.keyCount])
copy(ln.values[insertPos+1:], ln.values[insertPos:ln.keyCount])
// 6. 插入新的键值对
ln.keys[insertPos] = make([]byte, len(key))
copy(ln.keys[insertPos], key)
ln.values[insertPos] = make([]byte, len(value))
copy(ln.values[insertPos], value)
// 7. 更新计数
ln.keyCount++
return nil
}
// 性能分析:
// - 查找位置:O(log n) 二分查找
// - 移动元素:O(n) 最坏情况需要移动所有元素
// - 总体复杂度:O(n),其中n为节点中的键数量
func (tree *BPlusTree) Insert(key []byte, value []byte) error {
// 1. 查找插入路径
path, indices := tree.searchPath(key)
if len(path) == 0 {
return errors.New("查找路径为空")
}
// 2. 尝试在叶子节点插入
leafNode := path[len(path)-1].(*LeafNode)
err := leafNode.Insert(key, value)
if err == nil {
// 插入成功,无需分裂
tree.markDirty(leafNode)
return nil
}
// 3. 叶子节点已满,需要分裂
newKey, rightNode, err := tree.splitLeaf(leafNode, key, value)
if err != nil {
return err
}
// 4. 递归向上处理分裂
return tree.insertInternal(path[:len(path)-1], indices[:len(indices)-1], newKey, rightNode)
}
// 递归处理内部节点插入
func (tree *BPlusTree) insertInternal(path []BTreeNode, indices []int, key []byte, rightChild BTreeNode) error {
if len(path) == 0 {
// 到达根节点上方,需要创建新根
return tree.createNewRoot(key, tree.root, rightChild)
}
// 获取父节点
parent := path[len(path)-1].(*InternalNode)
insertIndex := indices[len(indices)-1]
// 尝试在父节点插入
err := parent.insertKeyAndChild(key, tree.getPageID(rightChild), insertIndex)
if err == nil {
// 插入成功
tree.markDirty(parent)
return nil
}
// 父节点也满了,需要继续分裂
newKey, newRightNode, err := tree.splitInternal(parent, key, rightChild, insertIndex)
if err != nil {
return err
}
// 继续向上递归
return tree.insertInternal(path[:len(path)-1], indices[:len(indices)-1], newKey, newRightNode)
}
func (ln *LeafNode) Delete(key []byte) error {
// 1. 查找要删除的键
index := ln.binarySearch(key)
// 2. 检查键是否存在
if index >= ln.keyCount || !bytes.Equal(ln.keys[index], key) {
return errors.New("键不存在")
}
// 3. 移动后续元素填补空缺
// 这也是O(n)操作
copy(ln.keys[index:], ln.keys[index+1:])
copy(ln.values[index:], ln.values[index+1:])
// 4. 清理最后一个位置(防止内存泄漏)
ln.keys[ln.keyCount-1] = nil
ln.values[ln.keyCount-1] = nil
// 5. 缩小切片(可选优化)
ln.keys = ln.keys[:ln.keyCount-1]
ln.values = ln.values[:ln.keyCount-1]
// 6. 更新计数
ln.keyCount--
return nil
}
func (tree *BPlusTree) mergeNodes(left, right *LeafNode, parentKey []byte) error {
// 1. 检查合并后的大小是否超过限制
if left.keyCount + right.keyCount > left.order {
return errors.New("合并后节点过大")
}
// 2. 将右节点的所有键值对移动到左节点
for i := 0; i < right.keyCount; i++ {
left.keys = append(left.keys, right.keys[i])
left.values = append(left.values, right.values[i])
}
left.keyCount += right.keyCount
// 3. 更新叶子节点链表指针
left.next = right.next
if right.next != 0 {
nextNode, err := tree.loadNode(right.next)
if err == nil {
if nextLeaf, ok := nextNode.(*LeafNode); ok {
nextLeaf.prev = tree.getPageID(left)
tree.markDirty(nextLeaf)
}
}
}
// 4. 释放右节点
tree.pageManager.FreePage(tree.getPageID(right))
// 5. 标记左节点为脏
tree.markDirty(left)
return nil
}
func (tree *BPlusTree) splitLeaf(node *LeafNode, key []byte, value []byte) ([]byte, *LeafNode, error) {
// 1. 创建新的右节点
rightNode := &LeafNode{
order: node.order,
keyCount: 0,
keys: make([][]byte, 0, node.order),
values: make([][]byte, 0, node.order),
next: node.next,
prev: tree.getPageID(node),
}
// 2. 分配新页面
rightPage, rightPageID, err := tree.pageManager.AllocatePage()
if err != nil {
return nil, nil, err
}
// 3. 创建临时数组存储所有键值对(包括新插入的)
allKeys := make([][]byte, 0, node.keyCount+1)
allValues := make([][]byte, 0, node.keyCount+1)
// 找到新键的插入位置
insertPos := node.binarySearch(key)
// 构建完整的有序数组
for i := 0; i < insertPos; i++ {
allKeys = append(allKeys, node.keys[i])
allValues = append(allValues, node.values[i])
}
allKeys = append(allKeys, key)
allValues = append(allValues, value)
for i := insertPos; i < node.keyCount; i++ {
allKeys = append(allKeys, node.keys[i])
allValues = append(allValues, node.values[i])
}
// 4. 计算分割点(通常是中点)
splitPoint := len(allKeys) / 2
// 5. 重新分配键值对
// 左节点保留前半部分
node.keys = node.keys[:0] // 清空但保持容量
node.values = node.values[:0]
node.keyCount = splitPoint
for i := 0; i < splitPoint; i++ {
node.keys = append(node.keys, allKeys[i])
node.values = append(node.values, allValues[i])
}
// 右节点获得后半部分
rightNode.keyCount = len(allKeys) - splitPoint
for i := splitPoint; i < len(allKeys); i++ {
rightNode.keys = append(rightNode.keys, allKeys[i])
rightNode.values = append(rightNode.values, allValues[i])
}
// 6. 更新链表指针
node.next = rightPageID
if rightNode.next != 0 {
nextNode, err := tree.loadNode(rightNode.next)
if err == nil {
if nextLeaf, ok := nextNode.(*LeafNode); ok {
nextLeaf.prev = rightPageID
tree.markDirty(nextLeaf)
}
}
}
// 7. 序列化右节点到页面
tree.serializeNode(rightNode, rightPage)
tree.markDirty(rightNode)
tree.markDirty(node)
// 8. 返回上升到父节点的键(右节点的第一个键)
upKey := make([]byte, len(rightNode.keys[0]))
copy(upKey, rightNode.keys[0])
return upKey, rightNode, nil
}
func (tree *BPlusTree) splitInternal(node *InternalNode, key []byte, rightChild BTreeNode, insertIndex int) ([]byte, *InternalNode, error) {
// 1. 创建右节点
rightNode := &InternalNode{
order: node.order,
keyCount: 0,
keys: make([][]byte, 0, node.order),
children: make([]uint32, 0, node.order+1),
}
rightPage, rightPageID, err := tree.pageManager.AllocatePage()
if err != nil {
return nil, nil, err
}
// 2. 构建完整的键和子节点数组
allKeys := make([][]byte, 0, node.keyCount+1)
allChildren := make([]uint32, 0, node.keyCount+2)
// 插入新键和子节点
for i := 0; i < insertIndex; i++ {
allKeys = append(allKeys, node.keys[i])
allChildren = append(allChildren, node.children[i])
}
allKeys = append(allKeys, key)
allChildren = append(allChildren, node.children[insertIndex])
allChildren = append(allChildren, tree.getPageID(rightChild))
for i := insertIndex; i < node.keyCount; i++ {
allKeys = append(allKeys, node.keys[i])
allChildren = append(allChildren, node.children[i+1])
}
// 3. 计算分割点
splitPoint := len(allKeys) / 2
upKey := allKeys[splitPoint] // 中间的键要上升到父节点
// 4. 重新分配
// 左节点:保留分割点之前的键
node.keys = node.keys[:0]
node.children = node.children[:0]
node.keyCount = splitPoint
for i := 0; i < splitPoint; i++ {
node.keys = append(node.keys, allKeys[i])
node.children = append(node.children, allChildren[i])
}
node.children = append(node.children, allChildren[splitPoint])
// 右节点:获得分割点之后的键
rightNode.keyCount = len(allKeys) - splitPoint - 1
for i := splitPoint + 1; i < len(allKeys); i++ {
rightNode.keys = append(rightNode.keys, allKeys[i])
}
for i := splitPoint + 1; i < len(allChildren); i++ {
rightNode.children = append(rightNode.children, allChildren[i])
}
// 5. 序列化和标记
tree.serializeNode(rightNode, rightPage)
tree.markDirty(rightNode)
tree.markDirty(node)
// 返回上升的键
upKeyCopy := make([]byte, len(upKey))
copy(upKeyCopy, upKey)
return upKeyCopy, rightNode, nil
}
func (tree *BPlusTree) shouldMerge(node BTreeNode) bool {
minKeys := node.GetKeyCount() / 2 // 最小填充度
if node.IsLeaf() {
return node.GetKeyCount() < minKeys
} else {
// 内部节点需要考虑子指针数量
return node.GetKeyCount() < minKeys
}
}
// 尝试重新分布或合并
func (tree *BPlusTree) handleUnderflow(path []BTreeNode, indices []int) error {
if len(path) == 0 {
return nil
}
node := path[len(path)-1]
if !tree.shouldMerge(node) {
return nil // 不需要合并
}
if len(path) == 1 {
// 根节点,特殊处理
return tree.handleRootUnderflow(node)
}
parent := path[len(path)-2].(*InternalNode)
nodeIndex := indices[len(indices)-2]
// 尝试从兄弟节点借用
if tree.tryBorrow(parent, nodeIndex, node) {
return nil
}
// 无法借用,进行合并
return tree.mergeWithSibling(parent, nodeIndex, node, path[:len(path)-1], indices[:len(indices)-1])
}
type DeferredSplit struct {
node BTreeNode
threshold float64 // 分裂阈值(例如0.9表示90%满时分裂)
}
func (tree *BPlusTree) shouldSplitEarly(node BTreeNode) bool {
utilization := float64(node.GetKeyCount()) / float64(tree.order)
return utilization >= tree.splitThreshold
}
func (tree *BPlusTree) BatchInsert(keyValues []KeyValue) error {
// 1. 对键值对进行排序
sort.Slice(keyValues, func(i, j int) bool {
return bytes.Compare(keyValues[i].Key, keyValues[j].Key) < 0
})
// 2. 找到插入范围
startLeaf, endLeaf := tree.findInsertRange(keyValues)
// 3. 批量插入到连续的叶子节点
return tree.batchInsertToLeaves(keyValues, startLeaf, endLeaf)
}
func (tree *BPlusTree) RangeQuery(startKey, endKey []byte, limit int) ([]KeyValue, error) {
results := make([]KeyValue, 0, limit)
// 1. 找到起始叶子节点
startPath, startIndices := tree.searchPath(startKey)
if len(startPath) == 0 {
return nil, errors.New("查找起始路径失败")
}
startLeaf := startPath[len(startPath)-1].(*LeafNode)
startIndex := startIndices[len(startIndices)-1]
// 2. 从起始位置开始遍历叶子节点链表
currentLeaf := startLeaf
currentIndex := startIndex
for currentLeaf != nil && len(results) < limit {
// 3. 遍历当前叶子节点
for currentIndex < currentLeaf.keyCount {
key := currentLeaf.keys[currentIndex]
// 4. 检查是否超出范围
if bytes.Compare(key, endKey) > 0 {
return results, nil
}
// 5. 添加到结果集
results = append(results, KeyValue{
Key: append([]byte(nil), key...), // 深拷贝
Value: append([]byte(nil), currentLeaf.values[currentIndex]...),
})
currentIndex++
if len(results) >= limit {
break
}
}
// 6. 移动到下一个叶子节点
if currentLeaf.next == 0 {
break
}
nextPage, err := tree.pageManager.ReadPage(currentLeaf.next)
if err != nil {
return results, err
}
currentLeaf = tree.deserializeLeafNode(nextPage)
currentIndex = 0 // 重置索引
}
return results, nil
}
// 性能测试函数
func (tree *BPlusTree) analyzeRangeQueryPerformance(startKey, endKey []byte) QueryStats {
stats := QueryStats{
StartTime: time.Now(),
}
// 计算预期的IO次数
startPath, _ := tree.searchPath(startKey)
stats.SeekIOCount = len(startPath) // 查找起始位置的IO次数
// 估算扫描的叶子节点数量
leafCapacity := tree.order // 每个叶子节点的平均容量
keyRange := estimateKeyRange(startKey, endKey)
estimatedLeafNodes := keyRange / leafCapacity
stats.ScanIOCount = int(estimatedLeafNodes)
// 总IO预估
stats.TotalIOCount = stats.SeekIOCount + stats.ScanIOCount
return stats
}
type QueryStats struct {
StartTime time.Time
SeekIOCount int // 定位IO次数
ScanIOCount int // 扫描IO次数
TotalIOCount int // 总IO次数
ResultCount int // 结果数量
}
// 为支持覆盖索引,扩展叶子节点结构
type CoveringLeafNode struct {
LeafNode
coveredColumns []string // 覆盖的列名
columnData [][]interface{} // 列数据
}
func (tree *BPlusTree) CoveringQuery(key []byte, requestedColumns []string) (map[string]interface{}, error) {
// 1. 检查索引是否覆盖所有请求的列
if !tree.indexCoversColumns(requestedColumns) {
return nil, errors.New("索引不覆盖所有请求的列")
}
// 2. 普通查找
path, indices := tree.searchPath(key)
if len(path) == 0 {
return nil, errors.New("查找失败")
}
leaf := path[len(path)-1].(*CoveringLeafNode)
index := indices[len(indices)-1]
if index >= leaf.keyCount || !bytes.Equal(leaf.keys[index], key) {
return nil, errors.New("键不存在")
}
// 3. 构建结果映射(无需回表)
result := make(map[string]interface{})
for i, column := range leaf.coveredColumns {
for _, requestedCol := range requestedColumns {
if column == requestedCol {
result[column] = leaf.columnData[index][i]
break
}
}
}
return result, nil
}
type ConcurrentBPlusTree struct {
*BPlusTree
// 树级别的读写锁
treeLock sync.RWMutex
// 节点级别的锁映射
nodeLocks map[uint32]*sync.RWMutex
locksMutex sync.RWMutex
// 锁池,避免频繁创建锁对象
lockPool sync.Pool
}
func (tree *ConcurrentBPlusTree) getLock(pageID uint32) *sync.RWMutex {
tree.locksMutex.RLock()
if lock, exists := tree.nodeLocks[pageID]; exists {
tree.locksMutex.RUnlock()
return lock
}
tree.locksMutex.RUnlock()
// 双重检查锁定模式
tree.locksMutex.Lock()
defer tree.locksMutex.Unlock()
if lock, exists := tree.nodeLocks[pageID]; exists {
return lock
}
// 从池中获取锁对象
lock := tree.lockPool.Get().(*sync.RWMutex)
tree.nodeLocks[pageID] = lock
return lock
}
func (tree *ConcurrentBPlusTree) ConcurrentInsert(key []byte, value []byte) error {
// 1. 先获取读锁进行查找
tree.treeLock.RLock()
path, indices := tree.searchPath(key)
tree.treeLock.RUnlock()
if len(path) == 0 {
return errors.New("查找路径为空")
}
// 2. 获取叶子节点的写锁
leafPageID := tree.getPageID(path[len(path)-1])
leafLock := tree.getLock(leafPageID)
leafLock.Lock()
defer leafLock.Unlock()
// 3. 尝试插入
leafNode := path[len(path)-1].(*LeafNode)
err := leafNode.Insert(key, value)
if err == nil {
// 插入成功,无需分裂
tree.markDirty(leafNode)
return nil
}
// 4. 需要分裂,升级到树级别写锁
leafLock.Unlock()
tree.treeLock.Lock()
defer tree.treeLock.Unlock()
// 5. 重新验证路径(可能已被其他线程修改)
path, indices = tree.searchPath(key)
// 6. 执行分裂逻辑
return tree.insertWithSplit(path, indices, key, value)
}
func (tree *ConcurrentBPlusTree) lockNodesInOrder(pageIDs []uint32) []*sync.RWMutex {
// 对页面ID排序,避免死锁
sort.Slice(pageIDs, func(i, j int) bool {
return pageIDs[i] < pageIDs[j]
})
locks := make([]*sync.RWMutex, len(pageIDs))
for i, pageID := range pageIDs {
locks[i] = tree.getLock(pageID)
locks[i].Lock()
}
return locks
}
func unlockNodes(locks []*sync.RWMutex) {
// 逆序解锁
for i := len(locks) - 1; i >= 0; i-- {
locks[i].Unlock()
}
}
func (tree *ConcurrentBPlusTree) TryLockWithTimeout(pageID uint32, timeout time.Duration) bool {
lock := tree.getLock(pageID)
done := make(chan bool, 1)
go func() {
lock.Lock()
done <- true
}()
select {
case <-done:
return true
case <-time.After(timeout):
return false
}
}
type BPlusTree struct {
root BTreeNode // 根节点
order int // B+树的阶数
pageManager *PageManager // 页面管理器
height int // 树的高度
// 统计信息
nodeCount uint64 // 节点总数
recordCount uint64 // 记录总数
}
// 构造函数的深度解析
func NewBPlusTree(order int, pageManager *PageManager) *BPlusTree {
// 1. 参数验证
if order < 3 {
// B+树的阶数至少为3,否则无法保证平衡性
order = 3
}
if pageManager == nil {
panic("页面管理器不能为nil")
}
// 2. 创建根节点(初始为空的叶子节点)
rootLeaf := &LeafNode{
order: order,
keyCount: 0,
keys: make([][]byte, 0, order),
values: make([][]byte, 0, order),
next: 0,
prev: 0,
}
// 3. 初始化B+树
tree := &BPlusTree{
root: rootLeaf,
order: order,
pageManager: pageManager,
height: 1, // 初始高度为1(只有根节点)
nodeCount: 1, // 初始只有一个节点
recordCount: 0, // 初始无记录
}
return tree
}
// BTreeNode接口定义了B+树节点的通用行为
type BTreeNode interface {
IsLeaf() bool // 判断是否为叶子节点
GetKeys() [][]byte // 获取所有键
GetKeyCount() int // 获取键的数量
IsFull() bool // 判断节点是否已满
Search(key []byte) ([]byte, bool) // 搜索键值对
Insert(key []byte, value []byte) error // 插入键值对
Delete(key []byte) error // 删除键值对
Split() (BTreeNode, []byte, error) // 分裂节点
Serialize() ([]byte, error) // 序列化节点
Deserialize(data []byte) error // 反序列化节点
}
type LeafNode struct {
order int // B+树的阶数
keyCount int // 当前键的数量
keys [][]byte // 键数组
values [][]byte // 值数组
next uint32 // 下一个叶子节点的页面ID
prev uint32 // 上一个叶子节点的页面ID
}
// IsLeaf实现 - 叶子节点标识
func (ln *LeafNode) IsLeaf() bool {
return true // 叶子节点总是返回true
}
// IsFull实现 - 容量检查
func (ln *LeafNode) IsFull() bool {
// 当键的数量达到阶数时,节点被认为是满的
return ln.keyCount >= ln.order
}
// GetKeys实现 - 获取有效键
func (ln *LeafNode) GetKeys() [][]byte {
// 只返回有效的键(keyCount数量的键)
return ln.keys[:ln.keyCount]
}
// Search实现 - 叶子节点搜索
func (ln *LeafNode) Search(key []byte) ([]byte, bool) {
// 1. 使用二分查找定位键
index := ln.binarySearch(key)
// 2. 检查是否找到精确匹配
if index < ln.keyCount && bytes.Equal(ln.keys[index], key) {
// 3. 返回值的副本(避免外部修改)
valueCopy := make([]byte, len(ln.values[index]))
copy(valueCopy, ln.values[index])
return valueCopy, true
}
// 4. 未找到
return nil, false
}
type InternalNode struct {
order int // B+树的阶数
keyCount int // 当前键的数量
keys [][]byte // 键数组(分割键)
children []uint32 // 子节点页面ID数组
}
// IsLeaf实现 - 内部节点标识
func (ln *InternalNode) IsLeaf() bool {
return false // 内部节点总是返回false
}
// Search实现 - 内部节点不直接存储数据
func (in *InternalNode) Search(key []byte) ([]byte, bool) {
// 内部节点不存储实际数据,这个方法不应该被调用
// 搜索应该通过findChildIndex找到子节点,然后在子节点中搜索
return nil, false
}
// findChildIndex - 内部节点的核心方法
func (in *InternalNode) findChildIndex(key []byte) int {
// 使用二分查找找到合适的子节点索引
left, right := 0, in.keyCount
for left < right {
mid := (left + right) / 2
// 关键比较逻辑:
// 如果key < keys[mid],则在左子树
// 如果key >= keys[mid],则在右子树
if bytes.Compare(key, in.keys[mid]) < 0 {
right = mid
} else {
left = mid + 1
}
}
return left // 返回子节点索引
}
func (ln *LeafNode) Serialize() ([]byte, error) {
// 1. 计算序列化后的总大小
totalSize := 16 // 头部信息:keyCount(4) + next(4) + prev(4) + reserved(4)
// 计算键值对数据的大小
for i := 0; i < ln.keyCount; i++ {
totalSize += 4 // keyLen(2) + valueLen(2)
totalSize += len(ln.keys[i]) // 键数据
totalSize += len(ln.values[i]) // 值数据
}
// 2. 分配缓冲区
buffer := make([]byte, totalSize)
offset := 0
// 3. 写入头部信息
binary.LittleEndian.PutUint32(buffer[offset:], uint32(ln.keyCount))
offset += 4
binary.LittleEndian.PutUint32(buffer[offset:], ln.next)
offset += 4
binary.LittleEndian.PutUint32(buffer[offset:], ln.prev)
offset += 4
binary.LittleEndian.PutUint32(buffer[offset:], 0) // 保留字段
offset += 4
// 4. 写入键值对数据
for i := 0; i < ln.keyCount; i++ {
// 写入键长度
binary.LittleEndian.PutUint16(buffer[offset:], uint16(len(ln.keys[i])))
offset += 2
// 写入值长度
binary.LittleEndian.PutUint16(buffer[offset:], uint16(len(ln.values[i])))
offset += 2
// 写入键数据
copy(buffer[offset:], ln.keys[i])
offset += len(ln.keys[i])
// 写入值数据
copy(buffer[offset:], ln.values[i])
offset += len(ln.values[i])
}
return buffer, nil
}
func (ln *LeafNode) Deserialize(data []byte) error {
// 1. 验证数据长度
if len(data) < 16 {
return errors.New("数据长度不足:缺少头部信息")
}
offset := 0
// 2. 读取头部信息
keyCount := binary.LittleEndian.Uint32(data[offset:])
offset += 4
ln.next = binary.LittleEndian.Uint32(data[offset:])
offset += 4
ln.prev = binary.LittleEndian.Uint32(data[offset:])
offset += 4
offset += 4 // 跳过保留字段
// 3. 初始化数组
ln.keyCount = int(keyCount)
ln.keys = make([][]byte, 0, ln.order)
ln.values = make([][]byte, 0, ln.order)
// 4. 读取键值对数据
for i := 0; i < ln.keyCount; i++ {
// 检查剩余数据长度
if offset+4 > len(data) {
return fmt.Errorf("数据不足:缺少第%d个键值对的长度信息", i)
}
// 读取长度信息
keyLen := binary.LittleEndian.Uint16(data[offset:])
offset += 2
valueLen := binary.LittleEndian.Uint16(data[offset:])
offset += 2
// 检查数据长度
totalLen := int(keyLen) + int(valueLen)
if offset+totalLen > len(data) {
return fmt.Errorf("数据不足:缺少第%d个键值对的实际数据", i)
}
// 读取键数据
key := make([]byte, keyLen)
copy(key, data[offset:offset+int(keyLen)])
offset += int(keyLen)
// 读取值数据
value := make([]byte, valueLen)
copy(value, data[offset:offset+int(valueLen)])
offset += int(valueLen)
// 添加到数组
ln.keys = append(ln.keys, key)
ln.values = append(ln.values, value)
}
return nil
}
func (ln *LeafNode) validateNode() error {
// 1. 检查键的数量是否在合理范围内
if ln.keyCount < 0 {
return fmt.Errorf("无效的键数量: %d (不能为负)", ln.keyCount)
}
if ln.keyCount > ln.order {
return fmt.Errorf("键数量超过限制: %d > %d", ln.keyCount, ln.order)
}
// 2. 检查数组长度一致性
if len(ln.keys) != len(ln.values) {
return fmt.Errorf("键数组长度(%d)与值数组长度(%d)不匹配",
len(ln.keys), len(ln.values))
}
// 3. 检查键是否有序
for i := 1; i < ln.keyCount; i++ {
if bytes.Compare(ln.keys[i-1], ln.keys[i]) >= 0 {
return fmt.Errorf("键在位置%d处无序: %v >= %v",
i, ln.keys[i-1], ln.keys[i])
}
}
// 4. 检查链表指针的合理性
if ln.next == ln.prev && ln.next != 0 {
return fmt.Errorf("无效的链表指针: next和prev不能相等且非零")
}
return nil
}
func (tree *BPlusTree) recoverFromCorruption(pageID uint32) error {
// 1. 尝试从备份页面恢复
backupPageID := tree.findBackupPage(pageID)
if backupPageID != 0 {
return tree.restoreFromBackup(pageID, backupPageID)
}
// 2. 尝试重建节点
if tree.canRebuildNode(pageID) {
return tree.rebuildCorruptedNode(pageID)
}
// 3. 标记节点为不可用
return tree.markNodeAsCorrupted(pageID)
}
// 操作复杂度分析
/*
设:
- n: 总记录数
- m: B+树的阶数
- h: 树的高度 = O(log_m n)
各操作的时间复杂度:
1. 搜索操作:
- 路径查找:O(h * log m) = O(log m * log_m n) = O(log n)
- 叶子节点内查找:O(log m)
- 总计:O(log n)
2. 插入操作:
- 查找插入位置:O(log n)
- 叶子节点插入:O(m) (最坏情况需要移动m个元素)
- 节点分裂:O(m) (需要重新分配m个元素)
- 向上传播分裂:O(h) = O(log_m n)
- 总计:O(log n + m),实际中m << log n,所以是O(log n)
3. 删除操作:
- 查找删除位置:O(log n)
- 叶子节点删除:O(m)
- 节点合并:O(m)
- 向上传播合并:O(log_m n)
- 总计:O(log n + m) ≈ O(log n)
4. 范围查询:
- 定位起始位置:O(log n)
- 扫描k个结果:O(k/m) (k个结果分布在k/m个叶子节点中)
- 总计:O(log n + k/m)
*/
// 空间复杂度分析
/*
1. 树结构空间:
- 叶子节点数:⌈n/m⌉ (n个记录,每个叶子节点最多m个记录)
- 内部节点数:⌈n/m²⌉ + ⌈n/m³⌉ + ... ≈ O(n/m)
- 总节点数:O(n/m)
- 每个节点空间:O(m)
- 总空间:O(n)
2. 内存缓存空间:
- 缓存节点数:min(cache_size, total_nodes)
- 每个缓存节点:O(m)
- 总缓存空间:O(cache_size * m)
3. 操作临时空间:
- 插入分裂:O(m) (临时数组)
- 查询路径:O(h) = O(log_m n)
- 范围查询结果:O(k) (k为结果数量)
*/
func BenchmarkBPlusTree(b *testing.B) {
// 创建测试环境
pm, _ := NewPageManager("bench.db", 1000)
defer pm.Close()
tree := NewBPlusTree(100, pm) // 100阶B+树
// 插入性能测试
b.Run("Insert", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := []byte(fmt.Sprintf("key%010d", i))
value := []byte(fmt.Sprintf("value%010d", i))
tree.Insert(key, value)
}
})
// 查询性能测试
b.Run("Search", func(b *testing.B) {
// 预填充数据
for i := 0; i < 100000; i++ {
key := []byte(fmt.Sprintf("key%010d", i))
value := []byte(fmt.Sprintf("value%010d", i))
tree.Insert(key, value)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := []byte(fmt.Sprintf("key%010d", i%100000))
tree.Search(key)
}
})
// 范围查询性能测试
b.Run("RangeQuery", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
startKey := []byte(fmt.Sprintf("key%010d", i%90000))
endKey := []byte(fmt.Sprintf("key%010d", (i%90000)+1000))
tree.RangeQuery(startKey, endKey, 1000)
}
})
}
type PerformanceMonitor struct {
insertCount uint64
searchCount uint64
rangeQueryCount uint64
totalInsertTime time.Duration
totalSearchTime time.Duration
totalRangeTime time.Duration
pageIOCount uint64
cacheHitCount uint64
cacheMissCount uint64
mutex sync.RWMutex
}
func (pm *PerformanceMonitor) RecordInsert(duration time.Duration) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.insertCount++
pm.totalInsertTime += duration
}
func (pm *PerformanceMonitor) GetStats() PerformanceStats {
pm.mutex.RLock()
defer pm.mutex.RUnlock()
return PerformanceStats{
AvgInsertTime: pm.totalInsertTime / time.Duration(pm.insertCount),
AvgSearchTime: pm.totalSearchTime / time.Duration(pm.searchCount),
CacheHitRate: float64(pm.cacheHitCount) / float64(pm.cacheHitCount + pm.cacheMissCount),
IOPerSecond: float64(pm.pageIOCount) / pm.getTotalTimeSeconds(),
}
}
func (tree *BPlusTree) prefetchNextLeaves(currentLeaf *LeafNode, count int) {
go func() {
nextPageID := currentLeaf.next
for i := 0; i < count && nextPageID != 0; i++ {
// 异步预读下一个叶子节点
if _, err := tree.pageManager.ReadPage(nextPageID); err != nil {
break
}
// 获取下下个节点的页面ID
nextPage, err := tree.pageManager.ReadPage(nextPageID)
if err != nil {
break
}
nextLeaf := tree.deserializeLeafNode(nextPage)
nextPageID = nextLeaf.next
}
}()
}
func (tree *BPlusTree) BulkLoad(sortedData []KeyValue) error {
if !tree.isSorted(sortedData) {
return errors.New("数据必须预先排序")
}
// 1. 计算叶子节点数量
leafCapacity := tree.order
leafCount := (len(sortedData) + leafCapacity - 1) / leafCapacity
// 2. 批量创建叶子节点
leaves := make([]*LeafNode, leafCount)
for i := 0; i < leafCount; i++ {
startIdx := i * leafCapacity
endIdx := min(startIdx+leafCapacity, len(sortedData))
leaves[i] = tree.createLeafFromData(sortedData[startIdx:endIdx])
}
// 3. 构建叶子节点链表
for i := 0; i < len(leaves)-1; i++ {
leaves[i].next = tree.getPageID(leaves[i+1])
leaves[i+1].prev = tree.getPageID(leaves[i])
}
// 4. 自底向上构建内部节点
currentLevel := make([]BTreeNode, len(leaves))
for i, leaf := range leaves {
currentLevel[i] = leaf
}
for len(currentLevel) > 1 {
currentLevel = tree.buildNextLevel(currentLevel)
tree.height++
}
tree.root = currentLevel[0]
return nil
}
func (tree *BPlusTree) PrintTree() {
fmt.Println("B+Tree Structure:")
tree.printNode(tree.root, 0)
}
func (tree *BPlusTree) printNode(node BTreeNode, level int) {
indent := strings.Repeat(" ", level)
if node.IsLeaf() {
leaf := node.(*LeafNode)
fmt.Printf("%s[LEAF] Keys: %d, Values: %v\n",
indent, leaf.keyCount, tree.keysToStrings(leaf.keys[:leaf.keyCount]))
if leaf.next != 0 {
fmt.Printf("%s → Next: Page%d\n", indent, leaf.next)
}
if leaf.prev != 0 {
fmt.Printf("%s ← Prev: Page%d\n", indent, leaf.prev)
}
} else {
internal := node.(*InternalNode)
fmt.Printf("%s[INTERNAL] Keys: %d, Children: %d\n",
indent, internal.keyCount, len(internal.children))
fmt.Printf("%s Keys: %v\n",
indent, tree.keysToStrings(internal.keys[:internal.keyCount]))
// 递归打印子节点
for i, childPageID := range internal.children {
if childPage, err := tree.pageManager.ReadPage(childPageID); err == nil {
childNode := tree.deserializeNode(childPage)
fmt.Printf("%s Child[%d] (Page%d):\n", indent, i, childPageID)
tree.printNode(childNode, level+1)
}
}
}
}
func (tree *BPlusTree) ValidateTree() error {
errors := make([]string, 0)
// 1. 检查根节点
if tree.root == nil {
return fmt.Errorf("根节点不能为空")
}
// 2. 递归验证所有节点
if err := tree.validateNodeRecursive(tree.root, nil, nil, 0); err != nil {
errors = append(errors, err.Error())
}
// 3. 检查叶子节点链表
if err := tree.validateLeafChain(); err != nil {
errors = append(errors, err.Error())
}
// 4. 检查高度一致性
if calculatedHeight := tree.calculateHeight(); calculatedHeight != tree.height {
errors = append(errors, fmt.Sprintf(
"高度不一致:存储值=%d,计算值=%d", tree.height, calculatedHeight))
}
if len(errors) > 0 {
return fmt.Errorf("树验证失败:\n%s", strings.Join(errors, "\n"))
}
return nil
}
func (tree *BPlusTree) validateNodeRecursive(node BTreeNode, minKey, maxKey []byte, level int) error {
// 1. 验证节点内部一致性
if err := tree.validateSingleNode(node); err != nil {
return fmt.Errorf("节点验证失败(level %d): %w", level, err)
}
// 2. 验证键的范围
keys := node.GetKeys()
for _, key := range keys {
if minKey != nil && bytes.Compare(key, minKey) < 0 {
return fmt.Errorf("键 %v 小于最小值 %v", key, minKey)
}
if maxKey != nil && bytes.Compare(key, maxKey) >= 0 {
return fmt.Errorf("键 %v 大于等于最大值 %v", key, maxKey)
}
}
// 3. 如果是内部节点,递归验证子节点
if !node.IsLeaf() {
internal := node.(*InternalNode)
for i, childPageID := range internal.children {
childPage, err := tree.pageManager.ReadPage(childPageID)
if err != nil {
return fmt.Errorf("读取子节点失败: %w", err)
}
childNode := tree.deserializeNode(childPage)
// 确定子节点的键范围
var childMinKey, childMaxKey []byte
if i > 0 {
childMinKey = internal.keys[i-1]
} else {
childMinKey = minKey
}
if i < len(internal.keys) {
childMaxKey = internal.keys[i]
} else {
childMaxKey = maxKey
}
if err := tree.validateNodeRecursive(childNode, childMinKey, childMaxKey, level+1); err != nil {
return err
}
}
}
return nil
}
type DiagnosticReport struct {
TreeHeight int
NodeCount uint64
RecordCount uint64
AvgNodeUtilization float64
CacheHitRate float64
IOPatterns []IOPattern
HotSpots []KeyRange
}
func (tree *BPlusTree) GenerateDiagnosticReport() DiagnosticReport {
report := DiagnosticReport{}
// 1. 基本统计
report.TreeHeight = tree.height
report.NodeCount = tree.nodeCount
report.RecordCount = tree.recordCount
// 2. 节点利用率分析
totalSlots := 0
usedSlots := 0
tree.traverseForUtilization(tree.root, &totalSlots, &usedSlots)
report.AvgNodeUtilization = float64(usedSlots) / float64(totalSlots)
// 3. 缓存性能分析
cacheStats := tree.pageManager.GetCacheStats()
report.CacheHitRate = cacheStats.HitRate
// 4. IO模式分析
report.IOPatterns = tree.analyzeIOPatterns()
// 5. 热点分析
report.HotSpots = tree.detectHotSpots()
return report
}
func (tree *BPlusTree) RepairCorruption() error {
// 1. 扫描所有节点,找出损坏的节点
corruptedNodes := tree.scanForCorruption()
if len(corruptedNodes) == 0 {
return nil // 没有损坏
}
// 2. 尝试修复每个损坏的节点
for _, nodeID := range corruptedNodes {
if err := tree.repairSingleNode(nodeID); err != nil {
log.Printf("修复节点 %d 失败: %v", nodeID, err)
}
}
// 3. 重建树结构
return tree.rebuildTreeStructure()
}
func (tree *BPlusTree) rebuildTreeStructure() error {
// 1. 收集所有有效的叶子节点
validLeaves := tree.collectValidLeaves()
// 2. 重新排序叶子节点
sort.Slice(validLeaves, func(i, j int) bool {
if len(validLeaves[i].keys) == 0 || len(validLeaves[j].keys) == 0 {
return false
}
return bytes.Compare(validLeaves[i].keys[0], validLeaves[j].keys[0]) < 0
})
// 3. 重建叶子节点链表
for i := 0; i < len(validLeaves)-1; i++ {
validLeaves[i].next = tree.getPageID(validLeaves[i+1])
validLeaves[i+1].prev = tree.getPageID(validLeaves[i])
}
// 4. 重建内部节点
return tree.rebuildInternalNodes(validLeaves)
}
本文档详细解析了B+树索引结构与算法实现的各个方面:
这个实现虽然相比工业级数据库还有差距,但作为学习和原型开发已经具备了完整的功能和良好的性能表现。通过这份文档,读者可以深入理解B+树的设计原理、实现细节和优化策略,为进一步的数据库开发奠定坚实基础。