import multiprocessing as mp
from fmcw import adc, ftdi
import time
import os
[docs]class fpga_reader(mp.Process):
"""
This class describes the object that will read the FPGA. By inheriting from Process, it can be launched as a
subprocess.
"""
def __init__(self, flag_reading_data, connection, s):
"""
Nothing too fancy here. Note that initially all the ftdi initialization was done here but the generated object
cannot be pickled. Therefore, I moved it to the run function.
:param connection: the end of a Pipe
:param s: settings dictionary
"""
mp.Process.__init__(self) # Calling super constructor - mandatory
self.connection = connection
self.s = s
# Send some basic data back to the parent
process_info = {
'pid': os.getpid()
}
self.connection.send(process_info)
self.flag_reading_data = flag_reading_data # Indicates to everyone that the FPGA is being read.
[docs] def run(self):
"""
Main routine polling the USB bus. It is not perfect, but >95% of the frames are valid with this configuration.
:return: void
"""
self.adf4158 = adc.ADF4158()
try:
self.fpga = ftdi.FPGA(self.adf4158, encoding=self.s['encoding'])
except Exception as e:
print("[ERROR]", e)
return
self.fpga.set_gpio(led=True, adf_ce=True)
self.fpga.set_adc(oe2=True)
self.fpga.clear_adc(oe1=True, shdn1=True, shdn2=True)
real_delay = self.fpga.set_sweep(self.s['f0'], self.s['bw'], self.s['t_sweep'], self.s['t_delay']) # Returned value delay not used
print("[INFO] Real delay between sweeps: {} s".format(real_delay))
self.fpga.set_downsampler(enable=self.s['down_sampler'], quarter=self.s['quarter'])
self.fpga.write_sweep_timer(self.s['t_sweep'])
self.fpga.write_sweep_delay(self.s['t_delay'])
self.fpga.write_decimate(self.s['acquisition_decimate'])
self.fpga.write_pa_off_timer(self.s['t_delay'] - self.s['pa_off_advance'])
self.fpga.clear_gpio(pa_off=True)
self.fpga.clear_buffer()
self.fpga.set_channels(a=self.s[1], b=self.s[2]) # Would not scale up
# Run while nothing is put to the pipe - it is an "almost" read only pipe
self.flag_reading_data.set() # To maximize accuracy, this is set just before entering the while loop
while True:
self.connection.send_bytes(self.fpga.device.read(self.s['byte_usb_read']//self.s['sub_call']))
[docs] def close(self):
self.flag_reading_data.clear() # First thing done after the terminate call from parent process
# Close properly the fpga device
self.fpga.set_adc(oe1=True, shdn1=True, shdn2=True)
self.fpga.set_channels(a=False, b=False)
self.fpga.clear_gpio(led=True, adf_ce=True)
self.fpga.set_gpio(pa_off=True)
self.fpga.clear_buffer()
self.fpga.close()
print("[INFO] FPGA subprocess successfully closed")
return