[Pkg-ocaml-maint-commits] [parmap] 01/08: New upstream version 1.0~rc8
Stéphane Glondu
glondu at moszumanska.debian.org
Sat Jul 22 10:49:23 UTC 2017
This is an automated email from the git hooks/post-receive script.
glondu pushed a commit to branch master
in repository parmap.
commit 218dd85ad9cfba746b9a604a69d13e07b5780642
Author: Stephane Glondu <steph at glondu.net>
Date: Sat Jul 22 12:25:31 2017 +0200
New upstream version 1.0~rc8
---
ChangeLog | 9 --
Changelog | 20 ++-
Makefile.in | 2 +-
README.maintainer | 11 ++
myocamlbuild.ml | 4 +-
opam | 28 ++++
parmap.ml | 398 ++++++++++++++++++++++++----------------------
parmap.mldylib | 1 +
parmap.mli | 9 ++
parmap.mllib | 1 +
setcore.mli => setcore.ml | 0
11 files changed, 273 insertions(+), 210 deletions(-)
diff --git a/ChangeLog b/ChangeLog
deleted file mode 100644
index 654864e..0000000
--- a/ChangeLog
+++ /dev/null
@@ -1,9 +0,0 @@
-Version 0.9.8 contains the following major new features w.r.t. 0.9.4
- - a chunksize parameter can be used to control the granularity of the
- parallelism: each worker will handle a series of chunks of this size
- and ask for them when ready, thus allowing the system to achieve
- automatic load balancing
- - very specialised versions of the map function are now available for
- arrays and float arrays, allowing to obtain significant speed-up even
- on relatively light computations
- - autoconf and ocamlbuild harness should simplify compilation and installation
diff --git a/Changelog b/Changelog
index 1d0aac9..c85ac01 100644
--- a/Changelog
+++ b/Changelog
@@ -1,4 +1,16 @@
-2011/08/30 (RDC): internally convert lists to array to avoid quadratic penalty in execution time on long lists,
- thanks to Paul Vernaza <pvernaza at andrew.cmu.edu> for pointing out this issu;
- added 'a sequence type to allow using efficiently the library both with lists and arrays.
-
+2011/11/30 (RDC)
+ Version 0.9.8 contains the following major new features w.r.t. 0.9.4
+ - a chunksize parameter can be used to control the granularity of the
+ parallelism: each worker will handle a series of chunks of this size
+ and ask for them when ready, thus allowing the system to achieve
+ automatic load balancing
+ - very specialised versions of the map function are now available for
+ arrays and float arrays, allowing to obtain significant speed-up even
+ on relatively light computations
+ - an autoconf and ocamlbuild harness should simplify compilation and installation.
+
+
+2011/08/30 (RDC)
+ internally convert lists to arrays to avoid a quadratic execution-time penalty on long lists,
+ thanks to Paul Vernaza <pvernaza at andrew.cmu.edu> for pointing out this issue;
+ added the 'a sequence type so that the library can be used efficiently with both lists and arrays.
diff --git a/Makefile.in b/Makefile.in
index 6c0ee0b..827a072 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -78,7 +78,7 @@ examples:
INSTALL_STUFF = META
-INSTALL_STUFF += $(wildcard _build/*.cma _build/*.cmxa _build/*.cmxs)
+INSTALL_STUFF += $(wildcard _build/*.cma _build/*.cmx _build/*.cmxa _build/*.cmxs)
INSTALL_STUFF += $(filter-out $(wildcard _build/myocamlbuild.*),$(wildcard _build/*.mli _build/*.cmi))
INSTALL_STUFF += $(wildcard _build/*.so _build/*.a)
diff --git a/README.maintainer b/README.maintainer
new file mode 100644
index 0000000..9cb1458
--- /dev/null
+++ b/README.maintainer
@@ -0,0 +1,11 @@
+Notice to maintainers and contributors.
+
+Dependencies
+------------
+
+The list of modules that are packed into parmap.cm{,x}a is declared in parmap.mllib.
+
+The list of modules that are packed into parmap.cmxs is declared in parmap.mldylib.
+
+This information is used by ocamlbuild to create the libraries; please keep
+it up to date even if an alternative build system is used.
diff --git a/myocamlbuild.ml b/myocamlbuild.ml
index 9654ea2..b847cf9 100644
--- a/myocamlbuild.ml
+++ b/myocamlbuild.ml
@@ -7,9 +7,9 @@ let _ = dispatch begin function
flag ["compile"; "c"] & S[ A"-ccopt"; A"-D_GNU_SOURCE"; A"-ccopt"; A"-fPIC" ];
flag ["link"; "library"; "ocaml"; "byte"; "use_libparmap"] &
- S[A"-dllib"; A"-lparmap_stubs";];
+ S[A"-dllib"; A"-lparmap_stubs"; ];
flag ["link"; "library"; "ocaml"; "native"; "use_libparmap"] &
- S[A"-cclib"; A"-lparmap_stubs"];
+ S[A"-cclib"; A"-lparmap_stubs"; ];
dep ["link"; "ocaml"; "use_libparmap"] ["libparmap_stubs.a"];
flag ["link"; "ocaml"; "link_libparmap"] (A"libparmap_stubs.a");
diff --git a/opam b/opam
new file mode 100644
index 0000000..69ddbc9
--- /dev/null
+++ b/opam
@@ -0,0 +1,28 @@
+opam-version: "1.2"
+maintainer: "Roberto Di Cosmo <roberto at dicosmo.org>"
+authors: "Roberto Di Cosmo <roberto at dicosmo.org>"
+homepage: "https://github.com/rdicosmo/parmap"
+dev-repo: "https://github.com/rdicosmo/parmap.git"
+bug-reports: "https://github.com/rdicosmo/parmap/issues"
+build: [
+ ["aclocal" "-I" "m4"]
+ ["autoconf"]
+ ["autoheader"]
+ ["./configure"]
+ [make "DESTDIR=%{prefix}%" "OCAMLLIBDIR=lib" ]
+]
+install: [
+ [make "install" "DESTDIR=%{prefix}%" "OCAMLLIBDIR=lib"]
+]
+remove: [
+ ["aclocal" "-I" "m4"]
+ ["autoconf"]
+ ["autoheader"]
+ ["./configure"]
+ [make "uninstall" "DESTDIR=%{prefix}%" "OCAMLLIBDIR=lib"]
+]
+depends: [
+ "ocamlfind"
+ "ocamlbuild" {build}
+ "conf-autoconf"
+]
diff --git a/parmap.ml b/parmap.ml
index 74f3669..a8da654 100644
--- a/parmap.ml
+++ b/parmap.ml
@@ -30,6 +30,19 @@ let default_ncores=ref (max 2 (Setcore.numcores()-1));;
let set_default_ncores n = default_ncores := n;;
let get_default_ncores () = !default_ncores;;
+let ncores = ref 0;;
+
+let set_ncores n = ncores := n;;
+let get_ncores () = !ncores
+
+(* worker process rank *)
+
+let masters_rank = -1
+let rank = ref masters_rank
+
+let set_rank n = rank := n
+let get_rank () = !rank
+
(* exception handling code *)
let handle_exc core msg =
@@ -42,10 +55,14 @@ let can_redirect path =
try
Unix.mkdir path 0o777; true
with Unix.Unix_error(e,_s,_s') ->
- (Printf.eprintf "[Pid %d]: Error creating %s : %s; proceeding without \
- stdout/stderr redirection\n%!"
- (Unix.getpid ()) path (Unix.error_message e));
- false
+ (* another job may have created it between the check and the mkdir *)
+ if e == Unix.EEXIST then true
+ else begin
+ (Printf.eprintf "[Pid %d]: Error creating %s : %s; proceeding \
+ without stdout/stderr redirection\n%!"
+ (Unix.getpid ()) path (Unix.error_message e));
+ false
+ end
else true
let log_debug fmt =
@@ -113,100 +130,113 @@ let marshal fd v =
let s = Marshal.to_string v [Marshal.Closures] in
ignore(Bytearray.mmap_of_string fd s)
+(* Exit the program with calling [at_exit] handlers *)
+external sys_exit : int -> 'a = "caml_sys_exit"
+
+let spawn_many n ~in_subprocess =
+ let rec loop i acc =
+ if i = n then
+ acc
+ else
+ match Unix.fork() with
+ 0 ->
+ (* [at_exit] handlers are called in reverse order of registration.
+ By registering a handler that exits prematurely, we prevent the
+ execution of handlers registered before the fork.
+
+ This ignores the exit code provided by the user, but we ignore
+ it anyway in [wait_for_pids].
+ *)
+ at_exit (fun () -> sys_exit 0);
+ set_rank i;
+ in_subprocess i;
+ exit 0
+ | -1 ->
+ Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i;
+ loop (i + 1) acc
+ | pid ->
+ loop (i + 1) (pid :: acc)
+ in
+ (* call the GC before forking *)
+ Gc.compact ();
+ loop 0 []
+
+let wait_for_pids pids =
+ let rec wait_for_pid pid =
+ try ignore(Unix.waitpid [] pid : int * Unix.process_status)
+ with
+ | Unix.Unix_error (Unix.ECHILD, _, _) -> ()
+ | Unix.Unix_error (Unix.EINTR, _, _) -> wait_for_pid pid
+ in
+ List.iter wait_for_pid pids
+
+let run_many n ~in_subprocess =
+ wait_for_pids (spawn_many n ~in_subprocess)
+
(* a simple mapper function that computes 1/nth of the data on each of the n
cores in one iteration *)
-let simplemapper (init:int -> unit) (finalize: unit -> unit) ncores compute opid al collect =
+let simplemapper (init:int -> unit) (finalize: unit -> unit) ncores' compute opid al collect =
(* flush everything *)
flush_all();
(* init task parameters *)
let ln = Array.length al in
- let ncores = min ln (max 1 ncores) in
- let chunksize = max 1 (ln/ncores) in
+ set_ncores (min ln (max 1 ncores'));
+ let chunksize = max 1 (ln / !ncores) in
log_debug
"simplemapper on %d elements, on %d cores, chunksize = %d%!"
- ln ncores chunksize;
+ ln !ncores chunksize;
(* create descriptors to mmap *)
- let fdarr=Array.init ncores (fun _ -> Utils.tempfd()) in
- (* call the GC before forking *)
- Gc.compact ();
- (* spawn children *)
- for i = 0 to ncores-1 do
- match Unix.fork() with
- 0 ->
- begin
- init i; (* call initialization function *)
- Pervasives.at_exit finalize; (* register finalization function *)
- let lo=i*chunksize in
- let hi=if i=ncores-1 then ln-1 else (i+1)*chunksize-1 in
- let exc_handler e j = (* handle an exception at index j *)
- Utils.log_error
- "error at index j=%d in (%d,%d), chunksize=%d of a total of \
- %d got exception %s on core %d \n%!"
- j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i;
- exit 1
- in
- let v = compute al lo hi opid exc_handler in
- marshal fdarr.(i) v;
- exit 0
- end
- | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i;
- | _pid -> ()
- done;
- (* wait for all children *)
- for _i = 0 to ncores-1 do
- try ignore(Unix.wait())
- with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
- done;
+ let fdarr=Array.init !ncores (fun _ -> Utils.tempfd()) in
+ (* run children *)
+ run_many !ncores ~in_subprocess:(fun i ->
+ init i; (* call initialization function *)
+ Pervasives.at_exit finalize; (* register finalization function *)
+ let lo=i*chunksize in
+ let hi=if i = !ncores - 1 then ln - 1 else (i + 1) * chunksize - 1 in
+ let exc_handler e j = (* handle an exception at index j *)
+ Utils.log_error
+ "error at index j=%d in (%d,%d), chunksize=%d of a total of \
+ %d got exception %s on core %d \n%!"
+ j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i;
+ exit 1
+ in
+ let v = compute al lo hi opid exc_handler in
+ marshal fdarr.(i) v);
(* read in all data *)
let res = ref [] in
(* iterate in reverse order, to accumulate in the right order *)
- for i = 0 to ncores-1 do
- res:= ((unmarshal fdarr.((ncores-1)-i)):'d)::!res;
+ for i = 0 to !ncores - 1 do
+ res:= ((unmarshal fdarr.((!ncores-1)-i)):'d)::!res;
done;
(* collect all results *)
collect !res
(* a simple iteration function that iterates on 1/nth of the data on each of
the n cores *)
-let simpleiter init finalize ncores compute al =
+let simpleiter init finalize ncores' compute al =
(* flush everything *)
flush_all();
(* init task parameters *)
let ln = Array.length al in
- let ncores = min ln (max 1 ncores) in
- let chunksize = max 1 (ln/ncores) in
+ set_ncores (min ln (max 1 ncores'));
+ let chunksize = max 1 (ln / !ncores) in
log_debug
"simplemapper on %d elements, on %d cores, chunksize = %d%!"
- ln ncores chunksize;
- (* call the GC before forking *)
- Gc.compact ();
- (* spawn children *)
- for i = 0 to ncores-1 do
- match Unix.fork() with
- 0 ->
- begin
- init i; (* call initialization function *)
- Pervasives.at_exit finalize; (* register finalization function *)
- let lo=i*chunksize in
- let hi=if i=ncores-1 then ln-1 else (i+1)*chunksize-1 in
- let exc_handler e j = (* handle an exception at index j *)
- Utils.log_error
- "error at index j=%d in (%d,%d), chunksize=%d of a total of \
- %d got exception %s on core %d \n%!"
- j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i;
- exit 1
- in
- compute al lo hi exc_handler;
- exit 0
- end
- | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i;
- | _pid -> ()
- done;
- (* wait for all children *)
- for _i = 0 to ncores-1 do
- try ignore(Unix.wait())
- with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
- done
+ ln !ncores chunksize;
+ (* run children *)
+ run_many !ncores ~in_subprocess:(fun i ->
+ init i; (* call initialization function *)
+ Pervasives.at_exit finalize; (* register finalization function *)
+ let lo=i*chunksize in
+ let hi=if i= !ncores - 1 then ln-1 else (i+1)*chunksize-1 in
+ let exc_handler e j = (* handle an exception at index j *)
+ Utils.log_error
+ "error at index j=%d in (%d,%d), chunksize=%d of a total of \
+ %d got exception %s on core %d \n%!"
+ j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i;
+ exit 1
+ in
+ compute al lo hi exc_handler);
(* return with no value *)
(* a more sophisticated mapper function, with automatic load balancing *)
@@ -230,24 +260,24 @@ let setup_children_chans oc pipedown ?fdarr i =
let finish () =
(log_debug "shutting down (pid=%d)\n%!" pid;
try close_in ic; close_out oc with _ -> ()
- );
+ );
exit 0 in
receive, signal, return, finish, pid
(* parametric mapper primitive that captures the parallel structure *)
-let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute opid al collect =
+let mapper (init:int -> unit) (finalize:unit -> unit) ncores' ~chunksize compute opid al collect =
let ln = Array.length al in
if ln=0 then (collect []) else
begin
- let ncores = min ln (max 1 ncores) in
- log_debug "mapper on %d elements, on %d cores%!" ln ncores;
+ set_ncores (min ln (max 1 ncores'));
+ log_debug "mapper on %d elements, on %d cores%!" ln !ncores;
match chunksize with
None ->
(* no need of load balancing *)
- simplemapper init finalize ncores compute opid al collect
- | Some v when ncores >= ln/v ->
+ simplemapper init finalize !ncores compute opid al collect
+ | Some v when !ncores >= ln/v ->
(* no need of load balancing if more cores than tasks *)
- simplemapper init finalize ncores compute opid al collect
+ simplemapper init finalize !ncores compute opid al collect
| Some v ->
(* init task parameters : ntasks > 0 here,
as otherwise ncores >= 1 >= ln/v = ntasks and we would take
@@ -256,57 +286,50 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute
(* flush everything *)
flush_all ();
(* create descriptors to mmap *)
- let fdarr=Array.init ncores (fun _ -> Utils.tempfd()) in
+ let fdarr=Array.init !ncores (fun _ -> Utils.tempfd()) in
(* setup communication channel with the workers *)
- let pipedown=Array.init ncores (fun _ -> Unix.pipe ()) in
+ let pipedown=Array.init !ncores (fun _ -> Unix.pipe ()) in
let pipeup_rd,pipeup_wr=Unix.pipe () in
let oc_up = Unix.out_channel_of_descr pipeup_wr in
- (* call the GC before forking *)
- Gc.compact ();
- (* spawn children *)
- for i = 0 to ncores-1 do
- match Unix.fork() with
- 0 ->
- begin
- init i; (* call initialization function *)
- Pervasives.at_exit finalize; (* register finalization function *)
- let d=Unix.gettimeofday() in
- (* primitives for communication *)
- Unix.close pipeup_rd;
- let receive,signal,return,finish,pid =
- setup_children_chans oc_up pipedown ~fdarr i in
- let reschunk=ref opid in
- let computetask n = (* compute chunk number n *)
- let lo=n*chunksize in
- let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in
- let exc_handler e j = (* handle an exception at index j *)
- begin
- let errmsg = Printexc.to_string e in
- Utils.log_error
- "error at index j=%d in (%d,%d), chunksize=%d of a \
- total of %d got exception %s on core %d \n%!"
- j lo hi chunksize (hi-lo+1) errmsg i;
- signal (Error (i,errmsg));
- finish()
- end
- in
- reschunk:= compute al lo hi !reschunk exc_handler;
- log_debug
- "worker on core %d (pid=%d), segment (%d,%d) of data of \
- length %d, chunksize=%d finished in %f seconds"
- i pid lo hi ln chunksize (Unix.gettimeofday() -. d)
- in
- while true do
- (* ask for work until we are finished *)
- signal (Ready i);
- match receive() with
- | Finished -> return (!reschunk:'d); finish ()
- | Task n -> computetask n
- done;
- end
- | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i;
- | _pid -> ()
- done;
+ (* run children *)
+ let pids =
+ spawn_many !ncores ~in_subprocess:(fun i ->
+ init i; (* call initialization function *)
+ Pervasives.at_exit finalize; (* register finalization function *)
+ let d=Unix.gettimeofday() in
+ (* primitives for communication *)
+ Unix.close pipeup_rd;
+ let receive,signal,return,finish,pid =
+ setup_children_chans oc_up pipedown ~fdarr i in
+ let reschunk=ref opid in
+ let computetask n = (* compute chunk number n *)
+ let lo=n*chunksize in
+ let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in
+ let exc_handler e j = (* handle an exception at index j *)
+ begin
+ let errmsg = Printexc.to_string e in
+ Utils.log_error
+ "error at index j=%d in (%d,%d), chunksize=%d of a \
+ total of %d got exception %s on core %d \n%!"
+ j lo hi chunksize (hi-lo+1) errmsg i;
+ signal (Error (i,errmsg): msg_to_master);
+ finish()
+ end
+ in
+ reschunk:= compute al lo hi !reschunk exc_handler;
+ log_debug
+ "worker on core %d (pid=%d), segment (%d,%d) of data of \
+ length %d, chunksize=%d finished in %f seconds"
+ i pid lo hi ln chunksize (Unix.gettimeofday() -. d)
+ in
+ while true do
+ (* ask for work until we are finished *)
+ signal (Ready i);
+ match receive() with
+ | Finished -> return (!reschunk:'d); finish ()
+ | Task n -> computetask n
+ done)
+ in
(* close unused ends of the pipes *)
Array.iter (fun (rfd,_) -> Unix.close rfd) pipedown;
@@ -314,7 +337,7 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute
(* get ic/oc/wfdl *)
let ocs=
- Array.init ncores
+ Array.init !ncores
(fun n -> Unix.out_channel_of_descr (snd pipedown.(n))) in
let ic=Unix.in_channel_of_descr pipeup_rd in
@@ -325,7 +348,7 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute
(log_debug "sending task %d to worker %d" i w;
let oc = ocs.(w) in
(Marshal.to_channel oc (Task i) []); flush oc)
- | Error (core,msg) -> handle_exc core msg
+ | (Error (core,msg): msg_to_master) -> handle_exc core msg
done;
(* send termination token to all children *)
@@ -336,94 +359,84 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute
) ocs;
(* wait for all children to terminate *)
- for _i = 0 to ncores-1 do
- try ignore(Unix.wait())
- with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
- done;
+ wait_for_pids pids;
(* read in all data *)
let res = ref [] in
(* iterate in reverse order, to accumulate in the right order *)
- for i = 0 to ncores-1 do
- res:= ((unmarshal fdarr.((ncores-1)-i)):'d)::!res;
+ for i = 0 to !ncores-1 do
+ res:= ((unmarshal fdarr.((!ncores-1)-i)):'d)::!res;
done;
(* collect all results *)
collect !res
end
(* parametric iteration primitive that captures the parallel structure *)
-let geniter init finalize ncores ~chunksize compute al =
+let geniter init finalize ncores' ~chunksize compute al =
let ln = Array.length al in
if ln=0 then () else
begin
- let ncores = min ln (max 1 ncores) in
- log_debug "geniter on %d elements, on %d cores%!" ln ncores;
+ set_ncores (min ln (max 1 ncores'));
+ log_debug "geniter on %d elements, on %d cores%!" ln !ncores;
match chunksize with
None ->
- simpleiter init finalize ncores compute al (* no need of load balancing *)
- | Some v when ncores >= ln/v ->
- simpleiter init finalize ncores compute al (* no need of load balancing *)
+ simpleiter init finalize !ncores compute al (* no need of load balancing *)
+ | Some v when !ncores >= ln/v ->
+ simpleiter init finalize !ncores compute al (* no need of load balancing *)
| Some v ->
(* init task parameters *)
let chunksize = v and ntasks = ln/v in
(* flush everything *)
flush_all ();
(* setup communication channel with the workers *)
- let pipedown=Array.init ncores (fun _ -> Unix.pipe ()) in
+ let pipedown=Array.init !ncores (fun _ -> Unix.pipe ()) in
let pipeup_rd,pipeup_wr=Unix.pipe () in
let oc_up = Unix.out_channel_of_descr pipeup_wr in
- (* call the GC before forking *)
- Gc.compact ();
(* spawn children *)
- for i = 0 to ncores-1 do
- match Unix.fork() with
- 0 ->
- begin
- init i; (* call initialization function *)
- Pervasives.at_exit finalize; (* register finalization function *)
- let d=Unix.gettimeofday() in
- (* primitives for communication *)
- Unix.close pipeup_rd;
- let receive,signal,return,finish,pid =
- setup_children_chans oc_up pipedown i in
- let computetask n = (* compute chunk number n *)
- let lo=n*chunksize in
- let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in
- let exc_handler e j = (* handle an exception at index j *)
- begin
- let errmsg = Printexc.to_string e in
- Utils.log_error
- "error at index j=%d in (%d,%d), chunksize=%d of \
- a total of %d got exception %s on core %d \n%!"
- j lo hi chunksize (hi-lo+1) errmsg i;
- signal (Error (i,errmsg));
- finish()
- end
- in
- compute al lo hi exc_handler;
- log_debug
- "worker on core %d (pid=%d), segment (%d,%d) of data \
- of length %d, chunksize=%d finished in %f seconds"
- i pid lo hi ln chunksize (Unix.gettimeofday() -. d)
- in
- while true do
- (* ask for work until we are finished *)
- signal (Ready i);
- match receive() with
- | Finished -> return(); finish ()
- | Task n -> computetask n
- done;
- end
- | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i;
- | _pid -> ()
- done;
+ let pids =
+ spawn_many !ncores ~in_subprocess:(fun i ->
+ init i; (* call initialization function *)
+ Pervasives.at_exit finalize; (* register finalization function *)
+ let d=Unix.gettimeofday() in
+ (* primitives for communication *)
+ Unix.close pipeup_rd;
+ let receive,signal,return,finish,pid =
+ setup_children_chans oc_up pipedown i in
+ let computetask n = (* compute chunk number n *)
+ let lo=n*chunksize in
+ let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in
+ let exc_handler e j = (* handle an exception at index j *)
+ begin
+ let errmsg = Printexc.to_string e in
+ Utils.log_error
+ "error at index j=%d in (%d,%d), chunksize=%d of \
+ a total of %d got exception %s on core %d \n%!"
+ j lo hi chunksize (hi-lo+1) errmsg i;
+ signal (Error (i,errmsg): msg_to_master);
+ finish()
+ end
+ in
+ compute al lo hi exc_handler;
+ log_debug
+ "worker on core %d (pid=%d), segment (%d,%d) of data \
+ of length %d, chunksize=%d finished in %f seconds"
+ i pid lo hi ln chunksize (Unix.gettimeofday() -. d)
+ in
+ while true do
+ (* ask for work until we are finished *)
+ signal (Ready i);
+ match receive() with
+ | Finished -> return(); finish ()
+ | Task n -> computetask n
+ done)
+ in
(* close unused ends of the pipes *)
Array.iter (fun (rfd,_) -> Unix.close rfd) pipedown;
Unix.close pipeup_wr;
(* get ic/oc/wfdl *)
- let ocs=Array.init ncores
+ let ocs=Array.init !ncores
(fun n -> Unix.out_channel_of_descr (snd pipedown.(n))) in
let ic=Unix.in_channel_of_descr pipeup_rd in
@@ -434,7 +447,7 @@ let geniter init finalize ncores ~chunksize compute al =
(log_debug "sending task %d to worker %d" i w;
let oc = ocs.(w) in
(Marshal.to_channel oc (Task i) []); flush oc)
- | Error (core,msg) -> handle_exc core msg
+ | (Error (core,msg): msg_to_master) -> handle_exc core msg
done;
(* send termination token to all children *)
@@ -445,10 +458,7 @@ let geniter init finalize ncores ~chunksize compute al =
) ocs;
(* wait for all children to terminate *)
- for _i = 0 to ncores-1 do
- try ignore(Unix.wait())
- with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
- done
+ wait_for_pids pids;
(* no data to return *)
end
diff --git a/parmap.mldylib b/parmap.mldylib
index df08796..eb65e2d 100644
--- a/parmap.mldylib
+++ b/parmap.mldylib
@@ -1,3 +1,4 @@
Parmap
Bytearray
Parmap_utils
+Setcore
diff --git a/parmap.mli b/parmap.mli
index 2a08377..52dba04 100644
--- a/parmap.mli
+++ b/parmap.mli
@@ -30,6 +30,15 @@ val set_default_ncores : int -> unit
val get_default_ncores : unit -> int
+(** {6 Getting ncores being used during parallel execution } *)
+
+val get_ncores : unit -> int
+
+(** {6 Getting the current worker rank. The master process has rank -1. Other processes
+ have the rank at which they were forked out (a worker's rank is in [0..ncores-1]) } *)
+
+val get_rank : unit -> int
+
(** {6 Sequence type, subsuming lists and arrays} *)
type 'a sequence = L of 'a list | A of 'a array;;
diff --git a/parmap.mllib b/parmap.mllib
index df08796..eb65e2d 100644
--- a/parmap.mllib
+++ b/parmap.mllib
@@ -1,3 +1,4 @@
Parmap
Bytearray
Parmap_utils
+Setcore
diff --git a/setcore.mli b/setcore.ml
similarity index 100%
rename from setcore.mli
rename to setcore.ml
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-ocaml-maint/packages/parmap.git
More information about the Pkg-ocaml-maint-commits
mailing list