summaryrefslogtreecommitdiff
path: root/gsm-tvoid/src/python
diff options
context:
space:
mode:
Diffstat (limited to 'gsm-tvoid/src/python')
-rw-r--r--gsm-tvoid/src/python/Makefile.am33
-rwxr-xr-xgsm-tvoid/src/python/capture.sh39
-rwxr-xr-xgsm-tvoid/src/python/go.sh16
-rwxr-xr-xgsm-tvoid/src/python/gsm_scan.py731
-rwxr-xr-xgsm-tvoid/src/python/qa_gsm.py32
-rw-r--r--gsm-tvoid/src/python/run_tests.in50
6 files changed, 901 insertions, 0 deletions
diff --git a/gsm-tvoid/src/python/Makefile.am b/gsm-tvoid/src/python/Makefile.am
new file mode 100644
index 0000000..10e282c
--- /dev/null
+++ b/gsm-tvoid/src/python/Makefile.am
@@ -0,0 +1,33 @@
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+include $(top_srcdir)/Makefile.common
+
+EXTRA_DIST = run_tests.in \
+ gsm_scan.py
+
+
+TESTS = \
+ run_tests
+
+
+noinst_PYTHON = \
+ qa_gsm.py
diff --git a/gsm-tvoid/src/python/capture.sh b/gsm-tvoid/src/python/capture.sh
new file mode 100755
index 0000000..7256f2b
--- /dev/null
+++ b/gsm-tvoid/src/python/capture.sh
@@ -0,0 +1,39 @@
+#! /bin/sh
+
+if [ $1"x" = x ]; then
+ echo "./capture.sh <freq> [duration==10] [decim==112]"
+ echo "Example: ./capture.sh 940.4M"
+ exit 1
+fi
+FREQ=$1
+
+DURATION=$2
+if [ $2"x" = x ]; then
+ DURATION=10
+fi
+DECIM=$3
+if [ $3"x" = x ]; then
+ DECIM=112
+fi
+
+USRP_PROG=usrp_rx_cfile.py
+while :; do
+ which "$USRP_PROG"
+ if [ $? -eq 0 ]; then
+ break
+ fi
+ USRP_PROG=/usr/share/gnuradio/usrp/usrp_rx_cfile.py
+ which "$USRP_PROG"
+ if [ $? -eq 0 ]; then
+ break
+ fi
+
+ echo "ERROR: usrp_rx_cfile.py not found. Make sure it's in your PATH!"
+ exit 1
+done
+
+FILE="capture_${FREQ}_${DECIM}.cfile"
+samples=`expr 64000000 / $DECIM '*' $DURATION`
+echo "Capturing for $DURATION seconds to $FILE ($samples samples)"
+$USRP_PROG -d "$DECIM" -f "$FREQ" -N $samples $FILE
+
diff --git a/gsm-tvoid/src/python/go.sh b/gsm-tvoid/src/python/go.sh
new file mode 100755
index 0000000..85fbeb9
--- /dev/null
+++ b/gsm-tvoid/src/python/go.sh
@@ -0,0 +1,16 @@
+#! /bin/sh
+
+echo "go.sh <file.cfile> [decim==112]"
+
+DECIM=$2
+FILE=$1
+if [ $FILE"x" = x ]; then
+ FILE="../../../resource/data/GSMSP_940.8Mhz_118.cfile"
+ DECIM=118
+fi
+
+if [ $DECIM"x" = x ]; then
+ DECIM=112
+fi
+
+./gsm_scan.py -SN -pd -d "$DECIM" -I "$FILE" | ../../../gsmdecode/src/gsmdecode -i
diff --git a/gsm-tvoid/src/python/gsm_scan.py b/gsm-tvoid/src/python/gsm_scan.py
new file mode 100755
index 0000000..a366960
--- /dev/null
+++ b/gsm-tvoid/src/python/gsm_scan.py
@@ -0,0 +1,731 @@
+#!/usr/bin/env python
+# TODO:
+# * Add status info to window (frequency, offset, etc)
+# * Put direct frequency tuning back
+# * Add rate-limited file reads (throttle?)
+# * Make console only version
+# * Reset burst_stats on retune
+# * Add better option checking
+# * Wideband (multi-channel) processing (usrp and/or file input)
+# * Automatic beacon scan (quick scan RSSI, then check for BCCH)
+# * AGC
+
+import sys
+
+#nasty hack for testing
+for extdir in ['../lib','../lib/.libs']:
+ if extdir not in sys.path:
+ sys.path.append(extdir)
+
+from gnuradio import gr, gru, blks
+from gnuradio import usrp
+from gnuradio import eng_notation
+from gnuradio.eng_option import eng_option
+from gnuradio.wxgui import stdgui, fftsink, waterfallsink, scopesink, form, slider
+from optparse import OptionParser
+from math import pi
+import wx
+import gsm
+
+####################
+class burst_callback(gr.feval_ll):
+ def __init__(self, fg):
+ gr.feval_ll.__init__(self)
+ self.fg = fg
+ self.offset_mean_num = 10 #number of FCCH offsets to average
+ self.offset_vals = []
+
+####################
+ def eval(self, x):
+ #print "burst_callback: eval(",x,")\n";
+ try:
+ if gsm.BURST_CB_SYNC_OFFSET == x:
+ #print "burst_callback: SYNC_OFFSET\n";
+ if self.fg.options.tuning.count("o"):
+ last_offset = self.fg.burst.last_freq_offset()
+ self.fg.offset -= last_offset
+ #print "burst_callback: SYNC_OFFSET:", last_offset, " ARFCN: ", self.fg.channel, "\n";
+ self.fg.set_channel(self.fg.channel)
+
+ elif gsm.BURST_CB_ADJ_OFFSET == x:
+ last_offset = self.fg.burst.last_freq_offset()
+
+ self.offset_vals.append(last_offset)
+ count = len(self.offset_vals)
+ #print "burst_callback: ADJ_OFFSET:", last_offset, ", count=",count,"\n";
+
+ if count >= self.offset_mean_num:
+ sum = 0.0
+ while len(self.offset_vals):
+ sum += self.offset_vals.pop(0)
+
+ self.fg.mean_offset = sum / self.offset_mean_num
+
+ #print "burst_callback: mean offset:", self.fg.mean_offset, "\n";
+
+ #retune if greater than 100 Hz
+ if abs(self.fg.mean_offset) > 100.0:
+ #print "burst_callback: mean offset adjust:", self.fg.mean_offset, "\n";
+ if self.fg.options.tuning.count("o"):
+ #print "burst_callback: tuning.\n";
+ self.fg.offset -= self.fg.mean_offset
+ self.fg.set_channel(self.fg.channel)
+
+ elif gsm.BURST_CB_TUNE == x:
+ #print "burst_callback: BURST_CB_TUNE: ARFCN: ", self.fg.burst.next_arfcn, "\n";
+ if self.fg.options.tuning.count("h"):
+ #print "burst_callback: tuning.\n";
+ self.fg.set_channel(self.fg.burst.next_arfcn)
+
+ return 0
+
+ except Exception, e:
+ print >> sys.stderr, "burst_callback: Exception: ", e
+
+
+####################
+def pick_subdevice(u):
+ if u.db[0][0].dbid() >= 0:
+ return (0, 0)
+ if u.db[1][0].dbid() >= 0:
+ return (1, 0)
+ return (0, 0)
+
+####################
+def get_freq_from_arfcn(chan,region):
+
+ #P/E/R-GSM 900
+ if chan >= 0 and chan <= 124:
+ freq = 890 + 0.2*chan + 45
+
+ #GSM 850
+ elif chan >= 128 and chan <= 251:
+ freq = 824.2 + 0.2*(chan - 128) + 45
+
+ #GSM 450
+ elif chan >= 259 and chan <= 293:
+ freq = 450.6 + 0.2*(chan - 259) + 10
+
+ #GSM 480
+ elif chan >= 306 and chan <= 340:
+ freq = 479 + 0.2*(chan - 306) + 10
+
+ #DCS 1800
+ elif region is "e" and chan >= 512 and chan <= 885:
+ freq = 1710.2 + 0.2*(chan - 512) + 95
+
+ #DCS 1900
+ elif region is "u" and chan >= 512 and chan <= 810:
+ freq = 1850.2 + 0.2*(chan - 512) + 80
+
+ #E/R-GSM 900
+ elif chan >= 955 and chan <= 1023:
+ freq = 890 + 0.2*(chan - 1024) + 45
+
+ else:
+ freq = 0
+
+ return freq * 1e6
+
+def get_arfcn_from_freq(freq,region):
+ freq = freq / 1e6
+ # GSM 450
+ if freq <= 450.6 + 0.2*(293 - 259) + 10:
+ arfcn = ((freq - (450.6 + 10)) / 0.2) + 259
+ # GSM 480
+ elif freq <= 479 + 0.2*(340 - 306) + 10:
+ arfcn = ((freq - (479 + 10)) / 0.2) + 306
+ # GSM 850
+ elif freq <= 824.2 + 0.2*(251 - 128) + 45:
+ arfcn = ((freq - (824.2 + 45)) / 0.2) + 128
+ #E/R-GSM 900
+ elif freq <= 890 + 0.2*(1023 - 1024) + 45:
+ arfcn = ((freq - (890 + 45)) / -0.2) + 955
+ # GSM 900
+ elif freq <= 890 + 0.2*124 + 45:
+ arfcn = (freq - (890 + 45)) / 0.2
+ else:
+ if region is "u":
+ if freq > 1850.2 + 0.2*(810 - 512) + 80:
+ arfcn = 0;
+ else:
+ arfcn = (freq - (1850.2 + 80) / 0.2) + 512
+ elif region is "e":
+ if freq > 1710.2 + 0.2*(885 - 512) + 95:
+ arfcn = 0;
+ else:
+ arfcn = (freq - (1710.2 + 95) / 0.2) + 512
+ else:
+ arfcn = 0
+
+ return arfcn
+
+####################
+class app_flow_graph(stdgui.gui_flow_graph):
+
+ def __init__(self, frame, panel, vbox, argv):
+ stdgui.gui_flow_graph.__init__(self)
+
+ self.frame = frame
+ self.panel = panel
+ self.parse_options()
+ self.setup_flowgraph()
+ self.setup_print_options()
+ self._build_gui(vbox)
+
+ #some non-gui wxwindows handlers
+ self.t1 = wx.Timer(self.frame)
+ self.t1.Start(5000,0)
+ self.frame.Bind(wx.EVT_TIMER, self.on_tick)
+
+ #bind the idle routing for message_queue processing
+ self.frame.Bind(wx.EVT_IDLE, self.on_idle)
+
+ #tune
+ self.set_channel(self.channel)
+
+ #giddyup
+ self.status_msg = "Started."
+
+
+####################
+ def parse_options(self):
+ parser = OptionParser(option_class=eng_option)
+
+ #view options
+ parser.add_option("-S", "--scopes", type="string", default="I",
+ help="Select scopes to display. (N)one, (I)nput,(F)ilter,(d)emod,(c)locked,(b)urst [default=%default]")
+ parser.add_option("-p", "--print-console", type="string", default="s",
+ help="What to print on console. [default=%default]\n" +
+ "(n)othing, (e)verything, (s)tatus, (a)ll Types, (k)nown, (u)nknown, \n" +
+ "TS(0), (F)CCH, (S)CH, (N)ormal, (D)ummy\n" +
+ "Usefull (b)its, All TS (B)its, (C)orrelation bits, he(x) raw burst data, \n" +
+ "(d)ecoded hex for gsmdecode")
+
+
+ #decoder options
+ parser.add_option("-D", "--decoder", type="string", default="c",
+ help="Select decoder block to use. (c)omplex,(f)loat [default=%default]")
+ parser.add_option("-E", "--equalizer", type="string", default="none",
+ help="Type of equalizer to use. none, fixed-dfe, fixed-linear [default=%default]")
+ parser.add_option("-t", "--timing", type="string", default="cn",
+ help="Type of timing techniques to use. [default=%default] \n" +
+ "(n)one, (c)orrelation track, (q)uarter bit, (f)ull04 ")
+
+ #tuning options
+ parser.add_option("-T", "--tuning", type="string", default="oh",
+ help="Type of tuning to perform. [default=%default] \n" +
+ "(n)one, (o)ffset adjustment, (h)opping ")
+ parser.add_option("-o", "--offset", type="eng_float", default=0.0,
+ help="Tuning offset frequency")
+
+ #file options
+ parser.add_option("-I", "--inputfile", type="string", default=None,
+ help="Select a capture file to read")
+ parser.add_option("-O", "--outputfile", type="string", default=None,
+ help="Filename to save burst output")
+ parser.add_option("-l", "--fileloop", action="store_true", dest="fileloop", default=False,
+ help="Continuously loop data from input file")
+
+ #usrp options
+ parser.add_option("-d", "--decim", type="int", default=112,
+ help="Set USRP decimation rate to DECIM [default=%default]")
+ parser.add_option("-R", "--rx-subdev-spec", type="subdev", default=None,
+ help="Select USRP Rx side A or B (default=first one with a daughterboard)")
+ parser.add_option("--fusb-block-size", type="int", default=0,
+ help="Set USRP blocksize")
+ parser.add_option("--fusb-nblocks", type="int", default=0,
+ help="Set USRP block buffers")
+ parser.add_option("--realtime",action="store_true", dest="realtime",
+ help="Use realtime scheduling.")
+ parser.add_option("-C", "--clock-offset", type="eng_float", default=0.0,
+ help="Sample clock offset frequency")
+
+ #FIXME: gain not working?
+ parser.add_option("-g", "--gain", type="eng_float", default=None,
+ help="Set gain in dB (default is midpoint)")
+ parser.add_option("-c", "--channel", type="int", default=1,
+ help="Tune to GSM ARFCN.")
+ parser.add_option("-r", "--region", type="string", default="u",
+ help="Frequency bands to use for channels. (u)s or (e)urope [default=%default]")
+
+ #testing options
+ parser.add_option("--test-hop-speed",action="store_true", dest="test_hop_speed",
+ help="Test hopping speed.")
+ parser.add_option("--hopgood", type="int", default=658,
+ help="Good ARFCN [default=%default]")
+ parser.add_option("--hopbad", type="int", default=655,
+ help="Emtpy ARFCN [default=%default]")
+
+ (options, args) = parser.parse_args()
+ if (len(args) != 0):
+ parser.print_help()
+ sys.exit(1)
+
+# if (options.inputfile and ( options.freq or options.rx_subdev_spec or options.gain)):
+# print "datafile option cannot be used with USRP options."
+# sys.exit(1)
+
+ if options.test_hop_speed:
+ options.tuning = 'h' #hopping only, no offset
+ options.channel = options.hopgood
+
+ self.options = options
+ self.scopes = options.scopes
+ self.region = options.region
+ self.channel = options.channel
+ self.offset = options.offset
+
+####################
+ def setup_print_options(self):
+ options = self.options
+
+ self.print_status = options.print_console.count('s')
+
+ if options.print_console.count('e'):
+ self.print_status = 1
+
+ #console print options
+ popts = 0
+
+ if options.print_console.count('d'):
+ popts |= gsm.PRINT_GSM_DECODE
+
+ if options.print_console.count('s'):
+ popts |= gsm.PRINT_STATE
+
+ if options.print_console.count('e'):
+ popts |= gsm.PRINT_EVERYTHING
+
+ if options.print_console.count('a'):
+ popts |= gsm.PRINT_ALL_TYPES
+
+ if options.print_console.count('k'):
+ popts |= gsm.PRINT_KNOWN
+
+ if options.print_console.count('u'):
+ popts |= gsm.PRINT_UNKNOWN
+
+ if options.print_console.count('0'):
+ popts |= gsm.PRINT_TS0
+
+ if options.print_console.count('F'):
+ popts |= gsm.PRINT_FCCH
+
+ if options.print_console.count('S'):
+ popts |= gsm.PRINT_SCH
+
+ if options.print_console.count('N'):
+ popts |= gsm.PRINT_NORMAL
+
+ if options.print_console.count('D'):
+ popts |= gsm.PRINT_DUMMY
+
+ if options.print_console.count('x'):
+ popts |= gsm.PRINT_BITS | gsm.PRINT_HEX
+
+ if options.print_console.count('B'):
+ popts |= gsm.PRINT_BITS | gsm.PRINT_ALL_BITS
+
+ elif options.print_console.count('b'):
+ popts |= gsm.PRINT_BITS
+
+ elif options.print_console.count('C'):
+ popts |= gsm.PRINT_BITS | gsm.PRINT_CORR_BITS
+
+ #print "Print flags: 0x%8.8x\n" %(popts)
+
+ self.burst.d_print_options = popts
+
+
+####################
+ def setup_usrp(self):
+ options = self.options
+
+ #set resonable defaults if no user prefs set
+ if options.realtime:
+ if options.fusb_block_size == 0:
+ options.fusb_block_size = gr.prefs().get_long('fusb', 'rt_block_size', 1024)
+ if options.fusb_nblocks == 0:
+ options.fusb_nblocks = gr.prefs().get_long('fusb', 'rt_nblocks', 16)
+ else:
+ if options.fusb_block_size == 0:
+ options.fusb_block_size = gr.prefs().get_long('fusb', 'block_size', 4096)
+ if options.fusb_nblocks == 0:
+ options.fusb_nblocks = gr.prefs().get_long('fusb', 'nblocks', 16)
+
+ print >> sys.stderr, "fusb_block_size =", options.fusb_block_size
+ print >> sys.stderr, "fusb_nblocks =", options.fusb_nblocks
+
+
+ self.ursp = usrp.source_c(decim_rate=options.decim,fusb_block_size=options.fusb_block_size,fusb_nblocks=options.fusb_nblocks)
+
+ if options.rx_subdev_spec is None:
+ options.rx_subdev_spec = pick_subdevice(self.ursp)
+
+ self.ursp.set_mux(usrp.determine_rx_mux_value(self.ursp, options.rx_subdev_spec))
+
+ # determine the daughterboard subdevice
+ self.subdev = usrp.selected_subdev(self.ursp, options.rx_subdev_spec)
+ input_rate = self.ursp.adc_freq() / self.ursp.decim_rate()
+
+ # set initial values
+ if options.gain is None:
+ # if no gain was specified, use the mid-point in dB
+ g = self.subdev.gain_range()
+ options.gain = float(g[0]+g[1])/2
+
+ self.set_gain(options.gain)
+
+ self.source = self.ursp
+
+####################
+ def setup_timing(self):
+ options = self.options
+ clock_rate = 64e6
+
+ if options.clock_offset:
+ clock_rate = 64e6 + options.clock_offset
+ elif options.channel:
+ #calculate actual clock rate based on frequency offset (assumes shared clock for sampling and tuning)
+ f = get_freq_from_arfcn(options.channel,options.region)
+ if f:
+ percent_offset = options.offset / get_freq_from_arfcn(options.channel,options.region)
+ else:
+ percent_offset = 0.0
+
+ clock_rate += clock_rate * percent_offset
+ print >> sys.stderr, "% offset = ", percent_offset, "clock = ", clock_rate
+
+ self.clock_rate = clock_rate
+ self.input_rate = clock_rate / options.decim #TODO: what about usrp value?
+ self.gsm_symb_rate = 1625000.0 / 6.0
+ self.sps = self.input_rate / self.gsm_symb_rate
+
+####################
+ def setup_filter(self):
+ #test w/o filter (for buffer latency)
+ #self.filter = self.source
+ #return
+
+ options = self.options
+
+ # configure channel filter
+ filter_cutoff = 145e3 #135,417Hz is GSM bandwidth
+ filter_t_width = 10e3
+
+ #Only DSP adjust for offset on datafile, adjust tuner for USRP
+ #TODO: see if we can change this offset at runtime based on freq detection
+ if options.inputfile:
+ offset = self.offset
+ else:
+ offset = 0.0
+
+ filter_taps = gr.firdes.low_pass(1.0, self.input_rate, filter_cutoff, filter_t_width, gr.firdes.WIN_HAMMING)
+ self.filter = gr.freq_xlating_fir_filter_ccf(1, filter_taps, offset, self.input_rate)
+
+ self.connect(self.source, self.filter)
+
+####################
+ def setup_f_flowgraph(self):
+ # configure demodulator
+ # adjust the phase gain for sampling rate
+ self.demod = gr.quadrature_demod_cf(self.sps);
+
+ #configure clock recovery
+ gain_mu = 0.01
+ gain_omega = .25 * gain_mu * gain_mu # critically damped
+ self.clocker = gr.clock_recovery_mm_ff( self.sps,
+ gain_omega,
+ 0.5, #mu
+ gain_mu,
+ 0.5) #omega_relative_limit,
+
+ self.burst = gsm.burst_ff(self.burst_cb)
+ self.connect(self.filter, self.demod, self.clocker, self.burst)
+
+####################
+ def setup_c_flowgraph(self):
+ #use the sink version if burst scope not selected
+# if self.scopes.count("b"):
+# self.burst = gsm.burst_cf(self.burst_cb,self.input_rate)
+# else:
+# self.burst = gsm.burst_sink_c(self.burst_cb,self.input_rate)
+
+ self.burst = gsm.burst_cf(self.burst_cb,self.input_rate)
+
+ self.connect(self.filter, self.burst)
+
+####################
+ def setup_scopes(self):
+ #Input FFT
+ if self.scopes.count("I"):
+ self.input_fft_scope = fftsink.fft_sink_c (self, self.panel, fft_size=1024, sample_rate=self.input_rate)
+ self.connect(self.source, self.input_fft_scope)
+
+ #Filter FFT
+ if self.scopes.count("F"):
+ self.filter_fft_scope = fftsink.fft_sink_c (self, self.panel, fft_size=1024, sample_rate=self.input_rate)
+ self.connect(self.filter, self.filter_fft_scope)
+
+ #Burst Scope
+ if self.scopes.count("b"):
+ self.burst_scope = scopesink.scope_sink_f(self, self.panel, sample_rate=self.gsm_symb_rate,v_scale=1)
+ self.connect(self.v2s, self.burst_scope)
+
+ #burst_f options
+ if self.options.decoder.count("f"):
+ if self.scopes.count("d"):
+ self.demod_scope = scopesink.scope_sink_f(self, self.panel, sample_rate=self.input_rate)
+ self.connect(self.demod, self.demod_scope)
+
+ if self.scopes.count("c"):
+ #f_flowgraph
+ self.clocked_scope = scopesink.scope_sink_f(self, self.panel, sample_rate=self.gsm_symb_rate,v_scale=1)
+ self.connect(self.clocker, self.clocked_scope)
+ #for testing: f_flowgraph2
+ #self.clocked_scope = scopesink.scope_sink_c(self, self.panel, sample_rate=self.gsm_symb_rate,v_scale=1)
+ #self.connect(self.clocker, self.clocked_scope)
+ #self.connect((self.clocker,1),(self.clocked_scope,1))
+
+####################
+ def configure_burst_decoder(self):
+ options = self.options
+
+ # equalizer
+ eq_types = {'none': gsm.EQ_NONE, 'fixed-dfe': gsm.EQ_FIXED_DFE, 'fixed-linear': gsm.EQ_FIXED_LINEAR }
+ self.burst.d_equalizer_type = eq_types[options.equalizer]
+
+ # timing
+ topts = 0
+
+ if options.timing.count('c'):
+ topts |= gsm.CLK_CORR_TRACK
+
+ if options.timing.count('q'):
+ topts |= gsm.QB_QUARTER
+
+ elif options.timing.count('f'):
+ topts |= gsm.QB_FULL04
+
+ self.burst.d_clock_options = topts
+
+ #test modes
+ testopts = 0
+
+ if options.test_hop_speed:
+ testopts |= gsm.OPT_TEST_HOP_SPEED
+ self.burst.d_hop_good_arfcn = options.hopgood
+ self.burst.d_hop_bad_arfcn = options.hopbad
+
+ print "!!!!! Enabling Hop Speed Testing (good=%d, bad=%d) !!!!!" % (options.hopgood,options.hopbad)
+
+ self.burst.d_test_options = testopts
+ #print "Test Options: 0x%8.8x" % (self.burst.d_test_options)
+
+
+
+####################
+ def setup_flowgraph(self):
+
+ options = self.options
+
+ # Attempt to enable realtime scheduling
+ if options.realtime:
+ r = gr.enable_realtime_scheduling()
+ if r == gr.RT_OK:
+ options.realtime = True
+ print >> sys.stderr, "Realtime scheduling ENABLED"
+ else:
+ options.realtime = False
+ print >> sys.stderr, "Realtime scheduling FAILED"
+
+ self.setup_timing()
+
+ # Setup our input source
+ if options.inputfile:
+ self.using_usrp = False
+ print >> sys.stderr, "Reading data from: " + options.inputfile
+ self.source = gr.file_source(gr.sizeof_gr_complex, options.inputfile, options.fileloop)
+ else:
+ self.using_usrp = True
+ self.setup_usrp()
+
+ self.setup_filter()
+
+ #create a tuner callback
+ self.mean_offset = 0.0 #this is set by tuner callback
+ self.burst_cb = burst_callback(self)
+
+ # Setup flow based on decoder selection
+ if options.decoder.count("c"):
+ self.setup_c_flowgraph()
+ elif options.decoder.count("f"):
+ self.setup_f_flowgraph()
+
+ self.configure_burst_decoder()
+
+ #Hookup a vector-stream converter if we want burst output
+ if self.scopes.count("b") or options.outputfile:
+ self.v2s = gr.vector_to_stream(gr.sizeof_float,142) #burst output is 142 (USEFUL_BITS)
+ self.connect(self.burst, self.v2s)
+# else:
+# self.burst_sink = gr.null_sink(gr.sizeof_float)
+# self.connect(self.v2s, self.burst_sink)
+
+
+ #Output file
+ if options.outputfile:
+ self.filesink = gr.file_sink(gr.sizeof_float, options.outputfile)
+ self.connect(self.v2s, self.filesink)
+
+ self.setup_scopes()
+
+####################
+ def set_status_msg(self, msg):
+ self.frame.GetStatusBar().SetStatusText(msg, 0)
+
+####################
+ def _build_gui(self, vbox):
+
+ if self.scopes.count("I"):
+ vbox.Add(self.input_fft_scope.win, 5, wx.EXPAND)
+
+ if self.scopes.count("F"):
+ vbox.Add(self.filter_fft_scope.win, 5, wx.EXPAND)
+
+ if self.scopes.count("d"):
+ vbox.Add(self.demod_scope.win, 5, wx.EXPAND)
+
+ if self.scopes.count("c"):
+ vbox.Add(self.clocked_scope.win, 5, wx.EXPAND)
+
+ if self.scopes.count("b"):
+ vbox.Add(self.burst_scope.win, 5, wx.EXPAND)
+
+ # add control area at the bottom
+ if self.using_usrp:
+
+ def _form_set_freq(kv):
+ return self.set_freq(kv['freq'])
+
+ self.usrpform = usrpform = form.form()
+ hbox = wx.BoxSizer(wx.HORIZONTAL)
+
+ hbox.Add((5,0), 0, 0)
+ g = self.subdev.gain_range()
+ usrpform['gain'] = form.slider_field(parent=self.panel, sizer=hbox, label="Gain",
+ weight=3,
+ min=int(g[0]), max=int(g[1]),
+ callback=self.set_gain)
+
+ hbox.Add((5,0), 0, 0)
+ usrpform['chan'] = form.slider_field( parent=self.panel, sizer=hbox, label="Channel",
+ weight=3,
+ min=0, max=1023,
+ value=self.channel,
+ callback=self.set_channel)
+
+ vbox.Add(hbox, 0, wx.EXPAND)
+
+
+####################
+ def set_freq(self, freq):
+ #TODO: for wideband processing, determine if the desired freq is within our current sample range.
+ # If so, use the frequency translator to tune. Tune the USRP otherwise.
+ # Maybe have a flag to force tuning the USRP?
+
+ if not self.using_usrp:
+ #if reading from file just adjust for offset in the freq translator
+ if self.print_status:
+ print >> sys.stderr, "Setting filter center freq to offset: ", self.offset, "\n"
+
+ self.filter.set_center_freq(self.offset)
+ return True
+
+ freq = freq - self.offset
+
+ r = self.ursp.tune(0, self.subdev, freq)
+
+ if r:
+ self.status_msg = '%f' % (freq/1e6)
+ return True
+ else:
+ self.status_msg = "Failed to set frequency (%f)" % (freq/1e6)
+ return False
+
+####################
+ def set_gain(self, gain):
+
+ if not self.using_usrp:
+ return False
+
+ self.subdev.set_gain(gain)
+
+####################
+ def set_channel(self, chan):
+
+ self.chan = chan
+
+ freq = get_freq_from_arfcn(chan,self.region)
+
+ if freq:
+ self.set_freq(freq)
+ else:
+ self.status_msg = "Invalid Channel"
+
+####################
+ def print_stats(self):
+ out = sys.stderr
+
+ n_total = self.burst.d_total_count
+ n_unknown = self.burst.d_unknown_count
+ n_known = n_total - n_unknown
+
+ print >> out, "======== STATS ========="
+ print >> out, 'freq_offset: ',self.offset
+ print >> out, 'mean_offset: ',self.mean_offset
+ print >> out, 'sync_loss_count:',self.burst.d_sync_loss_count
+ print >> out, 'total_bursts: ',n_total
+ print >> out, 'fcch_count: ',self.burst.d_fcch_count
+ print >> out, 'part_sch_count: ',self.burst.d_part_sch_count
+ print >> out, 'sch_count: ',self.burst.d_sch_count
+ print >> out, 'normal_count: ',self.burst.d_normal_count
+ print >> out, 'dummy_count: ',self.burst.d_dummy_count
+ print >> out, 'unknown_count: ',self.burst.d_unknown_count
+ print >> out, 'known_count: ',n_known
+ if n_total:
+ print >> out, '%known: ', 100.0 * n_known / n_total
+
+ #timing
+ if self.options.decoder.count("c"):
+ omega = self.burst.get_omega()
+ else:
+ omega = self.clocker.omega()
+
+ percent_sps = omega / self.sps
+ print >> out, 'omega: %f (%f / %f)' % (omega,self.sps,percent_sps)
+
+ print >> out, ""
+
+####################
+ def on_tick(self, evt):
+
+ if self.print_status:
+ self.print_stats()
+
+####################
+ def on_idle(self, event):
+ #We can't update this while in the tune functions since they can be invoked by callbaks and the GUI croaks...
+ #FIXME: This is icky.
+ self.set_status_msg(self.status_msg)
+ #print "Idle.\n";
+
+####################
+def main():
+ app = stdgui.stdapp(app_flow_graph, "GSM Scanner", nstatus=1)
+ app.MainLoop()
+
+####################
+if __name__ == '__main__':
+ main()
diff --git a/gsm-tvoid/src/python/qa_gsm.py b/gsm-tvoid/src/python/qa_gsm.py
new file mode 100755
index 0000000..5709756
--- /dev/null
+++ b/gsm-tvoid/src/python/qa_gsm.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+#
+
+
+from gnuradio import gr, gr_unittest
+import gsm
+
+class qa_gsm (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_001_burst_cf (self):
+# src_data = map(complex,(-3, 4, -5.5, 2, 3))
+ src_data = (-3, 4, -5.5, 2, 3)
+ print src_data
+ expected_result = (9, 16, 30.25, 4, 9)
+ print expected_result
+ src = gr.vector_source_c (src_data)
+ burst = gsm.burst_cf ()
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, burst)
+ self.fg.connect (burst, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5)
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gsm-tvoid/src/python/run_tests.in b/gsm-tvoid/src/python/run_tests.in
new file mode 100644
index 0000000..6e4b83e
--- /dev/null
+++ b/gsm-tvoid/src/python/run_tests.in
@@ -0,0 +1,50 @@
+#!/bin/sh
+
+# All this strange PYTHONPATH manipulation is required to run our
+# tests using our just built shared library and swig-generated python
+# code prior to installation.
+
+# build tree == src tree unless you're doing a VPATH build.
+# If you don't know what a VPATH build is, you're not doing one. Relax...
+
+prefix=@prefix@
+exec_prefix=@exec_prefix@
+
+# Where to look in the build tree for our shared library
+libbld=@abs_top_builddir@/src/lib
+# Where to look in the src tree for swig generated python code
+libsrc=@abs_top_srcdir@/src/lib
+# Where to look in the src tree for hand written python code
+py=@abs_top_srcdir@/src/python
+
+# Where to look for installed GNU Radio python modules
+# FIXME this is wrong on a distcheck. We really need to ask gnuradio-core
+# where it put its python files.
+installed_pythondir=@pythondir@
+installed_pyexecdir=@pyexecdir@
+
+PYTHONPATH="$libbld:$libbld/.libs:$libsrc:$py:$installed_pythondir:$installed_pyexecdir:$PYTHONPATH"
+#PYTHONPATH="$libbld:$libbld/.libs:$libsrc:$py:$installed_pythondir:$installed_pyexecdir"
+
+export PYTHONPATH
+
+#
+# This is the simple part...
+# Run everything that matches qa_*.py and return the final result.
+#
+
+ok=yes
+for file in @srcdir@/qa_*.py
+do
+ if ! $file
+ then
+ ok=no
+ fi
+done
+
+if [ $ok = yes ]
+then
+ exit 0
+else
+ exit 1
+fi
personal git repositories of Harald Welte. Your mileage may vary