Local streaming

I would like to inquire whether the Camera Zed 2i supports multiprocessing, your assistance on this matter would be greatly appreciated.

All ZED Cameras does support multiprocessing.

Now I think you are specifically talking about the multiprocessing python’s module. This one has known issues with ZED X that you will not have with other multiprocessing libraries. I’m not sure about ZED 2i, but I will test.

Is there any news ?
Because I tried using multiprocessing with my code “local streaming using specific ip and port”, is open connection and print the first statement, stuck in cam.open(init) and terminate the connection
If you can assure the zed 2i support multiprocessing, I will do my best to solve the code problem, but I don’t want to waste my time if it’s not supported

Hi,

In my tests the multiprocessing python module does not work with our SDK. If you want to do several zeds in parallel in the same program, you should use the threading module, or use C/C++/C#.

The code I tested:

 
########################################################################
#
# Copyright (c) 2023, STEREOLABS.
#
# All rights reserved.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
########################################################################

import pyzed.sl as sl
import threading
import multiprocessing
import time
import cv2

run_zeds = True
zeds = []

# Streams' loop to grab image
def stream_loop(zed : sl.Camera):
    global run_zeds

    zed_image = sl.Mat(1280, 720, sl.MAT_TYPE.U8_C4)

    while run_zeds:
        # grab current image
        if zed.grab() == sl.ERROR_CODE.SUCCESS:
            zed.retrieve_image(zed_image, sl.VIEW.LEFT, sl.MEM.CPU, zed_image.get_resolution())
            im = zed_image.get_data()
            cv2.imshow("img", im)
            cv2.waitKey(1)
        else:
            run_zeds = False

def main():
    global run_zeds

    # Get detected cameras
    devList = sl.Camera.get_device_list()
    nb_detected_zed = len(devList)
    
    if nb_detected_zed == 0:
        print("No ZED Detected, exit program")
        exit(1)
    
    print(nb_detected_zed, "ZED detected")

    init_params = sl.InitParameters()
    init_params.camera_resolution = sl.RESOLUTION.HD2K
    init_params.camera_fps = 30
    init_params.depth_mode = sl.DEPTH_MODE.NONE
    
    # Open every detected cameras
    for i in range(nb_detected_zed):
        init_params.set_from_camera_id(i)
        zed = sl.Camera()
        zeds.append(zed)
        err = zed.open(init_params)

        if err == sl.ERROR_CODE.SUCCESS:
            cam_info = zed.get_camera_information()
            print("serial number:", cam_info.serial_number, ", model:", cam_info.camera_model, ", status: opened")
        else:
            print("Error on camera", i, ":", err )
            zed.close()
    
    # Thread loops for all streams
    thread_pool = {}
    for zed in zeds:
        if zed.is_opened():
            print("Starting a thread for zed", i, zed.get_camera_information().serial_number)
            thread_pool[zed] = multiprocessing.Process(target=stream_loop, args=(zed,))
            # thread_pool[zed] = threading.Thread(target=stream_loop, args=(zed,))
            thread_pool[zed].start()
    
    # Idle loop
    while run_zeds:
        time.sleep(1)
    
    # Wait for every thread to be stopped
    for zed in zeds:
        if zed.is_opened():
            thread_pool[zed].join()
            zed.close()

    return


if __name__ == "__main__":
    main()