よっしーの私的空間

機械学習を中心に興味のあることについて更新します

機械学習における不均衡データへの対処方法(Over Sampling, Under Sampling)

機械学習における不均衡データへの対処方法としてアンダーサンプリングやオーバーサンプリングについてまとめます。不均衡データとは目的変数のクラスの度数が極端に偏っているデータのことです。今回はKaggleで公開されている「Credit Card Fraud Detection」のデータセットを使用して以下の手法を試してみます。

  • Random Under Sampling
  • Random Over Sampling
  • SMOTE

「Credit Card Fraud Detection」の目的変数の各クラスの度数は以下の通り。偏りが発生していることが分かります。
f:id:t-yoshi-book:20210731100835p:plain

1.何も対策せずに機械学習させるとどうなるか

試しに適当なニューラルネットワークを構築して試してみました。構築したニューラルネットワークは以下の通り。コード全文は本稿末尾にまとめました。

from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import InputLayer, Dense, Activation, Dropout
from tensorflow.keras.optimizers import Adam

def train_dnn(new_nx_train,new_t_train):
    model = Sequential()
    
    model.add(InputLayer(input_shape=new_nx_train[1].shape))
    model.add(Dense(8))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    model.add(Dense(8))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    
    callbacks = EarlyStopping(monitor='val_loss', patience=3, mode='auto')
    
    # モデルのコンパイル
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    # 学習
    ep=100
    model.fit(new_nx_train, new_t_train, epochs=ep, batch_size=126, validation_split=0.3, verbose=2, callbacks=callbacks)
    
    return model

上記で学習させた結果は以下の通り。ちなみに学習データ70%、テストデータ30%に分割して、テストデータに対して予測した結果です。
  f:id:t-yoshi-book:20210731124508p:plain

全部0と予測してしまっています。

2.不均衡データの均衡化

2.1.Random Under Sampling

少数派のクラスに合わせて、多数派のクラスのデータをランダムに削除する手法です。imblearn.under_sampling.RandomUnderSamplerを使用することで、簡単に実装できます。

from imblearn.under_sampling import RandomUnderSampler
# 少数派クラスのデータ数をカウント
positive_count_train = y_train["Class"].value_counts()[1]
# Random under sampling
rus = RandomUnderSampler(sampling_strategy={0:np.round(positive_count_train * 9).astype(int), 1:positive_count_train}, random_state=0)
X_train_undersampled, y_train_undersampled = rus.fit_resample(X_train, y_train)

上記を実行することで、以下の通り多数派クラスのデータが減ります。
f:id:t-yoshi-book:20210731130231p:plain
上記のデータを使って1.で構築したニューラルネットワークを学習させました。結果は以下の通り。予測精度が向上していることが分かります。
  f:id:t-yoshi-book:20210731131037p:plain

2.2.Random Over Sampling

多数派のクラスに合わせて少数派のクラスのデータをランダムに複製する手法です。imblearn.over_sampling.RandomOverSamplerを使います。

