15.3 Web Workers 与多线程

Web Workers 基础

基本概念与使用

Web Workers 允许在独立的后台线程中运行 JavaScript 代码,并通过消息传递(postMessage / onmessage)与主线程通信,因此耗时任务不会阻塞主线程。

主线程代码 (main.js):

// Spawn a dedicated worker that runs worker.js on a background thread.
const worker = new Worker('worker.js');

// Log every message coming back from the worker. The 'done' sentinel means
// the work is finished, so the worker is shut down at that point.
worker.onmessage = ({ data }) => {
  console.log('Received from worker:', data);

  if (data === 'done') {
    worker.terminate();
  }
};

// Report worker failures ourselves and suppress the default error output.
worker.onerror = (error) => {
  console.error('Worker error:', error.message);
  error.preventDefault();
};

// Kick off the computation (delivery is asynchronous, so the handlers above
// are in place before any reply can arrive).
worker.postMessage({ command: 'start', data: 42 });

Worker线程代码 (worker.js):

// Handle commands posted by the main thread.
self.onmessage = ({ data: message }) => {
  const { command, data } = message;

  // Only the 'start' command is understood; anything else is ignored.
  if (command !== 'start') {
    return;
  }

  // Run the expensive computation off the main thread.
  const result = heavyCalculation(data);

  // First report the result, then send the 'done' sentinel.
  self.postMessage({ result });
  self.postMessage('done');
};

/**
 * Simulates a CPU-bound job by summing square roots.
 *
 * @param {number} input - Scale factor; the loop runs `input * 1000000` times.
 * @returns {number} Sum of Math.sqrt(i) for every integer i in [0, input * 1000000).
 */
function heavyCalculation(input) {
  const iterations = input * 1000000;
  let total = 0;
  let step = 0;

  while (step < iterations) {
    total += Math.sqrt(step);
    step += 1;
  }

  return total;
}

Worker 高级用法

不同类型的 Workers

  1. 专用 Worker (Dedicated Worker)

    // Main thread: create a dedicated worker from dedicated-worker.js.
    const dedicatedWorker = new Worker('dedicated-worker.js');
    
  2. 共享 Worker (Shared Worker)

    // Main thread: connect to the shared worker, open its message port, and
    // send a first message through that port.
    const sharedWorker = new SharedWorker('shared-worker.js');
    sharedWorker.port.start();
    sharedWorker.port.postMessage('Hello shared worker');
    
  3. Service Worker

    // Register the service worker, but only where the API is available.
    if ('serviceWorker' in navigator) {
      navigator.serviceWorker.register('/sw.js')
        .then(() => {
          console.log('ServiceWorker registration successful');
        })
        .catch(error => {
          // register() returns a promise; without this handler a failed
          // registration would go unnoticed as an unhandled rejection.
          console.error('ServiceWorker registration failed:', error);
        });
    }
    

数据传输优化

  1. 结构化克隆算法

    // Date, Set and Map values are passed to postMessage as-is; most data
    // types are handled automatically by the structured clone algorithm.
    const payload = {
      date: new Date(),
      set: new Set([1, 2, 3]),
      map: new Map([['key', 'value']])
    };
    worker.postMessage(payload);
    
  2. Transferable Objects (零拷贝)

    // Main thread: allocate a 1024-byte buffer and hand it to the worker.
    const buffer = new ArrayBuffer(1024);
    // Listing the buffer in the second argument (the transfer list) transfers
    // ownership to the worker instead of copying the bytes.
    worker.postMessage(buffer, [buffer]); // transfer ownership
    
  3. SharedArrayBuffer

    // Main thread: create a 1024-byte SharedArrayBuffer and post it to the worker.
    // NOTE(review): no transfer list is used here; presumably both threads then
    // view the same memory — confirm against the SharedArrayBuffer documentation.
    const sharedBuffer = new SharedArrayBuffer(1024);
    worker.postMessage({ buffer: sharedBuffer });
    

多线程编程模式

线程池实现

