#!/usr/bin/env python2
# -*- coding: utf-8 -*-
##################################################
# GNU Radio Python Flow Graph
# Title: Dec Proto Am Dc Ber Top
# GNU Radio version: 3.7.13.5
##################################################

from gnuradio import blocks
from gnuradio import digital
from gnuradio import eng_notation
from gnuradio import fec
from gnuradio import filter
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from optparse import OptionParser
import pmt


def _packet_bits():
    """Return a fresh list holding the reference bit pattern.

    The same sequence feeds the vector source at construction time and
    whenever its tags are replaced, so it is defined once here.
    """
    return (
        [1, 0] * (4 * 12)
        + [1, 1, 0, 1, 0, 1, 0, 1] * 12
        + [1, 0, 1, 1, 1, 1, 1, 0, 0, 1]
        + [1, 1, 1, 1, 0, 1, 1, 0, 0, 1]
        + [1, 0, 1, 1, 1, 1, 1, 0, 0, 1]
        + [0, 1, 1, 1, 0, 1, 1, 0, 1, 0]
        + [0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0]
        + [0] * 8
    )


def _low_pass_taps(samp_rate, sim_mul):
    """Design the low-pass FIR taps (cutoff 0.1*sim_mul, width 0.05*sim_mul)."""
    return firdes.low_pass(
        1, samp_rate, 0.1 * sim_mul, 0.05 * sim_mul,
        firdes.WIN_HAMMING, 6.76)


def _high_pass_taps(samp_rate, sim_mul):
    """Design the high-pass FIR taps (cutoff sim_mul/200, width sim_mul/800).

    Float divisors are used on purpose: under Python 2 an integer
    ``sim_mul`` would otherwise be floor-divided (10000/800 -> 12).
    """
    return firdes.high_pass(
        1, samp_rate, sim_mul / 200.0, sim_mul / 800.0,
        firdes.WIN_HAMMING, 6.76)


