SSJB's blog

いろいろです。

Keras / LSTM で自然数列を予測する簡単なサンプル

LSTM (Long short-term memory) の学習のために、Kerasで自然数列を推測しました。
気温や正弦波、株価の予測などをしている記事は多く見かけましたが、最も単純な自然数(正の整数)の数列はなかったので、LSTMの入り口としてコードを書いてみました。

(もっとも機械学習でやる必要がないのでそもそもやる意味がない)
ただ、正誤判断は単純なので、LSTMの触りをやるには丁度いい題材ではないでしょうか。

自然数の数列

数列の中で最も単純な自然数の数列です。
ここでは、0を含めない正の整数をとります。

 1, 2, 3, 4, 5, 6, 7, 8 , 9, 10, ...
(The positive integers, or the natural numbers)

データの準備

自然数なので、準備も簡単です。
sklearn の MinMaxScaler()で正規化を行います。

from sklearn import preprocessing

SERIES_LENGTH = 1000

# Prepare dataset
dataset = np.arange(1, SERIES_LENGTH+1, 1).reshape(SERIES_LENGTH, 1).astype(np.float)

# Transform
scaler = preprocessing.MinMaxScaler()
dataset = scaler.fit_transform(dataset)

# Split dataset into train and test subsets
train_dataset = dataset[0:int(len(dataset)*0.8), :]
test_dataset =  dataset[len(train_dataset):len(dataset), :]

次に、それぞれのデータを入力値と出力値に分けます。
例えば、1つ前を参照するとすると、

X = 1, 2, 3, 4, 5, 6, 7, 8 , 9, 10, ...
Y = 2, 3, 4, 5, 6, 7, 8 , 9, 10, 11, ...

3つ前までだと、

X = [1, 2, 3], [2, 3, 4], [3, 4, 5], ...
Y = 4, 5, 6, ...

のような感じになります。

コードにすると、このような感じです。
look_back の値で、どのくらい前までを参照するかを決めます。

look_back = 1
x, y = [], []
for i in range(len(dataset) - look_back):
    a = i + look_back
    x.append(dataset[i:a, 0])
    y.append(dataset[a, 0])

予測

あとは、予測部分。
(実際のコードは一番下の全文を参考ください)

from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM

model = Sequential()
model.add(LSTM(128, input_shape=(1, look_back)))
model.add(Dense(1))
model.add(Activation('linear'))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(x, y, batch_size=2, epochs=10, verbose=1)

Keras 簡単すぎます。
(活性化関数のlinearは線形分類なので、なくてもいいのではないかと思います。あっても意味はない?)

あとは、図に描画できるように整形してやります。

予測結果

Epoch 1/10
799/799 [==============================] - 2s - loss: 0.0128
Epoch 2/10
799/799 [==============================] - 1s - loss: 2.5285e-05
Epoch 3/10
799/799 [==============================] - 1s - loss: 1.6923e-05
Epoch 4/10
799/799 [==============================] - 1s - loss: 8.7859e-06
Epoch 5/10
799/799 [==============================] - 1s - loss: 3.0853e-06
Epoch 6/10
799/799 [==============================] - 1s - loss: 6.9152e-07
Epoch 7/10
799/799 [==============================] - 1s - loss: 4.0892e-08
Epoch 8/10
799/799 [==============================] - 1s - loss: 2.6624e-08
Epoch 9/10
799/799 [==============================] - 1s - loss: 1.1367e-07
Epoch 10/10
799/799 [==============================] - 1s - loss: 2.4738e-06
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
lstm_1 (LSTM)                (None, 128)               66560
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 129
_________________________________________________________________
activation_1 (Activation)    (None, 1)                 0
=================================================================
Total params: 66,689
Trainable params: 66,689
Non-trainable params: 0
_________________________________________________________________
None

Train Score: 1.491 RMSE
Test  Score: 0.979 RMSE

Next prediction: 1000.09


Time: 17.5sec

遠目から見ると一致しているように見えますが、拡大すると少しずれています。
予測自体は、しっかりと右肩上がりで予測はできています。
赤線は、最新のデータを使った次の予測になります。

f:id:suprsonicjetboy:20170904170418p:plain f:id:suprsonicjetboy:20170904170423p:plain

参考

Time Series Prediction with LSTM Recurrent Neural Networks in Python with Keras - Machine Learning Mastery

コード

import urllib, urllib.request
import numpy as np
import matplotlib.pyplot as plt
import math
import time
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
from sklearn import preprocessing
from sklearn.metrics import mean_squared_error


