Files
sherpa_ros2_ws/scripts/publish_lidar_offset.py
2025-08-16 12:03:09 +02:00

88 lines
2.8 KiB
Python

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from tf2_ros import StaticTransformBroadcaster
from geometry_msgs.msg import TransformStamped
import numpy as np
import open3d as o3d
from scipy.spatial.transform import Rotation as R
class StaticTransformPublisher(Node):
def __init__(self):
super().__init__('static_transform_from_pointclouds')
# Static TF broadcaster
self.br = StaticTransformBroadcaster(self)
# Pointcloud files
self.velodyne_file = "/root/velodyne.pcd"
self.livox_file = "/root/livox.pcd"
# Frames
self.livox_frame = "frame_default"
self.velodyne_frame = "velodyne"
# Compute transform once
self.T = self.compute_transform()
self.get_logger().info(f"Computed initial transform:\n{self.T}")
# Prepare translation and rotation
T_copy = np.array(self.T, copy=True)
trans = T_copy[:3, 3]
rot_quat = R.from_matrix(T_copy[:3, :3]).as_quat() # [x, y, z, w]
# Create static TransformStamped
t = TransformStamped()
t.header.stamp.sec = 0
t.header.stamp.nanosec = 0
t.header.frame_id = self.velodyne_frame
t.child_frame_id = self.livox_frame
t.transform.translation.x = trans[0]
t.transform.translation.y = trans[1]
t.transform.translation.z = trans[2]
t.transform.rotation.x = rot_quat[0]
t.transform.rotation.y = rot_quat[1]
t.transform.rotation.z = rot_quat[2]
t.transform.rotation.w = rot_quat[3]
# Publish once
self.br.sendTransform(t)
self.get_logger().info("Published static transform.")
def compute_transform(self):
# Load point clouds
pcd_vel = o3d.io.read_point_cloud(self.velodyne_file)
pcd_liv = o3d.io.read_point_cloud(self.livox_file)
# Downsample
voxel_size = 0.05
pcd_vel_ds = pcd_vel.voxel_down_sample(voxel_size)
pcd_liv_ds = pcd_liv.voxel_down_sample(voxel_size)
# Estimate normals
pcd_vel_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20))
pcd_liv_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20))
# ICP registration
threshold = 0.5
reg_result = o3d.pipelines.registration.registration_icp(
pcd_liv_ds, pcd_vel_ds, threshold,
np.eye(4),
o3d.pipelines.registration.TransformationEstimationPointToPoint()
)
return reg_result.transformation
def main(args=None):
rclpy.init(args=args)
node = StaticTransformPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()