class dec_proto_am_dc_ber_top(gr.top_block):
    """Flow graph that writes the output of a BER block to a file.

    Signal path, as wired in ``__init__``:

    * file source -> throttle -> add(-50) -> adder input 0
    * bit pattern -> repeat(50) -> scale by ``signal_strength * 0.001``
      -> add(``-signal_strength * 0.001 / 2``) -> adder input 1
    * adder -> float_to_complex (imaginary part from a null source)
      -> high-pass -> low-pass -> complex_to_real
      -> scale by ``1000.0 / signal_strength`` -> M&M clock recovery
      -> binary slicer -> BER input 0
    * bit pattern -> delay(``ber_delay``) -> float_to_char -> BER input 1
    * BER output -> file sink (``ber_file``)
    """

    def __init__(self, ber_file='0', signal_strength=50):
        """Build and connect all blocks.

        Args:
            ber_file: path the BER block's output is written to.
            signal_strength: amplitude parameter of the injected bit pattern
                (the command-line help text describes it as mHz). Must be
                non-zero, because the receive gain is
                ``1000.0 / signal_strength``.

        Raises:
            ZeroDivisionError: if ``signal_strength`` is 0.
        """
        gr.top_block.__init__(self, "Dec Proto Am Dc Ber Top")

        ##################################################
        # Parameters
        ##################################################
        self.ber_file = ber_file
        self.signal_strength = signal_strength

        ##################################################
        # Variables
        ##################################################
        self.sim_mul = sim_mul = 1e4
        self.actual_sampling_rate = actual_sampling_rate = 10
        self.sync_tag = gr.tag_utils.python_to_tag((
            0,
            pmt.intern("sync"),
            pmt.from_double(0.0),
            pmt.intern("correlate_access_code"),
        ))
        self.samp_rate = samp_rate = actual_sampling_rate * sim_mul
        self.pi = 3.141592653589793
        self.packet_time_est_tag = packet_time_est_tag = gr.tag_utils.python_to_tag((
            0,
            pmt.intern("start"),
            pmt.from_double(0.0),
            pmt.intern("packet_vector_source"),
        ))
        self.ber_delay = ber_delay = 198

        ##################################################
        # Blocks
        ##################################################
        self.low_pass_filter_0 = filter.fir_filter_ccf(
            1, _low_pass_taps(samp_rate, sim_mul))
        self.high_pass_filter_0 = filter.fir_filter_ccf(
            1, _high_pass_taps(samp_rate, sim_mul))
        self.fec_ber_bf_0 = fec.ber_bf(False, 0, -7.0)
        # The first argument (50) equals the factor of blocks_repeat_0 (10*5).
        self.digital_clock_recovery_mm_xx_0 = digital.clock_recovery_mm_ff(
            50, 0.001, 0, 0.01, 0.01)
        self.digital_binary_slicer_fb_0 = digital.binary_slicer_fb()
        self.blocks_vector_source_x_0_0_1_0 = blocks.vector_source_f(
            _packet_bits(), True, 1, [packet_time_est_tag])
        self.blocks_throttle_0 = blocks.throttle(gr.sizeof_float*1, samp_rate, True)
        self.blocks_repeat_0 = blocks.repeat(gr.sizeof_float*1, 10*5)
        self.blocks_null_source_1 = blocks.null_source(gr.sizeof_float*1)
        self.blocks_multiply_const_vxx_1 = blocks.multiply_const_vff(
            (signal_strength * 0.001, ))
        # Inverse of the transmit-side scaling above.
        self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vff(
            (1000.0/signal_strength, ))
        self.blocks_float_to_complex_0 = blocks.float_to_complex(1)
        self.blocks_float_to_char_0 = blocks.float_to_char(1, 1)
        self.blocks_file_source_0 = blocks.file_source(
            gr.sizeof_float*1,
            '/home/user/research/smart_meter_reset/gm_platform/fw/raw_freq.bin',
            True)
        self.blocks_file_source_0.set_begin_tag(pmt.PMT_NIL)
        self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_float*1, ber_file, False)
        self.blocks_file_sink_0.set_unbuffered(False)
        self.blocks_delay_0 = blocks.delay(gr.sizeof_float*1, ber_delay)
        self.blocks_complex_to_real_0 = blocks.complex_to_real(1)
        self.blocks_add_xx_0 = blocks.add_vff(1)
        # Shifts the scaled pattern by half of its amplitude.
        self.blocks_add_const_vxx_1 = blocks.add_const_vff(
            (-signal_strength*0.001/2.0, ))
        self.blocks_add_const_vxx_0 = blocks.add_const_vff((-50, ))

        ##################################################
        # Connections
        ##################################################
        self.connect((self.blocks_add_const_vxx_0, 0), (self.blocks_add_xx_0, 0))
        self.connect((self.blocks_add_const_vxx_1, 0), (self.blocks_add_xx_0, 1))
        self.connect((self.blocks_add_xx_0, 0), (self.blocks_float_to_complex_0, 0))
        self.connect((self.blocks_complex_to_real_0, 0), (self.blocks_multiply_const_vxx_0, 0))
        self.connect((self.blocks_delay_0, 0), (self.blocks_float_to_char_0, 0))
        self.connect((self.blocks_file_source_0, 0), (self.blocks_throttle_0, 0))
        self.connect((self.blocks_float_to_char_0, 0), (self.fec_ber_bf_0, 1))
        self.connect((self.blocks_float_to_complex_0, 0), (self.high_pass_filter_0, 0))
        self.connect((self.blocks_multiply_const_vxx_0, 0), (self.digital_clock_recovery_mm_xx_0, 0))
        self.connect((self.blocks_multiply_const_vxx_1, 0), (self.blocks_add_const_vxx_1, 0))
        self.connect((self.blocks_null_source_1, 0), (self.blocks_float_to_complex_0, 1))
        self.connect((self.blocks_repeat_0, 0), (self.blocks_multiply_const_vxx_1, 0))
        self.connect((self.blocks_throttle_0, 0), (self.blocks_add_const_vxx_0, 0))
        self.connect((self.blocks_vector_source_x_0_0_1_0, 0), (self.blocks_delay_0, 0))
        self.connect((self.blocks_vector_source_x_0_0_1_0, 0), (self.blocks_repeat_0, 0))
        self.connect((self.digital_binary_slicer_fb_0, 0), (self.fec_ber_bf_0, 0))
        self.connect((self.digital_clock_recovery_mm_xx_0, 0), (self.digital_binary_slicer_fb_0, 0))
        self.connect((self.fec_ber_bf_0, 0), (self.blocks_file_sink_0, 0))
        self.connect((self.high_pass_filter_0, 0), (self.low_pass_filter_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.blocks_complex_to_real_0, 0))

    def get_ber_file(self):
        """Return the current BER output file path."""
        return self.ber_file

    def set_ber_file(self, ber_file):
        """Store a new BER output path and reopen the file sink on it."""
        self.ber_file = ber_file
        self.blocks_file_sink_0.open(self.ber_file)

    def get_signal_strength(self):
        """Return the current signal strength."""
        return self.signal_strength

    def set_signal_strength(self, signal_strength):
        """Update the signal strength and the three constants derived from it.

        Raises:
            ZeroDivisionError: if ``signal_strength`` is 0.
        """
        self.signal_strength = signal_strength
        self.blocks_multiply_const_vxx_1.set_k((self.signal_strength * 0.001, ))
        self.blocks_multiply_const_vxx_0.set_k((1000.0/self.signal_strength, ))
        self.blocks_add_const_vxx_1.set_k((-self.signal_strength*0.001/2.0, ))

    def get_sim_mul(self):
        """Return the simulation rate multiplier."""
        return self.sim_mul

    def set_sim_mul(self, sim_mul):
        """Update the rate multiplier and everything that depends on it."""
        self.sim_mul = sim_mul
        # set_samp_rate() redesigns both filters from the new sim_mul and
        # samp_rate, so no separate set_taps() calls are needed here.
        self.set_samp_rate(self.actual_sampling_rate*self.sim_mul)

    def get_actual_sampling_rate(self):
        """Return the un-multiplied sampling rate."""
        return self.actual_sampling_rate

    def set_actual_sampling_rate(self, actual_sampling_rate):
        """Update the base sampling rate and the derived ``samp_rate``."""
        self.actual_sampling_rate = actual_sampling_rate
        self.set_samp_rate(self.actual_sampling_rate*self.sim_mul)

    def get_sync_tag(self):
        """Return the stored "sync" stream tag."""
        return self.sync_tag

    def set_sync_tag(self, sync_tag):
        """Store a new "sync" stream tag (no block reads it in this graph)."""
        self.sync_tag = sync_tag

    def get_samp_rate(self):
        """Return the flow-graph sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Update the sample rate, redesign both filters, retune the throttle."""
        self.samp_rate = samp_rate
        self.low_pass_filter_0.set_taps(
            _low_pass_taps(self.samp_rate, self.sim_mul))
        self.high_pass_filter_0.set_taps(
            _high_pass_taps(self.samp_rate, self.sim_mul))
        self.blocks_throttle_0.set_sample_rate(self.samp_rate)

    def get_pi(self):
        """Return the stored value of pi."""
        return self.pi

    def set_pi(self, pi):
        """Store a new value for ``pi`` (no block reads it in this graph)."""
        self.pi = pi

    def get_packet_time_est_tag(self):
        """Return the "start" tag attached to the bit-pattern source."""
        return self.packet_time_est_tag

    def set_packet_time_est_tag(self, packet_time_est_tag):
        """Store a new "start" tag and reload the vector source with it."""
        self.packet_time_est_tag = packet_time_est_tag
        self.blocks_vector_source_x_0_0_1_0.set_data(
            _packet_bits(), [self.packet_time_est_tag])

    def get_ber_delay(self):
        """Return the delay applied to the reference bits."""
        return self.ber_delay

    def set_ber_delay(self, ber_delay):
        """Update the reference-bit delay on the delay block."""
        self.ber_delay = ber_delay
        self.blocks_delay_0.set_dly(self.ber_delay)


def argument_parser():
    """Build the command-line parser for ``--ber-file`` and ``--signal-strength``."""
    parser = OptionParser(usage="%prog: [options]", option_class=eng_option)
    parser.add_option(
        "", "--ber-file", dest="ber_file", type="string", default='0',
        help="Set BER data output file [default=%default]")
    parser.add_option(
        "", "--signal-strength", dest="signal_strength", type="eng_float",
        default=eng_notation.num_to_str(50),
        help="Set signal strength in mHz [default=%default]")
    return parser


def main(top_block_cls=dec_proto_am_dc_ber_top, options=None):
    """Run the flow graph until the user presses Enter (or stdin hits EOF).

    Args:
        top_block_cls: flow-graph class to instantiate.
        options: parsed options providing ``ber_file`` and
            ``signal_strength``; parsed from ``sys.argv`` when None.
    """
    if options is None:
        options, _ = argument_parser().parse_args()

    tb = top_block_cls(ber_file=options.ber_file,
                       signal_strength=options.signal_strength)
    tb.start()
    try:
        raw_input('Press Enter to quit: ')
    except EOFError:
        pass
    finally:
        # Always shut the graph down, including on Ctrl-C.
        tb.stop()
        tb.wait()


if __name__ == '__main__':
    main()