From 995d902334e3b883d06969769623df58423d9255 Mon Sep 17 00:00:00 2001 From: Timo Date: Sat, 16 Aug 2025 12:41:29 +0200 Subject: [PATCH] dual lidar calibration script --- scripts/calibrate_lidar.py | 229 +++++++++++++++++++++++++++++++++++++ 1 file changed, 229 insertions(+) create mode 100644 scripts/calibrate_lidar.py diff --git a/scripts/calibrate_lidar.py b/scripts/calibrate_lidar.py new file mode 100644 index 0000000..7567ede --- /dev/null +++ b/scripts/calibrate_lidar.py @@ -0,0 +1,229 @@ +import rclpy +from rclpy.node import Node +from rclpy.executors import MultiThreadedExecutor +from sensor_msgs.msg import PointCloud2 +from geometry_msgs.msg import TransformStamped +import sensor_msgs_py.point_cloud2 as pc2 +from tf2_ros import StaticTransformBroadcaster +import numpy as np +import matplotlib.pyplot as plt +import struct +from io import BytesIO +import threading +import time +import open3d as o3d +from scipy.spatial.transform import Rotation as R + +class PointCloudSaver(Node): + def __init__(self, node_name: str, pointcloud_topic: str, buffer, timeout_ms: int): + super().__init__(node_name) + self.subscription = self.create_subscription( + PointCloud2, + pointcloud_topic, + self.callback, + 10 + ) + self.buffer = buffer + self.finished = False + self.points = [] + self.end_time = self.get_clock().now().nanoseconds + (timeout_ms * 1_000_000) + self.cmap = plt.get_cmap('jet') + + def callback(self, msg): + now = self.get_clock().now().nanoseconds + for p in pc2.read_points(msg, field_names=("x", "y", "z", "intensity"), skip_nans=True): + self.points.append([p[0], p[1], p[2], p[3]]) + + if now > self.end_time: + if not self.points: + self.get_logger().warn("No points received!") + self.destroy_node() + self.finished = True + return + + np_points = np.array(self.points, dtype=np.float32) + intensities = np_points[:, 3] + norm_int = (intensities - intensities.min()) / (intensities.ptp() + 1e-8) + + # Map normalized intensity to RGB colormap + colors = self.cmap(norm_int)[:, :3] # RGB 0-1 + colors = (colors * 255).astype(np.uint8) + rgb_int = np.left_shift(colors[:,0].astype(np.uint32), 16) | \ + np.left_shift(colors[:,1].astype(np.uint32), 8) | \ + colors[:,2].astype(np.uint32) + + filename = "pointcloud.pcd" + self.write_pcd_with_intensity_rgb(filename, np_points, rgb_int) + self.get_logger().info(f"Saved {filename}") + self.destroy_node() + self.finished = True + + def write_pcd_with_intensity_rgb(self, filename, points, rgb_int): + header = f"""# .PCD v0.7 - Point Cloud Data file format +VERSION 0.7 +FIELDS x y z intensity rgb +SIZE 4 4 4 4 4 +TYPE F F F F U +COUNT 1 1 1 1 1 +WIDTH {points.shape[0]} +HEIGHT 1 +VIEWPOINT 0 0 0 1 0 0 0 +POINTS {points.shape[0]} +DATA binary +""" + self.buffer.write(header.encode('ascii')) + for i in range(points.shape[0]): + # x, y, z, intensity as float32, rgb as uint32 + self.buffer.write(struct.pack('ffffI', points[i,0], points[i,1], points[i,2], points[i,3], rgb_int[i])) + + +class LidarTransformPublisher(Node): + def __init__(self, lidar1_buffer, lidar1_frame, lidar2_buffer, lidar2_frame): + super().__init__('static_transform_lidar_offsets') + + # Static TF broadcaster + self.br = StaticTransformBroadcaster(self) + + self.lidar1_buffer = lidar1_buffer + self.lidar2_buffer = lidar2_buffer + + self.lidar1_frame = lidar1_frame + self.lidar2_frame = lidar2_frame + + def publish(self): + self.pcd_1 = self.pcd_buffer_to_o3d(self.lidar1_buffer) + self.pcd_2 = self.pcd_buffer_to_o3d(self.lidar2_buffer) + + # Compute transform once + self.T = self.compute_transform() + self.get_logger().info(f"Computed initial transform:\n{self.T}") + + # Prepare translation and rotation + T_copy = np.array(self.T, copy=True) + trans = T_copy[:3, 3] + rot_quat = R.from_matrix(T_copy[:3, :3]).as_quat() # [x, y, z, w] + + # Create static TransformStamped + t = TransformStamped() + t.header.stamp.sec = 0 + t.header.stamp.nanosec = 0 + t.header.frame_id = self.lidar1_frame + t.child_frame_id = self.lidar2_frame + t.transform.translation.x = trans[0] + t.transform.translation.y = trans[1] + t.transform.translation.z = trans[2] + t.transform.rotation.x = rot_quat[0] + t.transform.rotation.y = rot_quat[1] + t.transform.rotation.z = rot_quat[2] + t.transform.rotation.w = rot_quat[3] + + # Publish once + self.br.sendTransform(t) + self.get_logger().info("Published static transform.") + + def pcd_buffer_to_o3d(self, buffer: BytesIO): + buffer.seek(0) + header_lines = [] + + # Read header lines until 'DATA' line + while True: + line = buffer.readline().decode('ascii').strip() + if not line: + continue + header_lines.append(line) + if line.startswith("DATA"): + data_line = line + break + + # Ensure binary format + if not data_line.lower().startswith("data binary"): + raise NotImplementedError("Only binary PCD supported for buffer parsing") + + # Parse number of points + num_points = 0 + for line in header_lines: + if line.startswith("POINTS"): + num_points = int(line.split()[1]) + if line.startswith("FIELDS"): + fields = line.split()[1:] # Expect: x y z intensity rgb + + if num_points == 0: + raise ValueError("PCD header does not specify number of points") + + # Define numpy dtype based on expected fields + dtype = np.dtype([ + ('x', 'f4'), + ('y', 'f4'), + ('z', 'f4'), + ('intensity', 'f4'), + ('rgb', 'u4') + ]) + + # Read binary data + data = buffer.read(num_points * dtype.itemsize) + points_array = np.frombuffer(data, dtype=dtype, count=num_points) + + # Convert to Open3D point cloud + pcd = o3d.geometry.PointCloud() + xyz = np.stack([points_array['x'], points_array['y'], points_array['z']], axis=-1) + pcd.points = o3d.utility.Vector3dVector(xyz) + + return pcd + + def compute_transform(self): + # Downsample + voxel_size = 0.05 + pcd_1_ds = self.pcd_1.voxel_down_sample(voxel_size) + pcd_2_ds = self.pcd_2.voxel_down_sample(voxel_size) + + # Estimate normals + pcd_1_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20)) + pcd_2_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20)) + + # ICP registration + threshold = 0.5 + reg_result = o3d.pipelines.registration.registration_icp( + pcd_2_ds, pcd_1_ds, threshold, + np.eye(4), + o3d.pipelines.registration.TransformationEstimationPointToPoint() + ) + return reg_result.transformation + + +def monitor_nodes(nodes, publisher): + """Separate thread that monitors node status and shuts down ROS when done.""" + while rclpy.ok(): + if all(node.finished for node in nodes): + print("All pointclouds captured") + publisher.publish() + return + time.sleep(0.1) # check periodically + +def main(): + rclpy.init() + buffer_velodyne = BytesIO() + buffer_livox = BytesIO() + + executor = MultiThreadedExecutor() + + nodes = [ + PointCloudSaver('velodyne_pcd_saver', '/velodyne_points', buffer_velodyne, 200), + PointCloudSaver('livox_pcd_saver', '/livox/lidar', buffer_livox, 500), + ] + publisher = LidarTransformPublisher(buffer_velodyne, 'velodyne', buffer_livox, 'frame_default') + + monitor_thread = threading.Thread(target=monitor_nodes, args=(nodes,publisher), daemon=True) + monitor_thread.start() + + for node in nodes: + executor.add_node(node) + try: + executor.spin() + finally: + monitor_thread.join() + rclpy.shutdown() + + +if __name__ == "__main__": + main() +