16.3 流(Stream)与管道(pipe)

Node.js 流基础概念

流的四种基本类型

  1. 可读流(Readable) - 数据来源(如文件读取、HTTP请求)
  2. 可写流(Writable) - 数据目标(如文件写入、HTTP响应)
  3. 双工流(Duplex) - 既可读又可写(如TCP套接字)
  4. 转换流(Transform) - 在读写过程中修改数据(如压缩/解压)

流的基本优势

  • 内存效率:无需一次性加载全部数据
  • 时间效率:可以边读取边处理
  • 组合性:通过管道连接多个流操作

流的创建与使用

可读流示例

import { createReadStream } from 'fs';
import { createServer } from 'http';

// 创建文件可读流
const fileStream = createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 每次读取64KB
});

// 事件监听方式
fileStream.on('data', (chunk) => {
  console.log(`接收到 ${chunk.length} 字节数据`);
});

fileStream.on('end', () => {
  console.log('文件读取完成');
});

fileStream.on('error', (err) => {
  console.error('读取错误:', err);
});

// HTTP服务器中使用流
createServer((req, res) => {
  createReadStream('large-file.txt').pipe(res);
}).listen(3000);

可写流示例

import { createWriteStream } from 'fs';

const writeStream = createWriteStream('output.txt', {
  flags: 'a', // 追加模式
  encoding: 'utf8'
});

// 写入数据
writeStream.write('第一行数据\n');
writeStream.write('第二行数据\n');

// 结束写入
writeStream.end('最后一行数据\n');

// 事件监听
writeStream.on('finish', () => {
  console.log('所有数据已写入');
});

writeStream.on('error', (err) => {
  console.error('写入错误:', err);
});

管道(pipe)机制

基本管道操作

import { createReadStream, createWriteStream } from 'fs';

// 基本文件复制
createReadStream('source.txt')
  .pipe(createWriteStream('destination.txt'))
  .on('finish', () => console.log('复制完成'));

// 链式管道
import zlib from 'zlib';

createReadStream('source.txt')
  .pipe(zlib.createGzip()) // 压缩
  .pipe(createWriteStream('source.txt.gz'))
  .on('finish', () => console.log('压缩完成'));

错误处理管道

import { pipeline } from 'stream/promises';

async function safePipe() {
  try {
    await pipeline(
      createReadStream('input.txt'),
      processData(), // 自定义转换流
      createWriteStream('output.txt')
    );
    console.log('管道处理完成');
  } catch (err) {
    console.error('管道错误:', err);
  }
}

// 自定义转换流
function processData() {
  let count = 0;
  return new Transform({
    transform(chunk, encoding, callback) {
      count++;
      this.push(`块${count}: ${chunk}\n`);
      callback();
    }
  });
}

高级流模式

对象模式流

import { Readable } from 'stream';

// 创建对象模式的可读流
const objectStream = new Readable({
  objectMode: true,
  read() {} // 必须实现_read方法
});

// 推送对象
objectStream.push({ id: 1, name: 'Alice' });
objectStream.push({ id: 2, name: 'Bob' });
objectStream.push(null); // 结束流

// 消费对象流
objectStream.on('data', (obj) => {
  console.log('收到对象:', obj);
});

背压(Backpressure)管理

import { createWriteStream } from 'fs';

// 高水位线设置
const writer = createWriteStream('output.txt', {
  highWaterMark: 1024 * 1024 // 1MB缓冲区
});

// 手动处理背压
function writeData(reader, writer) {
  reader.on('data', (chunk) => {
    const canContinue = writer.write(chunk);
    if (!canContinue) {
      reader.pause(); // 暂停读取
      writer.once('drain', () => reader.resume()); // 缓冲区空时恢复
    }
  });
}

自定义流实现

自定义转换流

import { Transform } from 'stream';

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// 使用自定义转换流
createReadStream('input.txt')
  .pipe(new UpperCaseTransform())
  .pipe(createWriteStream('output.txt'));

自定义双工流

import { Duplex } from 'stream';

class EchoDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.buffer = [];
  }
  
  _write(chunk, encoding, callback) {
    this.buffer.push(chunk);
    callback();
  }
  
  _read(size) {
    while (this.buffer.length) {
      const chunk = this.buffer.shift();
      if (!this.push(chunk)) {
        break; // 消费者处理不过来时停止推送
      }
    }
    if (this.buffer.length === 0) {
      this.push(null); // 结束流
    }
  }
}

// 使用示例
const echo = new EchoDuplex();
process.stdin.pipe(echo).pipe(process.stdout);

实战应用场景

1. 大文件处理

import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
import csv from 'csv-parser';
import { Transform } from 'stream';

async function processLargeCSV() {
  await pipeline(
    createReadStream('huge-dataset.csv'),
    csv(), // CSV解析
    new Transform({
      objectMode: true,
      transform(row, encoding, callback) {
        // 数据转换
        this.push(JSON.stringify(row) + '\n');
        callback();
      }
    }),
    createWriteStream('processed-data.ndjson')
  );
  
  console.log('CSV处理完成');
}

2. 实时日志处理

import { createServer } from 'http';
import { PassThrough } from 'stream';

// 创建日志流
const logStream = new PassThrough();

// 多个日志消费者
logStream.pipe(createWriteStream('application.log'));
logStream.pipe(process.stdout);

