To learn about LSTM (Long Short-Term Memory), I used Keras to predict a sequence of natural numbers.
There are plenty of articles predicting temperatures, sine waves, and stock prices, but I couldn't find one covering the simplest sequence of all, the natural numbers (positive integers), so I wrote this code as an entry point to LSTM.
(Of course there is no need to use machine learning for this, so in practical terms it is pointless.)
Still, since judging right from wrong is trivial here, it makes a good subject for a first taste of LSTM.
The natural number sequence
This is the simplest of all sequences: the natural numbers.
Here, I take the positive integers, excluding 0.
(The positive integers, or the natural numbers)
Preparing the data
Since these are just natural numbers, preparation is simple.
Normalization is done with sklearn's MinMaxScaler().
import numpy as np
from sklearn import preprocessing

SERIES_LENGTH = 1000

# Prepare dataset
dataset = np.arange(1, SERIES_LENGTH+1, 1).reshape(SERIES_LENGTH, 1).astype(float)

# Transform
scaler = preprocessing.MinMaxScaler()
dataset = scaler.fit_transform(dataset)

# Split dataset into train and test subsets
train_dataset = dataset[0:int(len(dataset)*0.8), :]
test_dataset = dataset[len(train_dataset):len(dataset), :]
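For reference, MinMaxScaler() rescales each feature to the [0, 1] range as (x - min) / (max - min), and inverse_transform() undoes it; the full code relies on that later to read predictions back in the original scale. A quick check on this dataset (min = 1, max = 1000):

print(dataset[0])    # [0.]  (1 maps to 0.0)
print(dataset[-1])   # [1.]  (1000 maps to 1.0)
print(scaler.inverse_transform(dataset[-1:]))  # [[1000.]]  (back to the original scale)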
Next, each dataset is split into inputs and outputs.
For example, looking back one step, the pairs look like this:

input: 1 → output: 2
input: 2 → output: 3
input: 3 → output: 4
...

And looking back up to three steps:

input: 1, 2, 3 → output: 4
input: 2, 3, 4 → output: 5
...
In code, it looks like this.
The look_back value sets how many previous steps are used as input.
look_back = 1
x, y = [], []
for i in range(len(dataset) - look_back):
    a = i + look_back
    x.append(dataset[i:a, 0])
    y.append(dataset[a, 0])
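One detail worth knowing before the model code: Keras's LSTM layer expects a 3D input of shape (samples, timesteps, features), so x has to be converted to an array and reshaped to match input_shape=(1, look_back). A minimal sketch of what the full code below does:

x, y = np.array(x), np.array(y)
# (samples, look_back) -> (samples, 1, look_back):
# one timestep whose feature vector is the look_back window
x = np.reshape(x, (x.shape[0], 1, x.shape[1]))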
Prediction
Now for the prediction part.
(See the full code at the bottom for the actual implementation.)
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM

model = Sequential()
model.add(LSTM(128, input_shape=(1, look_back)))
model.add(Dense(1))
model.add(Activation('linear'))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(x, y, batch_size=2, epochs=10, verbose=1)
Keras makes this almost too easy.
(The 'linear' activation is just the identity function, and it is also the default activation of Dense, so the Activation layer could be dropped without changing anything.)
All that remains is to reshape the results so they can be plotted.
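Concretely, that means applying scaler.inverse_transform() to get back to the original scale, then writing each prediction into a NaN-filled array of the full series length so it plots at the right offset (matplotlib skips NaN points). A minimal sketch of the trick used in the full code:

placeholder = np.full((len(dataset), 1), np.nan)  # NaN canvas, full series length
train_plt = np.copy(placeholder)
# train predictions start look_back steps into the series
train_plt[look_back:len(train_prediction) + look_back, :] = train_prediction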
Prediction results
Epoch 1/10
799/799 [==============================] - 2s - loss: 0.0128
Epoch 2/10
799/799 [==============================] - 1s - loss: 2.5285e-05
Epoch 3/10
799/799 [==============================] - 1s - loss: 1.6923e-05
Epoch 4/10
799/799 [==============================] - 1s - loss: 8.7859e-06
Epoch 5/10
799/799 [==============================] - 1s - loss: 3.0853e-06
Epoch 6/10
799/799 [==============================] - 1s - loss: 6.9152e-07
Epoch 7/10
799/799 [==============================] - 1s - loss: 4.0892e-08
Epoch 8/10
799/799 [==============================] - 1s - loss: 2.6624e-08
Epoch 9/10
799/799 [==============================] - 1s - loss: 1.1367e-07
Epoch 10/10
799/799 [==============================] - 1s - loss: 2.4738e-06
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
lstm_1 (LSTM)                (None, 128)               66560
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 129
_________________________________________________________________
activation_1 (Activation)    (None, 1)                 0
=================================================================
Total params: 66,689
Trainable params: 66,689
Non-trainable params: 0
_________________________________________________________________
None

Train Score: 1.491 RMSE
Test Score: 0.979 RMSE

Next prediction: 1000.09

Time: 17.5sec
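As a sanity check on the summary above: an LSTM layer has 4 × (units × (units + input_dim) + units) weights, four gates each with a recurrent kernel, an input kernel, and a bias. With 128 units and an input dimension of look_back = 1, that gives the 66,560 parameters shown, plus 128 + 1 = 129 for the Dense layer:

units, input_dim = 128, 1  # this run used look_back = 1
lstm_params = 4 * (units * (units + input_dim) + units)
dense_params = units * 1 + 1  # one output weight per unit, one bias
print(lstm_params, dense_params)  # 66560 129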
From a distance the prediction looks like a perfect match, but zooming in reveals a slight offset.
Still, the prediction itself tracks the steady upward slope correctly.
The red line is the next-value prediction made from the latest data.
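For reference, that next value is obtained by feeding the last look_back scaled values back into the model and undoing the normalization. A minimal sketch that mirrors the full code below:

# Predict the value that follows the series (mirrors the full code below)
latest_x = np.array([test_dataset[-look_back:]])    # shape (1, look_back, 1)
latest_x = np.reshape(latest_x, (1, 1, look_back))  # shape (1, 1, look_back)
next_prediction = scaler.inverse_transform(model.predict(latest_x))
print(next_prediction)  # 1000.09 in this run, vs. the true next value 1001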
Full code
import numpy as np
import matplotlib.pyplot as plt
import math
import time

from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
from sklearn import preprocessing
from sklearn.metrics import mean_squared_error


class PredictionLSTM:
    def __init__(self):
        self.look_back = 3
        self.units = 128
        self.epochs = 10
        self.batch_size = 1

    def create_dataset(self, dataset, look_back=1):
        # Build (input window, next value) pairs
        x, y = [], []
        for i in range(len(dataset) - look_back):
            a = i + look_back
            x.append(dataset[i:a, 0])
            y.append(dataset[a, 0])
        return np.array(x), np.array(y)

    def create_model(self):
        model = Sequential()
        model.add(LSTM(self.units, input_shape=(1, self.look_back)))
        model.add(Dense(1))
        model.add(Activation('linear'))
        model.compile(loss='mean_squared_error', optimizer='adam')
        return model

    def train(self, x, y):
        model = self.create_model()
        model.fit(x, y, batch_size=self.batch_size, epochs=self.epochs, verbose=1)
        return model


if __name__ == "__main__":
    START_TIME = time.time()
    SERIES_LENGTH = 1000

    # Prepare dataset
    dataset = np.arange(1, SERIES_LENGTH+1, 1).reshape(SERIES_LENGTH, 1).astype(float)

    # Transform
    scaler = preprocessing.MinMaxScaler()
    dataset = scaler.fit_transform(dataset)

    # Split dataset into train and test subsets
    train_dataset = dataset[0:int(len(dataset)*0.8), :]
    test_dataset = dataset[len(train_dataset):len(dataset), :]

    # LSTM
    prediction_ltsm = PredictionLSTM()

    # Create train dataset
    train_x, train_y = prediction_ltsm.create_dataset(train_dataset, prediction_ltsm.look_back)
    train_x = np.reshape(train_x, (train_x.shape[0], 1, train_x.shape[1]))

    # Create test dataset
    test_x, test_y = prediction_ltsm.create_dataset(test_dataset, prediction_ltsm.look_back)
    test_x = np.reshape(test_x, (test_x.shape[0], 1, test_x.shape[1]))

    # Create and fit the LSTM network
    model = prediction_ltsm.train(train_x, train_y)
    print(model.summary())

    # Predict train dataset
    train_prediction = model.predict(train_x)
    train_prediction = scaler.inverse_transform(train_prediction)
    train_y = scaler.inverse_transform([train_y])

    # Predict test dataset
    test_prediction = model.predict(test_x)
    test_prediction = scaler.inverse_transform(test_prediction)
    test_y = scaler.inverse_transform([test_y])

    # Calculate RMSE (Root Mean Squared Error)
    train_score = math.sqrt(mean_squared_error(train_y[0], train_prediction[:, 0]))
    test_score = math.sqrt(mean_squared_error(test_y[0], test_prediction[:, 0]))
    print("\nTrain Score: {0:.3f} RMSE".format(train_score))
    print("Test Score: {0:.3f} RMSE".format(test_score))

    # Predict the next value using the latest data
    latest_x = np.array([test_dataset[-prediction_ltsm.look_back:]])
    latest_x = np.reshape(latest_x, (latest_x.shape[0], 1, latest_x.shape[1]))
    next_prediction = model.predict(latest_x)
    next_prediction = scaler.inverse_transform(next_prediction)
    print("\nNext prediction: {0:.2f}".format(list(next_prediction)[0][0]), "\n"*2)
    print("Time: {0:.1f}sec".format(time.time() - START_TIME))

    # Draw a figure: NaN-filled arrays align each curve with the series
    placeholder = np.append(dataset, np.zeros((1, dataset.shape[1])), axis=0)
    placeholder[:, :] = np.nan

    correct_dataset_plt = scaler.inverse_transform(dataset)

    train_plt = np.copy(placeholder)
    train_plt[prediction_ltsm.look_back:len(train_prediction)+prediction_ltsm.look_back, :] = train_prediction

    test_plt = np.copy(placeholder)
    test_plt[len(train_prediction)+(prediction_ltsm.look_back*2):len(dataset), :] = test_prediction

    nest_plt = np.copy(placeholder)
    nest_plt[len(placeholder)-2:len(placeholder), :] = np.append(
        test_prediction[-1], next_prediction.reshape(1)).reshape(2, 1)

    plt.plot(correct_dataset_plt, label='Correct prediction')
    plt.plot(train_plt, label='Train')
    plt.plot(test_plt, label='Test')
    plt.plot(nest_plt, label='Next prediction', c='r')
    plt.legend()
    plt.show()
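One caveat: the imports above target the standalone Keras of the time. On a recent TensorFlow install (an assumption about your environment, not something tested here), the equivalents would be:

# tf.keras equivalents of the Keras imports above (assumes TensorFlow 2.x)
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, LSTM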