Coding-量化交易-qlib workflow

摘要

基于qlib的量化交易workflow框架实现,包括数据,因子,模型,回测部分。

Csv数据转化

convert_data.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 从scripts.dump_bin模块导入DumpDataAll类
from scripts.dump_bin import DumpDataAll

# 当脚本作为主程序运行时执行以下代码
if __name__ == '__main__':
# 创建DumpDataAll类的实例,用于数据导出
dump_util = DumpDataAll(
data_path="./mydata/five_stock", # 原始数据存放路径
qlib_dir="./my_qlib_dir/five_stock", # QLib格式数据的输出目录
# include_fields='open,high,low,close,volume,position', # 注释掉的字段配置
include_fields='volume,high,low,close,open', # 需要导出的字段列表
date_field_name="date", # 日期字段的名称
symbol_field_name="symbol", # 股票代码字段的名称
freq="day" # 数据频率,这里是日线数据
)
# 调用dump方法执行数据导出操作
dump_util.dump()

workflow主函数

main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#  Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Qlib提供了两种类型的接口:
(1) 用户可以通过简单的配置定义量化研究工作流
(2) Qlib以模块化方式设计,支持像搭积木一样通过代码创建研究工作流

(1)的接口是`qrun XXX.yaml`,(2)的接口是类似本脚本的形式,其功能与`qrun XXX.yaml`几乎相同
"""
# 导入qlib库及相关组件
import qlib
from qlib.constant import REG_CN # 导入中国市场常量
from qlib.utils import init_instance_by_config, flatten_dict # 工具函数:通过配置初始化实例、字典扁平化
from qlib.workflow import R # 实验记录与管理组件
from qlib.workflow.record_temp import ( # 记录模板:用于生成各种分析记录
SignalRecord, # 信号记录
PortAnaRecord, # 组合分析记录
SigAnaRecord # 信号分析记录
)

# 从自定义配置文件导入基准指数和任务配置
from mycode.myconfig import CSI300_BENCH, CSI5_GBDT_TASK

if __name__ == "__main__":
# 数据存储路径配置
provider_uri = "./my_qlib_dir/five_stock" # 目标数据目录

# 初始化qlib
# 注:GetData().qlib_data(...)用于获取数据,这里假设数据已存在
qlib.init(provider_uri=provider_uri, region=REG_CN) # 初始化并指定数据路径和市场区域(中国)

# 根据配置初始化模型和数据集
model = init_instance_by_config(CSI5_GBDT_TASK["model"]) # 从任务配置中初始化模型
dataset = init_instance_by_config(CSI5_GBDT_TASK["dataset"]) # 从任务配置中初始化数据集

# 组合分析配置(回测相关设置)
port_analysis_config = {
"executor": { # 执行器配置:控制回测执行逻辑
"class": "SimulatorExecutor", # 模拟器执行器
"module_path": "qlib.backtest.executor", # 执行器所在模块
"kwargs": {
"time_per_step": "day", # 每步执行时间单位(按天)
"generate_portfolio_metrics": True, # 生成组合指标
},
},
"strategy": { # 策略配置:定义交易策略逻辑
"class": "TopkDropoutStrategy", # TopkDropout策略(选取排名前k的股票,剔除部分)
"module_path": "qlib.contrib.strategy.signal_strategy", # 策略所在模块
"kwargs": {
"signal": (model, dataset), # 信号源:使用模型和数据集生成信号
"topk": 50, # 选取排名前50的股票
"n_drop": 5, # 每次调仓剔除5只股票
},
},
"backtest": { # 回测配置:定义回测的基本参数
"start_time": "2019-11-28", # 回测开始时间
"end_time": "2020-09-23", # 回测结束时间
"account": 100000000, # 初始资金(1亿元)
"benchmark": CSI300_BENCH, # 基准指数(沪深300)
"exchange_kwargs": { # 交易所相关参数(手续费等)
"freq": "day", # 交易频率(按天)
"limit_threshold": 0.095, # 涨跌停阈值(9.5%)
"deal_price": "close", # 成交价格(收盘价)
"open_cost": 0.0005, # 开仓手续费率(0.05%)
"close_cost": 0.0015, # 平仓手续费率(0.15%)
"min_cost": 5, # 最低手续费(5元)
},
},
}

# 注意:以下代码是可选的
# 用于演示数据集可以单独使用
example_df = dataset.prepare("train") # 获取训练集数据
print(example_df.head()) # 打印训练集前5行

# 开始实验(使用R组件记录实验过程)
with R.start(experiment_name="workflow"): # 实验名称为"workflow"
# 记录参数:将任务配置扁平化后存入日志
R.log_params(**flatten_dict(CSI5_GBDT_TASK))
# 模型训练
model.fit(dataset)
# 保存模型参数
R.save_objects(** {"params.pkl": model})

# 预测阶段
recorder = R.get_recorder() # 获取记录器
sr = SignalRecord(model, dataset, recorder) # 创建信号记录器
sr.generate() # 生成信号记录

# 信号分析
sar = SigAnaRecord(recorder) # 创建信号分析记录器
sar.generate() # 生成信号分析结果

# 回测分析
# 注:如果用户想基于自己的预测结果进行回测,
# 请参考https://qlib.readthedocs.io/en/latest/component/recorder.html#record-template
par = PortAnaRecord(recorder, port_analysis_config, "day") # 创建组合分析记录器
par.generate() # 生成组合分析结果

配置函数

myconfig.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# 导入qlib数据处理相关组件
from qlib.data.dataset.handler import DataHandlerLP # 数据处理基类
from qlib.data.dataset.loader import QlibDataLoader # 数据加载器基类

from qlib.data.dataset.processor import Processor # 数据处理器基类
from qlib.utils import get_callable_kwargs # 工具函数:获取可调用对象的参数
from qlib.data.dataset import processor as processor_module # 数据处理器模块
from inspect import getfullargspec # 用于获取函数参数信息


# 基准指数配置
CSI300_BENCH = "SH000300" # 沪深300指数代码
CSI5_MARKET = "all" # 市场范围(全部股票)
DATASET_A4_CLASS = "A4" # 数据集类名称


# 默认的学习阶段数据处理器配置
_DEFAULT_LEARN_PROCESSORS = [
{"class": "DropnaLabel"}, # 移除标签为空的数据
{"class": "CSZScoreNorm", "kwargs": {"fields_group": "label"}}, # 对标签进行Z-Score标准化
{"class": "Fillna"}, # 填充缺失值
]


def check_transform_proc(proc_l, fit_start_time, fit_end_time):
"""
检查并处理数据处理器配置,为需要的处理器添加拟合时间范围

