[hamradio-commits] [gnss-sdr] 101/126: adding a TPC client and server

Carles Fernandez carles_fernandez-guest at moszumanska.debian.org
Sat Dec 26 18:38:06 UTC 2015


This is an automated email from the git hooks/post-receive script.

carles_fernandez-guest pushed a commit to branch next
in repository gnss-sdr.

commit cf75c669c84acd540a12adc02fa4fdec28460fea
Author: Carles Fernandez <carles.fernandez at gmail.com>
Date:   Tue Dec 15 15:04:58 2015 +0100

    adding a TPC client and server
---
 src/core/system_parameters/rtcm.cc |  82 +++++++++++++
 src/core/system_parameters/rtcm.h  | 231 +++++++++++++++++++++++++++++++++++++
 src/tests/formats/rtcm_test.cc     |  37 ++++++
 3 files changed, 350 insertions(+)

diff --git a/src/core/system_parameters/rtcm.cc b/src/core/system_parameters/rtcm.cc
index f9adf58..fd10254 100644
--- a/src/core/system_parameters/rtcm.cc
+++ b/src/core/system_parameters/rtcm.cc
@@ -33,6 +33,7 @@
 #include <cmath>      // for std::fmod
 #include <cstdlib>    // for strtol
 #include <sstream>    // for std::stringstream
+#include <thread>
 #include <boost/algorithm/string.hpp>  // for to_upper_copy
 #include <boost/dynamic_bitset.hpp>
 #include <gflags/gflags.h>
@@ -41,7 +42,11 @@
 using google::LogMessage;
 
 DEFINE_int32(RTCM_Ref_Station_ID, 1234, "Reference Station ID in RTCM messages");
+DEFINE_int32(RTCM_Port, 2101 , "TCP port of the RTCM message server");
+// 2101 is the standard RTCM port according to the Internet Assigned Numbers Authority (IANA)
+// https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xml
 
+DEFINE_string(Remote_RTCM_Server, "localhost", "Remote RTCM server address");
 
 
 Rtcm::Rtcm()
@@ -49,9 +54,86 @@ Rtcm::Rtcm()
     Rtcm::reset_data_fields();
     preamble = std::bitset<8>("11010011");
     reserved_field = std::bitset<6>("000000");
+    rtcm_message_queue = std::make_shared< concurrent_queue<std::string> >();
+    // for each server, do:
+    boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), FLAGS_RTCM_Port);
+    servers.emplace_back(io_service, endpoint, rtcm_message_queue);
+    server_is_running = false;
 }
 
 
+Rtcm::~Rtcm()
+{}
+
+
+
+// *****************************************************************************************************
+//
+//   TCP Server / Client functions
+//
+// *****************************************************************************************************
+void Rtcm::run_server()
+{
+    std::cout << "Starting a TCP Server on port " << FLAGS_RTCM_Port << std::endl;
+    try
+    {
+            std::thread t([&]{ io_service.run(); });
+            server_is_running = true;
+            t.detach();
+    }
+    catch (std::exception& e)
+    {
+            std::cerr << "Exception: " << e.what() << "\n";
+    }
+}
+
+
+void Rtcm::stop_service()
+{
+    io_service.stop();
+}
+
+
+void Rtcm::stop_server()
+{
+    std::cout << "Stopping TCP Server on port " << FLAGS_RTCM_Port << std::endl;
+    Rtcm::stop_service();
+    server_is_running = false;
+}
+
+
+void Rtcm::run_client()
+{
+    std::cout << "Starting a TCP Client on port " << FLAGS_RTCM_Port << std::endl;
+    std::string remote_host = FLAGS_Remote_RTCM_Server;
+    std::string remote_port = std::to_string(FLAGS_RTCM_Port);
+    boost::asio::ip::tcp::resolver resolver(io_service);
+    auto endpoint_iterator = resolver.resolve({ remote_host.c_str(), remote_port.c_str() });
+
+    clients.emplace_back(io_service, endpoint_iterator);
+    try
+    {
+            std::thread t([&](){ io_service.run(); });
+            t.detach();
+    }
+    catch (std::exception& e)
+    {
+            std::cerr << "Exception: " << e.what() << "\n";
+    }
+}
+
+
+void Rtcm::stop_client()
+{
+    std::cout << "Stopping TCP Client on port " << FLAGS_RTCM_Port << std::endl;
+    Rtcm::stop_service();
+}
+
+
+void Rtcm::send_message(const std::string & msg)
+{
+    rtcm_message_queue->push(msg);
+}
 
 
 // *****************************************************************************************************
diff --git a/src/core/system_parameters/rtcm.h b/src/core/system_parameters/rtcm.h
index 98eeb01..b87049a 100644
--- a/src/core/system_parameters/rtcm.h
+++ b/src/core/system_parameters/rtcm.h
@@ -34,12 +34,18 @@
 
 
 #include <bitset>
