串流是啥,事實上這個東東,我們每天都有使用,簡單的說,它是一種傳送內容
的技術,在沒有使用串流技術時,我們想要在網路上看影片,需要將它下載下來才能播放,但如果使用串流技術那傳送影片,它會將一小短小短的資料,一直傳送給網頁,所以我們可以直接進行觀看,並且在觀看時它還會繼續傳送後面的片段過來。
串流的優點
在node js中,傳送內容基本上有分兩種形式,一種是緩衝
而另一種就是串流
,我們先來看看緩衝在處理資料傳送時,它是如何處理。
緩衝它的基本概念就是將所有的資料先收集到緩衝區裡,當資料已經完整的讀取完,在傳送給接受者,如下圖所示,它會將HELLO MARK
這所有的資料先讀取完,然後才能傳送給接受者。
然後我們來看看串流,它每個時間點一接受到資料就會直接發送給接受者,所以如下圖所示,在時間點t1
時,它會接受到HELLO
這串資料,然後在t2
時會接受到剩下的MARK
資料。
那這樣有什麼好處呢 ? 簡單的說有兩個優點,一個是空間效率
,因為如果使用緩衝的方法來進行個10gb
以上的資料傳輸,就代表這你需要10gb
的緩衝空間,那這樣記憶體一定爆掉。而第二個優點就是時間效率
,就如同最上面的影片例子來說明,使用串流,你可以直接看影片,然後再看影片時,它還會繼續傳送剩下的影片片段,節省了等待時間。
緩衝與串流的程式碼實做比較
我們簡單寫一個檔案下載的伺服器,然後分別以緩衝與串流的程式碼,來進行實作,在nodejs
中,有個 module 名為fs
,它是專門用來處理檔案傳輸的工具,它也同時繼承了node 中的串流模組stream
。
緩衝讀取檔案實作
這是一個下載aaa.avi
檔案的伺服器,但假設該檔案大小如果大於1024mb(舊版本node)
的話,它會直接死掉,因為它緩衝爆掉了。
基本上這種類形的api
被稱為Bulk I/O
。
const http = require('http');
const fs = require('fs');
const server = http.createServer((req, res) => {
fs.readFile('aaa.avi', (err, data) => {
if(err) {
console.log(err);
res.writeHead(500);
res.end(err.message);
} else{
res.writeHead(200, { 'Content-Type': 'video/avi' });
res.end(data);
}
});
});
server.listen(3000, () => {
console.log('server up !');
});
串流讀取檔案實作
下列程式碼就是使用串流來實作的讀取檔案伺服器,這樣如果檔案大小在大,它都可以進行處理,因為它是將大檔案分割成小塊小塊,然後一直傳輸。
const http = require('http');
const fs = require('fs');
const server = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'video/avi' });
fs.createReadStream('aaa.avi').pipe(res)
.on('finish', () => {
console.log('done');
});
});
server.listen(3000, () => {
console.log('server up !');
});
Node.js 的串流工具說明
在串流的世界中,有分為可讀取串流
另一種為可寫入串流
。
可讀取串流
可讀取串流就是指資料的來源
,我們可以利用在stream
核心模組中的Readable
,來抽像化類別進行實作,基本上像fs
、process.stdin
都有繼承Readable
這模組,所以我們在fs
中差可以使用串流的技術來讀取檔案。
接下來我們來實作個Readable
串流。
首先建立一個名為Read
的類別,並且繼承Readable
,在這個類別中,我們需要實作_read
這個方法,這個方法最主要的功能就是從你的資源(ex.檔案或文字)中取得資料,並執行this.push
來丟到一個internal queue
中,你可以想成水缸,每當this.push
被呼叫時,Readable
就會觸發readable
事件,白話文就是和使用者說,這裡有資料喔 ~ 快來拿啊,這邊可以想成打開水缸開關給人拿。
下列程式碼中,我們在_read
模擬每次會丟入this.push
為一個字元,並且至到字元結束時,就停止丟入this.push(null)
,然後我們會使用markStream.read()
這段用來實際取得現在internal queue
內有的資料,我們也可以使用markStream.read(size)
來指定每次取得時,要取得多大的資料。
const Readable = require('stream').Readable;
class Read extends Readable {
constructor(data) {
super();
this.data = data;
this.dataLength= data.length;
this.count=0;
}
_read(){
const chunk = this.data.slice(this.count,this.count+1);
if(this.count === this.dataLength){
this.push(null);
}else{
console.log('Pushing chunk of size:' + chunk.length);
this.push(chunk, 'utf8');
this.count++;
}
}
}
const testStr = new Array(10).join('喝');
const markStream = new Read(testStr);
markStream.on('readable', function(){
let chunk ;
while ((chunk = markStream.read()) !== null){
console.log("chunk received:" + chunk.toString());
}
});
可寫入串流
可寫入串流就是指資料的接受者
,例入我們要將資料寫入到一個檔案中,那個檔案就是一個接受者。
下面的程式碼,我們一樣實作一個Writable
,首先建立一個類別Write
並且繼承stream.Writable
,在這裡我們要實作_write
方法,這支方法是要告訴stream
要如何將資料寫入到接受者
內,這範例中,我們的接受者就是一個檔案,存放位置為test.txt
。
在Writable
中,我們使用者可以使用write(chunk)
來將資料寫入到stream
中,最後當使用者呼叫end()
時就是告訴stream
已經沒有資料了,這時它會觸發finish
事件來通知使用者。
const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');
class Write extends stream.Writable {
constructor() {
super();
}
_write(chunk, encoding, cb) {
const data_path = "test.txt";
mkdirp(path.dirname(data_path), function(err) {
if (err) {
return cb(err);
}
fs.writeFile(data_path, chunk, cb);
});
}
}
const markStream = new Write();
markStream.write("Hello mark , how are you today?");
markStream.end(()=>{
console.log('finish');
});
回壓 (Backpressure)
stream
就如同水管中的水流動,但是水管大小一定要限制,如果水量過大,而出口過小一定會發生問題,這在資料傳送時也一定會發生,如果寫入資料快於串流能消化的速度時就一定會遇到。
而回壓這個機製就是為了解決這個問題。
我們先來寫段程式碼,來模擬水管爆掉的情況,基本上我們使用和上面的範例相同,都是寫入資料到檔案中,_watch
沒有變動,比較不同的是執行寫入時的下面段,在執行write()
時除了寫入到stream
中,它事實上還會回傳一個布林值,用來告訴我們這個水管是否滿了
,如果滿了,它會回傳true
。
對了還有我們本次測試時使用chance
模組,它可以隨機產生字串,而likelihood: 95
這段代表chance.bool
有95%
的機率會回傳true
,而我們為了盡可能的模擬出水管爆滿的情況,所以我們將丟入的資料大小增加到16KB左右。
有個預設的
highWaterMark
限制為16KB
,這就是水管的大小。
const testStream = new Write();
function generateMoreData() {
while (chance.bool({likelihood: 95})) {
let is_continue_write_data = testStream.write(
chance.string({length: (16* 1024) -1})
);
if(!is_continue_write_data){
console.log('Backpressure');
// 這行先別看
//return testStream.once('drain', generateMoreData);
}
}
}
generateMoreData();
這是全部的程式碼,Write
的地方完全相同。
const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');
const chance = require('chance').Chance();
class Write extends stream.Writable {
constructor() {
super();
}
_write(chunk, encoding, cb) {
const data_path = "test.txt";
mkdirp(path.dirname(data_path), function(err) {
if (err) {
return cb(err);
}
console.log('push')
fs.writeFile(data_path, chunk, cb);
});
}
}
const testStream = new Write();
function generateMoreData() {
while (chance.bool({likelihood: 95})) {
let is_continue_write_data = testStream.write(
chance.string({length: (16* 1024) -1})
);
if(!is_continue_write_data){
console.log('Backpressure');
//return testStream.once('drain', generateMoreData);
}
}
}
generateMoreData();
然後我們來看看執行後的結果,下面的意思就是說,我們丟第一次資料時水管就滿了,所以接下來丟的資料都進行不去,但這時我們還是硬給他一直丟(一堆Backpressure
),這時就只是浪費社會資源。
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Backpressure
Push
Push
Push
Push
那這時我們要如何解決呢,有一個事件叫drain
,它就是用來通知我們水管可以丟了,所以我們現在就是監聽drain
事件,當它通知我們時,就代表我們可以在丟資料了generateMoreData
。
const testStream = new Write();
function generateMoreData() {
while (chance.bool({likelihood: 95})) {
let is_continue_write_data = testStream.write(
chance.string({length: (16* 1024) -1})
);
if(!is_continue_write_data){
console.log('Backpressure');
// 這邊 !
return testStream.once('drain', generateMoreData);
}
}
}
generateMoreData();
我們來看看增加這行的執行結果,這樣好多了,不會一直往水管中丟資料。
Backpressure
push
push
Backpressure
push
push
Backpressure
push
push
##參考資料