[hamradio-commits] [dump1090] 275/389: BUGFIX : Missed data causes timestamp slip

Matthew Ernisse mernisse-guest at moszumanska.debian.org
Wed Nov 5 00:20:04 UTC 2014


This is an automated email from the git hooks/post-receive script.

mernisse-guest pushed a commit to branch master
in repository dump1090.

commit 75a4c6ee216b19f5d0a138ad6997440c30452b57
Author: Malcolm Robb <Support at ATTAvionics.com>
Date:   Sat Feb 22 23:11:13 2014 +0000

    BUGFIX : Missed data causes timestamp slip
    
    The Mutex on the RTL data reader thread does not "force" the data
    processing thread to execute. Therefore, if the processor is busy, it is
    possible for a second RTL callback to occur before the data from the
    first has been processed. This will cause the loss of the first data,
    but worse, it will cause a slip in the timestamp. This upsets Beamfinder
    and MLAT operation in PlanePlotter.
    
    To solve this, keep a Fifo buffer which is filled by the callback
    thread, and emptied by the data processing thread. The fifo is the same
    size as the number of buffers requested in the call to
    rtlsdr_read_async().
    
    Note - we only put the value of the pointer supplied in the callback
    into the fifo. We do not attempt to cache the data in the buffer pointed
    to by the pointer.  This would require us to memcopy() 2Mbytes per
    second, which we don't want to do if we don't have to because it will
    only make the processor loading worse. Instead, we assume that the data
    in the buffer will remain valid after the callback returns, at least
    until it is overwritten by new data.
    
    It is still possible for us to lose data if we can't process it quickly
    enough. However, we can now detect this loss of data when the fifo is
    almost full, and correct the timestamp for the lost block/blocks.
---
 dump1090.c | 135 +++++++++++++++++++++++++++++++++++++++++++------------------
 dump1090.h |  16 +++++---
 mode_s.c   |   3 +-
 net_io.c   |   2 +-
 4 files changed, 108 insertions(+), 48 deletions(-)

diff --git a/dump1090.c b/dump1090.c
index 3cef7be..6a76fdf 100644
--- a/dump1090.c
+++ b/dump1090.c
@@ -92,7 +92,7 @@ void modesInit(void) {
 
     // Allocate the various buffers used by Modes
     if ( ((Modes.icao_cache = (uint32_t *) malloc(sizeof(uint32_t) * MODES_ICAO_CACHE_LEN * 2)                  ) == NULL) ||
-         ((Modes.data       = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE)                                         ) == NULL) ||
+         ((Modes.pFileData  = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE)                                         ) == NULL) ||
          ((Modes.magnitude  = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE+MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE) ) == NULL) ||
          ((Modes.maglut     = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256)                                 ) == NULL) ||
          ((Modes.beastOut   = (char     *) malloc(MODES_RAWOUT_BUF_SIZE)                                        ) == NULL) ||
