[Debian-astro-commits] [gyoto] 24/221: MPI: move all the raytrace logic into Scenery::rayTrace
Thibaut Jean-Claude Paumard
thibaut at moszumanska.debian.org
Fri May 22 20:52:30 UTC 2015
This is an automated email from the git hooks/post-receive script.
thibaut pushed a commit to branch master
in repository gyoto.
commit 2232e31bac6a76f729ff0ebe8c1207c5fd9157dc
Author: Thibaut Paumard <paumard at users.sourceforge.net>
Date: Fri Oct 10 15:45:16 2014 +0200
MPI: move all the raytrace logic into Scenery::rayTrace
---
bin/gyoto-mpi-worker.C | 136 +++--------------------------
include/GyotoScenery.h | 4 +-
lib/Scenery.C | 228 ++++++++++++++++++++++++++++++++-----------------
3 files changed, 166 insertions(+), 202 deletions(-)
diff --git a/bin/gyoto-mpi-worker.C b/bin/gyoto-mpi-worker.C
index 24fa563..a61d3de 100644
--- a/bin/gyoto-mpi-worker.C
+++ b/bin/gyoto-mpi-worker.C
@@ -37,43 +37,16 @@
using namespace std;
using namespace Gyoto;
-static char* pixfile = NULL;
-static fitsfile* fptr = NULL;
-static int status = 0;
-static long fpixel[] = {1,1,1};
-static long nelements = 0;
-static double* vect = NULL;
-static double* impactcoords=NULL;
-static SmartPointer<Astrobj::Properties> data = NULL;
-
static SmartPointer<Scenery> sc = NULL;
#ifdef HAVE_MPI
#include <boost/mpi/environment.hpp>
-#include <boost/mpi/communicator.hpp>
+#include <boost/mpi/intercommunicator.hpp>
#include <string>
#include <boost/serialization/string.hpp>
namespace mpi = boost::mpi;
#endif
-void usage() {
- cout << "Usage:" << endl <<
- " rayXML [--imin=i0 --imax=i1 --jmin=j0 --jmax=j1] input.xml output.dat" << endl;
-}
-
-void sigint_handler(int sig)
-{
- if (sig!=SIGINT) cerr << "\n********GYOTO: sigint_handler trapping signal " << sig << ", this should not happen !" << endl;
- cerr << "GYOTO: SIGINT received: saving data to " << pixfile << "... ";
- signal(SIGINT, SIG_DFL);
-
- fits_write_pix(fptr, TDOUBLE, fpixel, nelements, vect, &status);
- fits_close_file(fptr, &status);
- fits_report_error(stderr, status);
-
- cerr << "Killing self." << endl;
- kill(getpid(), SIGINT);
-}
static std::string curmsg = "";
static int curretval = 1;
@@ -101,14 +74,14 @@ int main(int argc, char** argv) {
MPI_Comm parent_c;
MPI_Comm_get_parent(&parent_c);
- mpi::communicator manager(parent_c,mpi::comm_take_ownership);
+ mpi::intercommunicator manager(parent_c,mpi::comm_take_ownership);
mpi::communicator world;
string pluglist= getenv("GYOTO_PLUGINS")?
getenv("GYOTO_PLUGINS"):
GYOTO_DEFAULT_PLUGINS;
Gyoto::Error::setHandler ( &gyotoErrorHandler );
- curmsg = "In gyoto.C: Error initializing libgyoto: ";
+ curmsg = "In gyoto-mpi-worker.C: Error initializing libgyoto: ";
curretval = 1;
Gyoto::Register::init(pluglist.c_str());
@@ -119,115 +92,30 @@ int main(int argc, char** argv) {
Scenery::mpi_tag task=Scenery::give_task;
Scenery::is_worker=true;
while (task != Scenery::terminate) {
- // std::cerr << "Worker #" << rk << " waiting for task " << endl;
manager.recv(0, Scenery::give_task, task);
- // std::cerr << "Worker #" << rk << " received task " << task << endl;
switch (task) {
case Scenery::read_scenery: {
std::string parfile;
manager.recv(0, task, parfile);
- std::cerr << "Worker #" << rk << " reading \""<<parfile<<"\""<<std::endl;
- curmsg = "In gyoto.C: Error in Factory creation: ";
+ curmsg = "In gyoto-mpi-worker.C: Error in Factory creation: ";
curretval = 1;
sc = Factory(const_cast<char*>(parfile.c_str())).getScenery();
- cerr <<"Maxiter=="<<sc->maxiter()<<endl;
- break;
- }
- case Scenery::raytrace: {
- size_t ij[2]={0, 0};
- // std::cerr << "Worker #" << rk << " receiving ij for raytracing" <<std::endl;
- manager.recv(0, task, ij, 2);
- std::cerr << "Worker #" << rk << " raytracing i="<<ij[0]<<", j="<<ij[1]<<std::endl;
-
-
- // initialize AstrobjProperties
- size_t nbnuobs=0;
- Quantity_t quantities = sc -> getRequestedQuantities();
- if (quantities & (GYOTO_QUANTITY_SPECTRUM | GYOTO_QUANTITY_BINSPECTRUM)) {
- SmartPointer<Spectrometer::Generic> spr = sc -> screen() -> spectrometer();
- if (!spr) throwError("Spectral quantity requested but "
- "no spectrometer specified!");
- nbnuobs = spr -> nSamples();
- }
- size_t nbdata= sc->getScalarQuantitiesCount();
- size_t nelt=(nbdata+nbnuobs);
- vect = new double[nelt];
- data = new Astrobj::Properties();
- size_t offset=1;
- size_t curquant=0;
-
- if (quantities & GYOTO_QUANTITY_INTENSITY) {
- data->intensity=vect+offset*(curquant++);
- }
- if (quantities & GYOTO_QUANTITY_EMISSIONTIME) {
- data->time=vect+offset*(curquant++);
- }
- if (quantities & GYOTO_QUANTITY_MIN_DISTANCE) {
- data->distance=vect+offset*(curquant++);
- }
- if (quantities & GYOTO_QUANTITY_FIRST_DMIN) {
- data->first_dmin=vect+offset*(curquant++);
- }
- if (quantities & GYOTO_QUANTITY_REDSHIFT) {
- data->redshift=vect+offset*(curquant++);
- }
- // if ((quantities & GYOTO_QUANTITY_IMPACTCOORDS || ipct) && !ipctdims[0] ) {
- // data->impactcoords = impactcoords = new double [res*res*16];
- // ipcttime = tobs * GYOTO_C / scenery -> metric() -> unitLength();
- // }
- if (quantities & GYOTO_QUANTITY_USER1) {
- data->user1=vect+offset*(curquant++);
- }
- if (quantities & GYOTO_QUANTITY_USER2) {
- data->user2=vect+offset*(curquant++);
- }
- if (quantities & GYOTO_QUANTITY_USER3) {
- data->user3=vect+offset*(curquant++);
- }
- if (quantities & GYOTO_QUANTITY_USER4) {
- data->user4=vect+offset*(curquant++);
- }
- if (quantities & GYOTO_QUANTITY_USER5) {
- data->user5=vect+offset*(curquant++);
- }
- if (quantities & GYOTO_QUANTITY_SPECTRUM) {
- data->spectrum=vect+offset*(curquant++);
- data->offset=int(offset);
- }
- if (quantities & GYOTO_QUANTITY_BINSPECTRUM) {
- data->binspectrum=vect+offset*(curquant++);
- data->offset=int(offset);
- }
-
- data->init(nbnuobs);
- sc->screen()->computeBaseVectors();
- sc->setPropertyConverters(data);
-
- // need to do something about impactcoords
-
- (*sc)(ij[0], ij[1], data, NULL, NULL);
-
- manager.send(0, Scenery::raytrace_done, rk);
- manager.send(0, Scenery::raytrace_done, ij, 2);
- manager.send(0, Scenery::raytrace_done, nelt);
- manager.send(0, Scenery::raytrace_done, vect, nelt);
-
- delete[] vect;
- delete data;
- // std::cerr << "Worker #" << rk << " done raytracing i="<<ij[0]<<", j="<<ij[1]<<std::endl;
-
+ sc -> mpi_manager_ = &manager;
+ sc -> mpi_world_ = &world;
+ sc -> mpi_env_ = &env;
+ break;
}
+ case Scenery::raytrace:
+ curmsg = "In gyoto-mpi-worker.C: Error in ray-tracing: ";
+ curretval = 1;
+ sc -> rayTrace(0, 0, 0, 0, NULL, NULL);
break;
case Scenery::terminate:
- std::cerr << "Worker #" << rk << " terminating"<<std::endl;
-
break;
default:
std::cerr << "unknown task" << endl;
}
}
- std::cerr << "Worker #" << rk << " done, exiting" << endl;
-
return 0;
}
diff --git a/include/GyotoScenery.h b/include/GyotoScenery.h
index d003b3f..ea2017c 100644
--- a/include/GyotoScenery.h
+++ b/include/GyotoScenery.h
@@ -212,11 +212,11 @@ class Gyoto::Scenery : protected Gyoto::SmartPointee {
# endif
# ifdef HAVE_MPI
+ public:
boost::mpi::environment * mpi_env_;
boost::mpi::communicator * mpi_world_;
boost::mpi::intercommunicator * mpi_workers_;
- int mpi_nbworkers_;
- public:
+ boost::mpi::intercommunicator * mpi_manager_;
static bool is_worker;
void mpiSpawn(int nbchildren);
void mpiTerminate (bool keep_env=false);
diff --git a/lib/Scenery.C b/lib/Scenery.C
index 9ff7630..db67fe4 100644
--- a/lib/Scenery.C
+++ b/lib/Scenery.C
@@ -50,7 +50,7 @@ using namespace std;
Scenery::Scenery() :
screen_(NULL), delta_(GYOTO_DEFAULT_DELTA),
quantities_(0), ph_(), nthreads_(0),
- mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL)
+ mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
{}
Scenery::Scenery(SmartPointer<Metric::Generic> met,
@@ -58,7 +58,7 @@ Scenery::Scenery(SmartPointer<Metric::Generic> met,
SmartPointer<Astrobj::Generic> obj) :
screen_(scr), delta_(GYOTO_DEFAULT_DELTA),
quantities_(0), ph_(), nthreads_(0),
- mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL)
+ mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
{
metric(met);
if (screen_) screen_->metric(met);
@@ -70,7 +70,7 @@ Scenery::Scenery(const Scenery& o) :
screen_(NULL), delta_(o.delta_),
quantities_(o.quantities_), ph_(o.ph_),
nthreads_(o.nthreads_),
- mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL)
+ mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
{
if (o.screen_()) {
screen_=o.screen_->clone();
@@ -243,73 +243,6 @@ void Scenery::rayTrace(size_t imin, size_t imax,
imax=(imax<=(npix)?imax:(npix));
jmax=(jmax<=(npix)?jmax:(npix));
- //#if 0
- if (mpi_workers_) {
- // dispatch over workers and monitor
-
- size_t ij[2]={imin, jmin};
- int working = 0;
- if (data) data->init(0);
-
- // initiate raytracing
- for (int w=0; w<mpi_workers_->remote_size(); ++w) {
- if (impactcoords) {
- mpi_workers_->send(w, give_task, Scenery::impactcoords);
- mpi_workers_->send(w, Scenery::impactcoords, impactcoords, npix*npix*8);
- } else mpi_workers_->send(w, give_task, Scenery::noimpactcoords);
- mpi_workers_ -> send(w, give_task, raytrace);
- mpi_workers_ -> send(w, raytrace, ij, 2);
- ++working;
- if (++ij[0]>imax) {
- cout << "\rj = " << ij[1] << " / " << jmax << " " << flush;
- if (++ij[1]>jmax) break;
- else ij[0]=imin;
- }
- cerr << "Manager sent data to all workers" << endl;
- }
-
- // continue
- while (working) {
- // receive one result, need to track back where it belongs and
- // store it there
- int w;
- size_t nelt;
- double * vect;
- size_t ijr[2];
- cerr << "Manager waiting for worker to send result" << endl;
- mpi_workers_ -> recv(mpi::any_source, raytrace_done, w);
- cerr << "Manager received result from worker #"<<w << endl;
-
- mpi_workers_ -> recv(w, raytrace_done, ijr, 2);
- mpi_workers_ -> recv(w, raytrace_done, nelt);
- vect=new double[nelt];
- mpi_workers_ -> recv(w, raytrace_done, vect, nelt);
-
- Astrobj::Properties *databis=data;
- databis += (ijr[1]-1)*npix+ijr[0];
- size_t offset=1;
- size_t curquant=0;
-
- if (data->intensity) data->intensity[(ijr[1]-1)*npix+ijr[0]]=*(vect+offset*(curquant++));
- //populate other quantities
-
- delete[] vect;
-
- // give new task or decrease working counter
- if (ij[0]<=imax) {
- mpi_workers_ -> send(w, give_task, raytrace);
- mpi_workers_ -> send(w, raytrace, ij, 2);
- if (++ij[0]>imax) {
- cout << "\rj = " << ij[1] << " / " << jmax << " " << flush;
- if (++ij[1]<=jmax) ij[0]=imin;
- }
- } else --working;
- }
-
- return;
- }
- //#endif
-
screen_->computeBaseVectors();
// Necessary for KS integration, computes relation between
// observer's x,y,z coord and KS X,Y,Z coord. Will be used to
@@ -325,8 +258,156 @@ void Scenery::rayTrace(size_t imin, size_t imax,
ph_ . setInitCoord(coord, -1);
// delta is reset in operator()
+ if (data) setPropertyConverters(data);
+
+ //#if 0
+ if (mpi_workers_ || Scenery::is_worker) {
+ // We are in an MPI context, either the manager or a worker.
+ // dispatch over workers and monitor
+
+ size_t ij[2]={imin, jmin};
+
+ size_t nbnuobs=0;
+ Quantity_t quantities = getRequestedQuantities();
+ if (quantities & (GYOTO_QUANTITY_SPECTRUM | GYOTO_QUANTITY_BINSPECTRUM)) {
+ if (!spr) throwError("Spectral quantity requested but "
+ "no spectrometer specified!");
+ nbnuobs = spr -> nSamples();
+ }
+ size_t nelt= getScalarQuantitiesCount();
+ if (quantities & GYOTO_QUANTITY_SPECTRUM) nelt += nbnuobs;
+ if (quantities & GYOTO_QUANTITY_BINSPECTRUM) nelt += nbnuobs;
+ double * vect = new double[nelt];
+ Astrobj::Properties *locdata = new Astrobj::Properties();
+ size_t offset=1;
+ size_t curquant=0;
+
+ if (Scenery::is_worker) {
+ // set all converters to the trivial one, conversion is
+ // performed in the manager.
+ intensity_converter_ = NULL;
+ spectrum_converter_ = NULL;
+ binspectrum_converter_ = NULL;
+ setPropertyConverters(locdata);
+ }
+
+ if (quantities & GYOTO_QUANTITY_INTENSITY) {
+ locdata->intensity=vect+offset*(curquant++);
+ }
+ if (quantities & GYOTO_QUANTITY_EMISSIONTIME) {
+ locdata->time=vect+offset*(curquant++);
+ }
+ if (quantities & GYOTO_QUANTITY_MIN_DISTANCE) {
+ locdata->distance=vect+offset*(curquant++);
+ }
+ if (quantities & GYOTO_QUANTITY_FIRST_DMIN) {
+ locdata->first_dmin=vect+offset*(curquant++);
+ }
+ if (quantities & GYOTO_QUANTITY_REDSHIFT) {
+ locdata->redshift=vect+offset*(curquant++);
+ }
+ // if ((quantities & GYOTO_QUANTITY_IMPACTCOORDS || ipct) && !ipctdims[0] ) {
+ // locdata->impactcoords = impactcoords = new double [res*res*16];
+ // ipcttime = tobs * GYOTO_C / scenery -> metric() -> unitLength();
+ // }
+ if (quantities & GYOTO_QUANTITY_USER1) {
+ locdata->user1=vect+offset*(curquant++);
+ }
+ if (quantities & GYOTO_QUANTITY_USER2) {
+ locdata->user2=vect+offset*(curquant++);
+ }
+ if (quantities & GYOTO_QUANTITY_USER3) {
+ locdata->user3=vect+offset*(curquant++);
+ }
+ if (quantities & GYOTO_QUANTITY_USER4) {
+ locdata->user4=vect+offset*(curquant++);
+ }
+ if (quantities & GYOTO_QUANTITY_USER5) {
+ locdata->user5=vect+offset*(curquant++);
+ }
+ if (quantities & GYOTO_QUANTITY_SPECTRUM) {
+ locdata->spectrum=vect+offset*(curquant++);
+ locdata->offset=int(offset);
+ }
+ if (quantities & GYOTO_QUANTITY_BINSPECTRUM) {
+ locdata->binspectrum=vect+offset*(curquant++);
+ locdata->offset=int(offset);
+ }
+
+ mpi::status s;
+ if (mpi_workers_) { // We are the manager
+ int working = mpi_workers_->remote_size();
+ // First tell the workers to join our task force
+ // The corresponding recv is in gyoto-mpi-worker.C
+ for (int w=0; w<working; ++w)
+ mpi_workers_->send(w, give_task, raytrace);
+
+ size_t *ijr = new size_t[working*2];
+
+ while (working) {
+ // receive one result, need to track back where it belongs and
+ // store it there
+ int w;
+
+ // Wait for worker to ask for task.
+ // tag may be raytrace_done if worker has result to report,
+ // give_task if worker has no data yet.
+ s = mpi_workers_ -> recv(mpi::any_source, mpi::any_tag, vect, nelt);
+
+ w = s.source();
+
+ if (s.tag()==Scenery::raytrace_done) {
+ size_t cell=(ijr[2*w+1]-1)*npix+ijr[2*w]-1;
+ if (data->intensity) {
+ // Perform conversion here
+ data->intensity[cell]=data->intensity_converter_?
+ (*data->intensity_converter_)(*locdata->intensity):
+ *locdata->intensity;
+ }
+ //populate other quantities
+ }
+
+ // give new task or decrease working counter
+ if (ij[0]<=imax) {
+ //should send also impactcoords
+ ijr[2*w]=ij[0];
+ ijr[2*w+1]=ij[1];
+ mpi_workers_ -> send(w, raytrace, ij, 2);
+ if (++ij[0]>imax) {
+ cout << "\rj = " << ij[1] << " / " << jmax << " " << flush;
+ if (++ij[1]<=jmax) ij[0]=imin;
+ }
+ } else {
+ mpi_workers_ -> send(w, raytrace_done, ij, 2);
+ --working;
+ }
+ }
+ delete [] ijr;
+ } else {
+ // we are a worker
+ // First send dummy result, using tag "give_task".
+ // Manager will ignore the results and send first coordinates.
+ mpi_manager_->send(0, give_task, vect, nelt);
+ while (true) {
+ // Receive new coordinates to work on.
+ // Should also receive impactcoords.
+ s = mpi_manager_->recv(0, mpi::any_tag, ij, 2);
+ if (s.tag()==raytrace_done) {
+ break;
+ }
+ locdata->init(nbnuobs);
+ (*this)(ij[0], ij[1], locdata, impactcoords, &ph_);
+ // send result
+ mpi_manager_->send(0, raytrace_done, vect, nelt);
+ }
+ }
+ delete locdata;
+ delete [] vect;
+ return;
+ }
+ //#endif
+
if (data) {
- setPropertyConverters(data);
size_t first_index=(jmin-1)*npix + imin -1;
(*data) += first_index;
if (impactcoords) impactcoords += first_index * 16;
@@ -725,17 +806,13 @@ void Gyoto::Scenery::mpiSpawn(int nbchildren) {
MPI_ERRCODES_IGNORE);
mpi_workers_ = new mpi::intercommunicator (children_c, mpi::comm_take_ownership);
- cerr<<"mpi_workers_->remote_size()=="<<mpi_workers_->remote_size()<< endl;
int size;
MPI_Comm_remote_size(children_c, &size);
- cerr<<"MPI_Comm_size(children_c)=="<<size<<endl;
}
void Gyoto::Scenery::mpiTerminate(bool keep_env) {
- cerr << "Manager terminating workers"<< endl;
if (mpi_workers_) {
for (int i=0; i < mpi_workers_->remote_size(); ++i) {
- cerr << "Manager killing worker #"<< i <<endl;
mpi_workers_->send(i, give_task, terminate);
}
delete mpi_workers_;
@@ -750,7 +827,6 @@ void Gyoto::Scenery::mpiClone()
std::string tmpfile(tmpfile_c);
Gyoto::Factory(this).write(tmpfile_c);
int errcode;
- cerr<<"mpi_workers_->remote_size()=="<<mpi_workers_->remote_size()<< endl;
for (int i=0; i < mpi_workers_->remote_size(); ++i) {
//mpi_workers_->recv(i, ready, errcode);
mpi_workers_->send(i, give_task, read_scenery);
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-astro/packages/gyoto.git
More information about the Debian-astro-commits
mailing list