Project 2: Parallel Image Processor with Thread Pool
Problem Statement
Build a parallel image processing application using a thread pool to process multiple images concurrently. The system should resize, filter, and save images using worker threads, with task distribution and result collection.
Use Cases
- Image/video processing pipelines
- Web server request handling
- Batch data processing
- Parallel compilation systems
- Database query execution
- Scientific simulations
Why It Matters
Thread pools amortize thread creation overhead and bound resource usage. Creating a thread per task is expensive (the figures below assume 1-2ms per spawn, a pessimistic estimate, so measure on your platform) and places no limit on how many threads exist at once. A thread pool reuses a fixed set of threads and queues excess work.
For 10,000 small tasks, under that assumption:
- Spawn per task: 10-20 seconds of thread creation overhead
- Thread pool (8 workers): 1-2 seconds (threads are reused)
Your image processor should:
- Load images from directory
- Distribute processing across worker threads
- Apply transformations (resize, blur, brightness adjustment)
- Save processed images to output directory
- Report progress and completion status
- Handle errors gracefully (corrupted images, disk full)
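The first requirement, loading images from a directory, can be sketched with the standard library alone. This is a minimal illustration, not part of the project's starter code: the names `is_image` and `collect_images` and the extension list are assumptions made for this example.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Returns true if the path has an extension this sketch treats as an image.
/// The extension list is an assumption; extend it to match your decoder.
pub fn is_image(path: &Path) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => matches!(
            ext.to_ascii_lowercase().as_str(),
            "png" | "jpg" | "jpeg" | "gif" | "bmp"
        ),
        None => false,
    }
}

/// Lists image files directly inside `dir` (non-recursive), sorted by path.
/// I/O errors (missing directory, permission denied) are returned to the caller.
pub fn collect_images(dir: &Path) -> io::Result<Vec<PathBuf>> {
    let mut images = Vec::new();
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_file() && is_image(&path) {
            images.push(path);
        }
    }
    images.sort();
    Ok(images)
}
```

Each returned path can then become one task for the pool. Returning `io::Result` rather than panicking keeps a missing input directory in the "handle errors gracefully" category.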
Milestone 1: Basic Thread Pool Implementation
Implement a simple thread pool with a fixed number of worker threads. Workers pull tasks from a shared queue and execute them.
Architecture
Structs:
- ThreadPool - Manages worker threads
  - Field workers: Vec<JoinHandle<()>> - Worker thread handles
  - Field sender: Sender<Job> - Task submission channel
  - Field shutdown: Arc<AtomicBool> - Shutdown signal
- Job - Unit of work
  - Type alias: Box<dyn FnOnce() + Send + 'static>
Key Functions:
- new(size: usize) -> ThreadPool - Create pool with N workers
- execute<F>(&self, f: F) where F: FnOnce() + Send + 'static - Submit task
- shutdown(self) - Stop all workers gracefully
Role Each Plays:
- Worker threads: Loop receiving and executing jobs
- Shared channel: Distributes work across workers
- Shutdown flag: Coordinates graceful termination
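The roles above rely on one property of `std::sync::mpsc` channels: after the last `Sender` is dropped, `recv()` keeps returning the jobs already queued and reports an error only once the queue is empty. The sketch below demonstrates that property in isolation. The function name `run_to_completion` is hypothetical, and this is an illustration rather than the pool you are asked to build.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Queues `jobs` closures, closes the channel, then lets `workers` threads
/// drain it. Returns how many jobs actually ran.
pub fn run_to_completion(workers: usize, jobs: usize) -> usize {
    let (sender, receiver) = channel::<Job>();
    let receiver: Arc<Mutex<Receiver<Job>>> = Arc::new(Mutex::new(receiver));
    let done = Arc::new(AtomicUsize::new(0));

    // Queue every job before any worker exists, then close the channel.
    for _ in 0..jobs {
        let done = Arc::clone(&done);
        sender
            .send(Box::new(move || {
                done.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("receiver is still alive");
    }
    drop(sender);

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // The lock guard is a temporary, so it is released at the end
                // of this statement, before the job runs.
                let next = receiver.lock().unwrap().recv();
                match next {
                    Ok(job) => job(),
                    Err(_) => break, // channel closed and fully drained
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker panicked");
    }
    done.load(Ordering::SeqCst)
}
```

Because queued jobs survive the drop of the sender, a graceful `shutdown` can close the channel and join the workers without losing submitted work.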
Checkpoint Tests
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

#[test]
fn test_thread_pool_execution() {
    let pool = ThreadPool::new(4);
    let counter = Arc::new(Mutex::new(0));
    for _ in 0..100 {
        let c = counter.clone();
        pool.execute(move || {
            let mut num = c.lock().unwrap();
            *num += 1;
        });
    }
    // shutdown() must run every queued job before it returns
    pool.shutdown();
    assert_eq!(*counter.lock().unwrap(), 100);
}

#[test]
fn test_parallel_speedup() {
    let pool = ThreadPool::new(4);
    let start = Instant::now();
    for _ in 0..8 {
        pool.execute(|| {
            thread::sleep(Duration::from_millis(100));
        });
    }
    pool.shutdown();
    let elapsed = start.elapsed();
    // 8 tasks @ 100ms each on 4 workers ≈ 200ms total
    assert!(elapsed < Duration::from_millis(300));
}
Starter Code
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct ThreadPool {
    workers: Vec<JoinHandle<()>>,
    sender: Sender<Job>,
    shutdown: Arc<AtomicBool>,
}

impl ThreadPool {
    pub fn new(size: usize) -> Self {
        // TODO: Create channel for jobs
        // Wrap the Receiver in Arc<Mutex<..>> so all workers can share it
        // Spawn 'size' worker threads
        // Each worker loops: recv job -> execute -> repeat
        // Return ThreadPool with workers and sender
        unimplemented!()
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // TODO: Box closure and send through channel
        // Hint: self.sender.send(Box::new(f)).unwrap()
        unimplemented!()
    }

    pub fn shutdown(self) {
        // TODO: Set shutdown flag
        // Drop sender to close channel
        // Join all worker threads
        unimplemented!()
    }
}

fn worker_loop(receiver: Arc<Mutex<Receiver<Job>>>, shutdown: Arc<AtomicBool>) {
    // TODO: Loop:
    // - Lock receiver
    // - recv() the next job, then release the lock before running it
    // - If job received, execute it
    // - If recv() returns Err, the channel is closed and drained: exit
    // Do not exit on the shutdown flag while jobs are still queued;
    // the checkpoint test expects all 100 submitted tasks to run
    unimplemented!()
}
Why previous Milestone is not enough: N/A - Foundation Milestone.
What’s the improvement: Thread pool vs spawn-per-task:
- Spawn-per-task: 1000 tasks × 1ms spawn = 1 second of overhead
- Thread pool: spawn cost is paid once at startup (threads pre-spawned); each task then costs only a channel send and a brief lock
For high-frequency tasks (web requests, image tiles), a thread pool is the practical choice.
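The per-spawn cost varies widely between platforms, so it is worth measuring before relying on any fixed figure. A minimal sketch, assuming only the standard library (the function name `average_spawn_cost` is made up for this example):

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Spawns and joins `n` threads that do no work and returns the average
/// wall-clock cost per spawn-and-join pair.
pub fn average_spawn_cost(n: u32) -> Duration {
    assert!(n > 0, "need at least one sample");
    let start = Instant::now();
    for _ in 0..n {
        // Joining immediately keeps at most one extra thread alive at a time.
        thread::spawn(|| {}).join().expect("empty thread cannot panic");
    }
    start.elapsed() / n
}
```

Calling `average_spawn_cost(1000)` and multiplying by your expected task count gives a rough estimate of the overhead a spawn-per-task design would add on your machine.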
Milestone 2: Image Processing Tasks
Introduction
Add image processing functionality: load, resize, apply filters, save. Distribute tasks across thread pool workers.
Architecture
Structs:
- ImageTask - Processing job
  - Field input_path: PathBuf - Source image
  - Field output_path: PathBuf - Destination
  - Field operations: Vec<Operation> - Transformations to apply
- Operation - Transformation enum
  - Variant Resize(u32, u32) - New dimensions
  - Variant Blur(f32) - Blur radius
  - Variant Brighten(i32) - Brightness delta
Key Functions:
- process_image(task: ImageTask) -> Result<(), ImageError>
- load_image(path: &Path) -> Result<ImageBuffer, ImageError>
- save_image(image: &ImageBuffer, path: &Path) -> Result<(), ImageError>
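To see what "apply each operation in sequence" means without pulling in an image library, the sketch below runs two of the operations on a tiny in-memory grayscale buffer. Everything here is a stand-in: `Gray`, `Op`, `apply`, and `process` are hypothetical names, the resize is plain nearest-neighbour sampling, and blur is left out to keep the example short. A real implementation would delegate to an image crate.

```rust
/// Hypothetical 8-bit grayscale image: row-major, one byte per pixel.
#[derive(Debug, Clone, PartialEq)]
pub struct Gray {
    pub width: u32,
    pub height: u32,
    pub pixels: Vec<u8>,
}

/// A reduced version of the Operation enum (no Blur in this sketch).
#[derive(Debug, Clone, Copy)]
pub enum Op {
    Resize(u32, u32),
    Brighten(i32),
}

/// Applies one operation and returns the new image.
/// Resize assumes the source image is non-empty.
pub fn apply(img: Gray, op: &Op) -> Gray {
    match *op {
        Op::Brighten(delta) => {
            // Add the delta in i32 and clamp back into the 0..=255 range.
            let pixels = img
                .pixels
                .iter()
                .map(|&p| i32::from(p).saturating_add(delta).clamp(0, 255) as u8)
                .collect();
            Gray { pixels, ..img }
        }
        Op::Resize(w, h) => {
            // Nearest-neighbour sampling: map each target pixel back to a source pixel.
            let mut pixels = Vec::with_capacity((w as usize) * (h as usize));
            for y in 0..h {
                for x in 0..w {
                    let src_x = (u64::from(x) * u64::from(img.width) / u64::from(w)) as usize;
                    let src_y = (u64::from(y) * u64::from(img.height) / u64::from(h)) as usize;
                    pixels.push(img.pixels[src_y * img.width as usize + src_x]);
                }
            }
            Gray { width: w, height: h, pixels }
        }
    }
}

/// Folds the operations over the image in order, like the loop in process_image.
pub fn process(img: Gray, ops: &[Op]) -> Gray {
    ops.iter().fold(img, apply)
}
```

Each operation consumes the previous image and returns a new one, so the order of the `operations` vector matters: brightening before resizing touches fewer pixels when the resize enlarges the image.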
Checkpoint Tests
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[test]
fn test_image_resize() {
    // Requires a test.png fixture in the working directory
    let task = ImageTask {
        input_path: PathBuf::from("test.png"),
        output_path: PathBuf::from("out.png"),
        operations: vec![Operation::Resize(100, 100)],
    };
    let result = process_image(task);
    assert!(result.is_ok());
}

#[test]
fn test_parallel_processing() {
    let pool = ThreadPool::new(4);
    let counter = Arc::new(AtomicUsize::new(0));
    for _ in 0..10 {
        let c = counter.clone();
        pool.execute(move || {
            // Simulate image processing
            thread::sleep(Duration::from_millis(50));
            c.fetch_add(1, Ordering::SeqCst);
        });
    }
    pool.shutdown();
    assert_eq!(counter.load(Ordering::SeqCst), 10);
}
Starter Code
use std::path::{Path, PathBuf};
// Assumes the `image` crate is listed in Cargo.toml
use image::{DynamicImage, ImageError};

// ImageBuffer in the Architecture section is image::DynamicImage here,
// which is what image::open returns
pub type ImageBuffer = DynamicImage;

#[derive(Debug)]
pub enum Operation {
    Resize(u32, u32),
    Blur(f32),
    Brighten(i32),
}

pub struct ImageTask {
    pub input_path: PathBuf,
    pub output_path: PathBuf,
    pub operations: Vec<Operation>,
}

pub fn process_image(task: ImageTask) -> Result<(), ImageError> {
    // TODO: Load image from input_path
    // Apply each operation in sequence
    // Save to output_path
    // Hint: Use image crate for actual processing
    // let mut img = image::open(&task.input_path)?;
    // for op in task.operations {
    //     img = apply_operation(img, op);
    // }
    // img.save(&task.output_path)?;
    // Ok(())
    unimplemented!()
}

fn apply_operation(img: ImageBuffer, op: Operation) -> ImageBuffer {
    // TODO: Match on operation and apply transformation
    // Resize: img.resize_exact(w, h, image::imageops::FilterType::Triangle)
    // Blur: img.blur(sigma)
    // Brighten: img.brighten(delta)
    unimplemented!()
}
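Why previous Milestone is not enough: A thread pool without real work is just overhead. It needs actual tasks to process.
What’s the improvement: Parallel image processing scales close to linearly while the work is CPU-bound and cores are available:
- Sequential: 10 images × 500ms = 5 seconds
- Parallel (8 cores): 10 images run in 2 rounds (8, then 2) × 500ms = ~1 second, a 5× speedup; the ideal 5 seconds / 8 = 625ms is reached only when the images divide evenly across workers
For batch processing (thousands of images), parallelism is essential.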
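A rough model for the wall time of equal-sized tasks is the number of rounds needed, ceil(tasks / workers), times the per-task duration. The helper below is a sketch of that arithmetic only. Its name is hypothetical, and it ignores scheduling overhead, I/O contention, and uneven image sizes.

```rust
use std::time::Duration;

/// Estimated wall time for `tasks` equal-sized tasks on `workers` workers:
/// ceil(tasks / workers) rounds, each lasting `per_task`.
pub fn estimated_wall_time(tasks: u32, workers: u32, per_task: Duration) -> Duration {
    assert!(workers > 0, "need at least one worker");
    // Ceiling division without risking overflow in tasks + workers - 1.
    let rounds = tasks / workers + u32::from(tasks % workers != 0);
    per_task * rounds
}
```

With thousands of images the last partial round becomes negligible and the estimate approaches the ideal sequential time divided by the worker count.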
Milestone 3: Progress Tracking and Results
Introduction
Track processing progress and collect results. Report completion percentage, failed tasks, and aggregate statistics.
Architecture
Enhanced Structs:
- ProcessingResult - Task outcome
  - Field task_id: usize
  - Field status: TaskStatus - Success/Failed
  - Field duration: Duration - Processing time
  - Field error: Option<String> - Error message if failed
- ProgressTracker - Monitor progress
  - Field total: usize - Total tasks
  - Field completed: AtomicUsize - Finished count
  - Field results: Mutex<Vec<ProcessingResult>>
Key Functions:
- progress_reporter(tracker: Arc<ProgressTracker>) - Progress reporter thread
- ProgressTracker::wait_for_completion(&self) -> Vec<ProcessingResult> - Block until every task has reported, then return the results
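The counter side of the tracker can be sketched on its own. The example below is an illustration with hypothetical names (`Counter`, `tick`, `line`), not the `ProgressTracker` from the starter code. Its main point is that the division must happen in floating point, because integer division of `completed / total` would stay at 0 until the last task finishes.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Minimal shared progress counter; safe to call from many threads via &self.
pub struct Counter {
    total: usize,
    completed: AtomicUsize,
}

impl Counter {
    pub fn new(total: usize) -> Self {
        Counter { total, completed: AtomicUsize::new(0) }
    }

    /// Records one finished task and returns the new completed count.
    pub fn tick(&self) -> usize {
        self.completed.fetch_add(1, Ordering::SeqCst) + 1
    }

    /// Completion percentage in the range 0.0..=100.0.
    /// An empty workload is reported as fully complete.
    pub fn percentage(&self) -> f64 {
        if self.total == 0 {
            return 100.0;
        }
        let done = self.completed.load(Ordering::SeqCst);
        // Convert to f64 before dividing to avoid integer truncation.
        done as f64 * 100.0 / self.total as f64
    }

    /// Formats a progress line such as "Progress: 45/100 (45%)".
    pub fn line(&self) -> String {
        let done = self.completed.load(Ordering::SeqCst);
        format!("Progress: {}/{} ({:.0}%)", done, self.total, self.percentage())
    }
}
```

A reporter thread would call `line()` on a timer, while workers call `tick()` as they finish.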
Checkpoint Tests
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[test]
fn test_progress_tracking() {
    let tracker = Arc::new(ProgressTracker::new(10));
    for i in 0..10 {
        let t = tracker.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            t.record_completion(i, TaskStatus::Success, Duration::from_millis(10), None);
        });
    }
    let results = tracker.wait_for_completion();
    assert_eq!(results.len(), 10);
    // Reads a private field, so this test must live in the tracker's module
    assert_eq!(tracker.completed.load(Ordering::SeqCst), 10);
}

#[test]
fn test_error_collection() {
    let tracker = Arc::new(ProgressTracker::new(5));
    for i in 0..5 {
        let t = tracker.clone();
        thread::spawn(move || {
            if i % 2 == 0 {
                t.record_completion(i, TaskStatus::Success, Duration::from_millis(10), None);
            } else {
                t.record_completion(
                    i,
                    TaskStatus::Failed,
                    Duration::from_millis(5),
                    Some("Processing error".to_string()),
                );
            }
        });
    }
    let results = tracker.wait_for_completion();
    // Tasks 1 and 3 fail
    let failed = results.iter().filter(|r| matches!(r.status, TaskStatus::Failed)).count();
    assert_eq!(failed, 2);
}
Starter Code
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[derive(Debug, Clone)]
pub enum TaskStatus {
    Success,
    Failed,
}

// Clone is required so wait_for_completion can return a copy of the results
#[derive(Debug, Clone)]
pub struct ProcessingResult {
    pub task_id: usize,
    pub status: TaskStatus,
    pub duration: Duration,
    pub error: Option<String>,
}

pub struct ProgressTracker {
    total: usize,
    completed: AtomicUsize,
    results: Mutex<Vec<ProcessingResult>>,
}

impl ProgressTracker {
    pub fn new(total: usize) -> Self {
        // TODO: Initialize with total count and empty results
        unimplemented!()
    }

    pub fn record_completion(
        &self,
        task_id: usize,
        status: TaskStatus,
        duration: Duration,
        error: Option<String>,
    ) {
        // TODO: Add result to results vec (with lock)
        // Then increment completed counter, so that a waiter who sees
        // completed == total also sees every result
        unimplemented!()
    }

    pub fn progress_percentage(&self) -> f64 {
        // TODO: Calculate completion percentage
        // Hint: completed as f64 / total as f64 * 100.0
        // (convert before dividing; integer division yields 0 until the end)
        unimplemented!()
    }

    pub fn wait_for_completion(&self) -> Vec<ProcessingResult> {
        // TODO: Wait until completed == total, sleeping or yielding between checks
        // Return cloned results vec
        unimplemented!()
    }
}

pub fn progress_reporter(tracker: Arc<ProgressTracker>) {
    // TODO: Loop printing progress every 100ms
    // Example: "Progress: 45/100 (45%)"
    // Exit when completed == total
    unimplemented!()
}
Why previous Milestone is not enough: No visibility into processing status. Users want progress bars and error reports.
What’s the improvement: Progress tracking enables UX and debugging:
- No tracking: Black box, no idea if hung or processing
- With tracking: Real-time progress, failed task identification
For long-running batch jobs, progress reporting is mandatory.
Milestone 4: Dynamic Task Submission
Introduction
Support submitting tasks dynamically while processing continues. Add tasks from multiple threads without blocking.
Architecture
Enhanced Pool:
- Allow task submission from any thread
- Handle varying load (elastic work queue)
- Report queue depth for monitoring
Key Functions:
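- execute_with_timeout<F>(&self, f: F, timeout: Duration) -> Result<(), String> - Submit a task, giving up if the queue stays full past the timeout
- queue_depth(&self) -> usize - Number of pending tasks
- active_workers(&self) -> usize - Number of workers currently executing a task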
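A timed submission needs a bounded queue, because an unbounded channel never blocks the sender. On stable Rust, `std::sync::mpsc::SyncSender` offers `try_send`, so one way to approximate a send with timeout is to retry until a deadline. The sketch below takes that approach. The helper name `send_with_timeout` and the 1ms polling interval are assumptions made for this example.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};
use std::thread;
use std::time::{Duration, Instant};

/// Tries to enqueue `item`, retrying while the bounded queue is full.
/// Returns the item back in `Err` if the deadline passes or the receiver is gone.
pub fn send_with_timeout<T>(
    sender: &SyncSender<T>,
    mut item: T,
    timeout: Duration,
) -> Result<(), T> {
    let deadline = Instant::now() + timeout;
    loop {
        match sender.try_send(item) {
            Ok(()) => return Ok(()),
            // No receiver left: waiting longer cannot help.
            Err(TrySendError::Disconnected(back)) => return Err(back),
            Err(TrySendError::Full(back)) => {
                if Instant::now() >= deadline {
                    return Err(back);
                }
                // try_send hands the item back, so it can be retried.
                item = back;
                thread::sleep(Duration::from_millis(1));
            }
        }
    }
}
```

Polling trades a little latency for simplicity. Handing the rejected item back to the caller lets it decide whether to drop the task, retry later, or run it inline.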
Checkpoint Tests
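use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[test]
fn test_dynamic_submission() {
    let pool = ThreadPool::new(4);
    // Submit initial batch
    for i in 0..10 {
        pool.execute(move || println!("Task {}", i));
    }
    // Submit more tasks while processing
    thread::sleep(Duration::from_millis(50));
    for i in 10..20 {
        pool.execute(move || println!("Task {}", i));
    }
    pool.shutdown();
}

#[test]
fn test_multi_threaded_submission() {
    // Sharing the pool through Arc requires ThreadPool: Sync, which holds for
    // std's Sender on Rust 1.72 and later
    let pool = Arc::new(ThreadPool::new(4));
    let executed = Arc::new(AtomicUsize::new(0));
    let mut submitters = vec![];
    for _ in 0..4 {
        let p = pool.clone();
        let executed = executed.clone();
        let handle = thread::spawn(move || {
            for _ in 0..100 {
                let executed = executed.clone();
                p.execute(move || {
                    thread::sleep(Duration::from_micros(10));
                    executed.fetch_add(1, Ordering::SeqCst);
                });
            }
        });
        submitters.push(handle);
    }
    for h in submitters {
        h.join().unwrap();
    }
    // Every submitter has dropped its clone, so the pool can be reclaimed
    let pool = Arc::try_unwrap(pool).unwrap_or_else(|_| panic!("pool is still shared"));
    pool.shutdown();
    assert_eq!(executed.load(Ordering::SeqCst), 400);
}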
Starter Code
use std::time::Duration;

impl ThreadPool {
    pub fn execute_with_timeout<F>(
        &self,
        f: F,
        timeout: Duration,
    ) -> Result<(), String>
    where
        F: FnOnce() + Send + 'static,
    {
        // TODO: Try to send job with timeout
        // Use a bounded sync_channel instead of the regular (unbounded) channel
        // Stable std has no send-with-timeout: retry SyncSender::try_send
        // until the deadline passes
        // Return Err if send times out
        unimplemented!()
    }

    pub fn queue_depth(&self) -> usize {
        // TODO: Track pending tasks
        // Could use Arc<AtomicUsize> incremented on send, decremented on execute
        unimplemented!()
    }

    pub fn active_workers(&self) -> usize {
        // TODO: Track number of workers currently executing
        // Use Arc<AtomicUsize> incremented before execute, decremented after
        unimplemented!()
    }
}
Why previous Milestone is not enough: Static workload doesn’t reflect reality. Real systems have dynamic, unpredictable task arrival.
What’s the improvement: Dynamic submission enables real-world patterns:
- Web server: New requests arrive while processing existing
- Stream processing: Events arrive continuously
- Adaptive systems: Task generation based on results
Milestone 5: Adaptive Pool Sizing
Introduction
Automatically adjust worker count based on load. Scale up when queue grows, scale down when idle.
Architecture
Adaptive Logic:
- Monitor queue depth and worker utilization
- Spawn workers if queue > threshold × current_workers
- Terminate idle workers after timeout
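The scaling rules above can be separated from the threading code as a pure decision function, which makes them easy to unit test. The sketch below is one possible policy, not the required one: `ScalePolicy`, `ScaleDecision`, and `decide` are hypothetical names, growth is capped at doubling per step, and the caller is assumed to track how many workers have been idle longer than the timeout.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScaleDecision {
    Up(usize),
    Down(usize),
    Hold,
}

pub struct ScalePolicy {
    pub min_workers: usize,
    pub max_workers: usize,
    /// Scale up when queue depth exceeds this many pending tasks per worker.
    pub queue_threshold: usize,
}

impl ScalePolicy {
    /// `idle_expired` is the number of workers idle for longer than the timeout.
    pub fn decide(&self, queue_depth: usize, workers: usize, idle_expired: usize) -> ScaleDecision {
        let overloaded = queue_depth > self.queue_threshold.saturating_mul(workers);
        if overloaded && workers < self.max_workers {
            // At most double the pool per step, never exceeding max_workers.
            let add = workers.max(1).min(self.max_workers - workers);
            return ScaleDecision::Up(add);
        }
        if queue_depth == 0 && workers > self.min_workers && idle_expired > 0 {
            // Retire expired idle workers, never dropping below min_workers.
            let remove = idle_expired.min(workers - self.min_workers);
            return ScaleDecision::Down(remove);
        }
        ScaleDecision::Hold
    }
}
```

A monitoring thread would call `decide` periodically and translate the result into calls that add or remove workers.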
Key Functions:
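- scale_up(&self, count: usize) - Add workers (takes &self because the monitor thread shares the pool through an Arc)
- scale_down(&self, count: usize) - Remove workers
- auto_scale_monitor(pool: Arc<AdaptiveThreadPool>) - Background thread monitoring and adjusting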
Checkpoint Tests
use std::thread;
use std::time::Duration;

#[test]
fn test_scale_up() {
    let pool = AdaptiveThreadPool::new(2, 8);
    // Submit many tasks to trigger scaling
    for _ in 0..100 {
        pool.execute(|| thread::sleep(Duration::from_millis(10)));
    }
    thread::sleep(Duration::from_millis(50));
    // Pool should have scaled up
    assert!(pool.worker_count() > 2);
}

#[test]
fn test_scale_down() {
    let pool = AdaptiveThreadPool::new(2, 8);
    // Submit a burst so the pool grows first
    for _ in 0..100 {
        pool.execute(|| thread::sleep(Duration::from_millis(10)));
    }
    thread::sleep(Duration::from_millis(50));
    let peak = pool.worker_count();
    assert!(peak > 2);
    // Let the queue drain, then wait for the idle timeout
    thread::sleep(Duration::from_secs(2));
    // Pool should have scaled down
    assert!(pool.worker_count() < peak);
}
Starter Code
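use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;

// Workers receive either a job or a request to exit (used by scale_down).
// Job is the type alias from Milestone 1.
enum Message {
    Run(Job),
    Exit,
}

pub struct AdaptiveThreadPool {
    workers: Arc<Mutex<Vec<JoinHandle<()>>>>,
    sender: Sender<Message>,
    min_workers: usize,
    max_workers: usize,
    queue_threshold: usize,
    idle_timeout: Duration,
}

impl AdaptiveThreadPool {
    pub fn new(min: usize, max: usize) -> Arc<Self> {
        // TODO: Initialize with min workers
        // Wrap the pool in an Arc and pass a clone to the monitoring thread
        // Spawn monitoring thread for auto-scaling
        unimplemented!()
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // TODO: Send Message::Run(Box::new(f)) through the channel
        unimplemented!()
    }

    pub fn scale_up(&self, count: usize) {
        // TODO: Spawn 'count' new workers
        // Don't exceed max_workers
        unimplemented!()
    }

    pub fn scale_down(&self, count: usize) {
        // TODO: Signal 'count' workers to exit
        // Don't go below min_workers
        // Use the special Message::Exit in the channel
        // Remove exited workers' handles so worker_count() stays accurate
        unimplemented!()
    }

    pub fn worker_count(&self) -> usize {
        self.workers.lock().unwrap().len()
    }
}

fn auto_scale_monitor(pool: Arc<AdaptiveThreadPool>) {
    // TODO: Loop checking queue depth
    // If queue_depth > queue_threshold * workers: pool.scale_up(..)
    // If workers idle for > idle_timeout: pool.scale_down(..)
    // (30s suits production; the checkpoint test waits 2s, so use about 1s there)
    unimplemented!()
}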
Why previous Milestone is not enough: A fixed pool size is inefficient. It is overprovisioned when idle (wasting resources) and underprovisioned during peaks (high latency).
What’s the improvement: Adaptive sizing matches resource usage to load:
- Fixed 100 workers: if only about 5 are busy during low load, 95% of the pool sits idle
- Adaptive 5-100 workers: Scales to load, saves resources
For cloud deployments billed by usage, adaptive sizing can reduce costs, by as much as 50-90% when load is highly uneven.
Milestone 6: Benchmark vs Sequential
Introduction
Benchmark thread pool against sequential processing. Measure speedup with varying worker counts and task sizes.
Architecture
Benchmarks:
- Fixed workload (1000 tasks)
- Vary task duration: 1ms, 10ms, 100ms
- Vary worker count: 1, 2, 4, 8, 16
- Measure total time and tasks/sec
Starter Code
use std::thread;
use std::time::{Duration, Instant};

pub struct Benchmark;

impl Benchmark {
    pub fn benchmark_sequential(num_tasks: usize, task_duration: Duration) -> Duration {
        let start = Instant::now();
        for _ in 0..num_tasks {
            thread::sleep(task_duration);
        }
        start.elapsed()
    }

    pub fn benchmark_thread_pool(
        num_tasks: usize,
        num_workers: usize,
        task_duration: Duration,
    ) -> Duration {
        let pool = ThreadPool::new(num_workers);
        let start = Instant::now();
        for _ in 0..num_tasks {
            pool.execute(move || {
                thread::sleep(task_duration);
            });
        }
        // shutdown() joins the workers, so the timing covers every task
        pool.shutdown();
        start.elapsed()
    }

    pub fn run_comparison() {
        println!("=== Thread Pool vs Sequential Performance ===\n");
        // 100 tasks keep the demo short; raise to 1000 for the full benchmark
        let num_tasks = 100;
        let task_duration = Duration::from_millis(10);
        let thread_counts = [1, 2, 4, 8];
        let seq_time = Self::benchmark_sequential(num_tasks, task_duration);
        println!("Sequential: {:?}\n", seq_time);
        for &num_threads in &thread_counts {
            let pool_time = Self::benchmark_thread_pool(num_tasks, num_threads, task_duration);
            let speedup = seq_time.as_secs_f64() / pool_time.as_secs_f64();
            println!("Thread Pool ({} workers):", num_threads);
            println!("  Time: {:?}", pool_time);
            println!("  Speedup: {:.2}x\n", speedup);
        }
    }
}
Why previous Milestone is not enough: Performance claims need validation.
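What’s the improvement: Empirical speedup data. For the sleep-based tasks used in this benchmark, expect roughly:
- 1 worker: 1× (same as sequential)
- 4 workers: 3.8-4× speedup
- 8 workers: 7-8× speedup
CPU-bound tasks stop scaling once the worker count exceeds the number of cores. The measurements validate parallel efficiency and guide worker count selection.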
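Speedup and parallel efficiency are simple ratios of the measured times. The helpers below are a sketch of that arithmetic with hypothetical function names. Efficiency here means speedup divided by worker count, so 1.0 is perfect scaling.

```rust
use std::time::Duration;

/// Sequential time divided by parallel time.
/// Returns infinity or NaN if `parallel` is zero, so pass measured, non-zero durations.
pub fn speedup(sequential: Duration, parallel: Duration) -> f64 {
    sequential.as_secs_f64() / parallel.as_secs_f64()
}

/// Fraction of ideal scaling achieved: speedup / workers.
pub fn efficiency(speedup: f64, workers: usize) -> f64 {
    speedup / workers as f64
}

/// Completed tasks per second over the measured interval.
pub fn throughput(tasks: usize, elapsed: Duration) -> f64 {
    tasks as f64 / elapsed.as_secs_f64()
}
```

An efficiency that drops sharply as workers are added is a hint that the pool has passed the useful worker count for that task size.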
Complete Working Example
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex, atomic::{AtomicBool, AtomicUsize, Ordering}};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct ThreadPool {
    workers: Vec<JoinHandle<()>>,
    sender: Sender<Job>,
    shutdown: Arc<AtomicBool>,
}

impl ThreadPool {
    pub fn new(size: usize) -> Self {
        let (sender, receiver) = channel();
        // All workers share one Receiver behind a Mutex
        let receiver = Arc::new(Mutex::new(receiver));
        let shutdown = Arc::new(AtomicBool::new(false));
        let mut workers = Vec::with_capacity(size);
        for _ in 0..size {
            let receiver = receiver.clone();
            let shutdown = shutdown.clone();
            let handle = thread::spawn(move || {
                worker_loop(receiver, shutdown);
            });
            workers.push(handle);
        }
        ThreadPool {
            workers,
            sender,
            shutdown,
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender.send(Box::new(f)).unwrap();
    }

    pub fn shutdown(self) {
        self.shutdown.store(true, Ordering::SeqCst);
        // Closing the channel lets workers finish the queue and then exit
        drop(self.sender);
        for worker in self.workers {
            worker.join().unwrap();
        }
    }
}

// The flag is kept for later milestones; channel disconnect alone ends the loop
fn worker_loop(receiver: Arc<Mutex<Receiver<Job>>>, _shutdown: Arc<AtomicBool>) {
    loop {
        // Hold the lock only while waiting for a job, not while running it
        let job = {
            let receiver = receiver.lock().unwrap();
            receiver.recv()
        };
        match job {
            Ok(job) => job(),
            // recv() fails only once every Sender is dropped and the queue is
            // empty. Exit unconditionally: looping here would spin forever if
            // the pool were dropped without calling shutdown()
            Err(_) => break,
        }
    }
}

fn main() {
    println!("=== Thread Pool Demo ===\n");
    let pool = ThreadPool::new(4);
    let counter = Arc::new(AtomicUsize::new(0));
    println!("Submitting 100 tasks to thread pool with 4 workers...");
    let start = Instant::now();
    for i in 0..100 {
        let c = counter.clone();
        pool.execute(move || {
            // Simulate work
            thread::sleep(Duration::from_millis(10));
            c.fetch_add(1, Ordering::SeqCst);
            if i % 20 == 0 {
                println!("Task {} completed", i);
            }
        });
    }
    pool.shutdown();
    let elapsed = start.elapsed();
    println!("\nAll tasks completed!");
    println!("Total tasks: {}", counter.load(Ordering::SeqCst));
    println!("Time elapsed: {:?}", elapsed);
    println!("Throughput: {:.0} tasks/sec", 100.0 / elapsed.as_secs_f64());
}