added local conf

This commit is contained in:
SHERPA
2026-01-06 16:38:26 +01:00
parent 225fbfe75a
commit ed031c9b8d
13 changed files with 362 additions and 9 deletions

View File

@@ -83,7 +83,7 @@ class LidarTransformPublisher(Node):
self.lidar1_buffer = lidar1_buffer
self.lidar2_buffer = lidar2_buffer
self.lidar1_frame = lidar1_frame
self.lidar2_frame = lidar2_frame
self.lidar2_frame = "livox_base" # lidar2_frame
def publish(self):
self.pcd_1 = self.pcd_buffer_to_o3d(self.lidar1_buffer)

Submodule src/FAST_LIO deleted from f20315319d

Submodule src/Livox-SDK2 deleted from 9791bc3d87

View File

@@ -0,0 +1,15 @@
cmake_minimum_required(VERSION 3.8)
project(sherpa_lidar_utils)
find_package(ament_cmake REQUIRED)
find_package(rclpy REQUIRED)
# Install the Python script into the lib directory of the package
install(PROGRAMS
src/publish_calibration.py
src/servo_angle_tf.py
src/track_cluster.py
DESTINATION lib/${PROJECT_NAME}
)
ament_package()

View File

@@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>sherpa_lidar_utils</name>
<version>0.0.1</version>
<description>Studienprojekt</description>
<maintainer email="timoschneider04@gmail.com">timo</maintainer>
<license>Apache 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<!-- Dependencies for the robot control system -->
<depend>rclpy</depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""
average_tf_publisher.py
Compute the mean of all 4x4 transform .npy files in a folder
and broadcast it as a static TF at 50 Hz.
Usage:
ros2 run <your_package> average_tf_publisher.py [directory] [parent_frame] [child_frame]
Default parameters:
directory = "calibration"
parent_frame = "world"
child_frame = "average_lidar"
"""
import os
import sys
import glob
import numpy as np
from scipy.spatial.transform import Rotation as R
import rclpy
from rclpy.node import Node
from tf2_ros.static_transform_broadcaster import StaticTransformBroadcaster
from geometry_msgs.msg import TransformStamped
class AverageTFPublisher(Node):
def __init__(self):
super().__init__("average_tf_publisher")
self.declare_parameter('calibration', 'calibration')
self.declare_parameter('parent_frame', 'velodyne')
self.declare_parameter('child_frame', 'livox_base')
self.declare_parameter('hz', 50.0)
# Read the values
directory = self.get_parameter('calibration').value
self.parent_frame = self.get_parameter('parent_frame').value
self.child_frame = self.get_parameter('child_frame').value
self.get_logger().info(f"Loading transforms from: {directory}")
transforms = self._load_transforms(directory)
if len(transforms) == 0:
raise RuntimeError(f"No valid .npy transforms found in {directory}")
self.get_logger().info(f"Loaded {len(transforms)} transforms.")
self.mean_t, self.mean_q = self._compute_mean_transform(transforms)
self.get_logger().info(
f"Average translation: {self.mean_t}\n"
f"Average quaternion: {self.mean_q}"
)
self.br = StaticTransformBroadcaster(self)
# Publish at 50 Hz (static TF can be published once, but continuous
# publishing helps late-joining nodes)
period = 1.0 / self.get_parameter('hz').value
self.timer = self.create_timer(period, self._timer_cb)
def _load_transforms(self, directory):
"""Load all 4x4 numpy transforms from directory."""
files = sorted(glob.glob(os.path.join(directory, "*.npy")))
mats = []
for f in files:
try:
T = np.load(f)
if T.shape == (4, 4):
mats.append(T)
else:
self.get_logger().warn(f"Skipping {f}: wrong shape {T.shape}")
except Exception as e:
self.get_logger().warn(f"Failed loading {f}: {e}")
return mats
def _compute_mean_transform(self, mats):
"""Return (mean_translation, mean_quaternion)"""
translations = []
quats = []
for T in mats:
translations.append(T[:3, 3])
q = R.from_matrix(T[:3, :3]).as_quat() # [x,y,z,w]
quats.append(q)
# mean translation
t_mean = np.mean(np.vstack(translations), axis=0)
# quaternion average using eigenvector method
Q = np.vstack(quats)
A = Q.T @ Q
eigvals, eigvecs = np.linalg.eigh(A)
q_mean = eigvecs[:, np.argmax(eigvals)]
if q_mean[3] < 0: # ensure w positive for consistency
q_mean = -q_mean
return t_mean, q_mean # q_mean = [x,y,z,w]
def _timer_cb(self):
t = TransformStamped()
t.header.stamp = self.get_clock().now().to_msg()
t.header.frame_id = self.parent_frame
t.child_frame_id = self.child_frame
t.transform.translation.x = float(self.mean_t[0])
t.transform.translation.y = float(self.mean_t[1])
t.transform.translation.z = float(self.mean_t[2])
t.transform.rotation.x = float(self.mean_q[0])
t.transform.rotation.y = float(self.mean_q[1])
t.transform.rotation.z = float(self.mean_q[2])
t.transform.rotation.w = float(self.mean_q[3])
self.br.sendTransform(t)
def main(argv=None):
rclpy.init(args=argv)
try:
node = AverageTFPublisher()
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,84 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
from geometry_msgs.msg import TransformStamped
import tf2_ros
import math
def euler_to_quaternion(roll, pitch, yaw):
"""Convert Euler angles to quaternion (x, y, z, w)."""
cr = math.cos(roll / 2)
sr = math.sin(roll / 2)
cp = math.cos(pitch / 2)
sp = math.sin(pitch / 2)
cy = math.cos(yaw / 2)
sy = math.sin(yaw / 2)
w = cr * cp * cy + sr * sp * sy
x = sr * cp * cy - cr * sp * sy
y = cr * sp * cy + sr * cp * sy
z = cr * cp * sy - sr * sp * cy
return x, y, z, w
class ServoTfNode(Node):
"""Publishes TF for a servo based on its current position."""
def __init__(self):
super().__init__("stepper_tf_node")
# Subscribe to current servo position
self.pos_sub = self.create_subscription(
Float32,
"/nucleo/stepper0/position",
self.pos_callback,
10
)
# TF broadcaster
self.br = tf2_ros.TransformBroadcaster(self)
self.get_logger().info("Servo TF node started, subscribing to /nucleo/stepper0/position")
def pos_callback(self, msg: Float32):
"""Publish a TF corresponding to the servo position."""
angle_deg = msg.data # get float from subscriber
t = TransformStamped()
#t.header.stamp = (self.get_clock().now() - rclpy.duration.Duration(seconds=0.1)).to_msg()
t.header.stamp = self.get_clock().now().to_msg()
t.header.frame_id = "livox_base"
t.child_frame_id = "frame_default"
t.transform.translation.x = 0.0
t.transform.translation.y = 0.0
t.transform.translation.z = 0.0
# Convert angle to yaw (radians) and then to quaternion
yaw = angle_deg * math.pi / 180.0
qx, qy, qz, qw = euler_to_quaternion(0, 0, yaw)
t.transform.rotation.x = qx
t.transform.rotation.y = qy
t.transform.rotation.z = qz
t.transform.rotation.w = qw
# Publish the transform
self.br.sendTransform(t)
def main(args=None):
rclpy.init(args=args)
node = ServoTfNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import PointStamped
from std_msgs.msg import Float32
from tf2_ros import Buffer, TransformListener
import tf2_geometry_msgs
import numpy as np
import math
import sensor_msgs_py.point_cloud2 as pc2
class Cluster5Tracker(Node):
def __init__(self):
super().__init__('cluster_tracker')
self.source_frame = 'lidar_base'
self.target_frame = 'livox_base'
self.buf = []
# Subscribe to point cloud of cluster 5
self.sub = self.create_subscription(
PointCloud2,
'/tracking/target',
self.cb,
10
)
# Publishers
self.centroid_pub = self.create_publisher(PointStamped, '/tracking/target_pos', 10)
self.angle_pub = self.create_publisher(Float32, '/nucleo/stepper0/target', 10)
# TF
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
def cb(self, msg: PointCloud2):
# Extract raw x, y, z as a regular list
points = [[p[0], p[1], p[2]] for p in pc2.read_points(msg, field_names=("x","y","z"), skip_nans=True)]
if len(points) == 0:
self.get_logger().warn("No points in cluster5 PointCloud2")
return
arr = np.array(points, dtype=np.float32)
centroid = np.mean(arr, axis=0)
pt = PointStamped()
pt.header.frame_id = msg.header.frame_id
pt.header.stamp = msg.header.stamp
pt.point.x, pt.point.y, pt.point.z = centroid
try:
# Transform to livox_base
t = self.tf_buffer.lookup_transform(
self.target_frame,
pt.header.frame_id,
pt.header.stamp,
timeout=rclpy.duration.Duration(seconds=1.0)
)
pt_transformed = tf2_geometry_msgs.do_transform_point(pt, t)
except Exception as e:
self.get_logger().warn(f"TF lookup failed: {e}")
return
# Publish centroid
self.centroid_pub.publish(pt_transformed)
# Compute yaw angle in degrees
x, y = pt_transformed.point.x, pt_transformed.point.y
yaw_deg = -math.degrees(math.atan2(y, x))
self.buf.append(yaw_deg)
if len(self.buf) > 15:
self.buf.pop(0)
angle_msg = Float32()
angle_msg.data = sum(self.buf) / len(self.buf)
self.angle_pub.publish(angle_msg)
#self.get_logger().info(f"Centroid: ({x:.2f}, {y:.2f}, {pt_transformed.point.z:.2f}), Yaw: {yaw_deg:.2f}°")
def main():
rclpy.init()
node = Cluster5Tracker()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@@ -5,8 +5,13 @@ import math
################### user configure parameters for ros2 start ###################
# Topics/Frames
frame_id = 'velodyne'
<<<<<<< Updated upstream
topic_preprocessing_in = 'filtered_points'
topic_preprocessing_out = 'new_filtered'
=======
topic_preprocessing_in = 'merged_cloud'
topic_preprocessing_out = 'filtered_points'
>>>>>>> Stashed changes
# Preprocessing
x_min = 0.0
@@ -17,6 +22,7 @@ tan_h_fov = math.pi / 4 # ±45°
tan_v_fov = math.pi / 6 # ±30°
# Clustering
<<<<<<< Updated upstream
z_dim_scale = 0.1
cluster_tolerance = 0.3
min_cluster_size = 10
@@ -27,6 +33,12 @@ min_length = 0.0
max_width = 1.5
max_height = 2.5
max_length = 1.5
=======
z_dim_scale = 1.0
cluster_tolerance = 0.1
min_cluster_size = 350
max_cluster_size = 1500
>>>>>>> Stashed changes
################### user configure parameters for ros2 end #####################
@@ -68,7 +80,7 @@ def generate_launch_description():
'stderr': 'screen',
}
),
Node(
Node(
package='target_tracking',
executable='cloud_clustering_node',
name='cloud_clustering',
@@ -78,4 +90,4 @@ def generate_launch_description():
'stderr': 'screen',
}
)
])
])

View File

@@ -100,7 +100,7 @@ namespace cloud_clustering
// Create a ROS subscriber for the input point cloud
sub = this->create_subscription<sensor_msgs::msg::PointCloud2>(
topic_in,
10, // queue size
rclcpp::SensorDataQoS(), // queue size
std::bind(&CloudClustering::cloud_cb, this, std::placeholders::_1));
std::cout << "Started clustering node with parameters:\n"

View File

@@ -153,11 +153,12 @@ private:
// Convert and publish
sensor_msgs::msg::PointCloud2 out_msg;
pcl::toROSMsg(*voxel_filtered, out_msg);
out_msg.header.stamp = this->get_clock()->now();
out_msg.header.stamp = msg->header.stamp;
out_msg.header.frame_id = msg->header.frame_id;
filtered_publisher_->publish(out_msg);
//RCLCPP_INFO(this->get_logger(), "Filtered %zu -> %zu points", input->points.size(), voxel_filtered->points.size());
<<<<<<< Updated upstream
//auto stop = std::chrono::high_resolution_clock::now();
//auto duration = std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
@@ -165,6 +166,8 @@ private:
// member function on the duration object
//std::cout << duration.count() << std::endl;
=======
>>>>>>> Stashed changes
}
tf2_ros::Buffer tf_buffer_;
@@ -183,4 +186,4 @@ int main(int argc, char * argv[])
rclcpp::spin(std::make_shared<CloudFilterNode>());
rclcpp::shutdown();
return 0;
}
}