#include "MpiBodyComm.hpp"

namespace nbody {
	namespace {
		// Returns the start of the vector's storage for use as an MPI buffer,
		// or a null pointer when the vector is empty. Indexing element 0 of an
		// empty vector is undefined, and a zero-count transfer does not access
		// the buffer anyway.
		Body* bufferStart(vector<Body>& bodies) {
			return bodies.empty() ? 0 : &bodies[0];
		}

		// Receives the message described by an already probed status into
		// `bodies`. The vector is resized (not merely reserved) so that the
		// received elements are really part of the vector afterwards.
		void receiveProbed(const MPI::Status& status, const MPI::Datatype& bodyType, vector<Body>& bodies) {
			const int count = status.Get_count(bodyType);

			bodies.clear();
			bodies.resize(count);
			//receive from the probed rank so that exactly the probed message is matched
			MPI::COMM_WORLD.Recv(bufferStart(bodies), count, bodyType, status.Get_source(), 0);
		}
	}

	// Builds the MPI datatype that transfers position, velocity and mass of a
	// Body, and marks the tracked unblocking send request as "none pending".
	MpiBodyComm::MpiBodyComm() {
		//a sample element is only needed to measure the member offsets
		this->buffer.push_back(Body());
		this->buffer.back().mass = 0.0;

		Body& sample = this->buffer[0];
		const MPI::Aint base = MPI::Get_address(&sample);

		this->datatypes[0] = MPI::DOUBLE;
		this->datatypes[1] = MPI::DOUBLE;
		this->datatypes[2] = MPI::DOUBLE;
		this->blocklengths[0] = 3;
		this->blocklengths[1] = 3;
		this->blocklengths[2] = 1;
		//displacements are stored relative to the start of a Body; MPI adds
		//them to the buffer address passed to each send/receive call, so the
		//type is valid for every Body array regardless of where it lives
		this->displacements[0] = MPI::Get_address(&(sample.position[0])) - base;
		this->displacements[1] = MPI::Get_address(&(sample.velocity[0])) - base;
		this->displacements[2] = MPI::Get_address(&(sample.mass)) - base;

		//force the extent to sizeof(Body) so that element i of a transfer
		//maps onto element i of the vector even if Body has further members
		MPI::Datatype members = MPI::Datatype::Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
		this->commBodyType = members.Create_resized(0, static_cast<MPI::Aint>(sizeof(Body)));
		members.Free();
		this->commBodyType.Commit();

		this->request = MPI::REQUEST_NULL;
	}

	// Completes a possibly pending unblocking send and releases the datatype.
	MpiBodyComm::~MpiBodyComm() {
		if (this->request != MPI::REQUEST_NULL) {
			//Wait() releases the completed request and resets the handle to
			//MPI::REQUEST_NULL, so it must not be freed a second time
			this->request.Wait();
		}
		this->commBodyType.Free();
	}

	// Kept for interface compatibility only. The datatype created in the
	// constructor uses displacements relative to the start of a Body and is
	// therefore independent of the memory location of the data; nothing has
	// to be rebuilt per transfer.
	void MpiBodyComm::setupDatatype(vector<Body> bodies) {
		(void) bodies;
	}

	// Sends `bodies` to rank `target` with tag 0 and returns after the send
	// call has completed.
	// Returns false without sending while a previous unblocking send is still
	// in flight, true otherwise.
	bool MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
		if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
			//unblocking send still not finished
			return false;
		}
		MPI::COMM_WORLD.Send(bufferStart(bodies), bodies.size(), this->commBodyType, target, 0);
		return true;
	}

	// Starts an unblocking send of `bodies` to rank `target` with tag 0.
	// Returns false without sending while the previous unblocking send is
	// still in flight, true once the new send has been started.
	bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
		if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
			//unblocking send still not finished
			return false;
		}
		//need buffering; method will return before send is finished.
		//`bodies` is already a private copy, so take over its storage
		//instead of copying it a second time
		this->buffer.swap(bodies);
		this->request = MPI::COMM_WORLD.Isend(bufferStart(this->buffer), this->buffer.size(), this->commBodyType, target, 0);
		return true;
	}

	// Blocks until a tag-0 message from rank `source` is available and
	// replaces the content of `bodies` with the received bodies.
	void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
		MPI::Status status;

		//wait for incoming message to determine number of bodies received
		MPI::COMM_WORLD.Probe(source, 0, status);
		receiveProbed(status, this->commBodyType, bodies);
	}

	// Receives a tag-0 message from rank `source` if one is already available.
	// Returns false and leaves `bodies` untouched when no message is pending;
	// otherwise replaces the content of `bodies` and returns true.
	bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) {
		MPI::Status status;

		//if message is not here return
		if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
			return false;
		}
		receiveProbed(status, this->commBodyType, bodies);
		return true;
	}

	// Blocks until a tag-0 message from any rank is available, replaces the
	// content of `bodies` with the received bodies and stores the sending
	// rank in `source`.
	void MpiBodyComm::recvBlockingAnySource(int& source, vector<Body>& bodies) {
		MPI::Status status;

		//wait for incoming message to determine number of bodies and source
		MPI::COMM_WORLD.Probe(MPI::ANY_SOURCE, 0, status);
		//set source
		source = status.Get_source();
		receiveProbed(status, this->commBodyType, bodies);
	}

	// Receives a tag-0 message from any rank if one is already available.
	// Returns false and leaves `source` and `bodies` untouched when no message
	// is pending; otherwise stores the sending rank in `source`, replaces the
	// content of `bodies` and returns true.
	bool MpiBodyComm::recvUnblockingAnySource(int& source, vector<Body>& bodies) {
		MPI::Status status;

		//if message is not here return
		if (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) {
			return false;
		}
		//set source
		source = status.Get_source();
		receiveProbed(status, this->commBodyType, bodies);
		return true;
	}
}