GNU Radio circular buffer manipulation
I encountered the following error:
gr::log :WARN: tpb_thread_body - asynchronous message buffer overflowing, dropping message
By chance, I came across this GNU Radio presentation on YouTube.
The presenter mentioned an out-of-tree (OOT) block he called "buffer" that can supposedly eliminate the "buffer overflowing" error. Apparently, this block works with different sample rates and so-called "circular buffers". I haven't worked with circular buffers myself. Any ideas about circular buffers, or hints on how to build this buffer block, are welcome.
EDIT
Below is the flowgraph that generates the error. As suggested in the comments, the culprits could be the message-processing blocks (circled in red), namely generateCADU (which generates standard CCSDS frames) and processCADU (which extracts CADUs from a data stream).
The implementation file of the generateCADU block is given below:
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gnuradio/io_signature.h>
#include "generateCADU_impl.h"
#include "fec/ReedSolomon/ReedSolomon.h"
#include "fec/Scrambler/Scrambler.h"

namespace gr {
  namespace ccsds {

    generateCADU::sptr
    generateCADU::make(int frameLength,std::string sync, int scramble, int rs, int intDepth)
    {
      return gnuradio::get_initial_sptr
        (new generateCADU_impl(frameLength, sync, scramble, rs, intDepth));
    }

    /*
     * The private constructor
     */
    generateCADU_impl::generateCADU_impl(int frameLength,std::string sync, int scramble, int rs, int intDepth)
      : gr::sync_block("generateCADU",
                       gr::io_signature::make(1, 1, sizeof(unsigned char)),
                       gr::io_signature::make(0, 0, 0)),
        d_frameLength(frameLength),d_scramble(scramble == 1),d_rs(rs >= 1), d_basis(rs >= 2), d_intDepth(intDepth)
    {
      set_output_multiple(d_frameLength);
      //Registering output port
      message_port_register_out(pmt::mp("out"));
      d_sync = parse_string(sync);
    }

    /*
     * Our virtual destructor.
     */
    generateCADU_impl::~generateCADU_impl()
    {
    }

    unsigned char
    generateCADU_impl::parse_hex(char c)
    {
      if ('0' <= c && c <= '9') return c - '0';
      if ('A' <= c && c <= 'F') return c - 'A' + 10;
      if ('a' <= c && c <= 'f') return c - 'a' + 10;
      std::abort();
    }

    std::vector<unsigned char>
    generateCADU_impl::parse_string(const std::string & s)
    {
      if (s.size() % 2 != 0) std::abort();
      std::vector<unsigned char> result(s.size() / 2);
      for (std::size_t i = 0; i != s.size() / 2; ++i)
        result[i] = 16 * parse_hex(s[2 * i]) + parse_hex(s[2 * i + 1]);
      return result;
    }

    int
    generateCADU_impl::work(int noutput_items,
                            gr_vector_const_void_star &input_items,
                            gr_vector_void_star &output_items)
    {
      const unsigned char *in = (const unsigned char *) input_items[0];
      //Reed-Solomon and Scrambler objects
      ReedSolomon RS(16,d_intDepth,d_basis);// False = conventional, True = dual-basis
      Scrambler S;
      //Buffers
      unsigned char *frameBuffer1 = (unsigned char*)malloc(d_frameLength*sizeof(unsigned char));
      std::vector<unsigned char> frameBuffer2;
      //The work function engine
      for(int i = 0; (i + d_frameLength) < noutput_items; i += d_frameLength)
      {
        //Copying data from input stream
        memcpy(frameBuffer1,in + i + d_frameLength,d_frameLength);
        //Copying frame into std::vector buffer
        frameBuffer2.insert(frameBuffer2.begin(),frameBuffer1, frameBuffer1 + d_frameLength);
        //Optional scrambling and Reed-Solomon
        if (d_rs) RS.Encode_RS(frameBuffer2);
        if (d_scramble) S.Scramble(frameBuffer2);
        //Insert sync word
        frameBuffer2.insert(frameBuffer2.begin(), d_sync.begin(), d_sync.end());
        //Transmitting PDU
        pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL,pmt::make_blob(frameBuffer2.data(),frameBuffer2.size())));
        message_port_pub(pmt::mp("out"), pdu);
        //Clear buffer
        frameBuffer2.clear();
      }
      // Tell runtime system how many output items we produced.
      return noutput_items;
    }

  } /* namespace ccsds */
} /* namespace gr */
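For comparison, here is a minimal, GNU Radio-free sketch of the framing step on its own. It is only an illustration under stated assumptions: `slice_frames` is a hypothetical helper, not part of the block above, and scrambling and Reed-Solomon encoding are left out. Two things in the work() listing are worth checking against it. The listing copies each frame from `in + i + d_frameLength`, so the first `d_frameLength` bytes of every call appear to be consumed without ever being framed, and `frameBuffer1` is allocated with malloc on every call with no matching free shown. The sketch reads each frame starting at offset `i` and uses std::vector so nothing has to be freed by hand.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical helper (not part of the block above): cut `in` into
// consecutive frames of `frame_len` bytes and prepend `sync` to each one.
// Trailing bytes that do not fill a whole frame are left out.
std::vector<std::vector<std::uint8_t>>
slice_frames(const std::vector<std::uint8_t>& in,
             std::size_t frame_len,
             const std::vector<std::uint8_t>& sync)
{
    assert(frame_len > 0);
    std::vector<std::vector<std::uint8_t>> frames;
    // `<=` keeps the last complete frame; reading starts at offset i.
    for (std::size_t i = 0; i + frame_len <= in.size(); i += frame_len) {
        std::vector<std::uint8_t> frame;
        frame.reserve(sync.size() + frame_len);
        // Sync word first, then the payload bytes of this frame.
        frame.insert(frame.end(), sync.begin(), sync.end());
        frame.insert(frame.end(), in.begin() + i, in.begin() + i + frame_len);
        frames.push_back(std::move(frame));
    }
    return frames;
}
```

Calling `slice_frames` with seven input bytes, a frame length of 3 and a two-byte sync word yields two five-byte frames; the seventh byte stays unframed.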
And here is the processCADU block. This block uses the tags generated by synchronizeCADU (which is simply a wrapper for the correlate_access_tag block) to extract CADUs:
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gnuradio/io_signature.h>
#include "processCADU_impl.h"
#include "fec/ReedSolomon/ReedSolomon.h"
#include "fec/Scrambler/Scrambler.h"

namespace gr {
  namespace ccsds {

    processCADU::sptr
    processCADU::make(int frameLength, int scramble, int rs, int intDepth, std::string tagName)
    {
      return gnuradio::get_initial_sptr
        (new processCADU_impl(frameLength, scramble, rs, intDepth, tagName));
    }

    /*
     * The private constructor
     */
    processCADU_impl::processCADU_impl(int frameLength, int scramble, int rs, int intDepth, std::string tagName)
      : gr::sync_block("processCADU",
                       gr::io_signature::make(1, 1, sizeof(unsigned char)),
                       gr::io_signature::make(0, 0, 0)),
        d_frameLength(frameLength),d_scramble(scramble == 1),d_rs(rs >= 1), d_basis(rs >= 2), d_intDepth(intDepth)
    {
      //Multiple input
      set_output_multiple(d_frameLength * 8);
      //Registering output port
      message_port_register_out(pmt::mp("out"));
      if (d_rs) d_frameLength += 32 * d_intDepth;
      //Setting tag name
      key = pmt::mp(tagName);
    }

    /*
     * Our virtual destructor.
     */
    processCADU_impl::~processCADU_impl()
    {
      delete d_pack;
    }

    int
    processCADU_impl::work(int noutput_items,
                           gr_vector_const_void_star &input_items,
                           gr_vector_void_star &output_items)
    {
      const unsigned char *in = (const unsigned char *) input_items[0];
      unsigned char *out = (unsigned char *) output_items[0];
      void *msg_data = NULL;
      unsigned char frame_data[d_frameLength];
      unsigned char frame_len = 0;
      std::vector<unsigned char> frameBuffer;
      //Reed-Solomon and Scrambler objects
      ReedSolomon RS(16,d_intDepth,d_basis);// False = conventional, True = dual-basis
      std::vector<int> errors;//errors.push_back(0);
      Scrambler S;
      d_tags.clear();
      d_pack = new blocks::kernel::pack_k_bits(8);
      this->get_tags_in_window(d_tags, 0, 0, noutput_items,key);
      for(d_tags_itr = d_tags.begin(); d_tags_itr != d_tags.end(); d_tags_itr++)
      {
        // Check that we have enough data for a full frame
        if ((d_tags_itr->offset - this->nitems_read(0)) > (noutput_items - (d_frameLength) * 8))
        {
          return (d_tags_itr->offset - this->nitems_read(0) - 1);
        }
        //Pack bits into bytes
        d_pack->pack(frame_data, &in[d_tags_itr->offset - this->nitems_read(0)], d_frameLength);
        //Copying frame into std::vector buffer
        frameBuffer.insert(frameBuffer.begin(),frame_data, frame_data + d_frameLength);
        //Optional scrambling and Reed-Solomon
        if (d_scramble) S.Scramble(frameBuffer);
        //if (d_rs) RS.Decode_RS(frameBuffer,errors);
        //If there is Reed-Solomon decoding
        if(d_rs)
        {
          RS.Decode_RS(frameBuffer,errors);
          if (RS.Success(errors)) // Success
          {
            //std::cout << "Success" << std::endl;
            pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL,pmt::make_blob(frameBuffer.data(),frameBuffer.size())));
            message_port_pub(pmt::mp("out"), pdu);
            /*for(int i=0; i < errors.size(); i++)
            //std::cout << "Number of Errors : " << errors.at(i) << std::endl << std::endl;
            */
          }
          else // Failure
          {
            std::cout << "RS failure" << std::endl;
          }
        }
        else
        {
          pmt::pmt_t pdu(pmt::cons(pmt::PMT_NIL,pmt::make_blob(frameBuffer.data(),frameBuffer.size())));
          message_port_pub(pmt::mp("out"), pdu);
        }
        //Clear buffers
        frameBuffer.clear();
        errors.clear();
      }
      // Tell runtime system how many output items we produced.
      return noutput_items;
    }

  } /* namespace ccsds */
} /* namespace gr */
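The bit-packing step in the listing can be pictured with a small standalone sketch. It assumes one bit per input byte (in the least significant bit) and most-significant-bit-first packing, which is my understanding of how pack_k_bits behaves; `pack_bits_msb_first` is a hypothetical stand-in, not the GNU Radio kernel. Because it returns a std::vector and needs no heap-allocated helper object, nothing is allocated per call that must later be deleted, whereas the listing creates a new pack_k_bits on every work() call and deletes `d_pack` only once in the destructor.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the packing step: each input element carries
// one bit in its least significant bit; eight of them become one byte,
// most significant bit first (an assumption about the bit order).
std::vector<std::uint8_t>
pack_bits_msb_first(const std::vector<std::uint8_t>& bits)
{
    assert(bits.size() % 8 == 0);
    std::vector<std::uint8_t> bytes(bits.size() / 8, 0);
    for (std::size_t i = 0; i < bytes.size(); ++i) {
        for (std::size_t j = 0; j < 8; ++j) {
            // Shift what we have so far and append the next bit.
            bytes[i] = static_cast<std::uint8_t>(
                (bytes[i] << 1) | (bits[8 * i + j] & 0x01));
        }
    }
    return bytes;
}
```

Eight input items are needed per output byte, which matches the factor of 8 used in `set_output_multiple(d_frameLength * 8)` and in the full-frame check above.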
Regards,
M
@MarcusMüller You are absolutely right. But my flowgraph has a number of blocks before the "message processing" block. The flowgraph also hangs after I have transferred around 0.5 gigabytes of data. So, in addition to the errors caused by the message-passing block, I would also like suggestions on how to implement the buffer block.
– Moses Browne Mwakyanjala
13 hours ago
You don't need that, I promise. You can just increase the minimum output buffer size of the upstream block and let GNU Radio handle the buffer. It would do the same, only smarter.
– Marcus Müller
13 hours ago
And if your flowgraph hangs after 0.5 GB, you have a bug, not something that needs buffering! Or you have something like a storage bottleneck, in which case you need buffering, but in the operating system's file system handling. Anyway, not knowing your application, I'd argue that you're asking about something you don't show here. I'm afraid that qualifies as "too unclear" to be answered :(
– Marcus Müller
12 hours ago
@MarcusMüller Fair enough. I have added the full flowgraph and the implementation files of the blocks that process messages.
– Moses Browne Mwakyanjala
12 hours ago
You're misinterpreting something. The output buffer of every block is a circular buffer. And: that block can't have anything to do with your problem, as it doesn't deal with messages, and your error has to do with message passing. So, wrong track.
– Marcus Müller
17 hours ago
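For readers who, like the asker, have not worked with circular buffers, here is a toy sketch of the idea. It is a generic fixed-capacity ring buffer and deliberately not GNU Radio's implementation; the class name `RingBuffer` and its methods are made up for illustration. Writes go in behind the newest item and wrap around to the start of the storage, reads come out from the oldest item, and a full buffer simply accepts fewer items than offered.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Toy fixed-capacity ring buffer of bytes (illustrative only).
class RingBuffer {
public:
    explicit RingBuffer(std::size_t capacity)
        : d_data(capacity), d_head(0), d_size(0)
    {
        assert(capacity > 0);
    }

    std::size_t size() const { return d_size; }                   // items stored
    std::size_t space() const { return d_data.size() - d_size; }  // free slots

    // Copy up to n items in; returns how many actually fit.
    std::size_t write(const unsigned char* src, std::size_t n)
    {
        n = std::min(n, space());
        for (std::size_t k = 0; k < n; ++k)
            d_data[(d_head + d_size + k) % d_data.size()] = src[k];
        d_size += n;
        return n;
    }

    // Copy up to n of the oldest items out; returns how many were read.
    std::size_t read(unsigned char* dst, std::size_t n)
    {
        n = std::min(n, d_size);
        for (std::size_t k = 0; k < n; ++k)
            dst[k] = d_data[(d_head + k) % d_data.size()];
        d_head = (d_head + n) % d_data.size();  // advance past consumed items
        d_size -= n;
        return n;
    }

private:
    std::vector<unsigned char> d_data;  // fixed storage
    std::size_t d_head;                 // index of the oldest item
    std::size_t d_size;                 // number of stored items
};
```

As the comment above points out, the stream buffers between blocks already work on this principle, so a buffer like this would not address a warning that comes from message passing.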