BaoStock数据缓存系统 - 本地多级缓存机制
- 多级缓存:内存缓存 + SQLite数据库 + 本地文件
- API兼容:与BaoStock API完全兼容
- 性能优化:显著提高数据获取速度
- 错误处理:自动从缓存恢复,提高系统稳定性
- 批量处理:支持多线程并发请求
- 监控指标:实时监控系统性能
- 配置灵活:支持自定义缓存大小和过期时间
# 克隆仓库
git clone <repository-url>
cd baostock_cache_system
# 安装依赖
pip install -r requirements.txt
# 安装包
pip install -e .pip install baostock_cache_systemfrom baostock_cache_system import BaoStockCacheSystem
# 初始化缓存系统
bs = BaoStockCacheSystem(
data_root="e:/code/baostock_data", # 数据存储目录
cache_size=1000, # 内存缓存大小
max_workers=4 # 最大工作线程数
)
# 登录BaoStock
lg = bs.login()
print(lg.error_code, lg.error_msg)
# 获取股票日线数据(带缓存)
rs = bs.query_history_k_data_plus(
"sh.600000",
"date,code,open,high,low,close,volume",
start_date="2025-01-01",
end_date="2025-01-31",
frequency="d",
adjustflag="3"
)
# 处理数据
data_list = []
while (rs.error_code == '0') & rs.next():
data_list.append(rs.get_row_data())
df = pd.DataFrame(data_list, columns=rs.fields)
print(df)
# 登出
bs.logout()import baostock as bs
from baostock_cache_system import patch_baostock
# 自动替换BaoStock函数
patch_baostock(data_root="e:/code/baostock_data")
# 原有代码无需修改
rs = bs.query_history_k_data_plus(
"sh.600000",
"date,code,open,high,low,close,volume",
start_date="2025-01-01",
end_date="2025-01-31"
)
# 处理数据...创建 config.yaml 文件来自定义配置:
baostock_cache:
data_root: "e:/code/baostock_data"
cache_size: 1000
max_workers: 4
enable_compression: true
api_timeout: 30
retry_times: 3
# 缓存过期时间(天)
cache_expiry:
daily: 1 # 日线数据1天
weekly: 7 # 周线数据7天
monthly: 30 # 月线数据30天
minutely: 0.04 # 分钟数据1小时
basic: 30 # 基本信息30天
# 日志配置
logging:
level: "INFO"
file: "logs/baostock_cache.log"| 数据类型 | API接口 | 缓存策略 | 过期时间 |
|---|---|---|---|
| 日线数据 | query_history_k_data_plus (frequency="d") | 三级缓存 | 1天 |
| 周线数据 | query_history_k_data_plus (frequency="w") | 三级缓存 | 7天 |
| 月线数据 | query_history_k_data_plus (frequency="m") | 三级缓存 | 30天 |
| 5分钟数据 | query_history_k_data_plus (frequency="5") | 三级缓存 | 1小时 |
| 15分钟数据 | query_history_k_data_plus (frequency="15") | 三级缓存 | 1小时 |
| 30分钟数据 | query_history_k_data_plus (frequency="30") | 三级缓存 | 1小时 |
| 60分钟数据 | query_history_k_data_plus (frequency="60") | 三级缓存 | 1小时 |
| 股票基本信息 | query_stock_basic | 三级缓存 | 30天 |
| 指数数据 | query_index_k_data_plus | 三级缓存 | 1天 |
| 行业数据 | query_industry_k_data_plus | 三级缓存 | 1天 |
| 交易日历 | query_trade_dates | 三级缓存 | 30天 |
| 测试场景 | 无缓存 | 有缓存 | 提升倍数 |
|---|---|---|---|
| 首次获取日线数据 | 5-10秒 | 5-10秒 | 1x |
| 重复获取日线数据 | 5-10秒 | 0.1-0.5秒 | 10-20x |
| 首次获取分钟数据 | 10-15秒 | 10-15秒 | 1x |
| 重复获取分钟数据 | 10-15秒 | 0.1-0.5秒 | 20-30x |
| 缓存命中率 | 0% | >90% | - |
from baostock_cache_system import BaoStockCacheSystem
import concurrent.futures
bs = BaoStockCacheSystem()
bs.login()
# 批量获取多只股票
symbols = ["sh.600000", "sh.600519", "sz.000001"]
results = {}
def fetch_stock(symbol):
rs = bs.query_history_k_data_plus(
symbol, "date,code,open,high,low,close,volume",
start_date="2025-01-01", end_date="2025-01-31"
)
data = []
while rs.next():
data.append(rs.get_row_data())
return symbol, data
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(fetch_stock, s): s for s in symbols}
for future in concurrent.futures.as_completed(futures):
symbol, data = future.result()
results[symbol] = data
print(results)
bs.logout()from baostock_cache_system import BaoStockCacheSystem
import time
bs = BaoStockCacheSystem()
bs.login()
# 测试性能
symbol = "sh.600000"
# 首次获取
start = time.time()
rs1 = bs.query_history_k_data_plus(
symbol, "date,code,open,high,low,close,volume",
start_date="2025-01-01", end_date="2025-01-31"
)
time1 = time.time() - start
print(f"首次获取: {time1:.2f}秒")
# 重复获取(缓存)
start = time.time()
rs2 = bs.query_history_k_data_plus(
symbol, "date,code,open,high,low,close,volume",
start_date="2025-01-01", end_date="2025-01-31"
)
time2 = time.time() - start
print(f"缓存获取: {time2:.2f}秒")
print(f"性能提升: {time1/time2:.2f}倍")
# 获取性能指标
metrics = bs.get_performance_metrics()
print(metrics)
bs.logout()- 内存缓存:使用OrderedDict实现LRU缓存
- 数据库缓存:SQLite数据库持久化存储
- 文件缓存:本地文件大容量存储
- 缓存管理:自动处理缓存过期和淘汰
- K线数据:支持日线、周线、月线、分钟线
- 基本信息:股票基本信息和公司资料
- API兼容:与BaoStock API完全兼容
- 指数数据:大盘和行业指数
- 行业数据:行业板块数据
- 交易日历:交易日和非交易日信息
- 日期处理:日期格式化和范围计算
- 数据转换:数值转换和指标计算
- 数据处理:重采样和分割
- 系统配置:缓存大小和过期时间
- 环境变量:支持环境变量配置
- 配置文件:YAML配置文件支持
# 获取性能指标
metrics = bs.get_performance_metrics()
print(metrics)
# 输出示例
{
'cache_hits': 100,
'cache_misses': 10,
'api_calls': 10,
'source_timing': {},
'request_counts': {}
}# 清空缓存
bs.clear_cache()
# 检查缓存大小
print(len(bs.memory_cache))日志文件位于 logs/baostock_cache.log,包含:
- 系统初始化信息
- 数据获取记录
- 缓存操作记录
- 错误和异常信息
- 性能指标
原因:数据访问模式随机,缓存未预热
解决方案:
- 增加缓存大小
- 预热缓存(提前获取常用数据)
- 优化数据访问模式
原因:缓存文件过多或过大
解决方案:
- 增加磁盘空间
- 减少缓存过期时间
- 定期清理缓存
原因:内存缓存过大
解决方案:
- 减少缓存大小
- 增加max_workers
- 定期清空缓存
原因:网络问题或API限制
解决方案:
- 检查网络连接
- 增加重试次数
- 优化请求频率
┌───────────────────────────────────────────────────────────────┐
│ BaoStock Cache System │
├───────────────────────────────────────────────────────────────┤
│ ┌────────────────────┐ ┌────────────────────┐ ┌───────────┐
│ │ Memory Cache │ │ SQLite Database │ │ Local │
│ │ (LRU Cache) │ │ (Persistent) │ │ Files │
│ └────────────────────┘ └────────────────────┘ └───────────┘
│ │ │ │
├──────────┼───────────────────────┼───────────────────────┤
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Core Engine │ │
│ │ - 缓存管理 - 数据获取 - 错误处理 - 性能监控 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
├──────────────────────────┼─────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────┐ │
│ │ API Interface │ │
│ │ - 与BaoStock API完全兼容 │ │
│ └─────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────┘
MIT License
欢迎提交Issue和Pull Request!
- 作者:Yang Feng
- 邮箱:yangfeng0537@163.com
- 版本:1.0.0