[hamradio-commits] [gnss-sdr] 105/126: Redesign of the TCP server

Carles Fernandez carles_fernandez-guest at moszumanska.debian.org
Sat Dec 26 18:38:06 UTC 2015


This is an automated email from the git hooks/post-receive script.

carles_fernandez-guest pushed a commit to branch next
in repository gnss-sdr.

commit e1c6137597cb7613b3697400c29e09632567d489
Author: Carles Fernandez <carles.fernandez at gmail.com>
Date:   Tue Dec 22 18:45:07 2015 +0100

    Redesign of the TCP server
    
    Now the TCP server serves RTCM messages to multiple clients
    concurrently, and without losing messages.
---
 src/core/system_parameters/rtcm.cc |  25 ++-
 src/core/system_parameters/rtcm.h  | 403 ++++++++++++++++++++++++++++++-------
 src/tests/formats/rtcm_test.cc     |   2 +
 3 files changed, 356 insertions(+), 74 deletions(-)

diff --git a/src/core/system_parameters/rtcm.cc b/src/core/system_parameters/rtcm.cc
index fd10254..534b72b 100644
--- a/src/core/system_parameters/rtcm.cc
+++ b/src/core/system_parameters/rtcm.cc
@@ -30,6 +30,7 @@
 
 #include "rtcm.h"
 #include <algorithm>  // for std::reverse
+#include <chrono>     // std::chrono::seconds
 #include <cmath>      // for std::fmod
 #include <cstdlib>    // for strtol
 #include <sstream>    // for std::stringstream
@@ -47,6 +48,7 @@ DEFINE_int32(RTCM_Port, 2101 , "TCP port of the RTCM message server");
 // https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xml
 
 DEFINE_string(Remote_RTCM_Server, "localhost", "Remote RTCM server address");
+DEFINE_int32(Remote_RTCM_Port, 2101 , "Remote TCP port of the RTCM message server");
 
 
 Rtcm::Rtcm()
@@ -57,7 +59,7 @@ Rtcm::Rtcm()
     rtcm_message_queue = std::make_shared< concurrent_queue<std::string> >();
     // for each server, do:
     boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), FLAGS_RTCM_Port);
-    servers.emplace_back(io_service, endpoint, rtcm_message_queue);
+    servers.emplace_back(io_service, endpoint);
     server_is_running = false;
 }
 
@@ -69,7 +71,7 @@ Rtcm::~Rtcm()
 
 // *****************************************************************************************************
 //
-//   TCP Server / Client functions
+//   TCP Server / Client helper classes
 //
 // *****************************************************************************************************
 void Rtcm::run_server()
