From 74414934c4fbdc5b02d893a83a5f59e3f3aeca0b Mon Sep 17 00:00:00 2001 From: Timo Date: Sun, 11 May 2025 16:59:38 +0200 Subject: [PATCH] ros2 working with image inject --- src/main.rs | 226 +++++---------------------- src/pipeline.rs | 4 +- src/pipeline/hdb_clusterer.rs | 89 +++++++++++ src/pipeline/highlight_extractor.rs | 5 +- src/pipeline/m_estimator.rs | 66 ++++++++ src/pipeline/pointcloud_clusterer.rs | 0 6 files changed, 199 insertions(+), 191 deletions(-) create mode 100644 src/pipeline/hdb_clusterer.rs delete mode 100644 src/pipeline/pointcloud_clusterer.rs diff --git a/src/main.rs b/src/main.rs index 904c9d4..d4e7487 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,180 +1,19 @@ mod pipeline; -use hdbscan::{Hdbscan, HdbscanHyperParams}; use rand::Rng; use rayon::prelude::*; -use show_image::{create_window, ImageInfo, ImageView, PixelFormat}; -use std::collections::{HashMap, VecDeque}; -use std::iter::zip; -use std::sync::mpsc::Receiver; +use std::collections::VecDeque; use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use nalgebra::{DMatrix}; -use ndarray::{Array1, Array2}; use r2r::QosProfile; use futures::{executor::LocalPool, future, stream::StreamExt, task::LocalSpawnExt}; +use image::{ImageReader, Rgb, RgbImage}; +use show_image::{create_window, ImageInfo, ImageView, PixelFormat}; +use crate::pipeline::hdb_clusterer::create_pointcloud_labels_thread; use crate::pipeline::highlight_extractor::{create_extract_thread, Pointcloud}; - -fn create_pointcloud_labels_thread(pointcloud_rx: Receiver>>>) -> (Receiver<(Arc>>, Arc>)>, Arc>) { - let (pointcloud_labels_tx, pointcloud_labels_rx) = mpsc::sync_channel::<(Arc>>, Arc>)>(1); - let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO))); - let local_usage = usage.clone(); - - thread::spawn(move || { - let mut t_loop = Instant::now(); - for pointcloud in pointcloud_rx { - if pointcloud.len() < 2 {continue;} - let start = Instant::now(); - let params = HdbscanHyperParams::builder() - .epsilon(1000.0) - .min_samples(3) - .allow_single_cluster(false) - .min_cluster_size(2) - .build(); - let labels = Arc::new(Hdbscan::new(&pointcloud, params).cluster().unwrap()); - let end = Instant::now(); - pointcloud_labels_tx.send((pointcloud, labels)).unwrap(); - *local_usage.lock().unwrap() = (end - start, t_loop.elapsed()); - t_loop = Instant::now(); - } - }); - - (pointcloud_labels_rx, usage) -} - -fn create_cluster_separator_thread(cluster_labels: Receiver<(Arc>>, Arc>)>) -> (Receiver>>>>, Arc>) { - let (clusters_tx, clusters_rx) = mpsc::sync_channel::>>>>(1); - let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO))); - let local_usage = usage.clone(); - - thread::spawn(move || { - let mut t_loop = Instant::now(); - for (pointcloud, labels) in cluster_labels { - let start = Instant::now(); - - let mut clusters = HashMap::>>::with_capacity(5); - - let _ = zip(pointcloud.iter(), labels.iter()).for_each(|(point, label)| { - if !clusters.contains_key(label) { clusters.insert(*label, Vec::new()); } - - clusters.get_mut(label).unwrap().push(point.clone()); - }); - - let end = Instant::now(); - clusters_tx.send(Arc::new(clusters)).unwrap(); - *local_usage.lock().unwrap() = (end - start, t_loop.elapsed()); - t_loop = Instant::now(); - } - }); - - (clusters_rx, usage) -} - -fn create_isolate_lane_thread(clusters_rx: Receiver>>>>) -> (Receiver>; 2]>>, Arc>) { - let (lanes_tx, lanes_rx) = mpsc::sync_channel::>; 2]>>(1); - let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO))); - let local_usage = usage.clone(); - - thread::spawn(move || { - let mut t_loop = Instant::now(); - for clusters in clusters_rx { - let start = Instant::now(); - - let mut averages = clusters.iter().filter(|(label, _)| **label != -1).map(|(label, cluster)| { - (*label, cluster.iter().map(|point| point[0]).sum::() / (cluster.len() as f64)) - }).collect::>(); - - averages.sort_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap()); - - if averages.len() < 2 {continue;} - let end = Instant::now(); - lanes_tx.send(Arc::new([clusters.get(&averages[0].0).unwrap().clone(), clusters.get(&averages[1].0 ).unwrap().clone()])).unwrap() ; - *local_usage.lock().unwrap() = (end - start, t_loop.elapsed()); - t_loop = Instant::now(); - } - }); - - (lanes_rx, usage) -} - - -fn create_mestimator_thread(lanes_rx: Receiver>; 2]>>) -> (Receiver<(Arc>, Arc<[Vec>; 2]>)>, Arc>) { - let (z_tx, z_rx) = mpsc::sync_channel::<(Arc>, Arc<[Vec>; 2]>)>(1); - let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO))); - let local_usage = usage.clone(); - let c = 0.1 * 2.3849; - - thread::spawn(move || { - let mut z = vec![4.0, -2.0, 0.0, 0.0]; - let mut t_loop = Instant::now(); - - for lanes in lanes_rx { - let start = Instant::now(); - - let mut p = Vec::>::new(); - p.append(&mut lanes[0].clone()); - p.append(&mut lanes[1].clone()); - let y = Array1::from_vec(p.iter().map(|point| point[0]).collect()); - - let mut H: Array2 = Array2::from_shape_vec((0, 4), Vec::::new()).unwrap(); - - lanes[0].clone().into_iter().map(|point| point[1]).for_each(|x| { - let a = vec![0.5, -1.0, -x, 0.5*x.powi(2)]; - let b: Array1 = Array1::from_vec(a); - H.push_row(b.view()).unwrap() - }); - lanes[1].clone().into_iter().map(|point| point[1]).for_each(|x| { - let a = vec![-0.5, -1.0, -x, 0.5*x.powi(2)]; - let b: Array1 = Array1::from_vec(a); - H.push_row(b.view()).unwrap() - }); - - let H_t = H.t(); - - for _ in 0..3 { - let res_l = lanes[0] - .iter() - .map(|point| point[0] - (0.5 * z[0] - z[1] - point[1] * z[2] + 0.5 * z[3] * point[1].powi(2))) - .map(|r| 1.0/(1.0 + (r/c).powi(2))) - .collect::>(); - - let res_r = lanes[1] - .iter() - .map(|point| point[0] - (-0.5 * z[0] - z[1] - point[0] * z[2] + 0.5 * z[3] * point[1].powi(2))) - .map(|r| 1.0/(1.0 + (r/c).powi(2))) - .collect::>(); - - let mut v = Vec::::new(); - v.append(&mut res_l.clone()); - v.append(&mut res_r.clone()); - - let w = Array2::from_diag(&Array1::from_vec(v)); - - let first_part: Array2 = H_t.dot(&w).dot(&H); - - let a_nalgebra = DMatrix::from_row_slice(4, 4, first_part.as_slice().unwrap()); - - // Perform the matrix inversion using nalgebra - let inv = a_nalgebra.pseudo_inverse(0.0).unwrap(); - - // Convert the nalgebra matrix back to Array2 - let second_part: Array2 = Array2::from_shape_vec((4, 4), inv.as_slice().to_vec()).unwrap(); - - z = second_part.dot(&H_t).dot(&w).dot(&y).to_vec(); - } - - let end = Instant::now(); - z_tx.send((Arc::new(z.clone()), lanes)).unwrap(); - *local_usage.lock().unwrap() = (end - start, t_loop.elapsed()); - t_loop = Instant::now(); - } - }); - - (z_rx, usage) -} - +use crate::pipeline::m_estimator::create_mestimator_thread; #[show_image::main] fn main() { @@ -189,32 +28,29 @@ fn main() { let mut pool = LocalPool::new(); let spawner = pool.spawner(); - let (pointcloud_rx, usage_extract) = create_extract_thread(img_rx, 160, 0.025); + let (pointcloud_rx, usage_extract) = create_extract_thread(img_rx, 196, 0.025); let (cluster_labels_rx, usage_cluster) = create_pointcloud_labels_thread(pointcloud_rx); - let (clusters_rx, usage_split) = create_cluster_separator_thread(cluster_labels_rx); - let (lanes_rx, usage_isolate) = create_isolate_lane_thread(clusters_rx); - let (z_rx, usage_detect) = create_mestimator_thread(lanes_rx); - + let (z_rx, usage_detect) = create_mestimator_thread(cluster_labels_rx); +/* let multi_stats = MultiProgress::new(); let p_style = ProgressStyle::default_bar().template("{prefix:15} {bar:50.gray/white} {pos:>6} us | {len:>6} us").unwrap(); let p_extract = multi_stats.add(ProgressBar::new(100).with_prefix("Extract").with_style(p_style.clone())); let p_cluster = multi_stats.add(ProgressBar::new(100).with_prefix("Cluster").with_style(p_style.clone())); - let p_split = multi_stats.add(ProgressBar::new(100).with_prefix("Split").with_style(p_style.clone())); - let p_isolate = multi_stats.add(ProgressBar::new(100).with_prefix("Isolate").with_style(p_style.clone())); let p_detect = multi_stats.add(ProgressBar::new(100).with_prefix("Detect").with_style(p_style.clone())); let p_max = multi_stats.add(ProgressBar::new(100).with_prefix("Pipeline load").with_style(ProgressStyle::default_bar().template("{prefix:15} {bar:50.red/white} {percent:>3}%").unwrap())); let p_avg_ms = multi_stats.add(ProgressBar::new(100_000).with_prefix("Pipeline delay").with_style(ProgressStyle::default_bar().template("{prefix:15} {bar:50.cyan/white} {pos:>6} us").unwrap())); let p_framerate = multi_stats.add(ProgressBar::new(150).with_prefix("Framerate").with_style(ProgressStyle::default_bar().template("{prefix:15} {bar:50.blue/white} {pos:>3} FPS").unwrap())); - - let window = create_window("image", Default::default()).unwrap(); +*/ let mut t: VecDeque = VecDeque::with_capacity(25); let mut now: Instant = Instant::now(); + let img = ImageReader::open("./images/lane_detection_loop_80.png").unwrap().decode().unwrap(); // Run the subscriber in one task, printing the messages spawner.spawn_local(async move { subscriber - .for_each(|msg| { + .for_each(|mut msg| { + msg.data = img.clone().into_bytes(); img_tx.send(Arc::new(msg)).unwrap(); future::ready(()) }) @@ -222,7 +58,30 @@ fn main() { }).unwrap(); thread::spawn(move || { - for _ in z_rx {} + let window = create_window("image", Default::default()).unwrap(); + for z in z_rx { + let mut rgb_image = RgbImage::new(800, 800); + + let colors = vec![Rgb([0, 255, 0]), Rgb([255, 0, 0])]; + + (0..rgb_image.height()).for_each(|x| { + let x_m = x as f64 * (45.0 / 800.0); + let const_part = - z[1] - x_m * z[2] + 0.5 * z[3] * x_m.powi(2); + let l = (0.5 * z[0] + const_part) * (800.0/45.0);; + let r = (-0.5 * z[0] + const_part) * (800.0/45.0);; + + if l.abs() < rgb_image.width() as f64 / 2.0 - 1.0 { + rgb_image.put_pixel((rgb_image.width() as f64 /2.0 + l) as u32, rgb_image.height() - 1 - x, colors[0]); + } + if r.abs() < rgb_image.width() as f64 / 2.0 - 1.0 { + rgb_image.put_pixel((rgb_image.width() as f64/2.0 + r) as u32, rgb_image.height() - 1 - x, colors[1]); + } + }); + + let image = ImageView::new(ImageInfo::new(PixelFormat::Rgb8, 800, 800), rgb_image.iter().as_slice()); + window.set_image("camera", image).unwrap(); + + } }); loop { @@ -231,28 +90,21 @@ fn main() { let u_extract = *usage_extract.lock().unwrap(); let u_cluster = *usage_cluster.lock().unwrap(); - let u_split = *usage_split.lock().unwrap(); - let u_isolate = *usage_isolate.lock().unwrap(); let u_detect = *usage_detect.lock().unwrap(); - let u_vec = vec![u_extract, u_cluster, u_split, u_isolate, u_detect]; - + let u_vec = vec![u_extract, u_cluster, u_detect]; + /* p_extract.set_length(u_extract.1.as_micros() as u64); p_extract.set_position(u_extract.0.as_micros() as u64); p_cluster.set_length(u_cluster.1.as_micros() as u64); p_cluster.set_position(u_cluster.0.as_micros() as u64); - p_split.set_length(u_split.1.as_micros() as u64); - p_split.set_position(u_split.0.as_micros() as u64); - p_isolate.set_length(u_isolate.1.as_micros() as u64); - p_isolate.set_position(u_isolate.0.as_micros() as u64); p_detect.set_length(u_detect.1.as_micros() as u64); p_detect.set_position(u_detect.0.as_micros() as u64); p_max.set_position(u_vec.iter().map(|(t, d)| ((t.as_secs_f64() / d.as_secs_f64()) * 100.0) as u64).max().unwrap()); p_avg_ms.set_position(u_vec.iter().map(|(t, _)| t).sum::().as_micros() as u64); p_framerate.set_position((1.0 / (t.iter().map(|d| d.as_secs_f64()).sum::() / t.len() as f64)) as u64); - - +*/ if t.len() == 25 { t.pop_front(); } diff --git a/src/pipeline.rs b/src/pipeline.rs index 1f42681..9563b96 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -1 +1,3 @@ -pub mod highlight_extractor; \ No newline at end of file +pub mod highlight_extractor; +pub mod hdb_clusterer; +pub mod m_estimator; \ No newline at end of file diff --git a/src/pipeline/hdb_clusterer.rs b/src/pipeline/hdb_clusterer.rs new file mode 100644 index 0000000..f961196 --- /dev/null +++ b/src/pipeline/hdb_clusterer.rs @@ -0,0 +1,89 @@ +use std::collections::HashMap; +use std::iter::zip; +use std::sync::{mpsc, Arc, Mutex}; +use std::sync::mpsc::Receiver; +use std::thread; +use std::time::{Duration, Instant}; +use hdbscan::{Hdbscan, HdbscanHyperParams}; +use r2r::indexmap::IndexMap; +use crate::pipeline::highlight_extractor::Pointcloud; + +type PointcloudLabels = Arc>; +pub type PointcloudLabeled = Arc, i32)>>; + +trait Label { + fn label(&self) -> Option; +} + +trait SortLabeled { + fn sort_horizontally(&self) -> PointcloudLabeled; +} + +impl SortLabeled for PointcloudLabeled { + fn sort_horizontally(&self) -> PointcloudLabeled { + let mut averages: HashMap> = HashMap::new(); + self.iter().filter(|(_, label)| *label >= 0).for_each(|(point, label)| { + if !averages.contains_key(label) { + averages.insert(*label, Vec::new()); + } + averages.get_mut(label).unwrap().push(point[1]); + }); + + let mut average: Vec<(i32, f64)> = averages.into_iter().map(|(label, y_values)| (label, y_values.iter().sum::() / y_values.len() as f64)).collect(); + average.sort_by(|(_, a), (_, b)| a.partial_cmp(&b).unwrap()); + println!("{:?}", average); + let sorted_map: IndexMap<_, _> = average.iter().enumerate().map(|(new_label, (old_label, _))| (old_label, new_label)).collect(); + + Arc::new(self.iter() + .filter(|(_, label)| *label >= 0) + .map(|(point, label)| (point.clone(), *sorted_map.get(label).unwrap() as i32)) + .filter(|(_, label)| *label == 0 || *label == average.len() as i32 - 1) + .collect()) + } +} + +impl Label for Pointcloud { + fn label(&self) -> Option { + if self.len() < 75 { return None } + + let params = HdbscanHyperParams::builder() + .epsilon(10000.0) + .min_samples(3) + .allow_single_cluster(false) + .min_cluster_size(20) + .build(); + + let labels = Hdbscan::new(self, params).cluster().unwrap(); + + let pointcloud_labeled: PointcloudLabeled = Arc::new(zip(self.iter(), labels) + .map(|(point, label)| (point.clone(), label)) + .collect()); + + Some(pointcloud_labeled) + } +} + +pub fn create_pointcloud_labels_thread(pointcloud_rx: Receiver) -> (Receiver, Arc>) { + let (pointcloud_labels_tx, pointcloud_labels_rx) = mpsc::sync_channel::(1); + let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO))); + let local_usage = usage.clone(); + + thread::spawn(move || { + let mut t_loop = Instant::now(); + for pointcloud in pointcloud_rx { + let start = Instant::now(); + + if let Some(labeled_pointcloud) = pointcloud.label() { + let clusters = labeled_pointcloud.sort_horizontally(); + + let end = Instant::now(); + pointcloud_labels_tx.send(clusters).unwrap(); + + *local_usage.lock().unwrap() = (end - start, t_loop.elapsed()); + t_loop = Instant::now(); + } + } + }); + + (pointcloud_labels_rx, usage) +} \ No newline at end of file diff --git a/src/pipeline/highlight_extractor.rs b/src/pipeline/highlight_extractor.rs index 3a5f9fa..fc5387e 100644 --- a/src/pipeline/highlight_extractor.rs +++ b/src/pipeline/highlight_extractor.rs @@ -4,7 +4,6 @@ use std::thread; use std::time::{Duration, Instant}; use rand::Rng; use r2r::sensor_msgs::msg::Image; -use rayon::prelude::IntoParallelRefIterator; pub type Pointcloud = Arc>>; @@ -42,8 +41,8 @@ impl ExtractHighlights for Image { let v_norm = v / w; vec![u_norm, v_norm] - }) - .filter(|point| point[0].abs() <= 5.0) + }) + //.filter(|point| point[0].abs() <= 5.0) .collect()) } } diff --git a/src/pipeline/m_estimator.rs b/src/pipeline/m_estimator.rs index e69de29..25ef19f 100644 --- a/src/pipeline/m_estimator.rs +++ b/src/pipeline/m_estimator.rs @@ -0,0 +1,66 @@ +use std::sync::{mpsc, Arc, Mutex}; +use std::sync::mpsc::Receiver; +use std::thread; +use std::time::{Duration, Instant}; +use nalgebra::DMatrix; +use ndarray::{Array1, Array2}; +use crate::pipeline::hdb_clusterer::PointcloudLabeled; + +pub type LaneEstimation = Arc>; +pub fn create_mestimator_thread(lanes_rx: Receiver) -> (Receiver, Arc>) { + let (z_tx, z_rx) = mpsc::sync_channel::(1); + let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO))); + let local_usage = usage.clone(); + let c = 0.1 * 2.3849; + + thread::spawn(move || { + let mut z = vec![4.0, -2.0, 0.0, 0.0]; + let mut t_loop = Instant::now(); + + for lanes in lanes_rx { + let start = Instant::now(); + + let y = Array1::from_vec(lanes.iter().map(|(point, label)| point[0]).collect()); + + let mut H: Array2 = Array2::from_shape_vec((0, 4), Vec::::new()).unwrap(); + + lanes.iter().for_each(|(point, label)| { + let a= vec![if *label == 0 {0.5} else {-0.5}, -1.0, -point[1], 0.5*point[1].powi(2)]; + + let b: Array1 = Array1::from_vec(a); + H.push_row(b.view()).unwrap() + }); + + let H_t = H.t(); + + for _ in 0..3 { + let res = lanes + .iter() + .map(|(point, label)| point[0] - (if *label == 0 {0.5} else {-0.5} * z[0] - z[1] - point[1] * z[2] + 0.5 * z[3] * point[1].powi(2))) + .map(|r| 1.0/(1.0 + (r/c).powi(2))) + .collect::>(); + + let w = Array2::from_diag(&Array1::from_vec(res)); + + let first_part: Array2 = H_t.dot(&w).dot(&H); + + let a_nalgebra = DMatrix::from_row_slice(4, 4, first_part.as_slice().unwrap()); + + // Perform the matrix inversion using nalgebra + let inv = a_nalgebra.pseudo_inverse(0.0).unwrap(); + + // Convert the nalgebra matrix back to Array2 + let second_part: Array2 = Array2::from_shape_vec((4, 4), inv.as_slice().to_vec()).unwrap(); + + z = second_part.dot(&H_t).dot(&w).dot(&y).to_vec(); + } + + let end = Instant::now(); + z_tx.send(Arc::new(z.clone())).unwrap(); + *local_usage.lock().unwrap() = (end - start, t_loop.elapsed()); + t_loop = Instant::now(); + } + }); + + (z_rx, usage) +} \ No newline at end of file diff --git a/src/pipeline/pointcloud_clusterer.rs b/src/pipeline/pointcloud_clusterer.rs deleted file mode 100644 index e69de29..0000000