#include <iostream> #include "MpiBodyComm.hpp" namespace nbody { using namespace std; MpiBodyComm::MpiBodyComm(MPI::Datatype* bodyDatatype) { //keep pointer to datatype for send/recv this->datatype = bodyDatatype; this->request = MPI::REQUEST_NULL; } MpiBodyComm::~MpiBodyComm() { /* if (this->request != MPI::REQUEST_NULL) { //wait for termination of possibly open request this->request.Wait(); //TODO: check why request cannot be freed //this->request.Free(); } */ } void MpiBodyComm::cleanup() { if (this->request != MPI::REQUEST_NULL) { this->request.Wait(); //this->request.Free(); } } void MpiBodyComm::sendBlocking(int target, vector<Body> bodies) { MPI::COMM_WORLD.Send(&bodies[0], bodies.size(), *this->datatype, target, 0); } void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) { MPI::Status status; int currentSource; int currentSize; //check for message beforehand to get number of bodies transmitted //check for actual source because input parameter could be MPI::ANY_SOURCE MPI::COMM_WORLD.Probe(source, 0, status); currentSource = status.Get_source(); currentSize = status.Get_count(*this->datatype); //allocate correct vector size for incoming data bodies.resize(currentSize); MPI::COMM_WORLD.Recv(&bodies[0], currentSize, *this->datatype, currentSource, 0); } bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) { if (this->request != MPI::REQUEST_NULL && !this->request.Test()) { //unblocking send still not finished return false; } /* if (this->request != MPI::REQUEST_NULL) { //TODO: check if works for reuse of a MPIBodyComm this->request.Free(); } */ this->buffer.resize(bodies.size()); std::copy(bodies.begin(), bodies.end(), this->buffer.begin()); this->request = MPI::COMM_WORLD.Isend(&this->buffer[0], this->buffer.size(), *this->datatype, target, 0); return true; } bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) { MPI::Status status; int currentSource; int currentSize; //check for message beforehand to get number of bodies transmitted //check for actual source because input parameter could be MPI::ANY_SOURCE if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) { return false; } currentSource = status.Get_source(); currentSize = status.Get_count(*this->datatype); //allocate correct vector size for incoming data bodies.resize(currentSize); MPI::COMM_WORLD.Recv(&bodies[0], currentSize, *this->datatype, currentSource, 0); return true; } }