/**
 * Fixed-size pool of dedicated workers that runs tasks in parallel.
 *
 * Protocol: a task is posted to a worker as `{ taskId, data }` and the worker
 * script is expected to reply with `{ taskId, result }`. Each worker handles
 * one task at a time; extra tasks wait in a FIFO queue until a worker is free.
 */
class WorkerPool {
  /**
   * @param {number} poolSize - Number of workers created up front.
   * @param {string|URL} workerScript - Script passed to the Worker constructor.
   */
  constructor(poolSize, workerScript) {
    this.poolSize = poolSize;
    this.workerScript = workerScript;
    this.taskQueue = [];      // tasks waiting for a free worker (FIFO)
    this.workers = [];        // idle workers
    this.running = new Map(); // busy worker -> task it is currently executing
    this.nextTaskId = 0;

    for (let i = 0; i < poolSize; i++) {
      this.createWorker();
    }
  }

  /**
   * Creates one worker, wires its handlers, and makes it available to the
   * pool (it immediately picks up a queued task if one is waiting).
   */
  createWorker() {
    const worker = new Worker(this.workerScript);

    worker.onmessage = (event) => {
      const task = this.running.get(worker);
      const message = event.data;

      // Ignore messages that do not answer the task this worker is running.
      if (!task || !message || message.taskId !== task.id) {
        return;
      }

      this.running.delete(worker);
      task.resolve(message.result);
      this.release(worker);
    };

    worker.onerror = (error) => {
      // Reject the task that was actually running on the failed worker,
      // not an unrelated task that is still waiting in the queue.
      const task = this.running.get(worker);
      this.running.delete(worker);
      this.workers = this.workers.filter((idle) => idle !== worker);
      worker.terminate();

      if (task) {
        task.reject(error);
      }

      // Replace the failed worker so the pool keeps its size.
      this.createWorker();
    };

    this.release(worker);
  }

  /**
   * Sends a task to a worker and records it as in flight so the reply can
   * settle the right promise.
   *
   * @param {Worker} worker - Worker that will execute the task.
   * @param {{id: number, data: *, resolve: Function, reject: Function}} task
   */
  dispatch(worker, task) {
    this.running.set(worker, task);

    try {
      worker.postMessage({ taskId: task.id, data: task.data });
    } catch (error) {
      // postMessage throws synchronously for data that cannot be cloned;
      // fail only this task and keep the worker usable.
      this.running.delete(worker);
      task.reject(error);
      this.release(worker);
    }
  }

  /**
   * Hands a free worker the next queued task, or parks it as idle.
   *
   * @param {Worker} worker - Worker that has no task assigned.
   */
  release(worker) {
    if (this.taskQueue.length > 0) {
      this.dispatch(worker, this.taskQueue.shift());
    } else {
      this.workers.push(worker);
    }
  }

  /**
   * Schedules a task on the pool.
   *
   * @param {*} data - Payload forwarded to the worker; must be cloneable.
   * @returns {Promise<*>} Resolves with the worker's `result`; rejects with
   *   the worker's error event or with the postMessage error.
   */
  runTask(data) {
    return new Promise((resolve, reject) => {
      const task = { id: ++this.nextTaskId, data, resolve, reject };

      if (this.workers.length > 0) {
        this.dispatch(this.workers.pop(), task);
      } else {
        this.taskQueue.push(task);
      }
    });
  }
}

// Usage example: a pool of four workers shared by all tasks.
const pool = new WorkerPool(4, 'worker.js');

/** Runs three tasks on the pool concurrently and logs the collected results. */
async function processTasks() {
  const jobs = [
    { operation: 'encrypt', data: 'secret1' },
    { operation: 'encrypt', data: 'secret2' },
    { operation: 'analyze', data: [1, 2, 3] }
  ];

  const results = await Promise.all(jobs.map((job) => pool.runTask(job)));

  console.log('All tasks completed:', results);
}

基于 Actor 模型的通信

// worker-manager.js
/**
 * Request/response wrapper around a single worker.
 *
 * Protocol: each request is posted as `{ id, message }`; the worker is
 * expected to answer with `{ id, result }` or `{ id, error }`.
 */
