在前幾篇文章中,我們說明了如何將系統進行擴展
,而接下來呢,我們將要說明如何使用訊息佇列
來進行整合,事實上之前的每篇文章中都要提到一個名稱IPC通信
,其中裡面就包含了訊息佇列 (message queue)
。
訊息佇列基本上是用來行程間溝通
或是同行程內不同執行序溝通
,他提供了異步
的溝通協定,這個意思就是指當你傳送一堆訊息給 A 時,A 可以不用即時的來處理這些訊息,這也代表這訊息可堆積再處理,白話文就是 :
訊息接受者如果爆了,我訊息發送者還是可以一直發送訊息,等你好了,你還是可以取得完整的訊息。
我們可以想想http協定
他是一個同步協定
,這也代表你傳送一個request
必須等待伺服器發送response
。
至於我們為什麼要用message queue
請參考下面這篇文章,他真的已經寫的很完整了。
然後我們先簡單的說明一下訊息系統
的基礎。
訊息系統架構
基本上分為以下兩種 :
對等式 (peer-to-peer)
在對等式的架構下,每一個節點都直接將訊息傳送給接受者,這種方法基本上會比較複雜,因為他還要決定各自結點的的通訊協定,但還是有一些優點 :
- 避免單點故障。
- 和中介者模式比較來少了中間一層,速度應該是比較快。
- 彈性較高。
以下為對等式架構的圖示
:
zeroMQ 他可以幫助我們建立
對等式架構
。
中介者模式 (message broker)
而中介者模式
就是所有的節點,都會連結到某個broker
,一切都由broke
來處理,每個節點不需要知道,我和誰溝通,只需要知道要傳送的訊息內容即可。但缺點就是上面對等式的優點。
以下為中介者架構
的圖示 :
RabbitMQ 就是專門用來建立這個架構的東東。
接下來的文章中,我們將要先來實作一些rabbitmq
。
Rabbit MQ
在上面的章節中,我們應該有說到,分佈式架構除了對等式架構
外,還有一個是中介者架構
,中介者的主要作用就是讓訊息接受者與傳送者之間完全的解偶,而rabbitmq
就是一個支援AMQP (Advanced Message Queuing Protocol)協議
的中間介者。
如下圖所示,它就是中間綠綠那個,我們稱他為中介者 broker
。
那AMQP
是什麼 ?
它是一種協議
,AMQP 是一個提供統一訊息服務的應用層標準協議(osi第七層),也就是設定於其它應用軟體之間的通訊,像 http、https、ftp 等都是應用層協議。
根據該協議,客戶端與訊息中間件(broker)可傳送訊息,不受客戶端/中間件不同產品,不同開發語言的條件限制。
它有三個總要概念
佇列 (queue) :
這東東它是儲存訊息的架構,然後裡面的訊息它會被客戶端拿走。一個佇列可能會推多個客戶端取走訊息,這時處理的方式和我們之前說的負載平衡差不多。
佇列它還有以下三種特性 :
- 可延續性 : 意即若 broker 重新啟動時,則佇列也自動重新建立。那裡面的訊息著麼辦呢 ? 只有被示為需保訊的訊息,才會存入磁碟,並於重啟時還原。
- 專用性 : 一個佇列可綁定特定的訂閱著,若彼此連線關閉時,則該佇列會說掰掰。
- 自動刪除 : 當有沒任何訂閱者連線時,便刪除佇列。
交換器 ( exchange ) :
他主要的功能是將訊息傳輸送到一個或多個佇列
,這個東西就是放在producer
與queue
之間,架構大概長的如下圖 :
這邊我們會有一個問題,那就是為什麼需要這個 exchange 而不是直接
producer
與quenu
間溝通就好呢 ? 這點我們後面在來說明。
綁定器 ( bind ) :
這個就是上面的交換器與佇列之間的連結
以上的東西都會被中介者管理,它會建立一個抽象的通道
,用於維護與中介者之間的通訊狀態。
使用 Rabiit MQ (mac)
首先我們要先安裝 rabbitmq :
brew update
brew install rabbitmq
然後安裝好後,我們需要去系統的.zsrch
或.bashrc
設置路徑,這樣我們才能在 terminal 上執行他的指令。
PATH=$PATH:/usr/local/sbin
然後我們就可以執行下列指令開啟rabbitmq server
:
rabbitmq-server
如果看到下列的出輸結果,則代表開啟成功。
RabbitMQ 3.6.9. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
##########
Starting broker...
completed with 10 plugins.
如果要關啟請執行下列指令,而不要使用alt + c
。
rabbitmqctl stop
接下來我們就安裝 node 的 rabbitmq 的 lib ampqplib
。
npm install --save amqplib
使用rabiitmq實作一個 Producer => Queue 的架構
接下來我們從最簡單的使用開始,我們會簡單的實作如下圖的架構,共有三個實體producer
、queue
、consume
,其中queue
現階段就是我們的rabbitmq server
。
首先我們會先建立一個producer
來負責產生訊息,從下面的程式碼我們可以知道,它需要先連線到我們的rabbitmq server
,並且建立一個通道,然後,我們可以指定要將資料送到特定的queue
,而我們 queue 取名為mark
。
其中,下面的程式碼中,比較需要注意的下面這行,這行是會判斷該 queue 是否存在,如果不存在則建立,並且durable : false
代表的意思為當 queue 重開裡面的資料會消失
。
ch.assertQueue(quenu_name, {durable: false});
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://127.0.0.1', function (err, conn) {
console.log("connect amqp server !");
conn.createChannel(function (err, ch) {
var quenu_name = "mark";
ch.assertQueue(quenu_name, {durable: false});
ch.sendToQueue(quenu_name, new Buffer('Hello Mark'));
console.log('Send a message');
})
})
再來我們來實作consumer
來當作我們訊息的接受者,一樣也是需要去連線到radditmq server
和通道,然後會監聽這個通道所傳下來的訊息。
其中我們的 ch.consume
有個參數是{noAck: true}
,代表這如果該channel
不會使用act
來進行確認,而如果是 false 它代表的意思為,如果中介者未收到 ack (確認),則訊息就會被保留在佇列裡以待再次處理。
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://127.0.0.1', function (err, conn) {
conn.createChannel(function (err, ch) {
var queue_name = "mark";
ch.assertQueue(queue_name, { durable: false });
console.log("Waitting the meesages");
ch.consume(queue_name, function (msg) {
console.log(`Received the msg : ${msg.content.toString()}`);
}, {noAck: true});
})
})
最後我們來執行兩次producer
,然後再打開consumer
來接受資料,你會發現,他還是接受的得,而且二次傳的資料都有收到,這就是我們前面有提供的message quenue
的優點 :
consumer 如果爆了,我 producer 還是一直可以發送資料,等你好了,你還是可以取得完整的資料。
這邊我們提出一個問題。
那 quenue 應該有最大值吧,不然一直沒有訊息流出,遲早會爆的吧 ?
嗯沒錯,所以radditmq
它有提供兩個方法來設定每個queue
的最大值 :
- 使用 policy 來進行設定。
- 使用參數來設定。
我們簡單的使用參數來限定,下面指令設定 quenue 最大為10 byte
。
ch.assertQueue(queue_name, { durable: false, maxLength: 10 });
那如果 queue 超過10 byte
後會如何呢 ?
我們實際執行看看就知道,首先每 0.5 秒傳送一個數字給 queue 結果如下:
Send a message:1
Send a message:2
Send a message:3
Send a message:4
Send a message:5
Send a message:6
Send a message:7
Send a message:8
Send a message:9
Send a message:10
Send a message:11
Send a message:12
Send a message:13
Send a message:14
Send a message:15
Send a message:16
Send a message:17
Send a message:18
Send a message:19
Send a message:20
然後我們 consumer 接收的結果如下 :
Received the msg : 11
Received the msg : 12
Received the msg : 13
Received the msg : 13
Received the msg : 14
Received the msg : 15
Received the msg : 16
Received the msg : 17
Received the msg : 18
Received the msg : 19
Received the msg : 20
從上面的結果可知,因為每個單字為 1 byte,所以當 queue 儲放了前十個數字後,就超過 10 byte了,然後它將先進來的就先把他刪除,所以結果才能如上面所示,只收到 11 以後的數字。
使用rabiitmq實作一個 Producer => Exchange => Queue 的架構
實際上在 rabbitmq 中,我們的 producer 完全不會直接傳送訊息給 queue,producer 完全不知道會傳給那個 queue ,而是透過 exchange 來進行處理,而這邊就準備回答我們上面問的一個問題。
為什麼要有 exchange 呢 ?
我在 quora 發現也有人有相同的問題 (Why does RabbitMQ have both exchanges and queues?)。
然後我發現有一個老兄的說明很傳神,我簡單的說明一下。
想像一下你在 app store ,當你進入店內後,有一位服務員問你說 “你需要什麼呢 ? " ,然我回答說 “我要尋找耳機”,然後該名服務員就將你引導到處理耳機的 queue ,接下來下一客人進來服務員就問 “你需要什麼呢 ?” ,然後該名客人回答 “我要修理我的電腦”,然後服務員就將它引導到處理修改電腦的 queue 去處理。
所以根據那位老兄的說明,你可以發現服務員
就是我們的Exchange
,如果沒有了它,那就代表這每一位進來店裡的老兄,只能自已去尋找所需要的 queue ,而且還可能找錯,那是不是很浪費時間呢 ?
在理解了exchange
功用後,我們就開始來它的實作。
我們先來畫張圖,你應該會更容易理解,我們等一下做的東西,其中我們先實作 error 那條線,其它的線事實上大同小異。
首先我們來建立producer
,下面的程式碼中,可以看到我們會建立一個exchange
而不在是queue
,並且我們發送訊息的對像也改為exchange
。
還有我們在發送訊息時,有指定routing_key
,這也代表我們有指定發送到的綁定 routing key 為error
的 queue 。
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://127.0.0.1', function (err, conn) {
console.log("connect amqp server !");
conn.createChannel(function (err, ch) {
var exchange_name = "logs";
var msg = "Hello mark";
var routing_key = "error";
ch.assertExchange(exchange_name, 'direct', { durable:false });
ch.publish(exchange_name, routing_key, new Buffer(msg));
console.log(`send msg: ${msg}`);
})
})
其中下面這段程式碼中的direct
代表這個 exchange 的種類,我們必須給定這個 exchange 的類型,用來決定要如何將訊息從 exchange 發送出。
ch.assertExchange(exchange_name, 'direct', { durable:false });
它主要有四種類型 :
- direct : 會需要設定一個叫
routing key
的參數綁定 queue,然後在發送訊息時指定routing key
,exchange 就會將該訊息傳送到指定的 queue (上面範例所使用的)。 - topic : 大至上和 direct 相同,但這裡的
routing key
可以用匹配的方式。 - fanout : 此類型的路由規則最簡單,就是收到訊息後,將它發送到所有綁定的佇列中。
- header : 它的路由規則是由
header
來判斷,如果要做一些複雜的路由規則,那就用他。
接下來我們來看看consumer
的程式碼,如下,我們在綁定 queue 時有指定 routing_key ,就代表這個 consume 只能收到指定的 queue 的訊息。
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://127.0.0.1', function (err, conn) {
conn.createChannel(function (err, ch) {
var exchange_name = "logs";
ch.assertExchange(exchange_name, 'direct', { durable: false });
ch.assertQueue('', { exclusive: true }, function (err, q) {
console.log('Waiting for messages');
var routing_key = 'error';
ch.bindQueue(q.queue, exchange_name, routing_key);
ch.consume(q.queue, function (msg) {
console.log(`Received msg:${msg.content.toString()}`);
console.log(`routing key is:${msg.fields.routingKey.toString()}`);
}, { noAck: true });
})
})
})
然後我們就來執行看看,結果如下 :
// producer
connect amqp server !
send msg: Hello mark
// consumer
Waiting for messages
Received msg:Hello mark
routing key is:error
結論
本篇文章中,我們先簡單的說明了message queue
的概念,它是我們跨進程與執行緒的溝通工具之一,並且它是屬於異步
的架構。
接下來我們也說明了訊息系統架構,它基本上可分兩類,一種為對等式架構
,而另一種為中介者架構
,前者可以使用zeromq
來建立,而後者可使用rabbitmq
來建立。
最後呢,我們就開始使用rabbitmq
來進行實作,本文中只簡單的介紹一些基本的使用方式。