Geo-referencing Point Cloud Vertices (ZED SDK Python, VIO-GNSS)

Dear Support Team/Community Members,

I’m working on generating geo-referenced PLY point clouds using the ZED SDK (Python) with VIO-GNSS fusion from an SVO file. My goal is a PLY file with global coordinates (ECEF or Lat/Lon/Alt).

I’ve reviewed these community threads:

Current Progress:

  • Successfully generating PLY files with local VIO WORLD coordinates from pymesh.
  • VIO-GNSS fusion is set up, and GNSS data (from a GNSS_json stream in the SVO) is being ingested.
  • After processing, I’m able to obtain the transformation from the VIO WORLD origin to ECEF (T_world_to_ecef) using fusion.get_geo_pose(geo_pose_of_world_origin, sl.Transform()).

Request: Could you please review the Python script given below? I’m specifically looking for guidance on:

  • The correctness of my approach for transforming all point cloud vertices.
  • Ensuring the method for saving the transformed vertices into a geo-referenced PLY is sound.
  • Any suggestions for modifications to implement this crucial step correctly.

Your assistance in reviewing my script and guiding me on this final transformation and saving stage would be highly appreciated.

Thank you,
Rahgav

import sys
import time
import pyzed.sl as sl
import ogl_viewer.viewer as gl
import argparse
import numpy as np
import json
from gnss_replay import GNSSReplay
import datetime

def gnss_str_to_pyzed(data:str, ctstamp:sl.Timestamp) -> sl.GNSSData:
    """
    Converts a GNSS string to a PyZED GNSSData object.

    Args:
        data (str): The GNSS string to be converted.

    Returns:
        sl.GNSSData: The converted GNSSData object.
    """    
    data = json.loads(data)
    gps_data = sl.GNSSData()
    sl_fix_mode = sl.GNSS_MODE.FIX_2D
    sl_status = sl.GNSS_STATUS.SINGLE
    gps_data.gnss_mode = sl_fix_mode.value
    gps_data.gnss_status = sl_status.value

    # date_time = datetime.fromisoformat(str(data["ts"])) # Added corresponding to data format
    zed_ts = sl.Timestamp()
    # zed_ts.set_nanoseconds(int(date_time.timestamp()*1e9))
    zed_ts.set_nanoseconds(data["ts"])
    # latlngC=sl.LatLng()
    # latlngC.set_coordinates(data['coordinates']["latitude"],data['coordinates']["longitude"],data['coordinates']["altitude"],False)
    # ecefCord= sl.GeoConverter.latlng2ecef(latlngC)
    gps_data.ts = zed_ts
    gps_data.set_coordinates(data['coordinates']["latitude"],data['coordinates']["longitude"],data['coordinates']["altitude"],False)
    # covariances = json.loads(data["position_covariance"])
    covariances = data["position_covariance"]    
    gps_data.position_covariances = [float(x) for x in covariances]
    return gps_data

