2.BiTの実装方法(CIFAR10でファインチューニング)
BiTの実装方法についてまとめます。なお、実装にあたって以下のサンプルコードを参考にしていますが、サンプルコードはtf_flowersを使用しているのに対して、本記事ではCIFAR10を対象にファインチューニングをしています。そのため、画像の前処理の方法等を変更しています。また、個人的に使いやすいように色々変更しています。
big_transfer/big_transfer_tf2.ipynb at master · google-research/big_transfer · GitHub
2.1.CIFAR10の画像データをインポート
from tensorflow.keras.datasets import cifar10
# Load the CIFAR-10 training and test splits as (images, labels) pairs.
(x_train_load, y_train), (x_test_load, y_test) = cifar10.load_data()
2.2.入力画像のリサイズ
BiT Hyper-Ruleに従い、入力画像の大きさに応じてリサイズ・クロップ後のサイズを決めます(96px以下の場合は160pxにリサイズして128pxにクロップ、それより大きい場合は512pxにリサイズして480pxにクロップ)。CIFAR10の場合は32pxなので160pxにリサイズします。なお、クロップについてはデータ拡張(Data Augmentation)時に実施します。
import numpy as np
import cv2
# Side length (px) of the raw input images.
IMAGE_SIZE = 32
# Resize/crop resolution pair chosen from the input size: images of at most
# 96 px use 160/128, anything larger uses 512/480.
RESIZE_TO, CROP_TO = (160, 128) if IMAGE_SIZE <= 96 else (512, 480)
def upscale(image):
    """Resize every image of a batch to RESIZE_TO x RESIZE_TO pixels.

    Args:
        image: Batch of images indexed along the first axis. Each element is
            passed to ``cv2.resize`` and must produce a 3-channel result.

    Returns:
        Integer NumPy array of shape ``(len(image), RESIZE_TO, RESIZE_TO, 3)``
        holding the bicubic-interpolated images.
    """
    # ``np.int`` was removed in NumPy 1.24; the builtin ``int`` is the dtype
    # it used to alias. Allocating the integer buffer directly also avoids
    # the float64 intermediate copy of the whole data set.
    upscaled = np.zeros((len(image), RESIZE_TO, RESIZE_TO, 3), dtype=int)
    for idx, single_image in enumerate(image):
        upscaled[idx] = cv2.resize(
            single_image,
            dsize=(RESIZE_TO, RESIZE_TO),
            interpolation=cv2.INTER_CUBIC,
        )
    return upscaled
# Upscale the training and test images with upscale().
x_train = upscale(x_train_load)
x_test = upscale(x_test_load)
2.3.BiTモデルの定義
① 使用モデルについて
TensorFlow Hubで公開されているBiT-MのR50x1(ImageNet-21kで事前学習されたResNet-50同等規模のモデル)を使用します。BiT-MでもBiT-Sでもパラメータの数は変わらないので、より性能の良いBiT-Mを使用することにしました。どうでもいいですが、BiT-Sの存在価値は良く分からないです…。
公式のサンプルコードではモデルを呼び出す際に「trainable=True」を指定していないですが、これを指定しないと重みが更新されずファインチューニングされないはずなので、「trainable=True」を指定しています。
# Load BiT-M R50x1 from TensorFlow Hub as a Keras layer; trainable=True so that
# its weights are updated during fine-tuning.
# Requires `import tensorflow_hub as hub`.
bit_model = hub.KerasLayer("https://tfhub.dev/google/bit/m-r50x1/1", trainable=True)
② 学習率のスケジューリング
optimizerの定義とあわせて学習率のスケジュールを実装します。学習率は学習スケジュールの30%、60%、90%のタイミングで1/10にします。
import tensorflow_hub as hub
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras import optimizers
# Number of target classes (output units of the classification head).
NUM_CLASSES = 10
# Training-set size; decides which schedule preset applies.
DATASET_SIZE = len(x_train_load)
# Schedule presets: total number of steps and the steps at which the
# learning rate is lowered.
if DATASET_SIZE >= 500000:
    SCHEDULE_LENGTH, SCHEDULE_BOUNDARIES = 20000, [6000, 12000, 18000]
