機械学習による株価予測(LightGBM)
LightGBMを使用して株価予測をしていきたいと思います。ソースの全文は本記事の末尾に載せました。忙しい人は1.概要と末尾のソースで十分かもしれません。
1.概要
1.1.予測対象
将来のデンソー(6902)株価の上下動を予測して、買うか買わないか判断するプログラムを実装します。
具体的には20日後までの株価について以下を予測します。
① 最大株価が5%以上上昇するか
② 最小株価が5%以上下落しないか
③ 平均株価が0%以上上昇するか
上記をすべて満たす場合に購入することとします。株価を直接予測することもできるのですが、あまり精度が出ませんでした。
1.2.説明変数
説明変数は以下の通りです。
① 日経平均株価
② ダウ平均株価
③ NASDAQ総合指数
④ S&P500種指数
⑤ トヨタ株価
⑥ 各国の為替
⑦ 失業率
⑧ インフレ率
⑨ 消費者物価指数
⑩ 各種コモディティ指数(金や麦等)
⑪ 不動産価格(J-REIT)
1.3.分析期間
訓練データ:2016年1月1日~2018年12月31日
試験データ:2019年1月1日~2020年1月1日
コロナによる暴落は予測しようがないと思ったので、分析期間から除外しました。
1.4.予測結果
試験データを対象に予測を実施し、混同行列により評価を行いました。結果は以下の通りです。
AIの判断に従って購入した場合と完全にランダムに購入した場合の勝率の違いは以下の通りです。
- AIの判断:24/(24+16)=60%
- ランダム:(24+62)/(24+62+16+139)=36%
悪くない結果なんじゃないかなと思います。ただ、機会損失が62回発生している点は注意が必要。
今後の改善点は以下2点。
- コロナ禍以降の株価予測もできるように、より汎用的なモデルにしていきたい
- 機会損失を減らしたい
説明変数を増やしたりしたら改善できますかね。。色々試してみます。
2.株価データ等の取得
pandas-datareaderというライブラリでデータを簡単に取得できます。
取得先はStooqというポーランドサイトとFREDという米国サイトです。
pandas-datareaderの使い方は以下の通りです。
# ライブラリのインポート import pandas_datareader.data as web # Stooqからデータ取得 stooq = web.DataReader('6902.JP','stooq',start=st,end=ed) # FREDからデータ取得 fred = web.DataReader('NIKKEI225','fred',start=st,end=ed)
① 取得先サイトの指定(pandas-datareader第2引数)
DataReaderの第2引数に取得先を定義します。Stooqの場合は'stooq'、FREDの場合は'fred'。
Stooqでは株価やETFの価格が取得できます。本当は各種指数や為替等の情報も取得できそうなのですが、なぜか私の環境ではうまく取得できませんでした。取得できなかった情報はFREDから取得するようにしました。
② 取得銘柄の指定(pandas-datareader第1引数)
・Stooqの場合
「6902.JP」のように「銘柄番号.JP」という形で定義することで取得可能です。同じ方法でETFも取得可能です。
・FREDの場合
FREDのサイト上でDataReaderに引き渡すキーを調べることができます。例えば、日経平均株価の場合は以下の通り。その他の指数等についても幅広く扱っているので、色々検索してみるとおもしろいかもしれません。
(引用:Nikkei Stock Average, Nikkei 225 (NIKKEI225) | FRED | St. Louis Fed)
・複数銘柄を取得する場合
第1引数にリストを渡すことで複数銘柄を同時に取得することも可能です。
# 複数銘柄取得する場合の例 lst1=['6902.JP', #デンソー '1540.JP', #ETF(金) '1398.JP', #ETF(J-REIT(不動産)) '7203.JP', #トヨタ ] stooq = web.DataReader(lst1,'stooq',start=st,end=ed)
3.データの前処理
3.1.StooqとFREDから取得したデータを結合して、欠損値を補完する
データの種類によっては月次のデータだったりするので、強引ですが直前のデータで欠損値を補完しました。
# StooqとFredから取得したデータを結合する(結合キーはインデックス=Date) stock = pd.merge(stooq['Close'], fred, how='left', left_index=True, right_index=True) # 欠損値を直前のデータで補完する stock = stock.fillna(method='ffill')
3.2.目的変数データの作成
概要でも触れたように、分析対象は以下3点。
① 基準日から20日間の最大株価の上昇率が5%以上になるか
② 基準日から20日間の最小株価の下落率が5%以下に留まるか
③ 基準日から20日間の平均株価の上昇率が0%以上になるか
なので、上記3点をフラグ化します。
特定の期間におけるデータの集計はrolling関数を使用することで簡単になります。例えば、rolling(5).max()とすると、基準日を含めた5日分のデータから最大値が抽出されます。
rollingを使用する際の注意点なのですが、基準日以前のデータが集計されます。なので、未来日の集計を行いたい場合は少し工夫が必要で、shift関数を組み合わる必要があります。例えば、翌日から5日間の最大値を抽出したい場合はrolling(5).max().shift(-5)となるわけです。
実装は以下の通りです。
# 20営業日先の最大株価(5%以上上昇した場合:1、しなかった場合:0) stock['target.future.max'] = stock['6902.JP'].rolling(20).max().shift(-20) stock['target.ratio.max'] = stock['target.future.max']/stock['6902.JP'] stock['target.binary.max'] = np.where(stock['target.ratio.max'] > 1.05, 1, 0) # 20営業日先の最小株価(5%以上下落しなかった場合:1、した場合:0) stock['target.future.min'] = stock['6902.JP'].rolling(20).min().shift(-20) stock['target.ratio.min'] = stock['target.future.min']/stock['6902.JP'] stock['target.binary.min'] = np.where(stock['target.ratio.min'] < 0.95, 0, 1) # 20営業日先の最小株価(0%以上上昇した場合:1、しなかった場合:0) stock['target.future.mean'] = stock['6902.JP'].rolling(20).mean().shift(-20) stock['target.ratio.mean'] = stock['target.future.mean']/stock['6902.JP'] stock['target.binary.mean'] = np.where(stock['target.ratio.mean'] > 1.00, 1, 0) # 不要カラムの削除 stock = stock.drop(columns=['target.future.max','target.future.min','target.future.mean']) stock = stock.drop(columns=['target.ratio.max','target.ratio.min','target.ratio.mean'])
3.3.説明変数データの作成
StooqとFREDから取得したデータをそのまま使ってもいいのですが、各データの時系列的な変化が表現されないので、各データの移動平均を算出し、現在値との乖離率を算出しました。移動平均の採用期間は5日、10日、・・・、55日、60日としました。
# 各指数の移動平均との乖離率を算出する for stock_name in lst1: for i in [5,10,15,20,25,30,35,40,45,50,55,60]: exec('stock["{}_{}days_diverge"] = \ (stock["{}"].rolling({}).mean() - stock["{}"])/stock["{}"].rolling({}).mean()' \ .format(stock_name,i,stock_name,i,stock_name,stock_name,i) ) for stock_name in lst2: for i in [5,10,15,20,25,30,35,40,45,50,55,60]: exec('stock["{}_{}days_diverge"] = \ (stock["{}"].rolling({}).mean() - stock["{}"])/stock["{}"].rolling({}).mean()' \ .format(stock_name,i,stock_name,i,stock_name,stock_name,i) ) # 各指数の列を削除する stock = stock.drop(columns=lst1) stock = stock.drop(columns=lst2)
3.4.訓練用データと試験データを分割
# データ期間の定義 st_train = '2016/01/01' ed_train = '2018/12/31' st_test = '2019/01/01' ed_test = '2020/01/01' # TrainデータとTestデータを分割 df_train = stock[st_train : ed_train] df_test = stock[st_test : ed_test] df_train = df_train.reset_index(drop=True) df_test = df_test.reset_index(drop=True)
3.5.Numpyに変換し説明変数と目的変数に分割する
# numpyに変換する関数を定義 def toNumpy(x_df, t_max_df, t_min_df, t_mean_df): X = x_df.values y_max = t_max_df.values y_max = y_max.reshape(-1) y_min = t_min_df.values y_min = y_min.reshape(-1) y_mean = t_mean_df.values y_mean = y_mean.reshape(-1) return X, y_max, y_min, y_mean # 目的変数と説明変数に分ける関数を定義 def split(inputData): ## 学習データの分割 x_df = inputData.drop(columns=['target.binary.min','target.binary.max','target.binary.mean']) t_max_df = inputData['target.binary.max'] t_min_df = inputData['target.binary.min'] t_mean_df = inputData['target.binary.mean'] ## Numpyに変換 X, y_max, y_min, y_mean = toNumpy(x_df, t_max_df, t_min_df, t_mean_df) return X, y_max, y_min, y_mean # データ作成 ## 学習用データ X_train, y_trainmax, y_trainmin, y_trainmean = split(df_train) ## 試験用データ X_test, y_testmax, y_testmin, y_testmean = split(df_test)
4.LightGBMの実装と学習実施
LightGBMを使って分類問題を学習します。Optunaを使用して、ハイパーパラメータを自動チューニングしています。Optunaの使用方法は非常に簡単で、以下の通り、ライブラリをインポートして、通常通りtrain関数で学習するだけでOKです。
from optuna.integration import lightgbm as lgb
交差検証はKFoldを使用して5分割にしました。データリーク(学習用データと評価用データのデータ期間が被ること)を防ぐために、Shuffleはしないようにしています。実装は以下の通りです。以下を実行することでbests_max、bests_min、bests_meanというリストが作成されます。bests_maxには最大株価、bests_minには最小株価、bests_meanには平均株価の変動を予測したモデルがそれぞれ5つ格納されます(KFoldを5分割としたため)。
# ライブラリのインポート from sklearn.model_selection import KFold from optuna.integration import lightgbm as lgb # 乱数シードの固定 SEED = 123 # LightGBM訓練用関数を定義 def trainGbdt(X_train, y_train): bests = [] kf = KFold(n_splits=5, shuffle=False) for learn_index, valid_index in kf.split(X_train, y_train): X_learn, X_valid = X_train[learn_index], X_train[valid_index] y_learn, y_valid = y_train[learn_index], y_train[valid_index] #LGB用のデータに変形 lgb_learn = lgb.Dataset(X_learn, y_learn) lgb_valid = lgb.Dataset(X_valid, y_valid) param = { 'objective': 'binary', 'metric': 'auc', 'boosting_type': 'gbdt', 'random_seed':SEED, } # 訓練実施 best = lgb.train(param, lgb_learn, valid_sets=[lgb_learn,lgb_valid], ) bests.append(best) return bests # 勾配ブースティングによる学習実施(最大株価予測) bests_max = trainGbdt(X_train, y_trainmax) # 勾配ブースティングによる学習実施(最小株価予測) bests_min = trainGbdt(X_train, y_trainmin) # 勾配ブースティングによる学習実施(平均株価予測) bests_mean = trainGbdt(X_train, y_trainmean)
5.テストデータによる予測
作成したモデルを使用してテストデータの予測を実施します。実装は以下の通り。
# テストデータを予測する(最大株価予測) y_preds = [] for i in range(5): y_pred = bests_max[i].predict(X_test, num_iteration=bests_max[i].best_iteration) y_preds.append(y_pred) y_predmax = (y_preds[0] + y_preds[1] + y_preds[2] + y_preds[3] + y_preds[4])/5 # テストデータを予測する(最小株価予測) y_preds = [] for i in range(5): y_pred = bests_min[i].predict(X_test, num_iteration=bests_min[i].best_iteration) y_preds.append(y_pred) y_predmin = (y_preds[0] + y_preds[1] + y_preds[2] + y_preds[3] + y_preds[4])/5 # テストデータを予測する(平均株価予測) y_preds = [] for i in range(5): y_pred = bests_mean[i].predict(X_test, num_iteration=bests_mean[i].best_iteration) y_preds.append(y_pred) y_predmean = (y_preds[0] + y_preds[1] + y_preds[2] + y_preds[3] + y_preds[4])/5
5.予測結果の表示
5.1.予測結果の取りまとめ
# 予測値と実値との比較 result = np.c_[y_predmax,y_testmax, y_predmin,y_testmin, y_predmean,y_testmean ] df_result = pd.DataFrame(data=result, columns=['pred_max','true_max', 'pred_min','true_min', 'pred_mean','true_mean' ]) display(df_result)
上記を実行すると以下の通り出力される。
上記の通り、予測値が確率表現となってしまっているので、閾値を設けて、1か0に分類する必要がある。今回は四捨五入して1と0に分類する。ついでに買うか買わないかの判断も結果に表示するようにします。判断基準は概要にも記載しましたが、最大株価が5%以上上昇し、最小株価が5%以上下落せず、平均株価が0%以上上昇した場合は買いと判断することとします。
# 閾値0.5として1,0に分類 df_result['pred_max'] = np.round(df_result['pred_max']) df_result['pred_min'] = np.round(df_result['pred_min']) df_result['pred_mean'] = np.round(df_result['pred_mean']) df_result['buy_flg'] = df_result['pred_max'] * df_result['pred_min'] * df_result['pred_mean'] df_result['buy_flg_true'] = df_result['true_max'] * df_result['true_min'] * df_result['true_mean'] display(df_result)
結果は以下の通り。
5.2.予測結果の混合行列(Confusion Matrix)を表示
最後に混合行列を出力し、どの程度予測が的中したか確認します。コードは以下の通り。
# 混合行列(コンフュージョンマトリクス)を表示する関数を定義 def showConfusionMatrix(true,pred,pred_type): cm = confusion_matrix(true, pred, labels=[1, 0]) labels = [1, 0] # データフレームに変換 cm_labeled = pd.DataFrame(cm, columns=labels, index=labels) # 結果の表示 print("◆混合行列(",pred_type,")◆") display(cm_labeled) # 混合行列を表示 showConfusionMatrix(df_result['true_max'], df_result['pred_max'],"最大株価") showConfusionMatrix(df_result['true_min'], df_result['pred_min'],"最小株価") showConfusionMatrix(df_result['true_mean'], df_result['pred_mean'],"平均株価") showConfusionMatrix(df_result['buy_flg_true'], df_result['buy_flg'],"買うか買わないか")
結果は以下の通りです。
混合行列の見方ですが、「混合行列(最大株価)」を例に解説します。
上記の図でなんとなく伝わると思いますが、29+127件はAIが正しく予測できた件数で、16+69件はAIが外した件数です。
株価予測では上記の内、TPとFPが重要になると考えます。なぜかというと、AIの言う通りに株を購入する場合、TP+FPが実際に購入する件数になるからです。また、TPは実際に株価が上昇する件数になりますので、TP/(TP+FP)が、AIの言う通りに株を購入した場合の勝率となります。混合行列を使った精度の評価についてはこちらのサイトが良くまとまっています。
続いて、買うか買わないかの判断をAIに任せた場合の勝率について見てみましょう。4つ目の混合行列(◆混合行列(買うか買わないか)◆)に着目します。24/(24+16)=60%の確率で勝てることになることが分かります。完全にランダムに買う場合は(24+62)/(16+139)=36%の勝率なので、悪くない結果なのではないでしょうか。
参考:ソースコード全文
# ライブラリのインポート import pandas as pd pd.set_option('display.max_rows', 500) import numpy as np import pandas_datareader.data as web from sklearn import metrics from sklearn.metrics import confusion_matrix,mean_squared_error from sklearn.model_selection import KFold from optuna.integration import lightgbm as lgb # データ取得期間の定義 ## 欠損値補完のために直前のデータを使うため、Startは少し長めに取得。 ## 実際の分析期間は2016-01-01~とする。 st = '2015/01/01' ed = '2021/01/01' st_train = '2016/01/01' ed_train = '2018/12/31' st_test = '2019/01/01' ed_test = '2020/01/01' # 乱数シードの固定 SEED = 123 # STOOQから株価価格等の取得 lst1=['6902.JP', #デンソー '1540.JP', #ETF(金) '1398.JP', #ETF(J-REIT(不動産)) '7203.JP', #トヨタ ] stooq = web.DataReader(lst1,'stooq',start=st,end=ed) # FREDから各種指数の取得 lst2=['NIKKEI225', #日経平均 'T10YIE', #10-Year Breakeven Inflation Rate 'T5YIE', #5-Year Breakeven Inflation Rate 'IRLTLT01USM156N', #Long-Term Government Bond Yields: 10-year: Main (Including Benchmark) for the United States 'IRLTLT01EZM156N', #Long-Term Government Bond Yields: 10-year: Main (Including Benchmark) for the Euro Area 'IRLTLT01JPM156N', #Long-Term Government Bond Yields: 10-year: Main (Including Benchmark) for Japan 'IRLTLT01CAM156N', #Long-Term Government Bond Yields: 10-year: Main (Including Benchmark) for Canada 'JPNCPIALLMINMEI', #Consumer Price Index of All Items in Japan 'GOLDAMGBD228NLBM',#Gold Fixing Price 10:30 A.M. (London time) in London Bullion Market, based in U.S. Dollars 'PWHEAMTUSDM', #Global price of Wheat 'IPUTIL', #Industrial Production: Utilities: Electric and Gas Utilities (NAICS = 2211,2) 'DJIA', #Dow Jones Industrial Average 'SP500', #S&P 500 'NASDAQCOM', #NASDAQ Composite Index 'DEXJPUS', #Japan / U.S. Foreign Exchange Rate 'DEXCHUS', #China / U.S. Foreign Exchange Rate 'DEXUSEU', #U.S. / Euro Foreign Exchange Rate 'DEXINUS', #India / U.S. Foreign Exchange Rate 'UNRATE', #Unemployment Rate 'LRUN64TTJPM156S', #Unemployment Rate: Aged 15-64: All Persons for Japan ] fred = web.DataReader(lst2,'fred',start=st,end=ed) # StooqとFredから取得したデータを結合する(結合キーはインデックス=Date) stock = pd.merge(stooq['Close'], fred, how='left', left_index=True, right_index=True) # 欠損値を直前のデータで保管する stock = stock.fillna(method='ffill') # 2016-01-01以降のデータを抽出 stock = stock[st:ed] # 20営業日先の最大株価 stock['target.future.max'] = stock['6902.JP'].rolling(20).max().shift(-20) stock['target.ratio.max'] = stock['target.future.max']/stock['6902.JP'] stock['target.binary.max'] = np.where(stock['target.ratio.max'] > 1.05, 1, 0) # 20営業日先の最小株価 stock['target.future.min'] = stock['6902.JP'].rolling(20).min().shift(-20) stock['target.ratio.min'] = stock['target.future.min']/stock['6902.JP'] stock['target.binary.min'] = np.where(stock['target.ratio.min'] < 0.95, 0, 1) # 20営業日先の最小株価 stock['target.future.mean'] = stock['6902.JP'].rolling(20).mean().shift(-20) stock['target.ratio.mean'] = stock['target.future.mean']/stock['6902.JP'] stock['target.binary.mean'] = np.where(stock['target.ratio.mean'] > 1.00, 1, 0) # 不要カラムの削除 stock = stock.drop(columns=['target.future.max','target.future.min','target.future.mean']) stock = stock.drop(columns=['target.ratio.max','target.ratio.min','target.ratio.mean']) # 各指数の移動平均との乖離率を算出する for stock_name in lst1: for i in [5,10,15,20,25,30,35,40,45,50,55,60]: exec('stock["{}_{}days_diverge"] = \ (stock["{}"].rolling({}).mean() - stock["{}"])/stock["{}"].rolling({}).mean()' \ .format(stock_name,i,stock_name,i,stock_name,stock_name,i) ) for stock_name in lst2: for i in [5,10,15,20,25,30,35,40,45,50,55,60]: exec('stock["{}_{}days_diverge"] = \ (stock["{}"].rolling({}).mean() - stock["{}"])/stock["{}"].rolling({}).mean()' \ .format(stock_name,i,stock_name,i,stock_name,stock_name,i) ) # 各指数の列を削除する stock = stock.drop(columns=lst1) stock = stock.drop(columns=lst2) # TrainデータとTestデータを分割 df_train = stock[st_train : ed_train] df_test = stock[st_test : ed_test] df_train = df_train.reset_index(drop=True) df_test = df_test.reset_index(drop=True) print('学習用', df_train.shape) print('試験用', df_test.shape) # numpyに変換する関数を定義 def toNumpy(x_df, t_max_df, t_min_df, t_mean_df): X = x_df.values y_max = t_max_df.values y_max = y_max.reshape(-1) y_min = t_min_df.values y_min = y_min.reshape(-1) y_mean = t_mean_df.values y_mean = y_mean.reshape(-1) return X, y_max, y_min, y_mean # 目的変数と説明変数に分ける関数を定義 def split(inputData): ## 学習データの分割 x_df = inputData.drop(columns=['target.binary.min','target.binary.max','target.binary.mean']) t_max_df = inputData['target.binary.max'] t_min_df = inputData['target.binary.min'] t_mean_df = inputData['target.binary.mean'] ## Numpyに変換 X, y_max, y_min, y_mean = toNumpy(x_df, t_max_df, t_min_df, t_mean_df) return X, y_max, y_min, y_mean # データ作成 ## 学習用データ X_train, y_trainmax, y_trainmin, y_trainmean = split(df_train) ## 試験用データ X_test, y_testmax, y_testmin, y_testmean = split(df_test) # LightGBM訓練用関数を定義 def trainGbdt(X_train, y_train): bests = [] kf = KFold(n_splits=5, shuffle=False) for learn_index, valid_index in kf.split(X_train, y_train): X_learn, X_valid = X_train[learn_index], X_train[valid_index] y_learn, y_valid = y_train[learn_index], y_train[valid_index] #LGB用のデータに変形 lgb_learn = lgb.Dataset(X_learn, y_learn) lgb_valid = lgb.Dataset(X_valid, y_valid) param = { 'objective': 'binary', 'metric': 'auc', 'boosting_type': 'gbdt', 'random_seed':SEED, } # 訓練実施 best = lgb.train(param, lgb_learn, valid_sets=[lgb_learn,lgb_valid], ) bests.append(best) return bests # 勾配ブースティングによる学習実施(最大株価予測) bests_max = trainGbdt(X_train, y_trainmax) # 勾配ブースティングによる学習実施(最小株価予測) bests_min = trainGbdt(X_train, y_trainmin) # 勾配ブースティングによる学習実施(平均株価予測) bests_mean = trainGbdt(X_train, y_trainmean) # テストデータを予測する(最大株価予測) y_preds = [] for i in range(5): y_pred = bests_max[i].predict(X_test, num_iteration=bests_max[i].best_iteration) y_preds.append(y_pred) y_predmax = (y_preds[0] + y_preds[1] + y_preds[2] + y_preds[3] + y_preds[4])/5 # テストデータを予測する(最小株価予測) y_preds = [] for i in range(5): y_pred = bests_min[i].predict(X_test, num_iteration=bests_min[i].best_iteration) y_preds.append(y_pred) y_predmin = (y_preds[0] + y_preds[1] + y_preds[2] + y_preds[3] + y_preds[4])/5 # テストデータを予測する(平均株価予測) y_preds = [] for i in range(5): y_pred = bests_mean[i].predict(X_test, num_iteration=bests_mean[i].best_iteration) y_preds.append(y_pred) y_predmean = (y_preds[0] + y_preds[1] + y_preds[2] + y_preds[3] + y_preds[4])/5 # 予測値と実値との比較 result = np.c_[y_predmax,y_testmax, y_predmin,y_testmin, y_predmean,y_testmean ] df_result = pd.DataFrame(data=result, columns=['pred_max','true_max', 'pred_min','true_min', 'pred_mean','true_mean' ]) display(df_result) # 閾値0.5として1,0に分類 df_result['pred_max'] = np.round(df_result['pred_max']) df_result['pred_min'] = np.round(df_result['pred_min']) df_result['pred_mean'] = np.round(df_result['pred_mean']) # 買うか買わないか判断する df_result['buy_flg'] = df_result['pred_max'] * df_result['pred_min'] * df_result['pred_mean'] df_result['buy_flg_true'] = df_result['true_max'] * df_result['true_min'] * df_result['true_mean'] display(df_result) # 混合行列(コンフュージョンマトリクス)を表示する関数を定義 def showConfusionMatrix(true,pred,pred_type): cm = confusion_matrix(true, pred, labels=[1, 0]) labels = [1, 0] # データフレームに変換 cm_labeled = pd.DataFrame(cm, columns=labels, index=labels) # 結果の表示 print("◆混合行列(",pred_type,")◆") display(cm_labeled) # 混合行列を表示 showConfusionMatrix(df_result['true_max'], df_result['pred_max'],"最大株価") showConfusionMatrix(df_result['true_min'], df_result['pred_min'],"最小株価") showConfusionMatrix(df_result['true_mean'], df_result['pred_mean'],"平均株価") showConfusionMatrix(df_result['buy_flg_true'], df_result['buy_flg'],"買うか買わないか")