python .\pythonshort.py
[1] Calib loaded: [34348310, 35835486]
[2] Opening ZED 34348310…
[2025-05-14 18:59:00 UTC][ZED][INFO] Logging level INFO
[2025-05-14 18:59:02 UTC][ZED][INFO] [Init] Depth mode: NEURAL
[2025-05-14 18:59:03 UTC][ZED][INFO] [Init] Camera successfully opened.
[2025-05-14 18:59:03 UTC][ZED][INFO] [Init] Camera FW version: 1523
[2025-05-14 18:59:03 UTC][ZED][INFO] [Init] Video mode: HD720@15
[2025-05-14 18:59:03 UTC][ZED][INFO] [Init] Serial Number: S/N 34348310
[2025-05-14 18:59:03 UTC][ZED][WARNING] [Init] Self-calibration skipped. Scene may be occluded or lack texture. (Error code: 0x01)
[2] Opening ZED 35835486…
[2025-05-14 18:59:07 UTC][ZED][INFO] Logging level INFO
[2025-05-14 18:59:08 UTC][ZED][INFO] [Init] Depth mode: NEURAL
[2025-05-14 18:59:09 UTC][ZED][INFO] [Init] Camera successfully opened.
[2025-05-14 18:59:09 UTC][ZED][INFO] [Init] Camera FW version: 1523
[2025-05-14 18:59:09 UTC][ZED][INFO] [Init] Video mode: HD720@15
[2025-05-14 18:59:09 UTC][ZED][INFO] [Init] Serial Number: S/N 35835486
[2025-05-14 18:59:09 UTC][ZED][WARNING] [Init] Self-calibration skipped. Scene may be occluded or lack texture. (Error code: 0x01)
[3] All cameras publishing
[4] Subscribing ZED 34348310…
[4] Subscribing ZED 35835486…
[5] Fusion subscribed
[6] Entering main loop. Press q or ESC to quit.
That is the output.
I’m trying to run a minimal Python fusion demo with two ZED 2 cameras (via the Python SDK . I have a working fusion_calibration.json
(poses loaded correctly) and when I run my script I see the output gave you first.
import json
import cv2
import numpy as np
import pyzed.sl as sl
CALIB_FILE = "fusion_calibration.json" # point to your file
DISPLAY_W = 1280
DISPLAY_H = 480
# ——— 1) Load calibration ———
with open(CALIB_FILE, "r") as f:
calib = json.load(f)
serials = [int(s) for s in calib.keys()]
print("[1] Calib loaded:", serials)
# ——— 2) Open & publish each camera ———
cams = {}
for sn in serials:
print(f"[2] Opening ZED {sn}…")
inp = sl.InputType(); inp.set_from_serial_number(sn)
ip = sl.InitParameters(input_t=inp,
camera_resolution=sl.RESOLUTION.HD720,
camera_fps=15,
depth_mode=sl.DEPTH_MODE.NEURAL,
coordinate_units=sl.UNIT.METER,
coordinate_system=sl.COORDINATE_SYSTEM.RIGHT_HANDED_Y_UP)
cam = sl.Camera()
if cam.open(ip) != sl.ERROR_CODE.SUCCESS:
raise RuntimeError(f"Cannot open ZED {sn}")
tp = sl.PositionalTrackingParameters()
tp.mode = sl.POSITIONAL_TRACKING_MODE.GEN_2
if cam.enable_positional_tracking(tp) != sl.ERROR_CODE.SUCCESS:
raise RuntimeError(f"Cannot enable tracking on {sn}")
pub = sl.CommunicationParameters()
# prefer shared memory
if hasattr(pub, "set_for_shared_memory"):
pub.set_for_shared_memory()
else:
pub.communication_type = sl.COMM_TYPE.SHARED_MEMORY
if cam.start_publishing(pub) != sl.ERROR_CODE.SUCCESS:
raise RuntimeError(f"Cannot start_publishing on {sn}")
cams[sn] = cam
print("[3] All cameras publishing")
# ——— 3) Fusion init & subscribe ———
fusion = sl.Fusion()
fp = sl.InitFusionParameters()
fp.coordinate_units = sl.UNIT.METER
fp.coordinate_system = sl.COORDINATE_SYSTEM.RIGHT_HANDED_Y_UP
if fusion.init(fp) != sl.FUSION_ERROR_CODE.SUCCESS:
raise RuntimeError("Fusion init failed")
for sn in serials:
print(f"[4] Subscribing ZED {sn}…")
cid = sl.CameraIdentifier(sn)
w = calib[str(sn)]["world"]
pose = sl.Transform()
pose.set_euler_angles(*w["rotation"], radian=True)
t = sl.Translation(); t.init_vector(*w["translation"])
pose.set_translation(t)
sub = sl.CommunicationParameters()
if hasattr(sub, "set_for_shared_memory"):
sub.set_for_shared_memory()
else:
sub.communication_type = sl.COMM_TYPE.SHARED_MEMORY
if fusion.subscribe(cid, sub, pose) != sl.FUSION_ERROR_CODE.SUCCESS:
raise RuntimeError(f"Fusion.subscribe failed for {sn}")
print("[5] Fusion subscribed")
# ——— 4) Grab–Fuse–Display loop ———
od_rt = sl.ObjectDetectionRuntimeParameters()
objects = sl.Objects()
cv2.namedWindow("ZED Fusion", cv2.WINDOW_NORMAL)
cv2.resizeWindow("ZED Fusion", DISPLAY_W, DISPLAY_H)
print("[6] Entering main loop. Press q or ESC to quit.")
while True:
# 4a) grab & tile images
tiles = []
for sn, cam in cams.items():
if cam.grab() == sl.ERROR_CODE.SUCCESS:
m = sl.Mat()
cam.retrieve_image(m, sl.VIEW.LEFT)
img = cv2.cvtColor(m.get_data()[:,:,:3], cv2.COLOR_BGRA2BGR)
tiles.append(cv2.resize(img, (DISPLAY_W//2, DISPLAY_H)))
else:
# if grab fails, show black
tiles.append(np.zeros((DISPLAY_H, DISPLAY_W//2, 3), np.uint8))
canvas = np.hstack(tiles)
# 4b) run Fusion
if fusion.process() == sl.FUSION_ERROR_CODE.SUCCESS:
if fusion.retrieve_objects(objects, od_rt) == sl.FUSION_ERROR_CODE.SUCCESS:
for obj in objects.object_list:
x,y,z = obj.position.get()
cv2.putText(canvas,
f"ID{obj.id}:{x:.2f},{y:.2f},{z:.2f}",
(10, 30 + 20*obj.id),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1)
# 4c) display
cv2.imshow("ZED Fusion", canvas)
key = cv2.waitKey(1) & 0xFF
if key in (27, ord('q')):
print("[7] Quit key pressed")
break
# ——— 5) Cleanup ———
print("[8] Cleaning up…")
try:
# First unsubscribe cameras from fusion
for sn in serials:
cid = sl.CameraIdentifier(sn)
fusion.unsubscribe(cid)
print("[8.1] Unsubscribed cameras from fusion")
# Then close fusion
fusion.close()
print("[8.2] Closed fusion module")
# Finally clean up cameras
for sn, cam in cams.items():
try:
print(f"[8.3] Stopping publishing for camera {sn}")
cam.stop_publishing()
print(f"[8.4] Closing camera {sn}")
cam.close()
except Exception as e:
print(f"Error closing camera {sn}: {e}")
# Small delay to ensure all resources are released
import time
time.sleep(1)
except Exception as e:
print(f"Error during cleanup: {e}")
cv2.destroyAllWindows()
print("[9] Done.")
Problem: As soon as I hit [6] Entering loop…, the window flashes open and the script terminates immediately (no [7] Quit key pressed logged).
Has anyone seen this behavior?
If im not subscribing the cameras to FUSION API it won’t flash. Also have a code with hundred lines of code, this is a simpler one to demonstrate it is not working.
Is there an extra callback or event I need to pump after subscribing?
Or does the Python wrapper require a different loop structure?
another script, but it generates the window but it says failed to get frames:
import json
import time
import cv2
import numpy as np
import pyzed.sl as sl
CALIB_FILE = "fusion_calibration.json"
DISPLAY_W, DISPLAY_H = 1280, 480
FPS = 30
TIMEOUT_MS = 5000 # 5s timeout pentru sincronizare
# 1) Încarcă calibrarea
with open(CALIB_FILE, "r") as f:
calib = json.load(f)
serials = [int(s) for s in calib.keys()]
print("[1] Calib loaded:", serials)
# 2) Deschide și publică camerele
cams = {}
for sn in serials:
print(f"[2] Opening ZED {sn}…")
input_type = sl.InputType()
input_type.set_from_serial_number(sn)
init_params = sl.InitParameters()
init_params.input = input_type
init_params.camera_resolution = sl.RESOLUTION.HD720
init_params.camera_fps = FPS
init_params.depth_mode = sl.DEPTH_MODE.NEURAL
init_params.coordinate_units = sl.UNIT.METER
init_params.coordinate_system = sl.COORDINATE_SYSTEM.RIGHT_HANDED_Y_UP
cam = sl.Camera()
if cam.open(init_params) != sl.ERROR_CODE.SUCCESS:
raise RuntimeError(f"Cannot open ZED {sn}")
tp = sl.PositionalTrackingParameters()
tp.set_as_static = True
if cam.enable_positional_tracking(tp) != sl.ERROR_CODE.SUCCESS:
raise RuntimeError(f"Cannot enable tracking on {sn}")
pub = sl.CommunicationParameters()
pub.set_for_shared_memory()
if cam.start_publishing(pub) != sl.ERROR_CODE.SUCCESS:
raise RuntimeError(f"Cannot start_publishing on {sn}")
cams[sn] = cam
print("[3] All cameras publishing")
# 3) Init Fusion & subscribe
fusion = sl.Fusion()
fp = sl.InitFusionParameters()
fp.coordinate_units = sl.UNIT.METER
fp.coordinate_system = sl.COORDINATE_SYSTEM.RIGHT_HANDED_Y_UP
print(f"[3] Fusion timeout set to {TIMEOUT_MS} ms")
if fusion.init(fp) != sl.FUSION_ERROR_CODE.SUCCESS:
raise RuntimeError("Fusion init failed")
for sn in serials:
print(f"[4] Subscribing ZED {sn}…")
cid = sl.CameraIdentifier()
cid.serial_number = sn
# construiește transformarea din fișier
w = calib[str(sn)]["world"]
pose = sl.Transform()
pose.set_euler_angles(w["rotation"][0], w["rotation"][1], w["rotation"][2], radian=True)
t = sl.Translation()
t.init_vector(w["translation"][0], w["translation"][1], w["translation"][2])
pose.set_translation(t)
sub = sl.CommunicationParameters()
sub.set_for_shared_memory()
if fusion.subscribe(cid, sub, pose) != sl.FUSION_ERROR_CODE.SUCCESS:
raise RuntimeError(f"Fusion.subscribe failed for {sn}")
print(f" -> Subscribed camera {sn}")
print("[5] Fusion subscribed successfully")
# Warm-up loop: așteaptă datele inițiale
print("[5] Warming up Fusion…")
start = time.time()
while time.time() - start < 2.0: # 2 secunde warm-up
if fusion.process() == sl.FUSION_ERROR_CODE.SUCCESS:
break
print("[5] Warm-up done, entering main loop")
# 4) Loop afișare
image_fused = sl.Mat()
objects = sl.Objects()
od_rt = sl.RuntimeParameters()
cv2.namedWindow("ZED Fusion", cv2.WINDOW_NORMAL)
cv2.resizeWindow("ZED Fusion", DISPLAY_W, DISPLAY_H)
print("[6] Entering main loop. Press 'q' or ESC to quit.")
failure_count = 0
while True:
code = fusion.process()
if code == sl.FUSION_ERROR_CODE.SUCCESS:
failure_count = 0
# preia imaginea
if fusion.retrieve_image(image_fused, sl.VIEW.LEFT) == sl.FUSION_ERROR_CODE.SUCCESS:
frame = image_fused.get_data()
canvas = cv2.cvtColor(frame[:, :, :3], cv2.COLOR_BGRA2BGR)
else:
canvas = np.zeros((DISPLAY_H, DISPLAY_W, 3), np.uint8)
cv2.putText(canvas, "Image retrieve failed", (50,50),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
# object detection (opțional)
if fusion.retrieve_objects(objects, od_rt) == sl.FUSION_ERROR_CODE.SUCCESS:
for obj in objects.object_list:
x,y,z = obj.position.get()
cv2.putText(canvas, f"ID{obj.id}: {x:.2f},{y:.2f},{z:.2f}",
(10, 30+20*obj.id),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1)
else:
failure_count += 1
canvas = np.zeros((DISPLAY_H, DISPLAY_W, 3), np.uint8)
cv2.putText(canvas, "Fusion failed", (50,50),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
time.sleep(0.01)
cv2.imshow("ZED Fusion", canvas)
if cv2.waitKey(1) & 0xFF in (27, ord('q')):
break
# 5) Cleanup
print("[7] Cleaning up…")
for sn, cam in cams.items():
fusion.unsubscribe(sl.CameraIdentifier(sn))
fusion.close()
for cam in cams.values():
cam.stop_publishing()
cam.close()
cv2.destroyAllWindows()
print("[8] Done.")