Point Perfect corrections on Zed Orin Box (Zed X)

Hi, I am using a Zed box ORIN NX with zed cameras and F9P chip. I have the geotracking recording sample from the zed sdk github working with svo file, kml and json file of the route as output. The GPS location of the output is not great and i want to use pointperfect for positional correction.

I want the recording script of the geotracking example to use corrections generated from the pointperfect script for positional correction. I have tried running the PP client script initially and then run the recording script but i get a connection error message any time i try to get this done.

Are there other ways of ensuring the PP client provides positional correction to the recording script? WHat modifications can i adopt to ensure there are no concflicts at the port to make both scripts work without connection error and positional infirmation from the recording script is corrected?

Any help would be greatly appreciated.

Can you please share the “point perfect correction script” that you are referring to?

We are currently working as well on handling RTK corrections through NTRIP in the near future.

The script is expected to run as a CLI. I am unable to share the script as a file as i am a first time user

#!/usr/bin/env python3

Copyright 2022-2023 u-blox AG

SPDX-License-Identifier: Apache-2.0

u-blox PointPerfect MQTT client with AssistNow and localization support

Run with -h (or --help) to see supported command line arguments

Command line examples:

Using continental corrections for Europe:
python pointperfect_client.py -P /dev/ttyACM0 -j /home/user/Downloads/device-a2f3e2dc-b979-4196-98c8-3ad0de34d230-ucenter-config.json --region eu

Using auto-detected regional/continental corrections:
python pointperfect_client.py -P <port> -j <U-Center JSON config>

Using localized corrections:
python pointperfect_client.py -P <port> -j <U-Center JSON config> -l

Download the U-Center JSON config from the u-blox Thingstream portal:
"Thingstream > Location Services > Location Thing > credentials"
Alternatively, use the key/certificate files and client ID from the portal.

<port> is the serial port of the u-blox GNSS receiver with SPARTN support,
e.g. /dev/ttyACM0 or COM3. Optionally with baudrate, e.g. /dev/ttyACM0@115200.


import argparse
import json
import logging
import os.path
import re
import sys
import tempfile
import time

from math import radians, floor, cos, pi

pip install paho-mqtt

import paho.mqtt.client as mqtt

pip install pyserial

import serial

from pyspartn import SPARTNReader

Center point of rectangular regions mapped to the region name. Used

for detecting the region based on proximity to one of these points.

These mappings may be inaccurate or out of date. Please check the

PointPerfect documentation for the latest information and consider

using the --region option to manually specify the correct region.

‘S2655E13470’: ‘au’,
‘N5245E01185’: ‘eu’,
‘N3895E13960’: ‘jp’, # East
‘N3310E13220’: ‘jp’, # West
‘N3630E12820’: ‘kr’,
‘N3920W09660’: ‘us’,



STATS = 100 # logging level for stats

class NmeaParser:
Parse NMEA sentences from bytes and invoke callbacks for matching sentences.
Strips newlines before passing the sentence to the callback. Errors are silently
ignored and the parser is robust to malformed sentences or UBX, RTCM, SPARTN, etc.

def __init__(self, callbacks):
    Initialize the parser with a dictionary of callbacks for matching sentences.

        callbacks (dict): Dictionary of compiled regular expressions objects mapped
                          to callbacks.
    self.callbacks = callbacks
    self.buffer = None

def parse(self, data):
    '''Parse the given bytes and invoke callbacks for matching sentences.'''
    for byte in data:
        if byte == ord('$'):
            self.buffer = bytearray([byte])
        elif self.buffer is not None:
            if (byte in range(ord('A'), ord('Z')+1) or
                byte in range(ord('0'), ord('9')+1) or
                byte in (ord(','), ord('.'), ord('-'), ord('*'))):
            elif byte == 0x0d:  # CR
                if len(self.buffer) > 3 and self.buffer[-3] == ord('*'):
                        chksum_received = int(self.buffer[-2:], 16)
                    except ValueError:
                        chksum_received = -1  # will never match below
                    chksum = 0
                    for i in self.buffer[1:-3]:
                        chksum ^= i
                    if chksum == chksum_received:
                        for regexp in self.callbacks.keys():
                            if regexp.match(self.buffer):
                                data = self.buffer.decode(encoding='ascii')
                                # invoke callback with data
                        logging.warning('chksum error: %02x != %02x',
                                            chksum_received, chksum)
                self.buffer = None
                self.buffer = None

