SW/주가 예측

주가, 비트코인 예측 : 머신러닝 모델 : 구현, 개발, 코드, 방법

얇은생각 2019. 7. 10. 07:30
반응형

주가, 비트코인 예측 : 머신러닝 모델 : 구현, 개발, 코드, 방법



머신러닝 모델 구현


파이썬과 Scikit-learn 라이브러리를 이용해 주가방향을 예측할 수 있는 간단한 주가 방향 예측변수를 구현합니다.


핵심 아이디어는 전날의 종가 또는 거래량 데이터를 이용해 다음날의 주가 방향을 예측하는 것입니다. 사용자는 입력 변수로 종가나 거래량 중 하나를 사용할 수 있습니다. 또는 2개를 모두 사용할 수 있습니다.


또한, 하루 전의 데이터를 기준으로 예측할지, 며칠 전의 데이터를 기준으로 예측할지 선택할 수 있습니다.




데이터셋

하나의 데이터 셋을 만들고, 만들어진 데이터 셋을 일정 비율로 나누어 학습과 테스트에 사용합니다. 학습용 데이터셋은 주가방향 예측변수를 학습시키기 위해 사용하는 것으로, 이 데이터를 이용해 각 주가방향 예측변수 모델을 완성합니다. 테스트용 데이터셋은 학습용 데이터 셋과 겹치지 않아야 합니다. 


데이터 셋을 만드는 함수는 다음과 같습니다. 



def make_dataset(df, time_lags=5):
df_lag = pd.DataFrame(index=df.index)
df_lag["Close"] = df["Close"]
df_lag["Volume"] = df["Volume"]

df_lag["Close_Lag%s" % str(time_lags)] = df["Close"].shift(time_lags)
df_lag["Close_Lag%s_Change" % str(time_lags)] = df_lag["Close_Lag%s" % str(time_lags)].pct_change()*100.0

df_lag["Volume_Lag%s" % str(time_lags)] = df["Volume"].shift(time_lags)
df_lag["Volume_Lag%s_Change" % str(time_lags)] = df_lag["Volume_Lag%s" % str(time_lags)].pct_change()*100.0

df_lag["Close_Direction"] = np.sign(df_lag["Close_Lag%s_Change" % str(time_lags)])
df_lag["Volume_Direction"] = np.sign(df_lag["Volume_Lag%s_Change" % str(time_lags)])

return df_lag.dropna(how='any')


인자로 전달된 DataFrame의 데이터를 바탕으로 학습과 테스트에 사용할 DataFrame을 만들어 돌려줍니다. 


분류는 미래의 주가 방향을 예측하기 위해 과거의 데이터를 이용합니다. time_lags 인자는 현재일 기준으로 며칠 전의 데이터를 이용할 것인가를 정의합니다. 


pct_change() 함수는 pandas에서 제공해주는 함수입니다. 주어진 데이터의 변화를 퍼센트로 계산하는 함수입니다. 주가방향을 예측하는 분류는 지도학습입니다. 따라서 입력 변수와 출력변수를 매치해야 합니다. Close_Direction은 주가의 방향을 의미합니다. Volume_Direction은 거래량의 방향을 나타냅니다. 




데이터셋 나누기

make_dataset() 함수를 이용해 주가 방향 예측 변수에 사용할 데이터 셋을 만듭니다. 그 후, 이를 학습 데이터 셋과 테스트 데이터 셋으로 나누어야 합니다. split_dataset() 함수는 일정한 비율로 데이터 셋을 나누어 줍니다. 



def split_dataset(df,input_column_array,output_column,spllit_ratio):
split_date = get_date_by_percent(df.index[0],df.index[df.shape[0]-1],spllit_ratio)

input_data = df[input_column_array]
output_data = df[output_column]

# Create training and test sets
X_train = input_data[input_data.index < split_date]
X_test = input_data[input_data.index >= split_date]
Y_train = output_data[output_data.index < split_date]
Y_test = output_data[output_data.index >= split_date]

return X_train,X_test,Y_train,Y_test

def get_date_by_percent(start_date,end_date,percent):
days = (end_date - start_date).days
target_days = np.trunc(days * percent)
target_date = start_date + datetime.timedelta(days=target_days)
#print days, target_days,target_date
return target_date


