Hello,
I'm trying to spawn independent processes using mpi4py. Each of these processes solves the same PDE using dolfin. The spawning code is as follows:
def mpi_map_code(func_code, x, params, procs):
    """ Apply the function contained in ``func_code`` to the ``x`` inputs on ``procs`` processors.

    :param str func_code: string containing the marshalled version of the function
    :param list x: list of inputs to be passed (picklable)
    :param params: parameters to be passed to the function (picklable)
    :param int procs: number of processors to be used
    """
    if not MPI_SUPPORT:
        raise ImportError("MPI is not supported by this system. Install mpi4py or run iteratively.")

    import TensorToolbox  # needed only to locate the package folder (see next line)
    path = os.path.dirname(inspect.getfile(TensorToolbox)) + "/core/"

    procs = min(procs, len(x))
    comm = MPI.COMM_SELF.Spawn(sys.executable,
                               args=[path + 'mpi_eval.py'],
                               maxprocs=procs)

    # Broadcast function and parameters
    comm.bcast((func_code, params), root=MPI.ROOT)

    # Split the input data into contiguous chunks, one per process
    ns = [len(x) // procs] * procs
    for i in xrange(len(x) % procs):
        ns[i] += 1
    for i in xrange(1, procs):
        ns[i] += ns[i - 1]
    ns.insert(0, 0)
    split_x = [x[ns[i]:ns[i + 1]] for i in xrange(0, procs)]

    # Scatter the data
    comm.scatter(split_x, root=MPI.ROOT)

    # Gather the results
    fval = comm.gather(None, root=MPI.ROOT)
    comm.Disconnect()

    fval = list(itertools.chain(*fval))
    return fval
The mpi_eval.py script just receives the inputs scattered from the root process and returns outputs that are then gathered by the root process:
import marshal
import types
from mpi4py import MPI

if __name__ == "__main__":
    comm = MPI.Comm.Get_parent()

    # Get the broadcasted function and parameters
    (code_string, params) = comm.bcast(None, root=0)

    # De-marshal the function
    code = marshal.loads(code_string)
    func = types.FunctionType(code, globals(), "f")

    # Get the scattered data
    part_x = comm.scatter(None, root=0)

    # Evaluate
    fval = [func(x, params) for x in part_x]

    # Gather the results on the parent
    comm.gather(fval, root=0)
    comm.Disconnect()
The code works for any marshallable function and picklable parameters. To prevent dolfin from using MPI to partition the mesh, the function f is written as follows:
def f(x, params):
    import dolfin as do
    ...
    mpi_comm = do.mpi_comm_self()
    mesh = do.UnitSquareMesh(mpi_comm, NE, NE)
    ...
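For reference, the driver is called roughly like this (the inputs and params below are placeholders, not my actual data):

import marshal

# Marshal the code object of f so it can be broadcast to the spawned children
func_code = marshal.dumps(f.func_code)

# Placeholder inputs/parameters; the real ones are picklable objects
x = [0.1 * i for i in range(10)]
params = {'NE': 32}

fval = mpi_map_code(func_code, x, params, procs=4)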
The program runs for a while and completes all the computations on the scattered data, but then every child process reports this error:
*** The MPI_Comm_rank() function was called after MPI_FINALIZE was invoked.
*** This is disallowed by the MPI standard.
*** Your MPI job will now abort.
[nbdabi:19662] Abort after MPI_FINALIZE completed successfully; not able to guarantee that all other processes were killed!
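One thing I thought of to narrow this down (just a debugging sketch; I have not verified that it pinpoints the issue) is to check the MPI state in the children right before disconnecting, at the end of mpi_eval.py:

import sys
from mpi4py import MPI

# Check whether MPI has already been finalized (e.g. by dolfin's cleanup)
# before the child disconnects from the parent communicator.
if MPI.Is_finalized():
    sys.stderr.write("MPI was already finalized before Disconnect\n")
else:
    comm.Disconnect()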
Do you have any idea why this happens? I guess that mixing mpi4py.MPI and dolfin.MPI is the source of the problem, but how would you do it differently?