#include #include "MpiBodyComm.hpp" namespace nbody { using namespace std; MpiBodyComm::MpiBodyComm(MPI_Datatype* bodyDatatype) { //keep pointer to datatype for send/recv this->datatype = bodyDatatype; this->request = MPI_REQUEST_NULL; } MpiBodyComm::~MpiBodyComm() { /* if (this->request != MPI_REQUEST_NULL) { //wait for termination of possibly open request this->request.Wait(); //TODO: check why request cannot be freed //this->request.Free(); } */ } void MpiBodyComm::cleanup() { if (this->request != MPI_REQUEST_NULL) { MPI_Status status; MPI_Wait(&this->request, &status); //this->request.Free(); } } void MpiBodyComm::sendBlocking(int target, vector bodies) { MPI_Send(&bodies[0], bodies.size(), *this->datatype, target, 0, MPI_COMM_WORLD); } void MpiBodyComm::recvBlocking(int source, vector& bodies) { MPI_Status status; int currentSource; int currentSize; //check for message beforehand to get number of bodies transmitted //check for actual source because input parameter could be MPI_ANY_SOURCE MPI_Probe(source, 0, MPI_COMM_WORLD, &status); currentSource = status.MPI_SOURCE; MPI_Get_count(&status, *this->datatype, ¤tSize); //allocate correct vector size for incoming data bodies.resize(currentSize); MPI_Recv(&bodies[0], currentSize, *this->datatype, currentSource, 0, MPI_COMM_WORLD, &status); } bool MpiBodyComm::sendUnblocking(int target, vector bodies) { if (this->request != MPI_REQUEST_NULL) { MPI_Status status; int completed; MPI_Test(&this->request, &completed, &status); if (!completed) { //unblocking send still not finished return false; } } /* if (this->request != MPI_REQUEST_NULL) { //TODO: check if works for reuse of a MPIBodyComm this->request.Free(); } */ this->buffer.resize(bodies.size()); std::copy(bodies.begin(), bodies.end(), this->buffer.begin()); MPI_Isend(&this->buffer[0], this->buffer.size(), *this->datatype, target, 0, MPI_COMM_WORLD, &this->request); return true; } bool MpiBodyComm::recvUnblocking(int source, vector& bodies) { MPI_Status status; int currentSource; int currentSize; int ready; //check for message beforehand to get number of bodies transmitted //check for actual source because input parameter could be MPI_ANY_SOURCE MPI_Iprobe(source, 0, MPI_COMM_WORLD, &ready, &status); if (!ready) { return false; } currentSource = status.MPI_SOURCE; MPI_Get_count(&status, *this->datatype, ¤tSize); //allocate correct vector size for incoming data bodies.resize(currentSize); MPI_Recv(&bodies[0], currentSize, *this->datatype, currentSource, 0, MPI_COMM_WORLD, &status); return true; } }