Node.js 的串流之旅之雙工串流與管道
nodejs
Lastmod: 2019-12-15

在node js 中雙工串流主要有以下兩種,這兩種直接用白話文來講就是同時有readwrite的功能。

  • Tranform Stream
  • Duplex Stream

那這兩者有什麼差別呢,差別在於duplex寫出讀入可以完全的沒關係,你可以把他想像成兩個獨立的readablewritable的組成。

tranform就是寫出讀入彼此間的資料存在有轉換關係,我們接下來會直接實作程式碼,更能夠看出它們兩個的差異。

最後我們還會說到pipe,它的功用就是用來接串流組合起來。

Tranform Stream

我們這邊會簡單做個替換指定字串的Transform串流,其中這個類別,我們需要實作兩個方法_transform_flust

_transform基本上與readable_read很像,是將資料寫入水缸中,而不是直接將它寫到資源內,而我們在這個方法中實作了將指定字串連行替換。

_flush是在串流要結束時,如果還有事實沒有處理,這時就可以在_flush進行處理,像我們這個範例中,想要在最後加個!!!!!時,就是寫在這裡。

基本上只要前一篇文章中的readablewritable基本概念都了解,那這些雙工串流你會學的很輕鬆的。

const stream = require('stream');

class TransofrmStream extends stream.Transform{
    constructor(search_string, replace_string){
    super({decodeStrings:false});    
        this.search_string=search_string;
        this.replace_string = replace_string;
    }
    
    _transform(chunk, encoding, cb){
        const result = chunk.toString().replace(this.search_string,this.replace_string);
    this.push(result);
    cb();
    }
    
    _flush(cb){
        this.push('!!!!')
        cb();
    }
}

const transofm_stream = new TransofrmStream('World', 'Mark');
transofm_stream.on('data', (chunk) => {
    console.log(chunk.toString());
})

transofm_stream.write('Hello World');
transofm_stream.write('Mark ! you are my all World');
transofm_stream.end();

執行結果。

Hello Mark
Mark ! you are my all Mark
!!!!

Duplex Stream

上面的tranform Stream是建立了一個寫入讀入有關係的stream,而接下來我們要說明的duplex stream是這兩個是沒有關係的,也就是你做你的事,他做他的事,這種類形的串流在實物中,像socket就是一個duplex的實作,他傳送資料時,與接受資料時這兩個是完全沒有關係的。

duplex stream我們需要個別實作_read_write,其中_read就如同前一篇所介紹過的readable中我們要實作的_read,都是要讀取資料到串流中用的。

_write也如同上一篇文章中,我們在writable裡實作的_write一樣,都是入了寫入資料到某個接受者中(ex. 檔案)。

這兩個份別是獨立的,執行起來結果事實上和前一篇實作的差不多。

debugger;
const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');

class DuplexStream extends stream.Duplex{
    constructor(){
        super();    
        this.count =0;
    }
    _read(){
        if(this.count <= 10){
            this.push("hello mark");
            this.count++;
        }else{
            this.push(null);
        }
    }
    _write(chunk,encoding,cb){
        const data_path = "test.txt";
        mkdirp(path.dirname(data_path), function(err) {
            if (err) {
                return cb(err);
            }
            fs.writeFile(data_path, chunk, cb);
        });
    }
}

const duplexStream = new DuplexStream();
duplexStream.on('data', (chunk) => {
    console.log(chunk.toString());
});

duplexStream.write("Hello mark , how are you today?");
duplexStream.end(()=>{
    console.log('finish');
});

Pipe

stream 實作上是個通用介面,而 pipe 則是能組合大多數介面的方法。

pipe這東西基本上的概念就是上面這句話,像我們在unix上用的|本身就是pipe的意思,像我們如果輸入下面的指令,它代表的意思就是將ls所讀入的資料串流readablegrep node所產生的串流組合起來。

所以下面這個指令會顯示資料夾內有node這關鍵字的文件。

ls | grep node

在用更白話文的說法,pipe就是將streamstream連接在一起的工具。

pipe基本上會從readable串流中取得它讀入的資料,並將之資料寫入到指定的writable串流中,因此只要該串流為readable串流,便可以使用pipe來與其它串流進行組合。

像上們面的的Duplex 範別程式碼中,他本身有包含獨立的readablewritable,這也代表這我們可以使用pipe將這兩個連結起來,這樣我們就可以直接將_read讀入的資料,寫入到_write實作所產生的檔案中。

const duplexStream = new DuplexStream();
duplexStream.pipe(duplexStream);

在 node.js 中有很多都有應用到這個功能,像如果我們要讀取某個檔案內容,並且複制到新的檔案中也是這樣處理,如下程式碼,這樣就可以將test_file.txt 的內容,複製到destination.txt

其中fs.createReadStream就是建立一個readable的串流,用來讀取資料用的,然後fs.createWriteStream就是建立一個writable串流,用來寫入到檔案用的,最後在後pipe來進行連結。

const fs = require('fs');

const readStream = fs.createReadStream("test_file.txt");
const writeStream = fs.createWriteStream("destination.txt"); 

readStream.pipe(writeStream);

我們還可以在將程式碼改像像在linux時所使用的cp也就是複制檔案的方法。

// cp.js
const fs = require('fs');

const readStream = fs.createReadStream(process.argv[2]);
const writeStream = fs.createWriteStream(process.argv[3]); 

readStream.pipe(writeStream);

執行方法。

node cp.js test.txt destination.txt

參考資料

comments powered by Disqus