class WorkerActor {
  /**
   * @param {string|URL} workerScript - Script passed to the Worker constructor.
   */
  constructor(workerScript) {
    this.worker = new Worker(workerScript);
    this.callbacks = new Map(); // request id -> { resolve, reject }
    this.messageId = 0;

    this.worker.onmessage = (event) => {
      const { id, result, error } = event.data;

      if (this.callbacks.has(id)) {
        const { resolve, reject } = this.callbacks.get(id);
        this.callbacks.delete(id);

        if (error) {
          reject(new Error(error));
        } else {
          resolve(result);
        }
      }
    };

    // An error event carries no request id, so it cannot be matched to a
    // single request: fail everything that is still waiting for a reply.
    this.worker.onerror = (event) => {
      this.rejectAll(new Error(event.message || 'Worker error'));
    };
  }

  /**
   * Sends a message to the worker and waits for the matching reply.
   *
   * @param {*} message - Payload forwarded to the worker; must be cloneable.
   * @returns {Promise<*>} Resolves with the reply's `result`; rejects when
   *   the reply carries `error`, the worker fails, or the actor is terminated.
   */
  send(message) {
    return new Promise((resolve, reject) => {
      const id = ++this.messageId;
      this.callbacks.set(id, { resolve, reject });

      try {
        this.worker.postMessage({ id, message });
      } catch (error) {
        // Do not leave an entry behind for a request that was never sent.
        this.callbacks.delete(id);
        reject(error);
      }
    });
  }

  /** Stops the worker and rejects every request that is still pending. */
  terminate() {
    this.worker.terminate();
    this.rejectAll(new Error('Worker terminated'));
  }

  /**
   * Rejects all pending requests with the same reason and clears them.
   *
   * @param {Error} reason - Rejection value handed to each pending request.
   */
  rejectAll(reason) {
    const pending = [...this.callbacks.values()];
    this.callbacks.clear();

    for (const { reject } of pending) {
      reject(reason);
    }
  }
}

// Usage example: one actor serving every request below.
const actor = new WorkerActor('actor-worker.js');

/** Sends two requests one after the other and logs both replies. */
async function performTasks() {
  const requests = [
    { type: 'CALCULATE', data: 42 },
    { type: 'TRANSFORM', data: 'input' }
  ];

  try {
    const replies = [];
    // Sequential on purpose: the second request is sent only after the
    // first one has been answered.
    for (const request of requests) {
      replies.push(await actor.send(request));
    }
    console.log('Results:', ...replies);
  } catch (error) {
    console.error('Actor error:', error);
  }
}

性能优化技巧

任务分片处理

// 主线程
/**
 * Processes `data` in fixed-size chunks on one worker, one chunk at a time.
 *
 * Each chunk is posted as `{ chunk, index }`; the worker is expected to reply
 * with `{ result }` for every chunk it receives.
 *
 * @param {Array|string} data - Sliceable input (anything with length/slice).
 * @param {number} chunkSize - Items per chunk; must be a positive integer.
 * @param {string|URL} workerScript - Script passed to the Worker constructor.
 * @returns {Promise<Array>} Per-chunk results in order, flattened one level.
 *   Resolves with [] for empty input without creating a worker. Rejects with
 *   RangeError for an invalid chunkSize, or with an Error if the worker fails.
 */
function processLargeData(data, chunkSize, workerScript) {
  return new Promise((resolve, reject) => {
    // A zero or negative step would never advance the loop below, and a
    // fractional step would produce overlapping slices.
    if (!Number.isInteger(chunkSize) || chunkSize < 1) {
      throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
    }

    const chunks = [];
    for (let i = 0; i < data.length; i += chunkSize) {
      chunks.push(data.slice(i, i + chunkSize));
    }

    // Nothing to do: do not spawn a worker just to send it an undefined chunk.
    if (chunks.length === 0) {
      resolve([]);
      return;
    }

    const worker = new Worker(workerScript);
    const results = [];
    let processed = 0;

    // Always release the worker before settling with a failure.
    const fail = (error) => {
      worker.terminate();
      reject(error);
    };

    const send = (index) => {
      try {
        worker.postMessage({ chunk: chunks[index], index });
      } catch (error) {
        fail(error);
      }
    };

    worker.onmessage = (event) => {
      results.push(event.data.result);
      processed++;

      if (processed < chunks.length) {
        send(processed);
      } else {
        worker.terminate();
        resolve(results.flat());
      }
    };

    worker.onerror = (event) => {
      fail(new Error(event.message || 'Worker failed while processing a chunk'));
    };

    // Start with the first chunk; each reply triggers the next one.
    send(0);
  });
}