input_column_array는 입력변수로 사용할 DataFrame의 칼럼 이름을 배열 형태로 전달하는 인수입니다. output_column은 출력변수로 사용할 칼럼 이름입니다. split_ratio는 학습과 테스트로 나눌 비율을 지정합니다. 


get_date_by_percent() 함수는 주어진 데이터셋의 시작일과 마감일의 날수를 계산합니다. 그 후, 사용자가 지정한 split_ratio에 따라 계산해 날 수를 돌려주는 함수입니다. 




주가 방향 예측 변수 구현

Scikit-leran은 로지스틱 회귀, 랜덤 포레스트, SVM 등의 머신러닝 알고리즘과 상관 없이 동일한 방법으로 예측 프로그램을 만들 수 있습니다. 준비한 데이터 셋을 활용해 예측 프로그램을 구현해 보겠습니다. 각 함수는 예측 프로그램을 생성하고 학습시킨 후에 반환합니다. 동일한 형태로 상당히 직관적이고 단순하다는 것을 확인할 수 있습니다.


def do_logistic_regression(x_train,y_train):
classifier = LogisticRegression()
classifier.fit(x_train, y_train)
return classifier


def do_random_forest(x_train,y_train):
classifier = RandomForestClassifier()
classifier.fit(x_train, y_train)
return classifier


def do_svm(x_train,y_train):
classifier = SVC()
classifier.fit(x_train, y_train)
return classifier




주가방향 예측 변수 실행 및 평가


# -*- coding: utf-8 -*-
from __future__ import division

import os,sys,datetime
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import pprint
import statsmodels.tsa.stattools as ts
import pandas as pd
pd.core.common.is_list_like = pd.api.types.is_list_like
import pandas_datareader.data as web
import datetime
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
import sklearn.discriminant_analysis
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.metrics import confusion_matrix
from sklearn.svm import LinearSVC, SVC
from pandas.plotting import scatter_matrix
from pandas.plotting import scatter_matrix, autocorrelation_plot
from pandas.compat import range, lrange, lmap, map, zip


def get_data_file_path(file_name):
    full_file_name = "%s/data/%s" % (os.path.dirname(os.path.abspath(__file__)),file_name)
    #print full_file_name
    return full_file_name

def save_stock_data(df,file_name):
    new_file_name = get_data_file_path(file_name)
    df.to_pickle(new_file_name)

def download_stock_data(file_name,company_code,year1,month1,date1,year2,month2,date2):
    start = datetime.datetime(year1, month1, date1)
    end = datetime.datetime(year2, month2, date2)
    df = web.DataReader("%s.KS" % (company_code), "yahoo", start, end)
    save_stock_data(df,file_name)

    return df

def load_stock_data(file_name):
df = pd.read_pickle(file_name)
return df

parentPath = os.path.abspath("..")
if parentPath not in sys.path:
sys.path.insert(0, parentPath)

def make_dataset(df, time_lags=5):
df_lag = pd.DataFrame(index=df.index)
df_lag["Close"] = df["Close"]
df_lag["Volume"] = df["Volume"]

df_lag["Close_Lag%s" % str(time_lags)] = df["Close"].shift(time_lags)
df_lag["Close_Lag%s_Change" % str(time_lags)] = \
df_lag["Close_Lag%s" % str(time_lags)].pct_change()*100.0

df_lag["Volume_Lag%s" % str(time_lags)] = df["Volume"].shift(time_lags)
df_lag["Volume_Lag%s_Change" % str(time_lags)] = \
df_lag["Volume_Lag%s" % str(time_lags)].pct_change()*100.0

df_lag["Close_Direction"] = np.sign(df_lag["Close_Lag%s_Change" % str(time_lags)])
df_lag["Volume_Direction"] = np.sign(df_lag["Volume_Lag%s_Change" % str(time_lags)])

return df_lag.dropna(how='any')


def split_dataset(df,input_column_array,output_column,spllit_ratio):
split_date = get_date_by_percent(df.index[0],df.index[df.shape[0]-1],spllit_ratio)

input_data = df[input_column_array]
output_data = df[output_column]

# Create training and test sets
X_train = input_data[input_data.index < split_date]
X_test = input_data[input_data.index >= split_date]
Y_train = output_data[output_data.index < split_date]
Y_test = output_data[output_data.index >= split_date]

return X_train,X_test,Y_train,Y_test


