Node.js中的stream(基础篇)

Posted by WWJ Blog on January 28, 2020

Node.Js中的Stream(基础篇)

什么是Stream

首先Stream是一种抽象的数据接口,Stream翻译过来就是流的意思,它的作用就是让数据具有流动性、可轻易分离。

需要注意的是,Stream并不是Node独有的概念,而是操作系统中操作数据的一种方式,Linux中的 命令就是Stream。

为什么要使用Stream

设想这样的一个场景,现在有一大桶数据,我们需要将这桶内的数据转移至另外一个桶内

  • 场景一:如果这桶数据是固态形式的,数据就不能被分割,那么就必须一次性转移, 而我们系统处理数据的能力只达到了一个碗的级别,即每次只能转移一碗数据,那么这个时候无论如何都无法将这桶数据转移出去,导致的结果就是系统崩溃(内存溢出或者未处理的数据丢失)。
  • 场景二:如果这桶数据是液态形式的(流),那么我们就可以一次转移一碗水,当一碗水全部转移之后,再继续转移下一碗水。这样就不会因为能力不够而导致系统崩溃了。

在NodeJs中,几乎所有的异步API使用的都是Buffer模式。对于输入操作,Buffer模式会将资源集中收集到Buffer区中,一旦读取完整个资源,就会将Buffer区中的数据返回给回调函数,如果数据过大,那么回调函数就会一直接受不到数据,这就是场景一所描述的。来看个例子:

我们首先使用非阻塞I/O读取一个比较小的文件,这个文件的大小为100KB

const fs = require('fs');

const startTime = +new Date();
fs.readFile('./WX20191228-221034@2x.png', (err, res) =>{
  if (err) {
    console.log(err);
    return;
  };
  const endTime = +new Date();
  console.log('文件读取完成,读取时间为:', endTime - startTime, '毫秒');
});

控制台输出为

文件读取完成读取时间为 1 毫秒

用同样的方式,我们来读取一个非常大的文件,这个文件大概有2.45GB

const fs = require('fs');

const startTime = +new Date();
fs.readFile('./video.rmvb', (err, res) =>{
  if (err) {
    console.log(err);
    return;
  };
  const endTime = +new Date();
  console.log('文件读取完成,读取时间为:', endTime - startTime, '毫秒');
});

此时控制台的输出则是文件过大,已经超出Buffer区的大小了 Alt text

上文说到,如果数据是液态的(流状)的,就可以先读取“一碗”数据,等读取完成之后,再读取“下一碗”数据,就不必等到缓冲区中收集所有数据再处理一整个数据块。 那么我们就来使用Stream的方式来处理这个大小为2.45GB的视频文件。

  • fs.createReadStream()的方式创建Stream
  • stream.on('data', () => {})监听每次数据的读取的动作
  • stream.on('end', () => {})监听数据读取完成的动作
const fs = require('fs');
let count = 1;
const startTime = +new Date();
const stream = fs.createReadStream('./video.rmvb');

stream.on('data', () => {
  console.log(`正在第${count}次读取文件`);
  count++;
});

stream.on('end', () => {
  const endTime = +new Date();
  console.log('文件读取完成,读取时间为:', endTime - startTime, '毫秒');
});

再来看下控制台的输出 Alt text

从控制台的输出我们可以看出来两点:

  • 文件不是一次性读取的,而是读取了37413次
  • 文件在6528毫秒之后被成功读取了

所以,使用Stream的优势显而易见,它允许我们做一些看起来不可能的事情,即通过分布式处理缓冲数据

Stream的分类

