Node之可擴展性 --- 訊息佇列 Message queue (ZeroMQ)
nodejs
Lastmod: 2019-12-15

上一章節中我們有提到rabbitmq,它是用來建立中介式架構broker,但這種架構有什麼問題呢 ? 那就是分散式架構的頭號公敵單點失效(single point of failure)。

所以後來就有人提出使用對等式架構來解決這個問題,這個架構就是會將broker給移除掉,每一個用戶端同時也是伺服器端,像比特幣這種應用就是用該結構來處理。

但相對的,它也有缺點,那就是要建置起來較為複雜,用在大規模的網路上,管理較難、安全性較低。

使用 ZEROMQ 進行對等式架構 (peer-to-peer)實作

zeromq它是一套網路通訊函式庫,記得他不是一個伺服器,而是一個lib,它在socket api之上做了一層封裝,將網路、通訊、進程等抽象化為統一的 API 接口,它和 socket 的區別是 :

  • socket : 點對對的關係(一對一)
  • zeromq : 多點對多點的關係(多對多)

那 zeromq 有什麼特點呢 ? 它有以下四個特點 :

  • 去中心化 (無 broker)
  • 強調訊息收發模式
  • 以統一的接口支持多種底層通信方式
  • 異步
  • 速度飛快 (請參考這篇比較)

不過有一點要注意一下,zeromq 它不是一個獨立的伺服器進程 (rabbitmq 是),所以嚴格來說它不是 mq ,它沒有 mq 這種在中間解耦合的能力,事實上他的名命也說了 zero mq 。

zeromq 主要提供三種類型的通訊模式分別如下 :

REQ (request) / REP (reply) 模式

這模式就是傳統的一個 reuest 配一個 response 的模式,非常的簡單。

下面這段程式碼是發送請求(request)的程式碼。

var zmq =  require('zeromq');
var requester = zmq.socket('req');

requester.on('message', function (reply) {
    console.log(`Received reply: ${reply.toString()}`)
})

console.log('Send msg');
requester.send('Hello Mark');


requester.connect('tcp://localhost:5555');

process.on('SIGINT', function () {
    requester.close();
})

然後下面這段程式碼為收到請求後,進行回傳的程式碼,這也可以理解成一個 server,它會一直等待 request 的 loop,然後針對每次的請求都進行回覆(Reply)。

var zmq = require('zeromq');

var responder = zmq.socket('rep');

responder.on('message', function (request) {
    console.log(`Received request : ${request.toString()}`);

    setTimeout(function () {
       responder.send("Ok ~ I Received your msg"); 
    },1000);
})

responder.bind('tcp://127.0.0.1:5555', function (err) {
    if(err){
        console.log(err);
    }else{
        console.log('Listening on 5555');
    }
})

process.on('SIGINT', function () {
    responder.close();
    
})

不過這邊有二點要注意,當你將 server 進行重啟時,client 不會自動的重新連上 server ,如果想要建立一個高可靠性的 server 請參考官網該篇文章,它說明的很詳細囉 ~

reliable-request-reply

而另外一點就是,不論先開啟 client 或 server 都沒關係,在傳統觀念上 server 就是要先開,然後 client 才連的上,但在這裡,它們的關係是節點對節點,也就是說沒有主或從的關係,只有誰發誰送的問題。

Pub / Sub 模式

它基本上是一種很常見的設計模式,像我們在使用jquery時的事件機制就很常看到它,如下 :

$(".test").on('click', function(){
    /do something...
})

上面的程式碼中,當頁面獨發了click事件後,就會發佈(pub)一個訊息,給有訂閱(sub)的使用者說,我獨發了 click 了喔,然後使用者在來處理獨發後的事情。

這種模式的優點就在於解耦合,發佈者無須預先知道訊息的接受者,則也使得這種模式很適合用在變化多端的分佈式架構中。

我們簡單的用一句話來說明 zeromq 的 pub/sub 模式,就是下面這句 :

當訊息透過 pub socket 傳送後,便會擴播至所有已連線的 sub socket