def get_date_by_percent(start_date,end_date,percent):
days = (end_date - start_date).days
target_days = np.trunc(days * percent)
target_date = start_date + datetime.timedelta(days=target_days)
#print days, target_days,target_date
return target_date


def do_logistic_regression(x_train,y_train):
classifier = LogisticRegression()
classifier.fit(x_train, y_train)
return classifier


def do_random_forest(x_train,y_train):
classifier = RandomForestClassifier()
classifier.fit(x_train, y_train)
return classifier


def do_svm(x_train,y_train):
classifier = SVC()
classifier.fit(x_train, y_train)
return classifier


def test_predictor(classifier,x_test,y_test):
pred = classifier.predict(x_test)

hit_count = 0
total_count = len(y_test)
for index in range(total_count):
if (pred[index]) == (y_test[index]):
hit_count = hit_count + 1

hit_ratio = hit_count/total_count
score = classifier.score(x_test, y_test)
#print "hit_count=%s, total=%s, hit_ratio = %s" % (hit_count,total_count,hit_ratio)

return hit_ratio, score
# Output the hit-rate and the confusion matrix for each model

#print("%s\n" % confusion_matrix(pred, y_test))



if __name__ == "__main__":
# Calculate and output the CADF test on the residuals

download_stock_data('mobis.data', '012330', 2018, 12, 1, 2018, 12, 15)
download_stock_data('mando.data', '204320', 2018, 12, 1, 2018, 12, 15)

avg_hit_ratio = 0
for time_lags in range(1,6):
print("- Time Lags=%s" % (time_lags))

for company in ['mobis','mando']:
df_company = load_stock_data('%s.data'%(company))

df_dataset = make_dataset(df_company,time_lags)
X_train,X_test,Y_train,Y_test = \
split_dataset( \
df_dataset, \
["Close_Lag%s"%(time_lags), "Volume_Lag%s"%(time_lags)], \
"Close_Direction",0.75)
#print X_test

lr_classifier = do_logistic_regression(X_train,Y_train)
lr_hit_ratio, lr_score = test_predictor(lr_classifier,X_test,Y_test)

rf_classifier = do_random_forest(X_train,Y_train)
rf_hit_ratio, rf_score = test_predictor(rf_classifier,X_test,Y_test)

svm_classifier = do_svm(X_train,Y_train)
svm_hit_ratio, svm_score = test_predictor(rf_classifier,X_test,Y_test)

print("%s : Hit Ratio - Logistic Regreesion=%0.2f, \
RandomForest=%0.2f, SVM=%0.2f" % ( \
company, lr_hit_ratio, rf_hit_ratio, svm_hit_ratio))


만도와 모비스 주가를 입력 변수로 산정합니다. time_lags를 변화시켜 예측치의 적중률을 출력해봅니다.


DEBUG:root:- Time Lags=1

DEBUG:root:mobis : Hit Ratio - Logistic Regreesion=0.48, RandomForest=0.52, SVM=0.52

DEBUG:root:mando : Hit Ratio - Logistic Regreesion=0.50, RandomForest=0.48, SVM=0.48

DEBUG:root:- Time Lags=2

DEBUG:root:mobis : Hit Ratio - Logistic Regreesion=0.47, RandomForest=0.45, SVM=0.45

DEBUG:root:mando : Hit Ratio - Logistic Regreesion=0.50, RandomForest=0.47, SVM=0.47

DEBUG:root:- Time Lags=3

DEBUG:root:mobis : Hit Ratio - Logistic Regreesion=0.47, RandomForest=0.55, SVM=0.55

DEBUG:root:mando : Hit Ratio - Logistic Regreesion=0.48, RandomForest=0.47, SVM=0.47

DEBUG:root:- Time Lags=4

DEBUG:root:mobis : Hit Ratio - Logistic Regreesion=0.47, RandomForest=0.47, SVM=0.47

DEBUG:root:mando : Hit Ratio - Logistic Regreesion=0.48, RandomForest=0.45, SVM=0.45

DEBUG:root:- Time Lags=5

DEBUG:root:mobis : Hit Ratio - Logistic Regreesion=0.44, RandomForest=0.50, SVM=0.50

DEBUG:root:mando : Hit Ratio - Logistic Regreesion=0.48, RandomForest=0.45, SVM=0.45


입력 변수에 주가와 거래량, 2개의 입력 변수를 넣고 실행해봅니다.

반응형