elif DATASET_SIZE >= 20000:
    SCHEDULE_LENGTH, SCHEDULE_BOUNDARIES = 10000, [3000, 6000, 9000]
else:
    SCHEDULE_LENGTH, SCHEDULE_BOUNDARIES = 500, [200, 300, 400]
def buildModel_BiT():
    """Build and compile the BiT-M R50x1 classifier used for fine-tuning.

    The model is the TensorFlow Hub BiT-M R50x1 layer (trainable) followed by
    a zero-initialised softmax ``Dense`` head with ``NUM_CLASSES`` units.
    Reads the module-level ``NUM_CLASSES``, ``BATCH_SIZE`` and
    ``SCHEDULE_BOUNDARIES``.

    Returns:
        The compiled ``tf.keras.Sequential`` model named ``'BiT'``.
    """
    bit_model = hub.KerasLayer("https://tfhub.dev/google/bit/m-r50x1/1", trainable=True)
    model = tf.keras.Sequential(
        [
            bit_model,
            tf.keras.layers.Dense(NUM_CLASSES, kernel_initializer='zeros', activation="softmax"),
        ],
        name='BiT',
    )

    # Base learning rate of 0.003 at batch size 512, scaled linearly with
    # the batch size actually used.
    lr = 0.003 * BATCH_SIZE / 512
    # Divide the learning rate by 10 at every boundary:
    # lr -> lr/10 -> lr/100 -> lr/1000.
    lr_schedule = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
        boundaries=SCHEDULE_BOUNDARIES,
        values=[lr, lr * 0.1, lr * 0.01, lr * 0.001],
    )
    optimizer = tf.keras.optimizers.SGD(learning_rate=lr_schedule, momentum=0.9)

    # Labels are expected one-hot encoded (categorical cross-entropy).
    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model
2.4.訓練用関数の定義
学習用データを訓練データと評価データに分割し、データ拡張(Data Augmentation)をし、訓練を実行する関数を定義します。データ拡張の際にBiT Hyper-Ruleに則ってMixUpとCropをするのですが、デフォルトのImageDataGenerator(tensorflow.keras.preprocessing.image.ImageDataGenerator)ではMixUpやCropはできないので、ImageDataGeneratorを継承した独自ジェネレータを使用しています。独自ジェネレータの作成にあたってこちらを参考にしました。本章では独自ジェネレータのソースについては割愛しますが、ソースを見たい方は参考をご確認ください。
def train_BiT(X, y, STEPS_PER_EPOCH, SCHEDULE_LENGTH, BATCH_SIZE, epochs=10):
    """Split the data, augment the training part and fine-tune a BiT model.

    Args:
        X: Image array; the first axis indexes samples.
        y: Integer class labels aligned with ``X``. Used for the stratified
            split and then one-hot encoded.
        STEPS_PER_EPOCH: Number of generator batches consumed per epoch.
        SCHEDULE_LENGTH: Accepted for interface compatibility; this function
            does not use it.
        BATCH_SIZE: Batch size of the augmenting training generator. Note
            that ``buildModel_BiT`` reads the module-level ``BATCH_SIZE``,
            not this argument.
        epochs: Number of training epochs (defaults to the previously
            hard-coded value of 10).

    Returns:
        Tuple ``(model, history)``: the trained model and the object returned
        by ``model.fit``.
    """
    # Hold out 20% of the data for validation, keeping class proportions.
    X_train, X_valid, y_train, y_valid = train_test_split(
        X, y, test_size=0.2, stratify=y, shuffle=True
    )
    # Fix the one-hot width so it always matches the classifier head.
    y_train = to_categorical(y_train, NUM_CLASSES)
    y_valid = to_categorical(y_valid, NUM_CLASSES)

    # Training-time augmentation: horizontal flip, mix-up and random crop.
    datagen = MyImageDataGenerator(
        horizontal_flip=True,
        mix_up_alpha=0.1,
        random_crop=(CROP_TO, CROP_TO),
    )
    train_generator = datagen.flow(X_train, y_train, batch_size=BATCH_SIZE)

    model = buildModel_BiT()
    # ``shuffle`` is not passed: Keras ignores it for generator input, and
    # the generator already shuffles.
    history = model.fit(
        train_generator,
        steps_per_epoch=STEPS_PER_EPOCH,
        epochs=epochs,
        validation_data=(X_valid, y_valid),
    )
    return model, history