class PointPerfectClient:
u-blox PointPerfect MQTT client with AssistNow and localization support

Subscribes to the PointPerfect MQTT service and sends corrections to
a u-blox receiver with native SPARTN support. Monitors the receiver's
position in order to subscribe to the appropriate corrections.
EARTH_CIRCUMFERENCE = 6371000 * 2 * pi
NSEW_TO_SIGN = str.maketrans('NSEW', '0-0-')

def __init__(self, gnss, mqtt_client, mqtt_server, mqtt_port,
             localized=False, tile_level=0, lband=False, region=None,
             distance=50000, epochs=float('inf'), ubxfile=None, stats=None,
             assist_now = False):
    self.gnss = gnss
    self.mqtt_client = mqtt_client
    self.mqtt_server = mqtt_server
    self.mqtt_port = mqtt_port
    self.localized = localized
    self.distance = distance
    self.epochs = epochs
    self.tile_level = tile_level
    self.ubxfile = ubxfile
    self.plan = 'Lb' if lband else 'ip'
    self.lat = 0  # lat at which node selection was last performed
    self.lon = 0  # lon at which node selection was last performed
    self.epoch_count = 0  # number of epochs since last node selection
    self.dlat_threshold = distance * 360 / self.EARTH_CIRCUMFERENCE
    self.dlon_threshold = 0  # will be set in process_position()
    self.tile_dict = None  # cached tile data
    self.tile_topic = ''  # current tile topic
    self.spartn_topic = ''  # current SPARTN topic
    self.assist_now = assist_now  # True means always use AssistNow
    self.assist_now_topic = '/pp/ubx/mga' if assist_now else None
    self.connected = False
    self.new_server = None  # if set, connect to this server after disconnect

    if stats:
        self.stats = type('stat', (object,), { 'epochs': [0] * len(QUALITIES),
                                               'total': 0, 'interval': stats })
        self.stats = None

    handlers = { re.compile(b'^\\$G[A-Z]GGA,'): self.handle_nmea_gga }
    self.nmea_parser = NmeaParser(handlers)

    self.mqtt_topics = []
    if not localized:
        self.mqtt_topics.append((f'/pp/ubx/0236/{self.plan}', 1))
        if region:
            self.spartn_topic = f'/pp/{self.plan}/{region}'

    self.mqtt_client.on_connect = self.on_mqtt_connect
    self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
    self.mqtt_client.on_message = self.on_mqtt_message

    while True:
            logging.info('Connecting to %s', self.mqtt_server)
            self.mqtt_client.connect(self.mqtt_server, self.mqtt_port)
        except OSError:
            logging.warning("MQTT connection failed, retrying ...")

def on_mqtt_connect(self, mqtt_client, userdata, flags, return_code):
    '''Callback for handling MQTT connection.'''
    print('working on connecting to mqtt')
    del userdata, flags  # unused
    if return_code == 0:
        self.connected = True
        logging.info('Connected to %s', self.mqtt_server)
        if self.mqtt_topics:
            for topic_qos_tuple in self.mqtt_topics:
                logging.info('Subscribing to %s', topic_qos_tuple[0])
        if self.spartn_topic:
            logging.info('Subscribing to %s', self.spartn_topic)
            mqtt_client.subscribe((self.spartn_topic, 0))
        if self.assist_now_topic:
            logging.info('Subscribing to %s', self.assist_now_topic)
            qos = 0 if self.assist_now_topic.endswith('/updates') else 1
            mqtt_client.subscribe((self.assist_now_topic, qos))
        logging.error('Failed to connect, return code %d', return_code)

