NodeJs-Stream

class Stream

Node.jsStream 模块是处理数据流的核心工具,它允许你在处理文件、网络操作、子进程之间的数据传输时进行异步处理。Stream 模块下的所有类都继承自 EventEmitter,因此流支持多种事件模式来处理数据传输。Stream 分为四种基本类型:

  1. 可读流 (Readable): 允许从数据源读取数据。
  2. 可写流 (Writable): 允许写入数据到目标位置。
  3. 双工流 (Duplex): 既可读又可写,允许双向的数据传输。
  4. 转换流 (Transform): 是一种双工流,但数据的读写是经过某种变换或处理的。

以下是详细的 Node.js Stream 模块的相关属性、方法以及它的全面使用介绍。


1. 模块引用

要使用 Stream 模块,首先需要引入它:

const { Readable, Writable, Duplex, Transform } = require('stream');

或使用 require('node:stream')

2. 可读流 (Readable Stream)

Readable 流表示可以从中读取数据的流。

常用事件

  • 'data': 当数据块可用时触发。
  • 'end': 当没有更多数据可读取时触发。
  • 'error': 当读取过程中出错时触发。
  • 'close': 当流完全关闭时触发。

重要方法

  • readable.read([size]):
    从流中读取并返回指定大小的数据块。
  • readable.pipe(destination[, options]):
    将当前可读流的数据传输到目标可写流。
  • readable.pause():
    暂停流的传输。
  • readable.resume():
    恢复流的传输。
  • readable.isPaused():
    检查流是否处于暂停状态。
  • readable.setEncoding(encoding):
    设置流的字符编码。
  • readable.unshift(chunk):
    向缓冲区插入数据。

示例

const { Readable } = require('stream');

const readable = new Readable({
  read() {
    this.push('Hello');
    this.push('World');
    this.push(null);  // 表示数据结束
  }
});

readable.on('data', (chunk) => {
  console.log(`Received: ${chunk}`);
});

3. 可写流 (Writable Stream)

Writable 流表示你可以将数据写入目标位置。

常用事件

  • 'drain': 当写入缓冲区清空时触发。
  • 'finish': 当所有数据已写入时触发。
  • 'error': 当写入出错时触发。
  • 'close': 当流关闭时触发。

重要方法

  • writable.write(chunk[, encoding][, callback]):
    将数据块写入流中。如果返回 false,表示缓冲区已满,应该暂停写入并等待 drain 事件。
  • writable.end([chunk][, encoding][, callback]):
    表示没有更多数据要写入,并且关闭流。
  • writable.cork():
    强制将数据保存在缓冲区中,直到调用 writable.uncork()
  • writable.uncork():
    刷新缓冲区中的所有数据。
  • writable.setDefaultEncoding(encoding):
    设置默认的编码格式。

示例

const { Writable } = require('stream');

const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk}`);
    callback();
  }
});

writable.write('Hello');
writable.write('World');
writable.end('!');

4. 双工流 (Duplex Stream)

Duplex 流是同时具有可读和可写功能的流。

示例

const { Duplex } = require('stream');

const duplex = new Duplex({
  read(size) {
    this.push('Hello');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk}`);
    callback();
  }
});

duplex.on('data', (chunk) => {
  console.log(`Received: ${chunk}`);
});

duplex.write('World');
duplex.end();

5. 转换流 (Transform Stream)

Transform 流是一种特殊的双工流,允许对流经的数据进行修改。

示例

const { Transform } = require('stream');

const transform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

transform.on('data', (chunk) => {
  console.log(`Transformed: ${chunk}`);
});

transform.write('Hello');
transform.write('World');
transform.end();

6. 流的管道化 (Piping)

pipe() 方法用于将数据从一个流直接传递到另一个流,常用于将可读流的数据传递到可写流。

示例

const fs = require('fs');
const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');

// 通过管道将数据从 input.txt 传输到 output.txt
readable.pipe(writable);

7. Stream 模块的类与对象

Stream 类

  • stream.pipeline():
    安全地将多个流连接在一起,并自动处理错误。
  • stream.finished():
    当流结束或错误时执行回调。

Stream 实例

  • stream.readable:
    布尔值,指示流是否可读。
  • stream.writable:
    布尔值,指示流是否可写。

8. 高水位线 (HighWaterMark)

highWaterMark 是流的一个属性,用于控制缓冲区的大小。不同类型的流默认 highWaterMark 大小不同:

  • 对于 ReadableWritable 流,默认值是 16 KB
  • 对于对象模式的流,默认值是 16 个对象。

示例

const readable = new Readable({
  highWaterMark: 1024, // 1 KB 缓冲区
  read() {}
});

9. 错误处理

流可能会发生错误,因此通常需要监听 'error' 事件。

示例

readable.on('error', (err) => {
  console.error('Stream error:', err);
});

10. 对象模式 (Object Mode)

流可以以对象而不是字节进行操作。为了启用对象模式,需要在创建流时传递 objectMode: true

示例

const readable = new Readable({
  objectMode: true,
  read() {
    this.push({ key: 'value' });
    this.push(null);
  }
});

readable.on('data', (data) => {
  console.log(data);
});

Node.js 中的 Stream 模块非常强大,支持数据的实时处理。无论是文件、网络数据还是其他数据源,流都提供了一个高效的异步解决方案,特别适合处理大量数据或需要对数据进行实时处理的场景。

评论区
评论列表
menu