GIL's LAB

실험 5. 주가가 상승하기 전의 시계열 패턴 찾기 (feat. 쉐이플릿) 본문

퀀트 투자/실험 일지

실험 5. 주가가 상승하기 전의 시계열 패턴 찾기 (feat. 쉐이플릿)

GIL~ 2021. 9. 14. 22:30

개요 

이번 실험에서는 주가가 상승하기 전에 보이는 새로운 시계열 패턴을 찾아보고자 한다. 

그러니까 대표적인 주가 상승 패턴 중 하나인 역헤드엔숄더 패턴처럼, 주가가 상승하기 전에 보이는 저런 패턴들을 찾는 것이 이번 실험의 목표이다. 

역헤드엔숄더 패턴: 이 패턴이 나오면 주가가 오른다더라 (...진짜 그런진 추후에 확인해보자!)

즉, 주가가 크게 오르기 전의 주가 데이터와 그렇지 않은 주가 데이터를 가지고, 주가가 크게 오르기 전의 주가 데이터에서만 주로 발생하는 패턴을 찾으면 되는데, 이 패턴은 결국 시계열 분류에서 사용되는 쉐이플릿 개념과 동일하다. 쉐이플릿에 대한 설명은 여기를 참고하기 바란다. 

 

실험 내용

실험 내용은 심플하다. 주가가 크게 오르기 전의 주가 데이터와 그렇지 않은 주가 데이터로 주가 데이터를 분할한 다음에, 쉐이플릿을 찾으면 된다. 말로 쓰니 내용이 어려우니, 코드를 보며 이해해보도록 하자.

 

먼저 실험에 필요한 모듈들을 불러온다. 

아나콘다를 사용한다면 대부분의 모듈은 설치되어 있겠지만, tslearn은 설치되어 있지 않을 가능성이 크다.

tslearn은 아나콘다 프롬프트에서 pip install tslearn이라는 명령어를 사용하여 설치할 수 있고, 우리가 사용할 LearningShapelets 함수를 사용하려면 tensorflow가 설치되어 있어야 한다.

import pandas as pd
import numpy as np
from tslearn.shapelets import LearningShapelets
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier as DTC
from sklearn.metrics import f1_score
import warnings
warnings.filterwarnings("ignore")

그리고 데이터를 정규화하기 위한 함수를 작성한다. 다른 방법도 많지만, 직접 작성하자.

시계열을 정규화하는 이유는 종목별로 주가 차이가 커서 쉐이플릿이 오로지 스케일에만 의존할 수 있기 때문이다. 즉, 우리는 특정한 모양을 찾는 것이 목적이므로 정규화한다라고 생각하자. 

def normalize(data):
    m = data.mean()
    s = data.std()
    return (data-m) / s

이제 데이터를 두 개의 그룹으로 쪼개는 함수를 만들자. 첫 번째 그룹은 긍정 그룹으로 주가가 기준치 이상 오른 그룹이고, 두 번째 그룹은 부정 그룹으로 긍정 그룹에 속하지 못한 그룹이다. 여기서 각 그룹에는 부분 시계열이 들어간다. 

주석을 친절히 달았지만, 이 함수는 한 종목의 주가 데이터와 정규화된 주가 데이터를 받아서, n 영업일 이내에 기준치 이상으로 수익을 본 길이 L짜리 부분 시계열을 positive에 그렇지 않은 부분 시계열을 negative에 추가한다. 예를 들어서, L = 10, n = 30인데, 오늘을 기준으로 30 영업일 이내에 주가가 기준치 이상으로 올랐다면, 10 영업일 이전부터 오늘까지의 정규화된 주가 데이터가 positive에 추가되는 것이다.

def make_dataset(n_data, data, L, n, profit_threshold): 
    # n_data: 표준화된 시계열 데이터 (dtype: ndarray)
    # data: 원본 시계열 데이터
    # L: 부분 시계열 길이
    # n: 기다리는 시점
    # profit_ratio: 수익률 기준
    
    # positive: 마지막 시점을 기준으로 영업일 n일 이내에 profit_ratio 이상의 수익을 본 부분 시계열
    # negative: 마지막 시점을 기준으로 영업일 n일 이내에 profit_ratio 이상의 수익을 보지 않은 부분 시계열
    
    positive = []
    negative = []
    
    for d in range(L, len(data)-L-n, L):
        p = False
        for ni in range(n):
            profit_ratio = ((data[d+ni] - data[d]) / data[d]) * 100
            if profit_ratio >= profit_threshold:
                p = True
                break
        if p:
            positive.append(n_data[d-L:d].tolist())
        else:
            negative.append(n_data[d-L:d].tolist())
    
    return positive, negative

 

사실 여기까지만 따라왔으면 뒤에는 어렵지 않다. 

위에 있는 함수는 하나의 종목에 대해서만 시계열을 분리하기 때문에, 모든 종목에 대해서 위 함수를 적용해야 할 것이다. 따라서 데이터가 있는 경로를 설정하고, 부분 시계열이 들어올 리스트를 초기화해주자.

P_list = []
N_list = []

path = r"C:\Users\Gilseung\Desktop\Jupyter\GILLAB\QUANT_DATA\201609~202108\주가\KOSPI"

이제 저 path에 있는 데이터를 열어서 종가만 가져온 뒤, 데이터를 분할하여 P_list 혹은 N_list에 추가한다.

여기서 data가 없거나 데이터에 오류가 있었는지 모든 값이 같아서 정규화를 하면 nan으로 바뀌는 문제가 있어서, 두번의 continue 문을 통해 문제가 있으면 건드리지 않고 넘어갔다. 

