#include <iostream>

#include "MpiBodyComm.hpp"

namespace nbody {

using namespace std;

MpiBodyComm::MpiBodyComm(MPI::Datatype* bodyDatatype) {
  //keep pointer to datatype for send/recv
  this->datatype = bodyDatatype;
  this->request = MPI::REQUEST_NULL;
}

MpiBodyComm::~MpiBodyComm() {
  if (this->request != MPI::REQUEST_NULL) {
    //wait for termination of possibly open request
    this->request.Wait();
    //TODO: check why request cannot be freed
    //this->request.Free();
  }
}

void MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
  //blocking send of the whole vector using the pre-committed body datatype
  MPI::COMM_WORLD.Send(&bodies[0], bodies.size(), *this->datatype, target, 0);
}

void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
  MPI::Status status;
  int currentSource;
  int currentSize;

  //check for message beforehand to get number of bodies transmitted
  //check for actual source because input parameter could be MPI::ANY_SOURCE
  MPI::COMM_WORLD.Probe(source, 0, status);
  currentSource = status.Get_source();
  currentSize = status.Get_count(*this->datatype);
  //allocate correct vector size for incoming data
  bodies.resize(currentSize);
  MPI::COMM_WORLD.Recv(&bodies[0], currentSize, *this->datatype, currentSource, 0);
}

bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
  if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
    //unblocking send still not finished
    return false;
  }
  if (this->request != MPI::REQUEST_NULL) {
    this->request.Free();
  }
  //copy into member buffer; the method returns before the send completes
  this->buffer.resize(bodies.size());
  std::copy(bodies.begin(), bodies.end(), this->buffer.begin());
  this->request = MPI::COMM_WORLD.Isend(&this->buffer[0], this->buffer.size(), *this->datatype, target, 0);
  return true;
}

//alternative implementation based on MPI::BOTTOM and per-call struct datatypes (commented out)
/*
void MpiBodyComm::setupDatatype(vector<Body> bodies) {
  //re-setup data type for current input data's memory locations
  if (this->commBodyType != MPI::DATATYPE_NULL) {
    this->commBodyType.Free();
  }
  this->displacements[0] = MPI::Get_address(bodies[0].position);
  this->displacements[1] = MPI::Get_address(bodies[0].velocity);
  this->displacements[2] = MPI::Get_address(&(bodies[0].mass));
  this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
  this->commBodyType.Commit();
}

int MpiBodyComm::getReceivedNumberBodies(int source) {
  MPI::Datatype btype = MPI::DATATYPE_NULL;
  MPI::Aint displ[3];
  Body templateBody;
  MPI::Status status;

  if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
    return 0;
  }
  displ[0] = MPI::Get_address(templateBody.position);
  displ[1] = MPI::Get_address(templateBody.velocity);
  displ[2] = MPI::Get_address(&(templateBody.mass));
  btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
  btype.Commit();
  int result = status.Get_count(btype);
  btype.Free();
  return result;
}

bool MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
  if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
    //unblocking send still not finished
    return false;
  }
  this->setupDatatype(bodies);
  MPI::COMM_WORLD.Send(MPI::BOTTOM, bodies.size(), this->commBodyType, target, 0);
  return true;
}

bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
  if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
    //unblocking send still not finished
    return false;
  }
  //need buffering; method will return before send is finished
  this->buffer = bodies;
  this->setupDatatype(this->buffer);
  this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, this->buffer.size(), this->commBodyType, target, 0);
  return true;
}

void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
  MPI::Status status;
  int count;

  bodies.clear();
  while ((count = this->getReceivedNumberBodies(source)) == 0);
  //wait for incoming message to determine number of bodies received
  //while (!MPI::COMM_WORLD.Iprobe(source, 0, status));
  //get number of bodies to reserve memory
  //count = status.Get_count(this->commBodyType);
  cout << "RECVED " << count << endl;
  bodies.reserve(count);
  this->setupDatatype(bodies);
  MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
}

bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) {
  MPI::Status status;
  int count;

  //if message is not here return
  if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
    return false;
  }
  bodies.clear();
  //get number of bodies to reserve memory
  count = status.Get_count(this->commBodyType);
  bodies.reserve(count);
  this->setupDatatype(bodies);
  MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
  return true;
}

void MpiBodyComm::recvBlockingAnySource(int& source, vector<Body>& bodies) {
  MPI::Status status;
  int count;
  int actualSource;

  bodies.clear();
  //wait for incoming message to determine number of bodies received
  while (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status));
  //get number of bodies to reserve memory and determine source
  count = status.Get_count(this->commBodyType);
  actualSource = status.Get_source();
  bodies.reserve(count);
  //set source
  source = actualSource;
  this->setupDatatype(bodies);
  MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
}

bool MpiBodyComm::recvUnblockingAnySource(int& source, vector<Body>& bodies) {
  MPI::Status status;
  int count;
  int actualSource;

  //if message is not here return
  if (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) {
    return false;
  }
  bodies.clear();
  //get number of bodies to reserve memory and determine source
  count = status.Get_count(this->commBodyType);
  actualSource = status.Get_source();
  bodies.reserve(count);
  //set source
  source = actualSource;
  this->setupDatatype(bodies);
  MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
  return true;
}

bool MpiBodyComm::sendReady() {
  //check if the last non-blocking send on this object is out of the door
  return this->request == MPI::REQUEST_NULL || this->request.Test();
}

bool MpiBodyComm::recvReady(int source) {
  //check if there has been data received from a specific source or MPI::ANY_SOURCE
  return MPI::COMM_WORLD.Iprobe(source, 0);
}
*/

}
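/*
Usage sketch (not compiled; illustrative only). It shows how the active
blocking/non-blocking interface above might be driven from two ranks using the
deprecated MPI C++ bindings. The helper makeBodyDatatype(), which would build
and commit an MPI::Datatype matching Body, is an assumption for this example
and not part of this class; the example also assumes Body is default-constructible.

  #include <vector>
  #include <mpi.h>
  #include "MpiBodyComm.hpp"

  int main(int argc, char* argv[]) {
    MPI::Init(argc, argv);
    int rank = MPI::COMM_WORLD.Get_rank();

    MPI::Datatype bodyType = makeBodyDatatype(); //hypothetical helper
    nbody::MpiBodyComm comm(&bodyType);

    std::vector<nbody::Body> bodies(16);
    if (rank == 0) {
      //non-blocking send: sendUnblocking returns false while a previous Isend is still in flight
      while (!comm.sendUnblocking(1, bodies));
    } else if (rank == 1) {
      //blocking receive: probes first, resizes 'bodies' to the probed count, then receives
      comm.recvBlocking(0, bodies);
    }

    bodyType.Free();
    MPI::Finalize();
    return 0;
  }
*/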