[libmath-prime-util-perl] 09/16: improve threading
Partha P. Mukherjee
ppm-guest at moszumanska.debian.org
Thu May 21 18:44:35 UTC 2015
This is an automated email from the git hooks/post-receive script.
ppm-guest pushed a commit to annotated tag v0.08
in repository libmath-prime-util-perl.
commit 10fba385da314503526d9d361a63bb72276d489a
Author: Dana Jacobsen <dana at acm.org>
Date: Wed Jun 20 13:22:53 2012 -0600
improve threading
---
Changes | 2 +-
MANIFEST | 1 +
cache.c | 146 ++++++++++++++++++++++++++++++++++++-------------
lib/Math/Prime/Util.pm | 7 ++-
sieve.c | 2 +-
t/31-threading.t | 38 +++++++++++++
util.c | 7 ++-
7 files changed, 157 insertions(+), 46 deletions(-)
diff --git a/Changes b/Changes
index 4687a08..051b029 100644
--- a/Changes
+++ b/Changes
@@ -1,7 +1,7 @@
Revision history for Perl extension Math::Prime::Util.
0.08 20 June 2012
- - Added thread safety, though with poor concurrency.
+ - Added thread safety and tested good concurrency.
- Accuracy improvement and measurements for math functions.
- Remove simple sieve -- it wasn't being used, and was just around for
performance comparisons.
diff --git a/MANIFEST b/MANIFEST
index f0cc009..89667bb 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -45,6 +45,7 @@ t/16-randomprime.t
t/17-pseudoprime.t
t/18-functions.t
t/30-relations.t
+t/31-threading.t
t/50-factoring.t
t/90-release-perlcritic.t
t/91-release-pod-syntax.t
diff --git a/cache.c b/cache.c
index cebfde4..bacc5c8 100644
--- a/cache.c
+++ b/cache.c
@@ -15,34 +15,33 @@
* They handle a cached primary set of primes, as well as a segment
* area for use by all the functions that want to do segmented operation.
*
- * Since we're trying to be thread-safe (and ideally allow a good deal
- * of concurrency), it is imperative these be used correctly. You need
- * to call the get method, do your stuff, then call release. Do *not* return
- * out of your function or croak without calling release.
+ * We must be thread-safe, and we want to allow a good deal of concurrency.
+ * It is imperative these be used correctly. After calling the get method,
+ * use the sieve or segment, then release. You MUST call release before you
+ * return or croak. You ought to release as soon as you're done using the
+ * sieve or segment.
*/
static int mutex_init = 0;
#ifdef USE_ITHREADS
static perl_mutex segment_mutex;
-static perl_mutex primary_mutex;
+
+/* See: http://en.wikipedia.org/wiki/Readers-writers_problem */
+static perl_mutex primary_mutex_no_waiting;
+static perl_mutex primary_mutex_no_accessing;
+static perl_mutex primary_mutex_counter;
+static int primary_number_of_readers = 0;
#endif
static unsigned char* prime_cache_sieve = 0;
static UV prime_cache_size = 0;
-/*
- * Get the size and a pointer to the cached prime sieve.
- * Returns the maximum sieved value in the sieve.
- * Allocates and sieves if needed.
- *
- * The sieve holds 30 numbers per byte, using a mod-30 wheel.
- */
-UV get_prime_cache(UV n, const unsigned char** sieve)
-{
- MUTEX_LOCK(&primary_mutex);
-
+/* Fill the primary cache up to n */
+static void _fill_prime_cache(UV n, int nowait) {
+ if (!nowait) { MUTEX_LOCK(&primary_mutex_no_waiting); }
+ MUTEX_LOCK(&primary_mutex_no_accessing);
+ if (!nowait) { MUTEX_UNLOCK(&primary_mutex_no_waiting); }
if (prime_cache_size < n) {
-
if (prime_cache_sieve != 0)
Safefree(prime_cache_sieve);
prime_cache_sieve = 0;
@@ -59,41 +58,102 @@ UV get_prime_cache(UV n, const unsigned char** sieve)
if (prime_cache_sieve != 0)
prime_cache_size = n;
+ /* printf("size to %llu\n", prime_cache_size); fflush(stdout); */
}
+ MUTEX_UNLOCK(&primary_mutex_no_accessing);
+}
+
+/*
+ * Get the size and a pointer to the cached prime sieve.
+ * Returns the maximum sieved value in the sieve.
+ * Allocates and sieves if needed.
+ *
+ * The sieve holds 30 numbers per byte, using a mod-30 wheel.
+ */
+UV get_prime_cache(UV n, const unsigned char** sieve)
+{
+ int prev_readers;
if (sieve == 0) {
- MUTEX_UNLOCK(&primary_mutex);
- } else {
- *sieve = prime_cache_sieve;
+ if (prime_cache_size < n) {
+ _fill_prime_cache(n, 0);
+ }
+ return prime_cache_size;
}
+ if (prime_cache_size < n)
+ _fill_prime_cache(n, 0);
+
+ /* TODO: We've got a problem. If another thread does a memfree right here,
+ * then we'll return a size less than n. Everything technically works, but
+ * there will be sieve croaks because they can't get enough primes.
+ */
+
+ MUTEX_LOCK(&primary_mutex_no_waiting);
+ MUTEX_LOCK(&primary_mutex_counter);
+ prev_readers = primary_number_of_readers;
+ primary_number_of_readers++;
+ MUTEX_UNLOCK(&primary_mutex_counter);
+ if (prev_readers == 0) { MUTEX_LOCK(&primary_mutex_no_accessing); }
+ MUTEX_UNLOCK(&primary_mutex_no_waiting);
+
+ *sieve = prime_cache_sieve;
return prime_cache_size;
}
void release_prime_cache(const unsigned char* mem) {
- /* Thanks for letting us know you're done. */
- MUTEX_UNLOCK(&primary_mutex);
+ int current_readers;
+ MUTEX_LOCK(&primary_mutex_counter);
+ primary_number_of_readers--;
+ current_readers = primary_number_of_readers;
+ MUTEX_UNLOCK(&primary_mutex_counter);
+ if (current_readers == 0) { MUTEX_UNLOCK(&primary_mutex_no_accessing); }
}
-#define SEGMENT_CHUNK_SIZE UVCONST(262144)
+#define SEGMENT_CHUNK_SIZE UVCONST(256*1024*1024-8)
static unsigned char* prime_segment = 0;
+static int prime_segment_is_available = 1;
unsigned char* get_prime_segment(UV *size) {
+ unsigned char* mem;
+ int use_prime_segment;
+
MPUassert(size != 0, "get_prime_segment given null size pointer");
MPUassert(mutex_init == 1, "segment mutex has not been initialized");
+
MUTEX_LOCK(&segment_mutex);
- if (prime_segment == 0)
- New(0, prime_segment, SEGMENT_CHUNK_SIZE, unsigned char);
- if (prime_segment == 0) {
- MUTEX_UNLOCK(&segment_mutex);
- croak("Could not allocate %"UVuf" bytes for segment sieve", SEGMENT_CHUNK_SIZE);
+ if (prime_segment_is_available) {
+ prime_segment_is_available = 0;
+ use_prime_segment = 1;
+ } else {
+ use_prime_segment = 0;
+ }
+ MUTEX_UNLOCK(&segment_mutex);
+
+ if (use_prime_segment) {
+ if (prime_segment == 0)
+ New(0, prime_segment, SEGMENT_CHUNK_SIZE, unsigned char);
+ *size = SEGMENT_CHUNK_SIZE;
+ mem = prime_segment;
+ } else {
+ UV chunk_size = 64*1024*1024-8;
+ New(0, mem, chunk_size, unsigned char);
+ *size = chunk_size;
}
- *size = SEGMENT_CHUNK_SIZE;
- return prime_segment;
+
+ if (mem == 0)
+ croak("Could not allocate %"UVuf" bytes for segment sieve", *size);
+
+ return mem;
}
void release_prime_segment(unsigned char* mem) {
- /* Thanks for letting us know you're done. */
+ MUTEX_LOCK(&segment_mutex);
+ if (mem == prime_segment) {
+ prime_segment_is_available = 1;
+ } else {
+ Safefree(mem);
+ }
MUTEX_UNLOCK(&segment_mutex);
}
@@ -103,7 +163,9 @@ void prime_precalc(UV n)
{
if (!mutex_init) {
MUTEX_INIT(&segment_mutex);
- MUTEX_INIT(&primary_mutex);
+ MUTEX_INIT(&primary_mutex_no_waiting);
+ MUTEX_INIT(&primary_mutex_no_accessing);
+ MUTEX_INIT(&primary_mutex_counter);
mutex_init = 1;
}
@@ -120,20 +182,24 @@ void prime_memfree(void)
{
MPUassert(mutex_init == 1, "segment mutex has not been initialized");
- if (prime_cache_sieve != 0) {
- MUTEX_LOCK(&primary_mutex);
+ MUTEX_LOCK(&primary_mutex_no_waiting);
+ MUTEX_LOCK(&primary_mutex_no_accessing);
+ MUTEX_UNLOCK(&primary_mutex_no_waiting);
+ if ( (prime_cache_sieve != 0) ) {
+ /* printf("size to 0 nreaders: %d\n", primary_number_of_readers); fflush(stdout); */
Safefree(prime_cache_sieve);
prime_cache_sieve = 0;
prime_cache_size = 0;
- MUTEX_UNLOCK(&primary_mutex);
}
+ MUTEX_UNLOCK(&primary_mutex_no_accessing);
- if (prime_segment != 0) {
- MUTEX_LOCK(&segment_mutex);
+ MUTEX_LOCK(&segment_mutex);
+ /* Don't free if another thread is using it */
+ if ( (prime_segment != 0) && (prime_segment_is_available) ) {
Safefree(prime_segment);
prime_segment = 0;
- MUTEX_UNLOCK(&segment_mutex);
}
+ MUTEX_UNLOCK(&segment_mutex);
prime_precalc(0);
}
@@ -144,7 +210,9 @@ void _prime_memfreeall(void)
/* No locks. We're shutting everything down. */
if (mutex_init) {
MUTEX_DESTROY(&segment_mutex);
- MUTEX_DESTROY(&primary_mutex);
+ MUTEX_DESTROY(&primary_mutex_no_waiting);
+ MUTEX_DESTROY(&primary_mutex_no_accessing);
+ MUTEX_DESTROY(&primary_mutex_counter);
mutex_init = 0;
}
if (prime_cache_sieve != 0)
diff --git a/lib/Math/Prime/Util.pm b/lib/Math/Prime/Util.pm
index 378fdf7..3d0bab0 100644
--- a/lib/Math/Prime/Util.pm
+++ b/lib/Math/Prime/Util.pm
@@ -326,6 +326,8 @@ the fastest on CPAN, including L<Math::Prime::XS>, L<Math::Prime::FastSieve>,
and L<Math::Factor::XS>. L<Math::Pari> is slower in some things, faster in
others.
+The module is thread-safe, though threads should not call the memory free
+routines.
=head1 FUNCTIONS
@@ -749,8 +751,9 @@ Perl versions earlier than 5.8.0 have issues with 64-bit. The test suite will
try to determine if your Perl is broken. This will show up in factoring tests.
Perl 5.6.2 32-bit works fine, as do later versions with 32-bit and 64-bit.
-The module is thread-safe, but will not currently allow much concurrency. This
-is being worked on.
+The module is thread-safe and should allow good concurrency. There are still
+some issues if threads call prime_memfree while other threads are sieving
+that are being worked on.
=head1 PERFORMANCE
diff --git a/sieve.c b/sieve.c
index 4372b9d..47a8652 100644
--- a/sieve.c
+++ b/sieve.c
@@ -193,7 +193,7 @@ int sieve_segment(unsigned char* mem, UV startd, UV endd)
/* printf("segment sieve from %"UVuf" to %"UVuf" (aux sieve to %"UVuf")\n", startp, endp, limit); */
pcsize = get_prime_cache(limit, &sieve);
if (pcsize < limit) {
- croak("Couldn't generate small sieve for segment sieve");
+ release_prime_cache(sieve);
return 0;
}
diff --git a/t/31-threading.t b/t/31-threading.t
new file mode 100644
index 0000000..e73dcd2
--- /dev/null
+++ b/t/31-threading.t
@@ -0,0 +1,38 @@
+#!/usr/bin/env perl
+use strict;
+use warnings;
+
+use Config;
+BEGIN {
+ if (! $Config{useithreads} || $] < 5.008) {
+ print("1..0 # Skip Threads not supported\n");
+ exit(0);
+ }
+ # Should be be looking for newer than 5.008?
+ if (! eval { require threads }) {
+ print "1..0 # Skip threads.pm not installed\n";
+ exit 0;
+ }
+}
+
+use Test::More 'tests' => 1;
+use Math::Prime::Util ":all";
+my $nthreads = 64;
+
+srand(50);
+my @digits;
+push @digits, random_ndigit_prime(6) for (0..9);
+
+my $tsub = sub { my $sum = 0; $sum += prime_count($_) for (@digits); return $sum;};
+my @threads;
+# Fire off all our threads
+push @threads, threads->create($tsub) for (1..$nthreads);
+# Retrieve results
+my $par_sum = 0;
+$par_sum += $_->join() for (@threads);
+
+# Now try it on main
+my $seq_sum = 0;
+$seq_sum += $tsub->() for (1..$nthreads);
+
+is($par_sum, $seq_sum, "$nthreads threads summed prime count");
diff --git a/util.c b/util.c
index 9556180..a318f53 100644
--- a/util.c
+++ b/util.c
@@ -295,7 +295,7 @@ static UV count_segment_ranged(const unsigned char* sieve, UV nbytes, UV lowp, U
UV hi_d = highp/30;
UV hi_m = highp - hi_d*30;
- MPUassert( sieve != 0, "count_segment_maxcount incorrect args");
+ MPUassert( sieve != 0, "count_segment_ranged incorrect args");
if (hi_d >= nbytes) {
hi_d = nbytes-1;
@@ -551,7 +551,7 @@ UV prime_count(UV low, UV high)
segment_size = get_prime_cache( sqrt(endp) + 1 , &cache_sieve) / 30;
}
- if (low_d <= segment_size) {
+ if ( (segment_size > 0) && (low_d <= segment_size) ) {
/* Count all the primes in the primary cache in our range */
count += count_segment_ranged(cache_sieve, segment_size, low, high);
@@ -763,7 +763,8 @@ UV nth_prime(UV n)
segment_size = get_prime_cache(sqrt(upper_limit), &cache_sieve) / 30;
/* Count up everything in the cached sieve. */
- count += count_segment_maxcount(cache_sieve, segment_size, target, &p);
+ if (segment_size > 0)
+ count += count_segment_maxcount(cache_sieve, segment_size, target, &p);
release_prime_cache(cache_sieve);
if (count == target)
return p;
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libmath-prime-util-perl.git
More information about the Pkg-perl-cvs-commits
mailing list