import rclpy from rclpy.node import Node from sensor_msgs.msg import PointCloud2 import sensor_msgs_py.point_cloud2 as pc2 import numpy as np import matplotlib.pyplot as plt import struct from io import BytesIO class PointCloudSaver(Node): def __init__(self, node_name: str, pointcloud_topic: str, buffer, timeout_ms: int): super().__init__(node_name) self.subscription = self.create_subscription( PointCloud2, pointcloud_topic, self.callback, 10 ) self.buffer = buffer self.points = [] self.end_time = self.get_clock().now().nanoseconds + (timeout_ms * 1_000_000) self.cmap = plt.get_cmap('jet') def callback(self, msg): now = self.get_clock().now().nanoseconds for p in pc2.read_points(msg, field_names=("x", "y", "z", "intensity"), skip_nans=True): self.points.append([p[0], p[1], p[2], p[3]]) if now > self.end_time: if not self.points: self.get_logger().warn("No points received!") rclpy.shutdown() return np_points = np.array(self.points, dtype=np.float32) intensities = np_points[:, 3] norm_int = (intensities - intensities.min()) / (intensities.ptp() + 1e-8) # Map normalized intensity to RGB colormap colors = self.cmap(norm_int)[:, :3] # RGB 0-1 colors = (colors * 255).astype(np.uint8) rgb_int = np.left_shift(colors[:,0].astype(np.uint32), 16) | \ np.left_shift(colors[:,1].astype(np.uint32), 8) | \ colors[:,2].astype(np.uint32) filename = "pointcloud.pcd" self.write_pcd_with_intensity_rgb(filename, np_points, rgb_int) self.get_logger().info(f"Saved {filename} with intensity (grayscale) and colored RGB") rclpy.shutdown() def write_pcd_with_intensity_rgb(self, filename, points, rgb_int): header = f"""# .PCD v0.7 - Point Cloud Data file format VERSION 0.7 FIELDS x y z intensity rgb SIZE 4 4 4 4 4 TYPE F F F F U COUNT 1 1 1 1 1 WIDTH {points.shape[0]} HEIGHT 1 VIEWPOINT 0 0 0 1 0 0 0 POINTS {points.shape[0]} DATA binary """ self.buffer.write(header.encode('ascii')) for i in range(points.shape[0]): # x, y, z, intensity as float32, rgb as uint32 f.write(struct.pack('ffffI', points[i,0], points[i,1], points[i,2], points[i,3], rgb_int[i])) def main(): rclpy.init() with open('/root/velodyne.pcd', "w+") as f: node = PointCloudSaver('velodyne_pcd_saver', '/velodyne_points', f, 5000) rclpy.spin(node) if __name__ == "__main__": main()