15.3 Web Workers 与多线程
Web Workers 基础
基本概念与使用
Web Workers 允许在后台线程中运行 JavaScript 代码,不会阻塞主线程。
主线程代码 (main.js):
// Spawn a dedicated worker that executes worker.js off the main thread.
const worker = new Worker('worker.js');

// Hand the worker its job.
worker.postMessage({ command: 'start', data: 42 });

// Log every reply and shut the worker down once it reports 'done'.
worker.onmessage = ({ data }) => {
  console.log('Received from worker:', data);
  if (data === 'done') {
    worker.terminate();
  }
};

// Log worker failures ourselves; preventDefault() suppresses the default error output.
worker.onerror = (error) => {
  console.error('Worker error:', error.message);
  error.preventDefault();
};
Worker线程代码 (worker.js):
// Handle commands posted by the main thread.
self.onmessage = ({ data: message }) => {
  // Only the 'start' command is recognised; every other message is ignored.
  if (message.command !== 'start') {
    return;
  }
  // Run the expensive computation, send the result back, then signal completion.
  const result = heavyCalculation(message.data);
  self.postMessage({ result });
  self.postMessage('done');
};
/**
 * Simulates CPU-bound work by summing Math.sqrt(i) for every integer i
 * with 0 <= i < input * 1000000.
 *
 * @param {number} input - scale factor for the number of iterations.
 * @returns {number} the accumulated sum (0 when the bound is not positive).
 */
function heavyCalculation(input) {
  let total = 0;
  let i = 0;
  while (i < input * 1000000) {
    total += Math.sqrt(i);
    i += 1;
  }
  return total;
}
Worker 高级用法
不同类型的 Workers
-
专用 Worker (Dedicated Worker)
// 主线程
const dedicatedWorker = new Worker('dedicated-worker.js');
-
共享 Worker (Shared Worker)
// 主线程
const sharedWorker = new SharedWorker('shared-worker.js');
sharedWorker.port.start();
sharedWorker.port.postMessage('Hello shared worker');
-
Service Worker
// 注册Service Worker
if ('serviceWorker' in navigator) {
  navigator.serviceWorker.register('/sw.js')
    .then(registration => {
      console.log('ServiceWorker registration successful');
    });
}
数据传输优化
-
结构化克隆算法
// 自动处理大多数数据类型
worker.postMessage({
  date: new Date(),
  set: new Set([1, 2, 3]),
  map: new Map([['key', 'value']])
});
-
Transferable Objects (零拷贝)
// 主线程
const buffer = new ArrayBuffer(1024);
worker.postMessage(buffer, [buffer]); // 转移所有权
-
SharedArrayBuffer
// 主线程
const sharedBuffer = new SharedArrayBuffer(1024);
worker.postMessage({ buffer: sharedBuffer });
多线程编程模式
线程池实现
/**
 * Fixed-size pool of dedicated Web Workers with a FIFO queue for overflow tasks.
 *
 * Protocol: every task is posted to a worker as `{ taskId, data }` and the
 * worker script is expected to reply with `{ taskId, result }`.
 */
class WorkerPool {
  /**
   * @param {number} poolSize - number of workers created up front.
   * @param {string} workerScript - script URL handed to the Worker constructor.
   */
  constructor(poolSize, workerScript) {
    this.poolSize = poolSize;
    this.workerScript = workerScript;
    this.taskQueue = []; // tasks waiting for a free worker (FIFO)
    this.workers = []; // idle workers
    this.activeTasks = new Map(); // busy worker -> task it is currently running
    this.nextTaskId = 0;
    // Populate the pool.
    for (let i = 0; i < poolSize; i++) {
      this.createWorker();
    }
  }

  /**
   * Creates one worker, wires its handlers and makes it available to the pool.
   * If tasks are already queued the new worker starts on the oldest one at once.
   */
  createWorker() {
    const worker = new Worker(this.workerScript);

    worker.onmessage = (event) => {
      const task = this.activeTasks.get(worker);
      const message = event.data;
      // Ignore anything that is not the reply to the task this worker is running.
      if (!task || !message || message.taskId !== task.id) {
        return;
      }
      this.activeTasks.delete(worker);
      task.resolve(message.result);
      this.releaseWorker(worker);
    };

    worker.onerror = (error) => {
      // Fail the task this worker was running, not an unrelated queued one.
      const task = this.activeTasks.get(worker);
      this.activeTasks.delete(worker);
      this.workers = this.workers.filter((idle) => idle !== worker);
      // Shut the failed worker down so it does not linger, then replace it.
      worker.terminate();
      if (task) {
        task.reject(error);
      }
      this.createWorker();
    };

    this.releaseWorker(worker);
  }

