import multiprocessing as mp
import time
[docs]class camera(mp.Process):
"""
This class describes the object that will read the camera as a separate subprocess. The start/stop is regulated by an Event object.
"""
def __init__(self, flag_camera_ready, flag_reading_data, s):
"""
Initialize a few flags to sync the camera with the rest of the processes.
:param flag_camera_ready:
:param flag_reading_data:
:param s:
"""
mp.Process.__init__(self) # Calling super constructor - mandatory
self.s = s
self.flag_camera_ready = flag_camera_ready
self.flag_reading_data = flag_reading_data # Read the state of the FPGA reading process
self.frame_buffer = []
[docs] def run(self):
"""
Yes, importing all these modules from within a function is gettho, but I have not found a way around it yet.
import cv2 will start displaying the feed with Qt5 (my default), which then appears to be "taken" for all the
other plots being display. Therefore, the next backend is used for the other plots - TkAgg - and my package
was not written to support something else than TkAgg.
:return:
"""
#import cv2 # Gettho but I have not there seems to be conflicts with matplotlib if outside local scope.
from cv2 import imshow, VideoCapture, flip, waitKey, destroyAllWindows, VideoWriter, VideoWriter_fourcc
# Create the camera object
self.capture_object = VideoCapture(self.s['camera_address'])
ret, frame = self.capture_object.read()
imshow('frame', frame) # Display the first image
actual_resolution = (frame.shape[1], frame.shape[0])
print("[INFO] Webcam frames are {} shape".format(frame.shape))
counter = 0
self.flag_camera_ready.set()
self.t0 = time.perf_counter()
while self.flag_camera_ready.is_set(): # Loop until parent sets the camera ready flag to False
if self.flag_reading_data.is_set():
ret, frame = self.capture_object.read()
counter += 1
#print(counter)
frame = flip(frame, 1) # Vertical flip
if ret == True:
self.frame_buffer.append(frame) # Put the frame in a buffer
imshow('frame', frame) # For fun, not required if that takes too much resources
if waitKey(1) & 0xFF == ord('q'):
break
else:
raise ValueError('[ERROR] Problem here, look at it.')
else:
#print('[INFO] Camera is not ready')
pass
self.t1 = time.perf_counter()
# Define the codec and create VideoWriter object
print("[INFO] Recorded video for {} s".format(self.t1 - self.t0))
fourcc = VideoWriter_fourcc(*'MPEG')
actual_fps = len(self.frame_buffer) / (self.t1 - self.t0) # Cannot be controlled
self.out = VideoWriter(self.s['path_video'], fourcc, actual_fps, actual_resolution)
for frame in self.frame_buffer:
self.out.write(frame)
self.capture_object.release()
self.out.release()
destroyAllWindows()
print("[INFO] Camera process is now terminating.")