共享内存与原子操作

// ---- Main thread ----
// NOTE(review): browsers expose SharedArrayBuffer only to cross-origin
// isolated pages — confirm the required response headers for your deployment.
const WORKER_COUNT = 4;
const sharedBuffer = new SharedArrayBuffer(1024);
const sharedArray = new Int32Array(sharedBuffer);

// Initialise the shared counter stored in slot 0.
Atomics.store(sharedArray, 0, 0);

const workers = [];
let finishedWorkers = 0;

for (let i = 0; i < WORKER_COUNT; i++) {
  const worker = new Worker('shared-worker.js');

  // Each worker reports completion explicitly, so the final value is read
  // exactly when all of them are done instead of after an arbitrary delay.
  worker.onmessage = (event) => {
    if (!event.data || !event.data.done) {
      return;
    }

    finishedWorkers++;
    if (finishedWorkers === WORKER_COUNT) {
      console.log('Final value:', Atomics.load(sharedArray, 0));
      workers.forEach(w => w.terminate());
    }
  };

  worker.onerror = (event) => {
    console.error(`Worker ${i} failed:`, event.message);
  };

  worker.postMessage({ buffer: sharedBuffer, id: i });
  workers.push(worker);
}

// ---- shared-worker.js ----
self.onmessage = function(event) {
  const { buffer, id } = event.data;
  const sharedArray = new Int32Array(buffer);

  for (let i = 0; i < 1000; i++) {
    // Atomically increment the shared counter in slot 0.
    Atomics.add(sharedArray, 0, 1);
  }

  console.log(`Worker ${id} finished`);

  // Tell the main thread this worker has applied all of its increments.
  self.postMessage({ id, done: true });
};

实际应用场景

图像处理

// image-processor.js
/**
 * Runs image operations on a dedicated worker (image-worker.js).
 *
 * Protocol: each job is posted as `{ id, imageData, operations }`; the worker
 * is expected to answer with `{ id, imageData }` or `{ id, error }`.
 */
class ImageProcessor {
  constructor() {
    this.worker = new Worker('image-worker.js');
    this.callbacks = new Map(); // job id -> { resolve, reject }
    this.taskId = 0;

    this.worker.onmessage = (event) => {
      const { id, imageData, error } = event.data;
      if (this.callbacks.has(id)) {
        const { resolve, reject } = this.callbacks.get(id);
        this.callbacks.delete(id);

        if (error) {
          reject(new Error(error));
        } else {
          resolve(imageData);
        }
      }
    };

    // An error event carries no job id, so every pending job is failed
    // rather than left waiting forever.
    this.worker.onerror = (event) => {
      this.rejectAll(new Error(event.message || 'Image worker error'));
    };
  }

  /**
   * Sends an image to the worker for processing.
   *
   * @param {ImageData} imageData - Pixels to process. Its underlying buffer
   *   is placed in the transfer list, so the caller must not use it afterwards.
   * @param {Array<Object>} operations - Operation descriptors for the worker.
   * @returns {Promise<ImageData>} Resolves with the `imageData` of the reply;
   *   rejects when the reply carries `error`, the worker fails, posting
   *   fails, or the processor is terminated.
   */
  processImage(imageData, operations) {
    return new Promise((resolve, reject) => {
      const id = ++this.taskId;
      this.callbacks.set(id, { resolve, reject });

      try {
        // Transfer the pixel buffer instead of copying it.
        this.worker.postMessage({
          id,
          imageData,
          operations
        }, [imageData.data.buffer]);
      } catch (error) {
        // Do not leave an entry behind for a job that was never sent.
        this.callbacks.delete(id);
        reject(error);
      }
    });
  }

