Source code for sk_dsp_comm.sigsys

"""
Signals and Systems Function Module

Copyright (c) March 2017, Mark Wickert
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this
   list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
   this list of conditions and the following disclaimer in the documentation
   and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

The views and conclusions contained in the software and documentation are those
of the authors and should not be interpreted as representing official policies,
either expressed or implied, of the FreeBSD Project.


Notes
-----
The primary purpose of this function library is to support the book Signals and Systems for Dummies. Beyond that it should be useful to anyone who wants to use Pylab for general signals and systems modeling and simulation. There is a good collection of digital communication simulation primitives included in the library. More enhancements are planned over time.

The formatted docstrings for the library follow. Click index in the upper right to get an
alphabetical listing of the library functions. In all of the example code given it is assumed that ssd has been imported into your workspace. See  the examples below for import options.

Examples
--------
>>> import sk_dsp_comm.sigsys as ssd
>>> # Commands then need to be prefixed with ssd., i.e.,
>>> ssd.tri(t,tau)
>>> # A full import of the module, to avoid the the need to prefix with ssd, is:
>>> from sk_dsp_comm.sigsys import *

Function Catalog
----------------
"""

from matplotlib import pylab
import numpy as np
from numpy import fft
import matplotlib.pyplot as plt
from scipy import signal
from scipy.io import wavfile
from logging import getLogger
log = getLogger(__name__)
import warnings