参数:
proc_l: 处理器配置列表
fit_start_time: 拟合开始时间
fit_end_time: 拟合结束时间

返回:
处理后的处理器配置列表
"""
new_l = []
for p in proc_l:
if not isinstance(p, Processor):
# 获取处理器类和参数
klass, pkwargs = get_callable_kwargs(p, processor_module)
# 获取类的参数列表
args = getfullargspec(klass).args
# 如果处理器需要拟合时间范围参数,则添加
if "fit_start_time" in args and "fit_end_time" in args:
# 确保拟合时间范围已设置
assert (
fit_start_time is not None and fit_end_time is not None
), "Make sure `fit_start_time` and `fit_end_time` are not None."
pkwargs.update(
{
"fit_start_time": fit_start_time,
"fit_end_time": fit_end_time,
}
)
# 构建处理器配置
proc_config = {"class": klass.__name__, "kwargs": pkwargs}
# 如果原始配置包含模块路径,则保留
if isinstance(p, dict) and "module_path" in p:
proc_config["module_path"] = p["module_path"]
new_l.append(proc_config)
else:
new_l.append(p)
return new_l


class A4DL(QlibDataLoader):
"""
自定义数据加载器:用于获取指定因子(MACD、成交量、价格波动范围、3日均值)
"""

def __init__(self, config=None, **kwargs):
"""初始化数据加载器"""
_config = {
"feature": self.get_feature_config(), # 设置特征配置
}
if config is not None:
_config.update(config)
super().__init__(config=_config,** kwargs)

@staticmethod
def get_feature_config(config={}):
"""
创建指定的因子配置:MACD、成交量、价格波动范围、3日均值

参数:
config: 额外配置(预留)

