wip: update pipeline
This commit is contained in:
140
src/main.rs
140
src/main.rs
@@ -1,5 +1,6 @@
|
|||||||
|
mod pipeline;
|
||||||
|
|
||||||
use hdbscan::{Hdbscan, HdbscanHyperParams};
|
use hdbscan::{Hdbscan, HdbscanHyperParams};
|
||||||
use image::{DynamicImage, ImageReader, Rgb, RgbImage};
|
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use show_image::{create_window, ImageInfo, ImageView, PixelFormat};
|
use show_image::{create_window, ImageInfo, ImageView, PixelFormat};
|
||||||
@@ -8,47 +9,13 @@ use std::iter::zip;
|
|||||||
use std::sync::mpsc::Receiver;
|
use std::sync::mpsc::Receiver;
|
||||||
use std::sync::{mpsc, Arc, Mutex};
|
use std::sync::{mpsc, Arc, Mutex};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::thread::sleep;
|
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
|
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
|
||||||
use nalgebra::{DMatrix};
|
use nalgebra::{DMatrix};
|
||||||
use ndarray::{Array1, Array2};
|
use ndarray::{Array1, Array2};
|
||||||
use r2r::QosProfile;
|
use r2r::QosProfile;
|
||||||
use futures::{executor::LocalPool, future, stream::StreamExt, task::LocalSpawnExt};
|
use futures::{executor::LocalPool, future, stream::StreamExt, task::LocalSpawnExt};
|
||||||
|
use crate::pipeline::highlight_extractor::{create_extract_thread, Pointcloud};
|
||||||
fn create_extract_thread(image_rx: Receiver<Arc<Vec<u8>>>, width: u32, threshold: u8, downsample: f64) -> (Receiver<Arc<Vec<Vec<f64>>>>, Arc<Mutex<(Duration, Duration)>>) {
|
|
||||||
let (tx, rx) = mpsc::sync_channel::<Arc<Vec<Vec<f64>>>>(1);
|
|
||||||
let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO)));
|
|
||||||
let local_usage = usage.clone();
|
|
||||||
|
|
||||||
thread::spawn(move || {
|
|
||||||
let mut t_loop = Instant::now();
|
|
||||||
for image in image_rx {
|
|
||||||
let start = Instant::now();
|
|
||||||
|
|
||||||
let max_rng = (u32::MAX as f64 * downsample) as u32;
|
|
||||||
let poi = image
|
|
||||||
.par_iter()
|
|
||||||
.enumerate()
|
|
||||||
.filter(|(_, pixel)| pixel > &&threshold)
|
|
||||||
.filter(|_| {
|
|
||||||
let mut rng = rand::rng();
|
|
||||||
rng.random::<u32>() < max_rng
|
|
||||||
})
|
|
||||||
.map(|(i, _)| {
|
|
||||||
vec![ i as f64 % width as f64, i as f64 / width as f64 ]
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut vec = Vec::new();
|
|
||||||
vec.par_extend(poi);
|
|
||||||
let end = Instant::now();
|
|
||||||
tx.send(Arc::new(vec)).expect("TODO: panic message");
|
|
||||||
*local_usage.lock().unwrap() = (end - start, t_loop.elapsed());
|
|
||||||
t_loop = Instant::now();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
(rx, usage)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_pointcloud_labels_thread(pointcloud_rx: Receiver<Arc<Vec<Vec<f64>>>>) -> (Receiver<(Arc<Vec<Vec<f64>>>, Arc<Vec<i32>>)>, Arc<Mutex<(Duration, Duration)>>) {
|
fn create_pointcloud_labels_thread(pointcloud_rx: Receiver<Arc<Vec<Vec<f64>>>>) -> (Receiver<(Arc<Vec<Vec<f64>>>, Arc<Vec<i32>>)>, Arc<Mutex<(Duration, Duration)>>) {
|
||||||
let (pointcloud_labels_tx, pointcloud_labels_rx) = mpsc::sync_channel::<(Arc<Vec<Vec<f64>>>, Arc<Vec<i32>>)>(1);
|
let (pointcloud_labels_tx, pointcloud_labels_rx) = mpsc::sync_channel::<(Arc<Vec<Vec<f64>>>, Arc<Vec<i32>>)>(1);
|
||||||
@@ -58,12 +25,13 @@ fn create_pointcloud_labels_thread(pointcloud_rx: Receiver<Arc<Vec<Vec<f64>>>>)
|
|||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let mut t_loop = Instant::now();
|
let mut t_loop = Instant::now();
|
||||||
for pointcloud in pointcloud_rx {
|
for pointcloud in pointcloud_rx {
|
||||||
|
if pointcloud.len() < 2 {continue;}
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let params = HdbscanHyperParams::builder()
|
let params = HdbscanHyperParams::builder()
|
||||||
.epsilon(1000.0)
|
.epsilon(1000.0)
|
||||||
.min_samples(3)
|
.min_samples(3)
|
||||||
.allow_single_cluster(false)
|
.allow_single_cluster(false)
|
||||||
.min_cluster_size(pointcloud.len() / 10)
|
.min_cluster_size(2)
|
||||||
.build();
|
.build();
|
||||||
let labels = Arc::new(Hdbscan::new(&pointcloud, params).cluster().unwrap());
|
let labels = Arc::new(Hdbscan::new(&pointcloud, params).cluster().unwrap());
|
||||||
let end = Instant::now();
|
let end = Instant::now();
|
||||||
@@ -132,51 +100,6 @@ fn create_isolate_lane_thread(clusters_rx: Receiver<Arc<HashMap<i32, Vec<Vec<f64
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fn create_transform_thread(pointcloud_rx : Receiver<Arc<Vec<Vec<f64>>>>) -> (Receiver<Arc<Vec<Vec<f64>>>>, Arc<Mutex<(Duration, Duration)>>) {
|
|
||||||
let (projection_tx, projection_rx) = mpsc::sync_channel::<Arc<Vec<Vec<f64>>>>(1);
|
|
||||||
let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO)));
|
|
||||||
let local_usage = usage.clone();
|
|
||||||
|
|
||||||
thread::spawn(move || {
|
|
||||||
let h_i = [
|
|
||||||
[-7.74914499e-03, 3.95733793e-18, 3.10353257e00],
|
|
||||||
[8.56519716e-18, 9.42313768e-05, -1.86052093e00],
|
|
||||||
[2.57498016e-18, -2.73825295e-03, 1.00000000e00],
|
|
||||||
];
|
|
||||||
|
|
||||||
let mut t_loop = Instant::now();
|
|
||||||
|
|
||||||
for pointcloud in pointcloud_rx {
|
|
||||||
let start = Instant::now();
|
|
||||||
let projection = pointcloud
|
|
||||||
.iter()
|
|
||||||
.map(|pt| {
|
|
||||||
let x = pt[0];
|
|
||||||
let y = pt[1];
|
|
||||||
|
|
||||||
// Convert to homogeneous coordinates
|
|
||||||
let u = h_i[0][0] * x + h_i[0][1] * y + h_i[0][2];
|
|
||||||
let v = h_i[1][0] * x + h_i[1][1] * y + h_i[1][2];
|
|
||||||
let w = h_i[2][0] * x + h_i[2][1] * y + h_i[2][2];
|
|
||||||
|
|
||||||
let u_norm = u / w;
|
|
||||||
let v_norm = v / w;
|
|
||||||
|
|
||||||
vec![u_norm, v_norm]
|
|
||||||
})
|
|
||||||
.filter(|point| point[0].abs() <= 5.0)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let end = Instant::now();
|
|
||||||
projection_tx.send(Arc::new(projection)).unwrap();
|
|
||||||
*local_usage.lock().unwrap() = (end - start, t_loop.elapsed());
|
|
||||||
t_loop = Instant::now();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
(projection_rx, usage)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_mestimator_thread(lanes_rx: Receiver<Arc<[Vec<Vec<f64>>; 2]>>) -> (Receiver<(Arc<Vec<f64>>, Arc<[Vec<Vec<f64>>; 2]>)>, Arc<Mutex<(Duration, Duration)>>) {
|
fn create_mestimator_thread(lanes_rx: Receiver<Arc<[Vec<Vec<f64>>; 2]>>) -> (Receiver<(Arc<Vec<f64>>, Arc<[Vec<Vec<f64>>; 2]>)>, Arc<Mutex<(Duration, Duration)>>) {
|
||||||
let (z_tx, z_rx) = mpsc::sync_channel::<(Arc<Vec<f64>>, Arc<[Vec<Vec<f64>>; 2]>)>(1);
|
let (z_tx, z_rx) = mpsc::sync_channel::<(Arc<Vec<f64>>, Arc<[Vec<Vec<f64>>; 2]>)>(1);
|
||||||
let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO)));
|
let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO)));
|
||||||
@@ -253,22 +176,6 @@ fn create_mestimator_thread(lanes_rx: Receiver<Arc<[Vec<Vec<f64>>; 2]>>) -> (Rec
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fn dummy_image_spawner_thread() -> Receiver<Arc<Vec<u8>>> {
|
|
||||||
let img: DynamicImage = ImageReader::open("./images/lane_detection_loop_80.png").unwrap().decode().unwrap();
|
|
||||||
let img_data: Vec<u8> = img.clone().into_bytes();
|
|
||||||
|
|
||||||
let (img_tx, img_rx) = mpsc::sync_channel::<Arc<Vec<u8>>>(1);
|
|
||||||
|
|
||||||
thread::spawn(move || {
|
|
||||||
loop {
|
|
||||||
img_tx.send(Arc::new(img_data.clone())).unwrap();
|
|
||||||
//sleep(Duration::from_millis(16));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
img_rx
|
|
||||||
}
|
|
||||||
|
|
||||||
#[show_image::main]
|
#[show_image::main]
|
||||||
fn main() {
|
fn main() {
|
||||||
let ctx = r2r::Context::create().unwrap();
|
let ctx = r2r::Context::create().unwrap();
|
||||||
@@ -276,25 +183,14 @@ fn main() {
|
|||||||
let subscriber = node.subscribe::<r2r::sensor_msgs::msg::Image>("/camera/front", QosProfile::default()).unwrap();
|
let subscriber = node.subscribe::<r2r::sensor_msgs::msg::Image>("/camera/front", QosProfile::default()).unwrap();
|
||||||
let mut timer = node.create_wall_timer(Duration::from_millis(1000)).unwrap();
|
let mut timer = node.create_wall_timer(Duration::from_millis(1000)).unwrap();
|
||||||
|
|
||||||
let (img_tx, img_rx) = mpsc::sync_channel::<Arc<Vec<u8>>>(1);
|
let (img_tx, img_rx) = mpsc::sync_channel::<Arc<r2r::sensor_msgs::msg::Image>>(1);
|
||||||
|
|
||||||
// Set up a simple task executor.
|
// Set up a simple task executor.
|
||||||
let mut pool = LocalPool::new();
|
let mut pool = LocalPool::new();
|
||||||
let spawner = pool.spawner();
|
let spawner = pool.spawner();
|
||||||
|
|
||||||
// Run the subscriber in one task, printing the messages
|
let (pointcloud_rx, usage_extract) = create_extract_thread(img_rx, 160, 0.025);
|
||||||
spawner.spawn_local(async move {
|
let (cluster_labels_rx, usage_cluster) = create_pointcloud_labels_thread(pointcloud_rx);
|
||||||
subscriber
|
|
||||||
.for_each(|msg| {
|
|
||||||
img_tx.send(Arc::new(msg.data)).unwrap();
|
|
||||||
future::ready(())
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
}).unwrap();
|
|
||||||
|
|
||||||
let (pointcloud_rx, usage_extract) = create_extract_thread(img_rx, 800, 196, 0.025);
|
|
||||||
let (projection_rx, usage_transform) = create_transform_thread(pointcloud_rx);
|
|
||||||
let (cluster_labels_rx, usage_cluster) = create_pointcloud_labels_thread(projection_rx);
|
|
||||||
let (clusters_rx, usage_split) = create_cluster_separator_thread(cluster_labels_rx);
|
let (clusters_rx, usage_split) = create_cluster_separator_thread(cluster_labels_rx);
|
||||||
let (lanes_rx, usage_isolate) = create_isolate_lane_thread(clusters_rx);
|
let (lanes_rx, usage_isolate) = create_isolate_lane_thread(clusters_rx);
|
||||||
let (z_rx, usage_detect) = create_mestimator_thread(lanes_rx);
|
let (z_rx, usage_detect) = create_mestimator_thread(lanes_rx);
|
||||||
@@ -302,7 +198,6 @@ fn main() {
|
|||||||
let multi_stats = MultiProgress::new();
|
let multi_stats = MultiProgress::new();
|
||||||
let p_style = ProgressStyle::default_bar().template("{prefix:15} {bar:50.gray/white} {pos:>6} us | {len:>6} us").unwrap();
|
let p_style = ProgressStyle::default_bar().template("{prefix:15} {bar:50.gray/white} {pos:>6} us | {len:>6} us").unwrap();
|
||||||
let p_extract = multi_stats.add(ProgressBar::new(100).with_prefix("Extract").with_style(p_style.clone()));
|
let p_extract = multi_stats.add(ProgressBar::new(100).with_prefix("Extract").with_style(p_style.clone()));
|
||||||
let p_transform = multi_stats.add(ProgressBar::new(100).with_prefix("Transform").with_style(p_style.clone()));
|
|
||||||
let p_cluster = multi_stats.add(ProgressBar::new(100).with_prefix("Cluster").with_style(p_style.clone()));
|
let p_cluster = multi_stats.add(ProgressBar::new(100).with_prefix("Cluster").with_style(p_style.clone()));
|
||||||
let p_split = multi_stats.add(ProgressBar::new(100).with_prefix("Split").with_style(p_style.clone()));
|
let p_split = multi_stats.add(ProgressBar::new(100).with_prefix("Split").with_style(p_style.clone()));
|
||||||
let p_isolate = multi_stats.add(ProgressBar::new(100).with_prefix("Isolate").with_style(p_style.clone()));
|
let p_isolate = multi_stats.add(ProgressBar::new(100).with_prefix("Isolate").with_style(p_style.clone()));
|
||||||
@@ -316,23 +211,34 @@ fn main() {
|
|||||||
let mut t: VecDeque<Duration> = VecDeque::with_capacity(25);
|
let mut t: VecDeque<Duration> = VecDeque::with_capacity(25);
|
||||||
let mut now: Instant = Instant::now();
|
let mut now: Instant = Instant::now();
|
||||||
|
|
||||||
|
// Run the subscriber in one task, printing the messages
|
||||||
|
spawner.spawn_local(async move {
|
||||||
|
subscriber
|
||||||
|
.for_each(|msg| {
|
||||||
|
img_tx.send(Arc::new(msg)).unwrap();
|
||||||
|
future::ready(())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}).unwrap();
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
for _ in z_rx {}
|
||||||
|
});
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
node.spin_once(Duration::from_millis(10));
|
node.spin_once(Duration::from_millis(10));
|
||||||
pool.run_until_stalled();
|
pool.run_until_stalled();
|
||||||
|
|
||||||
let u_extract = *usage_extract.lock().unwrap();
|
let u_extract = *usage_extract.lock().unwrap();
|
||||||
let u_transform = *usage_transform.lock().unwrap();
|
|
||||||
let u_cluster = *usage_cluster.lock().unwrap();
|
let u_cluster = *usage_cluster.lock().unwrap();
|
||||||
let u_split = *usage_split.lock().unwrap();
|
let u_split = *usage_split.lock().unwrap();
|
||||||
let u_isolate = *usage_isolate.lock().unwrap();
|
let u_isolate = *usage_isolate.lock().unwrap();
|
||||||
let u_detect = *usage_detect.lock().unwrap();
|
let u_detect = *usage_detect.lock().unwrap();
|
||||||
|
|
||||||
let u_vec = vec![u_extract, u_transform, u_cluster, u_split, u_isolate, u_detect];
|
let u_vec = vec![u_extract, u_cluster, u_split, u_isolate, u_detect];
|
||||||
|
|
||||||
p_extract.set_length(u_extract.1.as_micros() as u64);
|
p_extract.set_length(u_extract.1.as_micros() as u64);
|
||||||
p_extract.set_position(u_extract.0.as_micros() as u64);
|
p_extract.set_position(u_extract.0.as_micros() as u64);
|
||||||
p_transform.set_length(u_transform.1.as_micros() as u64);
|
|
||||||
p_transform.set_position(u_transform.0.as_micros() as u64);
|
|
||||||
p_cluster.set_length(u_cluster.1.as_micros() as u64);
|
p_cluster.set_length(u_cluster.1.as_micros() as u64);
|
||||||
p_cluster.set_position(u_cluster.0.as_micros() as u64);
|
p_cluster.set_position(u_cluster.0.as_micros() as u64);
|
||||||
p_split.set_length(u_split.1.as_micros() as u64);
|
p_split.set_length(u_split.1.as_micros() as u64);
|
||||||
|
|||||||
1
src/pipeline.rs
Normal file
1
src/pipeline.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
pub mod highlight_extractor;
|
||||||
69
src/pipeline/highlight_extractor.rs
Normal file
69
src/pipeline/highlight_extractor.rs
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
use std::sync::{mpsc, Arc, Mutex};
|
||||||
|
use std::sync::mpsc::Receiver;
|
||||||
|
use std::thread;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
use rand::Rng;
|
||||||
|
use r2r::sensor_msgs::msg::Image;
|
||||||
|
use rayon::prelude::IntoParallelRefIterator;
|
||||||
|
|
||||||
|
pub type Pointcloud = Arc<Vec<Vec<f64>>>;
|
||||||
|
|
||||||
|
trait ExtractHighlights {
|
||||||
|
fn get_highlights(&self, threshold: u8, downsample: f64) -> Pointcloud;
|
||||||
|
}
|
||||||
|
impl ExtractHighlights for Image {
|
||||||
|
fn get_highlights(&self, threshold: u8, downsample: f64) -> Pointcloud {
|
||||||
|
let h_i = [
|
||||||
|
[-7.74914499e-03, 3.95733793e-18, 3.10353257e00],
|
||||||
|
[8.56519716e-18, 9.42313768e-05, -1.86052093e00],
|
||||||
|
[2.57498016e-18, -2.73825295e-03, 1.00000000e00],
|
||||||
|
];
|
||||||
|
|
||||||
|
let max_rng = (u32::MAX as f64 * downsample) as u32;
|
||||||
|
Arc::new(self.data
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.filter(|(i, _)| *i as u32 / self.width >= self.height / 2)
|
||||||
|
.filter(|(_, pixel)| **pixel > threshold)
|
||||||
|
.filter(|_| {
|
||||||
|
let mut rng = rand::rng();
|
||||||
|
rng.random::<u32>() < max_rng
|
||||||
|
})
|
||||||
|
.map(|(i, _)| {
|
||||||
|
let x = i as f64 % self.width as f64;
|
||||||
|
let y = i as f64 / self.width as f64;
|
||||||
|
|
||||||
|
// Convert to homogeneous coordinates
|
||||||
|
let u = h_i[0][0] * x + h_i[0][1] * y + h_i[0][2];
|
||||||
|
let v = h_i[1][0] * x + h_i[1][1] * y + h_i[1][2];
|
||||||
|
let w = h_i[2][0] * x + h_i[2][1] * y + h_i[2][2];
|
||||||
|
|
||||||
|
let u_norm = u / w;
|
||||||
|
let v_norm = v / w;
|
||||||
|
|
||||||
|
vec![u_norm, v_norm]
|
||||||
|
})
|
||||||
|
.filter(|point| point[0].abs() <= 5.0)
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn create_extract_thread(image_rx: Receiver<Arc<Image>>, threshold: u8, downsample: f64) -> (Receiver<Pointcloud>, Arc<Mutex<(Duration, Duration)>>) {
|
||||||
|
let (tx, rx) = mpsc::sync_channel::<Pointcloud>(1);
|
||||||
|
let usage = Arc::new(Mutex::new((Duration::ZERO, Duration::ZERO)));
|
||||||
|
let local_usage = usage.clone();
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
let mut t_loop = Instant::now();
|
||||||
|
for image in image_rx {
|
||||||
|
let start = Instant::now();
|
||||||
|
let poi = image.get_highlights(threshold, downsample);
|
||||||
|
let end = Instant::now();
|
||||||
|
|
||||||
|
tx.send(poi).unwrap();
|
||||||
|
|
||||||
|
*local_usage.lock().unwrap() = (end - start, t_loop.elapsed());
|
||||||
|
t_loop = Instant::now();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
(rx, usage)
|
||||||
|
}
|
||||||
0
src/pipeline/m_estimator.rs
Normal file
0
src/pipeline/m_estimator.rs
Normal file
0
src/pipeline/pointcloud_clusterer.rs
Normal file
0
src/pipeline/pointcloud_clusterer.rs
Normal file
0
src/pipeline/ros_interface.rs
Normal file
0
src/pipeline/ros_interface.rs
Normal file
Reference in New Issue
Block a user