2.5.訓練実行
バッチサイズの公式推奨値は512なのですが、GPUメモリが足りずResourceExhaustedErrorになってしまうので、128まで下げています。(私の環境はGPUメモリを8GB積んでいるのですが、256だとエラーになっちゃいました。)
# Batch size lowered from the recommended 512 so that training fits into GPU memory.
BATCH_SIZE = 128
# Scale the schedule length by 512 / BATCH_SIZE (the result is a float).
# NOTE(review): SCHEDULE_BOUNDARIES is not rescaled alongside it — confirm this is intended.
SCHEDULE_LENGTH = SCHEDULE_LENGTH * 512 / BATCH_SIZE
# Number of training batches per epoch.
STEPS_PER_EPOCH = 10
# Run fine-tuning; returns the trained model and the value returned by model.fit.
model, history = train_BiT(x_train, y_train, STEPS_PER_EPOCH, SCHEDULE_LENGTH, BATCH_SIZE)
import os
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
# setting it here probably does not affect the running process — confirm.
os.environ['PYTHONHASHSEED'] = '0'
import tensorflow as tf
# Request deterministic TensorFlow / cuDNN behaviour via environment variables.
os.environ['TF_DETERMINISTIC_OPS'] = 'true'
os.environ['TF_CUDNN_DETERMINISTIC'] = 'true'
import numpy as np
import random as rn
# Single seed value shared by every random number generator seeded in this script.
SEED = 123
def reset_random_seeds():
    """Seed the TensorFlow, NumPy and ``random`` generators with ``SEED``."""
    for seed_fn in (tf.random.set_seed, np.random.seed, rn.seed):
        seed_fn(SEED)


# Seed everything once at import time.
reset_random_seeds()
# TF1-compat session configuration with 32 intra-op and 32 inter-op threads.
session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=32, inter_op_parallelism_threads=32)
# Seed the TF1-compat random generator with the same SEED.
tf.compat.v1.set_random_seed(SEED)
# NOTE(review): `sess` is not referenced again in the visible code — confirm it
# still has an effect under TF2 eager execution.
sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
import pandas as pd
import tensorflow_hub as hub
from sklearn.model_selection import train_test_split
from tensorflow.keras.datasets import cifar10
import cv2
import matplotlib.pyplot as plt
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras import optimizers
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
# Load the CIFAR-10 training and test splits as (images, labels) pairs.
(x_train_load, y_train), (x_test_load, y_test) = cifar10.load_data()
# Side length (px) of the raw images; selects the resize/crop sizes.
IMAGE_SIZE = 32
# Number of training samples; selects the schedule constants.
DATASET_SIZE = len(x_train_load)
# Number of target classes (output units of the classification head).
NUM_CLASSES = 10
# Resize/crop resolution pair chosen from the input size: images of at most
# 96 px use 160/128, anything larger uses 512/480.
RESIZE_TO, CROP_TO = (160, 128) if IMAGE_SIZE <= 96 else (512, 480)
# Schedule presets keyed on the training-set size: total number of steps and
# the steps at which the learning rate is lowered.
if DATASET_SIZE >= 500000:
    SCHEDULE_LENGTH, SCHEDULE_BOUNDARIES = 20000, [6000, 12000, 18000]
elif DATASET_SIZE >= 20000:
    SCHEDULE_LENGTH, SCHEDULE_BOUNDARIES = 10000, [3000, 6000, 9000]
else:
    SCHEDULE_LENGTH, SCHEDULE_BOUNDARIES = 500, [200, 300, 400]