[docs] def cic(m, k): """ A functional form implementation of a cascade of integrator comb (CIC) filters. Parameters ---------- m : Effective number of taps per section (typically the decimation factor). k : The number of CIC sections cascaded (larger K gives the filter a wider image rejection bandwidth). Returns ------- b : FIR filter coefficients for a simple direct form implementation using the filter() function. Notes ----- Commonly used in multirate signal processing digital down-converters and digital up-converters. A true CIC filter requires no multiplies, only add and subtract operations. The functional form created here is a simple FIR requiring real coefficient multiplies via filter(). Mark Wickert July 2013 """ if k == 1: b = np.ones(m) else: h = np.ones(m) b = h for i in range(1, k): b = signal.convolve(b, h) # cascade by convolving impulse responses # Make filter have unity gain at DC return b / np.sum(b)
[docs] def ten_band_eq_filt(x,GdB,Q=3.5): """ Filter the input signal x with a ten-band equalizer having octave gain values in ndarray GdB. The signal x is filtered using octave-spaced peaking filters starting at 31.25 Hz and stopping at 16 kHz. The Q of each filter is 3.5, but can be changed. The sampling rate is assumed to be 44.1 kHz. Parameters ---------- x : ndarray of the input signal samples GdB : ndarray containing ten octave band gain values [G0dB,...,G9dB] Q : Quality factor vector for each of the NB peaking filters Returns ------- y : ndarray of output signal samples Examples -------- >>> # Test with white noise >>> w = randn(100000) >>> y = ten_band_eq_filt(x,GdB) >>> psd(y,2**10,44.1) """ fs = 44100.0 # Hz NB = len(GdB) if not NB == 10: raise ValueError("GdB length not equal to ten") Fc = 31.25*2**np.arange(NB) B = np.zeros((NB,3)) A = np.zeros((NB,3)) # Create matrix of cascade coefficients for k in range(NB): [b,a] = peaking(GdB[k],Fc[k],Q) B[k,:] = b A[k,:] = a # Pass signal x through the cascade of ten filters y = np.zeros(len(x)) for k in range(NB): if k == 0: y = signal.lfilter(B[k,:],A[k,:],x) else: y = signal.lfilter(B[k,:],A[k,:],y) return y
[docs] def ten_band_eq_resp(GdB,Q=3.5): """ Create a frequency response magnitude plot in dB of a ten band equalizer using a semilogplot (semilogx()) type plot Parameters ---------- GdB : Gain vector for 10 peaking filters [G0,...,G9] Q : Quality factor for each peaking filter (default 3.5) Returns ------- Nothing : two plots are created Examples -------- >>> import matplotlib.pyplot as plt >>> from sk_dsp_comm import sigsys as ss >>> ss.ten_band_eq_resp([0,10.0,0,0,-1,0,5,0,-4,0]) >>> plt.show() """ fs = 44100.0 # Hz NB = len(GdB) if not NB == 10: raise ValueError("GdB length not equal to ten") Fc = 31.25*2**np.arange(NB) B = np.zeros((NB,3)); A = np.zeros((NB,3)); # Create matrix of cascade coefficients for k in range(NB): b,a = peaking(GdB[k],Fc[k],Q,fs) B[k,:] = b A[k,:] = a # Create the cascade frequency response F = np.logspace(1,np.log10(20e3),1000) H = np.ones(len(F))*np.complex(1.0,0.0) for k in range(NB): w,Htemp = signal.freqz(B[k,:],A[k,:],2*np.pi*F/fs) H *= Htemp plt.figure(figsize=(6,4)) plt.subplot(211) plt.semilogx(F,20*np.log10(abs(H))) plt.axis([10, fs/2, -12, 12]) plt.grid() plt.title('Ten-Band Equalizer Frequency Response') plt.xlabel('Frequency (Hz)') plt.ylabel('Gain (dB)') plt.subplot(212) plt.stem(np.arange(NB),GdB,'b','bs') #plt.bar(np.arange(NB)-.1,GdB,0.2) plt.axis([0, NB-1, -12, 12]) plt.xlabel('Equalizer Band Number') plt.ylabel('Gain Set (dB)') plt.grid()
[docs] def peaking(GdB, fc, Q=3.5, fs=44100.): """ A second-order peaking filter having GdB gain at fc and approximately and 0 dB otherwise. The filter coefficients returns correspond to a biquadratic system function containing five parameters. Parameters ---------- GdB : Lowpass gain in dB fc : Center frequency in Hz Q : Filter Q which is inversely proportional to bandwidth fs : Sampling frquency in Hz Returns ------- b : ndarray containing the numerator filter coefficients a : ndarray containing the denominator filter coefficients Examples -------- >>> import matplotlib.pyplot as plt >>> import numpy as np >>> from sk_dsp_comm.sigsys import peaking >>> from scipy import signal >>> b,a = peaking(2.0,500) >>> f = np.logspace(1,5,400) >>> w,H = signal.freqz(b,a,2*np.pi*f/44100) >>> plt.semilogx(f,20*np.log10(abs(H))) >>> plt.ylabel("Power Spectral Density (dB)") >>> plt.xlabel("Frequency (Hz)") >>> plt.show() >>> b,a = peaking(-5.0,500,4) >>> w,H = signal.freqz(b,a,2*np.pi*f/44100) >>> plt.semilogx(f,20*np.log10(abs(H))) >>> plt.ylabel("Power Spectral Density (dB)") >>> plt.xlabel("Frequency (Hz)") """ mu = 10**(GdB/20.) kq = 4/(1 + mu)*np.tan(2*np.pi*fc/fs/(2*Q)) Cpk = (1 + kq *mu)/(1 + kq) b1 = -2*np.cos(2*np.pi*fc/fs)/(1 + kq*mu) b2 = (1 - kq*mu)/(1 + kq*mu) a1 = -2*np.cos(2*np.pi*fc/fs)/(1 + kq) a2 = (1 - kq)/(1 + kq) b = Cpk*np.array([1, b1, b2]) a = np.array([1, a1, a2]) return b,a
[docs] def ex6_2(n): """ Generate a triangle pulse as described in Example 6-2 of Chapter 6. You need to supply an index array n that covers at least [-2, 5]. The function returns the hard-coded signal of the example. Parameters ---------- n : time index ndarray covering at least -2 to +5. Returns ------- x : ndarray of signal samples in x Examples -------- >>> import numpy as np >>> import matplotlib.pyplot as plt >>> from sk_dsp_comm import sigsys as ss >>> n = np.arange(-5,8) >>> x = ss.ex6_2(n) >>> plt.stem(n,x) # creates a stem plot of x vs n """ x = np.zeros(len(n)) for k, nn in enumerate(n): if nn >= -2 and nn <= 5: x[k] = 8 - nn return x
[docs] def position_cd(Ka, out_type ='fb_exact'): """ CD sled position control case study of Chapter 18. The function returns the closed-loop and open-loop system function for a CD/DVD sled position control system. The loop amplifier gain is the only variable that may be changed. The returned system function can however be changed. Parameters ---------- Ka : loop amplifier gain, start with 50. out_type : 'open_loop' for open loop system function out_type : 'fb_approx' for closed-loop approximation out_type : 'fb_exact' for closed-loop exact Returns ------- b : numerator coefficient ndarray a : denominator coefficient ndarray Notes ----- With the exception of the loop amplifier gain, all other parameters are hard-coded from Case Study example. Examples -------- >>> b,a = position_cd(Ka,'fb_approx') >>> b,a = position_cd(Ka,'fb_exact') """ rs = 10/(2*np.pi) # Load b and a ndarrays with the coefficients if out_type.lower() == 'open_loop': b = np.array([Ka*4000*rs]) a = np.array([1,1275,31250,0]) elif out_type.lower() == 'fb_approx': b = np.array([3.2*Ka*rs]) a = np.array([1, 25, 3.2*Ka*rs]) elif out_type.lower() == 'fb_exact': b = np.array([4000*Ka*rs]) a = np.array([1, 1250+25, 25*1250, 4000*Ka*rs]) else: raise ValueError('out_type must be: open_loop, fb_approx, or fc_exact') return b, a
[docs] def cruise_control(wn,zeta,T,vcruise,vmax,tf_mode='H'): """ Cruise control with PI controller and hill disturbance. This function returns various system function configurations for a the cruise control Case Study example found in the supplementary article. The plant model is obtained by the linearizing the equations of motion and the controller contains a proportional and integral gain term set via the closed-loop parameters natural frequency wn (rad/s) and damping zeta. Parameters ---------- wn : closed-loop natural frequency in rad/s, nominally 0.1 zeta : closed-loop damping factor, nominally 1.0 T : vehicle time constant, nominally 10 s vcruise : cruise velocity set point, nominally 75 mph vmax : maximum vehicle velocity, nominally 120 mph tf_mode : 'H', 'HE', 'HVW', or 'HED' controls the system function returned by the function 'H' : closed-loop system function V(s)/R(s) 'HE' : closed-loop system function E(s)/R(s) 'HVW' : closed-loop system function V(s)/W(s) 'HED' : closed-loop system function E(s)/D(s), where D is the hill disturbance input Returns ------- b : numerator coefficient ndarray a : denominator coefficient ndarray Examples -------- >>> # return the closed-loop system function output/input velocity >>> b,a = cruise_control(wn,zeta,T,vcruise,vmax,tf_mode='H') >>> # return the closed-loop system function loop error/hill disturbance >>> b,a = cruise_control(wn,zeta,T,vcruise,vmax,tf_mode='HED') """ tau = T/2.*vmax/vcruise g = 9.8 g *= 3*60**2/5280. # m/s to mph conversion Kp = T*(2*zeta*wn-1/tau)/vmax Ki = T*wn**2./vmax K = Kp*vmax/T wn = np.sqrt(K/(Kp/Ki)) zeta = (K + 1/tau)/(2*wn) log.info('wn = %s' % (wn)) log.info('zeta = %s' % (zeta)) a = np.array([1, 2*zeta*wn, wn**2]) if tf_mode == 'H': b = np.array([K, wn**2]) elif tf_mode == 'HE': b = np.array([1, 2*zeta*wn-K, 0.]) elif tf_mode == 'HVW': b = np.array([ 1, wn**2/K+1/tau, wn**2/(K*tau)]) b *= Kp elif tf_mode == 'HED': b = np.array([g, 0]) else: raise ValueError('tf_mode must be: H, HE, HVU, or HED') return b, a
[docs] def splane(b,a,auto_scale=True,size=[-1,1,-1,1]): """ Create an s-plane pole-zero plot. As input the function uses the numerator and denominator s-domain system function coefficient ndarrays b and a respectively. Assumed to be stored in descending powers of s. Parameters ---------- b : numerator coefficient ndarray. a : denominator coefficient ndarray. auto_scale : True size : [xmin,xmax,ymin,ymax] plot scaling when scale = False Returns ------- (M,N) : tuple of zero and pole counts + plot window Notes ----- This function tries to identify repeated poles and zeros and will place the multiplicity number above and to the right of the pole or zero. The difficulty is setting the tolerance for this detection. Currently it is set at 1e-3 via the function signal.unique_roots. Examples -------- >>> # Here the plot is generated using auto_scale >>> splane(b,a) >>> # Here the plot is generated using manual scaling >>> splane(b,a,False,[-10,1,-10,10]) """ if (isinstance(a,int) or isinstance(a,float)): a = [a] if (isinstance(b,int) or isinstance(b,float)): b = [b] M = len(b) - 1 N = len(a) - 1 plt.figure(figsize=(5,5)) #plt.axis('equal') N_roots = np.array([0.0]) if M > 0: N_roots = np.roots(b) D_roots = np.array([0.0]) if N > 0: D_roots = np.roots(a) if auto_scale: size[0] = min(np.min(np.real(N_roots)),np.min(np.real(D_roots)))-0.5 size[1] = max(np.max(np.real(N_roots)),np.max(np.real(D_roots)))+0.5 size[1] = max(size[1],0.5) size[2] = min(np.min(np.imag(N_roots)),np.min(np.imag(D_roots)))-0.5 size[3] = max(np.max(np.imag(N_roots)),np.max(np.imag(D_roots)))+0.5 plt.plot([size[0],size[1]],[0,0],'k--') plt.plot([0,0],[size[2],size[3]],'r--') # Plot labels if multiplicity greater than 1 x_scale = size[1]-size[0] y_scale = size[3]-size[2] x_off = 0.03 y_off = 0.01 if M > 0: #N_roots = np.roots(b) N_uniq, N_mult=signal.unique_roots(N_roots,tol=1e-3, rtype='avg') plt.plot(np.real(N_uniq),np.imag(N_uniq),'ko',mfc='None',ms=8) idx_N_mult = np.nonzero(np.ravel(N_mult>1))[0] for k in range(len(idx_N_mult)): x_loc = np.real(N_uniq[idx_N_mult[k]]) + x_off*x_scale y_loc =np.imag(N_uniq[idx_N_mult[k]]) + y_off*y_scale plt.text(x_loc,y_loc,str(N_mult[idx_N_mult[k]]),ha='center',va='bottom',fontsize=10) if N > 0: #D_roots = np.roots(a) D_uniq, D_mult=signal.unique_roots(D_roots,tol=1e-3, rtype='avg') plt.plot(np.real(D_uniq),np.imag(D_uniq),'kx',ms=8) idx_D_mult = np.nonzero(np.ravel(D_mult>1))[0] for k in range(len(idx_D_mult)): x_loc = np.real(D_uniq[idx_D_mult[k]]) + x_off*x_scale y_loc =np.imag(D_uniq[idx_D_mult[k]]) + y_off*y_scale plt.text(x_loc,y_loc,str(D_mult[idx_D_mult[k]]),ha='center',va='bottom',fontsize=10) plt.xlabel('Real Part') plt.ylabel('Imaginary Part') plt.title('Pole-Zero Plot') #plt.grid() plt.axis(np.array(size)) return M,N
[docs] def os_filter(x, h, N, mode=0): """ Overlap and save transform domain FIR filtering. This function implements the classical overlap and save method of transform domain filtering using a length P FIR filter. Parameters ---------- x : input signal to be filtered as an ndarray h : FIR filter coefficients as an ndarray of length P N : FFT size > P, typically a power of two mode : 0 or 1, when 1 returns a diagnostic matrix Returns ------- y : the filtered output as an ndarray y_mat : an ndarray whose rows are the individual overlap outputs. Notes ----- y_mat is used for diagnostics and to gain understanding of the algorithm. Examples -------- >>> from numpy import arange, cos, pi, ones >>> n = arange(0,100) >>> x = cos(2*pi*0.05*n) >>> b = ones(10) >>> y = os_filter(x,h,N) >>> # set mode = 1 >>> y, y_mat = os_filter(x,h,N,1) """ P = len(h) # zero pad start of x so first frame can recover first true samples of x x = np.hstack((np.zeros(P-1),x)) L = N - P + 1 Nx = len(x) Nframe = int(np.ceil(Nx/float(L))) # zero pad end of x to full number of frames needed x = np.hstack((x,np.zeros(Nframe*L-Nx))) y = np.zeros(int(Nframe*N)) # create an instrumentation matrix to observe the overlap and save behavior y_mat = np.zeros((Nframe,int(Nframe*N))) H = fft.fft(h,N) # begin the filtering operation for k in range(Nframe): xk = x[k*L:k*L+N] Xk = fft.fft(xk,N) Yk = H*Xk yk = np.real(fft.ifft(Yk)) # imag part should be zero y[k*L+P-1:k*L+N] = yk[P-1:] y_mat[k,k*L:k*L+N] = yk if mode == 1: return y[P-1:Nx], y_mat[:,P-1:Nx] else: return y[P-1:Nx]
[docs] def oa_filter(x, h, N, mode=0): """ Overlap and add transform domain FIR filtering. This function implements the classical overlap and add method of transform domain filtering using a length P FIR filter. Parameters ---------- x : input signal to be filtered as an ndarray h : FIR filter coefficients as an ndarray of length P N : FFT size > P, typically a power of two mode : 0 or 1, when 1 returns a diagnostic matrix Returns ------- y : the filtered output as an ndarray y_mat : an ndarray whose rows are the individual overlap outputs. Notes ----- y_mat is used for diagnostics and to gain understanding of the algorithm. Examples -------- >>> import numpy as np >>> from sk_dsp_comm.sigsys import oa_filter >>> n = np.arange(0,100) >>> x = np.cos(2*np.pi*0.05*n) >>> b = np.ones(10) >>> y = oa_filter(x,h,N) >>> # set mode = 1 >>> y, y_mat = oa_filter(x,h,N,1) """ P = len(h) L = int(N) - P + 1 # need N >= L + P -1 Nx = len(x) Nframe = int(np.ceil(Nx/float(L))) # zero pad to full number of frames needed x = np.hstack((x,np.zeros(Nframe*L-Nx))) y = np.zeros(Nframe*N) # create an instrumentation matrix to observe the overlap and add behavior y_mat = np.zeros((Nframe,Nframe*N)) H = fft.fft(h,N) # begin the filtering operation for k in range(Nframe): xk = x[k*L:(k+1)*L] Xk = fft.fft(xk,N) Yk = H*Xk yk = np.real(fft.ifft(Yk)) y[k*L:k*L+N] += yk y_mat[k,k*L:k*L+N] = yk if mode == 1: return y[0:Nx], y_mat[:,0:Nx] else: return y[0:Nx]
[docs] def lp_samp(fb,fs,fmax,N,shape='tri',fsize=(6,4)): """ Lowpass sampling theorem plotting function. Display the spectrum of a sampled signal after setting the bandwidth, sampling frequency, maximum display frequency, and spectral shape. Parameters ---------- fb : spectrum lowpass bandwidth in Hz fs : sampling frequency in Hz fmax : plot over [-fmax,fmax] shape : 'tri' or 'line' N : number of translates, N positive and N negative fsize : the size of the figure window, default (6,4) Returns ------- Nothing : A plot window opens containing the spectrum plot Examples -------- >>> import matplotlib.pyplot as plt >>> from sk_dsp_comm.sigsys import lp_samp No aliasing as bandwidth 10 Hz < 25/2; fs > fb. >>> lp_samp(10,25,50,10) >>> plt.show() Now aliasing as bandwidth 15 Hz > 25/2; fs < fb. >>> lp_samp(15,25,50,10) """ plt.figure(figsize=fsize) # define the plot interval f = np.arange(-fmax,fmax+fmax/200.,fmax/200.) A = 1.0 line_ampl = A/2.*np.array([0, 1]) # plot the lowpass spectrum in black shapes = ['tri', 'line'] if shape.lower() not in shapes: raise ValueError('shape must be tri or line') if shape.lower() == 'tri': plt.plot(f,lp_tri(f,fb)) # overlay positive and negative frequency translates for n in range(N): plt.plot(f, lp_tri(f - (n + 1) * fs, fb), '--r') plt.plot(f, lp_tri(f + (n + 1) * fs, fb), '--g') elif shape.lower() == 'line': plt.plot([fb, fb],line_ampl,'b', linewidth=2) plt.plot([-fb, -fb],line_ampl,'b', linewidth=2) # overlay positive and negative frequency translates for n in range(N): plt.plot([fb+(n+1)*fs, fb+(n+1)*fs],line_ampl,'--r', linewidth=2) plt.plot([-fb+(n+1)*fs, -fb+(n+1)*fs],line_ampl,'--r', linewidth=2) plt.plot([fb-(n+1)*fs, fb-(n+1)*fs],line_ampl,'--g', linewidth=2) plt.plot([-fb-(n+1)*fs, -fb-(n+1)*fs],line_ampl,'--g', linewidth=2) plt.ylabel('Spectrum Magnitude') plt.xlabel('Frequency in Hz') plt.axis([-fmax,fmax,0,1]) plt.grid()
[docs] def lp_tri(f, fb): """ Triangle spectral shape function used by :func:`lp_samp`. Parameters ---------- f : ndarray containing frequency samples fb : the bandwidth as a float constant Returns ------- x : ndarray of spectrum samples for a single triangle shape Notes ----- This is a support function for the lowpass spectrum plotting function :func:`lp_samp`. Examples -------- >>> x = lp_tri(f, fb) """ x = np.zeros(len(f)) for k in range(len(f)): if abs(f[k]) <= fb: x[k] = 1 - abs(f[k])/float(fb) return x
[docs] def sinusoid_awgn(x, SNRdB): """ Add white Gaussian noise to a single real sinusoid. Input a single sinusoid to this function and it returns a noisy sinusoid at a specific SNR value in dB. Sinusoid power is calculated using np.var. Parameters ---------- x : Input signal as ndarray consisting of a single sinusoid SNRdB : SNR in dB for output sinusoid Returns ------- y : Noisy sinusoid return vector Examples -------- >>> # set the SNR to 10 dB >>> n = arange(0,10000) >>> x = cos(2*pi*0.04*n) >>> y = sinusoid_awgn(x,10.0) """ # Estimate signal power x_pwr = np.var(x) # Create noise vector noise = np.sqrt(x_pwr/10**(SNRdB/10.))*np.random.randn(len(x)); return x + noise
[docs] def simple_quant(x, b_tot, x_max, limit): """ A simple rounding quantizer for bipolar signals having Btot = B + 1 bits. This function models a quantizer that employs Btot bits that has one of three selectable limiting types: saturation, overflow, and none. The quantizer is bipolar and implements rounding. Parameters ---------- x : input signal ndarray to be quantized b_tot : total number of bits in the quantizer, e.g. 16 x_max : quantizer full-scale dynamic range is [-Xmax, Xmax] Limit = Limiting of the form 'sat', 'over', 'none' Returns ------- xq : quantized output ndarray Notes ----- The quantization can be formed as e = xq - x Examples -------- >>> import matplotlib.pyplot as plt >>> from matplotlib.mlab import psd >>> import numpy as np >>> from sk_dsp_comm import sigsys as ss >>> n = np.arange(0,10000) >>> x = np.cos(2*np.pi*0.211*n) >>> y = ss.sinusoid_awgn(x,90) >>> Px, f = psd(y,2**10,Fs=1) >>> plt.plot(f, 10*np.log10(Px)) >>> plt.ylim([-80, 25]) >>> plt.ylabel("Power Spectral Density (dB)") >>> plt.xlabel(r'Normalized Frequency $\omega/2\pi$') >>> plt.show() >>> yq = ss.simple_quant(y,12,1,'sat') >>> Px, f = psd(yq,2**10,Fs=1) >>> plt.plot(f, 10*np.log10(Px)) >>> plt.ylim([-80, 25]) >>> plt.ylabel("Power Spectral Density (dB)") >>> plt.xlabel(r'Normalized Frequency $\omega/2\pi$') >>> plt.show() """ B = b_tot - 1 x = x / x_max if limit.lower() == 'over': xq = (np.mod(np.round(x*2**B) + 2 ** B, 2 ** b_tot) - 2 ** B) / 2 ** B elif limit.lower() == 'sat': xq = np.round(x*2**B)+2**B s1 = np.nonzero(np.ravel(xq >= 2 ** b_tot - 1))[0] s2 = np.nonzero(np.ravel(xq < 0))[0] xq[s1] = (2 ** b_tot - 1) * np.ones(len(s1)) xq[s2] = np.zeros(len(s2)) xq = (xq - 2**B)/2**B elif limit.lower() == 'none': xq = np.round(x*2**B)/2**B else: raise ValueError('limit must be the string over, sat, or none') return xq * x_max
[docs] def prin_alias(f_in,fs): """ Calculate the principle alias frequencies. Given an array of input frequencies the function returns an array of principle alias frequencies. Parameters ---------- f_in : ndarray of input frequencies fs : sampling frequency Returns ------- f_out : ndarray of principle alias frequencies Examples -------- >>> # Linear frequency sweep from 0 to 50 Hz >>> f_in = arange(0,50,0.1) >>> # Calculate principle alias with fs = 10 Hz >>> f_out = prin_alias(f_in,10) """ return abs(np.rint(f_in/fs)*fs - f_in) """ Principle alias via recursion f_out = np.copy(f_in) for k in range(len(f_out)): while f_out[k] > fs/2.: f_out[k] = abs(f_out[k] - fs) return f_out """
[docs] def cascade_filters(b1,a1,b2,a2): """ Cascade two IIR digital filters into a single (b,a) coefficient set. To cascade two digital filters (system functions) given their numerator and denominator coefficients you simply convolve the coefficient arrays. Parameters ---------- b1 : ndarray of numerator coefficients for filter 1 a1 : ndarray of denominator coefficients for filter 1 b2 : ndarray of numerator coefficients for filter 2 a2 : ndarray of denominator coefficients for filter 2 Returns ------- b : ndarray of numerator coefficients for the cascade a : ndarray of denominator coefficients for the cascade Examples -------- >>> from scipy import signal >>> b1,a1 = signal.butter(3, 0.1) >>> b2,a2 = signal.butter(3, 0.15) >>> b,a = cascade_filters(b1,a1,b2,a2) """ return signal.convolve(b1,b2), signal.convolve(a1,a2)
[docs] def soi_snoi_gen(s,SIR_dB,N,fi,fs = 8000): """ Add an interfering sinusoidal tone to the input signal at a given SIR_dB. The input is the signal of interest (SOI) and number of sinsuoid signals not of interest (SNOI) are addedto the SOI at a prescribed signal-to- intereference SIR level in dB. Parameters ---------- s : ndarray of signal of SOI SIR_dB : interference level in dB N : Trim input signal s to length N + 1 samples fi : ndarray of intereference frequencies in Hz fs : sampling rate in Hz, default is 8000 Hz Returns ------- r : ndarray of combined signal plus intereference of length N+1 samples Examples -------- >>> # load a speech ndarray and trim to 5*8000 + 1 samples >>> fs,s = from_wav('OSR_us_000_0030_8k.wav') >>> r = soi_snoi_gen(s,10,5*8000,[1000, 1500]) """ n = np.arange(0,N+1) K = len(fi) si = np.zeros(N+1) for k in range(K): si += np.cos(2*np.pi*fi[k]/fs*n); s = s[:N+1] Ps = np.var(s) Psi = np.var(si) r = s + np.sqrt(Ps/Psi*10**(-SIR_dB/10))*si return r
[docs] def lms_ic(r,M,mu,delta=1): """ Least mean square (LMS) interference canceller adaptive filter. A complete LMS adaptive filter simulation function for the case of interference cancellation. Used in the digital filtering case study. Parameters ---------- M : FIR Filter length (order M-1) delta : Delay used to generate the reference signal mu : LMS step-size delta : decorrelation delay between input and FIR filter input Returns ------- n : ndarray Index vector r : ndarray noisy (with interference) input signal r_hat : ndarray filtered output (NB_hat[n]) e : ndarray error sequence (WB_hat[n]) ao : ndarray final value of weight vector F : ndarray frequency response axis vector Ao : ndarray frequency response of filter Examples ---------- >>> # import a speech signal >>> fs,s = from_wav('OSR_us_000_0030_8k.wav') >>> # add interference at 1kHz and 1.5 kHz and >>> # truncate to 5 seconds >>> r = soi_snoi_gen(s,10,5*8000,[1000, 1500]) >>> # simulate with a 64 tap FIR and mu = 0.005 >>> n,r,r_hat,e,ao,F,Ao = lms_ic(r,64,0.005) """ N = len(r)-1; # Form the reference signal y via delay delta y = signal.lfilter(np.hstack((np.zeros(delta), np.array([1]))),1,r) # Initialize output vector x_hat to zero r_hat = np.zeros(N+1) # Initialize error vector e to zero e = np.zeros(N+1) # Initialize weight vector to zero ao = np.zeros(M+1) # Initialize filter memory to zero z = np.zeros(M) # Initialize a vector for holding ym of length M+1 ym = np.zeros(M+1) for k in range(N+1): # Filter one sample at a time r_hat[k],z = signal.lfilter(ao,np.array([1]),np.array([y[k]]),zi=z) # Form the error sequence e[k] = r[k] - r_hat[k] # Update the weight vector ao = ao + 2*mu*e[k]*ym # Update vector used for correlation with e(k) ym = np.hstack((np.array([y[k]]), ym[:-1])) # Create filter frequency response F, Ao = signal.freqz(ao,1,1024) F/= (2*np.pi) Ao = 20*np.log10(abs(Ao)) return np.arange(0,N+1), r, r_hat, e, ao, F, Ao
[docs] def fir_iir_notch(fi,fs,r=0.95): """ Design a second-order FIR or IIR notch filter. A second-order FIR notch filter is created by placing conjugate zeros on the unit circle at angle corresponidng to the notch center frequency. The IIR notch variation places a pair of conjugate poles at the same angle, but with radius r < 1 (typically 0.9 to 0.95). Parameters ---------- fi : notch frequency is Hz relative to fs fs : the sampling frequency in Hz, e.g. 8000 r : pole radius for IIR version, default = 0.95 Returns ------- b : numerator coefficient ndarray a : denominator coefficient ndarray Notes ----- If the pole radius is 0 then an FIR version is created, that is there are no poles except at z = 0. Examples -------- >>> import matplotlib.pyplot as plt >>> from sk_dsp_comm import sigsys as ss >>> b_FIR, a_FIR = ss.fir_iir_notch(1000,8000,0) >>> ss.zplane(b_FIR, a_FIR) >>> plt.show() >>> b_IIR, a_IIR = ss.fir_iir_notch(1000,8000) >>> ss.zplane(b_IIR, a_IIR) """ w0 = 2*np.pi*fi/float(fs) if r >= 1: raise ValueError('Poles on or outside unit circle.') elif r == 0: a = np.array([1.0]) else: a = np.array([1, -2*r*np.cos(w0), r**2]) b = np.array([1, -2*np.cos(w0), 1]) return b, a
[docs] def simple_sa(x, NS, NFFT, fs, NAVG=1, window='boxcar'): """ Spectral estimation using windowing and averaging. This function implements averaged periodogram spectral estimation estimation similar to the NumPy's psd() function, but more specialized for the windowing case study of Chapter 16. Parameters ---------- x : ndarray containing the input signal NS : The subrecord length less zero padding, e.g. NS < NFFT NFFT : FFT length, e.g., 1024 = 2**10 fs : sampling rate in Hz NAVG : the number of averages, e.g., 1 for deterministic signals window : hardcoded window 'boxcar' (default) or 'hanning' Returns ------- f : ndarray frequency axis in Hz on [0, fs/2] Sx : ndarray the power spectrum estimate Notes ----- The function also prints the maximum number of averages K possible for the input data record. # For a list of the available windows see: # https://docs.scipy.org/doc/scipy/reference/signal.windows.html#module-scipy.signal.windows Examples -------- >>> import matplotlib.pyplot as plt >>> import numpy as np >>> from sk_dsp_comm import sigsys as ss >>> n = np.arange(0,2048) >>> x = np.cos(2*np.pi*1000/10000*n) + 0.01*np.cos(2*np.pi*3000/10000*n) >>> f, Sx = ss.simple_sa(x,128,512,10000) >>> plt.plot(f, 10*np.log10(Sx)) >>> plt.ylim([-80, 0]) >>> plt.xlabel("Frequency (Hz)") >>> plt.ylabel("Power Spectral Density (dB)") >>> plt.show() With a hanning/hann window. >>> f, Sx = ss.simple_sa(x,256,1024,10000,window='hann') >>> plt.plot(f, 10*np.log10(Sx)) >>> plt.xlabel("Frequency (Hz)") >>> plt.ylabel("Power Spectral Density (dB)") >>> plt.ylim([-80, 0]) """ Nx = len(x) K = int(Nx/NS) log.info('K = ', K) if NAVG > K: warnings.warn('NAVG exceeds number of available subrecords') return 0,0 w = signal.get_window(window, NS, fftbins=False) xsw = np.zeros((K,NS)) + 1j*np.zeros((K,NS)) for k in range(NAVG): xsw[k,] = w*x[k*NS:(k+1)*NS] Sx = np.zeros(NFFT) for k in range(NAVG): X = fft.fft(xsw[k,],NFFT) Sx += abs(X)**2 Sx /= float(NAVG) Sx /= float(NFFT**2) NFFTby2 = int(NFFT/2) if x.dtype != 'complex128': n = np.arange(NFFTby2) f = fs*n/float(NFFT) Sx = Sx[0:NFFTby2] else: n = np.arange(NFFTby2) f = fs*np.hstack((np.arange(-NFFTby2,0),np.arange(NFFTby2)))/float(NFFT) Sx = np.hstack((Sx[NFFTby2:],Sx[0:NFFTby2])) return f, Sx
[docs] def line_spectra(fk,Xk,mode,sides=2,linetype='b',lwidth=2,floor_dB=-100,fsize=(6,4)): """ Plot the Fourier series line spectral given the coefficients. This function plots two-sided and one-sided line spectra of a periodic signal given the complex exponential Fourier series coefficients and the corresponding harmonic frequencies. Parameters ---------- fk : vector of real sinusoid frequencies Xk : magnitude and phase at each positive frequency in fk mode : 'mag' => magnitude plot, 'magdB' => magnitude in dB plot, mode cont : 'magdBn' => magnitude in dB normalized, 'phase' => a phase plot in radians sides : 2; 2-sided or 1-sided linetype : line type per Matplotlib definitions, e.g., 'b'; lwidth : 2; linewidth in points fsize : optional figure size in inches, default = (6,4) inches Returns ------- Nothing : A plot window opens containing the line spectrum plot Notes ----- Since real signals are assumed the frequencies of fk are 0 and/or positive numbers. The supplied Fourier coefficients correspond. Examples -------- >>> import matplotlib.pyplot as plt >>> import numpy as np >>> from sk_dsp_comm.sigsys import line_spectra >>> n = np.arange(0,25) >>> # a pulse train with 10 Hz fundamental and 20% duty cycle >>> fk = n*10 >>> Xk = np.sinc(n*10*.02)*np.exp(-1j*2*np.pi*n*10*.01) # 1j = sqrt(-1) >>> line_spectra(fk,Xk,'mag') >>> plt.show() >>> line_spectra(fk,Xk,'phase') """ plt.figure(figsize=fsize) # Eliminate zero valued coefficients idx = np.nonzero(Xk)[0] Xk = Xk[idx] fk = fk[idx] if mode == 'mag': for k in range(len(fk)): if fk[k] == 0 and sides == 2: plt.plot([fk[k], fk[k]],[0, np.abs(Xk[k])],linetype, linewidth=lwidth) elif fk[k] == 0 and sides == 1: plt.plot([fk[k], fk[k]],[0, np.abs(Xk[k])],linetype, linewidth=2*lwidth) elif fk[k] > 0 and sides == 2: plt.plot([fk[k], fk[k]],[0, np.abs(Xk[k])],linetype, linewidth=lwidth) plt.plot([-fk[k], -fk[k]],[0, np.abs(Xk[k])],linetype, linewidth=lwidth) elif fk[k] > 0 and sides == 1: plt.plot([fk[k], fk[k]],[0, 2.*np.abs(Xk[k])],linetype, linewidth=lwidth) else: warnings.warn('Invalid sides type') plt.grid() if sides == 2: plt.axis([-1.2*max(fk), 1.2*max(fk), 0, 1.05*max(abs(Xk))]) elif sides == 1: plt.axis([0, 1.2*max(fk), 0, 1.05*2*max(abs(Xk))]) else: warnings.warn('Invalid sides type') plt.ylabel('Magnitude') plt.xlabel('Frequency (Hz)') elif mode == 'magdB': Xk_dB = 20*np.log10(np.abs(Xk)) for k in range(len(fk)): if fk[k] == 0 and sides == 2: plt.plot([fk[k], fk[k]],[floor_dB, Xk_dB[k]],linetype, linewidth=lwidth) elif fk[k] == 0 and sides == 1: plt.plot([fk[k], fk[k]],[floor_dB, Xk_dB[k]],linetype, linewidth=2*lwidth) elif fk[k] > 0 and sides == 2: plt.plot([fk[k], fk[k]],[floor_dB, Xk_dB[k]],linetype, linewidth=lwidth) plt.plot([-fk[k], -fk[k]],[floor_dB, Xk_dB[k]],linetype, linewidth=lwidth) elif fk[k] > 0 and sides == 1: plt.plot([fk[k], fk[k]],[floor_dB, Xk_dB[k]+6.02],linetype, linewidth=lwidth) else: warnings.warn('Invalid sides type') plt.grid() max_dB = np.ceil(max(Xk_dB/10.))*10 min_dB = max(floor_dB,np.floor(min(Xk_dB/10.))*10) if sides == 2: plt.axis([-1.2*max(fk), 1.2*max(fk), min_dB, max_dB]) elif sides == 1: plt.axis([0, 1.2*max(fk), min_dB, max_dB]) else: warnings.warn('Invalid sides type') plt.ylabel('Magnitude (dB)') plt.xlabel('Frequency (Hz)') elif mode == 'magdBn': Xk_dB = 20*np.log10(np.abs(Xk)/max(np.abs(Xk))) for k in range(len(fk)): if fk[k] == 0 and sides == 2: plt.plot([fk[k], fk[k]],[floor_dB, Xk_dB[k]],linetype, linewidth=lwidth) elif fk[k] == 0 and sides == 1: plt.plot([fk[k], fk[k]],[floor_dB, Xk_dB[k]],linetype, linewidth=2*lwidth) elif fk[k] > 0 and sides == 2: plt.plot([fk[k], fk[k]],[floor_dB, Xk_dB[k]],linetype, linewidth=lwidth) plt.plot([-fk[k], -fk[k]],[floor_dB, Xk_dB[k]],linetype, linewidth=lwidth) elif fk[k] > 0 and sides == 1: plt.plot([fk[k], fk[k]],[floor_dB, Xk_dB[k]+6.02],linetype, linewidth=lwidth) else: warnings.warn('Invalid sides type') plt.grid() max_dB = np.ceil(max(Xk_dB/10.))*10 min_dB = max(floor_dB,np.floor(min(Xk_dB/10.))*10) if sides == 2: plt.axis([-1.2*max(fk), 1.2*max(fk), min_dB, max_dB]) elif sides == 1: plt.axis([0, 1.2*max(fk), min_dB, max_dB]) else: warnings.warn('Invalid sides type') plt.ylabel('Normalized Magnitude (dB)') plt.xlabel('Frequency (Hz)') elif mode == 'phase': for k in range(len(fk)): if fk[k] == 0 and sides == 2: plt.plot([fk[k], fk[k]],[0, np.angle(Xk[k])],linetype, linewidth=lwidth) elif fk[k] == 0 and sides == 1: plt.plot([fk[k], fk[k]],[0, np.angle(Xk[k])],linetype, linewidth=2*lwidth) elif fk[k] > 0 and sides == 2: plt.plot([fk[k], fk[k]],[0, np.angle(Xk[k])],linetype, linewidth=lwidth) plt.plot([-fk[k], -fk[k]],[0, -np.angle(Xk[k])],linetype, linewidth=lwidth) elif fk[k] > 0 and sides == 1: plt.plot([fk[k], fk[k]],[0, np.angle(Xk[k])],linetype, linewidth=lwidth) else: warnings.warn('Invalid sides type') plt.grid() if sides == 2: plt.plot([-1.2*max(fk), 1.2*max(fk)], [0, 0],'k') plt.axis([-1.2*max(fk), 1.2*max(fk), -1.1*max(np.abs(np.angle(Xk))), 1.1*max(np.abs(np.angle(Xk)))]) elif sides == 1: plt.plot([0, 1.2*max(fk)], [0, 0],'k') plt.axis([0, 1.2*max(fk), -1.1*max(np.abs(np.angle(Xk))), 1.1*max(np.abs(np.angle(Xk)))]) else: warnings.warn('Invalid sides type') plt.ylabel('Phase (rad)') plt.xlabel('Frequency (Hz)') else: warnings.warn('Invalid mode type')
[docs] def fs_coeff(xp,N,f0,one_side=True): """ Numerically approximate the Fourier series coefficients given periodic x(t). The input is assummed to represent one period of the waveform x(t) that has been uniformly sampled. The number of samples supplied to represent one period of the waveform sets the sampling rate. Parameters ---------- xp : ndarray of one period of the waveform x(t) N : maximum Fourier series coefficient, [0,...,N] f0 : fundamental frequency used to form fk. Returns ------- Xk : ndarray of the coefficients over indices [0,1,...,N] fk : ndarray of the harmonic frequencies [0, f0,2f0,...,Nf0] Notes ----- len(xp) >= 2*N+1 as len(xp) is the fft length. Examples -------- >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> import sk_dsp_comm.sigsys as ss >>> t = arange(0,1,1/1024.) >>> # a 20% duty cycle pulse starting at t = 0 >>> x_rect = ss.rect(t-.1,0.2) >>> Xk, fk = ss.fs_coeff(x_rect,25,10) >>> # plot the spectral lines >>> ss.line_spectra(fk,Xk,'mag') >>> plt.show() """ Nint = len(xp) if Nint < 2*N+1: raise ValueError('Number of samples in xp insufficient for requested N.') Xp = fft.fft(xp,Nint)/float(Nint) # To interface with the line_spectra function use one_side mode if one_side: Xk = Xp[0:N+1] fk = f0*np.arange(0,N+1) else: Xk = np.hstack((Xp[-N:],Xp[0:N+1])) fk = f0*np.arange(-N,N+1) return Xk, fk
[docs] def fs_approx(Xk,fk,t): """ Synthesize periodic signal x(t) using Fourier series coefficients at harmonic frequencies Assume the signal is real so coefficients Xk are supplied for nonnegative indicies. The negative index coefficients are assumed to be complex conjugates. Parameters ---------- Xk : ndarray of complex Fourier series coefficients fk : ndarray of harmonic frequencies in Hz t : ndarray time axis corresponding to output signal array x_approx Returns ------- x_approx : ndarray of periodic waveform approximation over time span t Examples -------- >>> t = arange(0,2,.002) >>> # a 20% duty cycle pulse train >>> n = arange(0,20,1) # 0 to 19th harmonic >>> fk = 1*n % period = 1s >>> t, x_approx = fs_approx(Xk,fk,t) >>> plot(t,x_approx) """ x_approx = np.zeros(len(t)) for k,Xkk in enumerate(Xk): if fk[k] == 0: x_approx += Xkk.real*np.ones(len(t)) else: x_approx += 2*np.abs(Xkk)*np.cos(2*np.pi*fk[k]*t+np.angle(Xkk)) return x_approx
[docs] def ft_approx(x,t,Nfft): ''' Approximate the Fourier transform of a finite duration signal using scipy.signal.freqz() Parameters ---------- x : input signal array t : time array used to create x(t) Nfft : the number of frdquency domain points used to approximate X(f) on the interval [fs/2,fs/2], where fs = 1/Dt. Dt being the time spacing in array t Returns ------- f : frequency axis array in Hz X : the Fourier transform approximation (complex) Notes ----- The output time axis starts at the sum of the starting values in x1 and x2 and ends at the sum of the two ending values in x1 and x2. The default extents of ('f','f') are used for signals that are active (have support) on or within n1 and n2 respectively. A right-sided signal such as :math:`a^n*u[n]` is semi-infinite, so it has extent 'r' and the convolution output will be truncated to display only the valid results. Examples -------- >>> import matplotlib.pyplot as plt >>> import numpy as np >>> import sk_dsp_comm.sigsys as ss >>> fs = 100 # sampling rate in Hz >>> tau = 1 >>> t = np.arange(-5,5,1/fs) >>> x0 = ss.rect(t-.5,tau) >>> plt.figure(figsize=(6,5)) >>> plt.plot(t,x0) >>> plt.grid() >>> plt.ylim([-0.1,1.1]) >>> plt.xlim([-2,2]) >>> plt.title(r'Exact Waveform') >>> plt.xlabel(r'Time (s)') >>> plt.ylabel(r'$x_0(t)$') >>> plt.show() >>> # FT Exact Plot >>> import matplotlib.pyplot as plt >>> import numpy as np >>> import sk_dsp_comm.sigsys as ss >>> fs = 100 # sampling rate in Hz >>> tau = 1 >>> t = np.arange(-5,5,1/fs) >>> x0 = ss.rect(t-.5,tau) >>> fe = np.arange(-10,10,.01) >>> X0e = tau*np.sinc(fe*tau) >>> plt.plot(fe,abs(X0e)) >>> #plot(f,angle(X0)) >>> plt.grid() >>> plt.xlim([-10,10]) >>> plt.title(r'Exact (Theory) Spectrum Magnitude') >>> plt.xlabel(r'Frequency (Hz)') >>> plt.ylabel(r'$|X_0e(f)|$') >>> plt.show() >>> # FT Approximation Plot >>> import matplotlib.pyplot as plt >>> import numpy as np >>> import sk_dsp_comm.sigsys as ss >>> fs = 100 # sampling rate in Hz >>> tau = 1 >>> t = np.arange(-5,5,1/fs) >>> x0 = ss.rect(t-.5,tau) >>> f,X0 = ss.ft_approx(x0,t,4096) >>> plt.plot(f,abs(X0)) >>> #plt.plot(f,angle(X0)) >>> plt.grid() >>> plt.xlim([-10,10]) >>> plt.title(r'Approximation Spectrum Magnitude') >>> plt.xlabel(r'Frequency (Hz)') >>> plt.ylabel(r'$|X_0(f)|$'); >>> plt.tight_layout() >>> plt.show() ''' fs = 1/(t[1] - t[0]) t0 = (t[-1]+t[0])/2 # time delay at center N0 = len(t)/2 # FFT center in samples f = np.arange(-1./2,1./2,1./Nfft) w, X = signal.freqz(x,1,2*np.pi*f) X /= fs # account for dt = 1/fs in integral X *= np.exp(-1j*2*np.pi*f*fs*t0)# time interval correction X *= np.exp(1j*2*np.pi*f*N0)# FFT time interval is [0,Nfft-1] F = f*fs return F, X
[docs] def conv_sum(x1,nx1,x2,nx2,extent=('f','f')): """ Discrete convolution of x1 and x2 with proper tracking of the output time axis. Convolve two discrete-time signals using the SciPy function :func:`scipy.signal.convolution`. The time (sequence axis) are managed from input to output. y[n] = x1[n]*x2[n]. Parameters ---------- x1 : ndarray of signal x1 corresponding to nx1 nx1 : ndarray time axis for x1 x2 : ndarray of signal x2 corresponding to nx2 nx2 : ndarray time axis for x2 extent : ('e1','e2') where 'e1', 'e2' may be 'f' finite, 'r' right-sided, or 'l' left-sided Returns ------- y : ndarray of output values y ny : ndarray of the corresponding sequence index n Notes ----- The output time axis starts at the sum of the starting values in x1 and x2 and ends at the sum of the two ending values in x1 and x2. The default extents of ('f','f') are used for signals that are active (have support) on or within n1 and n2 respectively. A right-sided signal such as a^n*u[n] is semi-infinite, so it has extent 'r' and the convolution output will be truncated to display only the valid results. Examples -------- >>> import matplotlib.pyplot as plt >>> import numpy as np >>> import sk_dsp_comm.sigsys as ss >>> nx = np.arange(-5,10) >>> x = ss.drect(nx,4) >>> y,ny = ss.conv_sum(x,nx,x,nx) >>> plt.stem(ny,y) >>> plt.show() Consider a pulse convolved with an exponential. ('r' type extent) >>> h = 0.5**nx*ss.dstep(nx) >>> y,ny = ss.conv_sum(x,nx,h,nx,('f','r')) # note extents set >>> plt.stem(ny,y) # expect a pulse charge and discharge sequence """ nnx1 = np.arange(0,len(nx1)) nnx2 = np.arange(0,len(nx2)) n1 = nnx1[0] n2 = nnx1[-1] n3 = nnx2[0] n4 = nnx2[-1] # Start by finding the valid output support or extent interval to insure that # for no finite extent signals ambiquous results are not returned. # Valid extents are f (finite), r (right-sided), and l (left-sided) if extent[0] == 'f' and extent[1] == 'f': nny = np.arange(n1+n3,n2+1+n4+1-1) ny = np.arange(0,len(x1)+len(x2)-1) + nx1[0]+nx2[0] elif extent[0] == 'f' and extent[1] == 'r': nny = np.arange(n1+n3,n1+1+n4+1-1) ny = nny + nx1[0]+nx2[0] elif extent[0] == 'r' and extent[1] == 'f': nny = np.arange(n1+n3,n2+1+n3+1-1) ny = nny + nx1[0]+nx2[0] elif extent[0] == 'f' and extent[1] == 'l': nny = np.arange(n2+n3,n2+1+n4+1-1) ny = nny + nx1[-1]+nx2[0] elif extent[0] == 'l' and extent[1] == 'f': nny = np.arange(n1+n4,n2+1+n4+1-1) ny = nny + nx1[0]+nx2[-1] elif extent[0] == 'r' and extent[1] == 'r': nny = np.arange(n1+n3,min(n1+1+n4+1,n2+1+n3+1)-1) ny = nny + nx1[0]+nx2[0] elif extent[0] == 'l' and extent[1] == 'l': nny = np.arange(max(n1+n4,n2+n3),n2+1+n4+1-1) ny = nny + max(nx1[0]+nx2[-1],nx1[-1]+nx2[0]) else: raise ValueError('Invalid x1 x2 extents specified or valid extent not found!') # Finally convolve the sequences y = signal.convolve(x1, x2) log.info('Output support: (%+d, %+d)' % (ny[0],ny[-1])) return y[nny], ny
[docs] def conv_integral(x1,tx1,x2,tx2,extent=('f','f')): """ Continuous-time convolution of x1 and x2 with proper tracking of the output time axis. Appromimate the convolution integral for the convolution of two continuous-time signals using the SciPy function signal. The time (sequence axis) are managed from input to output. y(t) = x1(t)*x2(t). Parameters ---------- x1 : ndarray of signal x1 corresponding to tx1 tx1 : ndarray time axis for x1 x2 : ndarray of signal x2 corresponding to tx2 tx2 : ndarray time axis for x2 extent : ('e1','e2') where 'e1', 'e2' may be 'f' finite, 'r' right-sided, or 'l' left-sided Returns ------- y : ndarray of output values y ty : ndarray of the corresponding time axis for y Notes ----- The output time axis starts at the sum of the starting values in x1 and x2 and ends at the sum of the two ending values in x1 and x2. The time steps used in x1(t) and x2(t) must match. The default extents of ('f','f') are used for signals that are active (have support) on or within t1 and t2 respectively. A right-sided signal such as exp(-a*t)*u(t) is semi-infinite, so it has extent 'r' and the convolution output will be truncated to display only the valid results. Examples -------- >>> import matplotlib.pyplot as plt >>> import numpy as np >>> import sk_dsp_comm.sigsys as ss >>> tx = np.arange(-5,10,.01) >>> x = ss.rect(tx-2,4) # pulse starts at t = 0 >>> y,ty = ss.conv_integral(x,tx,x,tx) >>> plt.plot(ty,y) # expect a triangle on [0,8] >>> plt.show() Now, consider a pulse convolved with an exponential. >>> h = 4*np.exp(-4*tx)*ss.step(tx) >>> y,ty = ss.conv_integral(x,tx,h,tx,extent=('f','r')) # note extents set >>> plt.plot(ty,y) # expect a pulse charge and discharge waveform """ dt = tx1[1] - tx1[0] nx1 = np.arange(0,len(tx1)) nx2 = np.arange(0,len(tx2)) n1 = nx1[0] n2 = nx1[-1] n3 = nx2[0] n4 = nx2[-1] # Start by finding the valid output support or extent interval to insure that # for no finite extent signals ambiquous results are not returned. # Valid extents are f (finite), r (right-sided), and l (left-sided) if extent[0] == 'f' and extent[1] == 'f': ny = np.arange(n1+n3,n2+1+n4+1-1) ty = np.arange(0,len(x1)+len(x2)-1)*dt + tx1[0]+tx2[0] elif extent[0] == 'f' and extent[1] == 'r': ny = np.arange(n1+n3,n1+1+n4+1-1) ty = ny*dt + tx1[0]+tx2[0] elif extent[0] == 'r' and extent[1] == 'f': ny = np.arange(n1+n3,n2+1+n3+1-1) ty = ny*dt + tx1[0]+tx2[0] elif extent[0] == 'f' and extent[1] == 'l': ny = np.arange(n2+n3,n2+1+n4+1-1) ty = ny*dt + tx1[-1]+tx2[0] elif extent[0] == 'l' and extent[1] == 'f': ny = np.arange(n1+n4,n2+1+n4+1-1) ty = ny*dt + tx1[0]+tx2[-1] elif extent[0] == 'r' and extent[1] == 'r': ny = np.arange(n1+n3,min(n1+1+n4+1,n2+1+n3+1)-1) ty = ny*dt + tx1[0]+tx2[0] elif extent[0] == 'l' and extent[1] == 'l': ny = np.arange(max(n1+n4,n2+n3),n2+1+n4+1-1) ty = ny*dt + max(tx1[0]+tx2[-1],tx1[-1]+tx2[0]) else: raise ValueError('Invalid x1 x2 extents specified or valid extent not found!') # Finally convolve the sampled sequences and scale by dt y = signal.convolve(x1, x2)*dt log.info('Output support: (%+2.2f, %+2.2f)' % (ty[0],ty[-1])) return y[ny], ty
[docs] def delta_eps(t,eps): """ Rectangular pulse approximation to impulse function. Parameters ---------- t : ndarray of time axis eps : pulse width Returns ------- d : ndarray containing the impulse approximation Examples -------- >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> from sk_dsp_comm.sigsys import delta_eps >>> t = np.arange(-2,2,.001) >>> d = delta_eps(t,.1) >>> plt.plot(t,d) >>> plt.show() """ d = np.zeros(len(t)) for k,tt in enumerate(t): if abs(tt) <= eps/2.: d[k] = 1/float(eps) return d
[docs] def step(t): """ Approximation to step function signal u(t). In this numerical version of u(t) the step turns on at t = 0. Parameters ---------- t : ndarray of the time axis Returns ------- x : ndarray of the step function signal u(t) Examples -------- >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> from sk_dsp_comm.sigsys import step >>> t = arange(-1,5,.01) >>> x = step(t) >>> plt.plot(t,x) >>> plt.ylim([-0.01, 1.01]) >>> plt.show() To turn on at t = 1, shift t. >>> x = step(t - 1.0) >>> plt.ylim([-0.01, 1.01]) >>> plt.plot(t,x) """ x = np.zeros(len(t)) for k,tt in enumerate(t): if tt >= 0: x[k] = 1.0 return x
[docs] def rect(t,tau): """ Approximation to the rectangle pulse Pi(t/tau). In this numerical version of Pi(t/tau) the pulse is active over -tau/2 <= t <= tau/2. Parameters ---------- t : ndarray of the time axis tau : the pulse width Returns ------- x : ndarray of the signal Pi(t/tau) Examples -------- >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> from sk_dsp_comm.sigsys import rect >>> t = arange(-1,5,.01) >>> x = rect(t,1.0) >>> plt.plot(t,x) >>> plt.ylim([0, 1.01]) >>> plt.show() To turn on the pulse at t = 1 shift t. >>> x = rect(t - 1.0,1.0) >>> plt.plot(t,x) >>> plt.ylim([0, 1.01]) """ x = np.zeros(len(t)) for k,tk in enumerate(t): if np.abs(tk) > tau/2.: x[k] = 0 else: x[k] = 1 return x
[docs] def tri(t,tau): """ Approximation to the triangle pulse Lambda(t/tau). In this numerical version of Lambda(t/tau) the pulse is active over -tau <= t <= tau. Parameters ---------- t : ndarray of the time axis tau : one half the triangle base width Returns ------- x : ndarray of the signal Lambda(t/tau) Examples -------- >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> from sk_dsp_comm.sigsys import tri >>> t = arange(-1,5,.01) >>> x = tri(t,1.0) >>> plt.plot(t,x) >>> plt.show() To turn on at t = 1, shift t. >>> x = tri(t - 1.0,1.0) >>> plt.plot(t,x) """ x = np.zeros(len(t)) for k,tk in enumerate(t): if np.abs(tk) > tau/1.: x[k] = 0 else: x[k] = 1 - np.abs(tk)/tau return x
[docs] def dimpulse(n): """ Discrete impulse function delta[n]. Parameters ---------- n : ndarray of the time axis Returns ------- x : ndarray of the signal delta[n] Examples -------- >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> from sk_dsp_comm.sigsys import dimpulse >>> n = arange(-5,5) >>> x = dimpulse(n) >>> plt.stem(n,x) >>> plt.show() Shift the delta left by 2. >>> x = dimpulse(n+2) >>> plt.stem(n,x) """ x = np.zeros(len(n)) for k,nn in enumerate(n): if nn == 0: x[k] = 1.0 return x
[docs] def dstep(n): """ Discrete step function u[n]. Parameters ---------- n : ndarray of the time axis Returns ------- x : ndarray of the signal u[n] Examples -------- >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> from sk_dsp_comm.sigsys import dstep >>> n = arange(-5,5) >>> x = dstep(n) >>> plt.stem(n,x) >>> plt.show() Shift the delta left by 2. >>> x = dstep(n+2) >>> plt.stem(n,x) """ x = np.zeros(len(n)) for k,nn in enumerate(n): if nn >= 0: x[k] = 1.0 return x
[docs] def drect(n,N): """ Discrete rectangle function of duration N samples. The signal is active on the interval 0 <= n <= N-1. Also known as the rectangular window function, which is available in scipy.signal. Parameters ---------- n : ndarray of the time axis N : the pulse duration Returns ------- x : ndarray of the signal Notes ----- The discrete rectangle turns on at n = 0, off at n = N-1 and has duration of exactly N samples. Examples -------- >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> from sk_dsp_comm.sigsys import drect >>> n = arange(-5,5) >>> x = drect(n, N=3) >>> plt.stem(n,x) >>> plt.show() Shift the delta left by 2. >>> x = drect(n+2, N=3) >>> plt.stem(n,x) """ x = np.zeros(len(n)) for k,nn in enumerate(n): if nn >= 0 and nn < N: x[k] = 1.0 return x
[docs] def rc_imp(Ns,alpha,M=6): """ A truncated raised cosine pulse used in digital communications. The pulse shaping factor :math:`0< \\alpha < 1` is required as well as the truncation factor M which sets the pulse duration to be 2*M*Tsymbol. Parameters ---------- Ns : number of samples per symbol alpha : excess bandwidth factor on (0, 1), e.g., 0.35 M : equals RC one-sided symbol truncation factor Returns ------- b : ndarray containing the pulse shape Notes ----- The pulse shape b is typically used as the FIR filter coefficients when forming a pulse shaped digital communications waveform. Examples -------- Ten samples per symbol and alpha = 0.35. >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> from sk_dsp_comm.sigsys import rc_imp >>> b = rc_imp(10,0.35) >>> n = arange(-10*6,10*6+1) >>> plt.stem(n,b) >>> plt.show() """ # Design the filter n = np.arange(-M*Ns,M*Ns+1) b = np.zeros(len(n)); a = alpha; Ns *= 1.0 for i in range(len(n)): if (1 - 4*(a*n[i]/Ns)**2) == 0: b[i] = np.pi/4*np.sinc(1/(2.*a)) else: b[i] = np.sinc(n[i]/Ns)*np.cos(np.pi*a*n[i]/Ns)/(1 - 4*(a*n[i]/Ns)**2) return b
[docs] def sqrt_rc_imp(Ns,alpha,M=6): """ A truncated square root raised cosine pulse used in digital communications. The pulse shaping factor 0< alpha < 1 is required as well as the truncation factor M which sets the pulse duration to be 2*M*Tsymbol. Parameters ---------- Ns : number of samples per symbol alpha : excess bandwidth factor on (0, 1), e.g., 0.35 M : equals RC one-sided symbol truncation factor Returns ------- b : ndarray containing the pulse shape Notes ----- The pulse shape b is typically used as the FIR filter coefficients when forming a pulse shaped digital communications waveform. When square root raised cosine (SRC) pulse is used generate Tx signals and at the receiver used as a matched filter (receiver FIR filter), the received signal is now raised cosine shaped, this having zero intersymbol interference and the optimum removal of additive white noise if present at the receiver input. Examples -------- >>> # ten samples per symbol and alpha = 0.35 >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> from sk_dsp_comm.sigsys import sqrt_rc_imp >>> b = sqrt_rc_imp(10,0.35) >>> n = arange(-10*6,10*6+1) >>> plt.stem(n,b) >>> plt.show() """ # Design the filter n = np.arange(-M*Ns,M*Ns+1) b = np.zeros(len(n)) Ns *= 1.0 a = alpha for i in range(len(n)): if abs(1 - 16*a**2*(n[i]/Ns)**2) <= np.finfo(np.float32).eps/2: b[i] = 1/2.*((1+a)*np.sin((1+a)*np.pi/(4.*a))-(1-a)*np.cos((1-a)*np.pi/(4.*a))+(4*a)/np.pi*np.sin((1-a)*np.pi/(4.*a))) else: b[i] = 4*a/(np.pi*(1 - 16*a**2*(n[i]/Ns)**2)) b[i] = b[i]*(np.cos((1+a)*np.pi*n[i]/Ns) + np.sinc((1-a)*n[i]/Ns)*(1-a)*np.pi/(4.*a)) return b
[docs] def pn_gen(n_bits, m=5): """ Maximal length sequence signal generator. Generates a sequence 0/1 bits of N_bit duration. The bits themselves are obtained from an m-sequence of length m. Available m-sequence (PN generators) include m = 2,3,...,12, & 16. Parameters ---------- n_bits : the number of bits to generate m : the number of shift registers. 2,3, .., 12, & 16 Returns ------- PN : ndarray of the generator output over N_bits Notes ----- The sequence is periodic having period 2**m - 1 (2^m - 1). Examples -------- >>> # A 15 bit period signal nover 50 bits >>> PN = pn_gen(50,4) """ c = m_seq(m) Q = len(c) max_periods = int(np.ceil(n_bits / float(Q))) PN = np.zeros(max_periods*Q) for k in range(max_periods): PN[k*Q:(k+1)*Q] = c PN = np.resize(PN, (1, n_bits)) return PN.flatten()
[docs] def m_seq(m): """ Generate an m-sequence ndarray using an all-ones initialization. Available m-sequence (PN generators) include m = 2,3,...,12, & 16. Parameters ---------- m : the number of shift registers. 2,3, .., 12, & 16 Returns ------- c : ndarray of one period of the m-sequence Notes ----- The sequence period is 2**m - 1 (2^m - 1). Examples -------- >>> c = m_seq(5) """ if m == 2: taps = np.array([1, 1, 1]) elif m == 3: taps = np.array([1, 0, 1, 1]) elif m == 4: taps = np.array([1, 0, 0, 1, 1]) elif m == 5: taps = np.array([1, 0, 0, 1, 0, 1]) elif m == 6: taps = np.array([1, 0, 0, 0, 0, 1, 1]) elif m == 7: taps = np.array([1, 0, 0, 0, 1, 0, 0, 1]) elif m == 8: taps = np.array([1, 0, 0, 0, 1, 1, 1, 0, 1]) elif m == 9: taps = np.array([1, 0, 0, 0, 0, 1, 0, 0, 0, 1]) elif m == 10: taps = np.array([1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1]) elif m == 11: taps = np.array([1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1]) elif m == 12: taps = np.array([1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1]) elif m == 13: taps = np.array([1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1]) elif m == 14: taps = np.array([1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1]) elif m == 15: taps = np.array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1]) elif m == 16: taps = np.array([1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 1]) else: raise ValueError('Invalid length specified') # Load shift register with all ones to start sr = np.ones(m) # M-squence length is: Q = 2**m - 1 c = np.zeros(Q) for n in range(Q): tap_xor = 0 c[n] = sr[-1] for k in range(1,m): if taps[k] == 1: tap_xor = np.bitwise_xor(tap_xor,np.bitwise_xor(int(sr[-1]),int(sr[m-1-k]))) sr[1:] = sr[:-1] sr[0] = tap_xor return c
[docs] def bpsk_tx(N_bits, Ns, ach_fc=2.0, ach_lvl_dB=-100, pulse='rect', alpha = 0.25, M=6): """ Generates biphase shift keyed (BPSK) transmitter with adjacent channel interference. Generates three BPSK signals with rectangular or square root raised cosine (SRC) pulse shaping of duration N_bits and Ns samples per bit. The desired signal is centered on f = 0, which the adjacent channel signals to the left and right are also generated at dB level relative to the desired signal. Used in the digital communications Case Study supplement. Parameters ---------- N_bits : the number of bits to simulate Ns : the number of samples per bit ach_fc : the frequency offset of the adjacent channel signals (default 2.0) ach_lvl_dB : the level of the adjacent channel signals in dB (default -100) pulse :the pulse shape 'rect' or 'src' alpha : square root raised cosine pulse shape factor (default = 0.25) M : square root raised cosine pulse truncation factor (default = 6) Returns ------- x : ndarray of the composite signal x0 + ach_lvl*(x1p + x1m) b : the transmit pulse shape data0 : the data bits used to form the desired signal; used for error checking Notes ----- Examples -------- >>> x,b,data0 = bpsk_tx(1000,10,pulse='src') """ pulse_types = ['rect', 'src'] if pulse not in pulse_types: raise ValueError('Pulse shape must be \'rect\' or \'src\'''') x0,b,data0 = nrz_bits(N_bits, Ns, pulse, alpha, M) x1p,b,data1p = nrz_bits(N_bits, Ns, pulse, alpha, M) x1m,b,data1m = nrz_bits(N_bits, Ns, pulse, alpha, M) n = np.arange(len(x0)) x1p = x1p*np.exp(1j*2*np.pi*ach_fc/float(Ns)*n) x1m = x1m*np.exp(-1j*2*np.pi*ach_fc/float(Ns)*n) ach_lvl = 10**(ach_lvl_dB/20.) return x0 + ach_lvl*(x1p + x1m), b, data0
[docs] def nrz_bits(n_bits, ns, pulse='rect', alpha=0.25, m=6): """ Generate non-return-to-zero (NRZ) data bits with pulse shaping. A baseband digital data signal using +/-1 amplitude signal values and including pulse shaping. Parameters ---------- n_bits : number of NRZ +/-1 data bits to produce ns : the number of samples per bit, pulse_type : 'rect' , 'rc', 'src' (default 'rect') alpha : excess bandwidth factor(default 0.25) m : single sided pulse duration (default = 6) Returns ------- x : ndarray of the NRZ signal values b : ndarray of the pulse shape data : ndarray of the underlying data bits Notes ----- Pulse shapes include 'rect' (rectangular), 'rc' (raised cosine), 'src' (root raised cosine). The actual pulse length is 2*M+1 samples. This function is used by BPSK_tx in the Case Study article. Examples -------- >>> import matplotlib.pyplot as plt >>> from sk_dsp_comm.sigsys import nrz_bits >>> from numpy import arange >>> x,b,data = nrz_bits(100, 10) >>> t = arange(len(x)) >>> plt.plot(t, x) >>> plt.ylim([-1.01, 1.01]) >>> plt.show() """ data = np.random.randint(0, 2, n_bits) n_zeros = np.zeros((n_bits, int(ns) - 1)) x = np.hstack((2 * data.reshape(n_bits, 1) - 1, n_zeros)) x =x.flatten() if pulse.lower() == 'rect': b = np.ones(int(ns)) elif pulse.lower() == 'rc': b = rc_imp(ns, alpha, m) elif pulse.lower() == 'src': b = sqrt_rc_imp(ns, alpha, m) else: raise ValueError('pulse type must be rec, rc, or src') x = signal.lfilter(b,1,x) return x, b / float(ns), data
[docs] def nrz_bits2(data, Ns, pulse='rect', alpha = 0.25, M=6): """ Generate non-return-to-zero (NRZ) data bits with pulse shaping with user data A baseband digital data signal using +/-1 amplitude signal values and including pulse shaping. The data sequence is user supplied. Parameters ---------- data : ndarray of the data bits as 0/1 values Ns : the number of samples per bit, pulse_type : 'rect' , 'rc', 'src' (default 'rect') alpha : excess bandwidth factor(default 0.25) M : single sided pulse duration (default = 6) Returns ------- x : ndarray of the NRZ signal values b : ndarray of the pulse shape Notes ----- Pulse shapes include 'rect' (rectangular), 'rc' (raised cosine), 'src' (root raised cosine). The actual pulse length is 2*M+1 samples. Examples -------- >>> import matplotlib.pyplot as plt >>> from sk_dsp_comm.sigsys import nrz_bits2 >>> from sk_dsp_comm.sigsys import m_seq >>> from numpy import arange >>> x,b = nrz_bits2(m_seq(5),10) >>> t = arange(len(x)) >>> plt.ylim([-1.01, 1.01]) >>> plt.plot(t,x) """ N_bits = len(data) n_zeros = np.zeros((N_bits,int(Ns)-1)) x = np.hstack((2*data.reshape(N_bits,1)-1,n_zeros)) x = x.flatten() if pulse.lower() == 'rect': b = np.ones(int(Ns)) elif pulse.lower() == 'rc': b = rc_imp(Ns,alpha,M) elif pulse.lower() == 'src': b = sqrt_rc_imp(Ns,alpha,M) else: raise ValueError('pulse type must be rec, rc, or src') x = signal.lfilter(b,1,x) return x,b/float(Ns)
[docs] def eye_plot(x, l, s=0): """ Eye pattern plot of a baseband digital communications waveform. The signal must be real, but can be multivalued in terms of the underlying modulation scheme. Used for BPSK eye plots in the Case Study article. Parameters ---------- x : ndarray of the real input data vector/array l : display length in samples (usually two symbols) s : start index Returns ------- Nothing : A plot window opens containing the eye plot Notes ----- Increase S to eliminate filter transients. Examples -------- 1000 bits at 10 samples per bit with 'rc' shaping. >>> import matplotlib.pyplot as plt >>> from sk_dsp_comm import sigsys as ss >>> x,b, data = ss.nrz_bits(1000,10,'rc') >>> ss.eye_plot(x,20,60) """ plt.figure(figsize=(6,4)) idx = np.arange(0, l + 1) plt.plot(idx, x[s:s + l + 1], 'b') k_max = int((len(x) - s) / l) - 1 for k in range(1,k_max): plt.plot(idx, x[s + k * l:s + l + 1 + k * l], 'b') plt.grid() plt.xlabel('Time Index - n') plt.ylabel('Amplitude') plt.title('Eye Plot') return 0
[docs] def scatter(x, ns, start): """ Sample a baseband digital communications waveform at the symbol spacing. Parameters ---------- x : ndarray of the input digital comm signal ns : number of samples per symbol (bit) start : the array index to start the sampling Returns ------- xI : ndarray of the real part of x following sampling xQ : ndarray of the imaginary part of x following sampling Notes ----- Normally the signal is complex, so the scatter plot contains clusters at points in the complex plane. For a binary signal such as BPSK, the point centers are nominally +/-1 on the real axis. Start is used to eliminate transients from the FIR pulse shaping filters from appearing in the scatter plot. Examples -------- >>> import matplotlib.pyplot as plt >>> from sk_dsp_comm import sigsys as ss >>> x,b, data = ss.nrz_bits(1000,10,'rc') >>> # Add some noise so points are now scattered about +/-1 >>> y = ss.cpx_awgn(x,20,10) >>> yI,yQ = ss.scatter(y,10,60) >>> plt.plot(yI,yQ,'.') >>> plt.axis('equal') >>> plt.ylabel("Quadrature") >>> plt.xlabel("In-Phase") >>> plt.grid() >>> plt.show() """ xI = np.real(x[start::ns]) xQ = np.imag(x[start::ns]) return xI, xQ
[docs] def bit_errors(z, data, start, ns): """ A simple bit error counting function. In its present form this function counts bit errors between hard decision BPSK bits in +/-1 form and compares them with 0/1 binary data that was transmitted. Timing between the Tx and Rx data is the responsibility of the user. An enhanced version of this function, which features automatic synching will be created in the future. Parameters ---------- z : ndarray of hard decision BPSK data prior to symbol spaced sampling data : ndarray of reference bits in 1/0 format start : timing reference for the received ns : the number of samples per symbol Returns ------- Pe_hat : the estimated probability of a bit error Notes ----- The Tx and Rx data streams are exclusive-or'd and the then the bit errors are summed, and finally divided by the number of bits observed to form an estimate of the bit error probability. This function needs to be enhanced to be more useful. Examples -------- >>> from scipy import signal >>> x,b, data = nrz_bits(1000,10) >>> # set Eb/N0 to 8 dB >>> y = cpx_awgn(x,8,10) >>> # matched filter the signal >>> z = signal.lfilter(b,1,y) >>> # make bit decisions at 10 and Ns multiples thereafter >>> Pe_hat = bit_errors(z,data,10,10) """ Pe_hat = np.sum(data[0:len(z[start::ns])] ^ np.int64((np.sign(np.real(z[start::ns])) + 1) / 2)) / float(len(z[start::ns])) return Pe_hat
[docs] def cpx_awgn(x, es_n0, ns): """ Apply white Gaussian noise to a digital communications signal. This function represents a complex baseband white Gaussian noise digital communications channel. The input signal array may be real or complex. Parameters ---------- x : ndarray noise free complex baseband input signal. EsNO : set the channel Es/N0 (Eb/N0 for binary) level in dB ns : number of samples per symbol (bit) Returns ------- y : ndarray x with additive noise added. Notes ----- Set the channel energy per symbol-to-noise power spectral density ratio (Es/N0) in dB. Examples -------- >>> x,b, data = nrz_bits(1000,10) >>> # set Eb/N0 = 10 dB >>> y = cpx_awgn(x,10,10) """ w = np.sqrt(ns * np.var(x) * 10 ** (-es_n0 / 10.) / 2.) * (np.random.randn(len(x)) + 1j * np.random.randn(len(x))) return x+w
[docs] def my_psd(x,NFFT=2**10,Fs=1): """ A local version of NumPy's PSD function that returns the plot arrays. A mlab.psd wrapper function that returns two ndarrays; makes no attempt to auto plot anything. Parameters ---------- x : ndarray input signal NFFT : a power of two, e.g., 2**10 = 1024 Fs : the sampling rate in Hz Returns ------- Px : ndarray of the power spectrum estimate f : ndarray of frequency values Notes ----- This function makes it easier to overlay spectrum plots because you have better control over the axis scaling than when using psd() in the autoscale mode. Examples -------- >>> import matplotlib.pyplot as plt >>> from numpy import log10 >>> from sk_dsp_comm import sigsys as ss >>> x,b, data = ss.nrz_bits(10000,10) >>> Px,f = ss.my_psd(x,2**10,10) >>> plt.plot(f, 10*log10(Px)) >>> plt.ylabel("Power Spectral Density (dB)") >>> plt.xlabel("Frequency (Hz)") >>> plt.show() """ Px,f = pylab.mlab.psd(x,NFFT,Fs) return Px.flatten(), f
[docs] def am_tx(m,a_mod,fc=75e3): """ AM transmitter for Case Study of Chapter 17. Assume input is sampled at 8 Ksps and upsampling by 24 is performed to arrive at fs_out = 192 Ksps. Parameters ---------- m : ndarray of the input message signal a_mod : AM modulation index, between 0 and 1 fc : the carrier frequency in Hz Returns ------- x192 : ndarray of the upsampled by 24 and modulated carrier t192 : ndarray of the upsampled by 24 time axis m24 : ndarray of the upsampled by 24 message signal Notes ----- The sampling rate of the input signal is assumed to be 8 kHz. Examples -------- >>> n = arange(0,1000) >>> # 1 kHz message signal >>> m = cos(2*pi*1000/8000.*n) >>> x192, t192 = am_tx(m,0.8,fc=75e3) """ m24 = interp24(m) t192 = np.arange(len(m24))/192.0e3 #m24 = np.cos(2*np.pi*2.0e3*t192) m_max = np.max(np.abs(m24)) x192 = (1 + a_mod*m24/m_max)*np.cos(2*np.pi*fc*t192) return x192, t192, m24
[docs] def am_rx(x192): """ AM envelope detector receiver for the Chapter 17 Case Study The receiver bandpass filter is not included in this function. Parameters ---------- x192 : ndarray of the AM signal at sampling rate 192 ksps Returns ------- m_rx8 : ndarray of the demodulated message at 8 ksps t8 : ndarray of the time axis at 8 ksps m_rx192 : ndarray of the demodulated output at 192 ksps x_edet192 : ndarray of the envelope detector output at 192 ksps Notes ----- The bandpass filter needed at the receiver front-end can be designed using b_bpf,a_bpf = :func:`am_rx_BPF`. Examples -------- >>> import numpy as np >>> n = np.arange(0,1000) >>> # 1 kHz message signal >>> m = np.cos(2*np.pi*1000/8000.*n) >>> m_rx8,t8,m_rx192,x_edet192 = am_rx(x192) """ x_edet192 = env_det(x192) m_rx8 = deci24(x_edet192) # remove DC offset from the env_det + LPF output m_rx8 -= np.mean(m_rx8) t8 = np.arange(len(m_rx8))/8.0e3 """ For performance testing also filter x_env_det 192e3 using a Butterworth cascade. The filter cutoff is 5kHz, the message BW. """ b192,a192 = signal.butter(5,2*5.0e3/192.0e3) m_rx192 = signal.lfilter(b192,a192,x_edet192) m_rx192 = signal.lfilter(b192,a192,m_rx192) m_rx192 -= np.mean(m_rx192) return m_rx8,t8,m_rx192,x_edet192
[docs] def am_rx_bpf(n_order=7, ripple_dB=1, b=10e3, fs=192e3): """ Bandpass filter design for the AM receiver Case Study of Chapter 17. Design a 7th-order Chebyshev type 1 bandpass filter to remove/reduce adjacent channel intereference at the envelope detector input. Parameters ---------- n_order : the filter order (default = 7) ripple_dB : the passband ripple in dB (default = 1) b : the RF bandwidth (default = 10e3) fs : the sampling frequency Returns ------- b_bpf : ndarray of the numerator filter coefficients a_bpf : ndarray of the denominator filter coefficients Examples -------- >>> from scipy import signal >>> import numpy as np >>> import matplotlib.pyplot as plt >>> import sk_dsp_comm.sigsys as ss >>> # Use the default values >>> b_bpf,a_bpf = ss.am_rx_bpf() Pole-zero plot of the filter. >>> ss.zplane(b_bpf,a_bpf) >>> plt.show() Plot of the frequency response. >>> f = np.arange(0,192/2.,.1) >>> w, Hbpf = signal.freqz(b_bpf,a_bpf,2*np.pi*f/192) >>> plt.plot(f*10,20*np.log10(abs(Hbpf))) >>> plt.axis([0,1920/2.,-80,10]) >>> plt.ylabel("Power Spectral Density (dB)") >>> plt.xlabel("Frequency (kHz)") >>> plt.show() """ b_bpf,a_bpf = signal.cheby1(n_order, ripple_dB, 2 * np.array([75e3 - b / 2., 75e3 + b / 2.]) / fs, 'bandpass') return b_bpf,a_bpf
[docs] def env_det(x): """ Ideal envelope detector. This function retains the positive half cycles of the input signal. Parameters ---------- x : ndarray of the input sugnal Returns ------- y : ndarray of the output signal Examples -------- >>> n = arange(0,100) >>> # 1 kHz message signal >>> m = cos(2*pi*1000/8000.*n) >>> x192, t192, m24 = am_tx(m,0.8,fc=75e3) >>> y = env_det(x192) """ y = np.zeros(len(x)) for k,xx in enumerate(x): if xx >= 0: y[k] = xx return y
[docs] def interp24(x): """ Interpolate by L = 24 using Butterworth filters. The interpolation is done using three stages. Upsample by L = 2 and lowpass filter, upsample by 3 and lowpass filter, then upsample by L = 4 and lowpass filter. In all cases the lowpass filter is a 10th-order Butterworth lowpass. Parameters ---------- x : ndarray of the input signal Returns ------- y : ndarray of the output signal Notes ----- The cutoff frequency of the lowpass filters is 1/2, 1/3, and 1/4 to track the upsampling by 2, 3, and 4 respectively. Examples -------- >>> y = interp24(x) """ # Stage 1: L = 2 b2,a2 = signal.butter(10,1/2.) y1 = upsample(x,2) y1 = signal.lfilter(b2,a2,2*y1) # Stage 2: L = 3 b3,a3 = signal.butter(10,1/3.) y2 = upsample(y1,3) y2 = signal.lfilter(b3,a3,3*y2) # Stage 3: L = 4 b4,a4 = signal.butter(10,1/4.) y3 = upsample(y2,4) y3 = signal.lfilter(b4,a4,4*y3) return y3
[docs] def deci24(x): """ Decimate by L = 24 using Butterworth filters. The decimation is done using two three stages. Downsample sample by L = 2 and lowpass filter, downsample by 3 and lowpass filter, then downsample by L = 4 and lowpass filter. In all cases the lowpass filter is a 10th-order Butterworth lowpass. Parameters ---------- x : ndarray of the input signal Returns ------- y : ndarray of the output signal Notes ----- The cutoff frequency of the lowpass filters is 1/2, 1/3, and 1/4 to track the upsampling by 2, 3, and 4 respectively. Examples -------- >>> y = deci24(x) """ # Stage 1: M = 2 b2,a2 = signal.butter(10,1/2.) y1 = signal.lfilter(b2,a2,x) y1 = downsample(y1,2) # Stage 2: M = 3 b3,a3 = signal.butter(10,1/3.) y2 = signal.lfilter(b3,a3,y1) y2 = downsample(y2,3) # Stage 3: L = 4 b4,a4 = signal.butter(10,1/4.) y3 = signal.lfilter(b4,a4,y2) y3 = downsample(y3,4) return y3
[docs] def upsample(x,L): """ Upsample by factor L Insert L - 1 zero samples in between each input sample. Parameters ---------- x : ndarray of input signal values L : upsample factor Returns ------- y : ndarray of the output signal values Examples -------- >>> y = upsample(x,3) """ N_input = len(x) y = np.hstack((x.reshape(N_input,1),np.zeros((N_input, int(L-1))))) y = y.flatten() return y
[docs] def downsample(x,M,p=0): """ Downsample by factor M Keep every Mth sample of the input. The phase of the input samples kept can be selected. Parameters ---------- x : ndarray of input signal values M : downsample factor p : phase of decimated value, 0 (default), 1, ..., M-1 Returns ------- y : ndarray of the output signal values Examples -------- >>> y = downsample(x,3) >>> y = downsample(x,3,1) """ if not isinstance(M, int): raise TypeError("M must be an int") x = x[0:int(np.floor(len(x)/M))*M] x = x.reshape((int(np.floor(len(x)/M)),M)) y = x[:,p] return y
[docs] def unique_cpx_roots(rlist,tol = 0.001): """ The average of the root values is used when multiplicity is greater than one. Mark Wickert October 2016 """ uniq = [rlist[0]] mult = [1] for k in range(1,len(rlist)): N_uniq = len(uniq) for m in range(N_uniq): if abs(rlist[k]-uniq[m]) <= tol: mult[m] += 1 uniq[m] = (uniq[m]*(mult[m]-1) + rlist[k])/float(mult[m]) break uniq = np.hstack((uniq,rlist[k])) mult = np.hstack((mult,[1])) return np.array(uniq), np.array(mult)
[docs] def zplane(b,a,auto_scale=True,size=2,detect_mult=True,tol=0.001): """ Create an z-plane pole-zero plot. Create an z-plane pole-zero plot using the numerator and denominator z-domain system function coefficient ndarrays b and a respectively. Assume descending powers of z. Parameters ---------- b : ndarray of the numerator coefficients a : ndarray of the denominator coefficients auto_scale : bool (default True) size : plot radius maximum when scale = False Returns ------- (M,N) : tuple of zero and pole counts + plot window Notes ----- This function tries to identify repeated poles and zeros and will place the multiplicity number above and to the right of the pole or zero. The difficulty is setting the tolerance for this detection. Currently it is set at 1e-3 via the function signal.unique_roots. Examples -------- >>> # Here the plot is generated using auto_scale >>> zplane(b,a) >>> # Here the plot is generated using manual scaling >>> zplane(b,a,False,1.5) """ if (isinstance(a,int) or isinstance(a,float)): a = [a] if (isinstance(b,int) or isinstance(b,float)): b = [b] M = len(b) - 1 N = len(a) - 1 # Plot labels if multiplicity greater than 1 x_scale = 1.5*size y_scale = 1.5*size x_off = 0.02 y_off = 0.01 #N_roots = np.array([1.0]) if M > 0: N_roots = np.roots(b) #D_roots = np.array([1.0]) if N > 0: D_roots = np.roots(a) if auto_scale: if M > 0 and N > 0: size = max(np.max(np.abs(N_roots)),np.max(np.abs(D_roots)))+.1 elif M > 0: size = max(np.max(np.abs(N_roots)),1.0)+.1 elif N > 0: size = max(1.0,np.max(np.abs(D_roots)))+.1 else: size = 1.1 plt.figure(figsize=(5,5)) plt.axis('equal') r = np.linspace(0,2*np.pi,200) plt.plot(np.cos(r),np.sin(r),'r--') plt.plot([-size,size],[0,0],'k-.') plt.plot([0,0],[-size,size],'k-.') if M > 0: if detect_mult == True: N_uniq, N_mult = unique_cpx_roots(N_roots,tol=tol) plt.plot(np.real(N_uniq),np.imag(N_uniq),'ko',mfc='None',ms=8) idx_N_mult = np.nonzero(np.ravel(N_mult>1))[0] for k in range(len(idx_N_mult)): x_loc = np.real(N_uniq[idx_N_mult[k]]) + x_off*x_scale y_loc =np.imag(N_uniq[idx_N_mult[k]]) + y_off*y_scale plt.text(x_loc,y_loc,str(N_mult[idx_N_mult[k]]),ha='center',va='bottom',fontsize=10) else: plt.plot(np.real(N_roots),np.imag(N_roots),'ko',mfc='None',ms=8) if N > 0: if detect_mult == True: D_uniq, D_mult=unique_cpx_roots(D_roots,tol=tol) plt.plot(np.real(D_uniq),np.imag(D_uniq),'kx',ms=8) idx_D_mult = np.nonzero(np.ravel(D_mult>1))[0] for k in range(len(idx_D_mult)): x_loc = np.real(D_uniq[idx_D_mult[k]]) + x_off*x_scale y_loc =np.imag(D_uniq[idx_D_mult[k]]) + y_off*y_scale plt.text(x_loc,y_loc,str(D_mult[idx_D_mult[k]]),ha='center',va='bottom',fontsize=10) else: plt.plot(np.real(D_roots),np.imag(D_roots),'kx',ms=8) if M - N < 0: plt.plot(0.0,0.0,'bo',mfc='None',ms=8) elif M - N > 0: plt.plot(0.0,0.0,'kx',ms=8) if abs(M - N) > 1: plt.text(x_off*x_scale,y_off*y_scale,str(abs(M-N)),ha='center',va='bottom',fontsize=10) plt.xlabel('Real Part') plt.ylabel('Imaginary Part') plt.title('Pole-Zero Plot') #plt.grid() plt.axis([-size,size,-size,size]) return M,N
[docs] def rect_conv(n, n_len): """ The theoretical result of convolving two rectangle sequences. The result is a triangle. The solution is based on pure analysis. Simply coded as opposed to efficiently coded. Parameters ---------- n : ndarray of time axis n_len : rectangle pulse duration Returns ------- y : ndarray of of output signal Examples -------- >>> import matplotlib.pyplot as plt >>> from numpy import arange >>> from sk_dsp_comm.sigsys import rect_conv >>> n = arange(-5,20) >>> y = rect_conv(n,6) >>> plt.plot(n, y) >>> plt.show() """ y = np.zeros(len(n)) for k in range(len(n)): if n[k] >= 0 and n[k] < n_len-1: y[k] = n[k] + 1 elif n[k] >= n_len-1 and n[k] <= 2*n_len-2: y[k] = 2 * n_len - 1 - n[k] return y
[docs] def biquad2(w_num, r_num, w_den, r_den): """ A biquadratic filter in terms of conjugate pole and zero pairs. Parameters ---------- w_num : zero frequency (angle) in rad/sample r_num : conjugate zeros radius w_den : pole frequency (angle) in rad/sample r_den : conjugate poles radius; less than 1 for stability Returns ------- b : ndarray of numerator coefficients a : ndarray of denominator coefficients Examples -------- >>> b,a = biquad2(pi/4., 1, pi/4., 0.95) """ b = np.array([1, -2*r_num*np.cos(w_num), r_num**2]) a = np.array([1, -2*r_den*np.cos(w_den), r_den**2]) return b, a
def plot_na(x, y, mode='stem'): pylab.figure(figsize=(5,2)) frame1 = pylab.gca() if mode.lower() == 'stem': pylab.stem(x,y) else: pylab.plot(x,y) frame1.axes.get_xaxis().set_visible(False) frame1.axes.get_yaxis().set_visible(False) pylab.show()
[docs] def from_wav(filename): """ Read a wave file. A wrapper function for scipy.io.wavfile.read that also includes int16 to float [-1,1] scaling. Parameters ---------- filename : file name string Returns ------- fs : sampling frequency in Hz x : ndarray of normalized to 1 signal samples Examples -------- >>> fs,x = from_wav('test_file.wav') """ fs, x = wavfile.read(filename) return fs, x/32767.
[docs] def to_wav(filename, rate, x): """ Write a wave file. A wrapper function for scipy.io.wavfile.write that also includes int16 scaling and conversion. Assume input x is [-1,1] values. Parameters ---------- filename : file name string rate : sampling frequency in Hz Returns ------- Nothing : writes only the *.wav file Examples -------- >>> to_wav('test_file.wav', 8000, x) """ x16 = np.int16(x*32767) wavfile.write(filename, rate, x16)
[docs] def bin_num(n, n_bits): """ Produce a signed representation of the number n using n_bits. :param n: Number n :param n_bits: Number of bits :return: """ mask = (2 << n_bits - 1) - 1 num = int(n) & mask f_str = '{:0' + str(n_bits) + 'b}' f_res = f_str.format(int(num)) return f_res