  /** Stops the worker and rejects every job that is still pending. */
  terminate() {
    this.worker.terminate();
    this.rejectAll(new Error('Image worker terminated'));
  }

  /**
   * Rejects all pending jobs with the same reason and clears them.
   *
   * @param {Error} reason - Rejection value handed to each pending job.
   */
  rejectAll(reason) {
    const pending = [...this.callbacks.values()];
    this.callbacks.clear();

    for (const { reject } of pending) {
      reject(reason);
    }
  }
}

// Usage example: process the pixels of the page's #canvas element.
const canvas = document.getElementById('canvas');
const ctx = canvas.getContext('2d');
const processor = new ImageProcessor();

/**
 * Reads the whole canvas, runs it through grayscale, blur and contrast on the
 * worker, and paints the processed pixels back at the origin.
 */
function applyFilter() {
  const { width, height } = canvas;
  const pixels = ctx.getImageData(0, 0, width, height);

  const pipeline = [
    { type: 'grayscale' },
    { type: 'blur', radius: 2 },
    { type: 'contrast', level: 1.5 }
  ];

  const paint = (processedData) => {
    ctx.putImageData(processedData, 0, 0);
  };
  const report = (error) => {
    console.error('Image processing failed:', error);
  };

  processor.processImage(pixels, pipeline).then(paint).catch(report);
}

大数据分析

// data-analytics.js
/**
 * Splits a dataset into chunks, analyses the chunks on a worker pool, and
 * merges the partial statistics.
 */
class DataAnalytics {
  /**
   * @param {number} [workerCount=4] - Number of workers in the pool.
   */
  constructor(workerCount = 4) {
    this.pool = new WorkerPool(workerCount, 'analytics-worker.js');
  }

  /**
   * Analyses `dataset` chunk by chunk through the pool.
   *
   * Each chunk is submitted as `{ action: 'analyze', data: chunk }`; every
   * task is expected to resolve with `{ count, sum, min, max }`.
   *
   * @param {Array} dataset - Sliceable input.
   * @param {number} [chunkSize=10000] - Items per chunk; positive integer.
   * @returns {Promise<{count: number, sum: number, min: number, max: number}>}
   *   Merged statistics; rejects with RangeError for an invalid chunkSize.
   */
  async analyzeLargeDataset(dataset, chunkSize = 10000) {
    // A zero or negative step would never advance the loop below, and NaN
    // would end it after a single empty chunk, silently dropping the data.
    if (!Number.isInteger(chunkSize) || chunkSize < 1) {
      throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
    }

    const chunks = [];
    for (let i = 0; i < dataset.length; i += chunkSize) {
      chunks.push(dataset.slice(i, i + chunkSize));
    }

    const tasks = chunks.map(chunk =>
      this.pool.runTask({
        action: 'analyze',
        data: chunk
      })
    );

    const results = await Promise.all(tasks);
    return this.aggregateResults(results);
  }

  /**
   * Merges per-chunk statistics into one summary.
   *
   * @param {Array<{count: number, sum: number, min: number, max: number}>} partialResults
   * @returns {{count: number, sum: number, min: number, max: number}} Totals;
   *   for an empty list this is the identity value
   *   `{ count: 0, sum: 0, min: Infinity, max: -Infinity }`.
   */
  aggregateResults(partialResults) {
    return partialResults.reduce((final, current) => {
      return {
        count: final.count + current.count,
        sum: final.sum + current.sum,
        min: Math.min(final.min, current.min),
        max: Math.max(final.max, current.max)
      };
    }, { count: 0, sum: 0, min: Infinity, max: -Infinity });
  }
}

// Usage example
const analytics = new DataAnalytics();
const largeDataset = generateLargeDataset(); // assume about one million records

analytics.analyzeLargeDataset(largeDataset)
  .then(stats => {
    console.log('Analysis results:', stats);
    console.log('Average:', stats.sum / stats.count);
  })
  .catch(error => {
    // Any failed chunk rejects the whole analysis; report it instead of
    // leaving an unhandled rejection.
    console.error('Analysis failed:', error);
  });