from imblearn.over_sampling import RandomOverSampler
# 多数派クラスのデータ数をカウント
negative_count_train = y_train["Class"].value_counts()[0]
# Random over sampling
ros = RandomOverSampler(sampling_strategy={0: negative_count_train, 1: negative_count_train//9}, random_state=0)
X_train_oversampled, y_train_oversampled = ros.fit_resample(X_train, y_train)

以下の通り少数派クラスのデータが増えます。
f:id:t-yoshi-book:20210731133841p:plain
上記のデータを学習させてみたところ、予測精度は以下の通りとなりました。Random Under Samplingの方が若干精度が良かったですが、何もしないよりはかなり良い結果となりました。
  f:id:t-yoshi-book:20210731133809p:plain

2.3.SMOTE

SMOTEとは元のデータから類似するデータを生成する手法で、以下の手順を繰り返すことでデータを生成します。

  1. 元データからランダムにデータを選択
  2. 選択したデータの近傍に存在するデータからランダムに1つを選択
  3. 選択したデータと近傍データの間に新たなデータを生成

f:id:t-yoshi-book:20210731142720p:plain

imblearn.over_sampling.SMOTEを使って実装できます。

from imblearn.over_sampling import SMOTE
# 多数派クラスのデータ数をカウント
negative_count_train = y_train["Class"].value_counts()[0]
# SMOTE
smote = SMOTE(sampling_strategy={0: negative_count_train, 1: negative_count_train//9}, k_neighbors=5, random_state=0)
X_train_smotesampled, y_train_smotesampled = smote.fit_resample(X_train, y_train)

以下の通り少数派クラスのデータが増えます。
f:id:t-yoshi-book:20210731133841p:plain
上記のデータを学習させてみたところ、予測精度は以下の通りとなりました。こちらもRandom Under Samplingの方が若干精度が良かったですが、何もしないよりはかなり良い結果となりました。
  f:id:t-yoshi-book:20210731135133p:plain

3.その他手法

SMOTEには多数の亜種があるようです。また、Under Samplingの手法も数多く存在します。imblearnには色々な手法が揃っていますので、興味がある人は一度見てみてください。

参考:ソースコード

再現性確保のためのおまじない
# 再現性確保のためのおまじない
import os
os.environ['PYTHONHASHSEED'] = '0'
import tensorflow as tf
os.environ['TF_DETERMINISTIC_OPS'] = 'true'
os.environ['TF_CUDNN_DETERMINISTIC'] = 'true'

import random as rn
import numpy as np

SEED = 123
def reset_random_seeds():
    tf.random.set_seed(SEED)
    np.random.seed(SEED)
    rn.seed(SEED)

reset_random_seeds()    

session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=32, inter_op_parallelism_threads=32)
tf.compat.v1.set_random_seed(SEED)
sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
データのインポート
# データのインポート
import pandas as pd
df = pd.read_csv("./creditcard.csv")

*** 各種関数の定義
>|pyhton|
# 標準化とNumpy化用の関数
def standard_and_toNumpy(X_train, X_test, y_train):
    # 平均、標準偏差計算
    X_train_mean = X_train.mean()
    X_train_std  = X_train.std()
    # データの標準化
    nx_train_df  = (X_train - X_train_mean)/X_train_std
    nx_test_df  = (X_test - X_train_mean)/X_train_std
    # numpyに変換
    nx_train = nx_train_df.values
    nx_test  = nx_test_df.values
    t_train  = y_train.values
    
    return nx_train, nx_test, t_train

# 学習用データをシャッフルするための関数
## シャッフルしないと、model.fitのvalidation_split時に目的変数に偏りが発生してしまうため
def shuffle_in_unison(a, b):
    assert len(a) == len(b)
    shuffled_a  = np.empty(a.shape, dtype=a.dtype)
    shuffled_b  = np.empty(b.shape, dtype=b.dtype)
    permutation = np.random.permutation(len(a))
    for old_index, new_index in enumerate(permutation):
        shuffled_a[new_index] = a[old_index]
        shuffled_b[new_index] = b[old_index]
    return shuffled_a, shuffled_b

from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import InputLayer, Dense, Activation, Dropout
from tensorflow.keras.optimizers import Adam

# 訓練用関数
def train_dnn(new_nx_train,new_t_train):
    model = Sequential()
    
    model.add(InputLayer(input_shape=new_nx_train[1].shape))
    
    model.add(Dense(8))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))

    model.add(Dense(8))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    
    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    
    callbacks = EarlyStopping(monitor='val_loss', patience=3, mode='auto')
    
    # モデルのコンパイル
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    # 学習
    ep=100
    model.fit(new_nx_train, new_t_train, epochs=ep, batch_size=126, validation_split=0.3, verbose=2, callbacks=callbacks)
    
    return model

# 混同行列を見やすくする関数。以下を参考とした。
# https://qiita.com/makaishi2/items/9fb6bf94daa6208c8ed0
def make_cm(matrix, columns):
    # columns 項目名リスト
    n = len(columns)

    # '正解データ'をn回繰り返すリスト生成
    act = ['正解データ'] * n
    pred = ['予測結果'] * n

    #データフレーム生成
    cm = pd.DataFrame(matrix, columns=[pred, columns], index=[act, columns])
    return cm

from sklearn.metrics import confusion_matrix, precision_score, accuracy_score, recall_score, f1_score
# テストデータの予測精度を表示
def show_evaluation_index(y_test,pred):
    display(make_cm(confusion_matrix(y_test, pred),["0","1"]))
    print("accuracy_score:{:.5f}".format(accuracy_score(y_test, pred)))
    print("precision_score:{:.5f}".format(precision_score(y_test, pred)))
    print("recall_score:{:.5f}".format(recall_score(y_test, pred)))
    print("f1_score:{:.5f}".format(f1_score(y_test, pred)))
まずはシンプルにニューラルネットワークにデータを突っ込む
# 訓練データとテストデータの分割
from sklearn.model_selection import train_test_split
X = df.drop(["Class","Time"], axis=1)
y = df[["Class"]]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=100, stratify=y)