+#include <deque>
 #include <map>
+#include <memory>
+#include <set>
 #include <string>
+#include <thread>
 #include <utility>
 #include <vector>
+#include <boost/asio.hpp>
 #include <boost/crc.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
+#include "concurrent_queue.h"
 #include "gnss_synchro.h"
 #include "galileo_fnav_message.h"
 #include "gps_navigation_message.h"
@@ -79,6 +85,7 @@ class Rtcm
 {
 public:
     Rtcm(); //<! Default constructor
+    ~Rtcm();
 
     /*!
      * \brief Prints message type 1001 (L1-Only GPS RTK Observables)
@@ -261,6 +268,14 @@ public:
 
     bool check_CRC(const std::string & message);         //<! Checks that the CRC of a RTCM package is correct
 
+    void run_server();
+    void stop_server();
+
+    void run_client();
+    void stop_client();
+
+    void send_message(const std::string & message);
+
 private:
     //
     // Generation of messages content
@@ -325,6 +340,222 @@ private:
     unsigned int msm_extended_lock_time_indicator(unsigned int lock_time_period_s);
 
     //
+    // Classes for TCP communication
+    //
+    class Message_buffer
+    {
+    public:
+        // Construct from a std::string.
+        explicit Message_buffer(const std::string& data)
+        : data_(new std::vector<char>(data.begin(), data.end())),
+          buffer_(boost::asio::buffer(*data_))
+        {
+        }
+
+        // Implement the ConstBufferSequence requirements.
+        typedef boost::asio::const_buffer value_type;
+        typedef const boost::asio::const_buffer* const_iterator;
+        const boost::asio::const_buffer* begin() const { return &buffer_; }
+        const boost::asio::const_buffer* end() const { return &buffer_ + 1; }
+
+    private:
+        std::shared_ptr<std::vector<char> > data_;
+        boost::asio::const_buffer buffer_;
+    };
+
+
+    class Rtcm_listener
+    {
+    public:
+        virtual ~Rtcm_listener() {}
+        virtual void deliver(const Message_buffer & msg) = 0;
+    };
+
+
+    class Rtcm_session
+            : public Rtcm_listener,
+              public std::enable_shared_from_this<Rtcm_session>
+    {
+    public:
+        Rtcm_session(boost::asio::ip::tcp::socket socket, std::shared_ptr< concurrent_queue<std::string> > & queue) : socket_(std::move(socket)) , queue_(queue)  { }
+
+        void start()
+        {
+            std::cout << "Starting a session: " <<  std::endl;
+            listeners_.insert(shared_from_this());
+            do_send_messages();
+        }
+
+        void deliver(const Message_buffer & msg)
+        {
+            bool write_in_progress = !write_msgs_.empty();
+            std::cout << "Pushing a message " <<  std::endl;
+            write_msgs_.push_back(msg);
+            if (!write_in_progress)
+                {
+                    do_write();
+                }
+        }
+
+    private:
+        void do_send_messages()
+        {
+            std::string message;
+            queue_->wait_and_pop(message);
+            Message_buffer buffer_out(message);
+            for (auto listener: listeners_)
+                {
+                    listener->deliver(buffer_out);
+                }
+            do_send_messages();
+        }
+
+        void do_write()
+        {
+            auto self(shared_from_this());
+            boost::asio::async_write(socket_, write_msgs_.front(), [this, self](boost::system::error_code ec, std::size_t /*length*/)
+                    {
+                if(!ec)
+                    {
+                        std::cout << "RTCM message sent." << std::endl;
+                        write_msgs_.pop_front();
+                        if(!write_msgs_.empty())
+                            {
+                                do_write();
+                            }
+                    }
+                else
+                    {
+                        std::cout << "Erasing listener" << std::endl;
+                        listeners_.erase(shared_from_this());
+                    }
+                    });
+        }
+
+        boost::asio::ip::tcp::socket socket_;
+        std::shared_ptr< concurrent_queue<std::string> > & queue_;
+        std::string read_msg_;
+        std::deque<Message_buffer> write_msgs_;
+        std::set< std::shared_ptr<Rtcm_listener> > listeners_;
+    };
+
+
+    class Tcp_server
+    {
+    public:
+        Tcp_server(boost::asio::io_service& io_service, const boost::asio::ip::tcp::endpoint& endpoint, std::shared_ptr< concurrent_queue<std::string> >  & queue)
+    : acceptor_(io_service), queue_(queue),
+      socket_(io_service)
+    {
+            acceptor_.open(endpoint.protocol());
+            acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
+            acceptor_.bind(endpoint);
+            acceptor_.listen();
+            do_accept();
+    }
+
+    private:
+        void do_accept()
+        {
+            acceptor_.async_accept(socket_, [this](boost::system::error_code ec)
+                    {
+                if (!ec)
+                    {
+                        std::cout << "Starting RTCM TCP server session..." << std::endl;
+                        std::thread session([&]{ std::make_shared<Rtcm_session>(std::move(socket_), queue_)->start(); });
+                        session.detach();
+                        std::cout << "Accepting new connections " << std::endl;
+                    }
+                else
+                    {
+                        std::cout << "Error starting a new session: " << ec << std::endl;
+                    }
+                do_accept();
+                    });
+        }
+
+        boost::asio::ip::tcp::acceptor acceptor_;
+        boost::asio::ip::tcp::socket socket_;
+        std::shared_ptr< concurrent_queue<std::string> >  & queue_;
+    };
+
+
+    class Tcp_client
+    {
+    public:
+        Tcp_client(boost::asio::io_service& io_service,
+                boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
+    : io_service_(io_service),
+      socket_(io_service)
+    {
+            do_connect(endpoint_iterator);
+    }
+
+        void close()
+        {
+            io_service_.post([this]() { socket_.close(); });
+        }
+
+    private:
+        void do_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
+        {
+            std::cout << "Connecting to server..." << std::endl;
+            boost::asio::async_connect(socket_, endpoint_iterator,
+                    [this](boost::system::error_code ec, boost::asio::ip::tcp::resolver::iterator)
+                    {
+                if (!ec)
+                    {
+                        std::cout << "Connected." << std::endl;
+                        do_read_message();
+                    }
+                else
+                    {
+                        std::cout << "Server is down." << std::endl;
+                    }
+                    });
+        }
+
+        void do_read_message()
+        {
+            // Waiting for data and reading forever, until connection is closed.
+            for(;;)
+                {
+                    std::array<char, 10> buf;
+                    boost::system::error_code error;
+
+                    size_t len = socket_.read_some(boost::asio::buffer(buf), error);
+
+                    if(error == boost::asio::error::eof)
+                        break; // // Connection closed cleanly by peer.
+                    else if(error)
+                        {
+                            std::cout << "Error: " << error << std::endl;
+                            socket_.close();
+                            break;
+                        }
+                    std::cout << "Received message: ";
+                    std::cout.write(buf.data(), len);
+                    std::cout << std::endl;
+                }
+
+            std::cout << std::endl;
+            socket_.close();
+            std::cout << "Connection closed by the server. Good bye." << std::endl;
+        }
+
+        boost::asio::io_service& io_service_;
+        boost::asio::ip::tcp::socket socket_;
+    };
+
+    boost::asio::io_service io_service;
+    std::shared_ptr< concurrent_queue<std::string> > rtcm_message_queue;
+    std::thread t;
+    std::list<Rtcm::Tcp_server> servers;
+    std::list<Rtcm::Tcp_client> clients;
+    bool server_is_running;
+    void stop_service();
+
+    //
     // Transport Layer
     //
     std::bitset<8> preamble;
diff --git a/src/tests/formats/rtcm_test.cc b/src/tests/formats/rtcm_test.cc
index 5dee39d..9790590 100644
--- a/src/tests/formats/rtcm_test.cc
+++ b/src/tests/formats/rtcm_test.cc
@@ -30,6 +30,7 @@
  */
 
 #include <memory>
+#include <thread>
 #include "rtcm.h"
 
 TEST(Rtcm_Test, Hex_to_bin)
@@ -519,3 +520,39 @@ TEST(Rtcm_Test, MSM1)
     EXPECT_EQ(psrng4_s, read_psrng4_s_2);
 }
 