這種類型的模式,很適合用來處理股價報價,每個 subscriber 都會去和 publisher 訂閱事件,當有新個報價時,就會通知所有有訂閱報價的 subscriber。

接下來我們來開使實作程式碼。

首先我們下面這段程式碼是用來建立 zeromq 的publisher,也就是會將訊息從這邊發送出去給已連線的subscriber

// pub.js

var zmq = require('zeromq');
var pubSocket = zmq.socket('pub');

pubSocket.bindSync('tcp://127.0.0.1:3000');
console.log('Publisher bound to port 3000');

setInterval(function(){
    pubSocket.send(['mar',new Date()]);
},1000);

而下面這段程式碼就是subscriber,它用來訂閱訊息來源,然後會使用on這監聽器,來收得 pub 過來的訊息。

// sub.js
var zmq = require('zeromq');

var subSocket = zmq.socket('sub');
var port = "3000";

subSocket.connect(`tcp://127.0.0.1:${port}`);
subSocket.subscribe('mark');
console.log(`Subscriber connected to port ${port}`);

subSocket.on('message', function(topic, message){
    console.log(topic.toString());
    console.log(message.toString());
})

我們可以看你心情來決定要先開啟publishersubscriber,zeromq 它有提供一個機制,他會自動重新連線,也就是說,當然二個都開啟後,如果將publisher關掉在重啟,你的subscriber還是可以繼續收到資料。

然後我們來執行程式碼看看。我們會開啟一個publisher和二個subscriber

node pub.js
node sub.js
node sub.js

然後我們應該是會看到如下的結果,兩個subscriber每隔十秒鐘會收到一次從publisher來的資料。

mark
Thu Jul 20 2017 17:18:58 GMT+0800 (CST)

mark
Thu Jul 20 2017 17:18:59 GMT+0800 (CST)

Push / Pull

這種模式又被稱為管道(pipe)模式,它是單向的,從 push 單向推送到 pull 端,這種模式和上面的pub/sub最模式最大的差別在於 :

push 傳送的一堆資料,會被平均分散至多個 pull 端,就像是 load balance的機制一樣。

以下的程式碼為 pull 端的建立。

// pull.js 

var zmq = require('zeromq');
var pullSocket = zmq.socket('pull');

pullSocket.connect('tcp://127.0.0.1:3000');
console.log('Worker connected to port 3000');

pullSocket.on('message',function(msg){
    console.log(msg.toString());
})

而下面的程式碼為 push 端的建立。

// push.js
var zmq = require('zeromq');
var sockPush = zmq.socket('push');

sockPush.bindSync('tcp://127.0.0.1:3000');
console.log('Producer bound to port 3000');

var i =0;
setInterval(function(){
    sockPush.send(`mark wake up ~ : ${i}`);
    i++;
},1000);

然我們開始執行。

node push.js
node pull.js
node pull.js

這時你會看到下面的結果顯示出,每一個 push 出去的資料都會平分給另外兩個 pull 端。

mark wake up ~ : 10
mark wake up ~ : 11
mark wake up ~ : 12
mark wake up ~ : 14
mark wake up ~ : 16
mark wake up ~ : 18
mark wake up ~ : 20
mark wake up ~ : 22
mark wake up ~ : 24
mark wake up ~ : 13
mark wake up ~ : 15
mark wake up ~ : 17
mark wake up ~ : 19
mark wake up ~ : 21
mark wake up ~ : 23

這種模式事實上很像我們之前所談到的負載平衡,他們的概念的確是一樣的沒錯,這種模式也代表我們可以將一個複雜的任務平均分配下去,當各 pull 端完成時,在全部一起收集起來使用。

接下來我們再來建置一個分散式的雜湊碼破解器 ~

建立一個分散式的雜湊碼破解器

這個應用主要是可以根據一組字母表做出各種排列組合,藉此對輸入的雜湊碼(MD5、SHA1等)來進行破解,這個架構就是一個典型的平行管線。

這個爆力破解的過程如下

