dual lidar calibration script

This commit is contained in:
2025-08-16 12:41:29 +02:00
parent 1d262e5e19
commit 995d902334

229
scripts/calibrate_lidar.py Normal file
View File

@@ -0,0 +1,229 @@
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import TransformStamped
import sensor_msgs_py.point_cloud2 as pc2
from tf2_ros import StaticTransformBroadcaster
import numpy as np
import matplotlib.pyplot as plt
import struct
from io import BytesIO
import threading
import time
import open3d as o3d
from scipy.spatial.transform import Rotation as R
class PointCloudSaver(Node):
def __init__(self, node_name: str, pointcloud_topic: str, buffer, timeout_ms: int):
super().__init__(node_name)
self.subscription = self.create_subscription(
PointCloud2,
pointcloud_topic,
self.callback,
10
)
self.buffer = buffer
self.finished = False
self.points = []
self.end_time = self.get_clock().now().nanoseconds + (timeout_ms * 1_000_000)
self.cmap = plt.get_cmap('jet')
def callback(self, msg):
now = self.get_clock().now().nanoseconds
for p in pc2.read_points(msg, field_names=("x", "y", "z", "intensity"), skip_nans=True):
self.points.append([p[0], p[1], p[2], p[3]])
if now > self.end_time:
if not self.points:
self.get_logger().warn("No points received!")
self.destroy_node()
self.finished = True
return
np_points = np.array(self.points, dtype=np.float32)
intensities = np_points[:, 3]
norm_int = (intensities - intensities.min()) / (intensities.ptp() + 1e-8)
# Map normalized intensity to RGB colormap
colors = self.cmap(norm_int)[:, :3] # RGB 0-1
colors = (colors * 255).astype(np.uint8)
rgb_int = np.left_shift(colors[:,0].astype(np.uint32), 16) | \
np.left_shift(colors[:,1].astype(np.uint32), 8) | \
colors[:,2].astype(np.uint32)
filename = "pointcloud.pcd"
self.write_pcd_with_intensity_rgb(filename, np_points, rgb_int)
self.get_logger().info(f"Saved {filename}")
self.destroy_node()
self.finished = True
def write_pcd_with_intensity_rgb(self, filename, points, rgb_int):
header = f"""# .PCD v0.7 - Point Cloud Data file format
VERSION 0.7
FIELDS x y z intensity rgb
SIZE 4 4 4 4 4
TYPE F F F F U
COUNT 1 1 1 1 1
WIDTH {points.shape[0]}
HEIGHT 1
VIEWPOINT 0 0 0 1 0 0 0
POINTS {points.shape[0]}
DATA binary
"""
self.buffer.write(header.encode('ascii'))
for i in range(points.shape[0]):
# x, y, z, intensity as float32, rgb as uint32
self.buffer.write(struct.pack('ffffI', points[i,0], points[i,1], points[i,2], points[i,3], rgb_int[i]))
class LidarTransformPublisher(Node):
def __init__(self, lidar1_buffer, lidar1_frame, lidar2_buffer, lidar2_frame):
super().__init__('static_transform_lidar_offsets')
# Static TF broadcaster
self.br = StaticTransformBroadcaster(self)
self.lidar1_buffer = lidar1_buffer
self.lidar2_buffer = lidar2_buffer
self.lidar1_frame = lidar1_frame
self.lidar2_frame = lidar2_frame
def publish(self):
self.pcd_1 = self.pcd_buffer_to_o3d(self.lidar1_buffer)
self.pcd_2 = self.pcd_buffer_to_o3d(self.lidar2_buffer)
# Compute transform once
self.T = self.compute_transform()
self.get_logger().info(f"Computed initial transform:\n{self.T}")
# Prepare translation and rotation
T_copy = np.array(self.T, copy=True)
trans = T_copy[:3, 3]
rot_quat = R.from_matrix(T_copy[:3, :3]).as_quat() # [x, y, z, w]
# Create static TransformStamped
t = TransformStamped()
t.header.stamp.sec = 0
t.header.stamp.nanosec = 0
t.header.frame_id = self.lidar1_frame
t.child_frame_id = self.lidar2_frame
t.transform.translation.x = trans[0]
t.transform.translation.y = trans[1]
t.transform.translation.z = trans[2]
t.transform.rotation.x = rot_quat[0]
t.transform.rotation.y = rot_quat[1]
t.transform.rotation.z = rot_quat[2]
t.transform.rotation.w = rot_quat[3]
# Publish once
self.br.sendTransform(t)
self.get_logger().info("Published static transform.")
def pcd_buffer_to_o3d(self, buffer: BytesIO):
buffer.seek(0)
header_lines = []
# Read header lines until 'DATA' line
while True:
line = buffer.readline().decode('ascii').strip()
if not line:
continue
header_lines.append(line)
if line.startswith("DATA"):
data_line = line
break
# Ensure binary format
if not data_line.lower().startswith("data binary"):
raise NotImplementedError("Only binary PCD supported for buffer parsing")
# Parse number of points
num_points = 0
for line in header_lines:
if line.startswith("POINTS"):
num_points = int(line.split()[1])
if line.startswith("FIELDS"):
fields = line.split()[1:] # Expect: x y z intensity rgb
if num_points == 0:
raise ValueError("PCD header does not specify number of points")
# Define numpy dtype based on expected fields
dtype = np.dtype([
('x', 'f4'),
('y', 'f4'),
('z', 'f4'),
('intensity', 'f4'),
('rgb', 'u4')
])
# Read binary data
data = buffer.read(num_points * dtype.itemsize)
points_array = np.frombuffer(data, dtype=dtype, count=num_points)
# Convert to Open3D point cloud
pcd = o3d.geometry.PointCloud()
xyz = np.stack([points_array['x'], points_array['y'], points_array['z']], axis=-1)
pcd.points = o3d.utility.Vector3dVector(xyz)
return pcd
def compute_transform(self):
# Downsample
voxel_size = 0.05
pcd_1_ds = self.pcd_1.voxel_down_sample(voxel_size)
pcd_2_ds = self.pcd_2.voxel_down_sample(voxel_size)
# Estimate normals
pcd_1_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20))
pcd_2_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20))
# ICP registration
threshold = 0.5
reg_result = o3d.pipelines.registration.registration_icp(
pcd_2_ds, pcd_1_ds, threshold,
np.eye(4),
o3d.pipelines.registration.TransformationEstimationPointToPoint()
)
return reg_result.transformation
def monitor_nodes(nodes, publisher):
"""Separate thread that monitors node status and shuts down ROS when done."""
while rclpy.ok():
if all(node.finished for node in nodes):
print("All pointclouds captured")
publisher.publish()
return
time.sleep(0.1) # check periodically
def main():
rclpy.init()
buffer_velodyne = BytesIO()
buffer_livox = BytesIO()
executor = MultiThreadedExecutor()
nodes = [
PointCloudSaver('velodyne_pcd_saver', '/velodyne_points', buffer_velodyne, 200),
PointCloudSaver('livox_pcd_saver', '/livox/lidar', buffer_livox, 500),
]
publisher = LidarTransformPublisher(buffer_velodyne, 'velodyne', buffer_livox, 'frame_default')
monitor_thread = threading.Thread(target=monitor_nodes, args=(nodes,publisher), daemon=True)
monitor_thread.start()
for node in nodes:
executor.add_node(node)
try:
executor.spin()
finally:
monitor_thread.join()
rclpy.shutdown()
if __name__ == "__main__":
main()