在NodeJs中,每一个Stream都是Streams核心模块中可用的四个基本抽象类之一的实现:

  • stream.Readable:可读的流 (例如 fs.createReadStream()
  • stream.Writable:可写的流 (例如 fs.createWriteStream())
  • stream.Duplex:可读写的流 (例如 net.Socket)
  • stream.Transform:在读写过程中可以修改和变换数据的 Duplex 流 (例如zlib.createDeflate())

一、Readable Readable流也称可读流,使用Readable类可以自定义实现可读流。

  • 自定义_read方法来实现流的读取过程
  • 调用push方法来触发一次流数据读取,触发data事件
  • 使用push(null)表示流读取完成,触发finish事件

我们来实现一个字符串的可读流,在NodeJs中,stream不仅可以读取二进制数据,还可以读取任何Javascript数据

const { Readable } = require('stream');

class StringReadable extends Readable {
  constructor(props) {
    super();
    this._props = props;
  }

  _read() {
    const { str } = this._props;
    for(let i = 0; i < str.length; i++) {
      this.push(str[i]);
    }
    this.push(null);
  }
}

const strStream = new StringReadable({
  str: 'hello'
});

strStream.on('data', (chunk) => {
  console.log('read: ', chunk.toString());
});

strStream.on('end', () => {
  console.log('读取完成:end');
});

控制台输出为: Alt text

二、Writable Writable流为可写流,使用Writable类可实现自定义可写流

  • 自定义_write方式来实现可写流的写入数据流程,在_write函数内调用callback()来通知可写流来写入下一个数据
  • 调用end方法来表示数据写入结束,此时会触发finish事件

_write有以下格式

_write(chunk, [encoding], [callback]);

encoding参数为可选,其在chunk是String类型时指定(默认为utf8,如果chunk是Buffer,则忽略)。 callback也是可选的,当数据已经完全刷新时执行的一个回调函数。

const { Writable } = require('stream');

class StringWritable extends Writable {
  constructor() {
    super();
    this._strContent = '';
  }

  _write(chunk, enc, cb) {
    if (chunk) {
      this._strContent += chunk.toString();
      console.log('write: ', chunk.toString());
    }

    cb();
  }
}

const strStream = new StringWritable();
strStream.write('h');
strStream.write('e');
strStream.write('l');
strStream.write('l');
strStream.write('o');
strStream.end(null);

strStream.on('finish', () => {
  console.log('写入完成:', strStream._strContent);
});

控制台输出为: Alt text

三、Duplex Duplex流即是可读流又是可写流,当我们想实现一个即是数据源又是数据终点的类时,Duplex就可以派上用场了。

Duplex流继承自stream.Readable和stream.Writable,所以我们既可以用read方法也可以用write来读取/写入数据,或者可以监听data事件。

要创建一个自定义的Duplex类,就必须为_read()和_write()提供一个实现。传递给Duplex的props在Duplex内部被转发给Readable和Writable的构造函数,不过需要注意的是,props的allowHalfOpen属性默认为true,如果设置成false的话,那么只要Readable或者Writable其中之一被冻结了,stream就结束了。

const { Duplex } = require('stream');

class StringDuplex extends Duplex {
  constructor(props) {
    super();
    this._props = props;
    this._strContent = '';
  }

  _read() {
    const { str } = this._props;
    const len = str.length;
    for(let i = 0; i < len; i++) {
      this.push(str[i]);
    }
    this.push(null);
  }

  _write(chunk, enc, cb) {
    if (chunk) {
      this._strContent += chunk.toString();
      console.log('write: ', chunk.toString());
    }
    cb();
  }
}

const strDuplexStream = new StringDuplex({
  str: 'hello',
});

strDuplexStream.on('data', (chunk) => {
  console.log('read: ', chunk.toString());
});

strDuplexStream.on('end', () => {
  console.log('end');
});

strDuplexStream.write('h');
strDuplexStream.write('e');
strDuplexStream.write('l');
strDuplexStream.write('l');
strDuplexStream.write('o');
strDuplexStream.end(null);

strDuplexStream.on('finish', () => {
  console.log('write: ', strDuplexStream._strContent);
});

控制台输出为: Alt text

四、Transform Transform流是专门设计用于处理数据转换的一种特殊类型的双重Duplex,所以Transform类又是继承自Duplex类的。

从上个例子可以看出,虽然Deplex继承了Readable和Writable,但是其内部数据的读取和数据的写入并没有直接的关系,strDuplexStream的可读端的数据是没有流转至可写端的。

要实现一个自定义的Transform类,则必须

  • 自定义 _transform(chunk, encoding, callback) 方法来定义可读端输入结果的处理过程。
  • 在 _transform 方法中调用 push 方法来将处理结果输出到可读端。
  • 在 _transform 方法中调用 callback 进行下一个处理。
const { Transform } = require('stream');

class StringTransform extends Transform {
  constructor() {
    super();
    this._strContent = '';
  }

  _transform(chunk, enc, cb) {
    if (chunk) {
      console.log('write: ', chunk.toString());
      const data = chunk.toString().toUpperCase();
      this.push(data);
      this._strContent += data;
    }
    cb();
  }
}

const strTransform = new StringTransform();

strTransform.on('data', (chunk) => {
  console.log('read: ', chunk.toString());
});

strTransform.on('end', () => {
  console.log('end');
});

strTransform.write('h');
strTransform.write('e');
strTransform.write('l');
strTransform.write('l');
strTransform.write('o');
strTransform.end(null);

strTransform.on('finish', () => {
  console.log(strTransform._strContent);
});

控制台输出为: Alt text

从输出可以看出,从可写流的可写端流向可读流的可读端是同一份数据。

objectMode

默认情况下,流处理的数据是Buffer/String类型的值。objectMode属性可以使流处理任何复杂类型的值。

const { Readable } = require('stream');

const sr = new Readable();
const sro = new Readable({ objectMode: true });

sr.push('hello');
// 这个会报错:TypeError: Invalid non-string/buffer chunk
// sr.push({});
sr.push(null);

sro.push('hello');
sro.push({});
sro.push(null);

sr.on('data', (chunk) => {
  console.log('无objectMode:', chunk);
});

sro.on('data', (chunk) => {
  console.log('有objectMode:', chunk);
});

控制台输出 Alt text