# Source code for the camera subprocess module.

import multiprocessing as mp
import time

class camera(mp.Process):
    """Read a camera in a separate subprocess and save the frames as a video.

    Start/stop is regulated through two event-like flags shared with the
    parent process: frames are grabbed only while ``flag_reading_data`` is
    set, and the acquisition loop runs for as long as ``flag_camera_ready``
    stays set.
    """

    # Pause between two polls of ``flag_reading_data`` while no data is being
    # read, so that the idle loop does not spin a full CPU core.
    _IDLE_POLL_S = 0.001

    def __init__(self, flag_camera_ready, flag_reading_data, s):
        """Store the synchronisation flags and the settings.

        :param flag_camera_ready: event-like flag (``set``/``is_set``). It is
            set by :meth:`run` once a first frame was grabbed; the
            acquisition loop ends when it is cleared.
        :param flag_reading_data: event-like flag (``is_set``). Frames are
            captured only while it is set.
        :param s: settings mapping; :meth:`run` reads the keys
            ``'camera_address'`` and ``'path_video'``.
        """
        super().__init__()  # Calling the Process constructor is mandatory.
        self.s = s
        self.flag_camera_ready = flag_camera_ready
        self.flag_reading_data = flag_reading_data
        self.frame_buffer = []

    def run(self):
        """Grab frames while requested, then write them to a video file.

        ``cv2`` is imported locally on purpose: per the original author's
        note, importing it at module level makes it claim the Qt5 GUI backend,
        which then conflicts with the TkAgg backend the rest of the package
        relies on for its plots.

        :raises ValueError: if a frame cannot be read from the camera.
        :return: None
        """
        from cv2 import (
            VideoCapture,
            VideoWriter,
            VideoWriter_fourcc,
            destroyAllWindows,
            flip,
            imshow,
            waitKey,
        )

        # Create the camera object.
        self.capture_object = VideoCapture(self.s['camera_address'])
        try:
            # Grab and display a first frame to learn the actual resolution.
            ret, frame = self.capture_object.read()
            if not ret:
                raise ValueError(
                    '[ERROR] Could not read a first frame from the camera.'
                )
            imshow('frame', frame)
            actual_resolution = (frame.shape[1], frame.shape[0])
            print("[INFO] Webcam frames are {} shape".format(frame.shape))

            self.flag_camera_ready.set()
            self.t0 = time.perf_counter()

            # Loop until the parent clears the camera-ready flag.
            while self.flag_camera_ready.is_set():
                if not self.flag_reading_data.is_set():
                    time.sleep(self._IDLE_POLL_S)
                    continue

                ret, frame = self.capture_object.read()
                if not ret:
                    # Check before touching the frame: it is not usable when
                    # the grab failed.
                    raise ValueError('[ERROR] Problem here, look at it.')

                # flipCode 1 mirrors the image around its vertical axis
                # (i.e. a horizontal flip).
                frame = flip(frame, 1)
                self.frame_buffer.append(frame)

                # Live preview; for fun, can be dropped if too expensive.
                imshow('frame', frame)
                if waitKey(1) & 0xFF == ord('q'):
                    break

            self.t1 = time.perf_counter()
            print("[INFO] Recorded video for {} s".format(self.t1 - self.t0))

            if self.frame_buffer:
                # Define the codec and create the VideoWriter object.
                fourcc = VideoWriter_fourcc(*'MPEG')
                # The frame rate cannot be controlled, so it is estimated.
                # NOTE(review): this averages over the whole t0..t1 window,
                # including any time spent idle with flag_reading_data unset.
                actual_fps = len(self.frame_buffer) / (self.t1 - self.t0)
                self.out = VideoWriter(
                    self.s['path_video'], fourcc, actual_fps, actual_resolution
                )
                try:
                    for buffered_frame in self.frame_buffer:
                        self.out.write(buffered_frame)
                finally:
                    self.out.release()
            else:
                print("[WARNING] No frame was captured, no video written.")
        finally:
            # Always free the device and the preview window, even on error.
            self.capture_object.release()
            destroyAllWindows()

        print("[INFO] Camera process is now terminating.")