[Debian-astro-commits] [gyoto] 24/221: MPI: move all the raytrace logic into Scenery::rayTrace

Thibaut Jean-Claude Paumard thibaut at moszumanska.debian.org
Fri May 22 20:52:30 UTC 2015


This is an automated email from the git hooks/post-receive script.

thibaut pushed a commit to branch master
in repository gyoto.

commit 2232e31bac6a76f729ff0ebe8c1207c5fd9157dc
Author: Thibaut Paumard <paumard at users.sourceforge.net>
Date:   Fri Oct 10 15:45:16 2014 +0200

    MPI: move all the raytrace logic into Scenery::rayTrace
---
 bin/gyoto-mpi-worker.C | 136 +++--------------------------
 include/GyotoScenery.h |   4 +-
 lib/Scenery.C          | 228 ++++++++++++++++++++++++++++++++-----------------
 3 files changed, 166 insertions(+), 202 deletions(-)

diff --git a/bin/gyoto-mpi-worker.C b/bin/gyoto-mpi-worker.C
index 24fa563..a61d3de 100644
--- a/bin/gyoto-mpi-worker.C
+++ b/bin/gyoto-mpi-worker.C
@@ -37,43 +37,16 @@
 using namespace std;
 using namespace Gyoto;
 
-static char*     pixfile   = NULL;
-static fitsfile* fptr      = NULL;
-static int       status    = 0;
-static long      fpixel[]  = {1,1,1};
-static long      nelements = 0;
-static double*   vect      = NULL;
-static double*   impactcoords=NULL;
-static SmartPointer<Astrobj::Properties> data = NULL;
-
 static SmartPointer<Scenery> sc = NULL;
 
 #ifdef HAVE_MPI
 #include <boost/mpi/environment.hpp>
-#include <boost/mpi/communicator.hpp>
+#include <boost/mpi/intercommunicator.hpp>
 #include <string>
 #include <boost/serialization/string.hpp>
 namespace mpi = boost::mpi;
 #endif
 
-void usage() {
-  cout << "Usage:" << endl <<
-    "    rayXML [--imin=i0 --imax=i1 --jmin=j0 --jmax=j1] input.xml output.dat" << endl;
-}
-
-void sigint_handler(int sig)
-{
-  if (sig!=SIGINT) cerr << "\n********GYOTO: sigint_handler trapping signal " << sig << ", this should not happen !" << endl;
-  cerr << "GYOTO: SIGINT received: saving data to " << pixfile << "... ";
-  signal(SIGINT, SIG_DFL);
-
-  fits_write_pix(fptr, TDOUBLE, fpixel, nelements, vect, &status);
-  fits_close_file(fptr, &status);
-  fits_report_error(stderr, status);
-
-  cerr << "Killing self." << endl;
-  kill(getpid(), SIGINT);
-}
 
 static std::string curmsg = "";
 static int curretval = 1;