  /**
   * Internal: gives a free worker the next queued task, or parks it as idle.
   * @param {Worker} worker
   */
  releaseWorker(worker) {
    const nextTask = this.taskQueue.shift();
    if (nextTask) {
      this.dispatch(worker, nextTask);
    } else {
      this.workers.push(worker);
    }
  }

  /**
   * Internal: records the task as running on the worker and posts it.
   * @param {Worker} worker
   * @param {{id: string, data: *, resolve: Function, reject: Function}} task
   */
  dispatch(worker, task) {
    this.activeTasks.set(worker, task);
    try {
      worker.postMessage({ taskId: task.id, data: task.data });
    } catch (error) {
      // postMessage can throw (e.g. data that cannot be cloned); fail only this task.
      this.activeTasks.delete(worker);
      task.reject(error);
      this.releaseWorker(worker);
    }
  }

  /**
   * Schedules a task on the pool.
   * @param {*} data - payload forwarded to the worker.
   * @returns {Promise<*>} resolves with the worker's `result`; rejects with the
   *   error event if the worker running the task fails.
   */
  runTask(data) {
    return new Promise((resolve, reject) => {
      const task = { id: `task-${this.nextTaskId++}`, data, resolve, reject };
      const worker = this.workers.pop();
      if (worker) {
        this.dispatch(worker, task);
      } else {
        this.taskQueue.push(task);
      }
    });
  }
}
// Usage example: run three independent jobs on a four-worker pool.
const pool = new WorkerPool(4, 'worker.js');

/** Submits every job at once and logs the results once all of them have completed. */
async function processTasks() {
  const jobs = [
    { operation: 'encrypt', data: 'secret1' },
    { operation: 'encrypt', data: 'secret2' },
    { operation: 'analyze', data: [1, 2, 3] }
  ];
  const results = await Promise.all(jobs.map((job) => pool.runTask(job)));
  console.log('All tasks completed:', results);
}
基于 Actor 模型的通信
// worker-manager.js
/**
 * Request/response wrapper around a single dedicated worker.
 *
 * Each request is posted as `{ id, message }`; the worker is expected to reply
 * with `{ id, result }` on success or `{ id, error }` on failure.
 */
class WorkerActor {
  /** @param {string} workerScript - script URL handed to the Worker constructor. */
  constructor(workerScript) {
    this.worker = new Worker(workerScript);
    this.callbacks = new Map(); // request id -> { resolve, reject }
    this.messageId = 0;

    this.worker.onmessage = (event) => {
      const { id, result, error } = event.data;
      const pending = this.callbacks.get(id);
      if (!pending) {
        return;
      }
      this.callbacks.delete(id);
      if (error) {
        pending.reject(new Error(error));
      } else {
        pending.resolve(result);
      }
    };

    // An uncaught worker error cannot be tied to one request, so fail them all
    // instead of leaving their promises pending forever.
    this.worker.onerror = (event) => {
      const detail = event && event.message ? event.message : 'unknown error';
      this.rejectAll(new Error(`Worker error: ${detail}`));
    };
  }

  /**
   * Sends a message to the worker.
   * @param {*} message - payload wrapped as `{ id, message }`.
   * @returns {Promise<*>} resolves with the reply's `result`; rejects when the
   *   reply carries `error`, the worker fails, or the actor is terminated.
   */
  send(message) {
    return new Promise((resolve, reject) => {
      const id = ++this.messageId;
      this.callbacks.set(id, { resolve, reject });
      try {
        this.worker.postMessage({ id, message });
      } catch (error) {
        // Do not keep a callback for a request that was never sent.
        this.callbacks.delete(id);
        reject(error);
      }
    });
  }

  /** Stops the worker and rejects every request that is still awaiting a reply. */
  terminate() {
    this.worker.terminate();
    this.rejectAll(new Error('Worker terminated'));
  }

