[Pkg-ocaml-maint-commits] [nproc] 01/03: Import commits from upstream master branch
Stéphane Glondu
glondu at moszumanska.debian.org
Thu Feb 19 08:11:59 UTC 2015
This is an automated email from the git hooks/post-receive script.
glondu pushed a commit to branch master
in repository nproc.
commit a0c274c275e009a58f23b79367e66c2fdd377b48
Author: Stephane Glondu <steph at glondu.net>
Date: Wed Feb 18 14:07:01 2015 +0100
Import commits from upstream master branch
---
...0002-Retry-waitpid-if-it-fails-with-EINTR.patch | 33 ++++++++
...x-for-concurrent-access-to-the-input-stre.patch | 57 ++++++++++++++
...re-efficient-fix-for-concurrent-access-to.patch | 89 ++++++++++++++++++++++
debian/patches/series | 3 +
4 files changed, 182 insertions(+)
diff --git a/debian/patches/0002-Retry-waitpid-if-it-fails-with-EINTR.patch b/debian/patches/0002-Retry-waitpid-if-it-fails-with-EINTR.patch
new file mode 100644
index 0000000..1596537
--- /dev/null
+++ b/debian/patches/0002-Retry-waitpid-if-it-fails-with-EINTR.patch
@@ -0,0 +1,33 @@
+From: Martin Jambon <martin at mjambon.com>
+Date: Fri, 30 Dec 2011 23:10:27 -0800
+Subject: Retry waitpid if it fails with EINTR
+
+---
+ nproc.ml | 6 +++++-
+ 1 file changed, 5 insertions(+), 1 deletion(-)
+
+diff --git a/nproc.ml b/nproc.ml
+index b68ca1e..1b4a48f 100644
+--- a/nproc.ml
++++ b/nproc.ml
+@@ -149,6 +149,10 @@ struct
+ closed : bool ref;
+ }
+
++ let rec waitpid pid =
++ try Unix.waitpid [] pid
++ with Unix.Unix_error (Unix.EINTR, _, _) -> waitpid pid
++
+ (* --master-- *)
+ let pull_task kill_workers in_stream central_service worker =
+ (* Note: input and output file descriptors are automatically closed
+@@ -266,7 +270,7 @@ struct
+ (try close_worker x with _ -> ());
+ (try
+ Unix.kill x.worker_pid Sys.sigkill;
+- ignore (Unix.waitpid [] x.worker_pid)
++ ignore (waitpid x.worker_pid)
+ with e ->
+ !log_error
+ (sprintf "kill worker %i: %s"
+--
diff --git a/debian/patches/0003-possible-fix-for-concurrent-access-to-the-input-stre.patch b/debian/patches/0003-possible-fix-for-concurrent-access-to-the-input-stre.patch
new file mode 100644
index 0000000..bd9d302
--- /dev/null
+++ b/debian/patches/0003-possible-fix-for-concurrent-access-to-the-input-stre.patch
@@ -0,0 +1,57 @@
+From: pveber <philippe.veber at gmail.com>
+Date: Mon, 1 Apr 2013 11:43:07 +0200
+Subject: possible fix for concurrent access to the input stream
+
+In the master process, there are threads associated 1-to-1 with
+workers. They concurrently try to pull a task from a stream
+[in_stream] and send it to a worker, providing a waiter thread for
+delivering the output. However it seems that several worker-associated
+threads can read the same incoming value in the stream, perform the
+computation concurrently and try to send it back to the waiter. Since
+the waiter is woken up several times, this generates the exceptions
+[Invalid_argument("Lwt.wakeup_result")].
+
+The final result is correct, but resources are wasted, since some
+computation may be done several times by several workers (and that really
+happens, since the exceptions are raised quite a few times).
+
+The proposed fix is to add a mutex for the access to [in_stream].
+---
+ nproc.ml | 11 ++++++++---
+ 1 file changed, 8 insertions(+), 3 deletions(-)
+
+diff --git a/nproc.ml b/nproc.ml
+index 1b4a48f..d42ccef 100644
+--- a/nproc.ml
++++ b/nproc.ml
+@@ -153,6 +153,8 @@ struct
+ try Unix.waitpid [] pid
+ with Unix.Unix_error (Unix.EINTR, _, _) -> waitpid pid
+
++ let mutex = Lwt_mutex.create ()
++
+ (* --master-- *)
+ let pull_task kill_workers in_stream central_service worker =
+ (* Note: input and output file descriptors are automatically closed
+@@ -160,14 +162,17 @@ struct
+ let ic = Lwt_io.of_fd ~mode:Lwt_io.input worker.worker_in in
+ let oc = Lwt_io.of_fd ~mode:Lwt_io.output worker.worker_out in
+ let rec pull () =
+- Lwt.bind (Lwt_stream.get in_stream) (
+- function
+- None -> Lwt.return ()
++ Lwt.bind (Lwt_mutex.lock mutex) (fun () ->
++ Lwt.bind (Lwt_stream.get in_stream) (
++ function
++ | None -> Lwt_mutex.unlock mutex ; Lwt.return ()
+ | Some (f, x, g) ->
++ Lwt_mutex.unlock mutex ;
+ let req = Worker_req (f, x) in
+ Lwt.bind
+ (write_value oc req)
+ (read_from_worker g)
++ )
+ )
+ and read_from_worker g () =
+ Lwt.try_bind
+--
diff --git a/debian/patches/0004-slightly-more-efficient-fix-for-concurrent-access-to.patch b/debian/patches/0004-slightly-more-efficient-fix-for-concurrent-access-to.patch
new file mode 100644
index 0000000..a17a976
--- /dev/null
+++ b/debian/patches/0004-slightly-more-efficient-fix-for-concurrent-access-to.patch
@@ -0,0 +1,89 @@
+From: pveber <philippe.veber at gmail.com>
+Date: Mon, 1 Apr 2013 12:06:03 +0200
+Subject: slightly more efficient fix for concurrent access to [in_stream]
+
+Now each pool has its own mutex.
+---
+ nproc.ml | 34 +++++++++++++++-------------------
+ 1 file changed, 15 insertions(+), 19 deletions(-)
+
+diff --git a/nproc.ml b/nproc.ml
+index d42ccef..62252ce 100644
+--- a/nproc.ml
++++ b/nproc.ml
+@@ -153,26 +153,21 @@ struct
+ try Unix.waitpid [] pid
+ with Unix.Unix_error (Unix.EINTR, _, _) -> waitpid pid
+
+- let mutex = Lwt_mutex.create ()
+-
+ (* --master-- *)
+- let pull_task kill_workers in_stream central_service worker =
++ let pull_task kill_workers in_stream in_stream_mutex central_service worker =
+ (* Note: input and output file descriptors are automatically closed
+ when the end of the lwt channel is reached. *)
+ let ic = Lwt_io.of_fd ~mode:Lwt_io.input worker.worker_in in
+ let oc = Lwt_io.of_fd ~mode:Lwt_io.output worker.worker_out in
+ let rec pull () =
+- Lwt.bind (Lwt_mutex.lock mutex) (fun () ->
+- Lwt.bind (Lwt_stream.get in_stream) (
+- function
+- | None -> Lwt_mutex.unlock mutex ; Lwt.return ()
+- | Some (f, x, g) ->
+- Lwt_mutex.unlock mutex ;
+- let req = Worker_req (f, x) in
+- Lwt.bind
+- (write_value oc req)
+- (read_from_worker g)
+- )
++ Lwt.bind (Lwt_mutex.with_lock in_stream_mutex (fun () -> Lwt_stream.get in_stream)) (
++ function
++ | None -> Lwt.return ()
++ | Some (f, x, g) ->
++ let req = Worker_req (f, x) in
++ Lwt.bind
++ (write_value oc req)
++ (read_from_worker g)
+ )
+ and read_from_worker g () =
+ Lwt.try_bind
+@@ -219,7 +214,7 @@ struct
+ pull ()
+
+ (* --master-- *)
+- let create_gen init (in_stream, push) nproc central_service worker_data =
++ let create_gen init ((in_stream, push), in_stream_mutex) nproc central_service worker_data =
+ let proc_pool = Array.make nproc None in
+ Array.iteri (
+ fun i _ ->
+@@ -286,7 +281,7 @@ struct
+ let jobs =
+ Lwt.join
+ (List.map
+- (pull_task kill_workers in_stream central_service)
++ (pull_task kill_workers in_stream in_stream_mutex central_service)
+ worker_info)
+ in
+
+@@ -315,7 +310,7 @@ struct
+ let default_init worker_info = ()
+
+ let create ?(init = default_init) nproc central_service worker_data =
+- create_gen init (Lwt_stream.create ()) nproc central_service worker_data
++ create_gen init (Lwt_stream.create (), Lwt_mutex.create ()) nproc central_service worker_data
+
+ let close p =
+ p.close ()
+@@ -402,8 +397,9 @@ struct
+ in
+ let p, t =
+ create_gen init
+- (task_stream,
+- (fun _ -> assert false) (* push *))
++ ((task_stream,
++ (fun _ -> assert false) (* push *)),
++ Lwt_mutex.create ())
+ nproc serv env
+ in
+ try
+--
diff --git a/debian/patches/series b/debian/patches/series
index 4c90dad..64629b0 100644
--- a/debian/patches/series
+++ b/debian/patches/series
@@ -1 +1,4 @@
0001-Do-not-install-redundant-files.patch
+0002-Retry-waitpid-if-it-fails-with-EINTR.patch
+0003-possible-fix-for-concurrent-access-to-the-input-stre.patch
+0004-slightly-more-efficient-fix-for-concurrent-access-to.patch
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-ocaml-maint/packages/nproc.git
More information about the Pkg-ocaml-maint-commits
mailing list