#!/usr/bin/env python # TODO: # * Adjust offset by PPM # * Auto-tune offset (add option to enable) # * Add status info to window (frequency, offset, etc) # * Put direct frequency tuning back # * Add rate-limited file reads (throttle?) # * Make console only version # * Reset burst_stats on retune # * Add better option checking import sys #nasty hack for testing for extdir in ['../lib','../lib/.libs']: if extdir not in sys.path: sys.path.append(extdir) from gnuradio import gr, gru, blks from gnuradio import usrp from gnuradio import eng_notation from gnuradio.eng_option import eng_option from gnuradio.wxgui import stdgui, fftsink, waterfallsink, scopesink, form, slider from optparse import OptionParser from math import pi import wx import gsm class burst_callback(gr.feval_ll): def __init__(self, fg): gr.feval_ll.__init__(self) self.fg = fg self.offset_mean_num = 30 #number of FCCH offsets to average self.offset_vals = [] def eval(self, x): try: #TODO: rework so this will work on file input if gsm.BURST_CB_SYNC_OFFSET == x: last_offset = self.fg.burst.last_freq_offset() self.fg.offset -= last_offset print "burst_callback: SYNC_OFFSET:", last_offset, " ARFCN: ", self.fg.arfcn, "\n"; self.fg.set_channel(self.fg.arfcn) elif gsm.BURST_CB_ADJ_OFFSET == x: last_offset = self.fg.burst.last_freq_offset() #print "burst_callback: ADJ_OFFSET:", last_offset, " ARFCN: ", self.fg.arfcn, "\n"; self.offset_vals.append(last_offset) if len(self.offset_vals) >= self.offset_mean_num: sum = 0.0 while len(self.offset_vals): sum += self.offset_vals.pop(0) mean_offset = sum / self.offset_mean_num self.fg.offset -= mean_offset #retune if greater than 100 Hz if mean_offset > 100.0: print "burst_callback: mean offset adjust:", mean_offset, "\n"; self.fg.set_channel(self.fg.arfcn) elif gsm.BURST_CB_TUNE == x: print "burst_callback: BURST_CB_TUNE: ARFCN: ", self.fg.burst.next_arfcn, "\n"; self.fg.set_channel(self.fg.burst.next_arfcn) return 0 except Exception, e: print "burst_callback: Exception: ", e def pick_subdevice(u): if u.db[0][0].dbid() >= 0: return (0, 0) if u.db[1][0].dbid() >= 0: return (1, 0) return (0, 0) def get_freq_from_arfcn(chan,region): #P/E/R-GSM 900 if chan >= 0 and chan <= 124: freq = 890 + 0.2*chan + 45 #GSM 850 elif chan >= 128 and chan <= 251: freq = 824.2 + 0.2*(chan - 128) + 45 #GSM 450 elif chan >= 259 and chan <= 293: freq = 450.6 + 0.2*(chan - 259) + 10 #GSM 480 elif chan >= 306 and chan <= 340: freq = 479 + 0.2*(chan - 306) + 10 #DCS 1800 elif region is "e" and chan >= 512 and chan <= 885: freq = 1710.2 + 0.2*(chan - 512) + 95 #DCS 1900 elif region is "u" and chan >= 512 and chan <= 810: freq = 1850.2 + 0.2*(chan - 512) + 80 #E/R-GSM 900 elif chan >= 955 and chan <= 1023: freq = 890 + 0.2*(chan - 1024) + 45 else: freq = 0 return freq * 1e6 class app_flow_graph(stdgui.gui_flow_graph): def __init__(self, frame, panel, vbox, argv): stdgui.gui_flow_graph.__init__(self) #testing self.status_msg = "Started." self.frame = frame self.panel = panel parser = OptionParser(option_class=eng_option) #view options parser.add_option("-S", "--scopes", type="string", default="I", help="Select scopes to display. (N)one, (I)nput,(F)ilter,(d)emod,(c)locked,(b)urst [default=%default]") parser.add_option("-p", "--print-console", type="string", default="s", help="What to print on console. [default=%default]\n" + "(n)othing, (e)verything, (s)tatus, (a)ll Types, (k)nown, (u)nknown, \n" + "TS(0), (F)CCH, (S)CH, (N)ormal, (D)ummy\n" + "Usefull (b)its, All TS (B)its, (C)orrelation bits, he(x) burst data, \n" + "(d)ecoded hex for gsmdecode") #decoder options parser.add_option("-D", "--decoder", type="string", default="f", help="Select decoder block to use. (c)omplex,(f)loat [default=%default]") parser.add_option("-d", "--decim", type="int", default=112, help="Set fgpa decimation rate to DECIM [default=%default]") parser.add_option("-o", "--offset", type="eng_float", default=0.0, help="Tuning offset frequency") parser.add_option("-C", "--clock-offset", type="eng_float", default=0.0, help="Sample clock offset frequency") parser.add_option("-E", "--equalizer", type="string", default="none", help="Type of equalizer to use. none, fixed-dfe [default=%default]") parser.add_option("-t", "--timing", type="string", default="cn", help="Type of timing techniques to use. [default=%default] \n" + "(n)one, (c)orrelation track, (q)uarter bit, (f)ull04 ") #file options parser.add_option("-I", "--inputfile", type="string", default=None, help="Select a capture file to read") parser.add_option("-O", "--outputfile", type="string", default=None, help="Filename to save burst output") parser.add_option("-l", "--fileloop", action="store_true", dest="fileloop", default=False, help="Continuously loop data from input file") #usrp options parser.add_option("-R", "--rx-subdev-spec", type="subdev", default=None, help="Select USRP Rx side A or B (default=first one with a daughterboard)") #FIXME: gain not working? parser.add_option("-g", "--gain", type="eng_float", default=None, help="Set gain in dB (default is midpoint)") parser.add_option("-c", "--channel", type="int", default=None, help="Tune to GSM ARFCN. Overrides --freq") parser.add_option("-r", "--region", type="string", default="u", help="Frequency bands to use for channels. (u)s or (e)urope [default=%default]") (options, args) = parser.parse_args() if (len(args) != 0) or (not (options.channel or options.inputfile)): parser.print_help() sys.exit(1) self.options = options self.scopes = options.scopes self.region = options.region self.channel = options.channel self.offset = options.offset self.print_status = options.print_console.count('s') if options.print_console.count('e'): self.print_status = 1 # if (options.inputfile and ( options.freq or options.rx_subdev_spec or options.gain)): # print "datafile option cannot be used with USRP options." # sys.exit(1) #adjust or caclulate sample clock clock_rate = 64e6 if options.clock_offset: clock_rate = 64e6 + options.clock_offset elif options.channel: f = get_freq_from_arfcn(options.channel,options.region) if f: percent_offset = options.offset / get_freq_from_arfcn(options.channel,options.region) else: percent_offset = 0.0 clock_rate += clock_rate * percent_offset print "% offset = ", percent_offset, "clock = ", clock_rate #set the default input rate, we will check with the USRP if it is being used input_rate = clock_rate / options.decim gsm_symb_rate = 1625000.0 / 6.0 sps = input_rate/gsm_symb_rate # Attempt to enable realtime scheduling r = gr.enable_realtime_scheduling() if r == gr.RT_OK: realtime = True print "Realtime scheduling ENABLED" else: realtime = False print "Realtime scheduling FAILED" # if options.fusb_block_size == 0 and options.fusb_nblocks == 0: if realtime: # be more aggressive # options.fusb_block_size = gr.prefs().get_long('fusb', 'rt_block_size', 1024) options.fusb_block_size = gr.prefs().get_long('fusb', 'rt_block_size', 512) options.fusb_nblocks = gr.prefs().get_long('fusb', 'rt_nblocks', 16) else: # options.fusb_block_size = gr.prefs().get_long('fusb', 'block_size', 4096) options.fusb_block_size = gr.prefs().get_long('fusb', 'block_size', 1024) options.fusb_nblocks = gr.prefs().get_long('fusb', 'nblocks', 32) print "fusb_block_size =", options.fusb_block_size print "fusb_nblocks =", options.fusb_nblocks # Build the flowgraph # Setup our input source if options.inputfile: self.using_usrp = False print "Reading data from: " + options.inputfile self.source = gr.file_source(gr.sizeof_gr_complex, options.inputfile, options.fileloop) else: self.using_usrp = True self.u = usrp.source_c(decim_rate=options.decim,fusb_block_size=options.fusb_block_size,fusb_nblocks=options.fusb_nblocks) if options.rx_subdev_spec is None: options.rx_subdev_spec = pick_subdevice(self.u) self.u.set_mux(usrp.determine_rx_mux_value(self.u, options.rx_subdev_spec)) # determine the daughterboard subdevice self.subdev = usrp.selected_subdev(self.u, options.rx_subdev_spec) input_rate = self.u.adc_freq() / self.u.decim_rate() # set initial values if options.gain is None: # if no gain was specified, use the mid-point in dB g = self.subdev.gain_range() options.gain = float(g[0]+g[1])/2 self.set_gain(options.gain) # configure the processing blocks # configure channel filter filter_cutoff = 145e3 #135,417Hz is GSM bandwidth filter_t_width = 10e3 #Only DSP adjust for offset on datafile, adjust tuner for USRP #TODO: see if we can change this offset at runtime based on freq detection if options.inputfile: offset = self.offset else: offset = 0.0 filter_taps = gr.firdes.low_pass(1.0, input_rate, filter_cutoff, filter_t_width, gr.firdes.WIN_HAMMING) self.filter = gr.freq_xlating_fir_filter_ccf(1, filter_taps, offset, input_rate) # Connect the blocks if self.scopes.count("I"): self.input_fft_scope = fftsink.fft_sink_c (self, panel, fft_size=1024, sample_rate=input_rate) if options.inputfile: self.connect(self.source, self.filter) if self.scopes.count("I"): self.connect(self.source, self.input_fft_scope) else: self.connect(self.u, self.filter) if self.scopes.count("I"): self.connect(self.u, self.input_fft_scope) #create a tuner callback self.burst_cb = burst_callback(self) # Setup flow based on decoder selection if options.decoder.count("c"): #use the sink version if burst scope not selected if self.scopes.count("b"): self.burst = gsm.burst_cf(self.burst_cb,input_rate) else: self.burst = gsm.burst_sink_c(self.burst_cb,input_rate) self.connect(self.filter, self.burst) elif options.decoder.count("f"): # configure demodulator # adjust the phase gain for sampling rate self.demod = gr.quadrature_demod_cf(sps); #configure clock recovery gain_mu = 0.01 gain_omega = .25 * gain_mu * gain_mu # critically damped self.clocker = gr.clock_recovery_mm_ff( sps, gain_omega, 0.5, #mu gain_mu, 0.3) #omega_relative_limit, self.burst = gsm.burst_ff(self.burst_cb) self.connect(self.filter, self.demod, self.clocker, self.burst) if self.scopes.count("d"): self.demod_scope = scopesink.scope_sink_f(self, panel, sample_rate=input_rate) self.connect(self.demod, self.demod_scope) if self.scopes.count("c"): self.clocked_scope = scopesink.scope_sink_f(self, panel, sample_rate=gsm_symb_rate,v_scale=1) self.connect(self.clocker, self.clocked_scope) # setup decoder parameters # equalizer eq_types = {'none': gsm.EQ_NONE, 'fixed-dfe': gsm.EQ_FIXED_DFE} self.burst.d_equalizer_type = eq_types[options.equalizer] # timing topts = 0 if options.timing.count('c'): topts |= gsm.CLK_CORR_TRACK if options.timing.count('q'): topts |= gsm.QB_QUARTER elif options.timing.count('f'): topts |= gsm.QB_FULL04 self.burst.d_clock_options = topts #console print options popts = 0 if options.print_console.count('s'): popts |= gsm.PRINT_STATE if options.print_console.count('e'): popts |= gsm.PRINT_EVERYTHING if options.print_console.count('a'): popts |= gsm.PRINT_ALL_TYPES if options.print_console.count('k'): popts |= gsm.PRINT_KNOWN if options.print_console.count('u'): popts |= gsm.PRINT_UNKNOWN if options.print_console.count('0'): popts |= gsm.PRINT_TS0 if options.print_console.count('F'): popts |= gsm.PRINT_FCCH if options.print_console.count('S'): popts |= gsm.PRINT_SCH if options.print_console.count('N'): popts |= gsm.PRINT_NORMAL if options.print_console.count('D'): popts |= gsm.PRINT_DUMMY if options.print_console.count('d'): popts |= gsm.PRINT_GSM_DECODE if options.print_console.count('C'): popts |= gsm.PRINT_BITS | gsm.PRINT_CORR_BITS if options.print_console.count('x'): popts |= gsm.PRINT_BITS | gsm.PRINT_HEX if options.print_console.count('B'): popts |= gsm.PRINT_BITS | gsm.PRINT_ALL_BITS elif options.print_console.count('b'): popts |= gsm.PRINT_BITS if options.print_console.count('d'): popts |= gsm.PRINT_GSM_DECODE #TODO: should warn if PRINT_GSM_DECODE is combined with other flags (will corrupt output for gsmdecode) self.burst.d_print_options = popts # create and connect the scopes that apply to all decoders if self.scopes.count("F"): self.filter_fft_scope = fftsink.fft_sink_c (self, panel, fft_size=1024, sample_rate=input_rate) self.connect(self.filter, self.filter_fft_scope) # connect the burst output if self.scopes.count("b") or options.outputfile: self.v2s = gr.vector_to_stream(gr.sizeof_float,142) #burst output is 142 (USEFUL_BITS) self.connect(self.burst, self.v2s) # else: # self.burst_sink = gr.null_sink(gr.sizeof_float) # self.connect(self.v2s, self.burst_sink) #Connect output sinks if self.scopes.count("b"): self.burst_scope = scopesink.scope_sink_f(self, panel, sample_rate=gsm_symb_rate,v_scale=1) self.connect(self.v2s, self.burst_scope) # setup & connect output file if options.outputfile: self.filesink = gr.file_sink(gr.sizeof_float, options.outputfile) self.connect(self.v2s, self.filesink) self._build_gui(vbox) self.set_channel(self.channel) self.t1 = wx.Timer(self.frame) self.t1.Start(5000,0) self.frame.Bind(wx.EVT_TIMER, self.on_tick) #bind the idle routing for message_queue processing self.frame.Bind(wx.EVT_IDLE, self.on_idle) def _set_status_msg(self, msg): self.frame.GetStatusBar().SetStatusText(msg, 0) def _build_gui(self, vbox): if self.scopes.count("I"): vbox.Add(self.input_fft_scope.win, 5, wx.EXPAND) if self.scopes.count("F"): vbox.Add(self.filter_fft_scope.win, 5, wx.EXPAND) if self.scopes.count("d"): vbox.Add(self.demod_scope.win, 5, wx.EXPAND) if self.scopes.count("c"): vbox.Add(self.clocked_scope.win, 5, wx.EXPAND) if self.scopes.count("b"): vbox.Add(self.burst_scope.win, 5, wx.EXPAND) # add control area at the bottom if self.using_usrp: def _form_set_freq(kv): return self.set_freq(kv['freq']) self.usrpform = usrpform = form.form() hbox = wx.BoxSizer(wx.HORIZONTAL) hbox.Add((5,0), 0, 0) g = self.subdev.gain_range() usrpform['gain'] = form.slider_field(parent=self.panel, sizer=hbox, label="Gain", weight=3, min=int(g[0]), max=int(g[1]), callback=self.set_gain) hbox.Add((5,0), 0, 0) usrpform['chan'] = form.slider_field( parent=self.panel, sizer=hbox, label="Channel", weight=3, min=0, max=1023, value=self.channel, callback=self.set_channel) vbox.Add(hbox, 0, wx.EXPAND) def set_freq(self, freq): if not self.using_usrp: return False freq = freq - self.offset r = self.u.tune(0, self.subdev, freq) if r: self.status_msg = '%f' % (freq/1e6) return True else: self.status_msg = "Failed to set frequency (%f)" % (freq/1e6) return False def set_gain(self, gain): if not self.using_usrp: return False self.subdev.set_gain(gain) def set_channel(self, chan): self.arfcn = chan if not self.using_usrp: return False freq = get_freq_from_arfcn(chan,self.region) if freq: self.set_freq(freq) else: self.status_msg = "Invalid Channel" def print_stats(self): n_total = self.burst.d_total_count n_unknown = self.burst.d_unknown_count n_known = n_total - n_unknown print "======== STATS =========" print 'freq_offset: ',self.offset print 'mean_offset: ',self.burst.mean_freq_offset() print 'sync_loss_count:',self.burst.d_sync_loss_count print 'total_bursts: ',n_total print 'fcch_count: ',self.burst.d_fcch_count print 'part_sch_count: ',self.burst.d_part_sch_count print 'sch_count: ',self.burst.d_sch_count print 'normal_count: ',self.burst.d_normal_count print 'dummy_count: ',self.burst.d_dummy_count print 'unknown_count: ',self.burst.d_unknown_count print 'known_count: ',n_known if n_total: print '%known: ', 100.0 * n_known / n_total print "" def on_tick(self, evt): #if option.autotune #tune offset if self.print_status: self.print_stats() def on_idle(self, event): self._set_status_msg(self.status_msg) #print "Idle.\n"; def main (): app = stdgui.stdapp(app_flow_graph, "GSM Scanner", nstatus=1) app.MainLoop() if __name__ == '__main__': main ()