[libmath-prime-util-perl] 18/20: In-place phi(x,a) merge

Partha P. Mukherjee ppm-guest at moszumanska.debian.org
Thu May 21 18:47:32 UTC 2015


This is an automated email from the git hooks/post-receive script.

ppm-guest pushed a commit to annotated tag v0.23
in repository libmath-prime-util-perl.

commit 0901cc7d0f2c9aca31ebd7f6c34c30ef017b524d
Author: Dana Jacobsen <dana at acm.org>
Date:   Mon Mar 4 18:18:37 2013 -0800

    In-place phi(x,a) merge
---
 Changes  |   7 +++-
 TODO     |   4 +-
 lehmer.c | 125 ++++++++++++++++++++++++++++++++++++++-------------------------
 sieve.c  |  53 ++++++++++++++-------------
 4 files changed, 112 insertions(+), 77 deletions(-)

diff --git a/Changes b/Changes
index a0b69db..c067d11 100644
--- a/Changes
+++ b/Changes
@@ -23,13 +23,18 @@ Revision history for Perl extension Math::Prime::Util.
 
     - Start on Lagarias-Miller-Odlyzko prime count.
 
+    - A new data structure for the phi(x,a) function used by all the fast
+      prime count routines.  Quite a bit faster and most importantly, uses
+      half the memory of the old structure.
+
     - Performance:
        - Divisor sum with no sub is ~10x faster.
        - Speed up PP version of exp_mangoldt, create XS version.
        - Zeta much faster as mentioned above.
        - faster nth_prime as mentioned above.
        - AKS about 10% faster.
-       - New data structure for phi(x,a) means even faster prime_count.
+       - Unroll a little more in sieve inner loop.  A couple percent faster.
+       - Faster prime_count and nth_prime due to new phi(x,a) (about 1.25x).
 
 0.22 26 February 2013
 
diff --git a/TODO b/TODO
index 65c9f1a..9a6844d 100644
--- a/TODO
+++ b/TODO
@@ -48,6 +48,6 @@
   that (1) handles any low / high, and (2) segments.  Take segment_primes
   from XS.xs to do this.
 
-- Do in-place merge in new phi(x,a).
-
 - Implement S2 calculation for LMO prime count.
+
+- Balance memory used in phi with memory for binary search pc in lehmer.
diff --git a/lehmer.c b/lehmer.c
index 65a7bd2..cad15ea 100644
--- a/lehmer.c
+++ b/lehmer.c
@@ -31,14 +31,14 @@
  * with large values of x.
  *
  * Using my sieve code with everything running in serial, calculating pi(10^12)
- * is done undef 1 second on my computer.  pi(10^14) takes under 30 seconds,
+ * is done under 1 second on my computer.  pi(10^14) takes under 30 seconds,
  * pi(10^16) in under 20 minutes.  Compared with Thomas R. Nicely's pix4
- * program, his one is 4-6x faster and uses 2-3x less memory.  When compiled
+ * program, his one is 4-6x faster and uses 2-4x less memory.  When compiled
  * with parallel primesieve it is another 2x or more faster:
  *   pix4(10^16) takes 124 minutes, this code + primesieve takes 4 minutes.
  *
- * Timings with Perl + MPU.  Using the standalone program with primesieve
- * speeds up stage 4 a lot for large values.
+ * Timings with Perl + MPU with all-serial computation.  Using the standalone
+ * program with parallel primesieve speeds up stage 4 a lot for large values.
  *
  *    n     phi(x,a) mem/time  |  stage 4 mem/time  | total time
  *   10^17   3000MB    275.29  | 2300MB   9911.9    | 179m 37.5s
@@ -299,7 +299,14 @@ static void vcarray_destroy(vcarray_t* l)
   l->size = 0;
   l->n = 0;
 }