def on_mqtt_disconnect(self, mqtt_client, userdata, return_code):
    '''Callback for MQTT disconnect'''
    print('working on disconnecting to mqtt')
    del mqtt_client, userdata  # unused
    self.connected = False
    if self.new_server:
        self.mqtt_server = self.new_server
        self.new_server = None
        logging.info('Connecting to %s', self.mqtt_server)
        self.mqtt_client.connect(self.mqtt_server, self.mqtt_port)
    if return_code != 0:
        logging.error('Unexpected MQTT disconnect')

def on_mqtt_message(self, mqtt_client, userdata, msg):
    '''Callback for handling MQTT messages.'''
    print('getting mqtt message')
    del userdata  # unused
    if msg.topic.startswith(f'/pp/{self.plan}/'):
        print('getting mqtt message -loop 1 - ',str(f'/pp/{self.plan}/'))
        # received regional SPARTN; send to receiver

    elif msg.topic.startswith('/pp/ubx/'):
        # received SPARTN key or AssistNow data; send to receiver
        print('getting mqtt message -loop 2')
        if msg.topic == '/pp/ubx/mga':
            self.assist_now_topic = '/pp/ubx/mga/updates'
            logging.info('Subscribing to %s', self.assist_now_topic)
            mqtt_client.subscribe((self.assist_now_topic, 0))
    elif msg.topic.startswith('pp/ip'):
        print('getting mqtt message -loop3')
        if msg.topic.endswith('/dict'):
            # localized SPARTN; send to receiver
        print('getting mqtt message -loop4')
        logging.warning('Unhandled topic %s', msg.topic)

def loop_forever(self):
    '''Main loop of the client.'''
    print('trying to loop')
    # avoid subscribing before fully connected (race in paho)
    while not self.connected:
        buffer = bytearray(100)
        while True:
            bytes_read = self.gnss.readinto(buffer)
            if bytes_read:
                if self.ubxfile:
                # parse the bytes and invoke matching handlers
        logging.info('Disconnecting from %s', self.mqtt_server)
        while self.connected:

def handle_nmea_gga(self, sentence):
    '''Process an NMEA-GGA sentence passed in as a string.'''
    fields = sentence.split(',')
    quality = int(fields[6] or 0)
    f_lat = float(fields[2] or 0)
    lat = int(f_lat / 100) + (f_lat % 100) / 60
    if fields[3] == 'S':
        lat *= -1
    f_lon = float(fields[4] or 0)
    lon = int(f_lon / 100) + (f_lon % 100) / 60
    if fields[5] == 'W':
        lon *= -1

    if self.stats:
        self.stats.epochs[quality] += 1
        self.stats.total += 1
        if self.stats.total % self.stats.interval == 0:
            pct = [f'{QUALITIES[i]}: {self.stats.epochs[i] / self.stats.total * 100:.1f}%'
                    for i in range(len(QUALITIES)) if self.stats.epochs[i]]
            logging.log(STATS, ', '.join(pct))

    if quality in (0, 6):  # no fix or estimated
        if not self.assist_now_topic:
            self.assist_now_topic = '/pp/ubx/mga'
            logging.info('Subscribing to %s', self.assist_now_topic)
            self.mqtt_client.subscribe((self.assist_now_topic, 1))
        if self.assist_now_topic and not self.assist_now:
            logging.info('Unsubscribing from %s', self.assist_now_topic)
            self.assist_now_topic = None
        self.process_position(lat, lon)

