Node.js中的四种流
上一篇《流(Stream)的优点》说明了为什么需要流,本篇则介绍Node.js中的四种流。
流的分类
Node.js中的每个流的实例都是stream模块提供的四个基本抽象类之一的实现:
- stream.Readable
- stream.Writable
- stream.Duplex
- stream.Transform
同时,每一个流同样也是EventEmitter
类的实例。所以,流和事件息息相关,比如,可读取流在读取完成后会触发end事件,或者在发生错误时出触发error事件。
可读流
可读流表示用来消费数据
的数据源
,比如,常见的从文件中读取数据:
fs.createReadStream('demo.txt')
表示以demo.txt文件作为数据源,并返回了可读流(ReadStream),既然得到了流,那么可以使用事件来获得相关的数据:
readStream
.on('data', chunk => {
console.log(`Chunk read: ${chunk.length}, ${chunk.toString()}`)
})
.on('end', () => console.log('--end--'))
每当流将数据块(chunk)的所有权让给消费者时,都会发出data事件
,从而获得此时的数据。
在实际应用中,常见的可读流包括:
- HTTP responses, on the client
- HTTP requests, on the server
- fs read streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdout and stderr
- process.stdin
除了模块中提供的这些可读流之外,我们也可以自定义可读流,为此,需要创建一个类,并继承stream.Readable
,还必须提供_read()
方法,下面是一个简单的实现:
const stream = require('stream')
class TimeStream extends stream.Readable {
constructor(options) {
super(options)
}
_read(size) {
const chunk = new Date().getTime().toString()
console.log(`Pushing chunk of size: ${chunk.length}`)
this.push(chunk, 'utf8')
}
}
上面代码中,定义了一个简单的可读流,请注意,_read()
方法是在TimeStream类
中定义的,是内部方法,无法被直接调用,外部消费者
调用需要使用read()
方法:
const timeStream = new TimeStream()
timeStream.on('readable', () => {
let chunk
while ((chunk = timeStream.read()) !== null) {
console.log(`Chunk received: ${chunk.toString()}`)
}
})
readable事件
在流有新的数据时会被触发,如果数据可用,使用timeStream.read()
可以返回该数据,如果已经到达了流的尾部,timeStream.read()
将返回null
,并触发end
事件。
可写流
可写流表示用来接收数据
的目的地
,向写入流中写入数据非常简单,只需要调用write()
方法,其语法如下:
writable.write(chunk[, encoding][, callback])
当数据块是String类型时,可以指定encoding(默认是utf8,当数据块为Buffer时encoding会被忽略),当数据块完全被写入时,会调用callback。
当没有更多数据写入到流中时,使用end()
方法,其语法和write方法类似,如下:
writable.end([chunk[, encoding]][, callback])
最后,我们来看一个小例子:
const writeStream = fs.createWriteStream('example.txt')
writeStream.write('hello, ')
writeStream.end('world!')
上面代码,将hello, world这几个字符串写入到了文件example.txt中。
在实际应用中,常见的可写流有:
- HTTP requests, on the client
- HTTP responses, on the server
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin
- process.stdout, process.stderr
双向流
双向流(Duplex Stream)是既可以读取又可以写入的流,当我们想要描述一个既是数据源又是目的地的事物时,双向流就非常有用,比如网络套接字。
双向流同时继承
了stream.Readable和stream.Writable的方法,这意味我们既可以使用read()和write()方法,也可以同时监听readble
和drain
事件。
drain事件出现在这样一种情况下:当数据缓冲区已超出highWaterMark
或写入队列当前正忙,此时,调用.write()
会返回false,背压系统
启动,它将暂停
Readable流发送任何数据,并等待消费者(写入流)再次准备好。一旦数据缓冲区被清空,将触发drain事件并恢复传入的数据流。
变换流
变换流(Transform Stream)是一种特殊
的双向流,用来处理数据的转换。和双向流一样,变换流也是同时实现了Readable和Writable接口。
如果自己定义变换流,则必须要实现transform()
方法。
参考
- Stream | Node.js
- Mario Casciaro, Luciano Mammino. Node.js 设计模式(第2版)