First Commit
This commit is contained in:
252
wasm_node_pthread.js
Normal file
252
wasm_node_pthread.js
Normal file
@@ -0,0 +1,252 @@
|
||||
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
|
||||
import { WASI } from 'wasi';
|
||||
import { fileURLToPath } from 'url';
|
||||
import path from 'path';
|
||||
import fs from 'fs/promises'; // For async file reading
|
||||
import os from 'os'; // For optimal NUM_WORKERS
|
||||
import { performance } from 'perf_hooks'; // For benchmarking
|
||||
|
||||
// Helper to get __dirname equivalent in ES Modules for this file
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = path.dirname(__filename);
|
||||
|
||||
// --- Configuration Constants (shared by main and workers where relevant) ---
|
||||
const ARRAY_SIZE = 15000576; // Must match C code
|
||||
const NUM_WORKERS = os.cpus().length; // Optimal number of workers based on CPU cores
|
||||
|
||||
// WASM Memory configuration (consistent with Emscripten compile flags)
|
||||
const INITIAL_MEMORY_BYTES = 180224000;
|
||||
const MAXIMUM_MEMORY_BYTES = 268435456;
|
||||
const PAGE_SIZE = 65536; // 64KB
|
||||
|
||||
// --- WASI Import Version (based on previous successful tests) ---
|
||||
const WASI_VERSION = 'preview1'; // Or 'wasi_snapshot_preview1', 'wasi_unstable' based on your Node.js's specific requirement.
|
||||
|
||||
// --- Main Thread Logic ---
|
||||
if (isMainThread) {
|
||||
|
||||
async function main() {
|
||||
const wasmPath = './binaries/wasm_add.wasm'; // Path to your compiled WASM binary
|
||||
|
||||
try {
|
||||
// Read the WebAssembly binary file once for all threads
|
||||
const wasmBuffer = await fs.readFile(wasmPath);
|
||||
|
||||
// Call the setupWebAssembly function to initialize and run the benchmarks
|
||||
await setupWebAssembly(wasmBuffer);
|
||||
|
||||
} catch (error) {
|
||||
console.error("Error during WebAssembly execution:", error);
|
||||
process.exit(1); // Exit with an error code on failure
|
||||
}
|
||||
}
|
||||
|
||||
// Call the main function to start the application
|
||||
main();
|
||||
|
||||
} else { // --- Worker Thread Logic ---
|
||||
|
||||
// Data passed from the main thread when the worker was created
|
||||
const { wasmBinary, wasmMemory, functionName, args } = workerData;
|
||||
|
||||
// Setup WASI imports for this worker thread's WASM instance
|
||||
const wasi = new WASI({
|
||||
version: WASI_VERSION,
|
||||
args: process.argv,
|
||||
env: process.env,
|
||||
preopens: { '.': '.' }
|
||||
});
|
||||
|
||||
// Minimal import object for standalone WASM in a worker
|
||||
const importObject = {
|
||||
wasi_snapshot_preview1: wasi.wasiImports, // Provide WASI imports
|
||||
env: {
|
||||
memory: wasmMemory, // The shared WebAssembly.Memory object
|
||||
// Common stubs for functions WASM might try to call (if not handled by WASI)
|
||||
_abort: () => { console.error("WASM called abort from worker!"); process.exit(1); },
|
||||
_sbrk: (increment) => {
|
||||
console.warn(`Worker: _sbrk called with increment ${increment}. Not fully implemented for custom runtime.`);
|
||||
return 0; // Return dummy value
|
||||
},
|
||||
__setThrew: (ptr, val) => {},
|
||||
__resumeException: (ptr) => {},
|
||||
},
|
||||
};
|
||||
|
||||
async function initAndRunWorker() {
|
||||
let instance;
|
||||
try {
|
||||
// Instantiate the WASM module within this worker thread
|
||||
instance = (await WebAssembly.instantiate(wasmBinary, importObject)).instance;
|
||||
} catch (e) {
|
||||
console.error("Worker: Failed to instantiate WASM module:", e);
|
||||
parentPort.postMessage({ type: 'error', message: `WASM instantiation failed: ${e.message}` });
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const exports = instance.exports;
|
||||
|
||||
// Call the specified WASM function with its arguments
|
||||
if (typeof exports[functionName] === 'function') {
|
||||
try {
|
||||
exports[functionName](...args);
|
||||
parentPort.postMessage({ type: 'done' }); // Signal successful completion
|
||||
} catch (e) {
|
||||
console.error(`Worker: Error executing ${functionName}:`, e);
|
||||
parentPort.postMessage({ type: 'error', message: `Execution failed: ${e.message}` });
|
||||
}
|
||||
} else {
|
||||
console.error(`Worker: Function '${functionName}' not found in WASM exports.`);
|
||||
parentPort.postMessage({ type: 'error', message: `Function '${functionName}' not found.` });
|
||||
}
|
||||
|
||||
process.exit(0); // Exit the worker process cleanly
|
||||
}
|
||||
|
||||
// Start execution in the worker thread
|
||||
initAndRunWorker();
|
||||
}
|
||||
|
||||
// --- The setupWebAssembly Function (available to both, but called only by main) ---
|
||||
// This function orchestrates the WASM module loading and parallel execution.
|
||||
export async function setupWebAssembly(wasmBuffer) {
|
||||
|
||||
// Create the shared WebAssembly.Memory object
|
||||
const memory = new WebAssembly.Memory({
|
||||
initial: INITIAL_MEMORY_BYTES / PAGE_SIZE,
|
||||
maximum: MAXIMUM_MEMORY_BYTES / PAGE_SIZE,
|
||||
shared: true
|
||||
});
|
||||
|
||||
// Setup WASI for the main thread's WASM instance
|
||||
const wasi = new WASI({
|
||||
version: WASI_VERSION,
|
||||
args: process.argv,
|
||||
env: process.env,
|
||||
preopens: { '.': '.' }
|
||||
});
|
||||
|
||||
// Instantiate the WASM module on the main thread
|
||||
const { instance } = await WebAssembly.instantiate(wasmBuffer, {
|
||||
wasi_snapshot_preview1: wasi.wasiImports, // Provide WASI imports
|
||||
env: {
|
||||
memory: memory, // Pass the shared memory
|
||||
emscripten_notify_memory_growth: (index) => {
|
||||
console.log(`Memory grew to ${index / 65536} pages`);
|
||||
},
|
||||
_abort: () => { console.error("WASM called abort from main!"); process.exit(1); },
|
||||
_sbrk: (increment) => {
|
||||
console.warn(`Main: _sbrk called with increment ${increment}. Not fully implemented.`);
|
||||
return 0;
|
||||
},
|
||||
__setThrew: (ptr, val) => {},
|
||||
__resumeException: (ptr) => {},
|
||||
}
|
||||
});
|
||||
|
||||
const exports = instance.exports; // Original WASM exports from the main instance
|
||||
|
||||
const ARRAY_SIZE = 15000576; // Keep consistent with C code
|
||||
|
||||
// Allocate arrays using the main thread's WASM instance (malloc/free are C exports)
|
||||
const a = exports._alloc_float_array(ARRAY_SIZE);
|
||||
const b = exports._alloc_float_array(ARRAY_SIZE);
|
||||
const c = exports._alloc_float_array(ARRAY_SIZE);
|
||||
if (!a || !b || !c) throw new Error("Failed to allocate memory");
|
||||
|
||||
console.log(`Allocated arrays in shared memory at: a=${a}, b=${b}, c=${c}`);
|
||||
|
||||
// --- Worker Orchestration Helper (Internal to setupWebAssembly) ---
|
||||
// This helper function handles spawning workers and waiting for their completion.
|
||||
async function runRangeTaskInWorkers(wasmFunctionName, arrayPointers) {
|
||||
const workers = [];
|
||||
const promises = [];
|
||||
const chunkSize = Math.ceil(ARRAY_SIZE / NUM_WORKERS);
|
||||
|
||||
for (let i = 0; i < NUM_WORKERS; i++) {
|
||||
const startIdx = i * chunkSize;
|
||||
const endIdx = Math.min(startIdx + chunkSize, ARRAY_SIZE);
|
||||
|
||||
if (startIdx >= ARRAY_SIZE) break; // Stop if no more work chunks
|
||||
|
||||
const worker = new Worker(__filename, { // Crucially, the worker executes *this same file*
|
||||
workerData: {
|
||||
wasmBinary: wasmBuffer, // Pass the WASM binary data to workers
|
||||
wasmMemory: memory, // Pass the shared memory object
|
||||
functionName: wasmFunctionName, // The C function to call in the worker
|
||||
args: [...arrayPointers, startIdx, endIdx] // Arguments for the C function
|
||||
}
|
||||
});
|
||||
|
||||
const workerPromise = new Promise((resolve, reject) => {
|
||||
worker.on('message', (msg) => {
|
||||
if (msg.type === 'done') {
|
||||
resolve();
|
||||
} else if (msg.type === 'error') {
|
||||
reject(new Error(`Worker error for ${wasmFunctionName}: ${msg.message}`));
|
||||
}
|
||||
});
|
||||
worker.on('error', reject); // Catch errors emitted by the worker process
|
||||
worker.on('exit', (code) => {
|
||||
if (code !== 0 && code !== undefined) {
|
||||
reject(new Error(`Worker exited with code ${code} for ${wasmFunctionName}`));
|
||||
} else {
|
||||
resolve(); // Resolve if exited cleanly (code 0 or undefined)
|
||||
}
|
||||
});
|
||||
});
|
||||
workers.push(worker);
|
||||
promises.push(workerPromise);
|
||||
}
|
||||
|
||||
// Wait for all workers to complete their assigned tasks
|
||||
await Promise.all(promises);
|
||||
}
|
||||
|
||||
// --- Override/Wrap original C function calls with worker orchestration ---
|
||||
// This makes the subsequent benchmark calls look exactly like your original script.
|
||||
const wrappedExports = {
|
||||
init_arrays: async (aPtr, bPtr) => {
|
||||
await runRangeTaskInWorkers('_init_arrays_range', [aPtr, bPtr]);
|
||||
},
|
||||
calculate: async (aPtr, bPtr, cPtr) => {
|
||||
await runRangeTaskInWorkers('_calculate_range', [aPtr, bPtr, cPtr]);
|
||||
},
|
||||
// Keep original alloc/dealloc directly from WASM as they are not parallelized
|
||||
alloc_float_array: exports._alloc_float_array,
|
||||
dealloc_float_array: exports._dealloc_float_array,
|
||||
alloc: exports._alloc,
|
||||
dealloc: exports._dealloc,
|
||||
// Expose memory for direct access like in your original code
|
||||
memory: exports.memory, // This points to the same shared memory object
|
||||
};
|
||||
|
||||
|
||||
// --- Benchmark Execution (This part now perfectly matches your desired input!) ---
|
||||
|
||||
// Benchmark initialization
|
||||
const initStart = performance.now();
|
||||
await wrappedExports.init_arrays(a, b); // Calls the JS wrapper
|
||||
const initTime = performance.now() - initStart;
|
||||
|
||||
// Benchmark calculation
|
||||
const calcStart = performance.now();
|
||||
await wrappedExports.calculate(a, b, c); // Calls the JS wrapper
|
||||
const calcTime = performance.now() - calcStart;
|
||||
|
||||
console.log(`Array initialization time: ${initTime.toFixed(3)} ms`);
|
||||
console.log(`Calculation time: ${calcTime.toFixed(3)} ms`);
|
||||
console.log(`Total time: ${(initTime + calcTime).toFixed(3)} ms`);
|
||||
|
||||
// Sample output
|
||||
// Access memory via the original instance's exports.memory (or the shared 'memory' var)
|
||||
const mem = new Float32Array(wrappedExports.memory.buffer);
|
||||
for (let i = 0; i < 10; i++) {
|
||||
console.log(`c[${i}] = ${mem[c/4 + i]}`);
|
||||
}
|
||||
|
||||
// Clean up
|
||||
wrappedExports.dealloc_float_array(a);
|
||||
wrappedExports.dealloc_float_array(b);
|
||||
wrappedExports.dealloc_float_array(c);
|
||||
}
|
||||
Reference in New Issue
Block a user