def process_position(self, lat, lon):
    '''Handle position from the receiver. If needed, subscribe to a new tile
       or topic.'''
    if self.localized:
        self.epoch_count += 1
        # Only record new position if it changed significantly since the last calculation
        if abs(lat - self.lat) > self.dlat_threshold or \
        abs(lon - self.lon) > self.dlon_threshold or \
        self.epoch_count > self.epochs:
            logging.debug('updating position: %f, %f', lat, lon)
            self.lat = lat
            self.lon = lon
            self.epoch_count = 0
            self.dlon_threshold = self.dlat_threshold * cos(radians(self.lat))
            new_tile_topic = self.get_tile_topic(self.lat, self.lon)
            if new_tile_topic != self.tile_topic:
                if self.tile_topic:
                logging.info('Subscribing to tile %s', new_tile_topic)
                self.mqtt_client.subscribe((new_tile_topic, 1))
                self.tile_topic = new_tile_topic
                # the incoming tile data will trigger node selection
        if not self.spartn_topic:
            logging.debug('updating position: %f, %f', lat, lon)
            self.lat = lat
            self.lon = lon
            # Fake tile dictionary for regional mode, allowing automatic
            # selection of the region
            self.tile_dict = { 'nodes': REGION_MAPPING.keys(),
                               'nodeprefix': f'/pp/{self.plan}/',
                               'endpoint': self.mqtt_server }

def select_node(self):
    '''Select the closest node to the current position.'''
    if not self.tile_dict:
        # not yet ready to select a node, as we don't have tile data
    # Rather than calculate distance in meters, calculate a value that grows with
    # the distance, since all we care about is finding the closest.
    # As an approximation, use the sum of the lat and lon difference
    # squared, but scale the latitude difference by cos(lon) to make it the
    # same scale as longitude.
    rounded_lat = round(self.lat * 100)
    rounded_lon = round(self.lon * 100)
    factor_lon = cos(radians(self.lat))
    min_dist_scaled = float('inf')
    for node in self.tile_dict['nodes']:
        node_signed = node.translate(self.NSEW_TO_SIGN)
        node_lat = int(node_signed[0:5])
        node_lon = int(node_signed[5:11])
        # longitude difference is proportional to distance along NS
        # latitude difference is proportional to distance along EW,
        # but scale by cos(lon) to make it the same scale as lon
        dist_scaled = (node_lat-rounded_lat)**2 + ((node_lon-rounded_lon)*factor_lon)**2
        if dist_scaled < min_dist_scaled:
            min_dist_scaled = dist_scaled
            nearest_node = node
    if self.localized:
        logging.debug('Nearest node: %s', nearest_node)
        # we are trying to determine a continental topic name
        # replace the node name with the region name
        nearest_node = REGION_MAPPING[nearest_node]
        logging.warning('Region "%s" automatically detected', nearest_node)
    if self.mqtt_server != self.tile_dict['endpoint']:
        # store new server and correction topic; on completion of the
        # disconnect below, the disconnect callback will initiate the new
        # connection and the connect callback will subscribe to the new
        # topic
        self.new_server = self.tile_dict['endpoint']
        self.spartn_topic = self.tile_dict['nodeprefix'] + nearest_node
        if self.spartn_topic:
        self.spartn_topic = self.tile_dict['nodeprefix'] + nearest_node
        logging.info('Subscribing to topic %s', self.spartn_topic)
        self.mqtt_client.subscribe((self.spartn_topic, 0))

def get_tile_topic(self, lat, lon):
    '''Get the MQTT topic for the tile containing the given position.'''
    delta = [10.0, 5.0, 2.5][self.tile_level]
    n_s = 'S' if lat < 0 else 'N'
    e_w = 'W' if lon < 0 else 'E'
    # Get the lower left corner of the tile in latitude and longitude
    llat = floor(lat / delta) * delta
    llon = floor(lon / delta) * delta
    # Shift to the center of the tile
    clat = llat + (delta / 2)
    clon = llon + (delta / 2)
    # Multiply by 100, round to the nearest integer, remove sign
    slat = abs(round(clat * 100))
    slon = abs(round(clon * 100))
    return f'pp/ip/L{self.tile_level}{n_s}{slat:04d}{e_w}{slon:05d}/dict'

