From 8ec107cf29f56373221733cfef40529311e28e6f Mon Sep 17 00:00:00 2001 From: Timo Schneider Date: Sun, 11 May 2025 14:36:59 +0200 Subject: [PATCH] wip: update pipeline --- src/main.rs | 140 +++++---------------------- src/pipeline.rs | 1 + src/pipeline/highlight_extractor.rs | 69 +++++++++++++ src/pipeline/m_estimator.rs | 0 src/pipeline/pointcloud_clusterer.rs | 0 src/pipeline/ros_interface.rs | 0 6 files changed, 93 insertions(+), 117 deletions(-) create mode 100644 src/pipeline.rs create mode 100644 src/pipeline/highlight_extractor.rs create mode 100644 src/pipeline/m_estimator.rs create mode 100644 src/pipeline/pointcloud_clusterer.rs create mode 100644 src/pipeline/ros_interface.rs diff --git a/src/main.rs b/src/main.rs index 1d5487c..904c9d4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ +mod pipeline; + use hdbscan::{Hdbscan, HdbscanHyperParams}; -use image::{DynamicImage, ImageReader, Rgb, RgbImage}; use rand::Rng; use rayon::prelude::*; use show_image::{create_window, ImageInfo, ImageView, PixelFormat}; @@ -8,47 +9,13 @@ use std::iter::zip; use std::sync::mpsc::Receiver; use std::sync::{mpsc, Arc, Mutex}; use std::thread; -use std::thread::sleep; use std::time::{Duration, Instant}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use nalgebra::{DMatrix}; use ndarray::{Array1, Array2}; use r2r::QosProfile; use futures::{executor::LocalPool, future, stream::StreamExt, task::LocalSpawnExt}; - -fn create_extract_thread(image_rx: Receiver>>, width: u32, threshold: u8, downsample: f64) -> (Receiver>>>, Arc>) { - let (tx, rx) = mpsc::sync_channel::>>>(1); - let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO))); - let local_usage = usage.clone(); - - thread::spawn(move || { - let mut t_loop = Instant::now(); - for image in image_rx { - let start = Instant::now(); - - let max_rng = (u32::MAX as f64 * downsample) as u32; - let poi = image - .par_iter() - .enumerate() - .filter(|(_, pixel)| pixel > &&threshold) - .filter(|_| { - let mut rng = rand::rng(); - rng.random::() < max_rng - }) - .map(|(i, _)| { - vec![ i as f64 % width as f64, i as f64 / width as f64 ] - }); - - let mut vec = Vec::new(); - vec.par_extend(poi); - let end = Instant::now(); - tx.send(Arc::new(vec)).expect("TODO: panic message"); - *local_usage.lock().unwrap() = (end - start, t_loop.elapsed()); - t_loop = Instant::now(); - } - }); - (rx, usage) -} +use crate::pipeline::highlight_extractor::{create_extract_thread, Pointcloud}; fn create_pointcloud_labels_thread(pointcloud_rx: Receiver>>>) -> (Receiver<(Arc>>, Arc>)>, Arc>) { let (pointcloud_labels_tx, pointcloud_labels_rx) = mpsc::sync_channel::<(Arc>>, Arc>)>(1); @@ -58,12 +25,13 @@ fn create_pointcloud_labels_thread(pointcloud_rx: Receiver>>>) thread::spawn(move || { let mut t_loop = Instant::now(); for pointcloud in pointcloud_rx { + if pointcloud.len() < 2 {continue;} let start = Instant::now(); let params = HdbscanHyperParams::builder() .epsilon(1000.0) .min_samples(3) .allow_single_cluster(false) - .min_cluster_size(pointcloud.len() / 10) + .min_cluster_size(2) .build(); let labels = Arc::new(Hdbscan::new(&pointcloud, params).cluster().unwrap()); let end = Instant::now(); @@ -132,51 +100,6 @@ fn create_isolate_lane_thread(clusters_rx: Receiver>>>) -> (Receiver>>>, Arc>) { - let (projection_tx, projection_rx) = mpsc::sync_channel::>>>(1); - let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO))); - let local_usage = usage.clone(); - - thread::spawn(move || { - let h_i = [ - [-7.74914499e-03, 3.95733793e-18, 3.10353257e00], - [8.56519716e-18, 9.42313768e-05, -1.86052093e00], - [2.57498016e-18, -2.73825295e-03, 1.00000000e00], - ]; - - let mut t_loop = Instant::now(); - - for pointcloud in pointcloud_rx { - let start = Instant::now(); - let projection = pointcloud - .iter() - .map(|pt| { - let x = pt[0]; - let y = pt[1]; - - // Convert to homogeneous coordinates - let u = h_i[0][0] * x + h_i[0][1] * y + h_i[0][2]; - let v = h_i[1][0] * x + h_i[1][1] * y + h_i[1][2]; - let w = h_i[2][0] * x + h_i[2][1] * y + h_i[2][2]; - - let u_norm = u / w; - let v_norm = v / w; - - vec![u_norm, v_norm] - }) - .filter(|point| point[0].abs() <= 5.0) - .collect(); - - let end = Instant::now(); - projection_tx.send(Arc::new(projection)).unwrap(); - *local_usage.lock().unwrap() = (end - start, t_loop.elapsed()); - t_loop = Instant::now(); - } - }); - - (projection_rx, usage) -} - fn create_mestimator_thread(lanes_rx: Receiver>; 2]>>) -> (Receiver<(Arc>, Arc<[Vec>; 2]>)>, Arc>) { let (z_tx, z_rx) = mpsc::sync_channel::<(Arc>, Arc<[Vec>; 2]>)>(1); let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO))); @@ -253,22 +176,6 @@ fn create_mestimator_thread(lanes_rx: Receiver>; 2]>>) -> (Rec } -fn dummy_image_spawner_thread() -> Receiver>> { - let img: DynamicImage = ImageReader::open("./images/lane_detection_loop_80.png").unwrap().decode().unwrap(); - let img_data: Vec = img.clone().into_bytes(); - - let (img_tx, img_rx) = mpsc::sync_channel::>>(1); - - thread::spawn(move || { - loop { - img_tx.send(Arc::new(img_data.clone())).unwrap(); - //sleep(Duration::from_millis(16)); - } - }); - - img_rx -} - #[show_image::main] fn main() { let ctx = r2r::Context::create().unwrap(); @@ -276,25 +183,14 @@ fn main() { let subscriber = node.subscribe::("/camera/front", QosProfile::default()).unwrap(); let mut timer = node.create_wall_timer(Duration::from_millis(1000)).unwrap(); - let (img_tx, img_rx) = mpsc::sync_channel::>>(1); + let (img_tx, img_rx) = mpsc::sync_channel::>(1); // Set up a simple task executor. let mut pool = LocalPool::new(); let spawner = pool.spawner(); - // Run the subscriber in one task, printing the messages - spawner.spawn_local(async move { - subscriber - .for_each(|msg| { - img_tx.send(Arc::new(msg.data)).unwrap(); - future::ready(()) - }) - .await - }).unwrap(); - - let (pointcloud_rx, usage_extract) = create_extract_thread(img_rx, 800, 196, 0.025); - let (projection_rx, usage_transform) = create_transform_thread(pointcloud_rx); - let (cluster_labels_rx, usage_cluster) = create_pointcloud_labels_thread(projection_rx); + let (pointcloud_rx, usage_extract) = create_extract_thread(img_rx, 160, 0.025); + let (cluster_labels_rx, usage_cluster) = create_pointcloud_labels_thread(pointcloud_rx); let (clusters_rx, usage_split) = create_cluster_separator_thread(cluster_labels_rx); let (lanes_rx, usage_isolate) = create_isolate_lane_thread(clusters_rx); let (z_rx, usage_detect) = create_mestimator_thread(lanes_rx); @@ -302,7 +198,6 @@ fn main() { let multi_stats = MultiProgress::new(); let p_style = ProgressStyle::default_bar().template("{prefix:15} {bar:50.gray/white} {pos:>6} us | {len:>6} us").unwrap(); let p_extract = multi_stats.add(ProgressBar::new(100).with_prefix("Extract").with_style(p_style.clone())); - let p_transform = multi_stats.add(ProgressBar::new(100).with_prefix("Transform").with_style(p_style.clone())); let p_cluster = multi_stats.add(ProgressBar::new(100).with_prefix("Cluster").with_style(p_style.clone())); let p_split = multi_stats.add(ProgressBar::new(100).with_prefix("Split").with_style(p_style.clone())); let p_isolate = multi_stats.add(ProgressBar::new(100).with_prefix("Isolate").with_style(p_style.clone())); @@ -316,23 +211,34 @@ fn main() { let mut t: VecDeque = VecDeque::with_capacity(25); let mut now: Instant = Instant::now(); + // Run the subscriber in one task, printing the messages + spawner.spawn_local(async move { + subscriber + .for_each(|msg| { + img_tx.send(Arc::new(msg)).unwrap(); + future::ready(()) + }) + .await + }).unwrap(); + + thread::spawn(move || { + for _ in z_rx {} + }); + loop { node.spin_once(Duration::from_millis(10)); pool.run_until_stalled(); let u_extract = *usage_extract.lock().unwrap(); - let u_transform = *usage_transform.lock().unwrap(); let u_cluster = *usage_cluster.lock().unwrap(); let u_split = *usage_split.lock().unwrap(); let u_isolate = *usage_isolate.lock().unwrap(); let u_detect = *usage_detect.lock().unwrap(); - let u_vec = vec![u_extract, u_transform, u_cluster, u_split, u_isolate, u_detect]; + let u_vec = vec![u_extract, u_cluster, u_split, u_isolate, u_detect]; p_extract.set_length(u_extract.1.as_micros() as u64); p_extract.set_position(u_extract.0.as_micros() as u64); - p_transform.set_length(u_transform.1.as_micros() as u64); - p_transform.set_position(u_transform.0.as_micros() as u64); p_cluster.set_length(u_cluster.1.as_micros() as u64); p_cluster.set_position(u_cluster.0.as_micros() as u64); p_split.set_length(u_split.1.as_micros() as u64); diff --git a/src/pipeline.rs b/src/pipeline.rs new file mode 100644 index 0000000..1f42681 --- /dev/null +++ b/src/pipeline.rs @@ -0,0 +1 @@ +pub mod highlight_extractor; \ No newline at end of file diff --git a/src/pipeline/highlight_extractor.rs b/src/pipeline/highlight_extractor.rs new file mode 100644 index 0000000..3a5f9fa --- /dev/null +++ b/src/pipeline/highlight_extractor.rs @@ -0,0 +1,69 @@ +use std::sync::{mpsc, Arc, Mutex}; +use std::sync::mpsc::Receiver; +use std::thread; +use std::time::{Duration, Instant}; +use rand::Rng; +use r2r::sensor_msgs::msg::Image; +use rayon::prelude::IntoParallelRefIterator; + +pub type Pointcloud = Arc>>; + +trait ExtractHighlights { + fn get_highlights(&self, threshold: u8, downsample: f64) -> Pointcloud; +} +impl ExtractHighlights for Image { + fn get_highlights(&self, threshold: u8, downsample: f64) -> Pointcloud { + let h_i = [ + [-7.74914499e-03, 3.95733793e-18, 3.10353257e00], + [8.56519716e-18, 9.42313768e-05, -1.86052093e00], + [2.57498016e-18, -2.73825295e-03, 1.00000000e00], + ]; + + let max_rng = (u32::MAX as f64 * downsample) as u32; + Arc::new(self.data + .iter() + .enumerate() + .filter(|(i, _)| *i as u32 / self.width >= self.height / 2) + .filter(|(_, pixel)| **pixel > threshold) + .filter(|_| { + let mut rng = rand::rng(); + rng.random::() < max_rng + }) + .map(|(i, _)| { + let x = i as f64 % self.width as f64; + let y = i as f64 / self.width as f64; + + // Convert to homogeneous coordinates + let u = h_i[0][0] * x + h_i[0][1] * y + h_i[0][2]; + let v = h_i[1][0] * x + h_i[1][1] * y + h_i[1][2]; + let w = h_i[2][0] * x + h_i[2][1] * y + h_i[2][2]; + + let u_norm = u / w; + let v_norm = v / w; + + vec![u_norm, v_norm] + }) + .filter(|point| point[0].abs() <= 5.0) + .collect()) + } +} +pub fn create_extract_thread(image_rx: Receiver>, threshold: u8, downsample: f64) -> (Receiver, Arc>) { + let (tx, rx) = mpsc::sync_channel::(1); + let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO))); + let local_usage = usage.clone(); + + thread::spawn(move || { + let mut t_loop = Instant::now(); + for image in image_rx { + let start = Instant::now(); + let poi = image.get_highlights(threshold, downsample); + let end = Instant::now(); + + tx.send(poi).unwrap(); + + *local_usage.lock().unwrap() = (end - start, t_loop.elapsed()); + t_loop = Instant::now(); + } + }); + (rx, usage) +} \ No newline at end of file diff --git a/src/pipeline/m_estimator.rs b/src/pipeline/m_estimator.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/pipeline/pointcloud_clusterer.rs b/src/pipeline/pointcloud_clusterer.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/pipeline/ros_interface.rs b/src/pipeline/ros_interface.rs new file mode 100644 index 0000000..e69de29