dual lidar calibration

This commit is contained in:
2025-08-16 12:03:09 +02:00
parent c7c6164ddf
commit 1d262e5e19
2 changed files with 130 additions and 7 deletions

View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from tf2_ros import StaticTransformBroadcaster
from geometry_msgs.msg import TransformStamped
import numpy as np
import open3d as o3d
from scipy.spatial.transform import Rotation as R
class StaticTransformPublisher(Node):
def __init__(self):
super().__init__('static_transform_from_pointclouds')
# Static TF broadcaster
self.br = StaticTransformBroadcaster(self)
# Pointcloud files
self.velodyne_file = "/root/velodyne.pcd"
self.livox_file = "/root/livox.pcd"
# Frames
self.livox_frame = "frame_default"
self.velodyne_frame = "velodyne"
# Compute transform once
self.T = self.compute_transform()
self.get_logger().info(f"Computed initial transform:\n{self.T}")
# Prepare translation and rotation
T_copy = np.array(self.T, copy=True)
trans = T_copy[:3, 3]
rot_quat = R.from_matrix(T_copy[:3, :3]).as_quat() # [x, y, z, w]
# Create static TransformStamped
t = TransformStamped()
t.header.stamp.sec = 0
t.header.stamp.nanosec = 0
t.header.frame_id = self.velodyne_frame
t.child_frame_id = self.livox_frame
t.transform.translation.x = trans[0]
t.transform.translation.y = trans[1]
t.transform.translation.z = trans[2]
t.transform.rotation.x = rot_quat[0]
t.transform.rotation.y = rot_quat[1]
t.transform.rotation.z = rot_quat[2]
t.transform.rotation.w = rot_quat[3]
# Publish once
self.br.sendTransform(t)
self.get_logger().info("Published static transform.")
def compute_transform(self):
# Load point clouds
pcd_vel = o3d.io.read_point_cloud(self.velodyne_file)
pcd_liv = o3d.io.read_point_cloud(self.livox_file)
# Downsample
voxel_size = 0.05
pcd_vel_ds = pcd_vel.voxel_down_sample(voxel_size)
pcd_liv_ds = pcd_liv.voxel_down_sample(voxel_size)
# Estimate normals
pcd_vel_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20))
pcd_liv_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20))
# ICP registration
threshold = 0.5
reg_result = o3d.pipelines.registration.registration_icp(
pcd_liv_ds, pcd_vel_ds, threshold,
np.eye(4),
o3d.pipelines.registration.TransformationEstimationPointToPoint()
)
return reg_result.transformation
def main(args=None):
rclpy.init(args=args)
node = StaticTransformPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -1,11 +1,14 @@
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from sensor_msgs.msg import PointCloud2 from sensor_msgs.msg import PointCloud2
import sensor_msgs_py.point_cloud2 as pc2 import sensor_msgs_py.point_cloud2 as pc2
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import struct import struct
from io import BytesIO from io import BytesIO
import threading
import time
class PointCloudSaver(Node): class PointCloudSaver(Node):
def __init__(self, node_name: str, pointcloud_topic: str, buffer, timeout_ms: int): def __init__(self, node_name: str, pointcloud_topic: str, buffer, timeout_ms: int):
@@ -17,6 +20,7 @@ class PointCloudSaver(Node):
10 10
) )
self.buffer = buffer self.buffer = buffer
self.finished = False
self.points = [] self.points = []
self.end_time = self.get_clock().now().nanoseconds + (timeout_ms * 1_000_000) self.end_time = self.get_clock().now().nanoseconds + (timeout_ms * 1_000_000)
self.cmap = plt.get_cmap('jet') self.cmap = plt.get_cmap('jet')
@@ -29,7 +33,8 @@ class PointCloudSaver(Node):
if now > self.end_time: if now > self.end_time:
if not self.points: if not self.points:
self.get_logger().warn("No points received!") self.get_logger().warn("No points received!")
rclpy.shutdown() self.destroy_node()
self.finished = True
return return
np_points = np.array(self.points, dtype=np.float32) np_points = np.array(self.points, dtype=np.float32)
@@ -45,8 +50,9 @@ class PointCloudSaver(Node):
filename = "pointcloud.pcd" filename = "pointcloud.pcd"
self.write_pcd_with_intensity_rgb(filename, np_points, rgb_int) self.write_pcd_with_intensity_rgb(filename, np_points, rgb_int)
self.get_logger().info(f"Saved {filename} with intensity (grayscale) and colored RGB") self.get_logger().info(f"Saved {filename}")
rclpy.shutdown() self.destroy_node()
self.finished = True
def write_pcd_with_intensity_rgb(self, filename, points, rgb_int): def write_pcd_with_intensity_rgb(self, filename, points, rgb_int):
header = f"""# .PCD v0.7 - Point Cloud Data file format header = f"""# .PCD v0.7 - Point Cloud Data file format
@@ -64,13 +70,43 @@ DATA binary
self.buffer.write(header.encode('ascii')) self.buffer.write(header.encode('ascii'))
for i in range(points.shape[0]): for i in range(points.shape[0]):
# x, y, z, intensity as float32, rgb as uint32 # x, y, z, intensity as float32, rgb as uint32
f.write(struct.pack('ffffI', points[i,0], points[i,1], points[i,2], points[i,3], rgb_int[i])) self.buffer.write(struct.pack('ffffI', points[i,0], points[i,1], points[i,2], points[i,3], rgb_int[i]))
def monitor_nodes(nodes):
"""Separate thread that monitors node status and shuts down ROS when done."""
while rclpy.ok():
if all(node.finished for node in nodes):
print("All nodes finished. Shutting down ROS.")
rclpy.shutdown()
break
time.sleep(0.1) # check periodically
def main(): def main():
rclpy.init() rclpy.init()
with open('/root/velodyne.pcd', "w+") as f: file_velodyne = open('/root/velodyne.pcd', "wb+")
node = PointCloudSaver('velodyne_pcd_saver', '/velodyne_points', f, 5000) file_livox = open('/root/livox.pcd', "wb+")
rclpy.spin(node)
executor = MultiThreadedExecutor()
nodes = [
PointCloudSaver('velodyne_pcd_saver', '/velodyne_points', file_velodyne, 5000),
PointCloudSaver('livox_pcd_saver', '/livox/lidar', file_livox, 5000),
]
monitor_thread = threading.Thread(target=monitor_nodes, args=(nodes,), daemon=True)
monitor_thread.start()
for node in nodes:
executor.add_node(node)
try:
executor.spin()
finally:
monitor_thread.join()
print("Executor and monitor thread exited cleanly.")
file_velodyne.close()
file_livox.close()
if __name__ == "__main__": if __name__ == "__main__":
main() main()