Thanks for your reply.
Here after is the code I use (hope its not too long). I witness drops in both image stream and IMU stream (1s without data).
Maybe trying to write directly the imu entries to fils can be optimized with a buffer, but I assume python is already doing that kind of optimization.
import argparse
import logging
import os
import os.path as path
import shutil
from threading import Thread, Event
import pyzed.sl as sl
logger = logging.getLogger('recorder')
from yaml import safe_load
class Config(dict):
def __getattr__(self, name): # for ease of access, e.g. config.verbose
if name in self:
return self[name]
else:
raise AttributeError("No such attribute: " + name)
def __repr__(self):
return '\n'.join(f'\t\t{k}: {v}' for k, v in self.items())
def validate(self):
# verbose
verbose_level = self.get('verbose')
verbose_level = 'warning' if verbose_level is None else verbose_level
if verbose_level is not None: # convert verbose level to logging type (int)
try: # in case it represents an int, directly get it
verbose_level = int(verbose_level)
except ValueError: # else ask logging to sort it out
assert isinstance(verbose_level, str)
verbose_level = logging.getLevelName(verbose_level.upper())
self['verbose'] = verbose_level
# output
if self.get('verbose') is None:
raise ValueError('output path required')
# custom thread class
class StopableThread(Thread):
def __init__(self, cam, output_path):
super(StopableThread, self).__init__()
self.cam = cam
self.output_path = output_path
# store the event
self.stop_event = Event()
def stop(self):
self.stop_event.set()
class ImuThread(StopableThread):
def run(self):
sensors_data = sl.SensorsData()
last_imu_ts_ns = sl.Timestamp().get_microseconds()
filepath = path.join(self.output_path, 'sensors.csv')
logger.debug(f'record IMU to {filepath}')
with open(filepath, 'wt') as file:
"""
full sensor recording is :
idx,imu_Timestamp[sec],mag_Timestamp[sec],baro_Timestamp[sec],
accX[m/s^2],accY[m/s^2],accZ[m/s^2],
gyroX[deg/s],gyroY[deg/s],gyroZ[deg/s],
magX[uT],magY[uT],magZ[uT],
orX[deg],orY[deg],orZ[deg],
press[hPa],rel_alt[m],moving,
temp_left[C],temp_right[C],temp_imu[C],temp_barom[C],
see : https://www.stereolabs.com/docs/gstreamer/zed-data-csv-sink/
"""
file.write('#idx, imu_Timestamp[sec],mag_Timestamp[sec],baro_Timestamp[sec],'
'accX[m/s^2],accY[m/s^2],accZ[m/s^2],'
'gyroX[deg/s],gyroY[deg/s],gyroZ[deg/s],'
'magX[uT],magY[uT],magZ[uT],'
'orX[deg],orY[deg],orZ[deg],'
'press[hPa],rel_alt[m],moving,'
'temp_left[C],temp_right[C],temp_imu[C],temp_barom[C],'
'\n')
idx = 0
while not self.stop_event.is_set():
ack = self.cam.get_sensors_data(sensors_data, sl.TIME_REFERENCE.CURRENT)
if ack != sl.ERROR_CODE.SUCCESS:
logging.critical('unable to get IMU data.')
break
imu_data = sensors_data.get_imu_data()
current_imu_ts_ns = imu_data.timestamp.get_nanoseconds()
if current_imu_ts_ns == last_imu_ts_ns:
continue
mag_data = sensors_data.get_magnetometer_data()
baro_data = sensors_data.get_barometer_data()
temperature_data = sensors_data.get_temperature_data()
current_mag_ts_ns = mag_data.timestamp.get_nanoseconds()
current_baro_ts_ns = baro_data.timestamp.get_nanoseconds()
delta_imu_ts_us = current_imu_ts_ns - last_imu_ts_ns # for debug
#print(f'\rIMU delta us {delta_imu_ts_us} ns ({int(1e9/delta_imu_ts_us)} fps) ', end='')
last_imu_ts_ns = current_imu_ts_ns
# linear acceleration
linear_acceleration = imu_data.get_linear_acceleration()
angular_velocity_deg_s = imu_data.get_angular_velocity()
# angular_velocity_rad_s = [np.deg2rad(gyr) for gyr in angular_velocity_deg_s] # delegate conversion
magnetometer_field_uT = mag_data.get_magnetic_field_calibrated().tolist()
barometric_values_hPa = [baro_data.pressure, baro_data.relative_altitude, int(sensors_data.camera_moving_state == sl.CAMERA_MOTION_STATE.MOVING)]
formated_idx = [f'{idx}']
formated_timestamps = [f'{t/1e9:10}' for t in [current_imu_ts_ns, current_mag_ts_ns, current_baro_ts_ns]]
formated_gyro = [f'{v:10}' for v in angular_velocity_deg_s]
formated_accel = [f'{v:10}' for v in linear_acceleration]
formated_mag = [f'{v:10}' for v in magnetometer_field_uT]
formated_baro = [f'{v:10}' for v in barometric_values_hPa]
formated_orientation = [f'{v:10}' for v in [0.0] * 3] # TODO
formated_temperatures = [f'{temperature_data.get(I):4}' for I in [sl.SENSOR_LOCATION.ONBOARD_LEFT,
sl.SENSOR_LOCATION.ONBOARD_RIGHT,
sl.SENSOR_LOCATION.IMU,
sl.SENSOR_LOCATION.BAROMETER]]
formated_line = formated_idx + formated_timestamps + formated_accel + formated_gyro + formated_mag + \
formated_orientation + formated_baro + formated_temperatures
formated_line = ', '.join(formated_line)
file.write(formated_line + '\n')
idx += 1
logger.debug(f'end record IMU')
class VideoThread(StopableThread):
def run(self):
filepath = path.join(self.output_path, 'video.svo')
recording_param = sl.RecordingParameters(filepath, sl.SVO_COMPRESSION_MODE.H264)
status = self.cam.enable_recording(recording_param)
if status != sl.ERROR_CODE.SUCCESS:
logger.critical('unable to start SVO recording')
while status == sl.ERROR_CODE.SUCCESS and not self.stop_event.is_set():
status = self.cam.grab()
self.cam.disable_recording()
def copy_calib(cam, output_path):
cam_info = cam.get_camera_information()
serial_number = cam_info.serial_number
r"""
From doc:
https://support.stereolabs.com/hc/en-us/articles/360007497173-What-is-the-calibration-file-
On Windows: C:\ProgramData\Stereolabs\settings
On Linux: /usr/local/zed/settings/
On Windows (SDK ≤ 2.2.0):C:/Users/YOUR_USER_NAME\AppData\Roaming\Stereolabs\settings\
"""
calib_filename = f'SN{serial_number}.conf'
root_path_candidates = [
'C:/ProgramData/Stereolabs/settings',
'/usr/local/zed/settings/'
]
calib_filepath_candidates = (path.join(root_path, calib_filename)
for root_path in root_path_candidates)
calib_file = next((calib_filepath for calib_filepath in calib_filepath_candidates
if path.isfile(calib_filepath)), None)
assert calib_file is not None
shutil.copy(calib_file, path.join(output_path, calib_filename))
class VerbosityParsor(argparse.Action):
""" accept debug, info, ... or theirs corresponding integer value formatted as string."""
def __call__(self, parser, namespace, values, option_string=None):
try: # in case it represent an int, directly get it
values = int(values)
except ValueError: # else ask logging to sort it out
assert isinstance(values, str)
values = logging.getLevelName(values.upper())
setattr(namespace, self.dest, values)
def record_cli():
config = Config()
try:
parser_conf_file = argparse.ArgumentParser(add_help=False) # Turn off help, print all options in response to -h
parser_conf_file.add_argument('-c', '--conf_file', help="Specify config file (yaml)", metavar="FILE")
args, remaining_argv = parser_conf_file.parse_known_args() # only retrieve the config file path from cli
if args.conf_file and path.isfile(args.conf_file):
with open(args.conf_file, 'r') as f:
config.update(safe_load(f))
# Parse rest of arguments
parser = argparse.ArgumentParser(description='Description of the program.', parents=[parser_conf_file])
parser.add_argument(
'-v', '--verbose', nargs='?', const='info', type=str,
help='verbosity level (debug, info, warning, critical, ... or int value) [warning]')
parser.add_argument(
'-o', '--output', metavar='DIR',
help='output root path')
args = parser.parse_args(remaining_argv)
config.update(vars(args).items())
config.validate()
logger.setLevel(config.verbose)
except ValueError as e:
logger.critical(e)
if config.verbose <= logging.DEBUG:
raise
try:
cam = sl.Camera()
init_params = sl.InitParameters()
init_params.camera_resolution = sl.RESOLUTION.HD720 #VGA, HD720, HD1080, HD2K
init_params.camera_fps = 30
init_params.depth_mode = sl.DEPTH_MODE.NONE
status = cam.open(init_params)
if status != sl.ERROR_CODE.SUCCESS:
raise ValueError(repr(status))
logger.info("SVO is Recording, use Ctrl-C to stop.")
os.makedirs(args.output, exist_ok=True)
# write_calib(cam, args.output)
logger.info('copying camera calib file.')
copy_calib(cam, args.output)
sensor_record_thread = ImuThread(cam, args.output)
image_record_thread = VideoThread(cam, args.output)
try:
logger.info('start recording')
sensor_record_thread.start()
image_record_thread.start()
sensor_record_thread.join()
image_record_thread.join()
except KeyboardInterrupt:
logger.debug('stoped by user')
sensor_record_thread.stop()
image_record_thread.stop()
finally:
logger.info('end recording')
sensor_record_thread.join()
image_record_thread.join()
except ValueError as e:
logger.critical(e)
if config.verbose <= logging.DEBUG:
raise
finally:
# cam.disable_recording()
cam.close()
if __name__ == "__main__":
logger_formatter = logging.Formatter('%(name)s::%(levelname)-8s: %(message)s')
logger_stream = logging.StreamHandler() # logging.FileHandler(logfile)
logger_stream.setFormatter(logger_formatter)
logger.addHandler(logger_stream)
record_cli()