@@ -101,14 +74,14 @@ int main(int argc, char** argv) {
   MPI_Comm parent_c;
   MPI_Comm_get_parent(&parent_c);
 
-  mpi::communicator manager(parent_c,mpi::comm_take_ownership);
+  mpi::intercommunicator manager(parent_c,mpi::comm_take_ownership);
   mpi::communicator world;
 
   string pluglist= getenv("GYOTO_PLUGINS")?
     getenv("GYOTO_PLUGINS"):
     GYOTO_DEFAULT_PLUGINS;
   Gyoto::Error::setHandler ( &gyotoErrorHandler );
-  curmsg = "In gyoto.C: Error initializing libgyoto: ";
+  curmsg = "In gyoto-mpi-worker.C: Error initializing libgyoto: ";
   curretval = 1;
   Gyoto::Register::init(pluglist.c_str());
 
@@ -119,115 +92,30 @@ int main(int argc, char** argv) {
   Scenery::mpi_tag task=Scenery::give_task;
   Scenery::is_worker=true;
   while (task != Scenery::terminate) {
-    //    std::cerr << "Worker #" << rk << " waiting for task " << endl;
     manager.recv(0, Scenery::give_task, task);
-    //    std::cerr << "Worker #" << rk << " received task " << task << endl;
     switch (task) {
     case Scenery::read_scenery: {
       std::string parfile;
       manager.recv(0, task, parfile);
-      std::cerr << "Worker #" << rk << " reading \""<<parfile<<"\""<<std::endl;
-      curmsg = "In gyoto.C: Error in Factory creation: ";
+      curmsg = "In gyoto-mpi-worker.C: Error in Factory creation: ";
       curretval = 1;
       sc = Factory(const_cast<char*>(parfile.c_str())).getScenery();
-      cerr <<"Maxiter=="<<sc->maxiter()<<endl;
-      break;
-    }
-    case Scenery::raytrace: {
-      size_t ij[2]={0, 0};
-      //      std::cerr << "Worker #" << rk << " receiving ij for raytracing" <<std::endl;
-      manager.recv(0, task, ij, 2);
-      std::cerr << "Worker #" << rk << " raytracing i="<<ij[0]<<", j="<<ij[1]<<std::endl;
-
-
-      // initialize AstrobjProperties
-      size_t nbnuobs=0;
-      Quantity_t quantities = sc -> getRequestedQuantities();
-      if (quantities & (GYOTO_QUANTITY_SPECTRUM | GYOTO_QUANTITY_BINSPECTRUM)) {
-	SmartPointer<Spectrometer::Generic> spr = sc -> screen() -> spectrometer();
-	if (!spr) throwError("Spectral quantity requested but "
-			     "no spectrometer specified!");
-	nbnuobs = spr -> nSamples();
-      }
-      size_t nbdata= sc->getScalarQuantitiesCount();
-      size_t nelt=(nbdata+nbnuobs);
-      vect = new double[nelt];
-      data = new Astrobj::Properties();
-      size_t offset=1;
-      size_t curquant=0;
-
-      if (quantities & GYOTO_QUANTITY_INTENSITY) {
-	data->intensity=vect+offset*(curquant++);
-      }
-      if (quantities & GYOTO_QUANTITY_EMISSIONTIME) {
-	data->time=vect+offset*(curquant++);
-      }
-      if (quantities & GYOTO_QUANTITY_MIN_DISTANCE) {
-	data->distance=vect+offset*(curquant++);
-      }
-      if (quantities & GYOTO_QUANTITY_FIRST_DMIN) {
-	data->first_dmin=vect+offset*(curquant++);
-      }
-      if (quantities & GYOTO_QUANTITY_REDSHIFT) {
-	data->redshift=vect+offset*(curquant++);
-      }
-      // if ((quantities & GYOTO_QUANTITY_IMPACTCOORDS || ipct) && !ipctdims[0] ) {
-      // 	data->impactcoords = impactcoords = new double [res*res*16];
-      // 	ipcttime = tobs * GYOTO_C / scenery -> metric() -> unitLength();
-      // }
-      if (quantities & GYOTO_QUANTITY_USER1) {
-	data->user1=vect+offset*(curquant++);
-      }
-      if (quantities & GYOTO_QUANTITY_USER2) {
-	data->user2=vect+offset*(curquant++);
-      }
-      if (quantities & GYOTO_QUANTITY_USER3) {
-	data->user3=vect+offset*(curquant++);
-      }
-      if (quantities & GYOTO_QUANTITY_USER4) {
-	data->user4=vect+offset*(curquant++);
-      }
-      if (quantities & GYOTO_QUANTITY_USER5) {
-	data->user5=vect+offset*(curquant++);
-      }
-      if (quantities & GYOTO_QUANTITY_SPECTRUM) {
-	data->spectrum=vect+offset*(curquant++);
-	data->offset=int(offset);
-      }
-      if (quantities & GYOTO_QUANTITY_BINSPECTRUM) {
-	data->binspectrum=vect+offset*(curquant++);
-	data->offset=int(offset);
-      }
-
-      data->init(nbnuobs);
-      sc->screen()->computeBaseVectors();
-      sc->setPropertyConverters(data);
-
-      // need to do something about impactcoords
-
-      (*sc)(ij[0], ij[1], data, NULL, NULL);
-
-      manager.send(0, Scenery::raytrace_done, rk);
-      manager.send(0, Scenery::raytrace_done, ij, 2);
-      manager.send(0, Scenery::raytrace_done, nelt);
-      manager.send(0, Scenery::raytrace_done, vect, nelt);
-
-      delete[] vect;
-      delete data;
-      //      std::cerr << "Worker #" << rk << " done raytracing i="<<ij[0]<<", j="<<ij[1]<<std::endl;
-
+      sc -> mpi_manager_ = &manager;
+      sc -> mpi_world_ = &world;
+      sc -> mpi_env_ = &env;
+     break;
     }
+    case Scenery::raytrace:
+      curmsg = "In gyoto-mpi-worker.C: Error in ray-tracing: ";
+      curretval = 1;
+      sc -> rayTrace(0, 0, 0, 0, NULL, NULL);
       break;
     case Scenery::terminate:
-      std::cerr << "Worker #" << rk << " terminating"<<std::endl;
-
       break;
     default:
       std::cerr << "unknown task" << endl;
     }
   }
 
-  std::cerr << "Worker #" << rk << " done, exiting" << endl;
-
   return 0;
 }
diff --git a/include/GyotoScenery.h b/include/GyotoScenery.h
index d003b3f..ea2017c 100644
--- a/include/GyotoScenery.h
+++ b/include/GyotoScenery.h
@@ -212,11 +212,11 @@ class Gyoto::Scenery : protected Gyoto::SmartPointee {
 # endif
 
 # ifdef HAVE_MPI
+ public:
   boost::mpi::environment * mpi_env_;
   boost::mpi::communicator * mpi_world_;
   boost::mpi::intercommunicator * mpi_workers_;
-  int mpi_nbworkers_;
- public:
+  boost::mpi::intercommunicator * mpi_manager_;
   static bool is_worker;
   void mpiSpawn(int nbchildren);
   void mpiTerminate (bool keep_env=false);
diff --git a/lib/Scenery.C b/lib/Scenery.C
index 9ff7630..db67fe4 100644
--- a/lib/Scenery.C
+++ b/lib/Scenery.C
@@ -50,7 +50,7 @@ using namespace std;
 Scenery::Scenery() :
   screen_(NULL), delta_(GYOTO_DEFAULT_DELTA),
   quantities_(0), ph_(), nthreads_(0),
-  mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL)
+  mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
 {}
 
 Scenery::Scenery(SmartPointer<Metric::Generic> met,
@@ -58,7 +58,7 @@ Scenery::Scenery(SmartPointer<Metric::Generic> met,
 		 SmartPointer<Astrobj::Generic> obj) :
   screen_(scr), delta_(GYOTO_DEFAULT_DELTA),
   quantities_(0), ph_(), nthreads_(0),
-  mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL)
+  mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
 {
   metric(met);
   if (screen_) screen_->metric(met);
@@ -70,7 +70,7 @@ Scenery::Scenery(const Scenery& o) :
   screen_(NULL), delta_(o.delta_), 
   quantities_(o.quantities_), ph_(o.ph_), 
   nthreads_(o.nthreads_),
-  mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL)
+  mpi_env_(NULL), mpi_world_(NULL), mpi_workers_(NULL), mpi_manager_(NULL)
 {
   if (o.screen_()) {
     screen_=o.screen_->clone();
@@ -243,73 +243,6 @@ void Scenery::rayTrace(size_t imin, size_t imax,
   imax=(imax<=(npix)?imax:(npix));
   jmax=(jmax<=(npix)?jmax:(npix));
 
-  //#if 0
-  if (mpi_workers_) {
-    // dispatch over workers and monitor
-
-    size_t ij[2]={imin, jmin};
-    int working = 0;
-    if (data) data->init(0);
-
-    // initiate raytracing
-    for (int w=0; w<mpi_workers_->remote_size(); ++w) {
-      if (impactcoords) {
-	mpi_workers_->send(w, give_task, Scenery::impactcoords);
-	mpi_workers_->send(w, Scenery::impactcoords, impactcoords, npix*npix*8);
-      } else mpi_workers_->send(w, give_task, Scenery::noimpactcoords);
-      mpi_workers_ -> send(w, give_task, raytrace);
-      mpi_workers_ -> send(w, raytrace, ij, 2);
-      ++working;
-      if (++ij[0]>imax) {
-	cout << "\rj = " << ij[1] << " / " << jmax << " " << flush;
-	if (++ij[1]>jmax) break;
-	else ij[0]=imin;
-      }
-      cerr << "Manager sent data to all workers" << endl;
-    }
-
-    // continue
-    while (working) {
-      // receive one result, need to track back where it belongs and
-      // store it there
-      int w;
-      size_t nelt;
-      double * vect;
-      size_t ijr[2];
-      cerr << "Manager waiting for worker to send result" << endl;
-      mpi_workers_ -> recv(mpi::any_source, raytrace_done, w);
-      cerr << "Manager received result from worker #"<<w << endl;
-
-      mpi_workers_ -> recv(w, raytrace_done, ijr, 2);
-      mpi_workers_ -> recv(w, raytrace_done, nelt);
-      vect=new double[nelt];
-      mpi_workers_ -> recv(w, raytrace_done, vect, nelt);
-
-      Astrobj::Properties *databis=data;
-      databis += (ijr[1]-1)*npix+ijr[0];
-      size_t offset=1;
-      size_t curquant=0;
-   
-      if (data->intensity) data->intensity[(ijr[1]-1)*npix+ijr[0]]=*(vect+offset*(curquant++));
-      //populate other quantities
-
-      delete[] vect;
-
-      // give new task or decrease working counter
-      if (ij[0]<=imax) {
-	mpi_workers_ -> send(w, give_task, raytrace);
-	mpi_workers_ -> send(w, raytrace, ij, 2);
-	if (++ij[0]>imax) {
-	  cout << "\rj = " << ij[1] << " / " << jmax << " " << flush;
-	  if (++ij[1]<=jmax) ij[0]=imin;
-	}
-      } else --working;
-    }
-
-    return;
-  }
-  //#endif
-
   screen_->computeBaseVectors();
          // Necessary for KS integration, computes relation between
          // observer's x,y,z coord and KS X,Y,Z coord. Will be used to
@@ -325,8 +258,156 @@ void Scenery::rayTrace(size_t imin, size_t imax,
   ph_ . setInitCoord(coord, -1);
   // delta is reset in operator()
 
+  if (data) setPropertyConverters(data);
+
+  //#if 0
+  if (mpi_workers_ || Scenery::is_worker) {
+    // We are in an MPI context, either the manager or a worker.
+    // dispatch over workers and monitor
+
+    size_t ij[2]={imin, jmin};
+
+    size_t nbnuobs=0;
+    Quantity_t quantities = getRequestedQuantities();
+    if (quantities & (GYOTO_QUANTITY_SPECTRUM | GYOTO_QUANTITY_BINSPECTRUM)) {
+      if (!spr) throwError("Spectral quantity requested but "
+			     "no spectrometer specified!");
+      nbnuobs = spr -> nSamples();
+    }
+    size_t nelt= getScalarQuantitiesCount();
+    if (quantities & GYOTO_QUANTITY_SPECTRUM)    nelt += nbnuobs;
+    if (quantities & GYOTO_QUANTITY_BINSPECTRUM) nelt += nbnuobs;
+    double * vect = new double[nelt];
+    Astrobj::Properties *locdata = new Astrobj::Properties();
+    size_t offset=1;
+    size_t curquant=0;
+
+    if (Scenery::is_worker) {
+      // set all converters to the trivial one, conversion is
+      // performed in the manager.
+      intensity_converter_ = NULL;
+      spectrum_converter_ = NULL;
+      binspectrum_converter_ = NULL;
+      setPropertyConverters(locdata);
+    }
+
+    if (quantities & GYOTO_QUANTITY_INTENSITY) {
+      locdata->intensity=vect+offset*(curquant++);
+    }
+    if (quantities & GYOTO_QUANTITY_EMISSIONTIME) {
+      locdata->time=vect+offset*(curquant++);
+    }
+    if (quantities & GYOTO_QUANTITY_MIN_DISTANCE) {
+      locdata->distance=vect+offset*(curquant++);
+    }
+    if (quantities & GYOTO_QUANTITY_FIRST_DMIN) {
+      locdata->first_dmin=vect+offset*(curquant++);
+    }
+    if (quantities & GYOTO_QUANTITY_REDSHIFT) {
+      locdata->redshift=vect+offset*(curquant++);
+    }
+    // if ((quantities & GYOTO_QUANTITY_IMPACTCOORDS || ipct) && !ipctdims[0] ) {
+    // 	locdata->impactcoords = impactcoords = new double [res*res*16];
+    // 	ipcttime = tobs * GYOTO_C / scenery -> metric() -> unitLength();
+    // }
+    if (quantities & GYOTO_QUANTITY_USER1) {
+      locdata->user1=vect+offset*(curquant++);
+    }
+    if (quantities & GYOTO_QUANTITY_USER2) {
+      locdata->user2=vect+offset*(curquant++);
+    }
+    if (quantities & GYOTO_QUANTITY_USER3) {
+      locdata->user3=vect+offset*(curquant++);
+    }
+    if (quantities & GYOTO_QUANTITY_USER4) {
+      locdata->user4=vect+offset*(curquant++);
+    }
+    if (quantities & GYOTO_QUANTITY_USER5) {
+      locdata->user5=vect+offset*(curquant++);
+    }
+    if (quantities & GYOTO_QUANTITY_SPECTRUM) {
+      locdata->spectrum=vect+offset*(curquant++);
+      locdata->offset=int(offset);
+    }
+    if (quantities & GYOTO_QUANTITY_BINSPECTRUM) {
+      locdata->binspectrum=vect+offset*(curquant++);
+      locdata->offset=int(offset);
+    }
+
+    mpi::status s;
+    if (mpi_workers_) { // We are the manager
+      int working = mpi_workers_->remote_size();
+      // First tell the workers to join our task force
+      // The corresponding recv is in bin/gyoto-mpi-worker.C
+      for (int w=0; w<working; ++w)
+	mpi_workers_->send(w, give_task, raytrace);
+
+      size_t *ijr = new size_t[working*2];
+
+      while (working) {
+	// receive one result, need to track back where it belongs and
+	// store it there
+	int w;
+
+	// Wait for worker to ask for task.
+	// tag may be raytrace_done if worker has result to report, 
+	// give_task if worker has no data yet.
+	s = mpi_workers_ -> recv(mpi::any_source, mpi::any_tag, vect, nelt);
+
+	w = s.source();
+	
+	if (s.tag()==Scenery::raytrace_done) {
+	  size_t cell=(ijr[2*w+1]-1)*npix+ijr[2*w]-1;
+	  if (data->intensity) {
+	    // Perform conversion here
+	    data->intensity[cell]=data->intensity_converter_?
+	      (*data->intensity_converter_)(*locdata->intensity):
+	      *locdata->intensity;
+	  }
+	  //populate other quantities
+	}
+
+	// give new task or decrease working counter
+	if (ij[0]<=imax) {
+	  //should send also impactcoords
+	  ijr[2*w]=ij[0];
+	  ijr[2*w+1]=ij[1];
+	  mpi_workers_ -> send(w, raytrace, ij, 2);
+	  if (++ij[0]>imax) {
+	    cout << "\rj = " << ij[1] << " / " << jmax << " " << flush;
+	    if (++ij[1]<=jmax) ij[0]=imin;
+	  }
+	} else {
+	  mpi_workers_ -> send(w, raytrace_done, ij, 2);
+	  --working;
+	}
+      }
+      delete [] ijr;
+    } else {
+      // we are a worker
+      // First send dummy result, using tag "give_task".
+      // Manager will ignore the results and send first coordinates.
+      mpi_manager_->send(0, give_task, vect, nelt);
+      while (true) {
+	// Receive new coordinates to work on.
+	// Should also receive impactcoords.
+	s = mpi_manager_->recv(0, mpi::any_tag, ij, 2);
+	if (s.tag()==raytrace_done) {
+	  break;
+	}
+	locdata->init(nbnuobs);
+	(*this)(ij[0], ij[1], locdata, impactcoords, &ph_);
+	// send result
+	mpi_manager_->send(0, raytrace_done, vect, nelt);
+      }
+    }
+    delete locdata;
+    delete [] vect;
+    return;
+  }
+  //#endif
+
   if (data) {
-    setPropertyConverters(data);
     size_t first_index=(jmin-1)*npix + imin -1;
     (*data) += first_index;
     if (impactcoords) impactcoords += first_index * 16;
@@ -725,17 +806,13 @@ void Gyoto::Scenery::mpiSpawn(int nbchildren) {
                  MPI_ERRCODES_IGNORE);
 
   mpi_workers_ = new mpi::intercommunicator (children_c, mpi::comm_take_ownership); 
-  cerr<<"mpi_workers_->remote_size()=="<<mpi_workers_->remote_size()<< endl;
   int size;
   MPI_Comm_remote_size(children_c, &size);
-  cerr<<"MPI_Comm_size(children_c)=="<<size<<endl;
 }
 
 void Gyoto::Scenery::mpiTerminate(bool keep_env) {
-  cerr << "Manager terminating workers"<< endl; 
   if (mpi_workers_) {
     for (int i=0; i < mpi_workers_->remote_size(); ++i) {
-      cerr << "Manager killing worker #"<< i <<endl; 
       mpi_workers_->send(i, give_task, terminate);
     }
     delete mpi_workers_;
@@ -750,7 +827,6 @@ void Gyoto::Scenery::mpiClone()
   std::string tmpfile(tmpfile_c);
   Gyoto::Factory(this).write(tmpfile_c);
   int errcode;
-  cerr<<"mpi_workers_->remote_size()=="<<mpi_workers_->remote_size()<< endl;
   for (int i=0; i < mpi_workers_->remote_size(); ++i) {
     //mpi_workers_->recv(i, ready, errcode);
     mpi_workers_->send(i, give_task, read_scenery);

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-astro/packages/gyoto.git



More information about the Debian-astro-commits mailing list