@@ -77,6 +79,9 @@ void Rtcm::run_server()
     std::cout << "Starting a TCP Server on port " << FLAGS_RTCM_Port << std::endl;
     try
     {
+            std::thread tq([&]{ std::make_shared<Queue_Reader>(io_service, rtcm_message_queue, FLAGS_RTCM_Port)->do_read_queue(); });
+            tq.detach();
+
             std::thread t([&]{ io_service.run(); });
             server_is_running = true;
             t.detach();
@@ -97,6 +102,8 @@ void Rtcm::stop_service()
 void Rtcm::stop_server()
 {
     std::cout << "Stopping TCP Server on port " << FLAGS_RTCM_Port << std::endl;
+    rtcm_message_queue->push("Goodbye"); // this kills tq
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     Rtcm::stop_service();
     server_is_running = false;
 }
@@ -104,9 +111,9 @@ void Rtcm::stop_server()
 
 void Rtcm::run_client()
 {
-    std::cout << "Starting a TCP Client on port " << FLAGS_RTCM_Port << std::endl;
+    std::cout << "Starting a TCP Client on port " << FLAGS_Remote_RTCM_Port << std::endl;
     std::string remote_host = FLAGS_Remote_RTCM_Server;
-    std::string remote_port = std::to_string(FLAGS_RTCM_Port);
+    std::string remote_port = std::to_string(FLAGS_Remote_RTCM_Port);
     boost::asio::ip::tcp::resolver resolver(io_service);
     auto endpoint_iterator = resolver.resolve({ remote_host.c_str(), remote_port.c_str() });
 
@@ -125,7 +132,9 @@ void Rtcm::run_client()
 
 void Rtcm::stop_client()
 {
-    std::cout << "Stopping TCP Client on port " << FLAGS_RTCM_Port << std::endl;
+    std::cout << "Stopping TCP Client on port " << FLAGS_Remote_RTCM_Port << std::endl;
+    clients.front().close();
+    std::this_thread::sleep_for(std::chrono::seconds(1));
     Rtcm::stop_service();
 }
 
@@ -136,6 +145,12 @@ void Rtcm::send_message(const std::string & msg)
 }
 
 
+bool Rtcm::is_server_running()
+{
+    return server_is_running;
+}
+
+
 // *****************************************************************************************************
 //
 //   TRANSPORT LAYER AS DEFINED AT RTCM STANDARD 10403.2
diff --git a/src/core/system_parameters/rtcm.h b/src/core/system_parameters/rtcm.h
index e361d21..f6decd9 100644
--- a/src/core/system_parameters/rtcm.h
+++ b/src/core/system_parameters/rtcm.h
@@ -250,12 +250,12 @@ public:
             bool divergence_free,
             bool more_messages);
 
-    unsigned int lock_time(const Gps_Ephemeris & eph, double obs_time, const Gnss_Synchro & gnss_synchro); //<! Returns the time period in which GPS L1 signals have been continually tracked.
+    unsigned int lock_time(const Gps_Ephemeris & eph, double obs_time, const Gnss_Synchro & gnss_synchro);      //<! Returns the time period in which GPS L1 signals have been continually tracked.
     unsigned int lock_time(const Gps_CNAV_Ephemeris & eph, double obs_time, const Gnss_Synchro & gnss_synchro); //<! Returns the time period in which GPS L2 signals have been continually tracked.
-    unsigned int lock_time(const Galileo_Ephemeris & eph, double obs_time, const Gnss_Synchro & gnss_synchro); //<! Returns the time period in which Galileo signals have been continually tracked.
+    unsigned int lock_time(const Galileo_Ephemeris & eph, double obs_time, const Gnss_Synchro & gnss_synchro);  //<! Returns the time period in which Galileo signals have been continually tracked.
 
-    std::string bin_to_hex(const std::string& s); //<! Returns a string of hexadecimal symbols from a string of binary symbols
-    std::string hex_to_bin(const std::string& s); //<! Returns a string of binary symbols from a string of hexadecimal symbols
+    std::string bin_to_hex(const std::string& s);        //<! Returns a string of hexadecimal symbols from a string of binary symbols
+    std::string hex_to_bin(const std::string& s);        //<! Returns a string of binary symbols from a string of hexadecimal symbols
 
     unsigned long int bin_to_uint(const std::string& s); //<! Returns an unsigned long int from a string of binary symbols
     long int bin_to_int(const std::string& s);           //<! Returns a long int from a string of binary symbols
@@ -268,13 +268,14 @@ public:
 
     bool check_CRC(const std::string & message);         //<! Checks that the CRC of a RTCM package is correct
 
-    void run_server();
-    void stop_server();
+    void run_server();                                   //<! Starts running the server
+    void stop_server();                                  //<! Stops the server
 
-    void run_client();
-    void stop_client();
+    void run_client();                                   //<! Starts running the client
+    void stop_client();                                  //<! Stops the client
 
-    void send_message(const std::string & message);
+    void send_message(const std::string & message);      //<! Sends a message through the server
+    bool is_server_running();                            //<! Returns true if the server is running, false otherwise
 
 private:
     //
@@ -342,54 +343,147 @@ private:
     //
     // Classes for TCP communication
     //