# 標準化とNumpy化
nx_train, nx_test, t_train = standard_and_toNumpy(X_train, X_test, y_train)
# 学習データのシャッフル実施
reset_random_seeds()
new_nx_train, new_t_train = shuffle_in_unison(nx_train, t_train)

# 訓練
reset_random_seeds()
model = train_dnn(new_nx_train,new_t_train)

# 予測
y_pred = model.predict(nx_test)
y_pred = np.round(y_pred)

show_evaluation_index(y_test,y_pred)
Under Sampling
# 少数派クラスのデータ数をカウント
positive_count_train = y_train["Class"].value_counts()[1]

# Random under sampling
rus = RandomUnderSampler(sampling_strategy={0:np.round(positive_count_train * 9).astype(int), 1:positive_count_train}, random_state=0)
X_train_undersampled, y_train_undersampled = rus.fit_resample(X_train, y_train)

# 標準化とNumpy化
nx_train, nx_test, t_train = standard_and_toNumpy(X_train_undersampled, X_test, y_train_undersampled)
# 学習データのシャッフル実施
reset_random_seeds()
new_nx_train, new_t_train = shuffle_in_unison(nx_train, t_train)

# 訓練
reset_random_seeds()
model_undersample = train_dnn(new_nx_train,new_t_train)

# 予測
y_pred_undersample = model_undersample.predict(nx_test)
y_pred_undersample = np.round(y_pred_undersample)

show_evaluation_index(y_test,y_pred_undersample)
Over Sampling
# 多数派クラスのデータ数をカウント
negative_count_train = y_train["Class"].value_counts()[0]

# Random over sampling
ros = RandomOverSampler(sampling_strategy={0: negative_count_train, 1: negative_count_train//9}, random_state=0)
X_train_oversampled, y_train_oversampled = ros.fit_resample(X_train, y_train)

# 標準化とNumpy化
nx_train, nx_test, t_train = standard_and_toNumpy(X_train_oversampled, X_test, y_train_oversampled)
# 学習データのシャッフル実施
reset_random_seeds()
new_nx_train, new_t_train = shuffle_in_unison(nx_train, t_train)

# 訓練
reset_random_seeds()
model_oversample = train_dnn(new_nx_train,new_t_train)

# 予測
y_pred_oversample = model_oversample.predict(nx_test)
y_pred_oversample = np.round(y_pred_oversample)

show_evaluation_index(y_test,y_pred_oversample)
SMOTE
# SMOTE
smote = SMOTE(sampling_strategy={0: negative_count_train, 1: negative_count_train//9}, k_neighbors=5, random_state=0)
X_train_smotesampled, y_train_smotesampled = smote.fit_resample(X_train, y_train)

# 標準化とNumpy化
nx_train, nx_test, t_train = standard_and_toNumpy(X_train_smotesampled, X_test, y_train_smotesampled)
# 学習データのシャッフル実施
reset_random_seeds()
new_nx_train, new_t_train = shuffle_in_unison(nx_train, t_train)

# 訓練
reset_random_seeds()
model_smotesample = train_dnn(new_nx_train,new_t_train)

# 予測
y_pred_smotesample = model_smotesample.predict(nx_test)
y_pred_smotesample = np.round(y_pred_smotesample)

show_evaluation_index(y_test,y_pred_smotesample)