[Pkg-ocaml-maint-commits] [parmap] 01/08: New upstream version 1.0~rc8

Stéphane Glondu glondu at moszumanska.debian.org
Sat Jul 22 10:49:23 UTC 2017


This is an automated email from the git hooks/post-receive script.

glondu pushed a commit to branch master
in repository parmap.

commit 218dd85ad9cfba746b9a604a69d13e07b5780642
Author: Stephane Glondu <steph at glondu.net>
Date:   Sat Jul 22 12:25:31 2017 +0200

    New upstream version 1.0~rc8
---
 ChangeLog                 |   9 --
 Changelog                 |  20 ++-
 Makefile.in               |   2 +-
 README.maintainer         |  11 ++
 myocamlbuild.ml           |   4 +-
 opam                      |  28 ++++
 parmap.ml                 | 398 ++++++++++++++++++++++++----------------------
 parmap.mldylib            |   1 +
 parmap.mli                |   9 ++
 parmap.mllib              |   1 +
 setcore.mli => setcore.ml |   0
 11 files changed, 273 insertions(+), 210 deletions(-)

diff --git a/ChangeLog b/ChangeLog
deleted file mode 100644
index 654864e..0000000
--- a/ChangeLog
+++ /dev/null
@@ -1,9 +0,0 @@
-Version 0.9.8 contains the following major new features w.r.t. 0.9.4
- - a chunksize parameter can be used to control the granularity of the
-   parallelism: each worker will handle a series of chunks of this size
-   and ask for them when ready, thus allowing the system to achieve
-   automatic load balancing
- - very specialised versions of the map function are now available for
-   arrays and float arrays, allowing to obtain significant speed-up even
-   on relatively light computations
- - autoconf and ocamlbuild harness should simplify compilation and installation
diff --git a/Changelog b/Changelog
index 1d0aac9..c85ac01 100644
--- a/Changelog
+++ b/Changelog
@@ -1,4 +1,16 @@
-2011/08/30 (RDC): internally convert lists to array to avoid quadratic penalty in execution time on long lists,
-	          thanks to Paul Vernaza <pvernaza at andrew.cmu.edu> for pointing out this issu;
-	          added 'a sequence type to allow using efficiently the library both with lists and arrays.
-                  
+2011/11/30 (RDC)
+        Version 0.9.8 contains the following major new features w.r.t. 0.9.4
+           - a chunksize parameter can be used to control the granularity of the
+             parallelism: each worker will handle a series of chunks of this size
+             and ask for them when ready, thus allowing the system to achieve
+             automatic load balancing
+           - very specialised versions of the map function are now available for
+             arrays and float arrays, allowing to obtain significant speed-up even
+             on relatively light computations
+           - autoconf and ocamlbuild harness should simplify compilation and installation.
+
+
+2011/08/30 (RDC)
+        internally convert lists to array to avoid quadratic penalty in execution time on long lists,
+        thanks to Paul Vernaza <pvernaza at andrew.cmu.edu> for pointing out this issue;
+        added 'a sequence type to allow using efficiently the library both with lists and arrays.
diff --git a/Makefile.in b/Makefile.in
index 6c0ee0b..827a072 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -78,7 +78,7 @@ examples:
 
 
 INSTALL_STUFF = META 