返回:
特征表达式和名称的元组
"""
# 1. 定义MACD因子
# MACD计算公式:(EMA(12)-EMA(26))/收盘价 - EMA((EMA(12)-EMA(26))/收盘价, 9)/收盘价
macd_exp = '(EMA($close, 12) - EMA($close, 26))/$close - EMA((EMA($close, 12) - EMA($close, 26))/$close, 9)/$close'

# 2. 定义其他特征
volume = '$volume' # 成交量
price_range = '$high-$low' # 最高价-最低价(价格波动范围)
ma3 = 'Mean($close, 3)' # 3日收盘价均值

# 组合特征字段和名称
fields = [macd_exp, volume, price_range, ma3]
names = ['MACD', 'VOLUME', 'RANGE', 'MA3']

return fields, names


class A4(DataHandlerLP):
"""
自定义数据处理器类,继承自DataHandlerLP
用于处理数据加载、特征工程和标签生成
"""
def __init__(
self,
instruments="all", # 股票池
start_time=None, # 数据开始时间
end_time=None, # 数据结束时间
freq="day", # 数据频率(按天)
infer_processors=[], # 推理阶段数据处理器
learn_processors=_DEFAULT_LEARN_PROCESSORS, # 学习阶段数据处理器
fit_start_time=None, # 处理器拟合开始时间
fit_end_time=None, # 处理器拟合结束时间
process_type=DataHandlerLP.PTYPE_A, # 处理类型
filter_pipe=None, # 过滤管道
inst_processors=None, # 股票处理器
**kwargs,
):
# 检查并处理数据处理器配置
infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)

# 数据加载器配置
data_loader = {
"class": "QlibDataLoader",
"kwargs": {
"config": {
"feature": self.get_feature_config(), # 特征配置
"label": kwargs.pop("label", self.get_label_config()), # 标签配置(默认使用内置标签)
},
"filter_pipe": filter_pipe, # 过滤管道
"freq": freq, # 数据频率
"inst_processors": inst_processors, # 股票处理器
},
}
# 调用父类构造函数初始化
super().__init__(
instruments=instruments,
start_time=start_time,
end_time=end_time,
data_loader=data_loader,
infer_processors=infer_processors,
learn_processors=learn_processors,
process_type=process_type,** kwargs,
)

def get_feature_config(self):
"""获取特征配置(复用A4DL的数据加载器)"""
return A4DL.get_feature_config()

def get_label_config(self):
"""
获取标签配置
这里定义的标签是:下一期的收盘价(用于预测)
"""
return ["Ref($close, -1)"], ["LABEL0"] # 表达式和标签名称


def get_data_handler_config(
start_time="2019-11-28", # 数据开始时间
end_time="2020-09-23", # 数据结束时间
fit_start_time="<dataset.kwargs.segments.train.0>", # 拟合开始时间(引用训练集开始时间)
fit_end_time="<dataset.kwargs.segments.train.1>", # 拟合结束时间(引用训练集结束时间)
instruments=CSI5_MARKET, # 股票池
):
"""
获取数据处理器配置

返回:
数据处理器配置字典
"""
return {
"start_time": start_time,
"end_time": end_time,
"fit_start_time": fit_start_time,
"fit_end_time": fit_end_time,
"instruments": instruments,
}


def get_dataset_config(
dataset_class=DATASET_A4_CLASS, # 数据集类名称
train=("2019-11-28", "2020-07-23"), # 训练集时间范围
valid=("2020-07-24", "2020-08-23"), # 验证集时间范围
test=("2020-08-24", "2020-09-23"), # 测试集时间范围
handler_kwargs={"instruments": CSI5_MARKET}, # 数据处理器参数
):
"""
获取数据集配置

返回:
数据集配置字典
"""
return {
"class": "DatasetH", # qlib自带的DatasetH类(用于包装数据处理器)
"module_path": "qlib.data.dataset", # DatasetH所在模块
"kwargs": {
"handler": { # 数据处理器配置
"class": dataset_class, # 自定义数据处理器类名
"module_path": "mycode.myconfig", # 自定义数据处理器所在模块
"kwargs": get_data_handler_config(**handler_kwargs), # 数据处理器参数
},
"segments": { # 数据集分割(训练/验证/测试)
"train": train,
"valid": valid,
"test": test,
},
},
}


# GBDT模型配置(使用LightGBM)
GBDT_MODEL = {
"class": "LGBModel", # qlib中的LGBModel类
"module_path": "qlib.contrib.model.gbdt", # LGBModel所在模块
"kwargs": {
"loss": "mse", # 损失函数(均方误差)
"colsample_bytree": 0.8879, # 特征采样比例
"learning_rate": 0.0421, # 学习率
"subsample": 0.8789, # 样本采样比例
"lambda_l1": 205.6999, # L1正则化系数
"lambda_l2": 580.9768, # L2正则化系数
"max_depth": 8, # 树的最大深度
"num_leaves": 210, # 叶子节点数量
"num_threads": 20, # 线程数量
},
}


def get_gbdt_task(dataset_kwargs={}, handler_kwargs={"instruments": CSI5_MARKET}):
"""
获取GBDT任务配置(模型+数据集)

参数:
dataset_kwargs: 数据集额外参数
handler_kwargs: 数据处理器额外参数

返回:
任务配置字典
"""
return {
"model": GBDT_MODEL, # 模型配置
"dataset": get_dataset_config(** dataset_kwargs, handler_kwargs=handler_kwargs), # 数据集配置
}


# 最终的CSI5 GBDT任务配置
CSI5_GBDT_TASK = get_gbdt_task(handler_kwargs={"instruments": CSI5_MARKET})