@@ -104,7 +104,7 @@ void modesInit(void) {
 
     // Clear the buffers that have just been allocated, just in-case
     memset(Modes.icao_cache, 0,   sizeof(uint32_t) * MODES_ICAO_CACHE_LEN * 2);
-    memset(Modes.data,       127, MODES_ASYNC_BUF_SIZE);
+    memset(Modes.pFileData,127,   MODES_ASYNC_BUF_SIZE);
     memset(Modes.magnitude,  0,   MODES_ASYNC_BUF_SIZE+MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE);
 
     // Validate the users Lat/Lon home location inputs
@@ -133,8 +133,9 @@ void modesInit(void) {
       {Modes.net_output_raw_rate = MODES_RAWOUT_BUF_RATE;}
 
     // Initialise the Block Timers to something half sensible
-    ftime(&Modes.stSystemTimeRTL);
-    Modes.stSystemTimeBlk         = Modes.stSystemTimeRTL;
+    ftime(&Modes.stSystemTimeBlk);
+    for (i = 0; i < MODES_ASYNC_BUF_NUMBER; i++)
+      {Modes.stSystemTimeRTL[i] = Modes.stSystemTimeBlk;}
 
     // Each I and Q value varies from 0 to 255, which represents a range from -1 to +1. To get from the 
     // unsigned (0-255) range you therefore subtract 127 (or 128 or 127.5) from each I and Q, giving you 
@@ -250,13 +251,35 @@ void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) {
 
     MODES_NOTUSED(ctx);
 
+    // Lock the data buffer variables before accessing them
     pthread_mutex_lock(&Modes.data_mutex);
-    ftime(&Modes.stSystemTimeRTL);
-    if (len > MODES_ASYNC_BUF_SIZE) len = MODES_ASYNC_BUF_SIZE;
-    // Read the new data
-    memcpy(Modes.data, buf, len);
-    Modes.data_ready = 1;
-    // Signal to the other thread that new data is ready
+
+    rtlsdrStats(buf);
+
+    Modes.iDataIn &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase!!!
+
+    // Get the system time for this block
+    ftime(&Modes.stSystemTimeRTL[Modes.iDataIn]);
+
+    if (len > MODES_ASYNC_BUF_SIZE) {len = MODES_ASYNC_BUF_SIZE;}
+
+    // Queue the new data
+    Modes.pData[Modes.iDataIn] = (uint16_t *) buf;
+    Modes.iDataIn    = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn + 1);
+    Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut);   
+
+    if (Modes.iDataReady == 0) {
+      // Ooooops. We've just received the MODES_ASYNC_BUF_NUMBER'th outstanding buffer
+      // This means that RTLSDR is currently overwriting the MODES_ASYNC_BUF_NUMBER+1
+      // buffer, but we havent yet processed it, so we're going to lose it. There
+      // isn't much we can do to recover the lost data, but we can correct things to
+      // avoid any additional problems.
+      Modes.iDataOut   = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataOut+1);
+      Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1);   
+      Modes.iDataLost++;
+    }
+ 
+    // Signal to the other thread that new data is ready, and unlock
     pthread_cond_signal(&Modes.data_cond);
     pthread_mutex_unlock(&Modes.data_mutex);
 }
