more robist calibration

This commit is contained in:
2025-08-16 19:45:27 +02:00
parent 46404e2b53
commit 77fc1bb186

View File

@@ -73,20 +73,15 @@ DATA binary
""" """
self.buffer.write(header.encode('ascii')) self.buffer.write(header.encode('ascii'))
for i in range(points.shape[0]): for i in range(points.shape[0]):
# x, y, z, intensity as float32, rgb as uint32
self.buffer.write(struct.pack('ffffI', points[i,0], points[i,1], points[i,2], points[i,3], rgb_int[i])) self.buffer.write(struct.pack('ffffI', points[i,0], points[i,1], points[i,2], points[i,3], rgb_int[i]))
class LidarTransformPublisher(Node): class LidarTransformPublisher(Node):
def __init__(self, lidar1_buffer, lidar1_frame, lidar2_buffer, lidar2_frame): def __init__(self, lidar1_buffer, lidar1_frame, lidar2_buffer, lidar2_frame):
super().__init__('static_transform_lidar_offsets') super().__init__('static_transform_lidar_offsets')
# Static TF broadcaster
self.br = StaticTransformBroadcaster(self) self.br = StaticTransformBroadcaster(self)
self.lidar1_buffer = lidar1_buffer self.lidar1_buffer = lidar1_buffer
self.lidar2_buffer = lidar2_buffer self.lidar2_buffer = lidar2_buffer
self.lidar1_frame = lidar1_frame self.lidar1_frame = lidar1_frame
self.lidar2_frame = lidar2_frame self.lidar2_frame = lidar2_frame
@@ -94,16 +89,13 @@ class LidarTransformPublisher(Node):
self.pcd_1 = self.pcd_buffer_to_o3d(self.lidar1_buffer) self.pcd_1 = self.pcd_buffer_to_o3d(self.lidar1_buffer)
self.pcd_2 = self.pcd_buffer_to_o3d(self.lidar2_buffer) self.pcd_2 = self.pcd_buffer_to_o3d(self.lidar2_buffer)
# Compute transform once
self.T = self.compute_transform() self.T = self.compute_transform()
self.get_logger().info(f"Computed initial transform:\n{self.T}") self.get_logger().info(f"Computed initial transform:\n{self.T}")
# Prepare translation and rotation
T_copy = np.array(self.T, copy=True) T_copy = np.array(self.T, copy=True)
trans = T_copy[:3, 3] trans = T_copy[:3, 3]
rot_quat = R.from_matrix(T_copy[:3, :3]).as_quat() # [x, y, z, w] rot_quat = R.from_matrix(T_copy[:3, :3]).as_quat()
# Create static TransformStamped
t = TransformStamped() t = TransformStamped()
t.header.stamp.sec = 0 t.header.stamp.sec = 0
t.header.stamp.nanosec = 0 t.header.stamp.nanosec = 0
@@ -117,15 +109,12 @@ class LidarTransformPublisher(Node):
t.transform.rotation.z = rot_quat[2] t.transform.rotation.z = rot_quat[2]
t.transform.rotation.w = rot_quat[3] t.transform.rotation.w = rot_quat[3]
# Publish once
self.br.sendTransform(t) self.br.sendTransform(t)
self.get_logger().info("Published static transform.") self.get_logger().info("Published static transform.")
def pcd_buffer_to_o3d(self, buffer: BytesIO): def pcd_buffer_to_o3d(self, buffer: BytesIO):
buffer.seek(0) buffer.seek(0)
header_lines = [] header_lines = []
# Read header lines until 'DATA' line
while True: while True:
line = buffer.readline().decode('ascii').strip() line = buffer.readline().decode('ascii').strip()
if not line: if not line:
@@ -135,69 +124,84 @@ class LidarTransformPublisher(Node):
data_line = line data_line = line
break break
# Ensure binary format
if not data_line.lower().startswith("data binary"): if not data_line.lower().startswith("data binary"):
raise NotImplementedError("Only binary PCD supported for buffer parsing") raise NotImplementedError("Only binary PCD supported")
# Parse number of points
num_points = 0 num_points = 0
for line in header_lines: for line in header_lines:
if line.startswith("POINTS"): if line.startswith("POINTS"):
num_points = int(line.split()[1]) num_points = int(line.split()[1])
if line.startswith("FIELDS"):
fields = line.split()[1:] # Expect: x y z intensity rgb
if num_points == 0: if num_points == 0:
raise ValueError("PCD header does not specify number of points") raise ValueError("PCD header missing point count")
# Define numpy dtype based on expected fields
dtype = np.dtype([ dtype = np.dtype([
('x', 'f4'), ('x', 'f4'), ('y', 'f4'), ('z', 'f4'),
('y', 'f4'), ('intensity', 'f4'), ('rgb', 'u4')
('z', 'f4'),
('intensity', 'f4'),
('rgb', 'u4')
]) ])
# Read binary data
data = buffer.read(num_points * dtype.itemsize) data = buffer.read(num_points * dtype.itemsize)
points_array = np.frombuffer(data, dtype=dtype, count=num_points) points_array = np.frombuffer(data, dtype=dtype, count=num_points)
# Convert to Open3D point cloud
pcd = o3d.geometry.PointCloud() pcd = o3d.geometry.PointCloud()
xyz = np.stack([points_array['x'], points_array['y'], points_array['z']], axis=-1) xyz = np.stack([points_array['x'], points_array['y'], points_array['z']], axis=-1)
pcd.points = o3d.utility.Vector3dVector(xyz) pcd.points = o3d.utility.Vector3dVector(xyz)
return pcd return pcd
def compute_transform(self): def compute_transform(self):
# Downsample voxel_size = 0.2 # coarse-to-fine pyramid base
voxel_size = 0.05
pcd_1_ds = self.pcd_1.voxel_down_sample(voxel_size)
pcd_2_ds = self.pcd_2.voxel_down_sample(voxel_size)
# Estimate normals # --- Feature extraction ---
pcd_1_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20)) def preprocess(pcd, voxel):
pcd_2_ds.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=20)) pcd_down = pcd.voxel_down_sample(voxel)
pcd_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=voxel*2, max_nn=30))
fpfh = o3d.pipelines.registration.compute_fpfh_feature(
pcd_down, o3d.geometry.KDTreeSearchParamHybrid(radius=voxel*5, max_nn=100))
return pcd_down, fpfh
# ICP registration src_down, src_fpfh = preprocess(self.pcd_2, voxel_size)
threshold = 0.5 tgt_down, tgt_fpfh = preprocess(self.pcd_1, voxel_size)
reg_result = o3d.pipelines.registration.registration_icp(
pcd_2_ds, pcd_1_ds, threshold, # --- Global alignment with RANSAC ---
np.eye(4), result_ransac = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
o3d.pipelines.registration.TransformationEstimationPointToPoint() src_down, tgt_down, src_fpfh, tgt_fpfh, True,
1.5, # distance threshold
o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
4,
[
o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(1.5)
],
o3d.pipelines.registration.RANSACConvergenceCriteria(4000000, 500)
) )
return reg_result.transformation
trans_init = result_ransac.transformation
# --- Multi-scale ICP refinement ---
voxel_radii = [0.2, 0.1, 0.05]
max_iters = [50, 30, 14]
transformation = trans_init
for radius, iters in zip(voxel_radii, max_iters):
src_down = self.pcd_2.voxel_down_sample(radius)
tgt_down = self.pcd_1.voxel_down_sample(radius)
src_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius*2, max_nn=30))
tgt_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius*2, max_nn=30))
result_icp = o3d.pipelines.registration.registration_icp(
src_down, tgt_down, radius*2, transformation,
o3d.pipelines.registration.TransformationEstimationPointToPlane()
)
transformation = result_icp.transformation
return transformation
def monitor_nodes(nodes, publisher): def monitor_nodes(nodes, publisher):
"""Separate thread that monitors node status and shuts down ROS when done."""
while rclpy.ok(): while rclpy.ok():
if all(node.finished for node in nodes): if all(node.finished for node in nodes):
print("All pointclouds captured") print("All pointclouds captured")
publisher.publish() publisher.publish()
return return
time.sleep(0.1) # check periodically time.sleep(0.1)
def main(): def main():
rclpy.init() rclpy.init()
@@ -205,10 +209,9 @@ def main():
buffer_livox = BytesIO() buffer_livox = BytesIO()
executor = MultiThreadedExecutor() executor = MultiThreadedExecutor()
nodes = [ nodes = [
PointCloudSaver('velodyne_pcd_saver', '/velodyne_points', buffer_velodyne, 200), PointCloudSaver('velodyne_pcd_saver', '/velodyne_points', buffer_velodyne, 350),
PointCloudSaver('livox_pcd_saver', '/livox/lidar', buffer_livox, 500), PointCloudSaver('livox_pcd_saver', '/livox/lidar', buffer_livox, 1000),
] ]
publisher = LidarTransformPublisher(buffer_velodyne, 'velodyne', buffer_livox, 'frame_default') publisher = LidarTransformPublisher(buffer_velodyne, 'velodyne', buffer_livox, 'frame_default')
@@ -226,4 +229,3 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()