2017年8月31日 星期四

SH專區 - Dynamic Time Warping(動態時間規劃) 1


在接下來的一系列文章裡,我們希望能討論一個演算法:Dynamic Time Warping (動態時間規劃)以及其在金融交易中可能的應用

這一系列的文章,都是我們的自學筆記,所以如果有理解錯誤或是疏漏的地方,請大家不吝告訴我們。



好了,進入主題吧。第一篇文章中,我們先簡單介紹一下DTW算法。

  • 什麼是Dynamic Time Warping?
Dynamic Time Warping (動態時間規劃)本質上是一個 dynamic optimization 的問題,所尋找的是兩個序列之間的「最近距離」。

距離,不就只是 這樣?
是的,這就是我們一般常用的Euclidean distance。那麼,我們為什麼還需要DTW?




關鍵在於warping這個字,在很多實務上的使用中,並不是每一個序列中的每一個元素,都排列得整整齊齊的,很多時候我們拿到的資料都參雜著雜訊或是不完整,Euclidean distance,在很多時候並不是一個好的測量值。這時候我們就需要DTW,去處理這些在時間上有所扭曲的序列。


也正因為這種適合比較兩個扭曲序列的特性,DTW在早前常被用於語音識別之上,來比對語音是否相似。在參考資料 [1] 中,作者舉了一個很生動簡單的例子,大家可以參考。
另外一點,需要提醒大家的,DTW是一個相似性的「測量值」(measure),而不是「度量值」(metric)。


  • 怎麼計算Dynamic Time Warping?
最基礎的DTW計算其實相當直覺,就是建構一個「點對點」的distance matrix,然後由起點一步一步的向終點邁進。在前進的時候,只有一個條件:挑選最短路徑。
  1. Distance Matrix
先讓我們了解如何建立一個點對點的distance matrix。假設我們有兩個序列,第一個是abc、第二個是ABC,那麼這個點對點的序列就是:
python code 1:

def distance_matrix(arr1, arr2):
    """
    In this function, we compute the Euclidean distance matrix between two sequences.

    We use two for loops to fill distance values into the cells for illustration reason.

    For a more compact form, simply use
    distance = [ [ np.sqrt( (arr1[i] - arr2[j])**2 ) for i in np.arange(len(arr1))] for j in np.arange(len(arr2))]

    """

    # construct an empty array
    distance = np.ndarray(shape = (len(arr1), len(arr2)))

    for row in np.arange(len(arr1)):
        for col  in np.arange(len(arr2)):
            distance[row,col] = np.sqrt( (arr1[row] - arr2[col])**2 )

    return distance

如果我們輸入兩個序列分別為 arr1 = [13,  2,  2,  6, 17] 和 arr2 =[19, 10,  1,  0, 17],他們所對應的Distance_Matrix為:



  
  1. DTW

前面提到過DTW是一個找尋最短路徑的演算法,假設我們有兩個序列 Q (query) 和T (template),我們希望從上圖的左上角當作起點,一步一步的向右下角的終點前進。在這前進的過程中,基本的規則只有一個:只能前進(也就是只能 ↓、→和↘)。為了保證了我們能夠以最短的距離從起點走到終點,我們的路徑必須要滿足每一次前進之前,我們都要處在離原點最近的點之上,如下圖 [註1,註2]。
圖片來源 Ref: [2]
這個要求,也就是這個recursive equation:

在上面這個方程式中,gamma(i,j)代表了在第 i 行、第 j 列的累計距離(請參考上圖),dist(qi, tj) 則代表了Q 序列第 i 個點和T序列的第 j 個點之間的距離。比如說 Q = arr1 = [13,2,2,6,17],T = arr2 = [19,10,1,0,17],那麼 dist(q3, t2) = dist(6,1) = 5 (也就是上面Distance_Matrix中第3行第2列的值 [註3])。