首先我們會先建立一個push端,他們將我們指定的字串,進行各種排列組合,例如abc,會產生abcacbbac等……,然後使用串流來讀取出來,並且 push 到每一個 pull端。

我們下面的程式碼中alphabet代表這我們要進行的排序組合,然後不可能英文 26 個字母全部排列,會出人命的,所以我們會用maxLength來進行限制,我們該值為 4 的意思代表只從 26 個字母內選取出 4 個字來進行排列組合。

也因為上面 4 個字的限制,我們測試時輸入的單字要只有 4 個字母。

//ventilator.js
var zmq = require('zeromq');
var variationsStream = require('variations-stream');
var alphabet = 'abcdefghijklmnopqrstuvwxyz';
var batchSize = 10000;
var maxLength = 4;
var searchHash = process.argv[3];

var ventilator = zmq.socket('push');
ventilator.bindSync('tcp://127.0.0.1:5000');

var batch = [];
variationsStream(alphabet, maxLength)
    .on('data', function (combination) {
        console.log(combination);
        batch.push(combination);
        if (batch.length === batchSize) {
            var msg = {
                searchHash: searchHash,
                variations: batch,
            }
            
            ventilator.send(JSON.stringify(msg));
            batch = [];
        }
    }).on('end', function () {
        var msg = {
            searchHash: searchHash,
            variations: batch,
        }
        ventilator.send(JSON.stringify(msg));
    });

接下來在 pull 端收到從 push 端來的字串後,我們會將該字串轉換成sha1 hash碼,然後我們在將該碼與輸入碼(我們要破解的碼)進行比對,最後當比對到時相同的東西時,我們就會將結果 push 到另一個收集結果的 pull 端 (就是toSink所連結的地方)

// worker.js
var zmq = require('zeromq');
var crypto = require('crypto');
var fromVentilator = zmq.socket('pull');
var toSink = zmq.socket('push');

fromVentilator.connect('tcp://127.0.0.1:5000');
toSink.connect('tcp://127.0.0.1:5001');
console.log('Worker connect to 5001');

fromVentilator.on('message',function (buffer) {
    var msg = JSON.parse(buffer);
    var variations = msg.variations;
    variations.forEach(function(word) {
        console.log(`Processing: ${word}`);            
        var shasum = crypto.createHash('sha1');
        shasum.update(word);
        var digest = shasum.digest('hex');
        if(digest === msg.searchHash){
            console.log(`Found! => ${word}`);
            toSink.send(`Found! ${digest} => ${word}`);
        }
    });
})

其中下面這段,是指將我們從 26 個字母中產生任選出 4 個所產生出的排列組合的單字,進行sha1 hash加密,產生出 hash 碼。

        var shasum = crypto.createHash('sha1');
        shasum.update(word);
        var digest = shasum.digest('hex');

最後,當我們從 worker 那收到破解後的結果,就進行輸入。

var zmq = require('zeromq');
var sink = zmq.socket('pull');
sink.bindSync('tcp://127.0.0.1:5001');

sink.on('message',function (buffer) {
    console.log(`Message from worker: ${buffer.toString()}`);
})

我們來執行看看,我們要先開啟兩個 worker 和一個 sink。

node worker.js
node worker.js
node sink.js

然後我們在開啟ventilator.js,用來開始啟產生單字的排列組合,其中f1b5a91d4d6ad523f2610114591c007e75d15084是指marksha1 hash碼。

node ventilator.js f1b5a91d4d6ad523f2610114591c007e75d15084

然後當破解完,你可以看到 sink 那的輸出結果。

Message from worker: Found! f1b5a91d4d6ad523f2610114591c007e75d15084 => mark

結論

本篇文章中,我們說明了如何使用zeromq進行對等式架構建置,並且還了它的三種模式 :

  • REQ / REP
  • PUB / SUB
  • PUSH / PULL

這三種模式是 zeromq 中的基本,它們還有更多的變化類型,但都只是這三個的組合型,如果想了解更多,官網的資料已經夠多囉,請慢慢自已研究吧 ~

參考資料

comments powered by Disqus