def main():
    # Initialization 
    init = sl.InitParameters()
    init.sdk_verbose=1
    init.depth_mode = sl.DEPTH_MODE.NEURAL
    init.coordinate_units = sl.UNIT.METER
    init.camera_resolution = sl.RESOLUTION.HD2K    
    init.coordinate_system = sl.COORDINATE_SYSTEM.RIGHT_HANDED_Z_UP_X_FWD # OpenGL's coordinate system is right_handed    
    init.depth_minimum_distance = 2
    init.depth_maximum_distance = 40
    init.svo_real_time_mode = False
    
    parse_args(init)
    # Open Camera (in offlince mode open svo2 file)
    zed = sl.Camera()
    status = zed.open(init)
    if status != sl.ERROR_CODE.SUCCESS:
        print("Camera Open : "+repr(status)+". Exit program.")
        exit()
    
    camera_infos = zed.get_camera_information()
    pose = sl.Pose()    
    
    # Stage 1 - Enable Positional Tracking
    positional_tracking_parameters = sl.PositionalTrackingParameters()
    positional_tracking_parameters.mode=sl.POSITIONAL_TRACKING_MODE.GEN_1
    positional_tracking_parameters.set_floor_as_origin = False
    positional_tracking_parameters.enable_area_memory = True
    positional_tracking_parameters.enable_pose_smoothing =True
    positional_tracking_parameters.enable_imu_fusion = True
    positional_tracking_parameters.set_gravity_as_origin = True
    returned_state = zed.enable_positional_tracking(positional_tracking_parameters)
    if returned_state != sl.ERROR_CODE.SUCCESS:
        print(f"Positional Tracking Error: {status}")
        zed.close()
        exit(1)    
    
    # Stage 2 - Create Fusion object and Initialization fusion module:
    fusion = sl.Fusion()
    init_fusion_param = sl.InitFusionParameters()
    init_fusion_param.coordinate_units = sl.UNIT.METER
    init_fusion_param.coordinate_system = sl.COORDINATE_SYSTEM.IMAGE
    init_fusion_param.output_performance_metrics = True
    init_fusion_param.verbose = True
    init_fusion_param.timeout_period_number = 66
    fusion_init_code = fusion.init(init_fusion_param)
    if fusion_init_code != sl.FUSION_ERROR_CODE.SUCCESS:
        print("[ZED][ERROR] Failed to initialize fusion :" + repr(fusion_init_code) + ". Exit program")
        exit()
    
    # Stage 3 - Enable odometry publishing:
    configuration = sl.CommunicationParameters()
    configuration.set_for_shared_memory()
    zed.start_publishing(configuration)
    uuid = sl.CameraIdentifier(zed.get_camera_information().serial_number)
    
    # Stage 4 - Suscribe fusion module
    fusion.subscribe(uuid, configuration, sl.Transform(0, 0, 0))
    
    # Stage 5 - Enable positional tracking for Fusion object
    positional_tracking_fusion_parameters = sl.PositionalTrackingFusionParameters()
    positional_tracking_fusion_parameters.enable_GNSS_fusion = True
    gnss_calibration_parameters = sl.GNSSCalibrationParameters()
    gnss_calibration_parameters.target_yaw_uncertainty = 1e-2
    gnss_calibration_parameters.enable_translation_uncertainty_target = True
    gnss_calibration_parameters.target_translation_uncertainty = 15e-2
    gnss_calibration_parameters.enable_reinitialization = True
    gnss_calibration_parameters.gnss_vio_reinit_threshold = 5
    gnss_calibration_parameters.enable_rolling_calibration = True
    gnss_calibration_parameters.gnss_antenna_position = np.asarray([0,0,0])
    positional_tracking_fusion_parameters.gnss_calibration_parameters = gnss_calibration_parameters    
    fusion.enable_positionnal_tracking(positional_tracking_fusion_parameters)
     
    # Stage 6 - Initialize parameters for spatial Mapping
    if opt.build_mesh:
        spatial_mapping_parameters = sl.SpatialMappingParameters(resolution = sl.MAPPING_RESOLUTION.MEDIUM,mapping_range =  sl.MAPPING_RANGE.MEDIUM,max_memory_usage = 2048,save_texture = False,use_chunk_only = True,reverse_vertex_order = False,map_type = sl.SPATIAL_MAP_TYPE.MESH)
        pymesh = sl.Mesh() 
    else :
        spatial_mapping_parameters = sl.SpatialMappingParameters(resolution = sl.MAPPING_RESOLUTION.MEDIUM,mapping_range =  sl.MAPPING_RANGE.MEDIUM,max_memory_usage = 4096,save_texture = False,use_chunk_only = False,reverse_vertex_order = False,map_type = sl.SPATIAL_MAP_TYPE.FUSED_POINT_CLOUD)
        spatial_mapping_parameters.stability_counter = 4
        spatial_mapping_parameters.resolution_meter = 0.05
        pymesh = sl.FusedPointCloud()    

    # Stage 7 - Enable spatail mapping module
    status = zed.enable_spatial_mapping(spatial_mapping_parameters)
    if status != sl.ERROR_CODE.SUCCESS:
        print(f"Spatial Mapping Error: {status}")
        zed.close()
        exit(1)
    
    # For grabing the each frame create parameters
    runtime_parameters = sl.RuntimeParameters(measure3D_reference_frame = sl.REFERENCE_FRAME.CAMERA, enable_depth = True)    
    
    mapping_activated = True    
    max_frames= zed.get_svo_number_of_frames()
    frame_count=0
    last_call = time.time()
    last_timestamp = sl.Timestamp()
    
    # Stage 8: Itrating through each frame of svo2
    while frame_count < max_frames:
        data_map={}        
        # Grab an image, a RuntimeParameters object must be given to grab()
        err = zed.grab(runtime_parameters)        
        if err == sl.ERROR_CODE.SUCCESS:            
            current_ts = zed.get_timestamp(sl.TIME_REFERENCE.IMAGE)
            tracking_state = zed.get_position(pose)
            
            # Stage 9 -- Extract gnss data from svo2 file between the interval
            zed.retrieve_svo_data("GNSS_json",data_map,last_timestamp,current_ts)
            if data_map:
                data_map_list = list(data_map.items())
                data_map = [(k,v) for k,v in data_map.items()]
                is_data_ingested = False
                for data_item in data_map_list:
                    data_string = data_item[1].get_content_as_string()
                    input_gnss = gnss_str_to_pyzed(data_string, current_ts)
                    # Ingest gnss data into fusion module
                    ingest_error = fusion.ingest_gnss_data(input_gnss)
                    if ingest_error != sl.FUSION_ERROR_CODE.SUCCESS and ingest_error != sl.FUSION_ERROR_CODE.NO_NEW_DATA_AVAILABLE:                        
                        print("Ingest error occurred when ingesting GNSSData: ", ingest_error)
                    else:
                        print(input_gnss.get_coordinates(False))
                        current_geopose=sl.GeoPose()
                        # convert pose to cooresponding geo pose
                        current_geopose_status=fusion.camera_to_geo(pose,current_geopose)
                        if current_geopose_status==sl.GNSS_FUSION_STATUS.OK:
                            is_data_ingested=True
                            break                        
            # Stage 10 - Get spatial data based onn chunck
            if time.time() - last_call > 0.5:
                if zed.get_spatial_map_request_status_async() != sl.ERROR_CODE.SUCCESS:
                    zed.request_spatial_map_async()
                    last_call = time.time()
                if zed.get_spatial_map_request_status_async() == sl.ERROR_CODE.SUCCESS:
                    zed.retrieve_spatial_map_async(pymesh)                    
            
            # Stage 11 - Start fusion process
            fusion_process_status = fusion.process()
            if fusion_process_status != sl.FUSION_ERROR_CODE.SUCCESS and fusion_process_status != sl.FUSION_ERROR_CODE.NO_NEW_DATA_AVAILABLE:
                # print(f"Fusion process error: {fusion_process_status}") # Can be verbose
                pass
            
            camera_pose = sl.Pose()
            if fusion.process() == sl.FUSION_ERROR_CODE.SUCCESS:
                fused_tracking_state = fusion.get_position(camera_pose, sl.REFERENCE_FRAME.WORLD)
                if fused_tracking_state == sl.POSITIONAL_TRACKING_STATE.OK:                    
                    rotation = camera_pose.get_rotation_vector()
                    py_translation=sl.translation()
                    translation = camera_pose.get_translation(py_translation)
                    text_rotation = str((round(rotation[0], 2), round(rotation[1], 2), round(rotation[2], 2)))
                    text_translation = str((round(translation.get()[0], 2), round(translation.get()[1], 2), round(translation.get()[2], 2)))
                    pose_data = camera_pose.pose_data(sl.Transform())
                    print("get position translation  = ",text_translation,", rotation_message = ",text_rotation)

        elif err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED:
            break            
            
            # Fusion process start here
            # zed_pose = sl.Pose()
            # fusion_pose = sl.Pose()
            # zed_position_status = zed.get_position(zed_pose, sl.REFERENCE_FRAME.WORLD)
            
            
                                
                # if is_data_ingested:
                #     fusion_position_status = fusion.get_position(fusion_pose, sl.REFERENCE_FRAME.WORLD)
                #     if fusion.process() == sl.FUSION_ERROR_C0ODE.SUCCESS:
                #         current_state = fusion.get_position(fusion_pose, sl.REFERENCE_FRAME.WORLD)

            # if zed.get_spatial_map_request_status_async() == sl.ERROR_CODE.SUCCESS:
            #     zed.retrieve_spatial_map_async(pymesh)                
            # else:
            #     print("Data map is blank for this image")
            # print("Processing frame no "+ str(frame_count))
            frame_count += 1        
        
        
        last_timestamp=current_ts
    
    # print("Extracting and saving point cloud...")    
    # status = zed.extract_whole_spatial_map(pymesh)
    # Write resulatant pymesh into ply file
    if status == sl.ERROR_CODE.SUCCESS:        
        filepath = "/home/ceinfo/Documents/spatial_output/sample.ply"
        status = pymesh.save(filepath, typeMesh=sl.MESH_FILE_FORMAT.PLY)
        if status:
            print(f"Point cloud saved successfully: {filepath}")
        else:
            print("Failed to save point cloud")
    

    zed.disable_spatial_mapping()
    zed.disable_positional_tracking()
    zed.close()
    pymesh.clear()
    zed.close()   
          
