added pipeline stats

This commit is contained in:
2025-05-06 23:28:56 +02:00
parent 5ecb8e7153
commit 42ef932f5e
2 changed files with 90 additions and 30 deletions

View File

@@ -14,3 +14,4 @@ rand = "0.9.1"
rayon = "1.10.0" rayon = "1.10.0"
hdbscan = "0.10.0" hdbscan = "0.10.0"
nalgebra = "0.33.2" nalgebra = "0.33.2"
indicatif = "0.17.11"

View File

@@ -6,16 +6,21 @@ use show_image::{create_window, ImageInfo, ImageView, PixelFormat};
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::iter::zip; use std::iter::zip;
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
use std::sync::{mpsc, Arc}; use std::sync::{mpsc, Arc, Mutex};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use nalgebra::DMatrix; use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use nalgebra::{partial_sort2, DMatrix};
use ndarray::{Array1, Array2}; use ndarray::{Array1, Array2};
fn create_extract_thread(image_rx: Receiver<Arc<Vec<u8>>>, width: u32, threshold: u8, downsample: f64) -> Receiver<Arc<Vec<Vec<f64>>>> { fn create_extract_thread(image_rx: Receiver<Arc<Vec<u8>>>, width: u32, threshold: u8, downsample: f64) -> (Receiver<Arc<Vec<Vec<f64>>>>, Arc<Mutex<f64>>) {
let (tx, rx) = mpsc::sync_channel::<Arc<Vec<Vec<f64>>>>(1); let (tx, rx) = mpsc::sync_channel::<Arc<Vec<Vec<f64>>>>(1);
let usage = Arc::new(Mutex::new(0.0));
let local_usage = usage.clone();
thread::spawn(move || { thread::spawn(move || {
for image in image_rx { for image in image_rx {
let start = Instant::now();
let max_rng = (u32::MAX as f64 * downsample) as u32; let max_rng = (u32::MAX as f64 * downsample) as u32;
let poi = image let poi = image
.par_iter() .par_iter()
@@ -31,17 +36,22 @@ fn create_extract_thread(image_rx: Receiver<Arc<Vec<u8>>>, width: u32, threshold
let mut vec = Vec::new(); let mut vec = Vec::new();
vec.par_extend(poi); vec.par_extend(poi);
let end = Instant::now();
tx.send(Arc::new(vec)).expect("TODO: panic message"); tx.send(Arc::new(vec)).expect("TODO: panic message");
*local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64();
} }
}); });
rx (rx, usage)
} }
fn create_pointcloud_labels_thread(pointcloud_rx: Receiver<Arc<Vec<Vec<f64>>>>) -> Receiver<(Arc<Vec<Vec<f64>>>, Arc<Vec<i32>>)> { fn create_pointcloud_labels_thread(pointcloud_rx: Receiver<Arc<Vec<Vec<f64>>>>) -> (Receiver<(Arc<Vec<Vec<f64>>>, Arc<Vec<i32>>)>, Arc<Mutex<f64>>) {
let (pointcloud_labels_tx, pointcloud_labels_rx) = mpsc::sync_channel::<(Arc<Vec<Vec<f64>>>, Arc<Vec<i32>>)>(1); let (pointcloud_labels_tx, pointcloud_labels_rx) = mpsc::sync_channel::<(Arc<Vec<Vec<f64>>>, Arc<Vec<i32>>)>(1);
let usage = Arc::new(Mutex::new(0.0));
let local_usage = usage.clone();
thread::spawn(move || { thread::spawn(move || {
for pointcloud in pointcloud_rx { for pointcloud in pointcloud_rx {
let start = Instant::now();
let params = HdbscanHyperParams::builder() let params = HdbscanHyperParams::builder()
.epsilon(1000.0) .epsilon(1000.0)
.min_samples(3) .min_samples(3)
@@ -49,18 +59,23 @@ fn create_pointcloud_labels_thread(pointcloud_rx: Receiver<Arc<Vec<Vec<f64>>>>)
.min_cluster_size(pointcloud.len() / 10) .min_cluster_size(pointcloud.len() / 10)
.build(); .build();
let labels = Arc::new(Hdbscan::new(&pointcloud, params).cluster().unwrap()); let labels = Arc::new(Hdbscan::new(&pointcloud, params).cluster().unwrap());
let end = Instant::now();
pointcloud_labels_tx.send((pointcloud, labels)).unwrap(); pointcloud_labels_tx.send((pointcloud, labels)).unwrap();
*local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64();
} }
}); });
pointcloud_labels_rx (pointcloud_labels_rx, usage)
} }
fn create_cluster_separator_thread(cluster_labels: Receiver<(Arc<Vec<Vec<f64>>>, Arc<Vec<i32>>)>) -> Receiver<Arc<HashMap<i32, Vec<Vec<f64>>>>> { fn create_cluster_separator_thread(cluster_labels: Receiver<(Arc<Vec<Vec<f64>>>, Arc<Vec<i32>>)>) -> (Receiver<Arc<HashMap<i32, Vec<Vec<f64>>>>>, Arc<Mutex<f64>>) {
let (clusters_tx, clusters_rx) = mpsc::sync_channel::<Arc<HashMap<i32, Vec<Vec<f64>>>>>(1); let (clusters_tx, clusters_rx) = mpsc::sync_channel::<Arc<HashMap<i32, Vec<Vec<f64>>>>>(1);
let usage = Arc::new(Mutex::new(0.0));
let local_usage = usage.clone();
thread::spawn(move || { thread::spawn(move || {
for (pointcloud, labels) in cluster_labels { for (pointcloud, labels) in cluster_labels {
let start = Instant::now();
let mut clusters = HashMap::<i32, Vec<Vec<f64>>>::with_capacity(5); let mut clusters = HashMap::<i32, Vec<Vec<f64>>>::with_capacity(5);
let _ = zip(pointcloud.iter(), labels.iter()).for_each(|(point, label)| { let _ = zip(pointcloud.iter(), labels.iter()).for_each(|(point, label)| {
@@ -69,19 +84,23 @@ fn create_cluster_separator_thread(cluster_labels: Receiver<(Arc<Vec<Vec<f64>>>,
clusters.get_mut(label).unwrap().push(point.clone()); clusters.get_mut(label).unwrap().push(point.clone());
}); });
let end = Instant::now();
clusters_tx.send(Arc::new(clusters)).unwrap(); clusters_tx.send(Arc::new(clusters)).unwrap();
*local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64();
} }
}); });
clusters_rx (clusters_rx, usage)
} }
fn create_isolate_lane_thread(clusters_rx: Receiver<Arc<HashMap<i32, Vec<Vec<f64>>>>>) -> Receiver<Arc<[Vec<Vec<f64>>; 2]>> { fn create_isolate_lane_thread(clusters_rx: Receiver<Arc<HashMap<i32, Vec<Vec<f64>>>>>) -> (Receiver<Arc<[Vec<Vec<f64>>; 2]>>, Arc<Mutex<f64>>) {
let (lanes_tx, lanes_rx) = mpsc::sync_channel::<Arc<[Vec<Vec<f64>>; 2]>>(1); let (lanes_tx, lanes_rx) = mpsc::sync_channel::<Arc<[Vec<Vec<f64>>; 2]>>(1);
let usage = Arc::new(Mutex::new(0.0));
let local_usage = usage.clone();
thread::spawn(move || { thread::spawn(move || {
for clusters in clusters_rx { for clusters in clusters_rx {
let start = Instant::now();
let mut averages = clusters.iter().filter(|(label, _)| **label != -1).map(|(label, cluster)| { let mut averages = clusters.iter().filter(|(label, _)| **label != -1).map(|(label, cluster)| {
(*label, cluster.iter().map(|point| point[0]).sum::<f64>() / (cluster.len() as f64)) (*label, cluster.iter().map(|point| point[0]).sum::<f64>() / (cluster.len() as f64))
}).collect::<Vec<(i32, f64)>>(); }).collect::<Vec<(i32, f64)>>();
@@ -89,17 +108,20 @@ fn create_isolate_lane_thread(clusters_rx: Receiver<Arc<HashMap<i32, Vec<Vec<f64
averages.sort_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap()); averages.sort_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap());
if averages.len() < 2 {continue;} if averages.len() < 2 {continue;}
let end = Instant::now();
lanes_tx.send(Arc::new([clusters.get(&averages[0].0).unwrap().clone(), clusters.get(&averages[1].0 ).unwrap().clone()])).unwrap() ; lanes_tx.send(Arc::new([clusters.get(&averages[0].0).unwrap().clone(), clusters.get(&averages[1].0 ).unwrap().clone()])).unwrap() ;
*local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64();
} }
}); });
lanes_rx (lanes_rx, usage)
} }
fn create_transform_thread(pointcloud_rx : Receiver<Arc<Vec<Vec<f64>>>>) -> Receiver<Arc<Vec<Vec<f64>>>>{ fn create_transform_thread(pointcloud_rx : Receiver<Arc<Vec<Vec<f64>>>>) -> (Receiver<Arc<Vec<Vec<f64>>>>, Arc<Mutex<f64>>) {
let (projection_tx, projection_rx) = mpsc::sync_channel::<Arc<Vec<Vec<f64>>>>(1); let (projection_tx, projection_rx) = mpsc::sync_channel::<Arc<Vec<Vec<f64>>>>(1);
let usage = Arc::new(Mutex::new(0.0));
let local_usage = usage.clone();
thread::spawn(move || { thread::spawn(move || {
let h_i = [ let h_i = [
@@ -109,6 +131,7 @@ fn create_transform_thread(pointcloud_rx : Receiver<Arc<Vec<Vec<f64>>>>) -> Rece
]; ];
for pointcloud in pointcloud_rx { for pointcloud in pointcloud_rx {
let start = Instant::now();
let projection = pointcloud let projection = pointcloud
.iter() .iter()
.map(|pt| { .map(|pt| {
@@ -128,20 +151,26 @@ fn create_transform_thread(pointcloud_rx : Receiver<Arc<Vec<Vec<f64>>>>) -> Rece
.filter(|point| point[0].abs() <= 5.0) .filter(|point| point[0].abs() <= 5.0)
.collect(); .collect();
let end = Instant::now();
projection_tx.send(Arc::new(projection)).unwrap(); projection_tx.send(Arc::new(projection)).unwrap();
*local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64();
} }
}); });
projection_rx
(projection_rx, usage)
} }
fn create_mestimator_thread(lanes_rx: Receiver<Arc<[Vec<Vec<f64>>; 2]>>) -> Receiver<(Arc<Vec<f64>>, Arc<[Vec<Vec<f64>>; 2]>)> { fn create_mestimator_thread(lanes_rx: Receiver<Arc<[Vec<Vec<f64>>; 2]>>) -> (Receiver<(Arc<Vec<f64>>, Arc<[Vec<Vec<f64>>; 2]>)>, Arc<Mutex<f64>>) {
let (z_tx, z_rx) = mpsc::sync_channel::<(Arc<Vec<f64>>, Arc<[Vec<Vec<f64>>; 2]>)>(1); let (z_tx, z_rx) = mpsc::sync_channel::<(Arc<Vec<f64>>, Arc<[Vec<Vec<f64>>; 2]>)>(1);
let usage = Arc::new(Mutex::new(0.0));
let local_usage = usage.clone();
let c = 0.1 * 2.3849; let c = 0.1 * 2.3849;
thread::spawn(move || { thread::spawn(move || {
let mut z = vec![4.0, -2.0, 0.0, 0.0]; let mut z = vec![4.0, -2.0, 0.0, 0.0];
for lanes in lanes_rx { for lanes in lanes_rx {
let start = Instant::now();
let mut p = Vec::<Vec<f64>>::new(); let mut p = Vec::<Vec<f64>>::new();
p.append(&mut lanes[0].clone()); p.append(&mut lanes[0].clone());
p.append(&mut lanes[1].clone()); p.append(&mut lanes[1].clone());
@@ -162,7 +191,7 @@ fn create_mestimator_thread(lanes_rx: Receiver<Arc<[Vec<Vec<f64>>; 2]>>) -> Rece
let H_t = H.t(); let H_t = H.t();
for _ in 0..5 { for _ in 0..3 {
let res_l = lanes[0] let res_l = lanes[0]
.iter() .iter()
.map(|point| point[0] - (0.5 * z[0] - z[1] - point[1] * z[2] + 0.5 * z[3] * point[1].powi(2))) .map(|point| point[0] - (0.5 * z[0] - z[1] - point[1] * z[2] + 0.5 * z[3] * point[1].powi(2)))
@@ -194,16 +223,18 @@ fn create_mestimator_thread(lanes_rx: Receiver<Arc<[Vec<Vec<f64>>; 2]>>) -> Rece
z = second_part.dot(&H_t).dot(&w).dot(&y).to_vec(); z = second_part.dot(&H_t).dot(&w).dot(&y).to_vec();
} }
let end = Instant::now();
z_tx.send((Arc::new(z.clone()), lanes)).unwrap(); z_tx.send((Arc::new(z.clone()), lanes)).unwrap();
*local_usage.lock().unwrap() = (end - start).as_secs_f64() / start.elapsed().as_secs_f64();
} }
}); });
z_rx (z_rx, usage)
} }
fn dummy_image_spawner_thread() -> Receiver<Arc<Vec<u8>>> { fn dummy_image_spawner_thread() -> Receiver<Arc<Vec<u8>>> {
let img: DynamicImage = ImageReader::open("./images/lane_detection_loop_60.png").unwrap().decode().unwrap(); let img: DynamicImage = ImageReader::open("./images/lane_detection_loop_80.png").unwrap().decode().unwrap();
let img_data: Vec<u8> = img.clone().into_bytes(); let img_data: Vec<u8> = img.clone().into_bytes();
let (img_tx, img_rx) = mpsc::sync_channel::<Arc<Vec<u8>>>(1); let (img_tx, img_rx) = mpsc::sync_channel::<Arc<Vec<u8>>>(1);
@@ -220,13 +251,24 @@ fn dummy_image_spawner_thread() -> Receiver<Arc<Vec<u8>>> {
#[show_image::main] #[show_image::main]
fn main() { fn main() {
let image_rx = dummy_image_spawner_thread(); let image_rx = dummy_image_spawner_thread();
let pointcloud_rx = create_extract_thread(image_rx, 800, 196, 0.025); let (pointcloud_rx, usage_extract) = create_extract_thread(image_rx, 800, 196, 0.025);
let projection_rx = create_transform_thread(pointcloud_rx); let (projection_rx, usage_transform) = create_transform_thread(pointcloud_rx);
let cluster_labels_rx = create_pointcloud_labels_thread(projection_rx); let (cluster_labels_rx, usage_cluster) = create_pointcloud_labels_thread(projection_rx);
let clusters_rx = create_cluster_separator_thread(cluster_labels_rx); let (clusters_rx, usage_split) = create_cluster_separator_thread(cluster_labels_rx);
let lanes_rx = create_isolate_lane_thread(clusters_rx); let (lanes_rx, usage_isolate) = create_isolate_lane_thread(clusters_rx);
let z_rx = create_mestimator_thread(lanes_rx); let (z_rx, usage_detect) = create_mestimator_thread(lanes_rx);
let multi_stats = MultiProgress::new();
let p_style = ProgressStyle::default_bar().template("{prefix:15} {bar:50.gray/white} {percent:>3}%").unwrap();
let p_extract = multi_stats.add(ProgressBar::new(100).with_prefix("Extract").with_style(p_style.clone()));
let p_transform = multi_stats.add(ProgressBar::new(100).with_prefix("Transform").with_style(p_style.clone()));
let p_cluster = multi_stats.add(ProgressBar::new(100).with_prefix("Cluster").with_style(p_style.clone()));
let p_split = multi_stats.add(ProgressBar::new(100).with_prefix("Split").with_style(p_style.clone()));
let p_isolate = multi_stats.add(ProgressBar::new(100).with_prefix("Isolate").with_style(p_style.clone()));
let p_detect = multi_stats.add(ProgressBar::new(100).with_prefix("Detect").with_style(p_style.clone()));
let p_max = multi_stats.add(ProgressBar::new(100).with_prefix("Pipeline load").with_style(ProgressStyle::default_bar().template("{prefix:15} {bar:50.red/white} {percent:>3}%").unwrap()));
let p_framerate = multi_stats.add(ProgressBar::new(150).with_prefix("Framerate").with_style(ProgressStyle::default_bar().template("{prefix:15} {bar:50.blue/white} {pos:>3} FPS").unwrap()));
let window = create_window("image", Default::default()).unwrap(); let window = create_window("image", Default::default()).unwrap();
let mut t: VecDeque<Duration> = VecDeque::with_capacity(25); let mut t: VecDeque<Duration> = VecDeque::with_capacity(25);
@@ -237,12 +279,30 @@ fn main() {
for (z, lanes) in z_rx { for (z, lanes) in z_rx {
let u_extract = *usage_extract.lock().unwrap();
let u_transform = *usage_transform.lock().unwrap();
let u_cluster = *usage_cluster.lock().unwrap();
let u_split = *usage_split.lock().unwrap();
let u_isolate = *usage_isolate.lock().unwrap();
let u_detect = *usage_detect.lock().unwrap();
let u_vec = vec![u_extract, u_transform, u_cluster, u_split, u_isolate, u_detect];
let u_total: f64 = u_vec.iter().sum();
p_extract.set_position((u_extract / u_total * 100.0) as u64);
p_transform.set_position((u_transform / u_total * 100.0) as u64);
p_cluster.set_position((u_cluster / u_total * 100.0) as u64);
p_split.set_position((u_split / u_total * 100.0) as u64);
p_isolate.set_position((u_isolate / u_total * 100.0) as u64);
p_detect.set_position((u_detect / u_total * 100.0) as u64);
p_max.set_position((100.0 * *u_vec.iter().max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap()) as u64);
p_framerate.set_position((t.iter().map(|d| 1.0 / d.as_secs_f64()).sum::<f64>() / t.len() as f64) as u64);
let mut rgb_image = RgbImage::new(800, 800); let mut rgb_image = RgbImage::new(800, 800);
let colors = vec![Rgb([0, 255, 0]), Rgb([255, 0, 0])]; let colors = vec![Rgb([0, 255, 0]), Rgb([255, 0, 0])];
println!("{:?}", z);
(0..rgb_image.height()).for_each(|x| { (0..rgb_image.height()).for_each(|x| {
let x_m = x as f64 * (45.0 / 800.0); let x_m = x as f64 * (45.0 / 800.0);
let const_part = - z[1] - x_m * z[2] + 0.5 * z[3] * x_m.powi(2); let const_part = - z[1] - x_m * z[2] + 0.5 * z[3] * x_m.powi(2);
@@ -280,7 +340,6 @@ fn main() {
t.pop_front(); t.pop_front();
} }
t.push_back(now.elapsed()); t.push_back(now.elapsed());
println!("Avg: {:.6} s", t.iter().map(|d| d.as_secs_f64()).sum::<f64>() / t.len() as f64);
now = Instant::now(); now = Instant::now();
} }