import os
import pickle

import numpy as np
import pandas as pd
import qlib
from qlib.config import REG_CN
from qlib.data import D
from qlib.data.dataset.loader import QlibDataLoader
from tqdm import trange

from config import Config


class QlibDataPreprocessor:
    """
    A class to handle the loading, processing, and splitting of Qlib financial data.
    """

    def __init__(self):
        """Initializes the preprocessor with configuration and data fields."""
        self.config = Config()
        self.data_fields = ['open', 'close', 'high', 'low', 'volume', 'vwap']
        self.data = {}  # A dictionary to store processed data for each symbol.

    def initialize_qlib(self):
        """Initializes the Qlib environment."""
        print("Initializing Qlib...")
        qlib.init(provider_uri=self.config.qlib_data_path, region=REG_CN)

    def load_qlib_data(self):
        """
        Loads raw data from Qlib, processes it symbol by symbol, and stores it
        in the `self.data` attribute.
        """
        print("Loading and processing data from Qlib...")
        data_fields_qlib = ['$' + f for f in self.data_fields]
        cal: np.ndarray = D.calendar()

        # Determine the actual start and end times to load, including a buffer
        # for the lookback and predict windows.
        start_index = cal.searchsorted(pd.Timestamp(self.config.dataset_begin_time))
        end_index = cal.searchsorted(pd.Timestamp(self.config.dataset_end_time))

        # Make sure start_index - lookback_window does not produce a negative index.
        adjusted_start_index = max(start_index - self.config.lookback_window, 0)
        real_start_time = cal[adjusted_start_index]

        # Make sure end_index stays within the calendar's range.
        if end_index >= len(cal):
            end_index = len(cal) - 1
        elif cal[end_index] != pd.Timestamp(self.config.dataset_end_time):
            end_index -= 1
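        # Note: np.searchsorted returns the insertion position, so when
        # dataset_end_time is not an actual trading day the returned index
        # points to the first bar *after* it; the elif branch above steps back
        # one bar so the range still ends on or before dataset_end_time.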

        # Make sure end_index + predict_window does not exceed the calendar's range.
        adjusted_end_index = min(end_index + self.config.predict_window, len(cal) - 1)
        real_end_time = cal[adjusted_end_index]

        # Load data using Qlib's data loader.
        data_df = QlibDataLoader(config=data_fields_qlib).load(
            self.config.instrument, real_start_time, real_end_time
        )
        data_df = data_df.stack().unstack(level=1)  # Reshape for easier access.
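        # After this reshape the frame should be indexed by (datetime, field)
        # with one column per instrument, so data_df[symbol] below yields every
        # field value for a single symbol.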

        symbol_list = list(data_df.columns)
        for i in trange(len(symbol_list), desc="Processing Symbols"):
            symbol = symbol_list[i]
            symbol_df = data_df[symbol]

            # Pivot the table to have features as columns and datetime as index.
            symbol_df = symbol_df.reset_index().rename(columns={'level_1': 'field'})
            symbol_df = pd.pivot(symbol_df, index='datetime', columns='field', values=symbol)
            symbol_df = symbol_df.rename(columns={f'${field}': field for field in self.data_fields})

            # Calculate amount and select final features.
            symbol_df['vol'] = symbol_df['volume']
            symbol_df['amt'] = (symbol_df['open'] + symbol_df['high'] + symbol_df['low'] + symbol_df['close']) / 4 * symbol_df['vol']
            symbol_df = symbol_df[self.config.feature_list]

            # Filter out symbols with insufficient data.
            symbol_df = symbol_df.dropna()
            if len(symbol_df) < self.config.lookback_window + self.config.predict_window + 1:
                continue
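
            # Symbols that reach this point have enough rows for at least one
            # full lookback/predict sample and are kept for dataset building.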
            self.data[symbol] = symbol_df

    def prepare_dataset(self):
        """
        Splits the loaded data into train, validation, and test sets and saves them to disk.
        """
        print("Splitting data into train, validation, and test sets...")
        train_data, val_data, test_data = {}, {}, {}

        symbol_list = list(self.data.keys())
        for i in trange(len(symbol_list), desc="Preparing Datasets"):
            symbol = symbol_list[i]
            symbol_df = self.data[symbol]

            # Define time ranges from config.
            train_start, train_end = self.config.train_time_range
            val_start, val_end = self.config.val_time_range
            test_start, test_end = self.config.test_time_range

            # Create boolean masks for each dataset split.
            train_mask = (symbol_df.index >= train_start) & (symbol_df.index <= train_end)
            val_mask = (symbol_df.index >= val_start) & (symbol_df.index <= val_end)
            test_mask = (symbol_df.index >= test_start) & (symbol_df.index <= test_end)

            # Apply masks to create the final datasets.
            train_data[symbol] = symbol_df[train_mask]
            val_data[symbol] = symbol_df[val_mask]
            test_data[symbol] = symbol_df[test_mask]

        # Save the datasets using pickle.
        os.makedirs(self.config.dataset_path, exist_ok=True)
        with open(f"{self.config.dataset_path}/train_data.pkl", 'wb') as f:
            pickle.dump(train_data, f)
        with open(f"{self.config.dataset_path}/val_data.pkl", 'wb') as f:
            pickle.dump(val_data, f)
        with open(f"{self.config.dataset_path}/test_data.pkl", 'wb') as f:
            pickle.dump(test_data, f)

        print("Datasets prepared and saved successfully.")


if __name__ == '__main__':
    # This block allows the script to be run directly to perform data preprocessing.
    preprocessor = QlibDataPreprocessor()
    preprocessor.initialize_qlib()
    preprocessor.load_qlib_data()
    preprocessor.prepare_dataset()
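
# Downstream usage sketch (assumes the Config and paths above): each saved
# pickle maps a symbol to its feature DataFrame indexed by datetime, e.g.
#
#     with open(f"{Config().dataset_path}/train_data.pkl", 'rb') as f:
#         train_data = pickle.load(f)
#     symbol, df = next(iter(train_data.items()))
#     print(symbol, df.shape)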