[SCM] Gerris Flow Solver branch, upstream, updated. e8f73a07832050124d2b8bf6c6f35b33180e65a8

Stephane Popinet popinet at users.sf.net
Tue Nov 24 12:24:14 UTC 2009


The following commit has been merged in the upstream branch:
commit 2d4f9371fd4fd5d59091138d56ef9bbfba52591f
Author: Stephane Popinet <popinet at users.sf.net>
Date:   Sun Jun 21 08:30:38 2009 +1000

    GfsOutputSimulation can write "joined" parallel simulation files
    
    Only for GFS and text formats.
    
    darcs-hash:20090620223038-d4795-e40dfc849c26649ea181e92700505fde44d24989.gz
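
For context, a minimal sketch of calling the new entry point (sim is
assumed to be an already-loaded GfsSimulation; in a parallel run every
process calls this collectively with a valid stream, as the guards in
the implementation below require; the file name is hypothetical):

    FILE * fp = fopen ("joined.gfs", "w");
    /* Serial runs (domain->pid < 0) fall back to gfs_simulation_write();
       parallel runs write the union of all processes to fp.
       A max_depth of -1 means no limit on the written cell tree. */
    gfs_simulation_union_write (sim, -1, fp);
    fclose (fp);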

diff --git a/src/output.c b/src/output.c
index 4f32599..b80fe6a 100644
--- a/src/output.c
+++ b/src/output.c
@@ -1212,10 +1212,11 @@ static void output_simulation_destroy (GtsObject * object)
   (* GTS_OBJECT_CLASS (gfs_output_simulation_class ())->parent_class->destroy) (object);
 }
 
-static void write_text (FttCell * cell, GfsOutputSimulation * output)
+static void write_text (FttCell * cell, gpointer * data)
 {
+  GfsOutputSimulation * output = data[0];  
+  FILE * fp = data[1];
   GSList * i = GFS_DOMAIN (gfs_object_simulation (output))->variables_io;
-  FILE * fp = GFS_OUTPUT (output)->file->fp;
   FttVector p;
 
   gfs_cell_cm (cell, &p);
@@ -1257,37 +1258,61 @@ static gboolean output_simulation_event (GfsEvent * event, GfsSimulation * sim)
     domain->binary =       output->binary;
     sim->output_solid   =  output->solid;
     switch (output->format) {
+
     case GFS:
-      gfs_simulation_write (sim,
-			    output->max_depth,
-			    GFS_OUTPUT (event)->file->fp);
+      if (GFS_OUTPUT (output)->parallel)
+	gfs_simulation_write (sim,
+			      output->max_depth,
+			      GFS_OUTPUT (event)->file->fp);
+      else
+	gfs_simulation_union_write (sim,
+				    output->max_depth,
+				    GFS_OUTPUT (event)->file->fp);
       break;
+
     case GFS_TEXT: {
-      FILE * fp = GFS_OUTPUT (event)->file->fp;
-      GSList * i = domain->variables_io;
-      guint nv = 4;
+      if (GFS_OUTPUT (output)->parallel || domain->pid == 0) {
+	FILE * fp = GFS_OUTPUT (event)->file->fp;
+	GSList * i = domain->variables_io;
+	guint nv = 4;
 
-      fputs ("# 1:X 2:Y 3:Z", fp);
-      while (i) {
-	g_assert (GFS_VARIABLE1 (i->data)->name);
-	fprintf (fp, " %d:%s", nv++, GFS_VARIABLE1 (i->data)->name);
-	i = i->next;
+	fputs ("# 1:X 2:Y 3:Z", fp);
+	while (i) {
+	  g_assert (GFS_VARIABLE1 (i->data)->name);
+	  fprintf (fp, " %d:%s", nv++, GFS_VARIABLE1 (i->data)->name);
+	  i = i->next;
+	}
+	fputc ('\n', fp);
+      }
+      gpointer data[2];
+      data[0] = output;
+      if (GFS_OUTPUT (output)->parallel) {
+	data[1] = GFS_OUTPUT (event)->file->fp;
+	gfs_domain_cell_traverse (domain, FTT_PRE_ORDER, FTT_TRAVERSE_LEAFS, -1,
+				  (FttCellTraverseFunc) write_text, data);
+      }
+      else {
+	FILE * fpp = gfs_union_open (GFS_OUTPUT (event)->file->fp, domain->pid);
+	data[1] = fpp;
+	gfs_domain_cell_traverse (domain, FTT_PRE_ORDER, FTT_TRAVERSE_LEAFS, -1,
+				  (FttCellTraverseFunc) write_text, data);
+	gfs_union_close (GFS_OUTPUT (event)->file->fp, domain->pid, fpp);
       }
-      fputc ('\n', fp);
-      gfs_domain_cell_traverse (domain, FTT_PRE_ORDER, FTT_TRAVERSE_LEAFS, -1,
-				(FttCellTraverseFunc) write_text, event);
       break;
     }
+
     case GFS_VTK: {
       gfs_domain_write_vtk (domain, output->max_depth, domain->variables_io, output->precision,
 			    GFS_OUTPUT (event)->file->fp);
       break;
     }
+
     case GFS_TECPLOT: {
       gfs_domain_write_tecplot (domain, output->max_depth, domain->variables_io, output->precision,
 				GFS_OUTPUT (event)->file->fp);
       break;
     }
+
     default:
       g_assert_not_reached ();
     }
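
The rewritten write_text() callback needs both the output object and a
destination stream, but FttCellTraverseFunc carries only one user-data
pointer, so the two values are bundled in a gpointer array. A
self-contained sketch of the same idiom (names here are illustrative,
not Gerris API):

    #include <glib.h>
    #include <stdio.h>

    /* The callback receives one user-data pointer; two values (a label
       and a stream) are bundled in an array, as write_text() above
       bundles output and fp. */
    static void visit (int item, gpointer * data)
    {
      const char * label = data[0];
      FILE * fp = data[1];
      fprintf (fp, "%s %d\n", label, item);
    }

    int main (void)
    {
      gpointer data[2] = { (gpointer) "item", (gpointer) stdout };
      for (int item = 0; item < 3; item++)
        visit (item, data);
      return 0;
    }
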
diff --git a/src/simulation.c b/src/simulation.c
index 22e4e5f..3e940cf 100644
--- a/src/simulation.c
+++ b/src/simulation.c
@@ -1149,6 +1149,110 @@ void gfs_simulation_write (GfsSimulation * sim,
   domain->max_depth_write = depth;
 }
 
+#ifdef HAVE_MPI
+static void count_edges (GtsGEdge * e, guint * nedge)
+{
+  (*nedge)++;
+}
+
+static void write_node (GtsObject * node, gpointer * data)
+{
+  FILE * fp = data[0];
+  guint * nnode = data[1];
+
+  node->reserved = GUINT_TO_POINTER ((*nnode)++);
+  if (node->klass->write)
+    (* node->klass->write) (node, fp);
+  fputc ('\n', fp);
+}
+
+static void write_edge (GtsGEdge * edge, FILE * fp)
+{
+  fprintf (fp, "%u %u", 
+	   GPOINTER_TO_UINT (GTS_OBJECT (edge->n1)->reserved),
+	   GPOINTER_TO_UINT (GTS_OBJECT (edge->n2)->reserved));
+  if (GTS_OBJECT (edge)->klass->write)
+    (* GTS_OBJECT (edge)->klass->write) (GTS_OBJECT (edge), fp);
+  fputc ('\n', fp);
+}
+#endif /* HAVE_MPI */
+
+/**
+ * gfs_simulation_union_write:
+ * @sim: a #GfsSimulation.
+ * @max_depth: the maximum depth at which to stop writing cell tree
+ * data (-1 means no limit).
+ * @fp: a file pointer.
+ *
+ * Identical to gfs_simulation_write() for serial simulations. For
+ * parallel simulations writes the union of the simulations on all
+ * processes to @fp.
+ */
+void gfs_simulation_union_write (GfsSimulation * sim,
+				 gint max_depth,		  
+				 FILE * fp)
+{
+  GfsDomain * domain = GFS_DOMAIN (sim);
+
+  g_return_if_fail (sim != NULL);
+  g_return_if_fail (fp != NULL);
+
+  if (domain->pid < 0)
+    gfs_simulation_write (sim, max_depth, fp);
+  else {
+#ifdef HAVE_MPI
+    int gsize;
+    guint * nbox;
+
+    MPI_Comm_size (MPI_COMM_WORLD, &gsize);
+    nbox = g_malloc (sizeof (guint)*gsize);
+    nbox[domain->pid] = gts_container_size (GTS_CONTAINER (sim));
+    MPI_Allgather (&nbox[domain->pid], 1, MPI_UNSIGNED, nbox, 1, MPI_UNSIGNED, MPI_COMM_WORLD);
+    /* nbox[] now contains the number of boxes on each PE */
+
+    /* see gts/src/graph.c:gts_graph_write() for the original (serial) implementation */
+    GtsGraph * g = GTS_GRAPH (sim);
+    guint nedge = 0;
+    gts_graph_foreach_edge (g, (GtsFunc) count_edges, &nedge);
+    gfs_all_reduce (domain, nedge, MPI_UNSIGNED, MPI_SUM);
+
+    if (domain->pid == 0) {
+      fprintf (fp, "# Gerris Flow Solver %dD version %s (%s)\n",
+	       FTT_DIMENSION, GFS_VERSION, GFS_BUILD_VERSION);
+      guint i, nboxes = 0;
+      for (i = 0; i < gsize; i++)
+	nboxes += nbox[i];
+      fprintf (fp, "%u %u", nboxes, nedge);
+      if (GTS_OBJECT (g)->klass->write)
+	(* GTS_OBJECT (g)->klass->write) (GTS_OBJECT (g), fp);
+      fputc ('\n', fp);
+    }
+
+    gint depth = domain->max_depth_write;
+    guint i, nnode = 1;
+    gpointer data[2];
+
+    for (i = 0; i < domain->pid; i++)
+      nnode += nbox[i];
+    g_free (nbox);
+
+    FILE * fpp = gfs_union_open (fp, domain->pid);
+    data[0] = fpp;
+    data[1] = &nnode;
+    domain->max_depth_write = max_depth;
+    gts_container_foreach (GTS_CONTAINER (g), (GtsFunc) write_node, data);
+    domain->max_depth_write = depth;
+    gfs_union_close (fp, domain->pid, fpp);
+
+    fpp = gfs_union_open (fp, domain->pid);
+    gts_graph_foreach_edge (g, (GtsFunc) write_edge, fpp);
+    gfs_union_close (fp, domain->pid, fpp);
+
+    gts_container_foreach (GTS_CONTAINER (g), (GtsFunc) gts_object_reset_reserved, NULL);
+#endif /* HAVE_MPI */
+  }
+}
+
 static gdouble min_cfl (GfsSimulation * sim)
 {
   gdouble cfl = (sim->advection_params.scheme == GFS_NONE ?
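
gfs_simulation_union_write() renumbers the graph nodes globally: after
the MPI_Allgather each rank knows every rank's box count and starts its
own 1-based node numbering after all lower-ranked boxes, so the edge
references written by different processes stay consistent in the joined
file. A standalone sketch of just that numbering step (the function
name is illustrative; MPI_IN_PLACE is the standard-conforming spelling
of the overlapping send/receive buffers used in the commit):

    #include <mpi.h>

    /* nbox[] has one slot per rank; returns this rank's first node id. */
    static unsigned int starting_node (unsigned int local_boxes, int rank,
                                       unsigned int * nbox)
    {
      nbox[rank] = local_boxes;
      MPI_Allgather (MPI_IN_PLACE, 1, MPI_UNSIGNED,
                     nbox, 1, MPI_UNSIGNED, MPI_COMM_WORLD);
      unsigned int nnode = 1; /* node ids are 1-based, as above */
      for (int i = 0; i < rank; i++)
        nnode += nbox[i];
      return nnode;
    }
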
diff --git a/src/simulation.h b/src/simulation.h
index 8a5de07..a009d57 100644
--- a/src/simulation.h
+++ b/src/simulation.h
@@ -104,6 +104,9 @@ void                 gfs_simulation_init         (GfsSimulation * sim);
 void                 gfs_simulation_write        (GfsSimulation * sim,
 						  gint max_depth,  
 						  FILE * fp);
+void                 gfs_simulation_union_write  (GfsSimulation * sim,
+						  gint max_depth,  
+						  FILE * fp);
 GfsSimulation *      gfs_simulation_read         (GtsFile * fp);
 GSList *             gfs_simulation_get_solids   (GfsSimulation * sim);
 void                 gfs_simulation_refine       (GfsSimulation * sim);
diff --git a/src/utils.c b/src/utils.c
index dbc347f..abb18ee 100644
--- a/src/utils.c
+++ b/src/utils.c
@@ -19,7 +19,9 @@
 
 #include <stdlib.h>
 #include <ctype.h>
+#include <sys/stat.h>
 #include <sys/wait.h>
+#include <sys/mman.h>
 #include <unistd.h>
 #include <signal.h>
 #include <math.h>
@@ -1554,3 +1556,85 @@ void gfs_clock_destroy (GfsClock * t)
 
   g_free (t);
 }
+
+/**
+ * gfs_union_open:
+ * @fp: a file pointer.
+ * @rank: the rank of the current parallel process.
+ *
+ * Opens a "parallel" file which serialises multiple parallel (write)
+ * accesses to the file pointed to by @fp.
+ *
+ * This file must be closed with gfs_union_close().
+ *
+ * Returns: a "parallel" file pointer associated with @fp.
+ */
+FILE * gfs_union_open (FILE * fp, int rank)
+{
+  g_return_val_if_fail (fp != NULL, NULL);
+
+  if (rank <= 0) /* master */
+    return fp;
+  else { /* slaves */
+#ifdef HAVE_MPI
+    MPI_Status status;
+    int pe;
+    MPI_Recv (&pe, 1, MPI_INT, 0, rank, MPI_COMM_WORLD, &status);
+    g_assert (rank == pe);
+#endif /* HAVE_MPI */
+    return tmpfile ();
+  }
+}
+
+/**
+ * gfs_union_close:
+ * @fp: a file pointer.
+ * @rank: the rank of the current parallel process.
+ * @fpp: a "parallel" file pointer returned by a call to gfs_union_open().
+ *
+ * Closes a "parallel" file previously opened using gfs_union_open().
+ */
+void gfs_union_close (FILE * fp, int rank, FILE * fpp)
+{
+  g_return_if_fail (fp != NULL);
+  g_return_if_fail (fpp != NULL);
+
+  if (rank == 0) { /* master */
+#ifdef HAVE_MPI
+    int pe, npe;
+    MPI_Comm_size (MPI_COMM_WORLD, &npe);
+    for (pe = 1; pe < npe; pe++) {
+      MPI_Send (&pe, 1, MPI_INT, pe, pe, MPI_COMM_WORLD);
+      MPI_Status status;
+      long length;
+      MPI_Recv (&length, 1, MPI_LONG, pe, pe, MPI_COMM_WORLD, &status);
+      /*      fprintf (stderr, "receiving %ld bytes from PE %d\n", length, pe); */
+      if (length > 0) {
+	char * buf = g_malloc (length);
+	MPI_Recv (buf, length, MPI_BYTE, pe, pe + 1, MPI_COMM_WORLD, &status);
+	int rcvcount;
+	MPI_Get_count (&status, MPI_BYTE, &rcvcount);
+	fwrite (buf, 1, rcvcount, fp);
+	g_free (buf);
+      }
+    }
+#endif /* HAVE_MPI */
+  }
+  else if (rank > 0) { /* slaves */
+#ifdef HAVE_MPI
+    int fd = fileno (fpp);
+    struct stat sb;
+    fflush (fpp);
+    g_assert (fstat (fd, &sb) != -1);
+    long length = sb.st_size;
+    MPI_Send (&length, 1, MPI_LONG, 0, rank, MPI_COMM_WORLD);
+    if (length > 0) {
+      char * buf = mmap (NULL, length, PROT_READ, MAP_PRIVATE, fd, 0);
+      g_assert (buf != MAP_FAILED);
+      MPI_Send (buf, length, MPI_BYTE, 0, rank + 1, MPI_COMM_WORLD);
+      munmap (buf, length);
+    }
+#endif /* HAVE_MPI */
+    fclose (fpp);
+  }
+}
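
Taken together, the pair implements a rank-ordered gather of file
contents: rank 0 writes straight to @fp, every other rank writes to a
tmpfile(), and on close the master polls each slave in turn (handshake
and length on tag rank, payload on tag rank + 1), appending the
received bytes to @fp. A minimal collective usage sketch (every rank
executes this; only rank 0's stream ends up with the concatenated,
rank-ordered output):

    FILE * fpp = gfs_union_open (fp, rank);  /* fp itself on rank 0,
                                                a tmpfile() elsewhere */
    fprintf (fpp, "# section written by rank %d\n", rank);
    gfs_union_close (fp, rank, fpp);

Note that rank 0's stream is handed back unchanged and stays open after
the close, while the slave tmpfiles are fclose()d (and therefore
deleted) automatically.
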
diff --git a/src/utils.h b/src/utils.h
index 8e46189..97a2fd4 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -150,6 +150,12 @@ void               gfs_clock_stop           (GfsClock * t);
 gdouble            gfs_clock_elapsed        (GfsClock * t);
 void               gfs_clock_destroy        (GfsClock * t);
 
+FILE *             gfs_union_open           (FILE * fp, 
+					     int rank);
+void               gfs_union_close          (FILE * fp, 
+					     int rank, 
+					     FILE * fpp);
+
 #ifdef __cplusplus
 }
 #endif /* __cplusplus */

-- 
Gerris Flow Solver