  /**
   * Internal: rejects and forgets all pending requests.
   * @param {Error} reason
   */
  rejectAll(reason) {
    const pending = [...this.callbacks.values()];
    this.callbacks.clear();
    pending.forEach(({ reject }) => reject(reason));
  }
}
// Usage example
const actor = new WorkerActor('actor-worker.js');

/** Sends two requests one after the other and logs both results, or the first failure. */
async function performTasks() {
  const requests = [
    { type: 'CALCULATE', data: 42 },
    { type: 'TRANSFORM', data: 'input' }
  ];
  try {
    const results = [];
    // Sequential on purpose: the second request is sent only after the first reply.
    for (const request of requests) {
      results.push(await actor.send(request));
    }
    console.log('Results:', results[0], results[1]);
  } catch (error) {
    console.error('Actor error:', error);
  }
}
性能优化技巧
任务分片处理
// Main thread
/**
 * Processes a large array on one worker, one chunk at a time.
 *
 * Each chunk is posted as `{ chunk, index }`; the next chunk is sent only after
 * the worker replies with `{ result }`. The per-chunk results are flattened one
 * level into a single array.
 *
 * @param {Array} data - items to process.
 * @param {number} chunkSize - maximum items per chunk; must be greater than 0.
 * @param {string} workerScript - script URL handed to the Worker constructor.
 * @returns {Promise<Array>} resolves with the flattened results ([] for empty
 *   input); rejects with RangeError for an invalid chunkSize or with the error
 *   event if the worker fails.
 */
function processLargeData(data, chunkSize, workerScript) {
  return new Promise((resolve, reject) => {
    // A non-positive (or NaN) step would make the chunking loop run forever.
    if (!(chunkSize > 0)) {
      reject(new RangeError('chunkSize must be a positive number'));
      return;
    }

    const chunks = [];
    for (let i = 0; i < data.length; i += chunkSize) {
      chunks.push(data.slice(i, i + chunkSize));
    }

    // Nothing to do: avoid starting a worker just to send it an undefined chunk.
    if (chunks.length === 0) {
      resolve([]);
      return;
    }

    const worker = new Worker(workerScript);
    const results = [];
    let processed = 0;

    worker.onmessage = (event) => {
      results.push(event.data.result);
      processed++;
      if (processed < chunks.length) {
        worker.postMessage({ chunk: chunks[processed], index: processed });
      } else {
        worker.terminate();
        resolve(results.flat());
      }
    };

    // Without this the promise would stay pending and the worker would leak.
    worker.onerror = (error) => {
      worker.terminate();
      reject(error);
    };

    // Start with the first chunk.
    worker.postMessage({ chunk: chunks[0], index: 0 });
  });
}
共享内存与原子操作
// Main thread
const sharedBuffer = new SharedArrayBuffer(1024);
const sharedArray = new Int32Array(sharedBuffer);
// Initialise the shared counter kept in slot 0.
Atomics.store(sharedArray, 0, 0);

const workerCount = 4;
const workers = [];
let finishedCount = 0;
const terminateAll = () => workers.forEach(w => w.terminate());

for (let i = 0; i < workerCount; i++) {
  const worker = new Worker('shared-worker.js');
  // Each worker posts one completion message; read the counter only after all
  // of them have reported, instead of guessing with a fixed timeout.
  worker.onmessage = (event) => {
    if (!event.data || !event.data.done) {
      return;
    }
    finishedCount++;
    if (finishedCount === workerCount) {
      console.log('Final value:', Atomics.load(sharedArray, 0));
      terminateAll();
    }
  };
  worker.onerror = (error) => {
    console.error('Worker error:', error.message);
    terminateAll();
  };
  worker.postMessage({ buffer: sharedBuffer, id: i });
  workers.push(worker);
}

// shared-worker.js
self.onmessage = function(event) {
  const { buffer, id } = event.data;
  // View over the same memory the main thread created.
  const sharedArray = new Int32Array(buffer);
  for (let i = 0; i < 1000; i++) {
    // Atomic increment of the shared counter in slot 0.
    Atomics.add(sharedArray, 0, 1);
  }
  console.log(`Worker ${id} finished`);
  // Tell the main thread this worker is done so it knows when to read the total.
  self.postMessage({ id, done: true });
};
实际应用场景
图像处理
// image-processor.js
/**
 * Sends image data to 'image-worker.js' for processing and matches replies to
 * requests by id.
 *
 * Requests are posted as `{ id, imageData, operations }`; the worker is expected
 * to reply with `{ id, imageData }` on success or `{ id, error }` on failure.
 */
