一個基於 AWS Elasticsearch 的用戶行為 log 系統建立 ( 加強版 )

在之前筆者的這篇文章中:

一個基於 AWS Elasticsearch 的用戶行為 log 系統建立

在們學習了如何使用 AWS 的相關工具來建立一個用戶行為的 LOG 分析系統。

但是這篇文章中所提到的架構有個問題。

這個版本有什麼問題呢 ?

那就是在某些情況下它會一直噴以下錯誤 :

ServiceUnavailableException: Slow down.

那為什麼會一直噴 Slow down 呢 ?

會發生這個的原因在於,我們有採到 aws firehose 的限制,如下: Amazon Kinesis Data Firehose 有以下限制。

如果将 Direct PUT 配置为数据源,每个 Kinesis Data Firehose 传输流 都会受以下限制的约束:

* 对于 美国东部(弗吉尼亚北部)、美国西部(俄勒冈) 和 欧洲(爱尔兰):5,000 条记录/秒;2,000 个事务/秒;5 MB/秒。
* 对于 欧洲 (巴黎)、亚太地区(孟买)、美国东部(俄亥俄州)、欧洲(法兰克福)、南美洲(圣保罗)、亚太区域(首尔)、欧洲 (伦敦)、亚太区域(东京)、美国西部(加利福尼亚北部)、亚太区域(新加坡)、亚太区域(悉尼) 和 加拿大 (中部):1000 条记录/秒;1000 个事务/秒;1 MB/秒。


! 注意
当 Kinesis Data Streams 配置为数据源时,此限制不适用,Kinesis Data Firehose 可无限扩展和缩小。

來源 : 官網

加強版

原本的版本如下圖。

然後我們會將它修改成如下圖,就是在資料源與 firehose 之間多增加了 data stream。

使用 AWS data stream 有以下幾個好處 :

  • 可以自由的調整傳輸限制。(這樣就可以解決上述的問題)
  • 未來如果有其它單位想要接受這個資料源,那只要請對方接上這個 data stream,它就可以受到資料了。

AWS Kinesis Data Stream 申請

事實上就只有兩個東西要填寫Stream NameShard Number

其中這裡簡單的說一下 Shard 概念。

Stream Shard (碎片)

在 AWS kinesis data stream 中有個 shard 的概念,它就是指 stream 的子集合。

每條 stream 都是由 1 至 n 個 shard 所組合成,這樣有幾個好處 :

  • 在傳輸資料給 stream 時,可以將傳輸量平均的分散給不同 shard,這樣可以避免觸碰到每個 shard 的傳輸限制。
  • 你可以指定那一些類型的資料傳輸到 A Shard,那些類型的資料傳輸到 B Shard,這樣有助於你放便管理資料流。

Shard 的限制

上面有提到每個 stream 都有傳輸限制,這裡我們就來看一下它的限制有那些。

以下從 Aws 官網擷取 :

  • 單一碎片每秒可擷取多達 1 MiB 的資料 (包括分割區索引鍵) 或每秒寫入 1,000 筆記錄。同樣地,如果您將串流擴展到 5,000 個碎片,串流每秒即可擷取多達 5 GiB 或每秒 500 萬筆記錄。若您需要更多的擷取容量,可以使用 AWS Management Console 或 UpdateShardCount API 輕鬆擴展串流中的碎片數目。
  • GetRecords 每次呼叫可從單一碎片擷取最多 10 MiB 的資料,每次呼叫最多 10,000 筆記錄。每呼叫一次 GetRecords 即計為一筆讀取交易。
  • 每個碎片每秒可支援最多 5 筆讀取交易。每筆讀取交易可提供多達 10,000 筆記錄,每筆交易的上限為 10 MiB。
  • 每個碎片透過 GetRecords 每秒可支援最多 2 MiB 的總資料讀取速率。如果呼叫 GetRecords 傳回 10 MiB,在接下來的 5 秒內發出的後續呼叫將擲回例外狀況。

如何將資料傳輸到指定的 Shard

下面為一段 nodejs 寫入資料到 stream 的範例碼,其中注意到PartitionKey這個東東,它就是可以幫助你指定到想要的 Shard。

'use strict';

const AWS = require('aws-sdk');
const streamName = process.env['AWS_KINESIS_STREAM'];
const uuidv1 = require('uuid/v1');

const kinesis = new AWS.Kinesis({region: process.env['AWS_KINESIS_REGION']});

module.exports = {
  putRecord: (packet) => {
    return new Promise((resolve, reject) => {
    // 多加換行符號是因為這樣才能在 aws athena 進行解析
      const recordParams = {
        Data: JSON.stringify(packet) + '\n',
        StreamName: streamName,
        PartitionKey: uuidv1()
      };

      kinesis.putRecord(recordParams, (err) => {
        if (err) {
          reject(err);
        }
        resolve();
      });
    });
  }
};

PartitionKey基本上就是用來讓 AWS kinesis 來決定你要去那一個 Shard。

假設你的文件 A 傳輸時 PartitionKey 設為 GroupA 這個文字,那它就會跑到某個 Shard A 去,如果這時再傳輸個文件 B 並且 PartitionKey 也設定為 GroupA,那這一份文件也會傳輸到 Shard A。

所以當你想將同一類型的文件,都傳輸到同一個 Shard 時,記得將 PartitionKey 設為相同。

但如果是想將它平均分散到每一個 Shard 呢 ?

事實上有兩個方法,首先第一種方法就是每一丟資料時,先去抓這個 stream 看它有幾個 shards,然後再根據它的數量,來隨機產生個數字,例如有 4 個 shards 那你每次丟資料時,就從 1 ~ 4 隨機產生一個數字,然後再將它設到 PartitionKey 中,那這樣基本上就會平均分配。

而另一種方法就是每一次的 PartitionKey 都使用 uid 來設定,這樣也可以將他平均的進行分配。

不過我是比較建議用第二種,因為第一種每一次都要去 AWS 那抓取 stream 裡的 shards 大小,這樣太耗時間了。

參考資料

comments powered by Disqus