Source code for sk_dsp_comm.rtlsdr_helper

"""
Support functions for the RTL-SDR using pyrtlsdr

Copyright (c) July 2017, Mark Wickert
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this
   list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
   this list of conditions and the following disclaimer in the documentation
   and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

The views and conclusions contained in the software and documentation are those
of the authors and should not be interpreted as representing official policies,
either expressed or implied, of the FreeBSD Project.
"""
import warnings
try:
    import rtlsdr
except ImportError:
    warnings.warn("Please install the helpers extras for full functionality", ImportWarning)
from . import sigsys as ss
from . import digitalcom as dc
import numpy as np
import scipy.signal as signal

def capture(Tc,fo=88.7e6,fs=2.4e6,gain=40,device_index=0):
    # Setup SDR
    sdr = rtlsdr.RtlSdr(device_index) #create a RtlSdr object
    #sdr.get_tuner_type()
    sdr.sample_rate = fs
    sdr.center_freq = fo
    #sdr.gain = 'auto'
    sdr.gain = gain
    # Capture samples
    Nc = np.ceil(Tc*fs)
    x = sdr.read_samples(Nc)
    sdr.close()
    return x

[docs]def discrim(x): """ function disdata = discrim(x) where x is an angle modulated signal in complex baseband form. Mark Wickert """ X=np.real(x) # X is the real part of the received signal Y=np.imag(x) # Y is the imaginary part of the received signal b=np.array([1, -1]) # filter coefficients for discrete derivative a=np.array([1, 0]) # filter coefficients for discrete derivative derY=signal.lfilter(b,a,Y) # derivative of Y, derX=signal.lfilter(b,a,X) # " X, disdata=(X*derY-Y*derX)/(X**2+Y**2) return disdata
[docs]def mono_FM(x,fs=2.4e6,file_name='test.wav'): """ Decimate complex baseband input by 10 Design 1st decimation lowpass filter (f_c = 200 KHz) """ b = signal.firwin(64,2*200e3/float(fs)) # Filter and decimate (should be polyphase) y = signal.lfilter(b,1,x) z = ss.downsample(y,10) # Apply complex baseband discriminator z_bb = discrim(z) # Design 2nd decimation lowpass filter (fc = 12 KHz) bb = signal.firwin(64,2*12e3/(float(fs)/10)) # Filter and decimate zz_bb = signal.lfilter(bb,1,z_bb) # Decimate by 5 z_out = ss.downsample(zz_bb,5) # Save to wave file ss.to_wav(file_name, 48000, z_out/2) print('Done!') return z_bb, z_out
[docs]def stereo_FM(x,fs=2.4e6,file_name='test.wav'): """ Stereo demod from complex baseband at sampling rate fs. Assume fs is 2400 ksps Mark Wickert July 2017 """ N1 = 10 b = signal.firwin(64,2*200e3/float(fs)) # Filter and decimate (should be polyphase) y = signal.lfilter(b,1,x) z = ss.downsample(y,N1) # Apply complex baseband discriminator z_bb = discrim(z) # Work with the (3) stereo multiplex signals: # Begin by designing a lowpass filter for L+R and DSP demoded (L-R) # (fc = 12 KHz) b12 = signal.firwin(128,2*12e3/(float(fs)/N1)) # The L + R term is at baseband, we just lowpass filter to remove # other terms above 12 kHz. y_lpr = signal.lfilter(b12,1,z_bb) b19 = signal.firwin(128,2*1e3*np.array([19-5,19+5])/(float(fs)/N1), pass_zero=False); z_bb19 = signal.lfilter(b19,1,z_bb) # Lock PLL to 19 kHz pilot # A type 2 loop with bandwidth Bn = 10 Hz and damping zeta = 0.707 # The VCO quiescent frequency is set to 19000 Hz. theta, phi_error = pilot_PLL(z_bb19,19000,fs/N1,2,10,0.707) # Coherently demodulate the L - R subcarrier at 38 kHz. # theta is the PLL output phase at 19 kHz, so to double multiply # by 2 and wrap with cos() or sin(). # First bandpass filter b38 = signal.firwin(128,2*1e3*np.array([38-5,38+5])/(float(fs)/N1), pass_zero=False); x_lmr = signal.lfilter(b38,1,z_bb) # Coherently demodulate using the PLL output phase x_lmr = 2*np.sqrt(2)*np.cos(2*theta)*x_lmr # Lowpass at 12 kHz to recover the desired DSB demod term y_lmr = signal.lfilter(b12,1,x_lmr) # Matrix the y_lmr and y_lpr for form right and left channels: y_left = y_lpr + y_lmr y_right = y_lpr - y_lmr # Decimate by N2 (nominally 5) N2 = 5 fs2 = float(fs)/(N1*N2) # (nominally 48 ksps) y_left_DN2 = ss.downsample(y_left,N2) y_right_DN2 = ss.downsample(y_right,N2) # Deemphasize with 75 us time constant to 'undo' the preemphasis # applied at the transmitter in broadcast FM. # A 1-pole digital lowpass works well here. a_de = np.exp(-2.1*1e3*2*np.pi/fs2) z_left = signal.lfilter([1-a_de],[1, -a_de],y_left_DN2) z_right = signal.lfilter([1-a_de],[1, -a_de],y_right_DN2) # Place left and righ channels as side-by-side columns in a 2D array z_out = np.hstack((np.array([z_left]).T,(np.array([z_right]).T))) ss.to_wav(file_name, 48000, z_out/2) print('Done!') #return z_bb, z_out return z_bb, theta, y_lpr, y_lmr, z_out
def wide_PSD(Tc,f_start,f_stop,fs = 2.4e6,rho = 0.6): K = np.ceil((f_stop - f_start)/(rho*fs)) K = int(K) print(K) fo = np.zeros(K) for i in range(K): if i == 0: fo[i] = f_start + rho*fs/2. else: fo[i] = fo[i-1] + rho*fs return fo
[docs]def pilot_PLL(xr,fq,fs,loop_type,Bn,zeta): """ theta, phi_error = pilot_PLL(xr,fq,fs,loop_type,Bn,zeta) Mark Wickert, April 2014 """ T = 1/float(fs) # Set the VCO gain in Hz/V Kv = 1.0 # Design a lowpass filter to remove the double freq term Norder = 5 b_lp,a_lp = signal.butter(Norder,2*(fq/2.)/float(fs)) fstate = np.zeros(Norder) # LPF state vector Kv = 2*np.pi*Kv # convert Kv in Hz/v to rad/s/v if loop_type == 1: # First-order loop parameters fn = Bn Kt = 2*np.pi*fn # loop natural frequency in rad/s elif loop_type == 2: # Second-order loop parameters fn = 1/(2*np.pi)*2*Bn/(zeta + 1/(4*zeta)) # given Bn in Hz Kt = 4*np.pi*zeta*fn # loop natural frequency in rad/s a = np.pi*fn/zeta else: print('Loop type must be 1 or 2') # Initialize integration approximation filters filt_in_last = 0 filt_out_last = 0 vco_in_last = 0 vco_out = 0 vco_out_last = 0 # Initialize working and final output vectors n = np.arange(0,len(xr)) theta = np.zeros(len(xr)) ev = np.zeros(len(xr)) phi_error = np.zeros(len(xr)) # Normalize total power in an attemp to make the 19kHz sinusoid # component have amplitude ~1. #xr = xr/(2/3*std(xr)); # Begin the simulation loop for kk in range(len(n)): # Sinusoidal phase detector (simple multiplier) phi_error[kk] = 2*xr[kk]*np.sin(vco_out) # LPF to remove double frequency term phi_error[kk],fstate = signal.lfilter(b_lp,a_lp,np.array([phi_error[kk]]),zi=fstate) pd_out = phi_error[kk] #pd_out = 0 # Loop gain gain_out = Kt/Kv*pd_out # apply VCO gain at VCO # Loop filter if loop_type == 2: filt_in = a*gain_out filt_out = filt_out_last + T/2.*(filt_in + filt_in_last) filt_in_last = filt_in filt_out_last = filt_out filt_out = filt_out + gain_out else: filt_out = gain_out # VCO vco_in = filt_out + fq/(Kv/(2*np.pi)) # bias to quiescent freq. vco_out = vco_out_last + T/2.*(vco_in + vco_in_last) vco_in_last = vco_in vco_out_last = vco_out vco_out = Kv*vco_out # apply Kv # Measured loop signals ev[kk] = filt_out theta[kk] = np.mod(vco_out,2*np.pi); # The vco phase mod 2pi return theta,phi_error
[docs]def sccs_bit_sync(y,Ns): """ rx_symb_d,clk,track = sccs_bit_sync(y,Ns) ////////////////////////////////////////////////////// Symbol synchronization algorithm using SCCS ////////////////////////////////////////////////////// y = baseband NRZ data waveform Ns = nominal number of samples per symbol Reworked from ECE 5675 Project Translated from m-code version Mark Wickert April 2014 """ # decimated symbol sequence for SEP rx_symb_d = np.zeros(int(np.fix(len(y)/Ns))) track = np.zeros(int(np.fix(len(y)/Ns))) bit_count = -1 y_abs = np.zeros(len(y)) clk = np.zeros(len(y)) k = Ns+1 #initial 1-of-Ns symbol synch clock phase # Sample-by-sample processing required for i in range(len(y)): #y_abs(i) = abs(round(real(y(i)))) if i >= Ns: # do not process first Ns samples # Collect timing decision unit (TDU) samples y_abs[i] = np.abs(np.sum(y[i-Ns+1:i+1])) # Update sampling instant and take a sample # For causality reason the early sample is 'i', # the on-time or prompt sample is 'i-1', and # the late sample is 'i-2'. if (k == 0): # Load the samples into the 3x1 TDU register w_hat. # w_hat[1] = late, w_hat[2] = on-time; w_hat[3] = early. w_hat = y_abs[i-2:i+1] bit_count += 1 if w_hat[1] != 0: if w_hat[0] < w_hat[2]: k = Ns-1 clk[i-2] = 1 rx_symb_d[bit_count] = y[i-2-int(np.round(Ns/2))-1] elif w_hat[0] > w_hat[2]: k = Ns+1 clk[i] = 1 rx_symb_d[bit_count] = y[i-int(np.round(Ns/2))-1] else: k = Ns clk[i-1] = 1 rx_symb_d[bit_count] = y[i-1-int(np.round(Ns/2))-1] else: k = Ns clk[i-1] = 1 rx_symb_d[bit_count] = y[i-1-int(np.round(Ns/2))] track[bit_count] = np.mod(i,Ns) k -= 1 # Trim the final output to bit_count rx_symb_d = rx_symb_d[:bit_count] return rx_symb_d, clk, track
[docs]def fsk_BEP(rx_data,m,flip): """ fsk_BEP(rx_data,m,flip) Estimate the BEP of the data bits recovered by the RTL-SDR Based FSK Receiver. The reference m-sequence generated in Python was found to produce sequences running in the opposite direction relative to the m-sequences generated by the mbed. To allow error detection the reference m-sequence is flipped. Mark Wickert April 2014 """ Nbits = len(rx_data) c = dc.m_seq(m) if flip == 1: # Flip the sequence to compenstate for mbed code difference # First make it a 1xN array c.shape = (1,len(c)) c = np.fliplr(c).flatten() L = int(np.ceil(Nbits/float(len(c)))) tx_data = np.dot(c.reshape(len(c),1),np.ones((1,L))) tx_data = tx_data.T.reshape((1,len(c)*L)).flatten() tx_data = tx_data[:Nbits] # Convert to +1/-1 bits tx_data = 2*tx_data - 1 Bit_count,Bit_errors = dc.BPSK_BEP(rx_data,tx_data) print('len rx_data = %d, len tx_data = %d' % (len(rx_data),len(tx_data))) Pe = Bit_errors/float(Bit_count) print('/////////////////////////////////////') print('Bit Errors: %d' % Bit_errors) print('Bits Total: %d' % Bit_count) print(' BEP: %2.2e' % Pe) print('/////////////////////////////////////')
def to_wav_stereo(filename,rate,x_l,x_r=None): if x_r == None: ss.to_wav(filename,rate,x_l) else: xlr = np.hstack((np.array([x_l]).T,np.array([x_r]).T)) ss.to_wav(filename,rate,xlr)
[docs]def complex2wav(filename,rate,x): """ Save a complex signal vector to a wav file for compact binary storage of 16-bit signal samples. The wav left and right channels are used to save real (I) and imaginary (Q) values. The rate is just a convent way of documenting the original signal sample rate. complex2wav(filename,rate,x) Mark Wickert April 2014 """ x_wav = np.hstack((np.array([x.real]).T,np.array([x.imag]).T)) ss.to_wav(filename, rate, x_wav) print('Saved as binary wav file with (I,Q)<=>(L,R)')
[docs]def wav2complex(filename): """ Return a complex signal vector from a wav file that was used to store the real (I) and imaginary (Q) values of a complex signal ndarray. The rate is included as means of recalling the original signal sample rate. fs,x = wav2complex(filename) Mark Wickert April 2014 """ fs, x_LR_cols = ss.from_wav(filename) x = x_LR_cols[:,0] + 1j*x_LR_cols[:,1] return fs,x
if __name__ == '__main__': # Test bench area pass