88 lines
2.8 KiB
Python
88 lines
2.8 KiB
Python
#!/usr/bin/env python3
|
|
import rclpy
|
|
from rclpy.node import Node
|
|
from tf2_ros import StaticTransformBroadcaster
|
|
from geometry_msgs.msg import TransformStamped
|
|
import numpy as np
|
|
import open3d as o3d
|
|
from scipy.spatial.transform import Rotation as R
|
|
|
|
class StaticTransformPublisher(Node):
|
|
def __init__(self):
|
|
super().__init__('static_transform_from_pointclouds')
|
|
|
|
# Static TF broadcaster
|
|
self.br = StaticTransformBroadcaster(self)
|
|
|
|
# Pointcloud files
|
|
self.velodyne_file = "/root/velodyne.pcd"
|
|
self.livox_file = "/root/livox.pcd"
|
|
|
|
# Frames
|
|
self.livox_frame = "frame_default"
|
|
self.velodyne_frame = "velodyne"
|
|
|
|
# Compute transform once
|
|
self.T = self.compute_transform()
|
|
self.get_logger().info(f"Computed initial transform:\n{self.T}")
|
|
|
|
# Prepare translation and rotation
|
|
T_copy = np.array(self.T, copy=True)
|
|
trans = T_copy[:3, 3]
|
|
rot_quat = R.from_matrix(T_copy[:3, :3]).as_quat() # [x, y, z, w]
|
|
|
|
# Create static TransformStamped
|
|
t = TransformStamped()
|
|
t.header.stamp.sec = 0
|
|
t.header.stamp.nanosec = 0
|
|
t.header.frame_id = self.velodyne_frame
|
|
t.child_frame_id = self.livox_frame
|
|
t.transform.translation.x = trans[0]
|
|
t.transform.translation.y = trans[1]
|
|
t.transform.translation.z = trans[2]
|
|
t.transform.rotation.x = rot_quat[0]
|
|
t.transform.rotation.y = rot_quat[1]
|
|
t.transform.rotation.z = rot_quat[2]
|
|
t.transform.rotation.w = rot_quat[3]
|
|
|
|
# Publish once
|
|
self.br.sendTransform(t)
|
|
self.get_logger().info("Published static transform.")
|
|
|
|
def compute_transform(self):
|
|
# Load point clouds
|
|
pcd_vel = o3d.io.read_point_cloud(self.velodyne_file)
|
|
pcd_liv = o3d.io.read_point_cloud(self.livox_file)
|
|
|
|
# Downsample
|
|
voxel_size = 0.05
|
|
pcd_vel_ds = pcd_vel.voxel_down_sample(voxel_size)
|
|
pcd_liv_ds = pcd_liv.voxel_down_sample(voxel_size)
|
|
|
|
# Estimate normals
|
|
pcd_vel_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20))
|
|
pcd_liv_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20))
|
|
|
|
# ICP registration
|
|
threshold = 0.5
|
|
reg_result = o3d.pipelines.registration.registration_icp(
|
|
pcd_liv_ds, pcd_vel_ds, threshold,
|
|
np.eye(4),
|
|
o3d.pipelines.registration.TransformationEstimationPointToPoint()
|
|
)
|
|
return reg_result.transformation
|
|
|
|
def main(args=None):
|
|
rclpy.init(args=args)
|
|
node = StaticTransformPublisher()
|
|
try:
|
|
rclpy.spin(node)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|