该代码用于分析和处理点云数据,通过对点云数据进行裁剪、平面拟合和生成包围框来提取特定区域的特征并发布结果。主要使用了RANSAC算法来识别并拟合平面,从而提取平面的法向量,接着根据该平面计算出该区域的最小矩形包围框(Bounding Box),并得到包围框的中心点以及相对于Y轴的倾角。最终,程序将处理后的点云和包围框通过ROS节点发布出来,用于后续的处理或可视化。
1.导入必要的库
import rospy
import numpy as np
from sensor_msgs.msg import PointCloud2
from sensor_msgs import point_cloud2
from std_msgs.msg import Header
from sklearn.linear_model import RANSACRegressor
from visualization_msgs.msg import Marker
from geometry_msgs.msg import Point
import math
2. 设置XYZ范围限制和RANSAC误差阈值
X_MIN, X_MAX = 0.5, 3
Y_MIN, Y_MAX = 0, 2
Z_MIN, Z_MAX = -0.5, 0.2
DISTANCE_THRESHOLD = 0.05 # RANSAC 拟合的误差阈值
3. 裁剪点云,限制XYZ坐标范围
def filter_xyz_range(points):"""裁剪点云,将点限制在指定 XYZ 范围内"""return points[(points[:, 0] >= X_MIN) & (points[:, 0] <= X_MAX) &(points[:, 1] >= Y_MIN) & (points[:, 1] <= Y_MAX) &(points[:, 2] >= Z_MIN) & (points[:, 2] <= Z_MAX)]
def fit_plane_ransac(points):"""使用 RANSAC 算法拟合平面,返回平面的法向量和拟合的平面点"""if len(points) < 3:return None, NoneX = points[:, :2] # 使用 x, y 特征y = points[:, 2] # Z 坐标作为目标值ransac = RANSACRegressor(residual_threshold=DISTANCE_THRESHOLD)ransac.fit(X, y)inlier_mask = ransac.inlier_mask_plane_points = points[inlier_mask] # 选取平面内的点normal_vector = np.array([ransac.estimator_.coef_[0], ransac.estimator_.coef_[1], -1]) # 计算法向量return normal_vector, plane_points
5. 算包围框的8个顶点坐标及中心点坐标
def compute_bounding_box(points):"""根据点云计算最小矩形包围框的8个顶点坐标"""min_vals = np.min(points, axis=0)max_vals = np.max(points, axis=0)# 计算矩形框8个顶点坐标p1 = [min_vals[0], min_vals[1], min_vals[2]]p2 = [min_vals[0], min_vals[1], max_vals[2]]p3 = [min_vals[0], max_vals[1], min_vals[2]]p4 = [min_vals[0], max_vals[1], max_vals[2]]p5 = [max_vals[0], min_vals[1], min_vals[2]]p6 = [max_vals[0], min_vals[1], max_vals[2]]p7 = [max_vals[0], max_vals[1], min_vals[2]]p8 = [max_vals[0], max_vals[1], max_vals[2]]return np.array([p1, p2, p3, p4, p5, p6, p7, p8])def compute_bounding_box_center(bbox_points):"""计算包围框的中心坐标"""center = np.mean(bbox_points, axis=0)return center
6. 计算平面法向量与Y轴的倾斜角
def compute_pitch_angle(normal_vector):"""计算法向量和Y轴的倾斜角(以度为单位)"""y_axis = np.array([0, 1, 0])cos_angle = np.dot(normal_vector, y_axis) / (np.linalg.norm(normal_vector) * np.linalg.norm(y_axis))cos_angle = np.clip(cos_angle, -1.0, 1.0)angle = math.acos(cos_angle) # 计算夹角(弧度)return math.degrees(angle) # 转换为角度
7. 创建包围框的Marker消息(可视化)
def create_bounding_box_marker(bbox_points, frame_id):"""创建一个长方体包围框的 Marker 消息,用于可视化"""marker = Marker()marker.header.frame_id = frame_idmarker.header.stamp = rospy.Time.now()marker.type = Marker.LINE_LIST # 使用线框模型marker.action = Marker.ADDmarker.scale.x = 0.01 # 线的粗细marker.color.r = 1.0marker.color.g = 0.0marker.color.b = 0.0marker.color.a = 1.0# 定义包围框的边界线edges = [[0, 1], [1, 3], [3, 2], [2, 0], # 底面四条边[4, 5], [5, 7], [7, 6], [6, 4], # 顶面四条边[0, 4], [1, 5], [2, 6], [3, 7] # 连接底面和顶面的四条边]# 构建包围框的线条for edge in edges:p1 = Point(*bbox_points[edge[0]])p2 = Point(*bbox_points[edge[1]])marker.points.append(p1)marker.points.append(p2)return marker
8.发布点云话题:
def publish_point_cloud(points, frame_id, topic):"""发布点云消息"""if len(points) == 0:rospy.logwarn("点云为空,未发布消息。")returnheader = Header()header.stamp = rospy.Time.now()header.frame_id = frame_idcloud_msg = point_cloud2.create_cloud_xyz32(header, points)rospy.Publisher(topic, PointCloud2, queue_size=1).publish(cloud_msg)
9.回调函数及主函数
def point_cloud_callback(msg):points = np.array([p[:3] for p in point_cloud2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True)])points_in_range = filter_xyz_range(points)if points_in_range.size == 0:rospy.logwarn("裁剪后没有点云数据。")returnnormal1, plane_points1 = fit_plane_ransac(points_in_range)normal2, plane_points2 = fit_plane_ransac(points_in_range)if normal1 is None or normal2 is None:rospy.logwarn("平面拟合失败,无法生成包围框。")returnbounding_box_points = compute_bounding_box(points_in_range)bbox_center = compute_bounding_box_center(bounding_box_points)angle = compute_pitch_angle(normal1)# 输出三维矩形框的中心坐标rospy.loginfo(f"Bounding Box Center: {bbox_center}")rospy.loginfo(f"Angle with Y axis: {angle:.2f} degrees")bbox_marker = create_bounding_box_marker(bounding_box_points, msg.header.frame_id)publish_point_cloud(points_in_range, msg.header.frame_id, "/filtered_points")bbox_pub.publish(bbox_marker)def main():rospy.init_node("bounding_box_fitter")# 初始化发布器global bbox_pubbbox_pub = rospy.Publisher("/bounding_box", Marker, queue_size=1)# 订阅点云话题rospy.Subscriber("/merged_point_cloud", PointCloud2, point_cloud_callback)rospy.spin()
10.完整的代码如下:
import rospy
import numpy as np
from sensor_msgs.msg import PointCloud2
from sensor_msgs import point_cloud2
from std_msgs.msg import Header
from sklearn.linear_model import RANSACRegressor
from visualization_msgs.msg import Marker
from geometry_msgs.msg import Point
import math# 设置 XYZ 范围限制
X_MIN, X_MAX = 0.5, 3
Y_MIN, Y_MAX = 0, 2
Z_MIN, Z_MAX = -0.5, 0.2
DISTANCE_THRESHOLD = 0.05 # RANSAC 拟合的误差阈值def filter_xyz_range(points):"""裁剪点云,将点限制在指定 XYZ 范围内"""return points[(points[:, 0] >= X_MIN) & (points[:, 0] <= X_MAX) &(points[:, 1] >= Y_MIN) & (points[:, 1] <= Y_MAX) &(points[:, 2] >= Z_MIN) & (points[:, 2] <= Z_MAX)]def fit_plane_ransac(points):"""使用 RANSAC 算法拟合平面,返回平面的法向量和拟合的平面点"""if len(points) < 3:return None, NoneX = points[:, :2] # 使用 x, y 特征y = points[:, 2] # Z 坐标作为目标值ransac = RANSACRegressor(residual_threshold=DISTANCE_THRESHOLD)ransac.fit(X, y)inlier_mask = ransac.inlier_mask_plane_points = points[inlier_mask] # 选取平面内的点normal_vector = np.array([ransac.estimator_.coef_[0], ransac.estimator_.coef_[1], -1]) # 计算法向量return normal_vector, plane_pointsdef compute_bounding_box(points):"""根据点云计算最小矩形包围框的8个顶点坐标"""min_vals = np.min(points, axis=0)max_vals = np.max(points, axis=0)# 计算矩形框8个顶点坐标p1 = [min_vals[0], min_vals[1], min_vals[2]]p2 = [min_vals[0], min_vals[1], max_vals[2]]p3 = [min_vals[0], max_vals[1], min_vals[2]]p4 = [min_vals[0], max_vals[1], max_vals[2]]p5 = [max_vals[0], min_vals[1], min_vals[2]]p6 = [max_vals[0], min_vals[1], max_vals[2]]p7 = [max_vals[0], max_vals[1], min_vals[2]]p8 = [max_vals[0], max_vals[1], max_vals[2]]return np.array([p1, p2, p3, p4, p5, p6, p7, p8])def compute_bounding_box_center(bbox_points):"""计算包围框的中心坐标"""center = np.mean(bbox_points, axis=0)return centerdef compute_pitch_angle(normal_vector):"""计算法向量和Y轴的倾斜角(以度为单位)"""y_axis = np.array([0, 1, 0])cos_angle = np.dot(normal_vector, y_axis) / (np.linalg.norm(normal_vector) * np.linalg.norm(y_axis))cos_angle = np.clip(cos_angle, -1.0, 1.0)angle = math.acos(cos_angle) # 计算夹角(弧度)return math.degrees(angle) # 转换为角度def create_bounding_box_marker(bbox_points, frame_id):"""创建一个长方体包围框的 Marker 消息,用于可视化"""marker = Marker()marker.header.frame_id = frame_idmarker.header.stamp = rospy.Time.now()marker.type = Marker.LINE_LIST # 使用线框模型marker.action = Marker.ADDmarker.scale.x = 0.01 # 线的粗细marker.color.r = 1.0marker.color.g = 0.0marker.color.b = 0.0marker.color.a = 1.0# 定义包围框的边界线edges = [[0, 1], [1, 3], [3, 2], [2, 0], # 底面四条边[4, 5], [5, 7], [7, 6], [6, 4], # 顶面四条边[0, 4], [1, 5], [2, 6], [3, 7] # 连接底面和顶面的四条边]for edge in edges:p1 = Point(*bbox_points[edge[0]])p2 = Point(*bbox_points[edge[1]])marker.points.append(p1)marker.points.append(p2)return markerdef publish_point_cloud(points, frame_id, topic):"""发布裁剪后的点云消息"""if len(points) == 0:rospy.logwarn("点云为空,未发布消息。")returnheader = Header()header.stamp = rospy.Time.now()header.frame_id = frame_idcloud_msg = point_cloud2.create_cloud_xyz32(header, points)rospy.Publisher(topic, PointCloud2, queue_size=1).publish(cloud_msg)def point_cloud_callback(msg):"""接收并处理点云数据,进行裁剪、平面拟合、包围框计算并发布"""points = np.array([p[:3] for p in point_cloud2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True)])points_in_range = filter_xyz_range(points)if points_in_range.size == 0:rospy.logwarn("裁剪后没有点云数据。")returnnormal1, plane_points1 = fit_plane_ransac(points_in_range)normal2, plane_points2 = fit_plane_ransac(points_in_range)if normal1 is None or normal2 is None:rospy.logwarn("平面拟合失败,无法生成包围框。")returnbounding_box_points = compute_bounding_box(points_in_range)bbox_center = compute_bounding_box_center(bounding_box_points)angle = compute_pitch_angle(normal1)# 输出三维矩形框的中心坐标rospy.loginfo(f"Bounding Box Center: {bbox_center}")rospy.loginfo(f"Angle with Y axis: {angle:.2f} degrees")bbox_marker = create_bounding_box_marker(bounding_box_points, msg.header.frame_id)publish_point_cloud(points_in_range, msg.header.frame_id, "/filtered_points")bbox_pub.publish(bbox_marker)def main():"""初始化 ROS 节点、发布器和订阅器"""rospy.init_node("bounding_box_fitter")# 初始化发布器global bbox_pubbbox_pub = rospy.Publisher("/bounding_box", Marker, queue_size=1)# 订阅点云话题rospy.Subscriber("/merged_point_cloud", PointCloud2, point_cloud_callback)rospy.spin()if __name__ == "__main__": main()
11.运行程序以及自己的bag包,别忘了更改相应的话题 ,结果如下: