From 42ef932f5e1eab3a804b0001658c942c7382705d Mon Sep 17 00:00:00 2001 From: Timo Schneider Date: Tue, 6 May 2025 23:28:56 +0200 Subject: [PATCH] added pipeline stats --- Cargo.toml | 3 +- src/main.rs | 117 +++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 90 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cf54d9d..7f61223 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,5 @@ ndarray = "0.16.0" rand = "0.9.1" rayon = "1.10.0" hdbscan = "0.10.0" -nalgebra = "0.33.2" \ No newline at end of file +nalgebra = "0.33.2" +indicatif = "0.17.11" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 9fbf796..107da42 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,16 +6,21 @@ use show_image::{create_window, ImageInfo, ImageView, PixelFormat}; use std::collections::{HashMap, VecDeque}; use std::iter::zip; use std::sync::mpsc::Receiver; -use std::sync::{mpsc, Arc}; +use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; -use nalgebra::DMatrix; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use nalgebra::{partial_sort2, DMatrix}; use ndarray::{Array1, Array2}; -fn create_extract_thread(image_rx: Receiver>>, width: u32, threshold: u8, downsample: f64) -> Receiver>>> { +fn create_extract_thread(image_rx: Receiver>>, width: u32, threshold: u8, downsample: f64) -> (Receiver>>>, Arc>) { let (tx, rx) = mpsc::sync_channel::>>>(1); + let usage = Arc::new(Mutex::new(0.0)); + let local_usage = usage.clone(); + thread::spawn(move || { for image in image_rx { + let start = Instant::now(); let max_rng = (u32::MAX as f64 * downsample) as u32; let poi = image .par_iter() @@ -31,17 +36,22 @@ fn create_extract_thread(image_rx: Receiver>>, width: u32, threshold let mut vec = Vec::new(); vec.par_extend(poi); + let end = Instant::now(); tx.send(Arc::new(vec)).expect("TODO: panic message"); + *local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64(); } }); - rx + (rx, usage) } -fn create_pointcloud_labels_thread(pointcloud_rx: Receiver>>>) -> Receiver<(Arc>>, Arc>)> { +fn create_pointcloud_labels_thread(pointcloud_rx: Receiver>>>) -> (Receiver<(Arc>>, Arc>)>, Arc>) { let (pointcloud_labels_tx, pointcloud_labels_rx) = mpsc::sync_channel::<(Arc>>, Arc>)>(1); + let usage = Arc::new(Mutex::new(0.0)); + let local_usage = usage.clone(); thread::spawn(move || { for pointcloud in pointcloud_rx { + let start = Instant::now(); let params = HdbscanHyperParams::builder() .epsilon(1000.0) .min_samples(3) @@ -49,18 +59,23 @@ fn create_pointcloud_labels_thread(pointcloud_rx: Receiver>>>) .min_cluster_size(pointcloud.len() / 10) .build(); let labels = Arc::new(Hdbscan::new(&pointcloud, params).cluster().unwrap()); - pointcloud_labels_tx.send((pointcloud, labels)).unwrap() ; + let end = Instant::now(); + pointcloud_labels_tx.send((pointcloud, labels)).unwrap(); + *local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64(); } }); - pointcloud_labels_rx + (pointcloud_labels_rx, usage) } -fn create_cluster_separator_thread(cluster_labels: Receiver<(Arc>>, Arc>)>) -> Receiver>>>> { +fn create_cluster_separator_thread(cluster_labels: Receiver<(Arc>>, Arc>)>) -> (Receiver>>>>, Arc>) { let (clusters_tx, clusters_rx) = mpsc::sync_channel::>>>>(1); + let usage = Arc::new(Mutex::new(0.0)); + let local_usage = usage.clone(); thread::spawn(move || { for (pointcloud, labels) in cluster_labels { + let start = Instant::now(); let mut clusters = HashMap::>>::with_capacity(5); let _ = zip(pointcloud.iter(), labels.iter()).for_each(|(point, label)| { @@ -69,19 +84,23 @@ fn create_cluster_separator_thread(cluster_labels: Receiver<(Arc>>, clusters.get_mut(label).unwrap().push(point.clone()); }); - - clusters_tx.send(Arc::new(clusters)).unwrap() ; + let end = Instant::now(); + clusters_tx.send(Arc::new(clusters)).unwrap(); + *local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64(); } }); - clusters_rx + (clusters_rx, usage) } -fn create_isolate_lane_thread(clusters_rx: Receiver>>>>) -> Receiver>; 2]>> { +fn create_isolate_lane_thread(clusters_rx: Receiver>>>>) -> (Receiver>; 2]>>, Arc>) { let (lanes_tx, lanes_rx) = mpsc::sync_channel::>; 2]>>(1); + let usage = Arc::new(Mutex::new(0.0)); + let local_usage = usage.clone(); thread::spawn(move || { for clusters in clusters_rx { + let start = Instant::now(); let mut averages = clusters.iter().filter(|(label, _)| **label != -1).map(|(label, cluster)| { (*label, cluster.iter().map(|point| point[0]).sum::() / (cluster.len() as f64)) }).collect::>(); @@ -89,17 +108,20 @@ fn create_isolate_lane_thread(clusters_rx: Receiver>>>) -> Receiver>>>{ +fn create_transform_thread(pointcloud_rx : Receiver>>>) -> (Receiver>>>, Arc>) { let (projection_tx, projection_rx) = mpsc::sync_channel::>>>(1); + let usage = Arc::new(Mutex::new(0.0)); + let local_usage = usage.clone(); thread::spawn(move || { let h_i = [ @@ -109,6 +131,7 @@ fn create_transform_thread(pointcloud_rx : Receiver>>>) -> Rece ]; for pointcloud in pointcloud_rx { + let start = Instant::now(); let projection = pointcloud .iter() .map(|pt| { @@ -128,20 +151,26 @@ fn create_transform_thread(pointcloud_rx : Receiver>>>) -> Rece .filter(|point| point[0].abs() <= 5.0) .collect(); + let end = Instant::now(); projection_tx.send(Arc::new(projection)).unwrap(); + *local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64(); } }); - projection_rx + + (projection_rx, usage) } -fn create_mestimator_thread(lanes_rx: Receiver>; 2]>>) -> Receiver<(Arc>, Arc<[Vec>; 2]>)> { +fn create_mestimator_thread(lanes_rx: Receiver>; 2]>>) -> (Receiver<(Arc>, Arc<[Vec>; 2]>)>, Arc>) { let (z_tx, z_rx) = mpsc::sync_channel::<(Arc>, Arc<[Vec>; 2]>)>(1); + let usage = Arc::new(Mutex::new(0.0)); + let local_usage = usage.clone(); let c = 0.1 * 2.3849; thread::spawn(move || { let mut z = vec![4.0, -2.0, 0.0, 0.0]; for lanes in lanes_rx { + let start = Instant::now(); let mut p = Vec::>::new(); p.append(&mut lanes[0].clone()); p.append(&mut lanes[1].clone()); @@ -162,7 +191,7 @@ fn create_mestimator_thread(lanes_rx: Receiver>; 2]>>) -> Rece let H_t = H.t(); - for _ in 0..5 { + for _ in 0..3 { let res_l = lanes[0] .iter() .map(|point| point[0] - (0.5 * z[0] - z[1] - point[1] * z[2] + 0.5 * z[3] * point[1].powi(2))) @@ -194,16 +223,18 @@ fn create_mestimator_thread(lanes_rx: Receiver>; 2]>>) -> Rece z = second_part.dot(&H_t).dot(&w).dot(&y).to_vec(); } + let end = Instant::now(); z_tx.send((Arc::new(z.clone()), lanes)).unwrap(); + *local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64(); } }); - z_rx + (z_rx, usage) } fn dummy_image_spawner_thread() -> Receiver>> { - let img: DynamicImage = ImageReader::open("./images/lane_detection_loop_60.png").unwrap().decode().unwrap(); + let img: DynamicImage = ImageReader::open("./images/lane_detection_loop_80.png").unwrap().decode().unwrap(); let img_data: Vec = img.clone().into_bytes(); let (img_tx, img_rx) = mpsc::sync_channel::>>(1); @@ -220,13 +251,24 @@ fn dummy_image_spawner_thread() -> Receiver>> { #[show_image::main] fn main() { let image_rx = dummy_image_spawner_thread(); - let pointcloud_rx = create_extract_thread(image_rx, 800, 196, 0.025); - let projection_rx = create_transform_thread(pointcloud_rx); - let cluster_labels_rx = create_pointcloud_labels_thread(projection_rx); - let clusters_rx = create_cluster_separator_thread(cluster_labels_rx); - let lanes_rx = create_isolate_lane_thread(clusters_rx); - let z_rx = create_mestimator_thread(lanes_rx); + let (pointcloud_rx, usage_extract) = create_extract_thread(image_rx, 800, 196, 0.025); + let (projection_rx, usage_transform) = create_transform_thread(pointcloud_rx); + let (cluster_labels_rx, usage_cluster) = create_pointcloud_labels_thread(projection_rx); + let (clusters_rx, usage_split) = create_cluster_separator_thread(cluster_labels_rx); + let (lanes_rx, usage_isolate) = create_isolate_lane_thread(clusters_rx); + let (z_rx, usage_detect) = create_mestimator_thread(lanes_rx); + let multi_stats = MultiProgress::new(); + let p_style = ProgressStyle::default_bar().template("{prefix:15} {bar:50.gray/white} {percent:>3}%").unwrap(); + let p_extract = multi_stats.add(ProgressBar::new(100).with_prefix("Extract").with_style(p_style.clone())); + let p_transform = multi_stats.add(ProgressBar::new(100).with_prefix("Transform").with_style(p_style.clone())); + let p_cluster = multi_stats.add(ProgressBar::new(100).with_prefix("Cluster").with_style(p_style.clone())); + let p_split = multi_stats.add(ProgressBar::new(100).with_prefix("Split").with_style(p_style.clone())); + let p_isolate = multi_stats.add(ProgressBar::new(100).with_prefix("Isolate").with_style(p_style.clone())); + let p_detect = multi_stats.add(ProgressBar::new(100).with_prefix("Detect").with_style(p_style.clone())); + + let p_max = multi_stats.add(ProgressBar::new(100).with_prefix("Pipeline load").with_style(ProgressStyle::default_bar().template("{prefix:15} {bar:50.red/white} {percent:>3}%").unwrap())); + let p_framerate = multi_stats.add(ProgressBar::new(150).with_prefix("Framerate").with_style(ProgressStyle::default_bar().template("{prefix:15} {bar:50.blue/white} {pos:>3} FPS").unwrap())); let window = create_window("image", Default::default()).unwrap(); let mut t: VecDeque = VecDeque::with_capacity(25); @@ -237,12 +279,30 @@ fn main() { for (z, lanes) in z_rx { + let u_extract = *usage_extract.lock().unwrap(); + let u_transform = *usage_transform.lock().unwrap(); + let u_cluster = *usage_cluster.lock().unwrap(); + let u_split = *usage_split.lock().unwrap(); + let u_isolate = *usage_isolate.lock().unwrap(); + let u_detect = *usage_detect.lock().unwrap(); + + let u_vec = vec![u_extract, u_transform, u_cluster, u_split, u_isolate, u_detect]; + let u_total: f64 = u_vec.iter().sum(); + + p_extract.set_position((u_extract / u_total * 100.0) as u64); + p_transform.set_position((u_transform / u_total * 100.0) as u64); + p_cluster.set_position((u_cluster / u_total * 100.0) as u64); + p_split.set_position((u_split / u_total * 100.0) as u64); + p_isolate.set_position((u_isolate / u_total * 100.0) as u64); + p_detect.set_position((u_detect / u_total * 100.0) as u64); + + p_max.set_position((100.0 * *u_vec.iter().max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap()) as u64); + p_framerate.set_position((t.iter().map(|d| 1.0 / d.as_secs_f64()).sum::() / t.len() as f64) as u64); + let mut rgb_image = RgbImage::new(800, 800); let colors = vec![Rgb([0, 255, 0]), Rgb([255, 0, 0])]; - println!("{:?}", z); - (0..rgb_image.height()).for_each(|x| { let x_m = x as f64 * (45.0 / 800.0); let const_part = - z[1] - x_m * z[2] + 0.5 * z[3] * x_m.powi(2); @@ -280,7 +340,6 @@ fn main() { t.pop_front(); } t.push_back(now.elapsed()); - println!("Avg: {:.6} s", t.iter().map(|d| d.as_secs_f64()).sum::() / t.len() as f64); now = Instant::now(); }