def process_tile_data(self, data):
    '''Process MQTT tile data.'''
        self.tile_dict = json.loads(data)
    except json.JSONDecodeError:
        assert False, 'Invalid JSON data received for tile'

def load_json_credentials(args, argp):
‘’‘Load MQTT credentials from a u-center Config JSON file.’‘’

# verify that no conflicting arguments were given
if (args.client_id or args.dir != argp.get_default('dir') or
     args.server != argp.get_default('server') or
     args.lband != argp.get_default('lband')):
    argp.error('Cannot use -j/--json with -i/--client_id, -d/--dir, -s/--server, or --lband')

    with open(args.json, 'r', encoding='utf-8') as json_file:
        json_data = json.load(json_file)
        conn = json_data['MQTT']['Connectivity']
        args.client_id = conn['ClientID']
        server_uri = conn['ServerURI']
        creds = conn['ClientCredentials']
        key = creds['Key']
        cert = creds['Cert']
        args.lband = '/pp/ubx/0236/Lb' in json_data['MQTT']['Subscriptions']['Key']['KeyTopics']
except FileNotFoundError:
    argp.error(f'JSON file {args.json} not found')
except json.JSONDecodeError:
    argp.error(f'JSON file {args.json} is not valid JSON')
except KeyError as error:
    argp.error(f'JSON file {args.json} is missing key {error}')
except (TypeError, ValueError):
    argp.error(f'JSON file {args.json} is not valid')

# Parse the server URI
match = re.match(r'(tcp|ssl)://(.+):(\d+)', server_uri)
args.server = match.group(2)
assert match.group(1) == 'ssl'
assert match.group(3) == '8883'

# Write the credentials to temporary files, as needed by paho.mqtt.client
(keyf, args.keyfile) = tempfile.mkstemp(
                            prefix=f'device-{args.client_id}-', suffix='-pp-key.pem')
os.write(keyf, f'{KEY_HEADER}{key}{KEY_FOOTER}'.encode('ascii'))
(certf, args.certfile) = tempfile.mkstemp(
                            prefix=f'device-{args.client_id}-', suffix='-pp-cert.crt')
os.write(certf, f'{CERT_HEADER}{cert}{CERT_FOOTER}'.encode('ascii'))

def main():
‘’‘Main program.’‘’
argp = argparse.ArgumentParser()
argp.add_argument(‘-P’, ‘–port’, required=True,
help=‘Serial port[@baudrate] of u-blox GNSS receiver with SPARTN support’)
argp.add_argument(‘-j’, ‘–json’, type=str,
help=‘u-center JSON file containing MQTT credentials’)
argp.add_argument(‘–assistnow’, action=‘store_true’,
help=‘Use AssistNow regardless of GNSS receiver state’)

s_group = argp.add_mutually_exclusive_group()
s_group.add_argument('--region', default=None,
    help='Service region (e.g. us, eu), defaults to automatic detection')
s_group.add_argument('-l', '--localized', action='store_true',
    help='Use localized service')

o_group = argp.add_argument_group('Output options')
time_stamp = time.strftime("%Y%m%d_%H%M%S")
o_group.add_argument('-u', '--ubx', nargs='?', type=argparse.FileType('wb'),
    help='Write all GNSS receiver output to a UBX file')
o_group.add_argument('--log', nargs='?', type=argparse.FileType('w'),
    help='Write all program output to a text file in addition to stdout')
o_group.add_argument('--stats', type=int, nargs='?', const=5, default=None,
    help='Print statistics every N epochs (default: off, 5 if no argument given)')
o_group.add_argument('--trace', choices=('CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'),
                     help='Trace level: CRITICAL, ERROR, WARNING, INFO, DEBUG (default: INFO)')

