Node.js中的四种流

阅读 1.3k

上一篇《流(Stream)的优点》说明了为什么需要流,本篇则介绍Node.js中的四种流。

流的分类

Node.js中的每个流的实例都是stream模块提供的四个基本抽象类之一的实现:

  • stream.Readable
  • stream.Writable
  • stream.Duplex
  • stream.Transform

同时,每一个流同样也是EventEmitter类的实例。所以,流和事件息息相关,比如,可读取流在读取完成后会触发end事件,或者在发生错误时出触发error事件。

可读流

可读流表示用来消费数据数据源,比如,常见的从文件中读取数据:

fs.createReadStream('demo.txt')

表示以demo.txt文件作为数据源,并返回了可读流(ReadStream),既然得到了流,那么可以使用事件来获得相关的数据:

readStream
  .on('data', chunk => {
    console.log(`Chunk read: ${chunk.length}, ${chunk.toString()}`)
  })
  .on('end', () => console.log('--end--'))

每当流将数据块(chunk)的所有权让给消费者时,都会发出data事件,从而获得此时的数据。

在实际应用中,常见的可读流包括:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdout and stderr
  • process.stdin

除了模块中提供的这些可读流之外,我们也可以自定义可读流,为此,需要创建一个类,并继承stream.Readable,还必须提供_read()方法,下面是一个简单的实现:

const stream = require('stream')

class TimeStream extends stream.Readable {
  constructor(options) {
    super(options)
  }

  _read(size) {
    const chunk = new Date().getTime().toString()
    console.log(`Pushing chunk of size: ${chunk.length}`)
    this.push(chunk, 'utf8')
  }
}

上面代码中,定义了一个简单的可读流,请注意,_read()方法是在TimeStream类中定义的,是内部方法,无法被直接调用,外部消费者调用需要使用read()方法:

const timeStream = new TimeStream()
timeStream.on('readable', () => {
  let chunk
  while ((chunk = timeStream.read()) !== null) {
    console.log(`Chunk received: ${chunk.toString()}`)
  }
})

readable事件在流有新的数据时会被触发,如果数据可用,使用timeStream.read()可以返回该数据,如果已经到达了流的尾部,timeStream.read()将返回null,并触发end事件。

可写流

可写流表示用来接收数据目的地,向写入流中写入数据非常简单,只需要调用write()方法,其语法如下:

writable.write(chunk[, encoding][, callback])

当数据块是String类型时,可以指定encoding(默认是utf8,当数据块为Buffer时encoding会被忽略),当数据块完全被写入时,会调用callback。

当没有更多数据写入到流中时,使用end()方法,其语法和write方法类似,如下:

writable.end([chunk[, encoding]][, callback])

最后,我们来看一个小例子:

const writeStream = fs.createWriteStream('example.txt')
writeStream.write('hello, ')
writeStream.end('world!')

上面代码,将hello, world这几个字符串写入到了文件example.txt中。

在实际应用中,常见的可写流有:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

双向流

双向流(Duplex Stream)是既可以读取又可以写入的流,当我们想要描述一个既是数据源又是目的地的事物时,双向流就非常有用,比如网络套接字。

双向流同时继承了stream.Readable和stream.Writable的方法,这意味我们既可以使用read()和write()方法,也可以同时监听readbledrain事件。

drain事件出现在这样一种情况下:当数据缓冲区已超出highWaterMark或写入队列当前正忙,此时,调用.write()会返回false,背压系统启动,它将暂停Readable流发送任何数据,并等待消费者(写入流)再次准备好。一旦数据缓冲区被清空,将触发drain事件并恢复传入的数据流。

变换流

变换流(Transform Stream)是一种特殊的双向流,用来处理数据的转换。和双向流一样,变换流也是同时实现了Readable和Writable接口。

如果自己定义变换流,则必须要实现transform()方法。

参考

  • Stream | Node.js
  • Mario Casciaro, Luciano Mammino. Node.js 设计模式(第2版)
最后编辑于: 2022-06-27

评论(0条)

(必填)
复制成功