def upscale(image):
    """Resize every image of a batch to RESIZE_TO x RESIZE_TO pixels.

    Args:
        image: Batch of images indexed along the first axis. Each element is
            passed to ``cv2.resize`` and must produce a 3-channel result.

    Returns:
        Integer NumPy array of shape ``(len(image), RESIZE_TO, RESIZE_TO, 3)``
        holding the bicubic-interpolated images.
    """
    # ``np.int`` was removed in NumPy 1.24; the builtin ``int`` is the dtype
    # it used to alias. Allocating the integer buffer directly also avoids
    # the float64 intermediate copy of the whole data set.
    upscaled = np.zeros((len(image), RESIZE_TO, RESIZE_TO, 3), dtype=int)
    for idx, single_image in enumerate(image):
        upscaled[idx] = cv2.resize(
            single_image,
            dsize=(RESIZE_TO, RESIZE_TO),
            interpolation=cv2.INTER_CUBIC,
        )
    return upscaled
# Upscale the training and test images with upscale().
x_train = upscale(x_train_load)
x_test = upscale(x_test_load)
# Divide the pixel values by 255 and store them as float32.
x_train = np.array(x_train/255, dtype=np.float32)
x_test = np.array(x_test/255, dtype=np.float32)
# Show the first training image before (left) and after (right) upscaling.
plt.subplot(121).imshow(x_train_load[0])
plt.subplot(122).imshow(x_train[0])
plt.show()
print("※左図:リサイズ前(32*32)、右図:リサイズ後(160*160)")
class MyImageDataGenerator(ImageDataGenerator):
    """``ImageDataGenerator`` extended with random cropping and mix-up.

    On top of the standard Keras augmentations, ``flow`` can blend pairs of
    batches (mix-up) and cut a random fixed-size window out of every image.

    Args:
        random_crop: ``None`` to disable cropping, or a ``(height, width)``
            pair giving the crop size.
        mix_up_alpha: Parameter of the Beta distribution the mix-up weights
            are drawn from; ``0.0`` disables mix-up.
        All other arguments are forwarded unchanged to ``ImageDataGenerator``.
    """

    def __init__(self, featurewise_center=False, samplewise_center=False,
                 featurewise_std_normalization=False, samplewise_std_normalization=False,
                 zca_whitening=False, zca_epsilon=1e-06, rotation_range=0.0, width_shift_range=0.0,
                 height_shift_range=0.0, brightness_range=None, shear_range=0.0, zoom_range=0.0,
                 channel_shift_range=0.0, fill_mode='nearest', cval=0.0, horizontal_flip=False,
                 vertical_flip=False, rescale=None, preprocessing_function=None, data_format=None,
                 validation_split=0.0, random_crop=None, mix_up_alpha=0.0):
        # Forward by keyword so the call does not depend on the positional
        # order of the base-class signature.
        super().__init__(
            featurewise_center=featurewise_center,
            samplewise_center=samplewise_center,
            featurewise_std_normalization=featurewise_std_normalization,
            samplewise_std_normalization=samplewise_std_normalization,
            zca_whitening=zca_whitening,
            zca_epsilon=zca_epsilon,
            rotation_range=rotation_range,
            width_shift_range=width_shift_range,
            height_shift_range=height_shift_range,
            brightness_range=brightness_range,
            shear_range=shear_range,
            zoom_range=zoom_range,
            channel_shift_range=channel_shift_range,
            fill_mode=fill_mode,
            cval=cval,
            horizontal_flip=horizontal_flip,
            vertical_flip=vertical_flip,
            rescale=rescale,
            preprocessing_function=preprocessing_function,
            data_format=data_format,
            validation_split=validation_split,
        )
        assert mix_up_alpha >= 0.0
        self.mix_up_alpha = mix_up_alpha
        # Identity check: ``== None`` would be evaluated element-wise if an
        # array were passed.
        assert random_crop is None or len(random_crop) == 2
        self.random_crop_size = random_crop

    def random_crop(self, original_img):
        """Return a random ``random_crop_size`` window of one H x W x 3 image.

        Raises:
            ValueError: If the image is smaller than the crop size.
        """
        assert original_img.shape[2] == 3
        height, width = original_img.shape[0], original_img.shape[1]
        dy, dx = self.random_crop_size
        if height < dy or width < dx:
            raise ValueError(f"Invalid random_crop_size : original = {original_img.shape}, crop_size = {self.random_crop_size}")

        # Draw the left edge first, then the top edge, so a seeded run
        # produces the same crops as before.
        x = np.random.randint(0, width - dx + 1)
        y = np.random.randint(0, height - dy + 1)
        return original_img[y:(y+dy), x:(x+dx), :]

    def mix_up(self, X1, y1, X2, y2):
        """Blend two equally sized batches with per-sample Beta weights.

        Returns:
            Tuple ``(X, y)`` where each sample is
            ``l * first + (1 - l) * second`` with
            ``l ~ Beta(mix_up_alpha, mix_up_alpha)`` drawn per sample.
        """
        assert X1.shape[0] == y1.shape[0] == X2.shape[0] == y2.shape[0]
        batch_size = X1.shape[0]
        weights = np.random.beta(self.mix_up_alpha, self.mix_up_alpha, batch_size)
        # Reshape so the per-sample weight broadcasts over image / label axes.
        X_l = weights.reshape(batch_size, 1, 1, 1)
        y_l = weights.reshape(batch_size, 1)
        X = X1 * X_l + X2 * (1-X_l)
        y = y1 * y_l + y2 * (1-y_l)
        return X, y

    def flow(self, x, y=None, batch_size=32, shuffle=True, sample_weight=None,
             seed=None, save_to_dir=None, save_prefix='', save_format='png', subset=None):
        """Yield augmented ``(batch_x, batch_y)`` tuples forever.

        Wraps the base-class ``flow`` iterator and applies mix-up (when
        ``mix_up_alpha > 0``) followed by random cropping (when a crop size
        was configured). Each batch is unpacked as an ``(x, y)`` pair, so
        ``y`` has to be supplied.
        """
        batches = super().flow(x=x, y=y, batch_size=batch_size, shuffle=shuffle, sample_weight=sample_weight,
                               seed=seed, save_to_dir=save_to_dir, save_prefix=save_prefix,
                               save_format=save_format, subset=subset)

        while True:
            batch_x, batch_y = next(batches)

            if self.mix_up_alpha > 0:
                # Draw partner batches until one has at least as many samples
                # as the current batch, then trim it to the same length.
                while True:
                    batch_x_2, batch_y_2 = next(batches)
                    m1, m2 = batch_x.shape[0], batch_x_2.shape[0]
                    if m1 <= m2:
                        batch_x_2 = batch_x_2[:m1]
                        batch_y_2 = batch_y_2[:m1]
                        break
                batch_x, batch_y = self.mix_up(batch_x, batch_y, batch_x_2, batch_y_2)

            if self.random_crop_size is not None:
                cropped = np.zeros((batch_x.shape[0], self.random_crop_size[0], self.random_crop_size[1], 3))
                for i in range(batch_x.shape[0]):
                    cropped[i] = self.random_crop(batch_x[i])
                batch_x = cropped

            yield (batch_x, batch_y)