-    class Message_buffer
+    class Rtcm_Message
     {
     public:
-        // Construct from a std::string.
-        explicit Message_buffer(const std::string& data)
-        : data_(new std::vector<char>(data.begin(), data.end())),
-          buffer_(boost::asio::buffer(*data_))
+        enum { header_length = 6 };
+        enum { max_body_length = 1029 };
+
+        Rtcm_Message()
+        : body_length_(0)
+        { }
+
+        const char* data() const
+        {
+            return data_;
+        }
+
+        char* data()
+        {
+            return data_;
+        }
+
+        std::size_t length() const
+        {
+            return header_length + body_length_;
+        }
+
+        const char* body() const
+        {
+            return data_ + header_length;
+        }
+
+        char* body()
+        {
+            return data_ + header_length;
+        }
+
+        std::size_t body_length() const
+        {
+            return body_length_;
+        }
+
+        void body_length(std::size_t new_length)
+        {
+            body_length_ = new_length;
+            if (body_length_ > max_body_length)
+                body_length_ = max_body_length;
+        }
+
+        bool decode_header()
         {
+            char header[header_length + 1] = "";
+            std::strncat(header, data_, header_length);
+            if(header[0] != 'G' || header[1] != 'S')
+                {
+                    return false;
+                }
+
+            char header2_[header_length - 1] = "";
+            std::strncat(header2_, data_ + 2 , header_length - 2);
+            body_length_ = std::atoi(header2_);
+            if(body_length_ == 0)
+                {
+                    return false;
+                }
+
+            if (body_length_ > max_body_length)
+                {
+                    body_length_ = 0;
+                    return false;
+                }
+            return true;
         }
 
-        // Implement the ConstBufferSequence requirements.
-        typedef boost::asio::const_buffer value_type;
-        typedef const boost::asio::const_buffer* const_iterator;
-        const boost::asio::const_buffer* begin() const { return &buffer_; }
-        const boost::asio::const_buffer* end() const { return &buffer_ + 1; }
+        void encode_header()
+        {
+            char header[header_length + 1] = "";
+            std::sprintf(header, "GS%4d", static_cast<int>(body_length_));
+            std::memcpy(data_, header, header_length);
+        }
 
     private:
-        std::shared_ptr<std::vector<char> > data_;
-        boost::asio::const_buffer buffer_;
+        char data_[header_length + max_body_length];
+        std::size_t body_length_;
+    };
+
+
+    class Rtcm_Listener
+    {
+    public:
+        virtual ~Rtcm_Listener() {}
+        virtual void deliver(const Rtcm_Message & msg) = 0;
     };
 
 
-    class Rtcm_listener
+    class Rtcm_Listener_Room
     {
     public:
-        virtual ~Rtcm_listener() {}
-        virtual void deliver(const Message_buffer & msg) = 0;
+        void join(std::shared_ptr<Rtcm_Listener> participant)
+        {
+            participants_.insert(participant);
+            for (auto msg: recent_msgs_)
+                participant->deliver(msg);
+        }
+
+        void leave(std::shared_ptr<Rtcm_Listener> participant)
+        {
+            participants_.erase(participant);
+        }
+
+        void deliver(const Rtcm_Message & msg)
+        {
+            recent_msgs_.push_back(msg);
+            while (recent_msgs_.size() > max_recent_msgs)
+                recent_msgs_.pop_front();
+
+            for (auto participant: participants_)
+                participant->deliver(msg);
+        }
+
+    private:
+        std::set<std::shared_ptr<Rtcm_Listener> > participants_;
+        enum { max_recent_msgs = 1 };
+        std::deque<Rtcm_Message> recent_msgs_;
     };
 
 
