機械学習における不均衡データへの対処方法(Over Sampling, Under Sampling)
機械学習における不均衡データへの対処方法としてアンダーサンプリングやオーバーサンプリングについてまとめます。不均衡データとは目的変数のクラスの度数が極端に偏っているデータのことです。今回はKaggleで公開されている「Credit Card Fraud Detection」のデータセットを使用して以下の手法を試してみます。
- Random Under Sampling
- Random Over Sampling
- SMOTE
「Credit Card Fraud Detection」の目的変数の各クラスの度数は以下の通り。偏りが発生していることが分かります。
1.何も対策せずに機械学習させるとどうなるか
試しに適当なニューラルネットワークを構築して試してみました。構築したニューラルネットワークは以下の通り。コード全文は本稿末尾にまとめました。
from tensorflow.keras.callbacks import EarlyStopping from tensorflow.keras.models import Sequential from tensorflow.keras.layers import InputLayer, Dense, Activation, Dropout from tensorflow.keras.optimizers import Adam def train_dnn(new_nx_train,new_t_train): model = Sequential() model.add(InputLayer(input_shape=new_nx_train[1].shape)) model.add(Dense(8)) model.add(Activation('relu')) model.add(Dropout(0.5)) model.add(Dense(8)) model.add(Activation('relu')) model.add(Dropout(0.5)) model.add(Dense(1)) model.add(Activation('sigmoid')) callbacks = EarlyStopping(monitor='val_loss', patience=3, mode='auto') # モデルのコンパイル model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy']) # 学習 ep=100 model.fit(new_nx_train, new_t_train, epochs=ep, batch_size=126, validation_split=0.3, verbose=2, callbacks=callbacks) return model
上記で学習させた結果は以下の通り。ちなみに学習データ70%、テストデータ30%に分割して、テストデータに対して予測した結果です。
全部0と予測してしまっています。
2.不均衡データの均衡化
2.1.Random Under Sampling
少数派のクラスに合わせて、多数派のクラスのデータをランダムに削除する手法です。imblearn.under_sampling.RandomUnderSamplerを使用することで、簡単に実装できます。
from imblearn.under_sampling import RandomUnderSampler # 少数派クラスのデータ数をカウント positive_count_train = y_train["Class"].value_counts()[1] # Random under sampling rus = RandomUnderSampler(sampling_strategy={0:np.round(positive_count_train * 9).astype(int), 1:positive_count_train}, random_state=0) X_train_undersampled, y_train_undersampled = rus.fit_resample(X_train, y_train)
上記を実行することで、以下の通り多数派クラスのデータが減ります。
上記のデータを使って1.で構築したニューラルネットワークを学習させました。結果は以下の通り。予測精度が向上していることが分かります。
2.2.Random Over Sampling
多数派のクラスに合わせて少数派のクラスのデータをランダムに複製する手法です。imblearn.over_sampling.RandomOverSamplerを使います。
from imblearn.over_sampling import RandomOverSampler # 多数派クラスのデータ数をカウント negative_count_train = y_train["Class"].value_counts()[0] # Random over sampling ros = RandomOverSampler(sampling_strategy={0: negative_count_train, 1: negative_count_train//9}, random_state=0) X_train_oversampled, y_train_oversampled = ros.fit_resample(X_train, y_train)
以下の通り少数派クラスのデータが増えます。
上記のデータを学習させてみたところ、予測精度は以下の通りとなりました。Random Under Samplingの方が若干精度が良かったですが、何もしないよりはかなり良い結果となりました。
2.3.SMOTE
SMOTEとは元のデータから類似するデータを生成する手法で、以下の手順を繰り返すことでデータを生成します。
- 元データからランダムにデータを選択
- 選択したデータの近傍に存在するデータからランダムに1つを選択
- 選択したデータと近傍データの間に新たなデータを生成
imblearn.over_sampling.SMOTEを使って実装できます。
from imblearn.over_sampling import SMOTE # 多数派クラスのデータ数をカウント negative_count_train = y_train["Class"].value_counts()[0] # SMOTE smote = SMOTE(sampling_strategy={0: negative_count_train, 1: negative_count_train//9}, k_neighbors=5, random_state=0) X_train_smotesampled, y_train_smotesampled = smote.fit_resample(X_train, y_train)
以下の通り少数派クラスのデータが増えます。
上記のデータを学習させてみたところ、予測精度は以下の通りとなりました。こちらもRandom Under Samplingの方が若干精度が良かったですが、何もしないよりはかなり良い結果となりました。
3.その他手法
SMOTEには多数の亜種があるようです。また、Under Samplingの手法も数多く存在します。imblearnには色々な手法が揃っていますので、興味がある人は一度見てみてください。
参考:ソースコード
再現性確保のためのおまじない
# 再現性確保のためのおまじない import os os.environ['PYTHONHASHSEED'] = '0' import tensorflow as tf os.environ['TF_DETERMINISTIC_OPS'] = 'true' os.environ['TF_CUDNN_DETERMINISTIC'] = 'true' import random as rn import numpy as np SEED = 123 def reset_random_seeds(): tf.random.set_seed(SEED) np.random.seed(SEED) rn.seed(SEED) reset_random_seeds() session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=32, inter_op_parallelism_threads=32) tf.compat.v1.set_random_seed(SEED) sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
データのインポート
# データのインポート import pandas as pd df = pd.read_csv("./creditcard.csv") *** 各種関数の定義 >|pyhton| # 標準化とNumpy化用の関数 def standard_and_toNumpy(X_train, X_test, y_train): # 平均、標準偏差計算 X_train_mean = X_train.mean() X_train_std = X_train.std() # データの標準化 nx_train_df = (X_train - X_train_mean)/X_train_std nx_test_df = (X_test - X_train_mean)/X_train_std # numpyに変換 nx_train = nx_train_df.values nx_test = nx_test_df.values t_train = y_train.values return nx_train, nx_test, t_train # 学習用データをシャッフルするための関数 ## シャッフルしないと、model.fitのvalidation_split時に目的変数に偏りが発生してしまうため def shuffle_in_unison(a, b): assert len(a) == len(b) shuffled_a = np.empty(a.shape, dtype=a.dtype) shuffled_b = np.empty(b.shape, dtype=b.dtype) permutation = np.random.permutation(len(a)) for old_index, new_index in enumerate(permutation): shuffled_a[new_index] = a[old_index] shuffled_b[new_index] = b[old_index] return shuffled_a, shuffled_b from tensorflow.keras.callbacks import EarlyStopping from tensorflow.keras.models import Sequential from tensorflow.keras.layers import InputLayer, Dense, Activation, Dropout from tensorflow.keras.optimizers import Adam # 訓練用関数 def train_dnn(new_nx_train,new_t_train): model = Sequential() model.add(InputLayer(input_shape=new_nx_train[1].shape)) model.add(Dense(8)) model.add(Activation('relu')) model.add(Dropout(0.5)) model.add(Dense(8)) model.add(Activation('relu')) model.add(Dropout(0.5)) model.add(Dense(1)) model.add(Activation('sigmoid')) callbacks = EarlyStopping(monitor='val_loss', patience=3, mode='auto') # モデルのコンパイル model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy']) # 学習 ep=100 model.fit(new_nx_train, new_t_train, epochs=ep, batch_size=126, validation_split=0.3, verbose=2, callbacks=callbacks) return model # 混同行列を見やすくする関数。以下を参考とした。 # https://qiita.com/makaishi2/items/9fb6bf94daa6208c8ed0 def make_cm(matrix, columns): # columns 項目名リスト n = len(columns) # '正解データ'をn回繰り返すリスト生成 act = ['正解データ'] * n pred = ['予測結果'] * n #データフレーム生成 cm = pd.DataFrame(matrix, columns=[pred, columns], index=[act, columns]) return cm from sklearn.metrics import confusion_matrix, precision_score, accuracy_score, recall_score, f1_score # テストデータの予測精度を表示 def show_evaluation_index(y_test,pred): display(make_cm(confusion_matrix(y_test, pred),["0","1"])) print("accuracy_score:{:.5f}".format(accuracy_score(y_test, pred))) print("precision_score:{:.5f}".format(precision_score(y_test, pred))) print("recall_score:{:.5f}".format(recall_score(y_test, pred))) print("f1_score:{:.5f}".format(f1_score(y_test, pred)))
まずはシンプルにニューラルネットワークにデータを突っ込む
# 訓練データとテストデータの分割 from sklearn.model_selection import train_test_split X = df.drop(["Class","Time"], axis=1) y = df[["Class"]] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=100, stratify=y) # 標準化とNumpy化 nx_train, nx_test, t_train = standard_and_toNumpy(X_train, X_test, y_train) # 学習データのシャッフル実施 reset_random_seeds() new_nx_train, new_t_train = shuffle_in_unison(nx_train, t_train) # 訓練 reset_random_seeds() model = train_dnn(new_nx_train,new_t_train) # 予測 y_pred = model.predict(nx_test) y_pred = np.round(y_pred) show_evaluation_index(y_test,y_pred)
Under Sampling
# 少数派クラスのデータ数をカウント positive_count_train = y_train["Class"].value_counts()[1] # Random under sampling rus = RandomUnderSampler(sampling_strategy={0:np.round(positive_count_train * 9).astype(int), 1:positive_count_train}, random_state=0) X_train_undersampled, y_train_undersampled = rus.fit_resample(X_train, y_train) # 標準化とNumpy化 nx_train, nx_test, t_train = standard_and_toNumpy(X_train_undersampled, X_test, y_train_undersampled) # 学習データのシャッフル実施 reset_random_seeds() new_nx_train, new_t_train = shuffle_in_unison(nx_train, t_train) # 訓練 reset_random_seeds() model_undersample = train_dnn(new_nx_train,new_t_train) # 予測 y_pred_undersample = model_undersample.predict(nx_test) y_pred_undersample = np.round(y_pred_undersample) show_evaluation_index(y_test,y_pred_undersample)
Over Sampling
# 多数派クラスのデータ数をカウント negative_count_train = y_train["Class"].value_counts()[0] # Random over sampling ros = RandomOverSampler(sampling_strategy={0: negative_count_train, 1: negative_count_train//9}, random_state=0) X_train_oversampled, y_train_oversampled = ros.fit_resample(X_train, y_train) # 標準化とNumpy化 nx_train, nx_test, t_train = standard_and_toNumpy(X_train_oversampled, X_test, y_train_oversampled) # 学習データのシャッフル実施 reset_random_seeds() new_nx_train, new_t_train = shuffle_in_unison(nx_train, t_train) # 訓練 reset_random_seeds() model_oversample = train_dnn(new_nx_train,new_t_train) # 予測 y_pred_oversample = model_oversample.predict(nx_test) y_pred_oversample = np.round(y_pred_oversample) show_evaluation_index(y_test,y_pred_oversample)
SMOTE
# SMOTE smote = SMOTE(sampling_strategy={0: negative_count_train, 1: negative_count_train//9}, k_neighbors=5, random_state=0) X_train_smotesampled, y_train_smotesampled = smote.fit_resample(X_train, y_train) # 標準化とNumpy化 nx_train, nx_test, t_train = standard_and_toNumpy(X_train_smotesampled, X_test, y_train_smotesampled) # 学習データのシャッフル実施 reset_random_seeds() new_nx_train, new_t_train = shuffle_in_unison(nx_train, t_train) # 訓練 reset_random_seeds() model_smotesample = train_dnn(new_nx_train,new_t_train) # 予測 y_pred_smotesample = model_smotesample.predict(nx_test) y_pred_smotesample = np.round(y_pred_smotesample) show_evaluation_index(y_test,y_pred_smotesample)