python code 2:
def dtw_basic(arr1, arr2, alignment_curve = False, gen_plot = True):
    """
    This function shows a basic DTW calculation with two multi-dimensional arrays.

    Notice that we modified the outputs a litte, it outputs (dtw_distance, dtw_matrix) now.
    """

    # initialize the dtw array
    dtw = np.zeros(shape = (len(arr1), len(arr2)))

    for row in range(dtw.shape[0]):
        for col in range(dtw.shape[1]):
            # calculate distance between arr1[row] and arr2[col]
            # here we use Euclidean distance
            dist = np.sqrt( np.sum( (arr1[row] - arr2[col])**2 ) )

            # the starting point
            if row == 0 and col == 0:
                dtw[row, col] = dist
            # we can only go right along the upmost row
            elif row == 0:
                dtw[row, col] = dist + dtw[row, col - 1]
            # we can only go down along the leftmost column
            elif col == 0:
                dtw[row, col] = dist + dtw[row-1, col]
            # the recursive relation
            else:
                dtw[row, col] = dist + min(dtw[row-1, col], dtw[row-1, col-1], dtw[row, col-1])

    # alignment curve
    if alignment_curve:

        row = 0
        col = 0
        alignment = [ [row, col] ]

        while row != dtw.shape[0] - 1 or col != dtw.shape[1] - 1:
            if row == dtw.shape[0] - 1:
                col += 1
            elif col == dtw.shape[1] - 1:
                row += 1
            else:
                idx = np.argsort( [ dtw[row+1, col], dtw[row+1, col+1], dtw[row, col+1] ]  )[0]
                if idx == 0:
                    row += 1
                elif  idx == 1:
                    row += 1
                    col += 1
                else:
                    col += 1
            alignment.append([row, col])

        alignment = np.array(map(np.array, alignment))

    if gen_plot:
        # plotting
        fig = plt.figure(figsize = (5,5))
        plt.imshow( dtw )
        plt.xlim(-0.1, dtw.shape[1] - 1)
        plt.ylim(dtw.shape[0] - 1, -0.1)
        plt.title("Basic Dynamic Time Warping Matrix Heat Map")
        if alignment_curve:
            plt.plot( alignment[:,1], alignment[:,0], linewidth = 3, color = 'white', label = 'alignment curve')
            plt.legend(loc = 'best')
        plt.show()

    if alignment_curve:
        return dtw, alignment
    else:
        return dtw

而所得到的DTW array 如下:


所以,最後我們所得到的,DTW measure就是17。在這邊大家可以比較一下在上一個小節中算出來的Distance_Matrix,其中的對角線,就是兩個序列一一對應的距離,而對角線的總和,就是這兩條序列的Euclidean distance.

而DTW的路徑則是:
從程式碼中,大家已經可以看到DTW的缺點了(原始版本的),計算DTW需要的空間和時間都是 O(n2) [註4]


目前的例子中大家可能還沒有特別的感覺,但是當每個序列個別都有1000個點,總共有上萬個序列要交叉比對的時候,這個複雜度的問題就會變得很重叫。好消息是,經過歷年來的研究,現在我們已經可以用很多種方式替DTW的計算加速,這就讓我們留到下次再說。



備註:
  1. 這邊可以必須要提醒一下,這只是最基本的DTW path pattern,後續的研究中,許多的學者還提出了許多各式各樣的路徑模式,或是讓起點和終點不是固定的變形。但是整體基本的原則是一樣的,在這邊就不探討了,有興趣的人可以參考 [2] 和其中的參考資料。
  2. 如上圖所示,DTW是「可以」處理兩個不同長度的序列的。但是為了簡單起見,我們只討論相同長度的序列。
  3. Python 是以 0 為起點的程式語言。
  4. 我們跑了兩個for loop,loop 從 0 ~ n-1。所以花的時間是 n*n。在這之中,我們填滿了 n*n 個空格,所以空間的複雜度也是nxn。
參考資料:
  1. 動態時間歸整 | DTW | Dynamic Time Warping, McKelvin's Blog
  2. Similarity Measures and Dimensionality Reduction Techniques for Time Series Data Mining, Carmelo Cassisi, Placido Montalto, Marco Aliotta, Andrea Cannata and Alfredo Pulvirenti 

Evernote helps you remember everything and get organized effortlessly. Download Evernote.

沒有留言:

張貼留言