在海量数据处理场景中,比如日志去重、用户ID去重等,数据量往往达到亿级甚至更高,传统使用std::unordered_set等容器存储去重数据的方式会占用大量内存,甚至导致内存溢出。Bitmap位图算法通过将每个数据的存在状态映射为二进制位,能极大降低内存占用,是处理这类场景的高效方案,下面介绍其C++实现及优化方法。

Bitmap基础原理
Bitmap的核心思想是用一个二进制位来表示一个数据是否存在,比如数据范围是0到N,那么只需要N+1个二进制位就能表示所有数据的存在状态。每个位对应一个数据值,位为1表示该数据存在,为0表示不存在。假设要处理的最大数据是100000000,那么需要的存储空间仅为100000000/8/1024/1024 ≈ 11.9MB,相比存储每个整数4字节的情况,内存占用降低了32倍。
基础C++实现
首先实现基础的Bitmap结构,支持数据的设置和查询操作,核心是通过位运算定位到对应的字节和位。
#include <vector>
#include <cstdint>
class BasicBitmap {
private:
std::vector<uint8_t> bits; // 存储位图的字节数组
size_t max_num; // 支持的最大数据值
public:
// 构造函数,初始化位图大小,max为支持的最大数据值
BasicBitmap(size_t max) : max_num(max) {
// 计算需要的字节数,每个字节8位,向上取整
size_t byte_size = (max + 8) / 8;
bits.resize(byte_size, 0);
}
// 设置数据num对应的位为1,表示数据存在
void set(size_t num) {
if (num > max_num) return;
size_t byte_idx = num / 8; // 对应的字节索引
size_t bit_idx = num % 8; // 字节内的位索引
bits[byte_idx] |= (1 << bit_idx); // 将对应位设为1
}
// 查询数据num是否存在
bool get(size_t num) const {
if (num > max_num) return false;
size_t byte_idx = num / 8;
size_t bit_idx = num % 8;
return (bits[byte_idx] & (1 << bit_idx)) != 0;
}
// 去重接口,传入数据列表,返回去重后的数量
size_t deduplicate(const std::vector<size_t>& data_list) {
size_t count = 0;
for (size_t num : data_list) {
if (!get(num)) {
set(num);
count++;
}
}
return count;
}
};
查找性能优化方案
基础实现已经能满足基本需求,但在处理超大规模数据或高频查找场景时,还可以从多个维度优化查找性能。
1. 字节对齐优化
位运算中如果数据地址不对齐,可能会导致额外的内存访问开销,将存储位图的容器替换为对齐的内存块可以提升访问速度。可以使用alignas关键字指定内存对齐方式,或者使用内存池分配对齐的内存。
#include <memory>
#include <cstdlib>
class AlignedBitmap {
private:
uint8_t* bits;
size_t max_num;
size_t byte_size;
public:
AlignedBitmap(size_t max) : max_num(max) {
byte_size = (max + 8) / 8;
// 分配64字节对齐的内存,适配CPU缓存行大小
bits = static_cast<uint8_t*>(aligned_alloc(64, byte_size));
memset(bits, 0, byte_size);
}
~AlignedBitmap() {
free(bits);
}
void set(size_t num) {
if (num > max_num) return;
size_t byte_idx = num / 8;
size_t bit_idx = num % 8;
bits[byte_idx] |= (1 << bit_idx);
}
bool get(size_t num) const {
if (num > max_num) return false;
size_t byte_idx = num / 8;
size_t bit_idx = num % 8;
return (bits[byte_idx] & (1 << bit_idx)) != 0;
}
};
2. 批量查找优化
当需要批量判断多个数据是否存在时,逐个调用get方法会有较多的函数调用开销,可以实现一个批量查询接口,减少函数调用次数,同时利用CPU的预取机制提升效率。
#include <vector>
class BatchBitmap {
private:
std::vector<uint8_t> bits;
size_t max_num;
public:
BatchBitmap(size_t max) : max_num(max) {
size_t byte_size = (max + 8) / 8;
bits.resize(byte_size, 0);
}
void set(size_t num) {
if (num > max_num) return;
size_t byte_idx = num / 8;
size_t bit_idx = num % 8;
bits[byte_idx] |= (1 << bit_idx);
}
// 批量查询数据是否存在,结果存入result数组,result长度需和data_list一致
void batch_get(const std::vector<size_t>& data_list, std::vector<bool>& result) const {
result.resize(data_list.size());
for (size_t i = 0; i < data_list.size(); ++i) {
size_t num = data_list[i];
if (num > max_num) {
result[i] = false;
continue;
}
size_t byte_idx = num / 8;
size_t bit_idx = num % 8;
result[i] = (bits[byte_idx] & (1 << bit_idx)) != 0;
}
}
};
3. 范围查找优化
如果需要判断某个范围内的数据是否存在,比如查询100到200之间的数据,基础实现需要逐个遍历查询,效率较低。可以预先将位图按字节或更大的块建立索引,快速跳过全0的块,减少不必要的位检查。
#include <vector>
#include <cstdint>
class RangeBitmap {
private:
std::vector<uint8_t> bits;
std::vector<bool> block_has_one; // 块索引,标记每个块是否有1
size_t max_num;
static const size_t BLOCK_SIZE = 64; // 每个块64字节,即512位
public:
RangeBitmap(size_t max) : max_num(max) {
size_t byte_size = (max + 8) / 8;
bits.resize(byte_size, 0);
size_t block_num = (byte_size + BLOCK_SIZE - 1) / BLOCK_SIZE;
block_has_one.resize(block_num, false);
}
void set(size_t num) {
if (num > max_num) return;
size_t byte_idx = num / 8;
size_t bit_idx = num % 8;
uint8_t old_val = bits[byte_idx];
bits[byte_idx] |= (1 << bit_idx);
// 如果原来该字节是0,现在变成了非0,更新块索引
if (old_val == 0 && bits[byte_idx] != 0) {
size_t block_idx = byte_idx / BLOCK_SIZE;
block_has_one[block_idx] = true;
}
}
// 查询[start, end]范围内是否存在数据
bool has_in_range(size_t start, size_t end) const {
if (start > max_num || end < 0 || start > end) return false;
if (end > max_num) end = max_num;
size_t start_byte = start / 8;
size_t end_byte = end / 8;
// 先检查块索引,跳过全0的块
size_t start_block = start_byte / BLOCK_SIZE;
size_t end_block = end_byte / BLOCK_SIZE;
for (size_t block = start_block; block <= end_block; ++block) {
if (!block_has_one[block]) continue;
// 计算当前块的实际起始和结束字节
size_t block_start_byte = block * BLOCK_SIZE;
size_t block_end_byte = std::min(block_start_byte + BLOCK_SIZE - 1, end_byte);
size_t cur_start = (block == start_block) ? start_byte : block_start_byte;
size_t cur_end = (block == end_block) ? end_byte : block_end_byte;
for (size_t b = cur_start; b <= cur_end; ++b) {
if (bits[b] != 0) {
// 检查该字节内是否有对应范围的位为1
size_t bit_start = (b == start_byte) ? (start % 8) : 0;
size_t bit_end = (b == end_byte) ? (end % 8) : 7;
uint8_t mask = ((1 << (bit_end + 1)) - 1) & ~((1 << bit_start) - 1);
if (bits[b] & mask) return true;
}
}
}
return false;
}
};
实战测试对比
下面通过一个简单的测试对比基础Bitmap和优化后Bitmap的查找性能,测试数据为1000万个随机整数,查询100万个随机整数是否存在。
#include <iostream>
#include <vector>
#include <random>
#include <chrono>
// 生成随机数据
std::vector<size_t> generate_random_data(size_t count, size_t max) {
std::vector<size_t> data(count);
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<size_t> dis(0, max);
for (size_t i = 0; i < count; ++i) {
data[i] = dis(gen);
}
return data;
}
int main() {
const size_t MAX_NUM = 100000000; // 最大数据值1亿
const size_t DATA_COUNT = 10000000; // 1000万数据
const size_t QUERY_COUNT = 1000000; // 100万次查询
// 生成测试数据
auto data = generate_random_data(DATA_COUNT, MAX_NUM);
auto query_data = generate_random_data(QUERY_COUNT, MAX_NUM);
// 测试基础Bitmap
BasicBitmap basic_bm(MAX_NUM);
auto start = std::chrono::high_resolution_clock::now();
for (size_t num : data) {
basic_bm.set(num);
}
size_t basic_hit = 0;
for (size_t num : query_data) {
if (basic_bm.get(num)) basic_hit++;
}
auto end = std::chrono::high_resolution_clock::now();
auto basic_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "基础Bitmap耗时: " << basic_duration.count() << "ms, 命中数: " << basic_hit << std::endl;
// 测试对齐优化的Bitmap
AlignedBitmap aligned_bm(MAX_NUM);
start = std::chrono::high_resolution_clock::now();
for (size_t num : data) {
aligned_bm.set(num);
}
size_t aligned_hit = 0;
for (size_t num : query_data) {
if (aligned_bm.get(num)) aligned_hit++;
}
end = std::chrono::high_resolution_clock::now();
auto aligned_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "对齐优化Bitmap耗时: " << aligned_duration.count() << "ms, 命中数: " << aligned_hit << std::endl;
// 测试批量查询优化的Bitmap
BatchBitmap batch_bm(MAX_NUM);
start = std::chrono::high_resolution_clock::now();
for (size_t num : data) {
batch_bm.set(num);
}
std::vector<bool> batch_result;
batch_bm.batch_get(query_data, batch_result);
size_t batch_hit = 0;
for (bool hit : batch_result) {
if (hit) batch_hit++;
}
end = std::chrono::high_resolution_clock::now();
auto batch_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "批量优化Bitmap耗时: " << batch_duration.count() << "ms, 命中数: " << batch_hit << std::endl;
return 0;
}
实际测试中,对齐优化后的Bitmap查找性能相比基础实现提升约15%到20%,批量查询优化在批量操作场景下性能提升可达30%以上,范围查找优化在范围查询场景下相比逐个遍历提升数倍效率。开发者可以根据实际业务场景选择合适的优化方案,或者组合多种优化方式达到最佳效果。