Parallel Algorithms
This chapter explores parallel patterns using Rust’s ecosystem, focusing on data parallelism with Rayon, work partitioning strategies, parallel reduction patterns, pipeline parallelism, and SIMD vectorization. We’ll cover practical, production-ready examples for maximizing CPU utilization.
Pattern 1: Rayon Patterns
Problem: Sequential code wastes modern multi-core CPUs: on a 4-core CPU, a single-threaded program uses at most 25% of the available compute. Manual threading with std::thread is complex: you must partition the data, spawn threads, collect results, handle errors, and avoid data races.
Solution: Use Rayon’s parallel iterators with .par_iter(). Rayon provides a work-stealing scheduler that automatically balances load across threads.
Why It Matters: A small code change can yield large speedups. Image processing 1M pixels: sequential 500ms, parallel 80ms (6x faster on 8-core).
Use Cases: Image processing (grayscale, filters, resizing), data validation (emails, phone numbers, formats), log parsing and analysis, batch data transformations, scientific computing, map-reduce operations, any embarrassingly parallel workload.
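For contrast, the manual approach described in the Problem above might look like the following sketch. It uses only the standard library; the helper name manual_sum_of_squares and the choice of one scoped thread per chunk are assumptions made for illustration, not part of Rayon.

```rust
use std::thread;

/// Sum of squares computed by manually partitioning `data` across
/// `num_threads` scoped threads (hypothetical helper for illustration).
fn manual_sum_of_squares(data: &[i64], num_threads: usize) -> i64 {
    let num_threads = num_threads.max(1);
    // Ceiling division so every element lands in some chunk;
    // `.max(1)` keeps `chunks` from panicking on empty input.
    let chunk_len = ((data.len() + num_threads - 1) / num_threads).max(1);

    thread::scope(|scope| {
        // Spawn one thread per chunk and keep the join handles.
        let handles: Vec<_> = data
            .chunks(chunk_len)
            .map(|chunk| scope.spawn(move || chunk.iter().map(|&x| x * x).sum::<i64>()))
            .collect();

        // Collect partial results; a panicked worker surfaces here.
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .sum()
    })
}

fn main() {
    let data: Vec<i64> = (0..1_000).collect();
    let total = manual_sum_of_squares(&data, 4);
    println!("Sum of squares: {}", total);
}
```

Partitioning, spawning, joining, and error handling are all explicit here, and the split is static: a thread that finishes early simply idles. A parallel iterator hides these steps and rebalances work at run time.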
Example: Parallel Iterator Basics
Replace .iter() with .par_iter() to distribute work across CPU cores using Rayon’s work-stealing thread pool. Rayon balances load dynamically—threads that finish early steal work from slower threads. Avoid for small datasets (<1K items), I/O-bound work, or sequential dependencies.
use rayon::prelude::*;
use std::time::Instant;

fn parallel_map_example() {
    let numbers: Vec<i64> = (0..1_000_000).collect();

    // Sequential
    let start = Instant::now();
    let sequential: Vec<i64> = numbers
        .iter()
        .map(|&x| x * x)
        .collect();
    let seq_time = start.elapsed();

    // Parallel
    let start = Instant::now();
    let parallel: Vec<i64> = numbers
        .par_iter()
        .map(|&x| x * x)
        .collect();
    let par_time = start.elapsed();

    println!("Sequential: {:?}", seq_time);
    println!("Parallel: {:?}", par_time);
    let speedup = seq_time.as_secs_f64() / par_time.as_secs_f64();
    println!("Speedup: {:.2}x", speedup);

    // collect() on a parallel iterator preserves the input order
    assert_eq!(sequential, parallel);
}

fn main() {
    parallel_map_example();
}
Example: par_iter vs par_iter_mut vs into_par_iter
Three variants: par_iter() borrows immutably, par_iter_mut() borrows mutably for in-place modification, into_par_iter() consumes. Rayon ensures each thread accesses disjoint elements—no data races. Use into_par_iter() for expensive-to-clone types to avoid clone overhead.
#![allow(unused)]
fn main() {
fn iterator_variants() {
let mut data = vec![1, 2, 3, 4, 5];
// Immutable parallel iteration
let sum: i32 = data.par_iter().sum();
println!("Sum: {}", sum);
// Mutable parallel iteration
data.par_iter_mut().for_each(|x| *x *= 2);
println!("Doubled: {:?}", data);
// Consuming parallel iteration
let owned_data = vec![1, 2, 3, 4, 5];
let squares: Vec<i32> = owned_data
.into_par_iter()
.map(|x| x * x)
.collect();
println!("Squares: {:?}", squares);
}
iterator_variants();
// Output: Sum: 15, Doubled: [2, 4, 6, 8, 10]
// Squares: [1, 4, 9, 16, 25]
}
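The disjoint-access guarantee behind par_iter_mut() can be illustrated with the standard library alone. The sketch below is not Rayon's implementation; it only shows the same idea, namely that chunks_mut hands out non-overlapping mutable slices, so the borrow checker accepts concurrent writes without locks. The function name double_in_place is hypothetical.

```rust
use std::thread;

/// Doubles every element in place using scoped threads.
/// `chunks_mut` yields non-overlapping `&mut` slices, so each
/// thread owns its part of the data exclusively.
fn double_in_place(data: &mut [i32], chunk_len: usize) {
    thread::scope(|scope| {
        for chunk in data.chunks_mut(chunk_len.max(1)) {
            scope.spawn(move || {
                for x in chunk.iter_mut() {
                    *x *= 2;
                }
            });
        }
        // All spawned threads are joined when the scope ends.
    });
}

fn main() {
    let mut data = vec![1, 2, 3, 4, 5];
    double_in_place(&mut data, 2);
    println!("Doubled: {:?}", data);
}
```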
Example: Parallel filter and map
Chains filter() and map() with lazy evaluation—no intermediate collections, one fused pass. Filter early: filter().map() processes fewer elements than map().filter() when map is expensive. Rayon’s work-stealing handles irregular workloads from filtering automatically.
#![allow(unused)]
fn main() {
fn parallel_filter_map() {
let numbers: Vec<i32> = (0..10_000).collect();
let result: Vec<i32> = numbers
.par_iter()
.filter(|&&x| x % 2 == 0)
.map(|&x| x * x)
.collect();
println!("Filtered and squared {} numbers", result.len());
}
}
Example: Parallel flat_map
Flattens nested structures in parallel—each input produces multiple outputs combined into one collection. Essential for one-to-many transformations like directory traversal or tokenization. Nested flat_map(|r| r.into_par_iter()) creates two-level parallelism.
#![allow(unused)]
fn main() {
fn parallel_flat_map() {
use std::ops::Range;
let ranges: Vec<Range<i32>> = vec![0..10, 10..20, 20..30];
let flattened: Vec<i32> = ranges
.into_par_iter()
.flat_map(|range| range.into_par_iter())
.collect();
println!("Flattened {} items", flattened.len());
}
}
Example: Image processing
Parallel image ops on pixel data using par_iter_mut() and par_chunks_mut(). Grayscale processes RGB triplets; brightness adjusts each pixel. Perfect for data-parallel workloads where each pixel is independent.
#![allow(unused)]
fn main() {
struct Image {
pixels: Vec<u8>,
width: usize,
height: usize,
}
impl Image {
fn new(width: usize, height: usize) -> Self {
Self {
pixels: vec![128; width * height * 3], // RGB
width,
height,
}
}
fn apply_filter_parallel(&mut self, filter: fn(u8) -> u8) {
self.pixels.par_iter_mut().for_each(|pixel| {
*pixel = filter(*pixel);
});
}
fn grayscale_parallel(&mut self) {
self.pixels
.par_chunks_mut(3)
.for_each(|rgb| {
let r = rgb[0] as u32;
let g = rgb[1] as u32;
let b = rgb[2] as u32;
let sum = r + g + b;
let gray = (sum / 3) as u8;
rgb[0] = gray;
rgb[1] = gray;
rgb[2] = gray;
});
}
fn brightness_parallel(&mut self, delta: i16) {
self.pixels.par_iter_mut().for_each(|pixel| {
// saturating_add avoids i16 overflow for extreme delta values
*pixel = (*pixel as i16).saturating_add(delta).clamp(0, 255) as u8;
});
}
}
//Usage
let mut img = Image::new(1920, 1080);
img.grayscale_parallel();
img.brightness_parallel(10);
}
Example: Log parsing
Parses log lines in parallel using filter_map() to skip invalid entries. Each line is parsed independently—perfect parallelism. Combine with filter() to select specific log levels.
#![allow(unused)]
fn main() {
#[derive(Debug)]
struct LogEntry {
timestamp: u64,
level: String,
message: String,
}
fn parse_logs_parallel(lines: Vec<String>) -> Vec<LogEntry> {
lines
.into_par_iter()
.filter_map(|line| {
let parts: Vec<&str> = line.split('|').collect();
if parts.len() >= 3 {
Some(LogEntry {
timestamp: parts[0].parse().ok()?,
level: parts[1].to_string(),
message: parts[2].to_string(),
})
} else {
None
}
})
.collect()
}
}
Rayon Benefits:
- Minimal code changes: Add .par_ prefix
- Work stealing: Automatic load balancing
- Type safe: Same API as sequential iterators
- No data races: Enforced by type system
Example: Bridge from sequential iterator
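Bridges a standard iterator into Rayon’s parallel world; downstream ops run in parallel, but the source iterator is still consumed sequentially. By Amdahl’s law, if 90% of the time is spent in the source, the overall speedup cannot exceed about 1.1x no matter how many threads process the results. Order is not preserved; use enumerate() and sort afterwards if you need it.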
#![allow(unused)]
fn main() {
fn par_bridge_basic() {
let iter = (0..1000).filter(|x| x % 2 == 0);
// Bridge to parallel
let sum: i32 = iter.par_bridge().map(|x| x * x).sum();
println!("Sum: {}", sum);
}
}
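The limit on par_bridge() speedup follows from Amdahl's law. The small calculation below is a sketch of that arithmetic; the function name amdahl_speedup is chosen here for illustration.

```rust
/// Upper bound on speedup when `serial_fraction` of the runtime cannot
/// be parallelized and the rest is spread evenly over `workers` threads.
fn amdahl_speedup(serial_fraction: f64, workers: usize) -> f64 {
    let parallel_fraction = 1.0 - serial_fraction;
    1.0 / (serial_fraction + parallel_fraction / workers as f64)
}

fn main() {
    // 90% of the time in the sequential source iterator
    for workers in [2, 8, 64] {
        println!("{} workers: {:.3}x", workers, amdahl_speedup(0.9, workers));
    }
}
```

With a serial fraction of 0.9 the bound approaches 1/0.9, roughly 1.11x, however many workers are added, so par_bridge() pays off mainly when the per-item processing dominates the cost of producing items.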
Example: Bridge from channel receiver
Parallelizes items from a channel—producer sends, workers process in parallel as items arrive. Decouples production from processing rate, reducing latency for streaming. Use sync_channel(bound) for backpressure.
#![allow(unused)]
fn main() {
fn par_bridge_from_channel() {
let (tx, rx) = mpsc::channel();
// Producer thread
thread::spawn(move || {
for i in 0..1000 {
tx.send(i).unwrap();
thread::sleep(Duration::from_micros(10));
}
});
// Parallel processing of channel items
let sum: i32 = rx
.into_iter()
.par_bridge()
.map(|x| {
// Expensive computation
thread::sleep(Duration::from_micros(100));
x * x
})
.sum();
println!("Channel sum: {}", sum);
}
}
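The backpressure that sync_channel(bound) provides can be observed directly with try_send, which reports a full buffer instead of blocking. This is a minimal std-only sketch; the helper name fill_until_full is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Returns how many items a bounded channel accepts before it reports
/// `Full` when nothing is consuming from it.
fn fill_until_full(bound: usize) -> usize {
    // `_rx` keeps the receiver alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<u32>(bound);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted as u32) {
            Ok(()) => accepted += 1,
            // A blocking `send` would make the producer wait here instead.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
        }
    }
}

fn main() {
    println!("Accepted before full: {}", fill_until_full(3));
}
```

With the blocking send() used in the examples, a full buffer pauses the producer until a consumer takes an item, which caps memory use when processing is slower than production.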
Example: File system traversal
Recursively traverses directories using a sequential iterator, then processes files in parallel via par_bridge(). The directory walk itself runs single-threaded, while the per-file metadata lookups and size filtering run in parallel.
#![allow(unused)]
fn main() {
use std::fs;
use std::path::PathBuf;
fn find_large_files(
root: &str,
min_size: u64
) -> Vec<(PathBuf, u64)> {
type BoxIter = Box<dyn Iterator<Item = PathBuf> + Send>;
fn visit(p: PathBuf) -> BoxIter {
let entries = match fs::read_dir(&p) {
Ok(e) => e,
Err(_) => return Box::new(std::iter::empty()),
};
let iter = entries
.filter_map(|e| e.ok())
.flat_map(|entry| {
let path = entry.path();
if path.is_dir() {
visit(path)
} else {
Box::new(std::iter::once(path))
}
});
Box::new(iter)
}
visit(PathBuf::from(root))
.par_bridge()
.filter_map(|path| {
let metadata = fs::metadata(&path).ok()?;
let size = metadata.len();
if size >= min_size {
Some((path, size))
} else {
None
}
})
.collect()
}
}
Example: Database query results
Custom iterator simulates database fetches, par_bridge() processes results in parallel as they arrive. Useful when query returns rows one-at-a-time but processing is expensive.
#![allow(unused)]
fn main() {
struct DatabaseIterator {
current: usize,
total: usize,
}
impl Iterator for DatabaseIterator {
type Item = i32;
fn next(&mut self) -> Option<Self::Item> {
if self.current < self.total {
let value = self.current as i32;
self.current += 1;
// Simulate database fetch delay
thread::sleep(Duration::from_micros(10));
Some(value)
} else {
None
}
}
}
fn process_database_results() {
let db_iter = DatabaseIterator {
current: 0,
total: 1000,
};
// Process results in parallel as they arrive
let sum: i32 = db_iter
.par_bridge()
.map(|x| x * 2)
.sum();
println!("Database result sum: {}", sum);
}
}
Example: Network stream processing
Processes network packets in parallel as they arrive via channel. Producer sends packets at network rate, consumers process them in parallel—decouples arrival from processing.
#![allow(unused)]
fn main() {
fn process_network_stream() {
let (tx, rx) = mpsc::channel();
// Simulate network packets arriving
thread::spawn(move || {
for i in 0..100 {
let packet = format!("packet_{}", i);
tx.send(packet).unwrap();
thread::sleep(Duration::from_millis(5));
}
});
// Process packets in parallel
let processed: Vec<String> = rx
.into_iter()
.par_bridge()
.map(|packet| {
// Expensive processing (e.g., parsing, validation)
thread::sleep(Duration::from_millis(10));
format!("processed_{}", packet)
})
.collect();
println!("Processed {} packets", processed.len());
}
}
par_bridge Use Cases:
- Channel receivers: Process items as they arrive
- Custom iterators: Database cursors, file system traversal
- Lazy evaluation: Only compute when needed
- Adaptive parallelism: Work stealing adapts to varying workload
Pattern 2: Work Partitioning Strategies
Problem: Bad work partitioning kills parallel performance. Too-small chunks (100 items) cause overhead: task creation and scheduling costs dominate the actual work.
Solution: Choose grain size based on work: 1K-10K items for simple operations, larger for cache-heavy work. Use Rayon’s adaptive chunking (default) for uniform work, explicit chunking for cache optimization.
Why It Matters: Grain size tuning can make a 2-3x performance difference. Blocked matrix multiply can run up to about 5x faster thanks to better cache reuse.
Use Cases: Matrix operations (multiply, transpose, factorization), sorting algorithms (quicksort, mergesort), divide-and-conquer (tree traversal, expression evaluation), irregular workloads (graph algorithms), cache-sensitive operations (blocked algorithms).
Example: Chunk sizes
Compares chunk sizes: default (Rayon adaptive), small (100), balanced (10K). Too-small chunks cause overhead; too-large causes imbalance. Start with defaults, tune if profiling shows issues.
use rayon::prelude::*;
use std::time::Instant;

fn chunk_size_comparison() {
    // i64 elements: the total (499_999_500_000) does not fit in i32
    let data: Vec<i64> = (0..1_000_000).collect();

    // Default chunking (Rayon decides)
    let start = Instant::now();
    let sum1: i64 = data.par_iter().sum();
    let default_time = start.elapsed();

    // Custom chunk size (too small - more overhead)
    let start = Instant::now();
    let sum2: i64 = data
        .par_chunks(100)
        .map(|c| c.iter().sum::<i64>())
        .sum();
    let small_chunk_time = start.elapsed();

    // Custom chunk size (balanced)
    let start = Instant::now();
    let sum3: i64 = data
        .par_chunks(10_000)
        .map(|c| c.iter().sum::<i64>())
        .sum();
    let balanced_chunk_time = start.elapsed();

    println!("Default: {:?}", default_time);
    println!("Small chunks (100): {:?}", small_chunk_time);
    println!("Balanced chunks (10k): {:?}", balanced_chunk_time);
    assert_eq!(sum1, sum2);
    assert_eq!(sum2, sum3);
}

fn main() {
    chunk_size_comparison();
}
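The overhead difference between chunk sizes comes down to how many chunks have to be scheduled. A quick calculation, sketched here with a hypothetical helper chunk_count, makes the ratio concrete for the 1,000,000-element input used above.

```rust
/// Number of chunks produced when `len` items are split into
/// chunks of at most `chunk_size` items (ceiling division).
fn chunk_count(len: usize, chunk_size: usize) -> usize {
    assert!(chunk_size > 0, "chunk size must be non-zero");
    (len + chunk_size - 1) / chunk_size
}

fn main() {
    let len = 1_000_000;
    for chunk_size in [100, 10_000] {
        println!(
            "chunk size {:>6} -> {:>6} chunks",
            chunk_size,
            chunk_count(len, chunk_size)
        );
    }
}
```

A chunk size of 100 yields 10,000 chunks to schedule, while 10,000 yields only 100, which is still far more than the core count of a typical machine and so leaves room for load balancing.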
Example: Work splitting strategies
Compares equal splits (N/threads items each) vs adaptive (Rayon work-stealing). Static is cache-friendly; adaptive handles irregular workloads. Use static for uniform work, adaptive for variable work.
use rayon::prelude::*;

fn work_splitting_strategies() {
    // i64 elements: squares of values above 46_340 overflow i32
    let data: Vec<i64> = (0..100_000).collect();

    // Strategy 1: Equal splits (good for uniform work)
    let chunk_size = data.len() / rayon::current_num_threads();
    let result1: Vec<i64> = data
        .par_chunks(chunk_size.max(1))
        .flat_map(|chunk| chunk.par_iter().map(|&x| x * x))
        .collect();

    // Strategy 2: Adaptive (good for non-uniform work)
    let result2: Vec<i64> = data
        .par_iter()
        .map(|&x| x * x)
        .collect();

    // Both strategies preserve input order
    assert_eq!(result1, result2);
}

fn main() {
    work_splitting_strategies();
}
Example: Matrix multiplication with blocking
Processes the output matrix in blocks for better memory access patterns: the result is split into bands of block_size rows, each band is handled by one Rayon task, and within a band the columns are walked block by block. Because every task owns a disjoint slice of the output, no unsafe code or locking is needed. Cache blocking of this kind can give large speedups (5x or more) on matrices that do not fit in cache.
use rayon::prelude::*;

struct Matrix {
    data: Vec<f64>,
    rows: usize,
    cols: usize,
}

impl Matrix {
    fn new(rows: usize, cols: usize) -> Self {
        Self {
            data: vec![0.0; rows * cols],
            rows,
            cols,
        }
    }

    fn get(&self, row: usize, col: usize) -> f64 {
        self.data[row * self.cols + col]
    }

    fn set(&mut self, row: usize, col: usize, value: f64) {
        self.data[row * self.cols + col] = value;
    }

    // Parallel blocked matrix multiply for cache efficiency
    fn multiply_blocked(&self, other: &Matrix, block_size: usize) -> Matrix {
        assert_eq!(self.cols, other.rows);
        assert!(block_size > 0, "block_size must be non-zero");
        let mut result = Matrix::new(self.rows, other.cols);
        let out_cols = other.cols;
        if out_cols == 0 {
            return result;
        }

        // Partition the output into bands of `block_size` rows;
        // par_chunks_mut gives each task exclusive access to its band
        result
            .data
            .par_chunks_mut(block_size * out_cols)
            .enumerate()
            .for_each(|(band, out)| {
                let row_start = band * block_size;
                let band_rows = out.len() / out_cols;
                for col_start in (0..out_cols).step_by(block_size) {
                    let col_end = (col_start + block_size).min(out_cols);
                    // Process one block of the band
                    for local_row in 0..band_rows {
                        let i = row_start + local_row;
                        for j in col_start..col_end {
                            let mut sum = 0.0;
                            for k in 0..self.cols {
                                sum += self.get(i, k) * other.get(k, j);
                            }
                            out[local_row * out_cols + j] = sum;
                        }
                    }
                }
            });
        result
    }
}
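Cache blocking is usually applied to all three loops of the multiplication, including the inner k loop. The sequential, std-only sketch below shows that full tiling for square row-major matrices and checks it against the naive triple loop. The function names and the flat-slice layout are assumptions for this illustration, and the parallel layer is left out on purpose.

```rust
/// Naive triple loop over row-major `n x n` matrices.
fn matmul_naive(a: &[f64], b: &[f64], n: usize) -> Vec<f64> {
    let mut c = vec![0.0; n * n];
    for i in 0..n {
        for j in 0..n {
            for k in 0..n {
                c[i * n + j] += a[i * n + k] * b[k * n + j];
            }
        }
    }
    c
}

/// Tiled version: the i, k and j loops are all blocked so that each
/// tile of `a`, `b` and `c` is reused while it is likely still cached.
fn matmul_tiled(a: &[f64], b: &[f64], n: usize, tile: usize) -> Vec<f64> {
    let tile = tile.max(1);
    let mut c = vec![0.0; n * n];
    for ii in (0..n).step_by(tile) {
        for kk in (0..n).step_by(tile) {
            for jj in (0..n).step_by(tile) {
                // Multiply one tile of `a` with one tile of `b`
                for i in ii..(ii + tile).min(n) {
                    for k in kk..(kk + tile).min(n) {
                        let a_ik = a[i * n + k];
                        for j in jj..(jj + tile).min(n) {
                            c[i * n + j] += a_ik * b[k * n + j];
                        }
                    }
                }
            }
        }
    }
    c
}

fn main() {
    let n = 10;
    let a: Vec<f64> = (0..n * n).map(|i| (i % 7) as f64).collect();
    let b: Vec<f64> = (0..n * n).map(|i| (i % 5) as f64).collect();
    let same = matmul_naive(&a, &b, n) == matmul_tiled(&a, &b, n, 4);
    println!("Tiled result matches naive: {}", same);
}
```

A suitable tile size depends on the cache sizes of the target machine, so it is best chosen by measurement.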
Example: Parallel merge sort with optimal grain size
Parallelizes merge sort with rayon::join() for divide step. Switches to sequential arr.sort() below grain_size threshold to avoid overhead. Grain size of 1000-10000 typically optimal.
fn parallel_merge_sort<T: Ord + Send + Clone>(
    arr: &mut [T],
    grain_size: usize
) {
    // Sequential cutoff; max(1) guarantees the recursion terminates
    if arr.len() <= grain_size.max(1) {
        arr.sort();
        return;
    }
    let mid = arr.len() / 2;
    let (left, right) = arr.split_at_mut(mid);
    rayon::join(
        || parallel_merge_sort(left, grain_size),
        || parallel_merge_sort(right, grain_size),
    );

    // Merge the two sorted halves through a temporary buffer.
    // Elements are cloned, which keeps the code free of unsafe;
    // an in-place merge is omitted for brevity.
    let mut temp = Vec::with_capacity(arr.len());
    let mut i = 0;
    let mut j = mid;
    while i < mid && j < arr.len() {
        if arr[i] <= arr[j] {
            temp.push(arr[i].clone());
            i += 1;
        } else {
            temp.push(arr[j].clone());
            j += 1;
        }
    }
    // Copy whatever remains in either half
    temp.extend_from_slice(&arr[i..mid]);
    temp.extend_from_slice(&arr[j..]);
    arr.clone_from_slice(&temp);
}

fn main() {
    // Usage
    let mut data: Vec<i32> = (0..100_000).rev().collect();
    parallel_merge_sort(&mut data, 1000);
    println!("Sorted: {}", data.windows(2).all(|w| w[0] <= w[1]));
}
Example: Dynamic load balancing
Demonstrates work-stealing on irregular workloads where each item has different cost. Rayon automatically rebalances—threads that finish early steal from others.
#![allow(unused)]
fn main() {
fn dynamic_load_balancing() {
// Simulate irregular workload
let items: Vec<usize> = (0..1000).map(|i| i % 100).collect();
let start = Instant::now();
// Rayon balances work through work stealing
let results: Vec<usize> = items
.par_iter()
.map(|&work| {
// Simulate variable work
let mut sum = 0;
for _ in 0..work {
sum += 1;
}
sum
})
.collect();
println!("Dynamic balancing: {:?}", start.elapsed());
println!("Total work: {}", results.iter().sum::<usize>());
}
}
Example: Grain size tuning
Tests grain sizes (100, 1K, 10K, 100K) to find optimal balance. Larger grains reduce overhead; smaller grains improve load balance. Optimal depends on CPU cache and work complexity.
use rayon::prelude::*;
use std::time::Instant;

fn grain_size_tuning() {
    // i64 elements: the total does not fit in i32
    let data: Vec<i64> = (0..1_000_000).collect();
    for grain_size in [100, 1_000, 10_000, 100_000] {
        let start = Instant::now();
        let sum: i64 = data
            .par_chunks(grain_size)
            .map(|chunk| chunk.iter().sum::<i64>())
            .sum();
        println!("Grain {}: {:?} (sum = {})", grain_size, start.elapsed(), sum);
    }
}

fn main() {
    grain_size_tuning();
}
Work Partitioning Guidelines:
- Grain size: Larger grains reduce overhead, but may cause load imbalance
- Chunk size: Balance overhead vs parallelism (typically 1000-10000 items)
- Cache blocking: Improve cache locality with block-based partitioning
- Work stealing: Rayon automatically balances irregular workloads
Example: Parallel quicksort
Uses rayon::join() to sort the two partitions in parallel; each divide step forks two tasks. Below a size threshold it falls back to a sequential sort to avoid task overhead. The partition step itself is sequential, so speedup is sublinear, but large inputs still benefit on multi-core CPUs.
// Below this length, sort sequentially (a starting point; tune by profiling)
const SEQUENTIAL_THRESHOLD: usize = 1_000;

fn parallel_quicksort<T: Ord + Send>(arr: &mut [T]) {
    if arr.len() <= SEQUENTIAL_THRESHOLD {
        arr.sort_unstable();
        return;
    }
    let pivot_idx = partition(arr);
    let (left, right) = arr.split_at_mut(pivot_idx);
    // Parallelize recursion; right[0] is the pivot, already in place
    rayon::join(
        || parallel_quicksort(left),
        || parallel_quicksort(&mut right[1..]),
    );
}

// Lomuto partition around the middle element; returns the pivot's final index
fn partition<T: Ord>(arr: &mut [T]) -> usize {
    let len = arr.len();
    let pivot_idx = len / 2;
    arr.swap(pivot_idx, len - 1);
    let mut i = 0;
    for j in 0..len - 1 {
        if arr[j] <= arr[len - 1] {
            arr.swap(i, j);
            i += 1;
        }
    }
    arr.swap(i, len - 1);
    i
}

fn main() {
    // Usage
    let mut data: Vec<i32> = (0..100_000).rev().collect();
    parallel_quicksort(&mut data);
    println!("Sorted: {}", data.windows(2).all(|w| w[0] <= w[1]));
}
Example: Parallel tree traversal
Processes tree branches in parallel using rayon::join() for recursive descent. parallel_map() builds a new tree of the same shape with f applied to every value; parallel_sum() adds up all values. Well-suited for balanced trees; unbalanced trees may cause work imbalance.
#![allow(unused)]
fn main() {
#[derive(Debug)]
struct TreeNode<T> {
value: T,
left: Option<Box<TreeNode<T>>>,
right: Option<Box<TreeNode<T>>>,
}
impl<T: Send + Sync> TreeNode<T> {
fn parallel_map<F, U>(&self, f: &F) -> TreeNode<U>
where
F: Fn(&T) -> U + Sync,
U: Send + Sync,
{
let value = f(&self.value);
let (left, right) = rayon::join(
|| self.left.as_ref()
.map(|n| Box::new(n.parallel_map(f))),
|| self.right.as_ref()
.map(|n| Box::new(n.parallel_map(f))),
);
TreeNode { value, left, right }
}
fn parallel_sum(&self) -> T
where
T: std::ops::Add<Output = T> + Default + Copy + Send + Sync,
{
let mut sum = self.value;
let (left_sum, right_sum) = rayon::join(
|| self.left.as_ref()
.map_or(T::default(), |n| n.parallel_sum()),
|| self.right.as_ref()
.map_or(T::default(), |n| n.parallel_sum()),
);
sum = sum + left_sum + right_sum;
sum
}
}
}
Example: Parallel Fibonacci (demonstrative, not efficient)
Demonstrates recursive parallelism with sequential cutoff at n<20 to avoid overhead. Teaching example only—real Fibonacci uses iterative or matrix methods.
#![allow(unused)]
fn main() {
fn parallel_fib(n: u32) -> u64 {
if n <= 1 {
return n as u64;
}
if n < 20 {
// Sequential threshold to avoid overhead
return fib_sequential(n);
}
let (a, b) = rayon::join(
|| parallel_fib(n - 1),
|| parallel_fib(n - 2),
);
a + b
}
fn fib_sequential(n: u32) -> u64 {
if n <= 1 {
return n as u64;
}
let mut a = 0;
let mut b = 1;
for _ in 2..=n {
let c = a + b;
a = b;
b = c;
}
b
}
// Usage
let result = parallel_fib(35);
}
Example: Parallel directory size calculation
Recursively calculates directory sizes, using par_iter() over each directory’s entries so that subdirectories are traversed in parallel. Sums file sizes at each level. The work is I/O bound, but parallelism helps on SSDs and networked storage.
#![allow(unused)]
fn main() {
use std::fs;
use std::path::Path;
fn parallel_dir_size<P: AsRef<Path>>(path: P) -> u64 {
let path = path.as_ref();
if path.is_file() {
return fs::metadata(path).map(|m| m.len()).unwrap_or(0);
}
if !path.is_dir() {
return 0;
}
let entries: Vec<_> = fs::read_dir(path)
.ok()
.map(|entries| entries.filter_map(|e| e.ok()).collect())
.unwrap_or_default();
entries
.par_iter()
.map(|entry| parallel_dir_size(entry.path()))
.sum()
}
// Usage:
let size = parallel_dir_size("/home/user/projects"); // bytes
}
Example: Parallel expression evaluation
Evaluates expression trees by computing sub-expressions in parallel via rayon::join(). Binary ops split left/right subtrees across threads. Speedup depends on tree structure and operation cost.
#[derive(Debug, Clone)]
enum Expr {
Num(i32),
Add(Box<Expr>, Box<Expr>),
Mul(Box<Expr>, Box<Expr>),
Sub(Box<Expr>, Box<Expr>),
}
impl Expr {
fn parallel_eval(&self) -> i32 {
match self {
Expr::Num(n) => *n,
Expr::Add(left, right) => {
let (l, r) = rayon::join(
|| left.parallel_eval(),
|| right.parallel_eval(),
);
l + r
}
Expr::Mul(left, right) => {
let (l, r) = rayon::join(
|| left.parallel_eval(),
|| right.parallel_eval(),
);
l * r
}
Expr::Sub(left, right) => {
let (l, r) = rayon::join(
|| left.parallel_eval(),
|| right.parallel_eval(),
);
l - r
}
}
}
}
fn main() {
let expr = Expr::Add(
Box::new(Expr::Mul(
Box::new(Expr::Num(5)),
Box::new(Expr::Num(10)),
)),
Box::new(Expr::Sub(
Box::new(Expr::Num(20)),
Box::new(Expr::Num(8)),
)),
);
let result = expr.parallel_eval();
println!("Expression result: {}", result);
}
Recursive Parallelism Tips:
- Sequential cutoff: Switch to sequential below threshold
- rayon::join: Parallel fork-join primitive
- Balance: Ensure subtasks have similar work
- Overhead: Avoid creating too many small tasks
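The tips above can be seen in a small std-only fork-join sketch. It is not how rayon::join is implemented: it spawns a real OS thread per fork instead of queuing a task, which makes the sequential cutoff even more important. The name fork_join_sum is hypothetical.

```rust
use std::thread;

/// Recursive fork-join sum: split in half, run the left half on a new
/// scoped thread, and handle the right half on the current thread.
fn fork_join_sum(data: &[u64], cutoff: usize) -> u64 {
    // Sequential cutoff: small slices are not worth a new thread.
    if data.len() <= cutoff.max(1) {
        return data.iter().sum();
    }
    // Splitting at the midpoint keeps the two subtasks balanced.
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|scope| {
        let left_handle = scope.spawn(|| fork_join_sum(left, cutoff));
        let right_sum = fork_join_sum(right, cutoff);
        left_handle.join().expect("left task panicked") + right_sum
    })
}

fn main() {
    let data: Vec<u64> = (1..=10_000).collect();
    // A cutoff of 2_500 produces four leaf tasks for this input.
    println!("Sum: {}", fork_join_sum(&data, 2_500));
}
```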
Pattern 3: Parallel Reduce and Fold
Problem: Aggregating results from parallel operations is non-trivial. Simple sum/min/max work, but custom aggregations (histograms, statistics, merging maps) require careful design.
Solution: Use reduce for simple aggregations (sum, min, max, product). Use fold + reduce pattern for custom accumulators: fold builds per-thread state, reduce combines them.
Why It Matters: Statistics in one parallel pass instead of multiple sequential passes. Histogram generation: parallel fold+reduce 10x faster than sequential.
Use Cases: Statistics computation (mean, variance, stddev), histograms and frequency counting, word counting in text processing, aggregating results from parallel operations, merging sorted chunks, custom accumulators (sets, maps).
Example: Simple reduce (sum, min, max)
Built-in reductions sum(), min(), max(), product() work directly on parallel iterators. Each thread computes local result, then results merge. Watch for overflow with product().
#![allow(unused)]
fn main() {
fn simple_reductions() {
let numbers: Vec<i64> = (1..=1_000_000).collect();
// Sum
let sum: i64 = numbers.par_iter().sum();
println!("Sum: {}", sum);
// Min/Max
let min = numbers.par_iter().min().unwrap();
let max = numbers.par_iter().max().unwrap();
println!("Min: {}, Max: {}", min, max);
// Product (be careful of overflow!)
let small_numbers: Vec<i64> = (1..=10).collect();
let product: i64 = small_numbers.par_iter().product();
println!("Product: {}", product);
}
// Usage: simple_reductions();
// Output: Sum: 500000500000, Min: 1, Max: 1000000
}
Example: Reduce with custom operation
Uses reduce(|| identity, |a, b| combine) for custom aggregations. Operation must be associative. Examples: string concatenation, finding closest element to target.
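use rayon::prelude::*;

fn custom_reduce() {
    let numbers: Vec<i32> = (1..=100).collect();

    // Custom reduction: concatenate all numbers, comma-separated.
    // The empty string is the identity, so the combine step must return
    // the other operand unchanged when one side is empty; otherwise
    // stray commas appear wherever Rayon inserts the identity.
    let concatenated = numbers
        .par_iter()
        .map(|n| n.to_string())
        .reduce(String::new, |a, b| {
            if a.is_empty() {
                b
            } else if b.is_empty() {
                a
            } else {
                format!("{},{}", a, b)
            }
        });
    let len = concatenated.len().min(50);
    println!("First 50 chars: {}", &concatenated[..len]);

    // Find element closest to target. An element of the input works as
    // the identity here because the operation only selects an operand.
    let target = 42;
    let closest = numbers
        .par_iter()
        .reduce(
            || &numbers[0],
            |a, b| {
                if (a - target).abs() < (b - target).abs() {
                    a
                } else {
                    b
                }
            },
        );
    println!("Closest to {}: {}", target, closest);
}

fn main() {
    custom_reduce();
}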
Example: fold vs reduce
fold() creates per-thread accumulators, then requires reduce() to merge. Use fold when accumulator type differs from element type—essential for histograms, word counts, multi-field stats.
#![allow(unused)]
fn main() {
fn fold_vs_reduce() {
let numbers: Vec<i32> = (1..=1000).collect();
// fold: provide identity and combine function
let sum_fold = numbers.par_iter().fold(
|| 0, // Identity function
|acc, &x| acc + x, // Fold function
).sum::<i32>(); // Reduce the folded results
// reduce: simpler but less flexible
let sum_reduce = numbers.par_iter().sum::<i32>();
assert_eq!(sum_fold, sum_reduce);
println!("Sum: {}", sum_fold);
}
}
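The division of labour between fold and reduce can be modelled sequentially with the standard library. In this sketch each chunk stands in for the items one worker happens to receive, which is an assumption made for illustration; Rayon chooses the actual split at run time. The function name fold_then_reduce is hypothetical.

```rust
/// Sequential model of fold + reduce: fold each chunk into a local
/// accumulator, then merge the accumulators into one result.
fn fold_then_reduce(data: &[i32], chunk_len: usize) -> (usize, i64) {
    data.chunks(chunk_len.max(1))
        // "fold": elements (i32) go in, an accumulator (count, sum) comes out
        .map(|chunk| {
            chunk
                .iter()
                .fold((0usize, 0i64), |(n, s), &x| (n + 1, s + x as i64))
        })
        // "reduce": both operands and the result are accumulators
        .fold((0, 0), |(n1, s1), (n2, s2)| (n1 + n2, s1 + s2))
}

fn main() {
    let data: Vec<i32> = (1..=1000).collect();
    for chunk_len in [1, 7, 1000] {
        println!("chunk {:>4}: {:?}", chunk_len, fold_then_reduce(&data, chunk_len));
    }
}
```

The result must not depend on how the input is chunked. That holds only when the merge step is associative and the starting accumulator is a true identity for it.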
Example: fold + reduce for custom accumulators
Collects multiple statistics (count, sum, min, max) in one parallel pass. Each fold task maintains its own accumulator; reduce() merges them at the end. The same identity value (min = i32::MAX, max = i32::MIN) is used for both steps, since a zeroed accumulator would corrupt the min and max. Much faster than multiple passes.
use rayon::prelude::*;

// Statistics collected in one pass
struct Stats {
    count: usize,
    sum: i64,
    min: i32,
    max: i32,
}

impl Stats {
    // Identity element: merging it into any Stats leaves that Stats unchanged
    fn identity() -> Self {
        Stats {
            count: 0,
            sum: 0,
            min: i32::MAX,
            max: i32::MIN,
        }
    }
}

fn fold_with_accumulator() {
    let numbers: Vec<i32> = (1..=100).collect();
    let stats = numbers
        .par_iter()
        .fold(Stats::identity, |mut acc, &x| {
            acc.count += 1;
            acc.sum += x as i64;
            acc.min = acc.min.min(x);
            acc.max = acc.max.max(x);
            acc
        })
        .reduce(Stats::identity, |a, b| Stats {
            count: a.count + b.count,
            sum: a.sum + b.sum,
            min: a.min.min(b.min),
            max: a.max.max(b.max),
        });
    println!("Count: {}", stats.count);
    let avg = stats.sum as f64 / stats.count as f64;
    println!("Average: {:.2}", avg);
    println!("Min: {}, Max: {}", stats.min, stats.max);
}

fn main() {
    fold_with_accumulator();
}
Example: Parallel histogram
Each thread builds a local histogram using fold, then reduce merges all local histograms into the final result. This avoids contention on a shared data structure.
use rayon::prelude::*;
use std::collections::HashMap;

fn parallel_histogram(data: Vec<i32>) -> HashMap<i32, usize> {
data.par_iter()
.fold(
|| HashMap::new(),
|mut map, &value| {
*map.entry(value).or_insert(0) += 1;
map
},
)
.reduce(
|| HashMap::new(),
|mut a, b| {
for (key, count) in b {
*a.entry(key).or_insert(0) += count;
}
a
},
)
}
fn main() {
let data: Vec<i32> = (0..10000).map(|i| i % 100).collect();
let histogram = parallel_histogram(data);
println!("Histogram buckets: {}", histogram.len());
println!("Bucket 50: {}", histogram.get(&50).unwrap_or(&0));
}
Example: Word frequency count
Splits text in parallel, builds per-thread HashMaps, merges with reduce. par_split_whitespace() handles tokenization. Pattern applies to any frequency counting task.
use rayon::prelude::*;
use std::collections::HashMap;

fn word_frequency_parallel(text: String) -> HashMap<String, usize> {
text.par_split_whitespace()
.fold(
|| HashMap::new(),
|mut map, word| {
*map.entry(word.to_lowercase()).or_insert(0) += 1;
map
},
)
.reduce(
|| HashMap::new(),
|mut a, b| {
for (word, count) in b {
*a.entry(word).or_insert(0) += count;
}
a
},
)
}
fn main() {
let text = "the quick brown fox jumps over the lazy dog"
.into();
let freq = word_frequency_parallel(text);
for (word, count) in freq.iter().take(5) {
println!("{}: {}", word, count);
}
}
Example: Parallel variance calculation
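Two-pass algorithm: the first pass computes the mean in parallel, the second computes the variance as the average squared deviation from that mean (population variance, dividing by n). This is more numerically stable than the naive one-pass formula E[x^2] - mean^2. Both passes use par_iter().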
fn parallel_variance(numbers: &[f64]) -> (f64, f64) {
// Two-pass algorithm (more numerically stable)
let sum: f64 = numbers.par_iter().sum();
let mean = sum / numbers.len() as f64;
let variance = numbers
.par_iter()
.map(|&x| (x - mean).powi(2))
.sum::<f64>()
/ numbers.len() as f64;
(mean, variance)
}
fn main() {
let numbers: Vec<f64> = (1..=100).map(|x| x as f64).collect();
let (mean, variance) = parallel_variance(&numbers);
println!("Mean: {:.2}, Var: {:.2}", mean, variance);
}
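A single pass is also possible without the unstable E[x^2] - mean^2 formula if each partial result carries (count, mean, sum of squared deviations) and partial results are merged with the pairwise update commonly attributed to Chan et al. The std-only sketch below models this sequentially over chunks; the type Moments and the function chunked_variance are hypothetical names, and the same merge function could serve as the combine step of a fold + reduce.

```rust
/// Mergeable summary: count, mean, and sum of squared deviations (m2).
#[derive(Clone, Copy, Debug)]
struct Moments {
    n: f64,
    mean: f64,
    m2: f64,
}

impl Moments {
    /// Identity element for `merge`.
    fn empty() -> Self {
        Moments { n: 0.0, mean: 0.0, m2: 0.0 }
    }

    /// Adds one observation by merging a single-element summary.
    fn push(self, x: f64) -> Self {
        self.merge(Moments { n: 1.0, mean: x, m2: 0.0 })
    }

    /// Pairwise combination of two summaries.
    fn merge(self, other: Self) -> Self {
        let n = self.n + other.n;
        if n == 0.0 {
            return Moments::empty();
        }
        let delta = other.mean - self.mean;
        Moments {
            n,
            mean: self.mean + delta * other.n / n,
            m2: self.m2 + other.m2 + delta * delta * self.n * other.n / n,
        }
    }

    /// Population variance (divides by n); NaN for an empty summary.
    fn variance(self) -> f64 {
        self.m2 / self.n
    }
}

/// Folds each chunk into a summary, then merges the summaries.
fn chunked_variance(data: &[f64], chunk_len: usize) -> (f64, f64) {
    let total = data
        .chunks(chunk_len.max(1))
        .map(|c| c.iter().fold(Moments::empty(), |acc, &x| acc.push(x)))
        .fold(Moments::empty(), Moments::merge);
    (total.mean, total.variance())
}

fn main() {
    let numbers: Vec<f64> = (1..=100).map(|x| x as f64).collect();
    let (mean, variance) = chunked_variance(&numbers, 16);
    println!("Mean: {:.2}, Var: {:.2}", mean, variance);
}
```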
Example: Merge sorted chunks
Merges sorted chunks in parallel using tree reduction: pairs of chunks merge concurrently, and each round halves the chunk count. For k chunks this takes O(log k) merge rounds, with parallel work in each.
#![allow(unused)]
fn main() {
fn parallel_merge_reduce(mut chunks: Vec<Vec<i32>>) -> Vec<i32> {
while chunks.len() > 1 {
chunks = chunks
.par_chunks(2)
.map(|pair| {
if pair.len() == 2 {
merge(&pair[0], &pair[1])
} else {
pair[0].clone()
}
})
.collect();
}
chunks.into_iter().next().unwrap_or_default()
}
fn merge(a: &[i32], b: &[i32]) -> Vec<i32> {
let mut result = Vec::with_capacity(a.len() + b.len());
let mut i = 0;
let mut j = 0;
while i < a.len() && j < b.len() {
if a[i] <= b[j] {
result.push(a[i]);
i += 1;
} else {
result.push(b[j]);
j += 1;
}
}
result.extend_from_slice(&a[i..]);
result.extend_from_slice(&b[j..]);
result
}
}
Reduction Patterns:
- reduce: Simple aggregation (sum, min, max)
- fold + reduce: Custom accumulator, then combine
- Associative operations: Required for correctness
- Commutative: Not required but helps performance
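Why associativity is required can be checked with a small std-only experiment: reduce the same data under different groupings and compare. The chunking below stands in for whatever split a parallel runtime might choose, and the helper name chunked_reduce is hypothetical.

```rust
/// Reduces each chunk with `op`, then reduces the partial results with `op`.
/// Panics on empty input because no identity value is supplied.
fn chunked_reduce(data: &[i64], chunk_len: usize, op: fn(i64, i64) -> i64) -> i64 {
    data.chunks(chunk_len.max(1))
        .map(|chunk| chunk.iter().copied().reduce(op).expect("chunks are non-empty"))
        .reduce(op)
        .expect("input must be non-empty")
}

fn main() {
    let data = [1, 2, 3, 4];

    // Addition is associative: every grouping gives the same total.
    println!("add, one chunk:  {}", chunked_reduce(&data, 4, |a, b| a + b));
    println!("add, two chunks: {}", chunked_reduce(&data, 2, |a, b| a + b));

    // Subtraction is not: the result depends on the grouping.
    println!("sub, one chunk:  {}", chunked_reduce(&data, 4, |a, b| a - b));
    println!("sub, two chunks: {}", chunked_reduce(&data, 2, |a, b| a - b));
}
```

With subtraction, one chunk evaluates ((1 - 2) - 3) - 4 while two chunks evaluate (1 - 2) - (3 - 4), so the answer changes with the split. That is exactly the nondeterminism a non-associative operation would introduce into a parallel reduce.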
Pattern 4: Pipeline Parallelism
Problem: Multi-stage data processing often bottlenecks on the slowest stage. A sequential pipeline wastes CPU: the decode stage sits idle while the enhance stage runs.
Solution: Use channel-based pipelines with a separate thread per stage. Use Rayon’s par_iter within each stage for intra-stage parallelism.
Why It Matters: Image processing pipeline: sequential 300ms, staged parallel 100ms (3x faster). ETL pipeline processing 1M records: sequential 10min, parallel pipeline 2min (5x speedup).
Use Cases: ETL (Extract-Transform-Load) data pipelines, image/video processing (decode→enhance→compress), log analysis (parse→enrich→filter→aggregate), data transformation chains, streaming data processing, multi-stage batch jobs.
Example: Simple pipeline with channels
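Four-stage pipeline (generate→transform→filter→consume) connected by three channels. par_bridge() parallelizes the transform and filter stages, and sync_channel(bound) provides backpressure. Each parallel stage runs in its own small Rayon pool: a worker blocked in send() does not yield to other tasks, so if both stages shared the global pool, a full channel could stall the very stage meant to drain it.
use rayon::prelude::*;
use std::sync::mpsc;
use std::thread;

fn simple_pipeline() {
    let (stage1_tx, stage1_rx) = mpsc::sync_channel::<i32>(100);
    let (stage2_tx, stage2_rx) = mpsc::sync_channel::<i32>(100);
    let (stage3_tx, stage3_rx) = mpsc::sync_channel::<i32>(100);

    // Stage 1: Data generation
    let producer = thread::spawn(move || {
        for i in 0..1000 {
            stage1_tx.send(i).unwrap();
        }
    });

    // Stage 2: Transform (parallel, dedicated pool)
    let stage2 = thread::spawn(move || {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(2)
            .build()
            .unwrap();
        pool.install(|| {
            stage1_rx
                .into_iter()
                .par_bridge()
                .for_each_with(stage2_tx, |tx, item| {
                    let transformed = item * 2;
                    tx.send(transformed).unwrap();
                });
        });
    });

    // Stage 3: Filter (parallel, dedicated pool)
    let stage3 = thread::spawn(move || {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(2)
            .build()
            .unwrap();
        pool.install(|| {
            stage2_rx
                .into_iter()
                .par_bridge()
                .filter(|&x| x % 4 == 0)
                .for_each_with(stage3_tx, |tx, item| {
                    tx.send(item).unwrap();
                });
        });
    });

    // Consumer: runs until every sender clone has been dropped
    let consumer = thread::spawn(move || {
        let sum: i32 = stage3_rx.into_iter().sum();
        sum
    });

    producer.join().unwrap();
    stage2.join().unwrap();
    stage3.join().unwrap();
    let result = consumer.join().unwrap();
    println!("Pipeline result: {}", result);
}

fn main() {
    simple_pipeline();
}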
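The thread-per-stage structure can also be sketched with the standard library alone, without any intra-stage parallelism. In this minimal version the transform and filter steps share one thread, and shutdown propagates by dropping senders: when a stage finishes, its sender is dropped and the next stage's receive loop ends. The function name threaded_pipeline and its parameters are assumptions for illustration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Generate -> transform/filter -> sum, with one thread per stage and
/// bounded channels between them for backpressure.
fn threaded_pipeline(count: i32, bound: usize) -> i32 {
    let (gen_tx, gen_rx) = sync_channel::<i32>(bound);
    let (out_tx, out_rx) = sync_channel::<i32>(bound);

    // Stage 1: generate. `gen_tx` is dropped when the closure returns,
    // which closes the first channel.
    let generator = thread::spawn(move || {
        for i in 0..count {
            gen_tx.send(i).unwrap();
        }
    });

    // Stage 2: double each item and keep multiples of four.
    let transformer = thread::spawn(move || {
        for item in gen_rx {
            let doubled = item * 2;
            if doubled % 4 == 0 {
                out_tx.send(doubled).unwrap();
            }
        }
    });

    // Stage 3: consume on the calling thread until the channel closes.
    let total: i32 = out_rx.into_iter().sum();
    generator.join().unwrap();
    transformer.join().unwrap();
    total
}

fn main() {
    println!("Pipeline result: {}", threaded_pipeline(1000, 100));
}
```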
Example: Image processing pipeline
Chains decode→enhance→compress stages using into_par_iter().map().map().map(). Rayon fuses the three map() calls, so each image runs through decode, enhance, and compress back to back on one worker while other workers process other images in parallel.
struct ImagePipeline;
impl ImagePipeline {
fn process_batch(images: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
images
.into_par_iter()
.map(|img| Self::stage1_decode(img))
.map(|img| Self::stage2_enhance(img))
.map(|img| Self::stage3_compress(img))
.collect()
}
// Usage: ImagePipeline::process_batch(raw_images);
fn stage1_decode(data: Vec<u8>) -> Vec<u8> {
// Simulate decoding
thread::sleep(Duration::from_micros(100));
data
}
fn stage2_enhance(mut data: Vec<u8>) -> Vec<u8> {
// Simulate enhancement
for pixel in &mut data {
*pixel = pixel.saturating_add(10);
}
data
}
fn stage3_compress(data: Vec<u8>) -> Vec<u8> {
// Simulate compression
thread::sleep(Duration::from_micros(50));
data
}
}
fn main() {
let images: Vec<Vec<u8>> = (0..100)
.map(|_| vec![128; 1000])
.collect();
let start = std::time::Instant::now();
let processed = ImagePipeline::process_batch(images);
println!("Processed {} images in {:?}", processed.len(), start.elapsed());
}
Example: Log processing pipeline
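Chains parse→enrich→filter stages using parallel iterators. Each log line flows through all stages on one worker thread while other threads process other lines. filter_map() parses and drops malformed lines in a single step, so no intermediate collection of Option values is built.
use rayon::prelude::*;
#[derive(Debug, Clone)]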
struct RawLog(String);
#[derive(Debug, Clone)]
struct ParsedLog {
timestamp: u64,
level: String,
message: String,
}
#[derive(Debug, Clone)]
struct EnrichedLog {
log: ParsedLog,
metadata: String,
}
struct LogPipeline;
impl LogPipeline {
fn process(logs: Vec<RawLog>) -> Vec<EnrichedLog> {
logs.into_par_iter()
.filter_map(|raw| Self::parse(raw))
.map(|parsed| Self::enrich(parsed))
.filter(|enriched| enriched.log.level == "ERROR")
.collect()
}
// Usage: let errors = LogPipeline::process(raw_logs);
fn parse(raw: RawLog) -> Option<ParsedLog> {
let parts: Vec<&str> = raw.0.split('|').collect();
if parts.len() >= 3 {
Some(ParsedLog {
timestamp: parts[0].parse().ok()?,
level: parts[1].to_string(),
message: parts[2].to_string(),
})
} else {
None
}
}
fn enrich(log: ParsedLog) -> EnrichedLog {
// Build the metadata first so `log` can be moved instead of cloned
let metadata = format!("enriched_{}", log.timestamp);
EnrichedLog { log, metadata }
}
}
fn main() {
let logs: Vec<RawLog> = (0..1000)
.map(|i| {
let lvl = if i % 10 == 0 { "ERROR" } else { "INFO" };
RawLog(format!("{}|{}|msg_{}", i, lvl, i))
})
.collect();
let errors = LogPipeline::process(logs);
println!("Found {} errors", errors.len());
}
Example: Parallel stages with different parallelism
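Three stages with different characteristics: light per-element work (fine-grained parallelism), heavy work (chunks of 100 so each task amortizes scheduling overhead and stays cache-local), and aggregation (parallel reduction). Tune the chunk size per stage to the cost of the work.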
#![allow(unused)]
fn main() {
fn multi_stage_parallel() {
let data: Vec<i32> = (0..10000).collect();
// Stage 1: Light processing (high parallelism)
let stage1: Vec<i32> = data
.par_iter()
.map(|&x| x + 1)
.collect();
// Stage 2: Heavy processing (moderate parallelism)
let stage2: Vec<i32> = stage1
.par_chunks(100) // Larger chunks for heavy work
.flat_map_iter(|chunk| {
// Each chunk runs sequentially on one thread; parallelism is across chunks
chunk.iter().map(|&x| {
// Simulate heavy computation
let mut result = x;
for _ in 0..100 {
result = (result * 2) % 1000;
}
result
})
})
.collect();
// Stage 3: Aggregation
let sum: i32 = stage2.par_iter().sum();
println!("Multi-stage result: {}", sum);
}
}
Example: ETL pipeline (Extract, Transform, Load)
Classic data pipeline: read files in parallel (extract), transform content (transform), aggregate results (load). Each phase uses par_iter(). Order-independent stages maximize parallelism.
use rayon::prelude::*;
struct EtlPipeline;
impl EtlPipeline {
fn run(input_files: Vec<String>) -> Vec<(String, usize)> {
input_files
.into_par_iter()
// Extract: Read files in parallel
.filter_map(|file| Self::extract(&file))
// Transform: Process data in parallel
.map(|data| Self::transform(data))
// Load: Aggregate results
.collect()
}
fn extract(file: &str) -> Option<Vec<String>> {
// Simulate file reading
Some(vec![format!("data_from_{}", file)])
}
fn transform(data: Vec<String>) -> (String, usize) {
// Simulate transformation
let processed = data
.par_iter()
.map(|s| s.to_uppercase())
.collect::<Vec<_>>();
("transformed".to_string(), processed.len())
}
}
fn main() {
let files: Vec<String> = (0..100)
.map(|i| format!("file_{}.csv", i))
.collect();
let results = EtlPipeline::run(files);
println!("Processed {} files", results.len());
}
Pipeline Patterns:
- Channel-based: Explicit stages with bounded buffers
- Iterator-based: Chain transformations with par_iter
- Staged parallelism: Different parallelism per stage
- Backpressure: Bounded channels prevent memory issues
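The backpressure point can be seen in isolation with a small standard-library sketch. The function name bounded_pipeline and its capacity parameter are illustrative and not part of the chapter's code; the only API relied on is std::sync::mpsc::sync_channel, whose send() blocks once the buffer is full.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends the doubled values of 0..n through a bounded channel and sums them.
/// `capacity` caps how many items may sit in the buffer between the stages.
fn bounded_pipeline(n: u64, capacity: usize) -> u64 {
    let (tx, rx) = mpsc::sync_channel::<u64>(capacity);

    // Producer: send() blocks while `capacity` items are already queued,
    // so a fast producer cannot run arbitrarily far ahead of the consumer.
    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(i * 2).unwrap();
        }
        // `tx` is dropped here, which ends the consumer's iteration.
    });

    // Consumer: drains the channel until the sender is dropped.
    let total: u64 = rx.iter().sum();
    producer.join().unwrap();
    total
}
```

Memory use between the two stages is bounded by capacity regardless of n. A capacity of 0 turns the channel into a rendezvous point where every send waits for a matching receive.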
Pattern 5: SIMD Parallelism
Problem: CPU vector units (AVX2: 8 floats, AVX-512: 16 floats) sit idle when the compiler emits scalar code. Data-level parallelism goes untapped: the loop processes one element per instruction when the hardware could process 8 or 16.
Solution: Write SIMD-friendly code: contiguous arrays, simple operations, no branches in hot loops. Use Struct-of-Arrays (SoA) instead of Array-of-Structs (AoS) for better vectorization.
Why It Matters: Matrix multiply can run about 10x faster with SIMD plus threading than as scalar sequential code. A vectorized dot product can be 4-8x faster, depending on vector width and memory bandwidth.
Use Cases: Matrix operations (multiply, transpose, dot product), image processing (convolution, filters), signal processing (FFT, filters), scientific computing (numerical methods), vector arithmetic, statistical computations.
Example: Manual SIMD-friendly code
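Processes 4 elements at a time in separate accumulators. Floating-point addition is not associative, so the compiler will not reorder a single running f32 sum on its own; four independent accumulators make the reordering explicit, which lets the loop auto-vectorize. The remainder is handled separately. The same pattern works for dot products and other reductions, though the result can differ from a strictly sequential sum in the last bits.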
#![allow(unused)]
fn main() {
fn simd_friendly_sum(data: &[f32]) -> f32 {
// Process 4 elements at a time (compiler can auto-vectorize)
let chunks = data.chunks_exact(4);
let remainder = chunks.remainder();
let mut sums = [0.0f32; 4];
for chunk in chunks {
sums[0] += chunk[0];
sums[1] += chunk[1];
sums[2] += chunk[2];
sums[3] += chunk[3];
}
let chunk_sum: f32 = sums.iter().sum();
let remainder_sum: f32 = remainder.iter().sum();
chunk_sum + remainder_sum
}
let data: Vec<f32> = (0..1_000_000).map(|x| x as f32).collect();
let sum = simd_friendly_sum(&data);
}
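The reason a plain running sum resists vectorization is that regrouping f32 additions can change the answer. The sketch below (grouping_demo is a hypothetical name chosen for illustration) adds the same three values with two groupings.

```rust
/// Returns the same three values summed with two different groupings.
fn grouping_demo() -> (f32, f32) {
    let a = 1.0e8_f32;
    let b = -1.0e8_f32;
    let c = 1.0_f32;
    // a and b cancel exactly, then 1.0 is added.
    let left = (a + b) + c;
    // b + c rounds back to -1.0e8 because f32 values near 1e8 are 8 apart,
    // so the 1.0 is lost before a is added.
    let right = a + (b + c);
    (left, right)
}
```

The two results are 1.0 and 0.0. Because the groupings disagree, the compiler has to keep the order written in the source, and splitting the sum into independent accumulators is how the programmer grants permission to reorder.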
Example: Array operations (SIMD-friendly)
Element-wise a[i] + b[i] auto-vectorizes when arrays are contiguous. Combine with par_iter() for thread parallelism. Use zip() for multi-array operations.
#![allow(unused)]
fn main() {
fn vector_add(a: &[f32], b: &[f32], result: &mut [f32]) {
assert_eq!(a.len(), b.len());
assert_eq!(a.len(), result.len());
// Compiler can auto-vectorize this
for i in 0..a.len() {
result[i] = a[i] + b[i];
}
}
fn vector_add_parallel(a: &[f32], b: &[f32]) -> Vec<f32> {
use rayon::prelude::*;
// Combine SIMD and thread parallelism
a.par_iter()
.zip(b.par_iter())
.map(|(&x, &y)| x + y)
.collect()
}
let a: Vec<f32> = (0..1000).map(|x| x as f32).collect();
let b: Vec<f32> = (0..1000).map(|x| (x * 2) as f32).collect();
let result = vector_add_parallel(&a, &b);
}
Example: Dot product (SIMD-friendly)
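Dot product (Σ a[i]*b[i]) maps well onto SIMD because the multiplies are independent. The final sum is a floating-point reduction, however, and Rust keeps strict evaluation order, so the accumulation typically stays scalar unless it is split across several accumulators; check the generated assembly. The parallel version adds thread-level parallelism.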
#![allow(unused)]
fn main() {
fn dot_product(a: &[f32], b: &[f32]) -> f32 {
assert_eq!(a.len(), b.len());
a.iter()
.zip(b.iter())
.map(|(&x, &y)| x * y)
.sum()
}
// Usage: dot_product(&[1.0, 2.0], &[3.0, 4.0]); // 11.0
fn dot_product_parallel(a: &[f32], b: &[f32]) -> f32 {
use rayon::prelude::*;
assert_eq!(a.len(), b.len());
a.par_iter()
.zip(b.par_iter())
.map(|(&x, &y)| x * y)
.sum()
}
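let big_vec_a: Vec<f32> = (0..1_000_000).map(|x| x as f32).collect();
let big_vec_b: Vec<f32> = vec![0.5; big_vec_a.len()];
let result = dot_product_parallel(&big_vec_a, &big_vec_b);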
}
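One way to give the dot product a vectorizable accumulation is to reuse the multi-accumulator idea from simd_friendly_sum. This is a sketch, not a tuned kernel: dot_product_lanes and the LANES constant are names introduced here, and 8 lanes assumes 256-bit vectors of f32.

```rust
/// Dot product with 8 independent accumulators (one per f32 lane of a
/// 256-bit vector). LANES is an assumption about the target CPU.
fn dot_product_lanes(a: &[f32], b: &[f32]) -> f32 {
    assert_eq!(a.len(), b.len());
    const LANES: usize = 8;
    let a_chunks = a.chunks_exact(LANES);
    let b_chunks = b.chunks_exact(LANES);
    let a_tail = a_chunks.remainder();
    let b_tail = b_chunks.remainder();

    // Each lane accumulates independently, so no reordering is required
    // of the compiler to process all lanes at once.
    let mut acc = [0.0f32; LANES];
    for (ca, cb) in a_chunks.zip(b_chunks) {
        for i in 0..LANES {
            acc[i] += ca[i] * cb[i];
        }
    }

    // Leftover elements (fewer than LANES) are handled with scalar code.
    let tail: f32 = a_tail.iter().zip(b_tail).map(|(&x, &y)| x * y).sum();
    acc.iter().sum::<f32>() + tail
}
```

The result can differ from dot_product in the last bits because the terms are summed in a different order. Whether the loop actually becomes vector instructions still depends on the target and optimization level.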
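Example: Matrix multiplication with SIMD hints
The inner k-loop is a multiply-accumulate over row i of a and column j of b. The row access is contiguous, but the column access strides by n and the running sum is a float reduction, so check the assembly before assuming the loop vectorizes. The parallel version distributes rows of the result across threads with Rayon, combining threading (row distribution) with whatever SIMD the compiler finds in the inner loop.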
fn matrix_multiply_simd(
a: &[f32], b: &[f32], result: &mut [f32], n: usize
) {
// Matrix dimensions: n x n
assert_eq!(a.len(), n * n);
assert_eq!(b.len(), n * n);
assert_eq!(result.len(), n * n);
for i in 0..n {
for j in 0..n {
let mut sum = 0.0;
// Inner loop is SIMD-friendly
for k in 0..n {
sum += a[i * n + k] * b[k * n + j];
}
result[i * n + j] = sum;
}
}
}
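// Parallel + SIMD matrix multiplication
fn matrix_multiply_par_simd(
a: &[f32], b: &[f32], n: usize
) -> Vec<f32> {
use rayon::prelude::*;
assert_eq!(a.len(), n * n);
assert_eq!(b.len(), n * n);
let mut result = vec![0.0; n * n];
if n == 0 {
return result; // par_chunks_mut panics on a chunk size of 0
}
// Each task owns one row of `result`, so no two threads write the same memory
result.par_chunks_mut(n).enumerate().for_each(|(i, row)| {
for j in 0..n {
let mut sum = 0.0;
// Multiply-accumulate over row i of `a` and column j of `b`
for k in 0..n {
sum += a[i * n + k] * b[k * n + j];
}
row[j] = sum;
}
});
result
}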
fn main() {
let n = 512;
let a: Vec<f32> = (0..n * n).map(|x| x as f32).collect();
let b: Vec<f32> = (0..n * n).map(|x| (x * 2) as f32).collect();
let result = matrix_multiply_par_simd(&a, &b, n);
println!("Result checksum: {}", result.iter().sum::<f32>());
}
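A common way to make the innermost loop contiguous is to swap the j and k loops. The sketch below is sequential and uses a hypothetical name, matrix_multiply_ikj; it assumes the same row-major n x n layout as the functions above.

```rust
/// n x n row-major matrix product using i-k-j loop order.
fn matrix_multiply_ikj(a: &[f32], b: &[f32], n: usize) -> Vec<f32> {
    assert_eq!(a.len(), n * n);
    assert_eq!(b.len(), n * n);
    let mut result = vec![0.0f32; n * n];
    for i in 0..n {
        let out_row = &mut result[i * n..(i + 1) * n];
        for k in 0..n {
            let a_ik = a[i * n + k];
            let b_row = &b[k * n..(k + 1) * n];
            // Contiguous, independent updates: out_row[j] += a_ik * b_row[j].
            // There is no running scalar sum, so nothing needs reordering.
            for (out, &b_kj) in out_row.iter_mut().zip(b_row) {
                *out += a_ik * b_kj;
            }
        }
    }
    result
}
```

Each output element still accumulates its terms in ascending k order, so the values match the i-j-k version. The difference is that the inner loop now walks one row of b and one row of the result with unit stride.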
Example: Blocked matrix operations (cache + SIMD friendly)
Processes the matrices in blocks small enough to fit in L1/L2 cache, so the data touched by the inner loops stays cache-resident while it is reused. On large matrices, blocking alone can give a 5-10x speedup before any SIMD or threading is added.
#![allow(unused)]
fn main() {
fn blocked_matrix_mul(
a: &[f32], b: &[f32], result: &mut [f32],
n: usize, block_size: usize
) {
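// `result` must start zeroed: each k-block adds to the partial sums already stored
assert!(block_size > 0, "block_size must be non-zero");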
for i_block in (0..n).step_by(block_size) {
for j_block in (0..n).step_by(block_size) {
for k_block in (0..n).step_by(block_size) {
// Process block
let i_end = (i_block + block_size).min(n);
let j_end = (j_block + block_size).min(n);
let k_end = (k_block + block_size).min(n);
for i in i_block..i_end {
for j in j_block..j_end {
let mut sum = result[i * n + j];
// Inner loop is SIMD-friendly
for k in k_block..k_end {
sum += a[i * n + k] * b[k * n + j];
}
result[i * n + j] = sum;
}
}
}
}
}
}
}
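Choosing block_size is simple arithmetic once a cache budget is fixed. The sketch below assumes, as a rule of thumb rather than a measured result, that one tile from each of a, b and result should fit in the cache at the same time; max_block_size is a hypothetical helper.

```rust
/// Largest block edge such that three block x block f32 tiles
/// (one each from a, b and result) fit in `cache_bytes`.
/// The three-tile budget is an assumption, not a measured optimum.
fn max_block_size(cache_bytes: usize) -> usize {
    let floats_per_tile = cache_bytes / (3 * std::mem::size_of::<f32>());
    // Square root via f64, rounded down; precise for realistic cache sizes.
    (floats_per_tile as f64).sqrt().floor() as usize
}
```

For a 32 KiB cache this gives 52, since 3 x 52 x 52 x 4 bytes is 32,448 bytes. In practice the value is usually rounded down to a multiple of the vector width and then tuned by measurement.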
Example: Image convolution
Applies a kernel filter to an image: each output pixel is the weighted sum of its neighborhood. Rows are processed in parallel with Rayon, and the inner kernel loop is a multiply-accumulate. Border pixels, where the kernel would extend past the image edge, are left at 0.0.
#![allow(unused)]
fn main() {
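fn convolve_2d(
image: &[f32], kernel: &[f32],
width: usize, height: usize, ksz: usize
) -> Vec<f32> {
use rayon::prelude::*;
assert_eq!(image.len(), width * height);
assert_eq!(kernel.len(), ksz * ksz);
let offset = ksz / 2;
let mut result = vec![0.0; width * height];
if width == 0 {
return result; // par_chunks_mut panics on a chunk size of 0
}
// Each task owns one output row, so writes never overlap between threads
result.par_chunks_mut(width).enumerate().for_each(|(y, row)| {
if y < offset || y + offset >= height {
return; // kernel would reach outside the image; row stays 0.0
}
for x in offset..width.saturating_sub(offset) {
let mut sum = 0.0;
// Convolution kernel (SIMD-friendly inner loops)
for ky in 0..ksz {
for kx in 0..ksz {
let img_y = y + ky - offset;
let img_x = x + kx - offset;
let img_idx = img_y * width + img_x;
let kernel_idx = ky * ksz + kx;
sum += image[img_idx] * kernel[kernel_idx];
}
}
row[x] = sum;
}
});
result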
}
Example: Reduction with SIMD
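Combines threading (par_chunks) with per-chunk sums. Each chunk is summed on one thread and the partial sums are then reduced across threads. A plain iter().sum() over f32 keeps strict left-to-right order, so for the per-chunk sum to vectorize, use the multi-accumulator pattern from simd_friendly_sum. Thread and SIMD speedups multiply only while the loop is compute-bound; large sums are usually limited by memory bandwidth.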
#![allow(unused)]
fn main() {
fn parallel_sum_simd(data: &[f32]) -> f32 {
use rayon::prelude::*;
// Split into chunks for parallel processing
data.par_chunks(1024)
.map(|chunk| {
// Sequential per-chunk sum; swap in simd_friendly_sum(chunk) for a vectorizable reduction
chunk.iter().sum::<f32>()
})
.sum()
}
}
SIMD Optimization Tips:
- Alignment: Align data to 16/32-byte boundaries
- Contiguous memory: Use arrays/slices, not scattered data
- Inner loops: Make innermost loops SIMD-friendly
- Combine with threading: Rayon + SIMD for maximum performance
- Profile: Use compiler output to verify vectorization
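The alignment tip can be expressed in the type system with #[repr(align(N))]. The type and function names below are illustrative; the 32-byte figure assumes 256-bit vectors.

```rust
/// Eight f32 values forced onto a 32-byte boundary (one 256-bit vector wide).
#[repr(C, align(32))]
#[derive(Clone, Copy)]
struct AlignedLanes([f32; 8]);

/// Returns true if every element of `blocks` starts on a 32-byte boundary.
fn all_aligned(blocks: &[AlignedLanes]) -> bool {
    blocks
        .iter()
        .all(|b| (b as *const AlignedLanes as usize) % 32 == 0)
}
```

A Vec<AlignedLanes> allocates with the type's alignment, so every element satisfies the check. A plain Vec<f32> only guarantees 4-byte alignment, which modern CPUs tolerate with unaligned loads at a small or zero cost.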
Example: Iterator patterns that auto-vectorize
Simple map(|x| x * 2.0) and zip().map() auto-vectorize well. Closures that mutate captured state or contain complex control flow tend to block vectorization. Only measure release builds: debug builds do not run the vectorizer.
#![allow(unused)]
fn main() {
fn auto_vectorize_examples() {
let data: Vec<f32> = (0..10000).map(|x| x as f32).collect();
// Good: Simple map (auto-vectorizes)
let doubled: Vec<f32> = data.iter().map(|&x| x * 2.0).collect();
// Good: Zip and map (auto-vectorizes)
let summed: Vec<f32> = data
.iter()
.zip(data.iter())
.map(|(&a, &b)| a + b)
.collect();
// Per-chunk sums: each f32 sum keeps strict order, so verify this one in the assembly
let chunk_sums: Vec<f32> = data
.chunks(4)
.map(|chunk| chunk.iter().sum())
.collect();
}
}
Example: Explicit chunking for better vectorization
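Process in chunks matching the SIMD width (8 f32 lanes for 256-bit AVX). When the chunk length is a compile-time constant and the output is written in place rather than pushed, the inner loop over a chunk can compile to a few vector instructions. Match the chunk size to the target CPU's vector width.
#![allow(unused)]
fn main() {
fn chunked_operations(data: &[f32]) -> Vec<f32> {
// Process in SIMD-width chunks
const SIMD_WIDTH: usize = 8; // f32 lanes in one 256-bit AVX register
let mut result = vec![0.0f32; data.len()];
let chunks = data.chunks_exact(SIMD_WIDTH);
let tail = chunks.remainder();
// Fixed-size chunks and no push(): the trip count is known at compile time
for (out, chunk) in result.chunks_exact_mut(SIMD_WIDTH).zip(chunks) {
for (o, &value) in out.iter_mut().zip(chunk) {
*o = value * 2.0 + 1.0;
}
}
// Scalar loop for the leftover elements
let tail_start = data.len() - tail.len();
for (o, &value) in result[tail_start..].iter_mut().zip(tail) {
*o = value * 2.0 + 1.0;
}
result
}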
}
Example: Struct of Arrays (SoA) vs Array of Structs (AoS)
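AoS (Vec<PointAoS>) interleaves x, y, z in memory, so gathering eight consecutive x values needs strided loads, which vectorize poorly. SoA stores each field contiguously, so a single vector load fetches consecutive values of one field. SoA can be 4-8x faster for SIMD workloads.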
// Bad for SIMD: Array of Structs
#[derive(Copy, Clone)]
struct PointAoS {
x: f32,
y: f32,
z: f32,
}
fn process_aos(points: &[PointAoS]) -> Vec<f32> {
// Poor vectorization: scattered access
points.iter().map(|p| p.x + p.y + p.z).collect()
}
// Good for SIMD: Struct of Arrays
struct PointsSoA {
x: Vec<f32>,
y: Vec<f32>,
z: Vec<f32>,
}
impl PointsSoA {
fn process(&self) -> Vec<f32> {
// Good vectorization: contiguous access
self.x
.iter()
.zip(self.y.iter())
.zip(self.z.iter())
.map(|((&x, &y), &z)| x + y + z)
.collect()
}
}
fn main() {
let points_soa = PointsSoA {
x: (0..10000).map(|i| i as f32).collect(),
y: (0..10000).map(|i| (i * 2) as f32).collect(),
z: (0..10000).map(|i| (i * 3) as f32).collect(),
};
let sums = points_soa.process();
}
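When data arrives in AoS form, a one-time conversion is often enough to get the SoA benefits in later passes. The sketch below repeats the two types so it stands alone; to_soa is a hypothetical helper, not part of the example above.

```rust
#[derive(Copy, Clone)]
struct PointAoS {
    x: f32,
    y: f32,
    z: f32,
}

struct PointsSoA {
    x: Vec<f32>,
    y: Vec<f32>,
    z: Vec<f32>,
}

/// One pass over the AoS data, copying each field into its own Vec.
fn to_soa(points: &[PointAoS]) -> PointsSoA {
    let mut soa = PointsSoA {
        x: Vec::with_capacity(points.len()),
        y: Vec::with_capacity(points.len()),
        z: Vec::with_capacity(points.len()),
    };
    for p in points {
        soa.x.push(p.x);
        soa.y.push(p.y);
        soa.z.push(p.z);
    }
    soa
}
```

The conversion itself is a strided read and costs one pass over the data, so it pays off mainly when the SoA form is processed several times afterwards.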
Example: Parallel + SIMD Monte Carlo
Estimates π using random points: Rayon hands out batches of 1024 samples, and each batch draws from the worker's thread-local RNG. The point-in-circle test is simple arithmetic, but the RNG calls in the loop usually keep it scalar, so expect most of the speedup here to come from threading.
#![allow(unused)]
fn main() {
fn monte_carlo_pi_parallel_simd(samples: usize) -> f64 {
use rayon::prelude::*;
use rand::Rng;
let inside: usize = (0..samples)
.into_par_iter()
.chunks(1024) // Process in batches
.map(|chunk| {
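// rand 0.8 API; rand 0.9 renames these to rand::rng() and Rng::random()
let mut rng = rand::thread_rng();
let mut count = 0;
// Point-in-circle test; the RNG calls usually keep this loop scalar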
for _ in chunk {
let x: f32 = rng.gen();
let y: f32 = rng.gen();
if x * x + y * y <= 1.0 {
count += 1;
}
}
count
})
.sum();
4.0 * inside as f64 / samples as f64
}
let pi = monte_carlo_pi_parallel_simd(10_000_000);
}
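For tests it helps if the estimate is reproducible. The sketch below uses only the standard library: a small SplitMix64-style generator seeded from the worker index stands in for the rand crate, and std::thread::scope stands in for Rayon. All names and the seed constant are illustrative.

```rust
use std::thread;

/// SplitMix64-style step; used here only as a small deterministic generator.
fn splitmix64(state: &mut u64) -> u64 {
    *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mut z = *state;
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Uniform f64 in [0, 1) built from the top 53 bits.
fn next_unit(state: &mut u64) -> f64 {
    (splitmix64(state) >> 11) as f64 / (1u64 << 53) as f64
}

/// Estimates pi with `workers` threads. Each worker is seeded from its index,
/// so repeated calls with the same arguments return the same value.
/// Assumes samples >= workers; any remainder of samples / workers is dropped.
fn monte_carlo_pi_seeded(samples: usize, workers: usize) -> f64 {
    assert!(workers > 0 && samples >= workers);
    let per_worker = samples / workers;
    let inside: usize = thread::scope(|s| {
        let handles: Vec<_> = (0..workers)
            .map(|w| {
                s.spawn(move || {
                    let mut state = 0x1234_5678_u64 + w as u64;
                    let mut count = 0usize;
                    for _ in 0..per_worker {
                        let x = next_unit(&mut state);
                        let y = next_unit(&mut state);
                        if x * x + y * y <= 1.0 {
                            count += 1;
                        }
                    }
                    count
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum::<usize>()
    });
    4.0 * inside as f64 / (per_worker * workers) as f64
}
```

Seeding by worker index rather than by thread identity keeps the result independent of how the operating system schedules the threads.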
Example: Benchmarking SIMD effectiveness
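Compares an explicit push loop, an iterator chain, and a Rayon version. The loop-to-iterator ratio mostly reflects what the optimizer can do once collect() knows the exact length up front (no per-element capacity check and a better chance of vectorization); the iterator-to-parallel ratio shows the threading benefit. Run in release mode and confirm vectorization in the assembly rather than inferring it from timings alone.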
#![allow(unused)]
fn main() {
fn benchmark_vectorization() {
let data: Vec<f32> = (0..10_000_000)
.map(|x| x as f32)
.collect();
// Version 1: Simple loop
let start = std::time::Instant::now();
let mut result1 = Vec::with_capacity(data.len());
for &x in &data {
result1.push(x * 2.0 + 1.0);
}
let time1 = start.elapsed();
// Version 2: Iterator (likely vectorized)
let start = std::time::Instant::now();
let result2: Vec<f32> = data
.iter()
.map(|&x| x * 2.0 + 1.0)
.collect();
let time2 = start.elapsed();
// Version 3: Parallel + potential vectorization
use rayon::prelude::*;
let start = std::time::Instant::now();
let result3: Vec<f32> = data
.par_iter()
.map(|&x| x * 2.0 + 1.0)
.collect();
let time3 = start.elapsed();
println!("Simple loop: {:?}", time1);
println!("Iterator: {:?}", time2);
println!("Parallel: {:?}", time3);
let s1 = time1.as_secs_f64() / time2.as_secs_f64();
let s2 = time2.as_secs_f64() / time3.as_secs_f64();
println!("Speedup (iterator vs loop): {:.2}x", s1);
println!("Speedup (parallel vs iterator): {:.2}x", s2);
}
}
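Timing code whose result is never used can be misleading, because the optimizer is free to remove the work. A small helper built on std::hint::black_box guards against that; time_it and scale_and_shift are names introduced for this sketch.

```rust
use std::hint::black_box;
use std::time::{Duration, Instant};

/// Runs `f` once and returns its result together with the elapsed time.
/// black_box keeps the optimizer from discarding an otherwise unused result.
fn time_it<T>(f: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let out = black_box(f());
    (out, start.elapsed())
}

/// The same element-wise kernel as in the benchmark: x * 2.0 + 1.0.
fn scale_and_shift(data: &[f32]) -> Vec<f32> {
    data.iter().map(|&x| x * 2.0 + 1.0).collect()
}
```

A call such as time_it(|| scale_and_shift(&data)) returns both the output and the duration. For numbers worth reporting, repeat the measurement several times and discard the first run, which includes cache warm-up.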
Vectorization Guidelines:
- Contiguous data: Use slices/arrays
- Simple operations: +, -, *, / vectorize well
- No branching: Avoid if/else in hot loops
- Struct of Arrays: Better than Array of Structs
- Verify: Use cargo rustc --release -- --emit asm to check
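The "no branching" guideline can be illustrated by counting elements above a threshold in two ways. Both function names are hypothetical, and optimizers often perform this conversion themselves; the second form only makes the intent explicit.

```rust
/// Counts values above `threshold` with an `if` in the loop body.
fn count_above_branchy(data: &[f32], threshold: f32) -> usize {
    let mut count = 0;
    for &x in data {
        if x > threshold {
            count += 1;
        }
    }
    count
}

/// Same result, written as arithmetic on the comparison result:
/// `true as usize` is 1 and `false as usize` is 0.
fn count_above_branchless(data: &[f32], threshold: f32) -> usize {
    data.iter().map(|&x| (x > threshold) as usize).sum()
}
```

The branchless form maps directly onto a vector compare followed by an add, with no data-dependent jump inside the loop.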
Summary
This chapter covered parallel algorithm patterns in Rust:
- Rayon Patterns: par_iter, par_bridge for easy parallelization
- Work Partitioning: Chunking, load balancing, recursive parallelism
- Parallel Reduce/Fold: Aggregation patterns, custom accumulators
- Pipeline Parallelism: Multi-stage processing with channels
- SIMD Parallelism: Auto-vectorization, SoA layout, parallel + SIMD
Key Takeaways:
- Rayon makes data parallelism trivial (add .par_)
- Work stealing automatically balances irregular workloads
- Grain size matters: too small = overhead, too large = imbalance
- fold + reduce for custom aggregations
- Pipeline stages can have different parallelism levels
- SIMD + threading for maximum performance
- SoA layout enables better vectorization
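The fold + reduce takeaway can be shown without Rayon. This standard-library sketch folds each chunk into a local accumulator on its own thread and then reduces the partial results; parallel_min_max is a hypothetical name, and spawning one thread per chunk is a simplification that a thread pool such as Rayon's avoids.

```rust
use std::thread;

/// Folds each chunk into a local (min, max), then reduces the partial results.
/// Returns None for empty input.
fn parallel_min_max(data: &[i64], chunk_size: usize) -> Option<(i64, i64)> {
    assert!(chunk_size > 0);
    let partials: Vec<(i64, i64)> = thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| {
                // Fold: sequential accumulation inside one chunk
                s.spawn(move || {
                    chunk.iter().fold((i64::MAX, i64::MIN), |(lo, hi), &x| {
                        (lo.min(x), hi.max(x))
                    })
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Reduce: combine the per-chunk accumulators into one value
    partials
        .into_iter()
        .reduce(|(lo1, hi1), (lo2, hi2)| (lo1.min(lo2), hi1.max(hi2)))
}
```

The fold step never shares state between threads, and the reduce step only touches one small accumulator per chunk, which is why the pattern scales well.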
Performance Guidelines:
- Use Rayon for CPU-bound data parallelism
- Combine thread parallelism and SIMD for best results
- Profile to find bottlenecks (CPU, memory, cache)
- Tune chunk size based on workload
- Use blocked algorithms for cache efficiency
Common Patterns:
- Map: Transform each element independently
- Filter: Select elements based on predicate
- Reduce: Aggregate to single value
- Fold: Accumulate with custom state
- Pipeline: Chain transformations
When to Parallelize:
- Large datasets: >10,000 items typically
- CPU-bound: Compute-intensive operations
- Independent work: No dependencies between items
- Speedup > overhead: Measure, don’t assume
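These criteria can be encoded as a size check that falls back to sequential code for small inputs. The sketch uses std::thread::scope rather than Rayon, and the threshold constant simply mirrors the 10,000-item rule of thumb; it is a starting point to be tuned by measurement, and all names are illustrative.

```rust
use std::thread;

/// Below this length the work stays on the calling thread.
const PARALLEL_THRESHOLD: usize = 10_000;

/// Sequential kernel shared by both code paths.
fn squares(part: &[u64]) -> u64 {
    part.iter().map(|&x| x * x).sum()
}

/// Sums squares, splitting across `workers` threads only for large inputs.
fn sum_of_squares(data: &[u64], workers: usize) -> u64 {
    if data.len() < PARALLEL_THRESHOLD || workers <= 1 {
        return squares(data);
    }
    // Ceiling division so that at most `workers` chunks are created
    let chunk_size = (data.len() + workers - 1) / workers;
    thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || squares(chunk)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum::<u64>()
    })
}
```

Both paths return the same value, so the threshold can be adjusted freely while benchmarking without affecting correctness.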
Pitfalls to Avoid:
- Over-parallelization (too many small tasks)
- False sharing (cache line contention)
- Sequential bottlenecks (Amdahl’s law)
- Ignoring memory bandwidth limits
- Not profiling actual performance
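The sequential-bottleneck pitfall can be quantified with Amdahl's law, speedup = 1 / ((1 - p) + p / n), where p is the fraction of runtime that can run in parallel and n is the number of threads. The function name below is illustrative.

```rust
/// Upper bound on speedup from Amdahl's law: 1 / ((1 - p) + p / n).
/// `parallel_fraction` is p, the share of runtime that parallelizes;
/// `threads` is n, the number of workers.
fn amdahl_speedup(parallel_fraction: f64, threads: u32) -> f64 {
    assert!((0.0..=1.0).contains(&parallel_fraction));
    assert!(threads > 0);
    1.0 / ((1.0 - parallel_fraction) + parallel_fraction / threads as f64)
}
```

With 90% of the work parallel and 8 threads, the bound is about 4.7x, and no number of threads can push it past 10x. Shrinking the sequential part often pays off more than adding cores.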