Source code for fmcw.ftdi

import pylibftdi as ftdi
from queue import Queue, Empty
from threading import Thread
import datetime

class FPGA:
    """
    Creates the FTDI object that handles the communication with the FPGA.

    Every command is written to the FTDI device as a small packet of the form
    ``[0xaa, payload_length, cmd, *payload]`` (see :meth:`send_packet`).
    """

    # First byte of every packet written to the FTDI device.
    _PACKET_HEADER = 0xaa
    # The payload length is carried in a single byte of the packet.
    _MAX_PAYLOAD = 0xff

    def __init__(self, ADC, encoding='latin1'):
        """Set up the FTDI object that represents the FPGA.

        Opens interface A of the FTDI device in binary mode, selects the
        SYNCFF bit mode, sets the read and write chunk sizes to 0x10000 bytes,
        enables RTS/CTS flow control and flushes the device.

        :param ADC: ADC object (see ADC module). It is stored as ``self.pll``
            and must provide ``registers``, ``write_value`` and
            ``freq_to_regs``.
        :param encoding: Encoding of the text data coming from the FTDI object
        """
        SYNCFF = 0x40
        SIO_RTS_CTS_HS = (0x1 << 8)

        # The device is opened eagerly by pylibftdi, so no explicit open()
        # call is made here.
        self.device = ftdi.Device(mode='b', interface_select=ftdi.INTERFACE_A, encoding=encoding)
        self.device.ftdi_fn.ftdi_set_bitmode(0xff, SYNCFF)
        self.device.ftdi_fn.ftdi_read_data_set_chunksize(0x10000)
        self.device.ftdi_fn.ftdi_write_data_set_chunksize(0x10000)
        self.device.ftdi_fn.ftdi_setflowctrl(SIO_RTS_CTS_HS)
        self.device.flush()
        print("[INFO] FTDI baudrate:", self.device.baudrate)

        self.pll = ADC
        # ADC and PLL are on the same clock
        self.fclk = 40e6  # [Hz] Clock frequency
        self.fpd_freq = self.fclk/2

    @staticmethod
    def _pack_flags(*flags):
        """Pack booleans into an integer: the i-th flag drives bit i."""
        word = 0
        for bit, flag in enumerate(flags):
            if flag:
                word |= 1 << bit
        return word

    def _send_duration(self, length, cmd):
        """Send a duration as a 4-byte little-endian count of clock cycles.

        :param length: [s] Duration; converted with ``int(self.fclk * length)``,
            i.e. truncated toward zero. Only the low 32 bits are transmitted.
        :param cmd: type of packet
        :return: result of the FTDI write
        """
        cycles = int(self.fclk*length)
        payload = [(cycles >> shift) & 0xff for shift in (0, 8, 16, 24)]
        return self.send_packet(payload, cmd)

    def close(self):
        """
        Close the FTDI object.

        :return: None
        """
        self.device.close()

    def send_packet(self, x, cmd):
        """
        Add a header to a packet, encode it and write to FTDI.

        The bytes written are ``[0xaa, len(payload), cmd, *payload]``.

        :param x: data; either a single byte value or an iterable of byte
            values (list, tuple, bytes, ...)
        :param cmd: type of packet
        :return: result of the FTDI write
        :raises ValueError: if the payload is longer than 255 bytes or a value
            does not fit in one byte
        """
        try:
            payload = list(x)
        except TypeError:
            # Not iterable: a single value is a one-byte payload.
            payload = [x]
        if len(payload) > self._MAX_PAYLOAD:
            raise ValueError(
                "Packet payload too long: {} bytes (maximum is {})".format(
                    len(payload), self._MAX_PAYLOAD))
        packet = bytearray([self._PACKET_HEADER, len(payload), cmd] + payload)
        return self.device.write(packet)

    def clear_gpio(self, led=False, pa_off=False, mix_enbl=False, adf_ce=False):
        """
        Send a packet (command 0) signaling the GPIO pins to clear on the FPGA.

        :param led: Clear the LED (bit 0)
        :param pa_off: Clear pa_off (bit 1)
        :param mix_enbl: Disable mixer (bit 2)
        :param adf_ce: Clear the adf_ce (bit 3)
        :return: result of the FTDI write
        """
        return self.send_packet(self._pack_flags(led, pa_off, mix_enbl, adf_ce), 0)

    def set_gpio(self, led=False, pa_off=False, mix_enbl=False, adf_ce=False):
        """
        Send a packet (command 1) signaling the GPIO pins to set on the FPGA.

        :param led: Set the LED (bit 0)
        :param pa_off: Set the pa_off (bit 1)
        :param mix_enbl: Enable the mixer (bit 2)
        :param adf_ce: Set the adf_ce (bit 3)
        :return: result of the FTDI write
        """
        return self.send_packet(self._pack_flags(led, pa_off, mix_enbl, adf_ce), 1)

    def clear_adc(self, oe1=False, oe2=False, shdn1=False, shdn2=False):
        """
        Send a packet (command 2) signaling the ADC pins to clear on the FPGA.

        :param oe1: Clear Output Enable 1 (bit 0)
        :param oe2: Clear Output Enable 2 (bit 1)
        :param shdn1: Clear Shutdown 1 (bit 2)
        :param shdn2: Clear Shutdown 2 (bit 3)
        :return: result of the FTDI write
        """
        return self.send_packet(self._pack_flags(oe1, oe2, shdn1, shdn2), 2)

    def set_adc(self, oe1=False, oe2=False, shdn1=False, shdn2=False):
        """
        Send a packet (command 3) signaling the ADC pins to set on the FPGA.

        :param oe1: Set Output Enable 1 (bit 0)
        :param oe2: Set Output Enable 2 (bit 1)
        :param shdn1: Set Shutdown 1 (bit 2)
        :param shdn2: Set Shutdown 2 (bit 3)
        :return: result of the FTDI write
        """
        return self.send_packet(self._pack_flags(oe1, oe2, shdn1, shdn2), 3)

    def write_pll_reg(self, n):
        """
        Send a packet (command 4) to configure a PLL register.

        The value ``self.pll.registers[n]`` is sent as 4 bytes, least
        significant byte first, with ``n`` OR-ed into the first byte.

        :param n: Index of the register to write
        :return: result of the FTDI write
        """
        reg = self.pll.registers[n]
        # NOTE(review): presumably the low bits of the word are the PLL's
        # register address field - confirm against the PLL datasheet.
        payload = [(reg & 0xff) | n, (reg >> 8) & 0xff, (reg >> 16) & 0xff, (reg >> 24) & 0xff]
        return self.send_packet(payload, 4)

    def write_pll(self):
        """
        Call for the configuration of all the registers.

        Registers are written from 7 down to 0. Registers 6 and 5 are each
        written twice, once per value (0 then 1) of ``step_sel`` and
        ``dev_sel`` respectively.

        :return: None
        """
        self.write_pll_reg(7)
        for step_sel in (0, 1):
            self.pll.write_value(step_sel=step_sel)
            self.write_pll_reg(6)
        for dev_sel in (0, 1):
            self.pll.write_value(dev_sel=dev_sel)
            self.write_pll_reg(5)
        for i in range(4, -1, -1):
            self.write_pll_reg(i)

    def set_sweep(self, fstart, bw, length, delay):
        """
        Set sweep parameters.

        Computes the PLL register values with ``self.pll.freq_to_regs`` and
        then writes all PLL registers.

        :param fstart: [Hz] Start frequency of the chirp
        :param bw: [Hz] Bandwidth to use
        :param length: [s] Duration of the sweep
        :param delay: [s] Delay between two sweeps
        :return: real delay between two sweeps (just informational)
        """
        real_delay = self.pll.freq_to_regs(fstart, self.fpd_freq, bw, length, delay)
        # Configure the PLL
        self.write_pll()
        return real_delay

    def write_sweep_timer(self, length):
        """
        Convert the duration of the sweep to a number of ADC clock cycles and
        send it (command 5).

        :param length: [s] Duration of a sweep
        :return: result of the FTDI write
        """
        return self._send_duration(length, 5)

    def write_sweep_delay(self, length):
        """
        Convert the duration between two sweeps (sweep delay) to a number of
        ADC clock cycles and send it (command 6).

        :param length: [s] Duration between two sweeps
        :return: result of the FTDI write
        """
        return self._send_duration(length, 6)

    def set_channels(self, a=True, b=True):
        """WARNING: ONLY 2 CHANNELS SUPPORTED
        Set the channels to be activated (only two supported), command 7.

        :param a: State of channel a (bit 0)
        :param b: State of channel b (bit 1)
        :return: result of the FTDI write
        """
        return self.send_packet(self._pack_flags(a, b), 7)

    def set_downsampler(self, enable=True, quarter=False):
        """WARNING: THE FPGA CODE REQUIRES IT TO BE ENABLED. TO DO: ALLOW THE USER TO DEACTIVATE IT
        Set the downsampler (command 8).

        :param enable: Turn it on (bit 0)
        :param quarter: Divide the sampling rate by another factor of 2 (bit 1)
        :return: result of the FTDI write
        """
        return self.send_packet(self._pack_flags(enable, quarter), 8)

    def write_decimate(self, decimate):
        """
        Send a packet (command 9) to configure the decimation factor at the
        FPGA level. The value is sent as 2 bytes, least significant first.

        :param decimate: Number of sweeps to skip. 0 means no sweeps are
            skipped (the value is 0-based; an older version was 1-based).
        :return: result of the FTDI write
        :raises TypeError: if decimate is not an int (bool is rejected too)
        :raises ValueError: if decimate is not between 0 and 2**16-1
        """
        # Sanity checks. bool is a subclass of int but is not a meaningful
        # decimation factor, so it is rejected explicitly.
        if isinstance(decimate, bool) or not isinstance(decimate, int):
            raise TypeError('decimate needs to be an int')
        if decimate > 2**16-1 or decimate < 0:
            raise ValueError("Invalid decimate value: {} is not between {} and {}".format(decimate, 0, 2**16-1))
        payload = [decimate & 0xff, (decimate >> 8) & 0xff]
        return self.send_packet(payload, 9)

    def write_pa_off_timer(self, length):
        """
        Convert the duration pa_off_timer to a number of ADC clock cycles and
        send it (command 10).

        :param length: [s] Duration of the pa_off_timer
        :return: result of the FTDI write
        """
        return self._send_duration(length, 10)

    def clear_buffer(self):
        """
        Send a packet (command 11) with a single zero byte as payload.
        NOTE(review): which FPGA buffer this clears is not visible here.

        :return: result of the FTDI write
        """
        return self.send_packet(0, 11)