class PredictionLSTM :

    def __init__(self):
        self.look_back = 3
        self.units = 128
        self.epochs = 10
        self.batch_size = 1


    def create_dataset(self, dataset, look_back=1):
        x, y = [], []
        for i in range(len(dataset) - look_back):
            a = i + look_back
            x.append(dataset[i:a, 0])
            y.append(dataset[a, 0])

        return np.array(x), np.array(y)


    def create_model(self):
        model = Sequential()
        model.add(LSTM(self.units, input_shape=(1, self.look_back)))
        model.add(Dense(1))
        model.add(Activation('linear'))
        model.compile(loss='mean_squared_error', optimizer='adam')

        return model


    def train(self, x, y):
        model = self.create_model()
        model.fit(x, y, batch_size=self.batch_size, epochs=self.epochs, verbose=1)

        return model


if __name__ == "__main__":
    START_TIME = time.time()
    SERIES_LENGTH = 1000

    # Prepare dataset
    dataset = np.arange(1, SERIES_LENGTH+1, 1).reshape(SERIES_LENGTH, 1).astype(np.float)

    # Transform
    scaler = preprocessing.MinMaxScaler()
    dataset = scaler.fit_transform(dataset)

    # Split dataset into train and test subsets
    train_dataset = dataset[0:int(len(dataset)*0.8), :]
    test_dataset =  dataset[len(train_dataset):len(dataset), :]



    # LSTM
    prediction_ltsm = PredictionLSTM()

    # Create train dataset
    train_x, train_y = prediction_ltsm.create_dataset(train_dataset, prediction_ltsm.look_back)

    train_x = np.reshape(train_x, (train_x.shape[0], 1, train_x.shape[1]))
    # Create test dataset
    test_x, test_y  = prediction_ltsm.create_dataset(test_dataset, prediction_ltsm.look_back)
    test_x = np.reshape(test_x, (test_x.shape[0], 1, test_x.shape[1]))


    # Create and fit the LSTM network
    model = prediction_ltsm.train(train_x, train_y)
    print(model.summary())


    # Predict train dataset
    train_prediction = model.predict(train_x)
    train_prediction = scaler.inverse_transform(train_prediction)
    train_y = scaler.inverse_transform([train_y])

    # Predict test dataset
    test_prediction = model.predict(test_x)
    test_prediction = scaler.inverse_transform(test_prediction)
    test_y = scaler.inverse_transform([test_y])


    # Calculate RMSE(Root Mean Squared Error)
    train_score = math.sqrt(mean_squared_error(train_y[0], train_prediction[:, 0]))
    test_score = math.sqrt(mean_squared_error(test_y[0], test_prediction[:, 0]))
    print("\nTrain Score: {0:.3f} RMSE".format(train_score))
    print("Test  Score: {0:.3f} RMSE".format(test_score))


    # Predict the next value using the latest data
    latest_x = np.array([test_dataset[-prediction_ltsm.look_back:]])
    latest_x = np.reshape(latest_x, (latest_x.shape[0], 1, latest_x.shape[1]))
    next_prediction = model.predict(latest_x)
    next_prediction = scaler.inverse_transform(next_prediction)
    print("\nNext prediction: {0:.2f}".format(list(next_prediction)[0][0]), "\n"*2)

    print("Time: {0:.1f}sec".format(time.time() - START_TIME))


    # Draw a figure
    placeholder = np.append(dataset, np.zeros((1, dataset.shape[1])), axis=0)
    placeholder[:, :] = np.nan

    correct_dataset_plt = scaler.inverse_transform(dataset)

    train_plt = np.copy(placeholder)
    train_plt[prediction_ltsm.look_back:len(train_prediction)+prediction_ltsm.look_back, :] = train_prediction

    test_plt = np.copy(placeholder)
    test_plt[len(train_prediction)+(prediction_ltsm.look_back*2):len(dataset), :] = test_prediction

    nest_plt = np.copy(placeholder)
    nest_plt[len(placeholder)-2:len(placeholder), :] = np.append(test_prediction[-1], next_prediction.reshape(1)).reshape(2, 1)

    plt.plot(correct_dataset_plt, label='Correct prediction')
    plt.plot(train_plt, label='Train')
    plt.plot(test_plt, label='Test')
    plt.plot(nest_plt, label='Next prediction', c='r')
    plt.legend() 
    plt.show()
    

github.com