上一章節中我們有提到rabbitmq
,它是用來建立中介式架構
的broker
,但這種架構有什麼問題呢 ? 那就是分散式架構的頭號公敵單點失效(single point of failure)。
所以後來就有人提出使用對等式架構
來解決這個問題,這個架構就是會將broker
給移除掉,每一個用戶端同時也是伺服器端,像比特幣
這種應用就是用該結構來處理。
但相對的,它也有缺點,那就是要建置起來較為複雜,用在大規模的網路上,管理較難、安全性較低。
使用 ZEROMQ 進行對等式架構 (peer-to-peer)實作
zeromq
它是一套網路通訊函式庫,記得他不是一個伺服器,而是一個lib,它在socket api
之上做了一層封裝,將網路、通訊、進程等抽象化為統一的 API 接口,它和 socket 的區別是 :
- socket : 點對對的關係(一對一)
- zeromq : 多點對多點的關係(多對多)
那 zeromq 有什麼特點呢 ? 它有以下四個特點 :
- 去中心化 (無 broker)
- 強調訊息收發模式
- 以統一的接口支持多種底層通信方式
- 異步
- 速度飛快 (請參考這篇比較)
不過有一點要注意一下,zeromq 它不是一個獨立的伺服器進程 (rabbitmq 是),所以嚴格來說它不是 mq ,它沒有 mq 這種在中間解耦合的能力,事實上他的名命也說了 zero mq 。
zeromq 主要提供三種類型的通訊模式分別如下 :
REQ (request) / REP (reply) 模式
這模式就是傳統的一個 reuest 配一個 response 的模式,非常的簡單。
下面這段程式碼是發送請求(request)的程式碼。
var zmq = require('zeromq');
var requester = zmq.socket('req');
requester.on('message', function (reply) {
console.log(`Received reply: ${reply.toString()}`)
})
console.log('Send msg');
requester.send('Hello Mark');
requester.connect('tcp://localhost:5555');
process.on('SIGINT', function () {
requester.close();
})
然後下面這段程式碼為收到請求後,進行回傳的程式碼,這也可以理解成一個 server,它會一直等待 request 的 loop,然後針對每次的請求都進行回覆(Reply)。
var zmq = require('zeromq');
var responder = zmq.socket('rep');
responder.on('message', function (request) {
console.log(`Received request : ${request.toString()}`);
setTimeout(function () {
responder.send("Ok ~ I Received your msg");
},1000);
})
responder.bind('tcp://127.0.0.1:5555', function (err) {
if(err){
console.log(err);
}else{
console.log('Listening on 5555');
}
})
process.on('SIGINT', function () {
responder.close();
})
不過這邊有二點要注意,當你將 server 進行重啟時,client 不會自動的重新連上 server ,如果想要建立一個高可靠性的 server 請參考官網該篇文章,它說明的很詳細囉 ~
而另外一點就是,不論先開啟 client 或 server 都沒關係,在傳統觀念上 server 就是要先開,然後 client 才連的上,但在這裡,它們的關係是節點對節點,也就是說沒有主或從的關係,只有誰發誰送的問題。
Pub / Sub 模式
它基本上是一種很常見的設計模式,像我們在使用jquery
時的事件機制就很常看到它,如下 :
$(".test").on('click', function(){
/do something...
})
上面的程式碼中,當頁面獨發了click
事件後,就會發佈(pub)
一個訊息,給有訂閱(sub)
的使用者說,我獨發了 click 了喔,然後使用者在來處理獨發後的事情。
這種模式的優點就在於解耦合
,發佈者無須預先知道訊息的接受者,則也使得這種模式很適合用在變化多端的分佈式架構中。
我們簡單的用一句話來說明 zeromq 的 pub/sub 模式,就是下面這句 :
當訊息透過 pub socket 傳送後,便會擴播至所有已連線的 sub socket
這種類型的模式,很適合用來處理股價報價,每個 subscriber 都會去和 publisher 訂閱事件,當有新個報價時,就會通知所有有訂閱報價的 subscriber。
接下來我們來開使實作程式碼。
首先我們下面這段程式碼是用來建立 zeromq 的publisher
,也就是會將訊息從這邊發送出去給已連線的subscriber
。
// pub.js
var zmq = require('zeromq');
var pubSocket = zmq.socket('pub');
pubSocket.bindSync('tcp://127.0.0.1:3000');
console.log('Publisher bound to port 3000');
setInterval(function(){
pubSocket.send(['mar',new Date()]);
},1000);
而下面這段程式碼就是subscriber
,它用來訂閱訊息來源,然後會使用on
這監聽器,來收得 pub 過來的訊息。
// sub.js
var zmq = require('zeromq');
var subSocket = zmq.socket('sub');
var port = "3000";
subSocket.connect(`tcp://127.0.0.1:${port}`);
subSocket.subscribe('mark');
console.log(`Subscriber connected to port ${port}`);
subSocket.on('message', function(topic, message){
console.log(topic.toString());
console.log(message.toString());
})
我們可以看你心情來決定要先開啟publisher
或subscriber
,zeromq 它有提供一個機制,他會自動重新連線,也就是說,當然二個都開啟後,如果將publisher
關掉在重啟,你的subscriber
還是可以繼續收到資料。
然後我們來執行程式碼看看。我們會開啟一個publisher
和二個subscriber
。
node pub.js
node sub.js
node sub.js
然後我們應該是會看到如下的結果,兩個subscriber
每隔十秒鐘會收到一次從publisher
來的資料。
mark
Thu Jul 20 2017 17:18:58 GMT+0800 (CST)
mark
Thu Jul 20 2017 17:18:59 GMT+0800 (CST)
Push / Pull
這種模式又被稱為管道(pipe)模式
,它是單向的,從 push 單向推送到 pull 端,這種模式和上面的pub/sub
最模式最大的差別在於 :
push 傳送的一堆資料,會被
平均
分散至多個 pull 端,就像是 load balance的機制一樣。
以下的程式碼為 pull 端的建立。
// pull.js
var zmq = require('zeromq');
var pullSocket = zmq.socket('pull');
pullSocket.connect('tcp://127.0.0.1:3000');
console.log('Worker connected to port 3000');
pullSocket.on('message',function(msg){
console.log(msg.toString());
})
而下面的程式碼為 push 端的建立。
// push.js
var zmq = require('zeromq');
var sockPush = zmq.socket('push');
sockPush.bindSync('tcp://127.0.0.1:3000');
console.log('Producer bound to port 3000');
var i =0;
setInterval(function(){
sockPush.send(`mark wake up ~ : ${i}`);
i++;
},1000);
然我們開始執行。
node push.js
node pull.js
node pull.js
這時你會看到下面的結果顯示出,每一個 push 出去的資料都會平分給另外兩個 pull 端。
mark wake up ~ : 10
mark wake up ~ : 11
mark wake up ~ : 12
mark wake up ~ : 14
mark wake up ~ : 16
mark wake up ~ : 18
mark wake up ~ : 20
mark wake up ~ : 22
mark wake up ~ : 24
mark wake up ~ : 13
mark wake up ~ : 15
mark wake up ~ : 17
mark wake up ~ : 19
mark wake up ~ : 21
mark wake up ~ : 23
這種模式事實上很像我們之前所談到的負載平衡,他們的概念的確是一樣的沒錯,這種模式也代表我們可以將一個複雜的任務平均分配下去,當各 pull 端完成時,在全部一起收集起來使用。
接下來我們再來建置一個分散式的雜湊碼破解器 ~
建立一個分散式的雜湊碼破解器
這個應用主要是可以根據一組字母表做出各種排列組合,藉此對輸入的雜湊碼(MD5、SHA1等)來進行破解,這個架構就是一個典型的平行管線。
這個爆力破解的過程如下
首先我們會先建立一個push
端,他們將我們指定的字串,進行各種排列組合,例如abc
,會產生abc
、acb
、bac
等……,然後使用串流來讀取出來,並且 push 到每一個 pull端。
我們下面的程式碼中alphabet
代表這我們要進行的排序組合,然後不可能英文 26 個字母全部排列,會出人命的,所以我們會用maxLength
來進行限制,我們該值為 4 的意思代表只從 26 個字母內選取出 4 個字來進行排列組合。
也因為上面 4 個字的限制,我們測試時輸入的單字要只有 4 個字母。
//ventilator.js
var zmq = require('zeromq');
var variationsStream = require('variations-stream');
var alphabet = 'abcdefghijklmnopqrstuvwxyz';
var batchSize = 10000;
var maxLength = 4;
var searchHash = process.argv[3];
var ventilator = zmq.socket('push');
ventilator.bindSync('tcp://127.0.0.1:5000');
var batch = [];
variationsStream(alphabet, maxLength)
.on('data', function (combination) {
console.log(combination);
batch.push(combination);
if (batch.length === batchSize) {
var msg = {
searchHash: searchHash,
variations: batch,
}
ventilator.send(JSON.stringify(msg));
batch = [];
}
}).on('end', function () {
var msg = {
searchHash: searchHash,
variations: batch,
}
ventilator.send(JSON.stringify(msg));
});
接下來在 pull 端收到從 push 端來的字串後,我們會將該字串轉換成sha1 hash
碼,然後我們在將該碼與輸入碼(我們要破解的碼)進行比對,最後當比對到時相同的東西時,我們就會將結果 push 到另一個收集結果的 pull 端 (就是toSink所連結的地方)
// worker.js
var zmq = require('zeromq');
var crypto = require('crypto');
var fromVentilator = zmq.socket('pull');
var toSink = zmq.socket('push');
fromVentilator.connect('tcp://127.0.0.1:5000');
toSink.connect('tcp://127.0.0.1:5001');
console.log('Worker connect to 5001');
fromVentilator.on('message',function (buffer) {
var msg = JSON.parse(buffer);
var variations = msg.variations;
variations.forEach(function(word) {
console.log(`Processing: ${word}`);
var shasum = crypto.createHash('sha1');
shasum.update(word);
var digest = shasum.digest('hex');
if(digest === msg.searchHash){
console.log(`Found! => ${word}`);
toSink.send(`Found! ${digest} => ${word}`);
}
});
})
其中下面這段,是指將我們從 26 個字母中產生任選出 4 個所產生出的排列組合的單字,進行sha1 hash
加密,產生出 hash 碼。
var shasum = crypto.createHash('sha1');
shasum.update(word);
var digest = shasum.digest('hex');
最後,當我們從 worker 那收到破解後的結果,就進行輸入。
var zmq = require('zeromq');
var sink = zmq.socket('pull');
sink.bindSync('tcp://127.0.0.1:5001');
sink.on('message',function (buffer) {
console.log(`Message from worker: ${buffer.toString()}`);
})
我們來執行看看,我們要先開啟兩個 worker 和一個 sink。
node worker.js
node worker.js
node sink.js
然後我們在開啟ventilator.js
,用來開始啟產生單字的排列組合,其中f1b5a91d4d6ad523f2610114591c007e75d15084
是指mark
的sha1 hash
碼。
node ventilator.js f1b5a91d4d6ad523f2610114591c007e75d15084
然後當破解完,你可以看到 sink 那的輸出結果。
Message from worker: Found! f1b5a91d4d6ad523f2610114591c007e75d15084 => mark
結論
本篇文章中,我們說明了如何使用zeromq
進行對等式架構
建置,並且還了它的三種模式 :
- REQ / REP
- PUB / SUB
- PUSH / PULL
這三種模式是 zeromq 中的基本,它們還有更多的變化類型,但都只是這三個的組合型,如果想了解更多,官網的資料已經夠多囉,請慢慢自已研究吧 ~