-INSTALL_STUFF += $(wildcard _build/*.cma _build/*.cmxa _build/*.cmxs)
+INSTALL_STUFF += $(wildcard _build/*.cma _build/*.cmx _build/*.cmxa _build/*.cmxs)
 INSTALL_STUFF += $(filter-out $(wildcard _build/myocamlbuild.*),$(wildcard _build/*.mli _build/*.cmi))
 INSTALL_STUFF += $(wildcard _build/*.so _build/*.a)
 
diff --git a/README.maintainer b/README.maintainer
new file mode 100644
index 0000000..9cb1458
--- /dev/null
+++ b/README.maintainer
@@ -0,0 +1,11 @@
+Notice to maintainers and contributors.
+
+Dependencies
+------------
+
+The list of modules that are packed into parmap.cm{,x}a is declared in parmap.mllib
+
+The list of modules that are packed into parmap.cmxs is stated in parmap.mldylib
+
+This information is used by ocamlbuild to create the libraries; please keep
+it up to date even if an alternative build system is used.
diff --git a/myocamlbuild.ml b/myocamlbuild.ml
index 9654ea2..b847cf9 100644
--- a/myocamlbuild.ml
+++ b/myocamlbuild.ml
@@ -7,9 +7,9 @@ let _ = dispatch begin function
       flag ["compile"; "c"] & S[ A"-ccopt"; A"-D_GNU_SOURCE"; A"-ccopt"; A"-fPIC" ];
 
       flag ["link"; "library"; "ocaml"; "byte"; "use_libparmap"] &
-        S[A"-dllib"; A"-lparmap_stubs";];
+        S[A"-dllib"; A"-lparmap_stubs"; ];
       flag ["link"; "library"; "ocaml"; "native"; "use_libparmap"] &
-          S[A"-cclib"; A"-lparmap_stubs"];
+          S[A"-cclib"; A"-lparmap_stubs"; ];
       dep ["link"; "ocaml"; "use_libparmap"] ["libparmap_stubs.a"];
       flag ["link"; "ocaml"; "link_libparmap"] (A"libparmap_stubs.a");
 
diff --git a/opam b/opam
new file mode 100644
index 0000000..69ddbc9
--- /dev/null
+++ b/opam
@@ -0,0 +1,28 @@
+opam-version: "1.2"
+maintainer: "Roberto Di Cosmo <roberto at dicosmo.org>"
+authors: "Roberto Di Cosmo <roberto at dicosmo.org>"
+homepage: "https://github.com/rdicosmo/parmap"
+dev-repo: "https://github.com/rdicosmo/parmap.git"
+bug-reports: "https://github.com/rdicosmo/parmap/issues"
+build: [
+  ["aclocal" "-I" "m4"]
+  ["autoconf"]
+  ["autoheader"]
+  ["./configure"] 
+  [make "DESTDIR=%{prefix}%" "OCAMLLIBDIR=lib" ]
+]
+install: [
+  [make "install" "DESTDIR=%{prefix}%" "OCAMLLIBDIR=lib"]
+]
+remove: [
+  ["aclocal" "-I" "m4"]
+  ["autoconf"]
+  ["autoheader"]
+  ["./configure"] 
+  [make "uninstall" "DESTDIR=%{prefix}%" "OCAMLLIBDIR=lib"]
+]
+depends: [
+  "ocamlfind"
+  "ocamlbuild" {build}
+  "conf-autoconf"
+]
diff --git a/parmap.ml b/parmap.ml
index 74f3669..a8da654 100644
--- a/parmap.ml
+++ b/parmap.ml
@@ -30,6 +30,19 @@ let default_ncores=ref (max 2 (Setcore.numcores()-1));;
 let set_default_ncores n = default_ncores := n;;
 let get_default_ncores () = !default_ncores;;
 
+let ncores = ref 0;;
+
+let set_ncores n = ncores := n;;
+let get_ncores () = !ncores
+
+(* worker process rank *)
+
+let masters_rank = -1
+let rank = ref masters_rank
+
+let set_rank n = rank := n
+let get_rank () = !rank
+
 (* exception handling code *)
 
 let handle_exc core msg =
@@ -42,10 +55,14 @@ let can_redirect path =
     try
       Unix.mkdir path 0o777; true
     with Unix.Unix_error(e,_s,_s') ->
-      (Printf.eprintf "[Pid %d]: Error creating %s : %s; proceeding without \
-                       stdout/stderr redirection\n%!"
-	 (Unix.getpid ()) path (Unix.error_message e));
-      false
+      (* another job may have created it between the check and the mkdir *)
+      if e == Unix.EEXIST then true
+      else begin
+	      (Printf.eprintf "[Pid %d]: Error creating %s : %s; proceeding \
+			       without stdout/stderr redirection\n%!"
+		 (Unix.getpid ()) path (Unix.error_message e));
+	      false
+	end
   else true
 
 let log_debug fmt =
@@ -113,100 +130,113 @@ let marshal fd v =
   let s = Marshal.to_string v [Marshal.Closures] in
   ignore(Bytearray.mmap_of_string fd s)
 
+(* Exit the program without calling [at_exit] handlers *)
+external sys_exit : int -> 'a = "caml_sys_exit"
+
+let spawn_many n ~in_subprocess =
+  let rec loop i acc =
+    if i = n then
+      acc
+    else
+      match Unix.fork() with
+        0 ->
+        (* [at_exit] handlers are called in reverse order of registration.
+           By registering a handler that exits prematurely, we prevent the
+           execution of handlers registered before the fork.
+
+           This ignores the exit code provided by the user, but we ignore
+           it anyway in [wait_for_pids].
+        *)
+        at_exit (fun () -> sys_exit 0);
+        set_rank i;
+        in_subprocess i;
+        exit 0
+      | -1 ->
+        Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i;
+        loop (i + 1) acc
+      | pid ->
+        loop (i + 1) (pid :: acc)
+  in
+  (* call the GC before forking *)
+  Gc.compact ();
+  loop 0 []
+
+let wait_for_pids pids =
+  let rec wait_for_pid pid =
+    try ignore(Unix.waitpid [] pid : int * Unix.process_status)
+    with
+    | Unix.Unix_error (Unix.ECHILD, _, _) -> ()
+    | Unix.Unix_error (Unix.EINTR, _, _) -> wait_for_pid pid
+  in
+  List.iter wait_for_pid pids
+
+let run_many n ~in_subprocess =
+  wait_for_pids (spawn_many n ~in_subprocess)
+
 (* a simple mapper function that computes 1/nth of the data on each of the n
    cores in one iteration *)
-let simplemapper (init:int -> unit) (finalize: unit -> unit) ncores compute opid al collect =
+let simplemapper (init:int -> unit) (finalize: unit -> unit) ncores' compute opid al collect =
   (* flush everything *)
   flush_all();
   (* init task parameters *)
   let ln = Array.length al in
-  let ncores = min ln (max 1 ncores) in
-  let chunksize = max 1 (ln/ncores) in
+  set_ncores (min ln (max 1 ncores'));
+  let chunksize = max 1 (ln / !ncores) in
   log_debug
     "simplemapper on %d elements, on %d cores, chunksize = %d%!"
-    ln ncores chunksize;
+    ln !ncores chunksize;
   (* create descriptors to mmap *)
-  let fdarr=Array.init ncores (fun _ -> Utils.tempfd()) in
-  (* call the GC before forking *)
-  Gc.compact ();
-  (* spawn children *)
-  for i = 0 to ncores-1 do
-    match Unix.fork() with
-      0 ->
-	begin
-	  init i;  (* call initialization function *)
-	  Pervasives.at_exit finalize; (* register finalization function *)
-          let lo=i*chunksize in
-          let hi=if i=ncores-1 then ln-1 else (i+1)*chunksize-1 in
-          let exc_handler e j = (* handle an exception at index j *)
-	    Utils.log_error
-              "error at index j=%d in (%d,%d), chunksize=%d of a total of \
-               %d got exception %s on core %d \n%!"
-	      j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i;
-	    exit 1
-          in
-	  let v = compute al lo hi opid exc_handler in
-          marshal fdarr.(i) v;
-          exit 0
-	end
-    | -1  -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i;
-    | _pid -> ()
-  done;
-  (* wait for all children *)
-  for _i = 0 to ncores-1 do
-    try ignore(Unix.wait())
-    with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
-  done;
+  let fdarr=Array.init !ncores (fun _ -> Utils.tempfd()) in
+  (* run children *)
+  run_many !ncores ~in_subprocess:(fun i ->
+    init i;  (* call initialization function *)
+    Pervasives.at_exit finalize; (* register finalization function *)
+    let lo=i*chunksize in
+    let hi=if i = !ncores - 1 then ln - 1 else (i + 1) * chunksize - 1 in
+    let exc_handler e j = (* handle an exception at index j *)
+      Utils.log_error
+        "error at index j=%d in (%d,%d), chunksize=%d of a total of \
+         %d got exception %s on core %d \n%!"
+        j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i;
+      exit 1
+    in
+    let v = compute al lo hi opid exc_handler in
+    marshal fdarr.(i) v);
   (* read in all data *)
   let res = ref [] in
   (* iterate in reverse order, to accumulate in the right order *)
-  for i = 0 to ncores-1 do
-      res:= ((unmarshal fdarr.((ncores-1)-i)):'d)::!res;
+  for i = 0 to !ncores - 1 do
+      res:= ((unmarshal fdarr.((!ncores-1)-i)):'d)::!res;
   done;
   (* collect all results *)
   collect !res
 
 (* a simple iteration function that iterates on 1/nth of the data on each of
    the n cores *)
-let simpleiter init finalize ncores compute al =
+let simpleiter init finalize ncores' compute al =
   (* flush everything *)
   flush_all();
   (* init task parameters *)
   let ln = Array.length al in
-  let ncores = min ln (max 1 ncores) in
-  let chunksize = max 1 (ln/ncores) in
+  set_ncores (min ln (max 1 ncores'));
+  let chunksize = max 1 (ln / !ncores) in
   log_debug
     "simplemapper on %d elements, on %d cores, chunksize = %d%!"
-    ln ncores chunksize;
-  (* call the GC before forking *)
-  Gc.compact ();
-  (* spawn children *)
-  for i = 0 to ncores-1 do
-    match Unix.fork() with
-      0 ->
-	begin
-          init i;  (* call initialization function *)
-	  Pervasives.at_exit finalize; (* register finalization function *)
-          let lo=i*chunksize in
-          let hi=if i=ncores-1 then ln-1 else (i+1)*chunksize-1 in
-          let exc_handler e j = (* handle an exception at index j *)
-	    Utils.log_error
-              "error at index j=%d in (%d,%d), chunksize=%d of a total of \
-               %d got exception %s on core %d \n%!"
-	      j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i;
-	    exit 1
-          in
-	  compute al lo hi exc_handler;
-          exit 0
-	end
-    | -1  -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i;
-    | _pid -> ()
-  done;
-  (* wait for all children *)
-  for _i = 0 to ncores-1 do
-    try ignore(Unix.wait())
-    with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
-  done
+    ln !ncores chunksize;
+  (* run children *)
+  run_many !ncores ~in_subprocess:(fun i ->
+    init i;  (* call initialization function *)
+    Pervasives.at_exit finalize; (* register finalization function *)
+    let lo=i*chunksize in
+    let hi=if i= !ncores - 1 then ln-1 else (i+1)*chunksize-1 in
+    let exc_handler e j = (* handle an exception at index j *)
+      Utils.log_error
+        "error at index j=%d in (%d,%d), chunksize=%d of a total of \
+         %d got exception %s on core %d \n%!"
+	j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i;
+      exit 1
+    in
+    compute al lo hi exc_handler);
   (* return with no value *)
 
 (* a more sophisticated mapper function, with automatic load balancing *)
@@ -230,24 +260,24 @@ let setup_children_chans oc pipedown ?fdarr i =
   let finish () =
     (log_debug "shutting down (pid=%d)\n%!" pid;
      try close_in ic; close_out oc with _ -> ()
-    ); 
+    );
     exit 0 in
   receive, signal, return, finish, pid
 
 (* parametric mapper primitive that captures the parallel structure *)
-let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute opid al collect =
+let mapper (init:int -> unit) (finalize:unit -> unit) ncores' ~chunksize compute opid al collect =
   let ln = Array.length al in
   if ln=0 then (collect []) else
   begin
-   let ncores = min ln (max 1 ncores) in
-   log_debug "mapper on %d elements, on %d cores%!" ln ncores;
+   set_ncores (min ln (max 1 ncores'));
+   log_debug "mapper on %d elements, on %d cores%!" ln !ncores;
    match chunksize with
      None ->
        (* no need of load balancing *)
-       simplemapper init finalize ncores compute opid al collect
-   | Some v when ncores >= ln/v ->
+       simplemapper init finalize !ncores compute opid al collect
+   | Some v when !ncores >= ln/v ->
        (* no need of load balancing if more cores than tasks *)
-       simplemapper init finalize ncores compute opid al collect
+       simplemapper init finalize !ncores compute opid al collect
    | Some v ->
        (* init task parameters : ntasks > 0 here,
           as otherwise ncores >= 1 >= ln/v = ntasks and we would take
@@ -256,57 +286,50 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute
        (* flush everything *)
        flush_all ();
        (* create descriptors to mmap *)
-       let fdarr=Array.init ncores (fun _ -> Utils.tempfd()) in
+       let fdarr=Array.init !ncores (fun _ -> Utils.tempfd()) in
        (* setup communication channel with the workers *)
-       let pipedown=Array.init ncores (fun _ -> Unix.pipe ()) in
+       let pipedown=Array.init !ncores (fun _ -> Unix.pipe ()) in
        let pipeup_rd,pipeup_wr=Unix.pipe () in
        let oc_up = Unix.out_channel_of_descr pipeup_wr in
-       (* call the GC before forking *)
-       Gc.compact ();
-       (* spawn children *)
-       for i = 0 to ncores-1 do
-         match Unix.fork() with
-           0 ->
-             begin
-	       init i; (* call initialization function *)
-	       Pervasives.at_exit finalize; (* register finalization function *)
-               let d=Unix.gettimeofday()  in
-               (* primitives for communication *)
-               Unix.close pipeup_rd;
-               let receive,signal,return,finish,pid =
-                 setup_children_chans oc_up pipedown ~fdarr i in
-               let reschunk=ref opid in
-               let computetask n = (* compute chunk number n *)
-         	let lo=n*chunksize in
-         	let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in
-         	let exc_handler e j = (* handle an exception at index j *)
-         	  begin
-         	    let errmsg = Printexc.to_string e in
-                    Utils.log_error
-                      "error at index j=%d in (%d,%d), chunksize=%d of a \
-                       total of %d got exception %s on core %d \n%!"
-         	      j lo hi chunksize (hi-lo+1) errmsg i;
-         	    signal (Error (i,errmsg));
-                    finish()
-         	  end
-         	in
-         	reschunk:= compute al lo hi !reschunk exc_handler;
-         	log_debug
-                  "worker on core %d (pid=%d), segment (%d,%d) of data of \
-                   length %d, chunksize=%d finished in %f seconds"
-         	  i pid lo hi ln chunksize (Unix.gettimeofday() -. d)
-               in
-               while true do
-         	(* ask for work until we are finished *)
-         	signal (Ready i);
-         	match receive() with
-         	| Finished -> return (!reschunk:'d); finish ()
-         	| Task n -> computetask n
-               done;
-             end
-         | -1  -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i;
-         | _pid -> ()
-       done;
+       (* run children *)
+       let pids =
+         spawn_many !ncores ~in_subprocess:(fun i ->
+	   init i; (* call initialization function *)
+	   Pervasives.at_exit finalize; (* register finalization function *)
+           let d=Unix.gettimeofday()  in
+           (* primitives for communication *)
+           Unix.close pipeup_rd;
+           let receive,signal,return,finish,pid =
+             setup_children_chans oc_up pipedown ~fdarr i in
+           let reschunk=ref opid in
+           let computetask n = (* compute chunk number n *)
+             let lo=n*chunksize in
+             let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in
+             let exc_handler e j = (* handle an exception at index j *)
+               begin
+                 let errmsg = Printexc.to_string e in
+                 Utils.log_error
+                   "error at index j=%d in (%d,%d), chunksize=%d of a \
+                    total of %d got exception %s on core %d \n%!"
+         	   j lo hi chunksize (hi-lo+1) errmsg i;
+                 signal (Error (i,errmsg): msg_to_master);
+                 finish()
+               end
+             in
+             reschunk:= compute al lo hi !reschunk exc_handler;
+             log_debug
+               "worker on core %d (pid=%d), segment (%d,%d) of data of \
+                length %d, chunksize=%d finished in %f seconds"
+               i pid lo hi ln chunksize (Unix.gettimeofday() -. d)
+           in
+           while true do
+             (* ask for work until we are finished *)
+             signal (Ready i);
+             match receive() with
+             | Finished -> return (!reschunk:'d); finish ()
+             | Task n -> computetask n
+           done)
+       in
 
        (* close unused ends of the pipes *)
        Array.iter (fun (rfd,_) -> Unix.close rfd) pipedown;
@@ -314,7 +337,7 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute
 
        (* get ic/oc/wfdl *)
        let ocs=
-         Array.init ncores
+         Array.init !ncores
            (fun n -> Unix.out_channel_of_descr (snd pipedown.(n))) in
        let ic=Unix.in_channel_of_descr pipeup_rd in
 
@@ -325,7 +348,7 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute
              (log_debug "sending task %d to worker %d" i w;
               let oc = ocs.(w) in
               (Marshal.to_channel oc (Task i) []); flush oc)
-         | Error (core,msg) -> handle_exc core msg
+         | (Error (core,msg): msg_to_master) -> handle_exc core msg
        done;
 
        (* send termination token to all children *)
@@ -336,94 +359,84 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute
        ) ocs;
 
        (* wait for all children to terminate *)
-       for _i = 0 to ncores-1 do
-         try ignore(Unix.wait())
-         with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
-       done;
+       wait_for_pids pids;
 
        (* read in all data *)
        let res = ref [] in
        (* iterate in reverse order, to accumulate in the right order *)
-       for i = 0 to ncores-1 do
-         res:= ((unmarshal fdarr.((ncores-1)-i)):'d)::!res;
+       for i = 0 to !ncores-1 do
+         res:= ((unmarshal fdarr.((!ncores-1)-i)):'d)::!res;
        done;
        (* collect all results *)
        collect !res
   end
 
 (* parametric iteration primitive that captures the parallel structure *)
-let geniter init finalize ncores ~chunksize compute al =
+let geniter init finalize ncores' ~chunksize compute al =
   let ln = Array.length al in
   if ln=0 then () else
   begin
-   let ncores = min ln (max 1 ncores) in
-   log_debug "geniter on %d elements, on %d cores%!" ln ncores;
+   set_ncores (min ln (max 1 ncores'));
+   log_debug "geniter on %d elements, on %d cores%!" ln !ncores;
    match chunksize with
      None ->
-       simpleiter init finalize ncores compute al (* no need of load balancing *)
-   | Some v when ncores >= ln/v ->
-       simpleiter init finalize ncores compute al (* no need of load balancing *)
+       simpleiter init finalize !ncores compute al (* no need of load balancing *)
+   | Some v when !ncores >= ln/v ->
+       simpleiter init finalize !ncores compute al (* no need of load balancing *)
    | Some v ->
        (* init task parameters *)
        let chunksize = v and ntasks = ln/v in
        (* flush everything *)
        flush_all ();
        (* setup communication channel with the workers *)
-       let pipedown=Array.init ncores (fun _ -> Unix.pipe ()) in
+       let pipedown=Array.init !ncores (fun _ -> Unix.pipe ()) in
        let pipeup_rd,pipeup_wr=Unix.pipe () in
        let oc_up = Unix.out_channel_of_descr pipeup_wr in
-       (* call the GC before forking *)
-       Gc.compact ();
        (* spawn children *)
-       for i = 0 to ncores-1 do
- 	match Unix.fork() with
- 	  0 ->
- 	    begin
-	       init i; (* call initialization function *)
-	       Pervasives.at_exit finalize; (* register finalization function *)
-               let d=Unix.gettimeofday()  in
-               (* primitives for communication *)
-               Unix.close pipeup_rd;
-               let receive,signal,return,finish,pid =
-                 setup_children_chans oc_up pipedown i in
-               let computetask n = (* compute chunk number n *)
- 		let lo=n*chunksize in
- 		let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in
- 		let exc_handler e j = (* handle an exception at index j *)
- 		  begin
- 		    let errmsg = Printexc.to_string e in
-                    Utils.log_error
-                      "error at index j=%d in (%d,%d), chunksize=%d of \
-                       a total of %d got exception %s on core %d \n%!"
- 		      j lo hi chunksize (hi-lo+1) errmsg i;
- 		    signal (Error (i,errmsg));
-                    finish()
- 		  end
- 		in
- 		compute al lo hi exc_handler;
- 		log_debug
-                  "worker on core %d (pid=%d), segment (%d,%d) of data \
-                   of length %d, chunksize=%d finished in %f seconds"
- 		  i pid lo hi ln chunksize (Unix.gettimeofday() -. d)
- 	      in
- 	      while true do
- 		(* ask for work until we are finished *)
- 		signal (Ready i);
- 		match receive() with
- 		| Finished -> return(); finish ()
- 		| Task n -> computetask n
- 	      done;
- 	    end
- 	| -1  -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i;
- 	| _pid -> ()
-       done;
+       let pids =
+         spawn_many !ncores ~in_subprocess:(fun i ->
+	   init i; (* call initialization function *)
+	   Pervasives.at_exit finalize; (* register finalization function *)
+           let d=Unix.gettimeofday()  in
+           (* primitives for communication *)
+           Unix.close pipeup_rd;
+           let receive,signal,return,finish,pid =
+             setup_children_chans oc_up pipedown i in
+           let computetask n = (* compute chunk number n *)
+ 	     let lo=n*chunksize in
+ 	     let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in
+ 	     let exc_handler e j = (* handle an exception at index j *)
+ 	       begin
+ 		 let errmsg = Printexc.to_string e in
+                 Utils.log_error
+                   "error at index j=%d in (%d,%d), chunksize=%d of \
+                    a total of %d got exception %s on core %d \n%!"
+ 		   j lo hi chunksize (hi-lo+1) errmsg i;
+ 		 signal (Error (i,errmsg): msg_to_master);
+                 finish()
+ 	       end
+ 	     in
+ 	     compute al lo hi exc_handler;
+ 	     log_debug
+               "worker on core %d (pid=%d), segment (%d,%d) of data \
+                of length %d, chunksize=%d finished in %f seconds"
+ 	       i pid lo hi ln chunksize (Unix.gettimeofday() -. d)
+ 	   in
+ 	   while true do
+ 	     (* ask for work until we are finished *)
+ 	     signal (Ready i);
+ 	     match receive() with
+ 	     | Finished -> return(); finish ()
+ 	     | Task n -> computetask n
+           done)
+       in
 
        (* close unused ends of the pipes *)
        Array.iter (fun (rfd,_) -> Unix.close rfd) pipedown;
        Unix.close pipeup_wr;
 
        (* get ic/oc/wfdl *)
-       let ocs=Array.init ncores
+       let ocs=Array.init !ncores
          (fun n -> Unix.out_channel_of_descr (snd pipedown.(n))) in
        let ic=Unix.in_channel_of_descr pipeup_rd in
 
@@ -434,7 +447,7 @@ let geniter init finalize ncores ~chunksize compute al =
  	    (log_debug "sending task %d to worker %d" i w;
  	     let oc = ocs.(w) in
  	     (Marshal.to_channel oc (Task i) []); flush oc)
- 	| Error (core,msg) -> handle_exc core msg
+ 	| (Error (core,msg): msg_to_master) -> handle_exc core msg
        done;
 
        (* send termination token to all children *)
@@ -445,10 +458,7 @@ let geniter init finalize ncores ~chunksize compute al =
        ) ocs;
 
        (* wait for all children to terminate *)
-       for _i = 0 to ncores-1 do
- 	try ignore(Unix.wait())
- 	with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
-       done
+       wait_for_pids pids;
        (* no data to return *)
   end
 
diff --git a/parmap.mldylib b/parmap.mldylib
index df08796..eb65e2d 100644
--- a/parmap.mldylib
+++ b/parmap.mldylib
@@ -1,3 +1,4 @@
 Parmap
 Bytearray
 Parmap_utils
+Setcore
diff --git a/parmap.mli b/parmap.mli
index 2a08377..52dba04 100644
--- a/parmap.mli
+++ b/parmap.mli
@@ -30,6 +30,15 @@ val set_default_ncores : int -> unit
 
 val get_default_ncores : unit -> int
 
+(** {6 Getting ncores being used during parallel execution } *)
+
+val get_ncores : unit -> int
+
+(** {6 Getting the current worker rank. The master process has rank -1. Other processes
+    have the rank at which they were forked out (a worker's rank is in [0..ncores-1]) } *)
+
+val get_rank : unit -> int
+
 (** {6 Sequence type, subsuming lists and arrays} *)
 
 type 'a sequence = L of 'a list | A of 'a array;;
diff --git a/parmap.mllib b/parmap.mllib
index df08796..eb65e2d 100644
--- a/parmap.mllib
+++ b/parmap.mllib
@@ -1,3 +1,4 @@
 Parmap
 Bytearray
 Parmap_utils
+Setcore
diff --git a/setcore.mli b/setcore.ml
similarity index 100%
rename from setcore.mli
rename to setcore.ml

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-ocaml-maint/packages/parmap.git



More information about the Pkg-ocaml-maint-commits mailing list