class Writer(Thread):
    """DEPRECATED
    Legacy Writer thread used to write to the log file.

    Chunks are taken from a queue and written to the file until either an
    empty chunk is received or no chunk arrives within ``timeout`` seconds.
    """

    def __init__(self, filename, queue, encoding='latin1', timeout=0.5):
        """Open the output file and store the queue to read from.

        :param filename: Path of the file to write; opened in text mode 'w'
        :param queue: Queue delivering the chunks to write; a chunk of
            length 0 signals the end of the data
        :param encoding: Encoding used to open the file
        :param timeout: [s] Maximum time to wait for a chunk before giving up
        """
        super().__init__()
        self.queue = queue
        self.f = open(filename, 'w', encoding=encoding)
        self.timeout = timeout
        print("[INFO] Opened {} for writing".format(filename))

    def run(self):
        """Write queued chunks to the file until done or timed out.

        The file is always closed when this method exits, including when
        writing a chunk raises.

        :return: None
        """
        wrote = 0
        try:
            while True:
                try:
                    d = self.queue.get(True, self.timeout)
                except Empty:
                    print('[ERROR] Timeout after {} s without data | Wrote {} byte'.format(self.timeout, wrote))
                    return
                if len(d) == 0:
                    # An empty chunk is the end-of-data marker.
                    print('\n[INFO] Done after writing {:,} byte'.format(wrote))
                    return
                self.f.write(d)
                wrote += len(d)
        finally:
            # Release the file handle on every exit path, errors included.
            self.f.close()