cgroup = argp.add_argument_group('MQTT settings',
    description='These options apply only when NOT using -j/--json. '
                'Otherwise, the values are read from the JSON file.')
cgroup.add_argument('-i', '--client_id',
    help='The MQTT client ID to use')
cgroup.add_argument('-d', '--dir', default='.',
    help='Directory containing key and certificate files (default: .)')
cgroup.add_argument('-s', '--server', default='pp.services.u-blox.com',
    help='MQTT server address (default: pp.services.u-blox.com)')
cgroup.add_argument('--lband', action='store_true',
    help='Use MQTT topics suitable for devices on an Lband+IP combined plan')

lgroup = argp.add_argument_group('Localized options',
    description='These options apply only in combination with --localized')
lgroup.add_argument('--distance', default=10000, type=int,
    help='The distance threshold [m] for recalculating tile and node (default: 10000)')
lgroup.add_argument('--epochs', default=float('inf'), type=float,
    help='The maximum number of epochs between recalculating tile and node (default: infinite)')
lgroup.add_argument('-L', '--tile-level', type=int, choices=(0,1,2), default=2,
    help='Tile level for localized service (default: 2)')
args = argp.parse_args()

logging.basicConfig(level=getattr(logging, args.trace),
                    format='%(levelname)s %(message)s',
if args.log:
logging.info(' '.join(sys.argv))  # log the command line arguments
logging.addLevelName(STATS, 'STATS')

if not args.localized:
    if args.distance != argp.get_default('distance'):
        argp.error('--distance requires --localized')
    if args.epochs != argp.get_default('epochs'):
        argp.error('--epochs requires --localized')
    if args.tile_level != argp.get_default('tile_level'):
        argp.error('--tile-level requires --localized')

    if args.json:
        load_json_credentials(args, argp)
        if not args.client_id:
            argp.error('Either -j/--json or -i/--client_id must be specified')
        args.certfile = os.path.join(args.dir, f'device-{args.client_id}-pp-cert.crt')
        args.keyfile  = os.path.join(args.dir, f'device-{args.client_id}-pp-key.pem')
        if not os.path.exists(args.certfile):
            argp.error(f'Certificate file {args.certfile} does not exist')
        if not os.path.exists(args.keyfile):
            argp.error(f'Key file {args.keyfile} does not exist')

    mqtt_client = mqtt.Client(client_id=args.client_id)
    mqtt_client.tls_set(certfile=args.certfile, keyfile=args.keyfile)
    if args.json:
        # remove the temporary key/cert as early as possible
        # they do get loaded in tls_set() and are not used thereafter
        if 'certfile' in args and os.path.exists(args.certfile):
        if 'keyfile' in args and os.path.exists(args.keyfile):

if args.ubx:
    logging.info('Writing all receiver data to %s', args.ubx.name)

serial_params = args.port.split('@')  # split optional baudrate from port argument
if len(serial_params) == 2:
    (port, baud) = (serial_params[0], int(serial_params[1]))
    (port, baud) = (serial_params[0], 115200)
gnss = serial.Serial(port=port, baudrate=baud, timeout=0.1)

    pp_client = PointPerfectClient(gnss, mqtt_client, args.server, 8883,
                    localized=args.localized, lband=args.lband, region=args.region,
                    tile_level=args.tile_level, distance=args.distance, epochs=args.epochs,
                    ubxfile=args.ubx, stats=args.stats, assist_now=args.assistnow)
except KeyboardInterrupt:
    if args.ubx:

if name == ‘main’:

Hi @Abideen,

You can ingest any kind of GNSS data in our format GNSSData, as long as you provide latitude, longitude and altitude information, along with the position covariances. You can look into this part of our sample which shows how to build the data, which is used in the geotracking sample here.

In your case, you would need to retrieve the data from your program and use it as described above, or integrate the stereolabs code into your PP script to ingest this data.