def parse_args(init):
    if len(opt.input_svo_file)>0 and opt.input_svo_file.endswith(".svo2"):
        init.set_from_svo_file(opt.input_svo_file)
        print("[Sample] Using SVO File input: {0}".format(opt.input_svo_file))
    elif len(opt.ip_address)>0 :
        ip_str = opt.ip_address
        if ip_str.replace(':','').replace('.','').isdigit() and len(ip_str.split('.'))==4 and len(ip_str.split(':'))==2:
            init.set_from_stream(ip_str.split(':')[0],int(ip_str.split(':')[1]))
            print("[Sample] Using Stream input, IP : ",ip_str)
        elif ip_str.replace(':','').replace('.','').isdigit() and len(ip_str.split('.'))==4:
            init.set_from_stream(ip_str)
            print("[Sample] Using Stream input, IP : ",ip_str)
        else :
            print("Unvalid IP format. Using live stream")
        
         
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_svo_file', type=str, help='Path to an .svo file, if you want to replay it',default = '')
    parser.add_argument('--ip_address', type=str, help='IP Adress, in format a.b.c.d:port or a.b.c.d, if you have a streaming setup', default = '')
    parser.add_argument('--resolution', type=str, help='Resolution, can be either HD2K, HD1200, HD1080, HD720, SVGA or VGA', default = '')
    parser.add_argument('--build_mesh', help = 'Either the script should plot a mesh or point clouds of surroundings', action='store_true')
    opt = parser.parse_args()
    if len(opt.input_svo_file)>0 and len(opt.ip_address)>0:
        print("Specify only input_svo_file or ip_address, or none to use wired camera, not both. Exit program")
        exit()
    main()