createServer((req, res) => {
  // 记录访问日志
  logStream.write(`[${new Date().toISOString()}] ${req.method} ${req.url}\n`);
  
  // 处理请求...
  res.end('Hello World');
}).listen(3000);

3. 数据加密管道

import { createReadStream, createWriteStream } from 'fs';
import { createCipheriv, createDecipheriv } from 'crypto';

const algorithm = 'aes-256-cbc';
const key = Buffer.from(process.env.ENCRYPTION_KEY, 'hex');
const iv = Buffer.alloc(16, 0); // 初始化向量

// 加密文件
function encryptFile(input, output) {
  return pipeline(
    createReadStream(input),
    createCipheriv(algorithm, key, iv),
    createWriteStream(output)
  );
}

// 解密文件
function decryptFile(input, output) {
  return pipeline(
    createReadStream(input),
    createDecipheriv(algorithm, key, iv),
    createWriteStream(output)
  );
}

性能优化技巧

1. 并行流处理

import { Transform } from 'stream';
import { Worker } from 'worker_threads';

class ParallelTransform extends Transform {
  constructor(workerPath, options) {
    super({ ...options, objectMode: true });
    this.workers = [];
    this.pending = 0;
    
    // 创建工作线程池
    for (let i = 0; i < options.concurrency || 4; i++) {
      const worker = new Worker(workerPath);
      worker.on('message', (result) => {
        this.push(result);
        this.processNext();
      });
      this.workers.push(worker);
    }
  }
  
  _transform(chunk, encoding, callback) {
    this.queue.push({ chunk, callback });
    this.processNext();
  }
  
  processNext() {
    if (this.queue.length && this.pending < this.workers.length) {
      const { chunk, callback } = this.queue.shift();
      const worker = this.workers[this.pending++];
      worker.postMessage(chunk);
      callback();
    }
  }
}

2. 流速率控制

import { Throttle } from 'stream-throttle';

// 限制读取速度为1MB/s
createReadStream('large-file.iso')
  .pipe(new Throttle({ rate: 1024 * 1024 })) // 1MB/秒
  .pipe(createWriteStream('copy.iso'));

3. 内存使用监控

function monitorStream(stream, name) {
  let bytes = 0;
  
  stream.on('data', (chunk) => {
    bytes += chunk.length;
    const mb = (bytes / (1024 * 1024)).toFixed(2);
    console.log(`[${name}] 已处理: ${mb} MB`);
  });
  
  stream.on('end', () => {
    console.log(`[${name}] 总计: ${bytes} 字节`);
  });
}

// 使用示例
const reader = createReadStream('bigfile.dat');
monitorStream(reader, '读取流');

错误处理最佳实践

1. 全面的管道错误处理

import { pipeline } from 'stream/promises';

async function safeProcessing() {
  try {
    await pipeline(
      createReadStream('input.data'),
      new Transform({
        transform(chunk, encoding, callback) {
          try {
            // 可能抛出错误的处理逻辑
            const result = processChunk(chunk);
            callback(null, result);
          } catch (err) {
            callback(err); // 将错误转发给管道
          }
        }
      }),
      createWriteStream('output.data')
    );
  } catch (err) {
    console.error('处理失败:', err);
    // 清理资源
    await fs.unlink('output.data').catch(() => {});
    throw err;
  }
}

2. 流超时控制

function withTimeout(stream, timeout) {
  let timer = setTimeout(() => {
    stream.destroy(new Error(`操作超时 (${timeout}ms)`));
  }, timeout);
  
  const clear = () => clearTimeout(timer);
  
  stream.on('close', clear);
  stream.on('end', clear);
  stream.on('error', clear);
  
  return stream;
}

// 使用示例
createReadStream('slow-resource.txt')
  .pipe(withTimeout(transformStream, 5000))
  .pipe(createWriteStream('output.txt'));

现代 JavaScript 集成

1. 异步迭代器与流

async function processStream() {
  const readable = createReadStream('data.txt', { encoding: 'utf8' });
  
  for await (const chunk of readable) {
    console.log('处理块:', chunk.length);
    // 处理每个数据块
  }
  
  console.log('流处理完成');
}

2. Promise 组合器与流

async function concurrentStreams() {
  const [file1, file2] = await Promise.all([
    fs.promises.readFile('file1.txt', 'utf8'),
    new Promise((resolve, reject) => {
      let content = '';
      createReadStream('file2.txt', 'utf8')
        .on('data', chunk => content += chunk)
        .on('end', () => resolve(content))
        .on('error', reject);
    })
  ]);
  
  console.log('合并内容:', file1 + file2);
}

总结对比表

特性 回调风格 Promise风格 流式处理
内存使用 中等 中等
大文件处理能力 优秀
代码复杂度 高(回调地狱) 中等 中等
实时处理能力 有限 有限 优秀
错误处理 回调参数 try/catch 事件/Promise
数据转换能力 需要手动处理 需要手动处理 内置管道支持
适用场景 简单I/O操作 简单异步操作 大数据/实时处理

Node.js 的流和管道机制为处理 I/O 密集型操作提供了高效、灵活的解决方案。通过合理使用这些技术,开发者可以构建出能够处理大规模数据而不会耗尽内存的高性能应用程序。

#前端开发 分享于 2025-03-25

【 内容由 AI 共享,不代表本站观点,请谨慎参考 】