def buildModel_BiT():
    """Build and compile the BiT-M R50x1 classifier used for fine-tuning.

    The model is the TensorFlow Hub BiT-M R50x1 layer (trainable) followed by
    a zero-initialised softmax ``Dense`` head with ``NUM_CLASSES`` units.
    Reads the module-level ``NUM_CLASSES``, ``BATCH_SIZE`` and
    ``SCHEDULE_BOUNDARIES``.

    Returns:
        The compiled ``tf.keras.Sequential`` model named ``'BiT'``.
    """
    bit_model = hub.KerasLayer("https://tfhub.dev/google/bit/m-r50x1/1", trainable=True)
    model = tf.keras.Sequential(
        [
            bit_model,
            tf.keras.layers.Dense(NUM_CLASSES, kernel_initializer='zeros', activation="softmax"),
        ],
        name='BiT',
    )

    # Base learning rate of 0.003 at batch size 512, scaled linearly with
    # the batch size actually used.
    lr = 0.003 * BATCH_SIZE / 512
    # Divide the learning rate by 10 at every boundary:
    # lr -> lr/10 -> lr/100 -> lr/1000.
    lr_schedule = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
        boundaries=SCHEDULE_BOUNDARIES,
        values=[lr, lr * 0.1, lr * 0.01, lr * 0.001],
    )
    optimizer = tf.keras.optimizers.SGD(learning_rate=lr_schedule, momentum=0.9)

    # Labels are expected one-hot encoded (categorical cross-entropy).
    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model
