
Spawning independent dolfin processes with mpi4py

+2 votes

Hello,

I'm trying to spawn independent processes using mpi4py. Each of these processes solves the same PDE using dolfin. The spawning code is as follows:

import os
import sys
import inspect
import itertools

try:
    # MPI_SUPPORT records whether mpi4py is available on this system
    from mpi4py import MPI
    MPI_SUPPORT = True
except ImportError:
    MPI_SUPPORT = False

def mpi_map_code(func_code, x, params, procs):
    """ This function applies the function in ``func_code`` to the ``x`` inputs on ``procs`` processors.

    :param str func_code: string containing the marshalled version of the function
    :param list x: list of inputs to be passed (picklable)
    :param params: parameters to be passed to the function (picklable)
    :param int procs: number of processors to be used
    """

    if not MPI_SUPPORT:
        raise ImportError("MPI is not supported by this system. Install mpi4py or run iteratively.")

    import TensorToolbox        # Needed to locate the package folder, see next line.
    path = os.path.dirname(inspect.getfile(TensorToolbox)) + "/core/"

    procs = min(procs, len(x))

    # Spawn the worker processes running mpi_eval.py
    comm = MPI.COMM_SELF.Spawn(sys.executable,
                               args=[path + 'mpi_eval.py'],
                               maxprocs=procs)

    # Broadcast function and parameters
    comm.bcast((func_code, params), root=MPI.ROOT)

    # Split the input data into procs (nearly) equal chunks
    ns = [len(x) // procs] * procs
    for i in xrange(len(x) % procs): ns[i] += 1
    for i in xrange(1, procs): ns[i] += ns[i-1]
    ns.insert(0, 0)
    split_x = [ x[ns[i]:ns[i+1]] for i in xrange(0, procs) ]

    # Scatter the data
    comm.scatter(split_x, root=MPI.ROOT)

    # Gather the results
    fval = comm.gather(None, root=MPI.ROOT)

    comm.Disconnect()

    fval = list(itertools.chain(*fval))

    return fval
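
For context, here is a minimal sketch of how the caller side might marshal a function and invoke mpi_map_code; the function f and the parameter dictionary below are placeholders, not part of the original code:

import marshal

def f(x, params):
    # placeholder worker function; it must only rely on what it imports itself
    return x * params["scale"]

# marshal the code object of f so it can be broadcast to the spawned workers
func_code = marshal.dumps(f.__code__)

# evaluate f at ten inputs, distributed over 4 spawned processes
fval = mpi_map_code(func_code, x=range(10), params={"scale": 2.0}, procs=4)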

The mpi_eval.py file simply receives the input data scattered from the root process and returns outputs that are then gathered by the root process:

import marshal
import types
from mpi4py import MPI

if __name__ == "__main__":
    comm = MPI.Comm.Get_parent()

    # Get the broadcasted function and parameters
    (code_string, params) = comm.bcast(None, root=0)

    # De-marshal the function
    code = marshal.loads(code_string)
    func = types.FunctionType(code, globals(), "f")

    # Get the scattered data
    part_x = comm.scatter(None, root=0)

    # Evaluate
    fval = [ func(x, params) for x in part_x ]

    # Gather the results on the parent
    comm.gather(fval, root=0)

    comm.Disconnect()

The code works for any marshallable function and picklable parameters. To prevent dolfin from using MPI to partition the mesh, the function f is coded as follows:

def f(x, params):
    import dolfin as do

    ...

    mpi_comm = do.mpi_comm_self()
    mesh = do.UnitSquareMesh(mpi_comm, NE, NE)

    ...
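
For illustration, a self-contained version of such an f could look like the sketch below; the Poisson problem, the NE parameter, and the returned quantity are hypothetical stand-ins for the actual PDE being solved:

def f(x, params):
    import dolfin as do

    NE = params["NE"]    # hypothetical parameter: cells per direction

    # build the mesh on a self communicator so every process keeps the whole mesh
    mpi_comm = do.mpi_comm_self()
    mesh = do.UnitSquareMesh(mpi_comm, NE, NE)

    # stand-in Poisson problem: -div(grad(u)) = x with homogeneous Dirichlet data
    V = do.FunctionSpace(mesh, "CG", 1)
    u = do.TrialFunction(V)
    v = do.TestFunction(V)
    a = do.dot(do.grad(u), do.grad(v)) * do.dx
    L = do.Constant(x) * v * do.dx
    bc = do.DirichletBC(V, do.Constant(0.0), "on_boundary")

    uh = do.Function(V)
    do.solve(a == L, uh, bc)

    # return something picklable, e.g. the max of the solution vector
    return uh.vector().max()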

The program runs for a while and completes all of its computations on the scattered data, but then all the child processes return this error:

*** The MPI_Comm_rank() function was called after MPI_FINALIZE was invoked.
*** This is disallowed by the MPI standard.
*** Your MPI job will now abort.
[nbdabi:19662] Abort after MPI_FINALIZE completed successfully; not able to guarantee that all other processes were killed!

Do you have any idea why this happens? I guess that mixing mpi4py.MPI and dolfin.MPI is the source of the problem, but how would you do it differently?

asked Jun 6, 2014 by daniele.bigoni FEniCS Novice (190 points)
edited Jun 6, 2014 by daniele.bigoni

1 Answer

+1 vote
 
Best answer

You need to compile dolfin with petsc4py. Then:

comm = petsc4py.PETSc.Comm(mpi4py.MPI.COMM_WORLD)
mesh = UnitSquareMesh(comm, 4, 4)
answered Jun 7, 2014 by johanhake FEniCS Expert (22,480 points)
selected Jun 7, 2014 by daniele.bigoni

Hi Johan, thanks for the hint. It works like a charm.
I think the solution should be along the lines of:

import dolfin as do
from petsc4py import PETSc
from mpi4py import MPI
comm = PETSc.Comm(MPI.COMM_SELF)
mesh = do.UnitSquareMesh(comm,NE,NE)

Was the COMM_WORLD a typo?

Yes, your solution looks correct. I just used COMM_WORLD as an illustration.
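
Putting the accepted fix back into the worker function from the question, a minimal sketch (COMM_SELF wrapped through petsc4py, the rest of the solve unchanged; params["NE"] is a hypothetical parameter name) could read:

def f(x, params):
    import dolfin as do
    from petsc4py import PETSc
    from mpi4py import MPI

    # wrap the mpi4py self communicator so dolfin keeps the whole mesh on each process
    comm = PETSc.Comm(MPI.COMM_SELF)
    mesh = do.UnitSquareMesh(comm, params["NE"], params["NE"])

    # ... assemble and solve the PDE on this per-process mesh as before ...

    # return something picklable
    return mesh.num_cells()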

...