-static void vcarray_insert(vcarray_t* l, UV val, IV count)
+/* Insert a value/count pair.  We do this indirection because about 80% of
+ * the calls result in a merge with the previous entry. */
+#define vcarray_insert(arr, val, count) \
+  if (arr.n > 0 && arr.a[arr.n-1].v == val) \
+    arr.a[arr.n-1].c += count; \
+  else \
+    vcarray_insert_func(&arr, val, count);
+static void vcarray_insert_func(vcarray_t* l, UV val, IV count)
 {
   UV n = l->n;
   if (n > 0 && l->a[n-1].v <= val) {
@@ -328,51 +335,80 @@ static void vcarray_insert(vcarray_t* l, UV val, IV count)
   l->n++;
 }
 
-static vcarray_t vcarray_merge(vcarray_t* a, vcarray_t* b)
+/* Merge the two sorted lists A and B into A.  Each list has no duplicates,
+ * but they may have duplications between the two.  We're quite interested
+ * in saving memory, so first remove all the duplicates, then do an in-place
+ * merge. */
+static void vcarray_merge(vcarray_t* a, vcarray_t* b)
 {
-  UV ai, bi, an, bn, k, mn;
-  vcarray_t m = vcarray_create();
-
-  /* printf("going to merge %lu and %lu elements\n", a->n, b->n); */
-  an = a->n;  bn = b->n;  mn = an+bn;
-  New(0, m.a, mn, vc_t);
-  m.size = mn;
-  /* merge */
-  ai = 0;  bi = 0;
+  long ai, bi, bj, k, kn;
+  UV an = a->n;
+  UV bn = b->n;
   vc_t* aa = a->a;
   vc_t* ba = b->a;
-  for (k = 0; k < mn; k++) {
+
+  /* Merge anything in B that appears in A. */
+  for (ai = 0, bi = 0, bj = 0; bi < bn; bi++) {
+    /* Skip forward in A until empty or aa[ai].v <= ba[bi].v */
+    UV bval = ba[bi].v;
+    while (ai < an && aa[ai].v > bval)
+      ai++;
+    /* if A empty then copy the remaining elements */
     if (ai >= an) {
-      if (bi >= bn) croak("ran out of data during merge");
-      while (k < mn)
-        m.a[k++] = ba[bi++];
+      if (bi == bj)
+        bj = bn;
+      else
+        while (bi < bn)
+          ba[bj++] = ba[bi++];
       break;
-    } else if (bi >= bn) {
-      while (k < mn)
-        m.a[k++] = aa[ai++];
+    }
+    if (aa[ai].v == bval)
+      aa[ai].c += ba[bi].c;
+    else
+      ba[bj++] = ba[bi];
+  }
+  if (verbose>2) printf("  removed %lu duplicates from b\n", bn - bj);
+  bn = bj;
+
+  if (bn == 0) {  /* In case they were all duplicates */
+    b->n = 0;
+    return;
+  }
+
+  /* kn = the final merged size.  All duplicates are gone, so this is exact. */
+  kn = an+bn;
+  if (a->size < kn) {  /* Make A big enough to hold kn elements */
+    UV new_size = (UV) (1.2 * kn);
+    if (verbose>2) printf("REALLOCing list %p, new size %lu\n", a->a, new_size);
+    Renew( a->a, new_size, vc_t );
+    aa = a->a;  /* this could have been changed by the realloc */
+    a->size = new_size;
+  }
+
+  /* merge A and B.  Very simple using reverse merge. */
+  ai = an-1;
+  bi = bn-1;
+  for (k = kn-1; k >= 0; k--) {
+    if (ai < 0) { /* A is exhausted, just filling in B */
+      if (bi < 0) croak("ran out of data during merge");
+      aa[k] = ba[bi--];
+    } else if (bi < 0) { /* We've caught up with A */
       break;
-    } else if (aa[ai].v > ba[bi].v) {
-      m.a[k] = aa[ai++];
-    } else if (ba[bi].v > aa[ai].v) {
-      m.a[k] = ba[bi++];
+    } else if (aa[ai].v < ba[bi].v) {
+      aa[k] = aa[ai--];
     } else {
-      m.a[k] = aa[ai];
-      m.a[k].c += ba[bi].c;
-      ai++;  bi++;  mn--;
+      if (aa[ai].v == ba[bi].v) croak("deduplication error");
+      aa[k] = ba[bi--];
     }
   }
-  m.n = k;
-  /* printf("done with merge, have %lu elements\n", m.n); */
-  return m;
+  a->n = kn;    /* A now has this many items */
+  b->n = 0;     /* B is marked empty */
 }
 
 
 /*
  * The main phi(x,a) algorithm.  In this implementation, it takes under 10%
- * of the total time for the Lehmer algorithm, but it is the memory
- * constraining part.
- *
- * TODO: Expand a1, then merge a2 directly into it.  Should save memory.
+ * of the total time for the Lehmer algorithm, but is a big memory consumer.
  */
 
 static UV phi(UV x, UV a)
@@ -392,7 +428,7 @@ static UV phi(UV x, UV a)
 
   vcarray_t a1 = vcarray_create();
   vcarray_t a2 = vcarray_create();
-  vcarray_insert(&a1, x, 1);
+  vcarray_insert(a1, x, 1);
 
   while (a > 7) {
     UV primea = primes[a];
@@ -403,22 +439,13 @@ static UV phi(UV x, UV a)
         continue;
       sval = val / primea;
       if (sval >= primea) {
-        vcarray_insert(&a2, sval, -count);
+        vcarray_insert(a2, sval, -count);
       } else {
         sum -= count;
       }
     }
-    { /* Merge a1 and a2 into a3 */
-      vcarray_t m = vcarray_merge(&a1, &a2);
-      /* printf("a1 size %lu  a2 size %lu  m size %lu\n",a1.n,a2.n,m.n); */
-      /* Empty a2 */
-      a2.n = 0;
-      /* erase a1 and replace with m */
-      vcarray_destroy(&a1);
-      a1.a = m.a;
-      a1.n = m.n;
-      a1.size = m.size;
-    }
+    /* Merge a1 and a2 into a1.  a2 will be emptied. */
+    vcarray_merge(&a1, &a2);
     a--;
   }
   vcarray_destroy(&a2);
diff --git a/sieve.c b/sieve.c
index 16d22fc..618fd21 100644
--- a/sieve.c
+++ b/sieve.c
@@ -94,6 +94,30 @@ static const unsigned char presieve13[PRESIEVE_SIZE] =
   0x18,0x89,0x08,0x25,0x44,0x22,0x30,0x14,0xc3,0x88,0x86,0x40,0x1a,
   0x28,0x30,0x85,0x09,0x54,0x60,0x43,0x24,0x92,0x81,0x08,0x04,0x70};
 
