Dear Support Team/Community Members,
I’m working on generating geo-referenced PLY point clouds using the ZED SDK (Python) with VIO-GNSS fusion from an SVO file. My goal is a PLY file with global coordinates (ECEF or Lat/Lon/Alt).
I’ve reviewed these community threads:
- Using Zed SDK to generate GeoReferenced Point Cloud
- Advanced navigation Help with Generating Point Cloud Data with Real-World Coordinates Using ZED 2i and GNSS - #7 by TanguyHardelin
Current Progress:
- Successfully generating PLY files with local VIO
WORLD
coordinates frompymesh
. - VIO-GNSS fusion is set up, and GNSS data (from a
GNSS_json
stream in the SVO) is being ingested. - After processing, I’m able to obtain the transformation from the VIO
WORLD
origin to ECEF (T_world_to_ecef
) usingfusion.get_geo_pose(geo_pose_of_world_origin, sl.Transform())
.
Request: Could you please review the Python script given below? I’m specifically looking for guidance on:
- The correctness of my approach for transforming all point cloud vertices.
- Ensuring the method for saving the transformed vertices into a geo-referenced PLY is sound.
- Any suggestions for modifications to implement this crucial step correctly.
Your assistance in reviewing my script and guiding me on this final transformation and saving stage would be highly appreciated.
Thank you,
Rahgav
import sys
import time
import pyzed.sl as sl
import ogl_viewer.viewer as gl
import argparse
import numpy as np
import json
from gnss_replay import GNSSReplay
import datetime
def gnss_str_to_pyzed(data:str, ctstamp:sl.Timestamp) -> sl.GNSSData:
"""
Converts a GNSS string to a PyZED GNSSData object.
Args:
data (str): The GNSS string to be converted.
Returns:
sl.GNSSData: The converted GNSSData object.
"""
data = json.loads(data)
gps_data = sl.GNSSData()
sl_fix_mode = sl.GNSS_MODE.FIX_2D
sl_status = sl.GNSS_STATUS.SINGLE
gps_data.gnss_mode = sl_fix_mode.value
gps_data.gnss_status = sl_status.value
# date_time = datetime.fromisoformat(str(data["ts"])) # Added corresponding to data format
zed_ts = sl.Timestamp()
# zed_ts.set_nanoseconds(int(date_time.timestamp()*1e9))
zed_ts.set_nanoseconds(data["ts"])
# latlngC=sl.LatLng()
# latlngC.set_coordinates(data['coordinates']["latitude"],data['coordinates']["longitude"],data['coordinates']["altitude"],False)
# ecefCord= sl.GeoConverter.latlng2ecef(latlngC)
gps_data.ts = zed_ts
gps_data.set_coordinates(data['coordinates']["latitude"],data['coordinates']["longitude"],data['coordinates']["altitude"],False)
# covariances = json.loads(data["position_covariance"])
covariances = data["position_covariance"]
gps_data.position_covariances = [float(x) for x in covariances]
return gps_data
def main():
# Initialization
init = sl.InitParameters()
init.sdk_verbose=1
init.depth_mode = sl.DEPTH_MODE.NEURAL
init.coordinate_units = sl.UNIT.METER
init.camera_resolution = sl.RESOLUTION.HD2K
init.coordinate_system = sl.COORDINATE_SYSTEM.RIGHT_HANDED_Z_UP_X_FWD # OpenGL's coordinate system is right_handed
init.depth_minimum_distance = 2
init.depth_maximum_distance = 40
init.svo_real_time_mode = False
parse_args(init)
# Open Camera (in offlince mode open svo2 file)
zed = sl.Camera()
status = zed.open(init)
if status != sl.ERROR_CODE.SUCCESS:
print("Camera Open : "+repr(status)+". Exit program.")
exit()
camera_infos = zed.get_camera_information()
pose = sl.Pose()
# Stage 1 - Enable Positional Tracking
positional_tracking_parameters = sl.PositionalTrackingParameters()
positional_tracking_parameters.mode=sl.POSITIONAL_TRACKING_MODE.GEN_1
positional_tracking_parameters.set_floor_as_origin = False
positional_tracking_parameters.enable_area_memory = True
positional_tracking_parameters.enable_pose_smoothing =True
positional_tracking_parameters.enable_imu_fusion = True
positional_tracking_parameters.set_gravity_as_origin = True
returned_state = zed.enable_positional_tracking(positional_tracking_parameters)
if returned_state != sl.ERROR_CODE.SUCCESS:
print(f"Positional Tracking Error: {status}")
zed.close()
exit(1)
# Stage 2 - Create Fusion object and Initialization fusion module:
fusion = sl.Fusion()
init_fusion_param = sl.InitFusionParameters()
init_fusion_param.coordinate_units = sl.UNIT.METER
init_fusion_param.coordinate_system = sl.COORDINATE_SYSTEM.IMAGE
init_fusion_param.output_performance_metrics = True
init_fusion_param.verbose = True
init_fusion_param.timeout_period_number = 66
fusion_init_code = fusion.init(init_fusion_param)
if fusion_init_code != sl.FUSION_ERROR_CODE.SUCCESS:
print("[ZED][ERROR] Failed to initialize fusion :" + repr(fusion_init_code) + ". Exit program")
exit()
# Stage 3 - Enable odometry publishing:
configuration = sl.CommunicationParameters()
configuration.set_for_shared_memory()
zed.start_publishing(configuration)
uuid = sl.CameraIdentifier(zed.get_camera_information().serial_number)
# Stage 4 - Suscribe fusion module
fusion.subscribe(uuid, configuration, sl.Transform(0, 0, 0))
# Stage 5 - Enable positional tracking for Fusion object
positional_tracking_fusion_parameters = sl.PositionalTrackingFusionParameters()
positional_tracking_fusion_parameters.enable_GNSS_fusion = True
gnss_calibration_parameters = sl.GNSSCalibrationParameters()
gnss_calibration_parameters.target_yaw_uncertainty = 1e-2
gnss_calibration_parameters.enable_translation_uncertainty_target = True
gnss_calibration_parameters.target_translation_uncertainty = 15e-2
gnss_calibration_parameters.enable_reinitialization = True
gnss_calibration_parameters.gnss_vio_reinit_threshold = 5
gnss_calibration_parameters.enable_rolling_calibration = True
gnss_calibration_parameters.gnss_antenna_position = np.asarray([0,0,0])
positional_tracking_fusion_parameters.gnss_calibration_parameters = gnss_calibration_parameters
fusion.enable_positionnal_tracking(positional_tracking_fusion_parameters)
# Stage 6 - Initialize parameters for spatial Mapping
if opt.build_mesh:
spatial_mapping_parameters = sl.SpatialMappingParameters(resolution = sl.MAPPING_RESOLUTION.MEDIUM,mapping_range = sl.MAPPING_RANGE.MEDIUM,max_memory_usage = 2048,save_texture = False,use_chunk_only = True,reverse_vertex_order = False,map_type = sl.SPATIAL_MAP_TYPE.MESH)
pymesh = sl.Mesh()
else :
spatial_mapping_parameters = sl.SpatialMappingParameters(resolution = sl.MAPPING_RESOLUTION.MEDIUM,mapping_range = sl.MAPPING_RANGE.MEDIUM,max_memory_usage = 4096,save_texture = False,use_chunk_only = False,reverse_vertex_order = False,map_type = sl.SPATIAL_MAP_TYPE.FUSED_POINT_CLOUD)
spatial_mapping_parameters.stability_counter = 4
spatial_mapping_parameters.resolution_meter = 0.05
pymesh = sl.FusedPointCloud()
# Stage 7 - Enable spatail mapping module
status = zed.enable_spatial_mapping(spatial_mapping_parameters)
if status != sl.ERROR_CODE.SUCCESS:
print(f"Spatial Mapping Error: {status}")
zed.close()
exit(1)
# For grabing the each frame create parameters
runtime_parameters = sl.RuntimeParameters(measure3D_reference_frame = sl.REFERENCE_FRAME.CAMERA, enable_depth = True)
mapping_activated = True
max_frames= zed.get_svo_number_of_frames()
frame_count=0
last_call = time.time()
last_timestamp = sl.Timestamp()
# Stage 8: Itrating through each frame of svo2
while frame_count < max_frames:
data_map={}
# Grab an image, a RuntimeParameters object must be given to grab()
err = zed.grab(runtime_parameters)
if err == sl.ERROR_CODE.SUCCESS:
current_ts = zed.get_timestamp(sl.TIME_REFERENCE.IMAGE)
tracking_state = zed.get_position(pose)
# Stage 9 -- Extract gnss data from svo2 file between the interval
zed.retrieve_svo_data("GNSS_json",data_map,last_timestamp,current_ts)
if data_map:
data_map_list = list(data_map.items())
data_map = [(k,v) for k,v in data_map.items()]
is_data_ingested = False
for data_item in data_map_list:
data_string = data_item[1].get_content_as_string()
input_gnss = gnss_str_to_pyzed(data_string, current_ts)
# Ingest gnss data into fusion module
ingest_error = fusion.ingest_gnss_data(input_gnss)
if ingest_error != sl.FUSION_ERROR_CODE.SUCCESS and ingest_error != sl.FUSION_ERROR_CODE.NO_NEW_DATA_AVAILABLE:
print("Ingest error occurred when ingesting GNSSData: ", ingest_error)
else:
print(input_gnss.get_coordinates(False))
current_geopose=sl.GeoPose()
# convert pose to cooresponding geo pose
current_geopose_status=fusion.camera_to_geo(pose,current_geopose)
if current_geopose_status==sl.GNSS_FUSION_STATUS.OK:
is_data_ingested=True
break
# Stage 10 - Get spatial data based onn chunck
if time.time() - last_call > 0.5:
if zed.get_spatial_map_request_status_async() != sl.ERROR_CODE.SUCCESS:
zed.request_spatial_map_async()
last_call = time.time()
if zed.get_spatial_map_request_status_async() == sl.ERROR_CODE.SUCCESS:
zed.retrieve_spatial_map_async(pymesh)
# Stage 11 - Start fusion process
fusion_process_status = fusion.process()
if fusion_process_status != sl.FUSION_ERROR_CODE.SUCCESS and fusion_process_status != sl.FUSION_ERROR_CODE.NO_NEW_DATA_AVAILABLE:
# print(f"Fusion process error: {fusion_process_status}") # Can be verbose
pass
camera_pose = sl.Pose()
if fusion.process() == sl.FUSION_ERROR_CODE.SUCCESS:
fused_tracking_state = fusion.get_position(camera_pose, sl.REFERENCE_FRAME.WORLD)
if fused_tracking_state == sl.POSITIONAL_TRACKING_STATE.OK:
rotation = camera_pose.get_rotation_vector()
py_translation=sl.translation()
translation = camera_pose.get_translation(py_translation)
text_rotation = str((round(rotation[0], 2), round(rotation[1], 2), round(rotation[2], 2)))
text_translation = str((round(translation.get()[0], 2), round(translation.get()[1], 2), round(translation.get()[2], 2)))
pose_data = camera_pose.pose_data(sl.Transform())
print("get position translation = ",text_translation,", rotation_message = ",text_rotation)
elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
break
# Fusion process start here
# zed_pose = sl.Pose()
# fusion_pose = sl.Pose()
# zed_position_status = zed.get_position(zed_pose, sl.REFERENCE_FRAME.WORLD)
# if is_data_ingested:
# fusion_position_status = fusion.get_position(fusion_pose, sl.REFERENCE_FRAME.WORLD)
# if fusion.process() == sl.FUSION_ERROR_C0ODE.SUCCESS:
# current_state = fusion.get_position(fusion_pose, sl.REFERENCE_FRAME.WORLD)
# if zed.get_spatial_map_request_status_async() == sl.ERROR_CODE.SUCCESS:
# zed.retrieve_spatial_map_async(pymesh)
# else:
# print("Data map is blank for this image")
# print("Processing frame no "+ str(frame_count))
frame_count += 1
last_timestamp=current_ts
# print("Extracting and saving point cloud...")
# status = zed.extract_whole_spatial_map(pymesh)
# Write resulatant pymesh into ply file
if status == sl.ERROR_CODE.SUCCESS:
filepath = "/home/ceinfo/Documents/spatial_output/sample.ply"
status = pymesh.save(filepath, typeMesh=sl.MESH_FILE_FORMAT.PLY)
if status:
print(f"Point cloud saved successfully: {filepath}")
else:
print("Failed to save point cloud")
zed.disable_spatial_mapping()
zed.disable_positional_tracking()
zed.close()
pymesh.clear()
zed.close()
def parse_args(init):
if len(opt.input_svo_file)>0 and opt.input_svo_file.endswith(".svo2"):
init.set_from_svo_file(opt.input_svo_file)
print("[Sample] Using SVO File input: {0}".format(opt.input_svo_file))
elif len(opt.ip_address)>0 :
ip_str = opt.ip_address
if ip_str.replace(':','').replace('.','').isdigit() and len(ip_str.split('.'))==4 and len(ip_str.split(':'))==2:
init.set_from_stream(ip_str.split(':')[0],int(ip_str.split(':')[1]))
print("[Sample] Using Stream input, IP : ",ip_str)
elif ip_str.replace(':','').replace('.','').isdigit() and len(ip_str.split('.'))==4:
init.set_from_stream(ip_str)
print("[Sample] Using Stream input, IP : ",ip_str)
else :
print("Unvalid IP format. Using live stream")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--input_svo_file', type=str, help='Path to an .svo file, if you want to replay it',default = '')
parser.add_argument('--ip_address', type=str, help='IP Adress, in format a.b.c.d:port or a.b.c.d, if you have a streaming setup', default = '')
parser.add_argument('--resolution', type=str, help='Resolution, can be either HD2K, HD1200, HD1080, HD720, SVGA or VGA', default = '')
parser.add_argument('--build_mesh', help = 'Either the script should plot a mesh or point clouds of surroundings', action='store_true')
opt = parser.parse_args()
if len(opt.input_svo_file)>0 and len(opt.ip_address)>0:
print("Specify only input_svo_file or ip_address, or none to use wired camera, not both. Exit program")
exit()
main()