注意事项与最佳实践

  1. Worker 生命周期管理

    • 及时终止不再需要的 Worker (worker.terminate())
    • 避免频繁创建/销毁 Worker,考虑使用池化技术
  2. 错误处理

    • 为 Worker 添加 onerror 处理程序
    • 实现 Worker 内部错误捕获和报告机制
  3. 性能考量

    • Worker 启动有开销,适合长时间运行的任务
    • 消息传递有序列化/反序列化成本,大数据考虑 Transferable Objects
  4. 安全限制

    • Worker 不能直接访问 DOM
    • Worker 脚本的 URL 受同源策略限制(必须与页面同源)
    • 某些 API 不可用 (如 localStorage)
  5. 调试技巧

    • Chrome DevTools 可以调试 Worker 代码
    • 使用 console.log 在 Worker 中输出信息
    • 给 Worker 命名便于调试:new Worker('worker.js', {name: 'ImageWorker'})

现代进阶方案(基于 Worker 的增强,而非替代)

WebAssembly + Worker

// Main thread: download the WebAssembly binary and hand it to the worker.
const worker = new Worker('wasm-worker.js');

fetch('module.wasm')
  .then(response => {
    // fetch() only rejects on network failure; an HTTP error such as 404
    // still resolves, so check the status before reading the body.
    if (!response.ok) {
      throw new Error(`Failed to fetch module.wasm: ${response.status} ${response.statusText}`);
    }
    return response.arrayBuffer();
  })
  .then(wasmBinary => {
    // Transfer the binary to the worker instead of copying it.
    worker.postMessage({
      type: 'INIT_WASM',
      wasmBinary
    }, [wasmBinary]);
  })
  .catch(error => {
    console.error('WASM initialization failed:', error);
    // The worker never received a module, so release it.
    worker.terminate();
  });

// wasm-worker.js
// Exports of the instantiated module; undefined until INIT_WASM succeeds.
let wasmModule;

// Message protocol:
//   { type: 'INIT_WASM', wasmBinary } -> { status: 'WASM_READY' }
//                                        or { status: 'WASM_ERROR', error }
//   { type: 'COMPUTE', input }        -> { result } or { error }
self.onmessage = async function(event) {
  const { type, wasmBinary } = event.data;

  if (type === 'INIT_WASM') {
    try {
      const { instance } = await WebAssembly.instantiate(wasmBinary);
      wasmModule = instance.exports;
      self.postMessage({ status: 'WASM_READY' });
    } catch (error) {
      // Report a bad or incompatible binary instead of letting the
      // rejection go unhandled inside the worker.
      self.postMessage({ status: 'WASM_ERROR', error: error.message });
    }
    return;
  }

  if (type === 'COMPUTE') {
    // Answer explicitly so a caller that computes before initialization
    // is not left waiting for a reply that never comes.
    if (!wasmModule) {
      self.postMessage({ error: 'WASM module is not initialized' });
      return;
    }

    try {
      const result = wasmModule.compute(event.data.input);
      self.postMessage({ result });
    } catch (error) {
      self.postMessage({ error: error.message });
    }
  }
};

Comlink 简化 Worker 通信

// main.js
import * as Comlink from 'comlink';

// Wrap the worker with Comlink.wrap so the API it exposes can be called
// through workerApi.
const worker = new Worker('worker.js');
const workerApi = Comlink.wrap(worker);

/** Calls heavyCalculation(42) through the wrapped worker and logs the awaited result. */
async function run() {
  // Call the worker function as if it were a local function.
  const result = await workerApi.heavyCalculation(42);
  console.log('Result:', result);
}

// worker.js
// Load Comlink inside the worker.
// NOTE(review): the URL has no version segment — consider pinning one.
importScripts('https://unpkg.com/comlink/dist/umd/comlink.js');

// API object made available to the main thread through Comlink.expose below.
const api = {
  heavyCalculation(input) {
    // Stand-in for an expensive computation: currently just doubles the input.
    return input * 2;
  }
};

Comlink.expose(api);
#前端开发 分享于 2025-03-25

【 内容由 AI 共享,不代表本站观点,请谨慎参考 】