Node.Js中的Stream(基础篇)
什么是Stream
首先Stream是一种抽象的数据接口,Stream翻译过来就是流的意思,它的作用就是让数据具有流动性、可轻易分离。
需要注意的是,Stream并不是Node独有的概念,而是操作系统中操作数据的一种方式,Linux中的 | 命令就是Stream。 |
为什么要使用Stream
设想这样的一个场景,现在有一大桶数据,我们需要将这桶内的数据转移至另外一个桶内
- 场景一:如果这桶数据是固态形式的,数据就不能被分割,那么就必须一次性转移, 而我们系统处理数据的能力只达到了一个碗的级别,即每次只能转移一碗数据,那么这个时候无论如何都无法将这桶数据转移出去,导致的结果就是系统崩溃(内存溢出或者未处理的数据丢失)。
- 场景二:如果这桶数据是液态形式的(流),那么我们就可以一次转移一碗水,当一碗水全部转移之后,再继续转移下一碗水。这样就不会因为能力不够而导致系统崩溃了。
在NodeJs中,几乎所有的异步API使用的都是Buffer模式。对于输入操作,Buffer模式会将资源集中收集到Buffer区中,一旦读取完整个资源,就会将Buffer区中的数据返回给回调函数,如果数据过大,那么回调函数就会一直接受不到数据,这就是场景一所描述的。来看个例子:
我们首先使用非阻塞I/O读取一个比较小的文件,这个文件的大小为100KB
const fs = require('fs');
const startTime = +new Date();
fs.readFile('./WX20191228-221034@2x.png', (err, res) =>{
if (err) {
console.log(err);
return;
};
const endTime = +new Date();
console.log('文件读取完成,读取时间为:', endTime - startTime, '毫秒');
});
控制台输出为
文件读取完成,读取时间为: 1 毫秒
用同样的方式,我们来读取一个非常大的文件,这个文件大概有2.45GB
const fs = require('fs');
const startTime = +new Date();
fs.readFile('./video.rmvb', (err, res) =>{
if (err) {
console.log(err);
return;
};
const endTime = +new Date();
console.log('文件读取完成,读取时间为:', endTime - startTime, '毫秒');
});
此时控制台的输出则是文件过大,已经超出Buffer区的大小了
上文说到,如果数据是液态的(流状)的,就可以先读取“一碗”数据,等读取完成之后,再读取“下一碗”数据,就不必等到缓冲区中收集所有数据再处理一整个数据块。 那么我们就来使用Stream的方式来处理这个大小为2.45GB的视频文件。
fs.createReadStream()
的方式创建Streamstream.on('data', () => {})
监听每次数据的读取的动作stream.on('end', () => {})
监听数据读取完成的动作
const fs = require('fs');
let count = 1;
const startTime = +new Date();
const stream = fs.createReadStream('./video.rmvb');
stream.on('data', () => {
console.log(`正在第${count}次读取文件`);
count++;
});
stream.on('end', () => {
const endTime = +new Date();
console.log('文件读取完成,读取时间为:', endTime - startTime, '毫秒');
});
再来看下控制台的输出
从控制台的输出我们可以看出来两点:
- 文件不是一次性读取的,而是读取了37413次
- 文件在6528毫秒之后被成功读取了
所以,使用Stream的优势显而易见,它允许我们做一些看起来不可能的事情,即通过分布式处理缓冲数据
Stream的分类
在NodeJs中,每一个Stream都是Streams核心模块中可用的四个基本抽象类之一的实现:
- stream.Readable:可读的流 (例如
fs.createReadStream()
) - stream.Writable:可写的流 (
例如 fs.createWriteStream()
) - stream.Duplex:可读写的流 (例如
net.Socket
) - stream.Transform:在读写过程中可以修改和变换数据的 Duplex 流 (例如
zlib.createDeflate()
)
一、Readable Readable流也称可读流,使用Readable类可以自定义实现可读流。
- 自定义_read方法来实现流的读取过程
- 调用push方法来触发一次流数据读取,触发data事件
- 使用push(null)表示流读取完成,触发finish事件
我们来实现一个字符串的可读流,在NodeJs中,stream不仅可以读取二进制数据,还可以读取任何Javascript数据
const { Readable } = require('stream');
class StringReadable extends Readable {
constructor(props) {
super();
this._props = props;
}
_read() {
const { str } = this._props;
for(let i = 0; i < str.length; i++) {
this.push(str[i]);
}
this.push(null);
}
}
const strStream = new StringReadable({
str: 'hello'
});
strStream.on('data', (chunk) => {
console.log('read: ', chunk.toString());
});
strStream.on('end', () => {
console.log('读取完成:end');
});
控制台输出为:
二、Writable Writable流为可写流,使用Writable类可实现自定义可写流
- 自定义_write方式来实现可写流的写入数据流程,在_write函数内调用callback()来通知可写流来写入下一个数据
- 调用end方法来表示数据写入结束,此时会触发finish事件
_write有以下格式
_write(chunk, [encoding], [callback]);
encoding参数为可选,其在chunk是String类型时指定(默认为utf8,如果chunk是Buffer,则忽略)。 callback也是可选的,当数据已经完全刷新时执行的一个回调函数。
const { Writable } = require('stream');
class StringWritable extends Writable {
constructor() {
super();
this._strContent = '';
}
_write(chunk, enc, cb) {
if (chunk) {
this._strContent += chunk.toString();
console.log('write: ', chunk.toString());
}
cb();
}
}
const strStream = new StringWritable();
strStream.write('h');
strStream.write('e');
strStream.write('l');
strStream.write('l');
strStream.write('o');
strStream.end(null);
strStream.on('finish', () => {
console.log('写入完成:', strStream._strContent);
});
控制台输出为:
三、Duplex Duplex流即是可读流又是可写流,当我们想实现一个即是数据源又是数据终点的类时,Duplex就可以派上用场了。
Duplex流继承自stream.Readable和stream.Writable,所以我们既可以用read方法也可以用write来读取/写入数据,或者可以监听data事件。
要创建一个自定义的Duplex类,就必须为_read()和_write()提供一个实现。传递给Duplex的props在Duplex内部被转发给Readable和Writable的构造函数,不过需要注意的是,props的allowHalfOpen属性默认为true,如果设置成false的话,那么只要Readable或者Writable其中之一被冻结了,stream就结束了。
const { Duplex } = require('stream');
class StringDuplex extends Duplex {
constructor(props) {
super();
this._props = props;
this._strContent = '';
}
_read() {
const { str } = this._props;
const len = str.length;
for(let i = 0; i < len; i++) {
this.push(str[i]);
}
this.push(null);
}
_write(chunk, enc, cb) {
if (chunk) {
this._strContent += chunk.toString();
console.log('write: ', chunk.toString());
}
cb();
}
}
const strDuplexStream = new StringDuplex({
str: 'hello',
});
strDuplexStream.on('data', (chunk) => {
console.log('read: ', chunk.toString());
});
strDuplexStream.on('end', () => {
console.log('end');
});
strDuplexStream.write('h');
strDuplexStream.write('e');
strDuplexStream.write('l');
strDuplexStream.write('l');
strDuplexStream.write('o');
strDuplexStream.end(null);
strDuplexStream.on('finish', () => {
console.log('write: ', strDuplexStream._strContent);
});
控制台输出为:
四、Transform Transform流是专门设计用于处理数据转换的一种特殊类型的双重Duplex,所以Transform类又是继承自Duplex类的。
从上个例子可以看出,虽然Deplex继承了Readable和Writable,但是其内部数据的读取和数据的写入并没有直接的关系,strDuplexStream的可读端的数据是没有流转至可写端的。
要实现一个自定义的Transform类,则必须
- 自定义 _transform(chunk, encoding, callback) 方法来定义可读端输入结果的处理过程。
- 在 _transform 方法中调用 push 方法来将处理结果输出到可读端。
- 在 _transform 方法中调用 callback 进行下一个处理。
const { Transform } = require('stream');
class StringTransform extends Transform {
constructor() {
super();
this._strContent = '';
}
_transform(chunk, enc, cb) {
if (chunk) {
console.log('write: ', chunk.toString());
const data = chunk.toString().toUpperCase();
this.push(data);
this._strContent += data;
}
cb();
}
}
const strTransform = new StringTransform();
strTransform.on('data', (chunk) => {
console.log('read: ', chunk.toString());
});
strTransform.on('end', () => {
console.log('end');
});
strTransform.write('h');
strTransform.write('e');
strTransform.write('l');
strTransform.write('l');
strTransform.write('o');
strTransform.end(null);
strTransform.on('finish', () => {
console.log(strTransform._strContent);
});
控制台输出为:
从输出可以看出,从可写流的可写端流向可读流的可读端是同一份数据。
objectMode
默认情况下,流处理的数据是Buffer/String类型的值。objectMode属性可以使流处理任何复杂类型的值。
const { Readable } = require('stream');
const sr = new Readable();
const sro = new Readable({ objectMode: true });
sr.push('hello');
// 这个会报错:TypeError: Invalid non-string/buffer chunk
// sr.push({});
sr.push(null);
sro.push('hello');
sro.push({});
sro.push(null);
sr.on('data', (chunk) => {
console.log('无objectMode:', chunk);
});
sro.on('data', (chunk) => {
console.log('有objectMode:', chunk);
});
控制台输出