+#define FIND_COMPOSITE_POS(i,j) \
+  { \
+    UV dlast = d; \
+    do { \
+      d += dinc; \
+      m += minc; \
+      if (m >= 30) { d++; m -= 30; } \
+    } while ( masktab30[m] == 0 ); \
+    wdinc[i] = d - dlast; \
+    wmask[j] = masktab30[m]; \
+  }
+#define FIND_COMPOSITE_POSITIONS(p) \
+  do { \
+    FIND_COMPOSITE_POS(0,1) \
+    FIND_COMPOSITE_POS(1,2) \
+    FIND_COMPOSITE_POS(2,3) \
+    FIND_COMPOSITE_POS(3,4) \
+    FIND_COMPOSITE_POS(4,5) \
+    FIND_COMPOSITE_POS(5,6) \
+    FIND_COMPOSITE_POS(6,7) \
+    FIND_COMPOSITE_POS(7,0) \
+    d -= p; \
+  } while (0)
+
 static void sieve_prefill(unsigned char* mem, UV startd, UV endd)
 {
   UV nbytes = endd - startd + 1;
@@ -145,20 +169,9 @@ unsigned char* sieve_erat30(UV end)
     UV minc = (2*prime) - dinc*30;
     UV wdinc[8];
     unsigned char wmask[8];
-    int i;
 
     /* Find the positions of the next composites we will mark */
-    for (i = 1; i <= 8; i++) {
-      UV dlast = d;
-      do {
-        d += dinc;
-        m += minc;
-        if (m >= 30) { d++; m -= 30; }
-      } while ( masktab30[m] == 0 );
-      wdinc[i-1] = d - dlast;
-      wmask[i%8] = masktab30[m];
-    }
-    d -= prime;
+    FIND_COMPOSITE_POSITIONS(prime);
 #if 0
     assert(d == ((prime*prime)/30));
     assert(d < max_buf);
@@ -199,7 +212,8 @@ int sieve_segment(unsigned char* mem, UV startd, UV endd)
   /* Fill buffer with marked 7, 11, and 13 */
   sieve_prefill(mem, startd, endd);
 
-  limit = isqrt(endp) + 1;
+  limit = isqrt(endp);
+  if (limit*limit < endp) limit++;  /* ceil(sqrt(endp)) */
   /* printf("segment sieve from %"UVuf" to %"UVuf" (aux sieve to %"UVuf")\n", startp, endp, limit); */
   pcsize = get_prime_cache(limit, &sieve);
   if (pcsize < limit) {
@@ -236,20 +250,9 @@ int sieve_segment(unsigned char* mem, UV startd, UV endd)
       UV wdinc[8];
       unsigned char wmask[8];
       UV offset_endd = endd - startd;
-      int i;
 
       /* Find the positions of the next composites we will mark */
-      for (i = 1; i <= 8; i++) {
-        UV dlast = d;
-        do {
-          d += dinc;
-          m += minc;
-          if (m >= 30) { d++; m -= 30; }
-        } while ( masktab30[m] == 0 );
-        wdinc[i-1] = d - dlast;
-        wmask[i%8] = masktab30[m];
-      }
-      d -= p;
+      FIND_COMPOSITE_POSITIONS(p);
       d -= startd;
 #if 0
       i = 0;        /* Mark the composites */

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libmath-prime-util-perl.git



More information about the Pkg-perl-cvs-commits mailing list