def train_BiT(X, y, STEPS_PER_EPOCH, SCHEDULE_LENGTH, BATCH_SIZE, epochs=10):
    """Split the data, augment the training part and fine-tune a BiT model.

    Args:
        X: Image array; the first axis indexes samples.
        y: Integer class labels aligned with ``X``. Used for the stratified
            split and then one-hot encoded.
        STEPS_PER_EPOCH: Number of generator batches consumed per epoch.
        SCHEDULE_LENGTH: Accepted for interface compatibility; this function
            does not use it.
        BATCH_SIZE: Batch size of the augmenting training generator. Note
            that ``buildModel_BiT`` reads the module-level ``BATCH_SIZE``,
            not this argument.
        epochs: Number of training epochs (defaults to the previously
            hard-coded value of 10).

    Returns:
        Tuple ``(model, history)``: the trained model and the object returned
        by ``model.fit``.
    """
    # Hold out 20% of the data for validation, keeping class proportions.
    X_train, X_valid, y_train, y_valid = train_test_split(
        X, y, test_size=0.2, stratify=y, shuffle=True
    )
    # Fix the one-hot width so it always matches the classifier head.
    y_train = to_categorical(y_train, NUM_CLASSES)
    y_valid = to_categorical(y_valid, NUM_CLASSES)

    # Training-time augmentation: horizontal flip, mix-up and random crop.
    datagen = MyImageDataGenerator(
        horizontal_flip=True,
        mix_up_alpha=0.1,
        random_crop=(CROP_TO, CROP_TO),
    )
    train_generator = datagen.flow(X_train, y_train, batch_size=BATCH_SIZE)

    model = buildModel_BiT()
    # ``shuffle`` is not passed: Keras ignores it for generator input, and
    # the generator already shuffles.
    history = model.fit(
        train_generator,
        steps_per_epoch=STEPS_PER_EPOCH,
        epochs=epochs,
        validation_data=(X_valid, y_valid),
    )
    return model, history
# Batch size used for training and for scaling the learning rate in buildModel_BiT.
BATCH_SIZE = 128
# Scale the schedule length by 512 / BATCH_SIZE (the result is a float).
# NOTE(review): SCHEDULE_BOUNDARIES is not rescaled alongside it — confirm this is intended.
SCHEDULE_LENGTH = SCHEDULE_LENGTH * 512 / BATCH_SIZE
# Number of training batches per epoch.
STEPS_PER_EPOCH = 10
# Run fine-tuning; returns the trained model and the value returned by model.fit.
model, history = train_BiT(x_train, y_train, STEPS_PER_EPOCH, SCHEDULE_LENGTH, BATCH_SIZE)
# Predict class probabilities for the test images and keep the most likely
# class index of every sample.
X = x_test
pred = model.predict(X)
pred = np.argmax(pred, axis=1)

# Put ground truth and prediction side by side, one row per test sample.
df_pred = pd.DataFrame(pred)
df_y = pd.DataFrame(y_test)
# ``join_axes`` was removed from ``pd.concat`` in pandas 1.0; re-indexing on
# the label frame's index is the documented replacement.
df_result = pd.concat([df_y, df_pred], axis=1).reindex(df_y.index)
df_result.columns = ['y','pred']
# ``display`` is provided by IPython/Jupyter.
display(df_result)

y_true = df_result['y']
y_hat = df_result['pred']

print('Confusion Matrix:')
print(confusion_matrix(y_true, y_hat))
print()
# Precision, recall and F-score are macro-averaged over the classes.
print('Accuracy :{:.4f}'.format(accuracy_score(y_true, y_hat)))
print('Precision:{:.4f}'.format(precision_score(y_true, y_hat, average='macro')))
print('Recall :{:.4f}'.format(recall_score(y_true, y_hat, average='macro')))
print('F_score :{:.4f}'.format(f1_score(y_true, y_hat, average='macro')))