-    class Rtcm_session
-            : public Rtcm_listener,
-              public std::enable_shared_from_this<Rtcm_session>
+    class Rtcm_Session
+            : public Rtcm_Listener,
+              public std::enable_shared_from_this<Rtcm_Session>
     {
     public:
-        Rtcm_session(boost::asio::ip::tcp::socket socket, std::shared_ptr< concurrent_queue<std::string> > & queue) : socket_(std::move(socket)) , queue_(queue)  { }
+        Rtcm_Session(boost::asio::ip::tcp::socket socket, Rtcm_Listener_Room & room) : socket_(std::move(socket)), room_(room)   { }
 
         void start()
         {
-            std::cout << "Starting a session: " <<  std::endl;
-            listeners_.insert(shared_from_this());
-            do_send_messages();
+            room_.join(shared_from_this());
+            do_read_message_header();
         }
 
-        void deliver(const Message_buffer & msg)
+        void deliver(const Rtcm_Message & msg)
         {
             bool write_in_progress = !write_msgs_.empty();
-            std::cout << "Pushing a message " <<  std::endl;
             write_msgs_.push_back(msg);
             if (!write_in_progress)
                 {
@@ -398,26 +492,57 @@ private:
         }
 
     private:
-        void do_send_messages()
+        void do_read_message_header()
         {
-            std::string message;
-            queue_->wait_and_pop(message);
-            Message_buffer buffer_out(message);
-            for (auto listener: listeners_)
-                {
-                    listener->deliver(buffer_out);
-                }
-            do_send_messages();
+            auto self(shared_from_this());
+            boost::asio::async_read(socket_,
+                    boost::asio::buffer(read_msg_.data(), Rtcm_Message::header_length),
+                    [this, self](boost::system::error_code ec, std::size_t /*length*/)
+                    {
+                if (!ec && read_msg_.decode_header())
+                    {
+                        do_read_message_body();
+                    }
+                else
+                    {
+                        std::cout << "Closing connection with client from " << socket_.remote_endpoint().address() << std::endl;
+                        room_.leave(shared_from_this());
+                    }
+                    });
         }
 
-        void do_write()
+        void do_read_message_body()
         {
             auto self(shared_from_this());
-            boost::asio::async_write(socket_, write_msgs_.front(), [this, self](boost::system::error_code ec, std::size_t /*length*/)
+            boost::asio::async_read(socket_,
+                    boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
+                    [this, self](boost::system::error_code ec, std::size_t /*length*/)
+                    {
+                if (!ec)
+                    {
+                        room_.deliver(read_msg_);
+                        //std::cout << "Delivered message (session): ";
+                        //std::cout.write(read_msg_.body(), read_msg_.body_length());
+                        //std::cout << std::endl;
+                        do_read_message_header();
+                    }
+                else
                     {
+                        std::cout << "Closing connection with client from " << socket_.remote_endpoint().address() << std::endl;
+                        room_.leave(shared_from_this());
+                    }
+                    });
+        }
+
+        void do_write()
+        {
+            auto self(shared_from_this());
+            boost::asio::async_write(socket_,
+                    boost::asio::buffer(write_msgs_.front().body(),
+                            write_msgs_.front().body_length()), [this, self](boost::system::error_code ec, std::size_t /*length*/)
+                            {
                 if(!ec)
                     {
-                        std::cout << "RTCM message sent." << std::endl;
                         write_msgs_.pop_front();
                         if(!write_msgs_.empty())
                             {
@@ -426,26 +551,151 @@ private:
                     }
                 else
                     {
-                        std::cout << "Erasing listener" << std::endl;
-                        listeners_.erase(shared_from_this());
+                        std::cout << "Closing connection with client from " << socket_.remote_endpoint().address() << std::endl;
+                        room_.leave(shared_from_this());
+                    }
+                            });
+        }
+
+        boost::asio::ip::tcp::socket socket_;
+        Rtcm_Listener_Room & room_;
+        Rtcm_Message read_msg_;
+        std::deque<Rtcm_Message> write_msgs_;
+    };
+
+
+    class Tcp_Internal_Client
+            : public std::enable_shared_from_this<Tcp_Internal_Client>
+    {
+    public:
+        Tcp_Internal_Client(boost::asio::io_service& io_service,
+                boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
+    : io_service_(io_service), socket_(io_service)
+    {
+            do_connect(endpoint_iterator);
+    }
+
+        void close()
+        {
+            io_service_.post([this]() { socket_.close(); });
+        }
+
+        void write(const Rtcm_Message & msg)
+        {
+            io_service_.post(
+                    [this, msg]()
+                    {
+                bool write_in_progress = !write_msgs_.empty();
+                write_msgs_.push_back(msg);
+                if (!write_in_progress)
+                    {
+                        do_write();
+                    }
+                    });
+        }
+
+    private:
+        void do_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
+        {
+            boost::asio::async_connect(socket_, endpoint_iterator,
+                    [this](boost::system::error_code ec, boost::asio::ip::tcp::resolver::iterator)
+                    {
+                if (!ec)
+                    {
+                        do_read_message();
+                    }
+                else
+                    {
+                        std::cout << "Server is down." << std::endl;
                     }
                     });
         }
 
+        void do_read_message()
+        {
+            boost::asio::async_read(socket_,
+                    boost::asio::buffer(read_msg_.data(), 1029),
+                    [this](boost::system::error_code ec, std::size_t length)
+                    {
+                if (!ec )
+                    {
+                        do_read_message();
+                    }
+                else
+                    {
+                        std::cout << "Error in client" << std::endl;
+                        socket_.close();
+                    }
+                    });
+        }
+
+        void do_write()
+        {
+
+            boost::asio::async_write(socket_,
+                    boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()),
+                    [this](boost::system::error_code ec, std::size_t /*length*/)
+                    {
+                if (!ec)
+                    {
+                        write_msgs_.pop_front();
+                        if (!write_msgs_.empty())
+                            {
+                                do_write();
+                            }
+                    }
+                else
+                    {
+                        socket_.close();
+                    }
+                    });
+        }
+
+        boost::asio::io_service& io_service_;
         boost::asio::ip::tcp::socket socket_;
+        Rtcm_Message read_msg_;
+        std::deque<Rtcm_Message> write_msgs_;
+    };
+
+
+    class Queue_Reader
+    {
+    public:
+        Queue_Reader(boost::asio::io_service& io_service, std::shared_ptr< concurrent_queue<std::string> > & queue, int port) : queue_(queue)
+    {
+            boost::asio::ip::tcp::resolver resolver(io_service);
+            std::string host("localhost");
+            std::string port_str = std::to_string(port);
+            auto queue_endpoint_iterator = resolver.resolve({ host.c_str(), port_str.c_str() });
+            c = std::make_shared<Tcp_Internal_Client>(io_service, queue_endpoint_iterator);
+    }
+
+        void do_read_queue()
+        {
+            for(;;)
+                {
+                    std::string message;
+                    Rtcm_Message msg;
+                    queue_->wait_and_pop(message);
+                    if(message.compare("Goodbye") == 0) break;
+                    const char *char_msg = message.c_str();
+                    msg.body_length(message.length());
+                    std::memcpy(msg.body(), char_msg, msg.body_length());
+                    msg.encode_header();
+                    c->write(msg);
+                }
+        }
+    private:
+        std::shared_ptr<Tcp_Internal_Client> c;
         std::shared_ptr< concurrent_queue<std::string> > & queue_;
-        std::string read_msg_;
-        std::deque<Message_buffer> write_msgs_;
-        std::set< std::shared_ptr<Rtcm_listener> > listeners_;
     };
 
 
-    class Tcp_server
+    class Tcp_Server
     {
     public:
-        Tcp_server(boost::asio::io_service& io_service, const boost::asio::ip::tcp::endpoint& endpoint, std::shared_ptr< concurrent_queue<std::string> >  & queue)
-    : acceptor_(io_service), queue_(queue),
-      socket_(io_service)
+        Tcp_Server(boost::asio::io_service& io_service, const boost::asio::ip::tcp::endpoint& endpoint)
+    : io_service_(io_service), acceptor_(io_service), socket_(io_service)
     {
             acceptor_.open(endpoint.protocol());
             acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
@@ -454,40 +704,53 @@ private:
             do_accept();
     }
 
-    private:
+        void close_server()
+        {
+            socket_.close();
+            acceptor_.close();
+        }
 
+    private:
         void do_accept()
         {
             acceptor_.async_accept(socket_, [this](boost::system::error_code ec)
                     {
                 if (!ec)
                     {
-                        std::cout << "Starting RTCM TCP server session..." << std::endl;
-                        std::thread session([&]{ std::make_shared<Rtcm_session>(std::move(socket_), queue_)->start(); });
-                        session.detach();
-                        std::cout << "Accepting new connections " << std::endl;
+                        if(first_client)
+                            {
+                                std::cout << "The TCP Server is up and running. Accepting connections ..." << std::endl;
+                                first_client = false;
+                            }
+                        else
+                            {
+                                std::cout << "Starting RTCM TCP server session..." << std::endl;
+                                std::cout << "Serving client from " << socket_.remote_endpoint().address() << std::endl;
+                            }
+                        std::make_shared<Rtcm_Session>(std::move(socket_), room_)->start();
                     }
                 else
                     {
-                        std::cout << "Error starting a new session: " << ec << std::endl;
+                        std::cout << "Error when invoking a RTCM session. " << ec << std::endl;
                     }
                 do_accept();
                     });
         }
 
+        boost::asio::io_service& io_service_;
         boost::asio::ip::tcp::acceptor acceptor_;
-        std::shared_ptr< concurrent_queue<std::string> >  & queue_;
         boost::asio::ip::tcp::socket socket_;
+        Rtcm_Listener_Room room_;
+        bool first_client = true;
     };
 
 
-    class Tcp_client
+    class Tcp_Client
     {
     public:
-        Tcp_client(boost::asio::io_service& io_service,
+        Tcp_Client(boost::asio::io_service& io_service,
                 boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
-    : io_service_(io_service),
-      socket_(io_service)
+    : io_service_(io_service), socket_(io_service)
     {
             do_connect(endpoint_iterator);
     }
@@ -521,7 +784,7 @@ private:
             // Waiting for data and reading forever, until connection is closed.
             for(;;)
                 {
-                    std::array<char, 10> buf;
+                    std::array<char, 1029> buf;
                     boost::system::error_code error;
 
                     size_t len = socket_.read_some(boost::asio::buffer(buf), error);
@@ -531,7 +794,7 @@ private:
                     else if(error)
                         {
                             std::cout << "Error: " << error << std::endl;
-                            socket_.close();
+                            //socket_.close();
                             break;
                         }
                     std::cout << "Received message: ";
@@ -548,11 +811,13 @@ private:
         boost::asio::ip::tcp::socket socket_;
     };
 
+
     boost::asio::io_service io_service;
     std::shared_ptr< concurrent_queue<std::string> > rtcm_message_queue;
     std::thread t;
-    std::list<Rtcm::Tcp_server> servers;
-    std::list<Rtcm::Tcp_client> clients;
+    std::thread tq;
+    std::list<Rtcm::Tcp_Server> servers;
+    std::list<Rtcm::Tcp_Client> clients;
     bool server_is_running;
     void stop_service();
 
diff --git a/src/tests/formats/rtcm_test.cc b/src/tests/formats/rtcm_test.cc
index 9790590..249e09c 100644
--- a/src/tests/formats/rtcm_test.cc
+++ b/src/tests/formats/rtcm_test.cc
@@ -553,6 +553,8 @@ TEST(Rtcm_Test, InstantiateClient)
     std::string test3_bin = rtcm->hex_to_bin(test3);
     EXPECT_EQ(0, test3_bin.compare("11111111"));
     rtcm->stop_client();
+    std::string test3_bin2 = rtcm->hex_to_bin(test3);
+    EXPECT_EQ(0, test3_bin2.compare("11111111"));
 }
 
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-hamradio/gnss-sdr.git



More information about the pkg-hamradio-commits mailing list