class ImageProcessor {
  constructor() {
    this.worker = new Worker('image-worker.js');
    this.callbacks = new Map(); // request id -> { resolve, reject }
    this.taskId = 0;

    this.worker.onmessage = (event) => {
      const { id, imageData, error } = event.data;
      const pending = this.callbacks.get(id);
      if (!pending) {
        return;
      }
      this.callbacks.delete(id);
      if (error) {
        pending.reject(new Error(error));
      } else {
        pending.resolve(imageData);
      }
    };

    // An uncaught worker error cannot be tied to one request, so fail every
    // pending request instead of leaving its promise unsettled.
    this.worker.onerror = (event) => {
      const detail = event && event.message ? event.message : 'unknown error';
      const reason = new Error(`Image worker error: ${detail}`);
      const pending = [...this.callbacks.values()];
      this.callbacks.clear();
      pending.forEach(({ reject }) => reject(reason));
    };
  }

  /**
   * Processes an image on the worker.
   * @param {{data: {buffer: ArrayBuffer}}} imageData - image whose pixel buffer
   *   is transferred (not copied) to the worker.
   * @param {Array<object>} operations - operation descriptors forwarded as-is.
   * @returns {Promise<*>} resolves with the `imageData` from the worker's reply;
   *   rejects when the reply carries `error`, the worker fails, or posting fails.
   */
  processImage(imageData, operations) {
    return new Promise((resolve, reject) => {
      const id = ++this.taskId;
      this.callbacks.set(id, { resolve, reject });
      try {
        // Transfer the pixel buffer to avoid copying it.
        this.worker.postMessage({
          id,
          imageData,
          operations
        }, [imageData.data.buffer]);
      } catch (error) {
        // Do not keep a callback for a request that was never sent.
        this.callbacks.delete(id);
        reject(error);
      }
    });
  }
}
// Usage example
const canvas = document.getElementById('canvas');
const ctx = canvas.getContext('2d');
const processor = new ImageProcessor();

/** Reads the whole canvas, runs it through the filter chain on the worker and draws the result back. */
function applyFilter() {
  const pixels = ctx.getImageData(0, 0, canvas.width, canvas.height);
  const filters = [
    { type: 'grayscale' },
    { type: 'blur', radius: 2 },
    { type: 'contrast', level: 1.5 }
  ];
  const drawResult = (processedData) => {
    ctx.putImageData(processedData, 0, 0);
  };
  const reportFailure = (error) => {
    console.error('Image processing failed:', error);
  };
  processor.processImage(pixels, filters).then(drawResult).catch(reportFailure);
}
大数据分析
// data-analytics.js
/**
 * Splits a dataset into chunks, has a WorkerPool analyse each chunk and merges
 * the partial statistics into one summary.
 */
class DataAnalytics {
  /** @param {number} [workerCount=4] - pool size passed to WorkerPool. */
  constructor(workerCount = 4) {
    this.pool = new WorkerPool(workerCount, 'analytics-worker.js');
  }

  /**
   * @param {Array} dataset - items to analyse.
   * @param {number} [chunkSize=10000] - maximum items per task.
   * @returns {Promise<{count: number, sum: number, min: number, max: number}>}
   */
  async analyzeLargeDataset(dataset, chunkSize = 10000) {
    // Cut the dataset into consecutive slices first...
    const chunks = [];
    let offset = 0;
    while (offset < dataset.length) {
      chunks.push(dataset.slice(offset, offset + chunkSize));
      offset += chunkSize;
    }

    // ...then submit one 'analyze' task per slice and wait for all of them.
    const tasks = [];
    chunks.forEach((chunk) => {
      tasks.push(this.pool.runTask({ action: 'analyze', data: chunk }));
    });
    const partials = await Promise.all(tasks);
    return this.aggregateResults(partials);
  }