+
+TEST(Rtcm_Test, InstantiateServer)
+{
+    auto rtcm = std::make_shared<Rtcm>();
+    rtcm->run_server();
+    std::string msg("Hello");
+    rtcm->send_message(msg);
+    std::string test3 = "ff";
+    std::string test3_bin = rtcm->hex_to_bin(test3);
+    EXPECT_EQ(0, test3_bin.compare("11111111"));
+    rtcm->stop_server();
+    std::string test6 = "0011";
+    std::string test6_hex = rtcm->bin_to_hex(test6);
+    EXPECT_EQ(0, test6_hex.compare("3"));
+    long unsigned int expected1 = 42;
+    EXPECT_EQ(expected1, rtcm->bin_to_uint("00101010"));
+    rtcm->run_server();
+    std::string test4_bin = rtcm->hex_to_bin(test3);
+    std::string s("Testing");
+    rtcm->send_message(s);
+    rtcm->stop_server();
+    EXPECT_EQ(0, test4_bin.compare("11111111"));
+}
+
+
+TEST(Rtcm_Test, InstantiateClient)
+{
+    auto rtcm = std::make_shared<Rtcm>();
+    rtcm->run_client();
+    std::string test3 = "ff";
+    std::string test3_bin = rtcm->hex_to_bin(test3);
+    EXPECT_EQ(0, test3_bin.compare("11111111"));
+    rtcm->stop_client();
+}
+
+

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-hamradio/gnss-sdr.git



More information about the pkg-hamradio-commits mailing list