@@ -268,13 +291,12 @@ void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) {
 //
 void readDataFromFile(void) {
     pthread_mutex_lock(&Modes.data_mutex);
-    while(1) {
+    while(Modes.exit == 0) {
         ssize_t nread, toread;
         unsigned char *p;
 
-        if (Modes.exit == 1) break;
-        if (Modes.data_ready) {
-            pthread_cond_wait(&Modes.data_cond,&Modes.data_mutex);
+        if (Modes.iDataReady) {
+            pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex);
             continue;
         }
 
@@ -287,7 +309,7 @@ void readDataFromFile(void) {
         }
 
         toread = MODES_ASYNC_BUF_SIZE;
-        p = (unsigned char *) Modes.data;
+        p = (unsigned char *) Modes.pFileData;
         while(toread) {
             nread = read(Modes.fd, p, toread);
             if (nread <= 0) {
@@ -301,7 +323,17 @@ void readDataFromFile(void) {
             // Not enough data on file to fill the buffer? Pad with no signal.
             memset(p,127,toread);
         }
-        Modes.data_ready = 1;
+
+        Modes.iDataIn &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase!!!
+
+        // Get the system time for this block
+        ftime(&Modes.stSystemTimeRTL[Modes.iDataIn]);
+
+        // Queue the new data
+        Modes.pData[Modes.iDataIn] = Modes.pFileData;
+        Modes.iDataIn    = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn + 1);
+        Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut);   
+
         // Signal to the other thread that new data is ready
         pthread_cond_signal(&Modes.data_cond);
     }
@@ -323,7 +355,6 @@ void *readerThreadEntryPoint(void *arg) {
         readDataFromFile();
     }
     // Signal to the other thread that new data is ready - dummy really so threads don't mutually lock
-    Modes.data_ready = 1;
     pthread_cond_signal(&Modes.data_cond);
     pthread_mutex_unlock(&Modes.data_mutex);
 #ifndef _WIN32
@@ -561,10 +592,6 @@ int main(int argc, char **argv) {
     // Initialization
     modesInit();
 
-    //if (Modes.debug & MODES_DEBUG_BADCRC) {
-	//    testAndTimeBitCorrection();
-    //}
-
     if (Modes.net_only) {
         fprintf(stderr,"Net-only mode, no RTL device or file open.\n");
     } else if (Modes.filename == NULL) {
@@ -589,31 +616,55 @@ int main(int argc, char **argv) {
 
     // Create the thread that will read the data from the device.
     pthread_create(&Modes.reader_thread, NULL, readerThreadEntryPoint, NULL);
-
     pthread_mutex_lock(&Modes.data_mutex);
-    while(1) {
-        if (!Modes.data_ready) {
-            pthread_cond_wait(&Modes.data_cond,&Modes.data_mutex);
-            continue;
+
+    while (Modes.exit == 0) {
+
+        if (Modes.iDataReady == 0) {
+            pthread_cond_wait(&Modes.data_cond,&Modes.data_mutex); // This unlocks Modes.data_mutex, and waits for Modes.data_cond 
+            continue;                                              // Once (Modes.data_cond) occurs, it locks Modes.data_mutex
         }
-        computeMagnitudeVector();
-        Modes.stSystemTimeBlk = Modes.stSystemTimeRTL;
 
-        // Signal to the other thread that we processed the available data
-        // and we want more (useful for --ifile)
-        Modes.data_ready = 0;
-        pthread_cond_signal(&Modes.data_cond);
+        // Modes.data_mutex is Locked, and (Modes.iDataReady != 0)
+        if (Modes.iDataReady) { // Check we have new data, just in case!!
+ 
+            Modes.iDataOut &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase
+
+            // Translate the next lot of I/Q samples into Modes.magnitude
+            computeMagnitudeVector(Modes.pData[Modes.iDataOut]);
+
+            Modes.stSystemTimeBlk = Modes.stSystemTimeRTL[Modes.iDataOut];
+
+            // Update the input buffer pointer queue
+            Modes.iDataOut   = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataOut + 1); 
+            Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut);   
+
+            // If we lost some blocks, correct the timestamp
+            if (Modes.iDataLost) {
+                Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES * 6 * Modes.iDataLost);
+                uRtlLost+= Modes.iDataLost;
+                Modes.iDataLost = 0;
+            }
+
+            // It's safe to release the lock now
+            pthread_cond_signal (&Modes.data_cond);
+            pthread_mutex_unlock(&Modes.data_mutex);
+
+            // Process data after releasing the lock, so that the capturing
+            // thread can read data while we perform computationally expensive
+            // stuff at the same time.
+            detectModeS(Modes.magnitude, MODES_ASYNC_BUF_SAMPLES);
+
+            // Update the timestamp ready for the next block
+            Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES*6);
+
+        } else {
+            pthread_cond_signal (&Modes.data_cond);
+            pthread_mutex_unlock(&Modes.data_mutex);
+        }
 
-        // Process data after releasing the lock, so that the capturing
-        // thread can read data while we perform computationally expensive
-        // stuff * at the same time. (This should only be useful with very
-        // slow processors).
-        pthread_mutex_unlock(&Modes.data_mutex);
-        detectModeS(Modes.magnitude, MODES_ASYNC_BUF_SAMPLES);
-        Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES*6);
         backgroundTasks();
         pthread_mutex_lock(&Modes.data_mutex);
-        if (Modes.exit) break;
     }
 
     // If --stats were given, print statistics
@@ -658,7 +709,11 @@ int main(int argc, char **argv) {
     pthread_cond_destroy(&Modes.data_cond);     // Thread cleanup
     pthread_mutex_destroy(&Modes.data_mutex);
     pthread_join(Modes.reader_thread,NULL);     // Wait on reader thread exit
+#ifndef _WIN32
     pthread_exit(0);
+#else
+    return (0);
+#endif
 }
 //
 //=========================================================================
diff --git a/dump1090.h b/dump1090.h
index 24d971b..3c6d41c 100644
--- a/dump1090.h
+++ b/dump1090.h
@@ -81,7 +81,7 @@
 #define MODES_DEFAULT_FREQ         1090000000
 #define MODES_DEFAULT_WIDTH        1000
 #define MODES_DEFAULT_HEIGHT       700
-#define MODES_ASYNC_BUF_NUMBER     12
+#define MODES_ASYNC_BUF_NUMBER     16
 #define MODES_ASYNC_BUF_SIZE       (16*16384)                 // 256k
 #define MODES_ASYNC_BUF_SAMPLES    (MODES_ASYNC_BUF_SIZE / 2) // Each sample is 2 bytes
 #define MODES_AUTO_GAIN            -100                       // Use automatic gain
@@ -225,15 +225,21 @@ struct aircraft {
 // Program global state
 struct {                             // Internal state
     pthread_t       reader_thread;
+
     pthread_mutex_t data_mutex;      // Mutex to synchronize buffer access
     pthread_cond_t  data_cond;       // Conditional variable associated
-    uint16_t       *data;            // Raw IQ samples buffer
+    uint16_t       *pData          [MODES_ASYNC_BUF_NUMBER]; // Raw IQ sample buffers from RTL
+    struct timeb    stSystemTimeRTL[MODES_ASYNC_BUF_NUMBER]; // System time when RTL passed us this block
+    int             iDataIn;    // Fifo input pointer
+    int             iDataOut;   // Fifo output pointer
+    int             iDataReady; // Fifo content count 
+    int             iDataLost;  // Count of missed buffers
+
+    uint16_t       *pFileData;       // Raw IQ samples buffer (from a File)
     uint16_t       *magnitude;       // Magnitude vector
-    struct timeb    stSystemTimeRTL; // System time when RTL passed us the Latest block
     uint64_t        timestampBlk;    // Timestamp of the start of the current block
     struct timeb    stSystemTimeBlk; // System time when RTL passed us currently processing this block
     int             fd;              // --ifile option file descriptor
-    int             data_ready;      // Data ready to be processed
     uint32_t       *icao_cache;      // Recently seen ICAO addresses cache
     uint16_t       *maglut;          // I/Q -> Magnitude lookup table
     int             exit;            // Exit from the main loop when true
@@ -403,7 +409,7 @@ void detectModeS        (uint16_t *m, uint32_t mlen);
 void decodeModesMessage (struct modesMessage *mm, unsigned char *msg);
 void displayModesMessage(struct modesMessage *mm);
 void useModesMessage    (struct modesMessage *mm);
-void computeMagnitudeVector();
+void computeMagnitudeVector(uint16_t *pData);
 void decodeCPR          (struct aircraft *a, int fflag, int surface);
 int  decodeCPRrelative  (struct aircraft *a, int fflag, int surface);
 void modesInitErrorInfo ();
diff --git a/mode_s.c b/mode_s.c
index 8cd9987..84bef2d 100644
--- a/mode_s.c
+++ b/mode_s.c
@@ -1372,9 +1372,8 @@ void displayModesMessage(struct modesMessage *mm) {
 // Turn I/Q samples pointed by Modes.data into the magnitude vector
 // pointed by Modes.magnitude.
 //
-void computeMagnitudeVector(void) {
+void computeMagnitudeVector(uint16_t *p) {
     uint16_t *m = &Modes.magnitude[MODES_PREAMBLE_SAMPLES+MODES_LONG_MSG_SAMPLES];
-    uint16_t *p = Modes.data;
     uint32_t j;
 
     memcpy(Modes.magnitude,&Modes.magnitude[MODES_ASYNC_BUF_SAMPLES], MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE);
diff --git a/net_io.c b/net_io.c
index 2ff6c94..5d6b834 100644
--- a/net_io.c
+++ b/net_io.c
@@ -96,7 +96,7 @@ void modesAcceptClients(void) {
     services[4] = Modes.https;
     services[5] = Modes.sbsos;
 
-    for (j = 0; j < sizeof(services)/sizeof(int); j++) {
+    for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
         fd = anetTcpAccept(Modes.aneterr, services[j], NULL, &port);
         if (fd == -1) continue;
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-hamradio/dump1090.git



More information about the pkg-hamradio-commits mailing list