  /**
   * Merges per-chunk statistics: counts and sums are added, min/max are folded.
   * @param {Array<{count: number, sum: number, min: number, max: number}>} partialResults
   * @returns {{count: number, sum: number, min: number, max: number}}
   */
  aggregateResults(partialResults) {
    let count = 0;
    let sum = 0;
    let min = Infinity;
    let max = -Infinity;
    partialResults.forEach((part) => {
      count = count + part.count;
      sum = sum + part.sum;
      min = Math.min(min, part.min);
      max = Math.max(max, part.max);
    });
    return { count, sum, min, max };
  }
}
// Usage example
const analytics = new DataAnalytics();
// generateLargeDataset is not defined in this snippet; the article assumes it
// yields about one million records.
const largeDataset = generateLargeDataset();
analytics.analyzeLargeDataset(largeDataset).then((stats) => {
  console.log('Analysis results:', stats);
  const average = stats.sum / stats.count;
  console.log('Average:', average);
});
注意事项与最佳实践
-
Worker 生命周期管理
- 及时终止不再需要的 Worker (`worker.terminate()`)
- 避免频繁创建/销毁 Worker,考虑使用池化技术
-
错误处理
- 为 Worker 添加 `onerror` 处理程序
- 实现 Worker 内部错误捕获和报告机制
-
性能考量
- Worker 启动有开销,适合长时间运行的任务
- 消息传递有序列化/反序列化成本,大数据考虑 Transferable Objects
-
安全限制
- Worker 不能直接访问 DOM
- 受同源策略限制
- 某些 API 不可用 (如 localStorage)
-
调试技巧
- Chrome DevTools 可以调试 Worker 代码
- 使用 `console.log` 在 Worker 中输出信息
- 给 Worker 命名便于调试: `new Worker('worker.js', {name: 'ImageWorker'})`
现代替代方案
WebAssembly + Worker
// Main thread
const worker = new Worker('wasm-worker.js');

// Download the module and hand its bytes to the worker.
fetch('module.wasm')
  .then(response => {
    // fetch() only rejects on network failure; an HTTP error status must be checked explicitly.
    if (!response.ok) {
      throw new Error(`Failed to fetch module.wasm: HTTP ${response.status}`);
    }
    return response.arrayBuffer();
  })
  .then(wasmBinary => {
    // Transfer the buffer instead of copying it.
    worker.postMessage({
      type: 'INIT_WASM',
      wasmBinary
    }, [wasmBinary]);
  })
  .catch(error => {
    // Surface download/transfer failures instead of leaving a floating rejection.
    console.error('Failed to initialise WASM worker:', error);
  });
// wasm-worker.js
// Exports of the instantiated module; undefined until INIT_WASM has succeeded.
let wasmModule;

// Message protocol:
//   { type: 'INIT_WASM', wasmBinary } -> { status: 'WASM_READY' }
//                                        or { status: 'WASM_ERROR', error }
//   { type: 'COMPUTE', input }        -> { result }
//                                        or { error } when the module is not ready
self.onmessage = async function(event) {
  const { type, wasmBinary } = event.data;

  if (type === 'INIT_WASM') {
    try {
      const { instance } = await WebAssembly.instantiate(wasmBinary);
      wasmModule = instance.exports;
      self.postMessage({ status: 'WASM_READY' });
    } catch (error) {
      // Report the failure to the main thread instead of an unhandled rejection.
      self.postMessage({ status: 'WASM_ERROR', error: String(error) });
    }
    return;
  }

  if (type === 'COMPUTE') {
    if (!wasmModule) {
      // Previously this request was dropped silently, leaving the caller waiting.
      self.postMessage({ error: 'WASM module is not initialised' });
      return;
    }
    const result = wasmModule.compute(event.data.input);
    self.postMessage({ result });
  }
};
Comlink 简化 Worker 通信
// main.js
import * as Comlink from 'comlink';
// Wrap the worker so the API it exposes can be called through a proxy.
const worker = new Worker('worker.js');
const workerApi = Comlink.wrap(worker);

/** Calls the worker's heavyCalculation through the proxy as if it were a local async function. */
async function run() {
  const pendingResult = workerApi.heavyCalculation(42);
  const result = await pendingResult;
  console.log('Result:', result);
}
// worker.js
importScripts('https://unpkg.com/comlink/dist/umd/comlink.js');

// Functions made callable from the main thread.
const api = {
  // Stand-in for an expensive computation.
  heavyCalculation: (input) => input * 2
};

Comlink.expose(api);
#前端开发
分享于 2025-03-25