그리고 L을 50, n을 20, 기준치를 5로 잡아서 데이터를 분할했다. 즉, 마지막 시점을 기준으로 20 영업일 이내에 5% 이상 상승한 50 영업일 길이의 주가 시계열을 P_list에, 그렇지 않은 시계열을 N_list에 추가했다.  

for file_name in os.listdir(path):
    data = pd.read_csv(path + "/" + file_name, encoding = "cp949")["종가"].values
    if len(data) == 0:
        continue

    n_data = normalize(data)
    if np.nan in n_data:
        continue

    P, N = make_dataset(n_data = n_data, data = data, L=50, n=20, profit_threshold = 5)
    if len(P) > 0:
        P_list += P
    if len(N) > 0:
        N_list += N

이제 P_list에서만 주로 등장하는 쉐이플릿을 찾을 차례이다. 쉐이플릿은 시계열 분류에 사용되는 개념이기에, 분류에 적절한 구조로 데이터를 바꿔주자. 

P_list와 N_list를 합쳐서 X를 만들어주고, P_list에 있는 부분 시계열에 1을 N_list에 있는 부분 시계열에 0이라는 라벨을 달아주기 위해 Y를 정의한다. 그리고 X를 데이터프레임으로, Y를 시리즈로 바꾸고, X에 결측치가 하나라도 있으면 X와 Y에서 모두 삭제해준다.

X = P_list + N_list
Y = [1] * len(P_list) + [0] * len(N_list)

X = pd.DataFrame(X)
Y = pd.Series(Y)

Y = Y[X.isnull().sum(axis = 1) == 0].values
X = X.loc[X.isnull().sum(axis = 1) == 0].values

자. 이제 쉐이플릿의 길이를 2부터 20까지 바꿔가면서 쉐이플릿을 찾아보자.

먼저 결과 정리에 사용할 리스트를 모두 초기화해주고, X와 Y를 학습 데이터와 평가 데이터로 분할한다.

그리고 LearningShapelets 함수를 사용하여 LS 인스턴스를 생성하고, 이 인스턴스를 Train_X와 Train_Y에 대해 피팅한다. 

여기서 {l:1}은 길이가 l인 쉐이플릿을 1개만 찾겠다는 뜻이고, fit을 함으로써 Train_X에서 길이가 l인 쉐이플릿을 찾기 시작한다. 피팅이 되면, transform을 이용하여 Train_X와 Test_X에 있는 타임시리즈와 쉐이플릿까지의 거리를 계산하여 각각 s_Train_X와 s_Test_X에 저장한다. 그리고 쉐이플릿과 관련된 정보를 저장하고, s_Train_X와 s_Test_X를 이용하여 트리 모델을 학습하여 평가하면 끝!

l_list = []
shapelet_list = []
score_list = []
positive_mean_dist_list = []
negative_mean_dist_list = []

Train_X, Test_X, Train_Y, Test_Y = train_test_split(X, Y, test_size = 0.3, random_state = 2021)
for l in range(2, 21):
    LS = LearningShapelets(n_shapelets_per_size = {l:1}, verbose = 0, max_iter = 10000).fit(Train_X, Train_Y)
    
    s_Train_X = np.array(LS.transform(Train_X))
    s_Test_X = np.array(LS.transform(Test_X))
    
    positive_mean_dist_list.append(s_Train_X[np.array(Train_Y) == 1].mean())
    negative_mean_dist_list.append(s_Train_X[np.array(Train_Y) == 0].mean())
    
    shapelet = LS.shapelets_[0].flatten()
    
    model = DTC().fit(s_Train_X, Train_Y)
    pred_Y = model.predict(s_Test_X)
    score = f1_score(Test_Y, pred_Y)
    
    l_list.append(l)
    shapelet_list.append(shapelet)
    score_list.append(score)
    print(l, score)

result = pd.DataFrame({"length":l_list, "shapelet":shapelet_list, "f1_score":score_list})

결과 해석

이제 결과를 살펴보자. 사실 결과가 어마어마했으면 블로그에 쓰지 않고, 특허나 논문을 준비했을 것이다ㅎㅎ.

raw 데이터는 아래에서 확인할 수 있다.

실험5_쉐이플릿_결과.csv
0.00MB

어쨌든 f1_score를 기준으론 0.4 정도의 수치가 나왔는데, 물론 아쉽긴하지만 예상보다는 나쁘지 않다. 

클래스 비율이 4:6정도인데, 하나의 특징만 사용했다는 점을 고려하면 더더욱 그렇다.

더 확인해보니, 정밀도, 재현율 모두 0.4에 근접했는데, 클래스 불균형에 영향을 받았기보다는 분류 자체가 좀 어려웠던 것으로 보인다.

L값, n값, 쉐이플릿 개수, 심지어는 쉐이플릿을 탐색하는 방법에 따라 결과가 크게 다를 것이라 예상되니, 더 찾아봐야 하나라는 고민이 들긴한다.

찾은 쉐이플릿은 아래와 같다.

길이 = 2
길이 6
길이 12

나만 저 결과가 재밌는지 모르겠다. y축의 스케일을 무시하고 보면, 모두 감소하는 패턴이다. 그러니까 계속 감소했다라면, 언젠가 버티면 다시는 복구한다라는 결론을 내릴 수 있다. 이 주장도 얼마나 근거가 있는지는 검증해봐야겠지만 말이다. 

Comments