Newer
Older
#include "MpiBodyComm.hpp"
namespace nbody {
// Constructs the communicator and stores the pointer to the MPI datatype that
// is passed in. Only the pointer is kept; the pointee is not copied here.
// NOTE(review): no closing brace for this definition is visible below, and the
// Wait() call looks like it may belong to a separate definition (possibly a
// destructor) whose header line is missing from this view - confirm against
// the real file.
MpiBodyComm::MpiBodyComm(MPI::Datatype* bodyDatatype) {
this->datatype = bodyDatatype;
//wait for termination of possibly open request
this->request.Wait();
void MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
MPI::COMM_WORLD.Send(&bodies[0], bodies.size(), *this->datatype, target, 0);
}
// Receives bodies into `bodies` with a blocking Recv on MPI::COMM_WORLD, tag 0,
// after resizing the vector to the element count reported by `status`.
// NOTE(review): as written, `status` is default-constructed and never filled in
// by a probe before Get_count() is called, `currentSource` is passed to Recv()
// without ever being assigned, and the `source` parameter is not used - this
// looks like an incomplete or superseded revision; confirm before relying on it.
// NOTE(review): no closing brace for this definition is visible in this view.
void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
MPI::Status status;
int currentSource;
int currentSize;
//number of elements of the stored body datatype reported by status
currentSize = status.Get_count(*this->datatype);
//give the vector one element per body that is about to be received
bodies.resize(currentSize);
MPI::COMM_WORLD.Recv(&bodies[0], currentSize, *this->datatype, currentSource, 0);
// Starts a non-blocking send of `bodies` to rank `target` (tag 0).
// Returns false without sending if the previous request has not completed yet.
// Otherwise copies the bodies into this->buffer, rebuilds this->sendBodyType,
// posts an Isend whose request is stored in this->request, and returns true.
// NOTE(review): no closing brace for this definition is visible in this view.
bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
MPI::Aint displ[3];
if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
//unblocking send still not finished
return false;
}
//free the previous request handle if one is still set
if (this->request != MPI::REQUEST_NULL) {
this->request.Free();
}
//replace the buffered copy with the new bodies
this->buffer.clear();
this->buffer = bodies;
//drop the previously built send datatype, if any
if (this->sendBodyType != MPI::DATATYPE_NULL) {
this->sendBodyType.Free();
}
//NOTE(review): the addresses below are taken from the by-value parameter
//`bodies`, not from this->buffer, although the Isend further down uses
//MPI::BOTTOM with this datatype and `bodies` ends its lifetime when this
//function returns - verify whether this->buffer was intended
displ[0] = MPI::Get_address(bodies[0].position);
displ[1] = MPI::Get_address(bodies[0].velocity);
displ[2] = MPI::Get_address(&(bodies[0].mass));
this->sendBodyType = this->sendBodyType.Create_struct(3, this->blocklengths, displ, this->datatypes);
this->sendBodyType.Commit();
this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, this->buffer.size(), this->sendBodyType, target, 0);
return true;
// Rebuilds this->commBodyType as a three-block struct datatype whose
// displacements are the addresses of position, velocity and mass of bodies[0].
// A previously built commBodyType is freed first; the new type is committed.
// NOTE(review): `bodies` is taken by value, so the recorded addresses refer to
// this function's local copy rather than to the caller's vector - confirm
// whether a reference parameter was intended.
// NOTE(review): bodies[0] is accessed unconditionally, so callers appear to
// need a non-empty vector - confirm.
// NOTE(review): no closing brace for this definition is visible in this view.
void MpiBodyComm::setupDatatype(vector<Body> bodies) {
//re-setup data type for current input data's memory locations
if (this->commBodyType != MPI::DATATYPE_NULL) {
this->commBodyType.Free();
}
this->displacements[0] = MPI::Get_address(bodies[0].position);
this->displacements[1] = MPI::Get_address(bodies[0].velocity);
this->displacements[2] = MPI::Get_address(&(bodies[0].mass));
this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
this->commBodyType.Commit();
int MpiBodyComm::getReceivedNumberBodies(int source) {
MPI::Datatype btype = MPI::DATATYPE_NULL;
MPI::Aint displ[3];
Body templateBody;
MPI::Status status;
if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
return 0;
}
displ[0] = MPI::Get_address(templateBody.position);
displ[1] = MPI::Get_address(templateBody.velocity);
displ[2] = MPI::Get_address(&(templateBody.mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
btype.Commit();
int result = status.Get_count(btype);
btype.Free();
return result;
}
// Sends `bodies` to rank `target` with a blocking Send (tag 0).
// Returns false, sending nothing, while an earlier non-blocking send of this
// object has not completed; returns true after the Send call has returned.
bool MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
	//an earlier non-blocking send that is still in flight blocks this one
	const bool earlierSendPending = (this->request != MPI::REQUEST_NULL) && !this->request.Test();
	if (earlierSendPending) {
		return false;
	}

	//rebuild the communication datatype for these bodies, then send through it
	this->setupDatatype(bodies);
	const int bodyCount = bodies.size();
	MPI::COMM_WORLD.Send(MPI::BOTTOM, bodyCount, this->commBodyType, target, 0);
	return true;
}
// Starts a non-blocking send of `bodies` to rank `target` (tag 0).
// Returns false, sending nothing, while an earlier non-blocking send of this
// object has not completed; otherwise posts the Isend, stores its request in
// this->request and returns true.
bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
	//an earlier non-blocking send that is still in flight blocks this one
	const bool earlierSendPending = (this->request != MPI::REQUEST_NULL) && !this->request.Test();
	if (earlierSendPending) {
		return false;
	}

	//keep a copy in the object: this method returns before the send is finished
	this->buffer = bodies;
	this->setupDatatype(this->buffer);

	const int bodyCount = this->buffer.size();
	this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, bodyCount, this->commBodyType, target, 0);
	return true;
}
// Blocks until a message with bodies from rank `source` is pending (tag 0 on
// MPI::COMM_WORLD) and receives it.
//
// source: rank to receive from.
// bodies: output; previous contents are discarded and the vector is sized to
//         the number of bodies reported for the pending message.
void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
	bodies.clear();

	//poll until a message from source is pending and reports a non-zero count
	int count = 0;
	do {
		count = this->getReceivedNumberBodies(source);
	} while (count == 0);

	//The vector needs real elements before its first element can be addressed
	//and before the caller can see what was received; previously it stayed
	//empty. A negative count (error indicator) is passed on to Recv untouched.
	if (count > 0) {
		bodies.resize(count);
		this->setupDatatype(bodies);
	}
	MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
}
// Receives bodies from rank `source` (tag 0) only if a message is already
// pending. Returns false, leaving `bodies` untouched, when the probe finds
// no message; otherwise clears `bodies` and issues a Recv.
// NOTE(review): reserve() changes the capacity but not the size of `bodies`,
// so the vector still reports zero elements after the Recv - confirm whether
// resize() was intended.
// NOTE(review): neither a return statement for the success path nor a closing
// brace for this definition is visible in this view.
bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) {
MPI::Status status;
int count;
//if message is not here return
if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
return false;
}
bodies.clear();
//get number of bodies to reserve memory
//(counted against commBodyType as it was left by the previous setupDatatype call)
count = status.Get_count(this->commBodyType);
bodies.reserve(count);
this->setupDatatype(bodies);
MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
// Blocks until a message with tag 0 from any rank is pending on
// MPI::COMM_WORLD and receives it.
//
// source: output; set to the rank the pending message came from.
// bodies: output; previous contents are discarded and the vector is sized to
//         the number of bodies reported for the pending message.
void MpiBodyComm::recvBlockingAnySource(int& source, vector<Body>& bodies) {
	MPI::Status status;
	bodies.clear();

	//poll until a message from any rank is pending
	while (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) {
	}

	//the probe status tells how many bodies are pending and who sent them;
	//the count is measured against commBodyType as the last setup left it
	const int count = status.Get_count(this->commBodyType);
	source = status.Get_source();

	//reserve() would leave the size at zero, so the caller would never see the
	//received bodies; resize() creates the elements that are received into.
	//A zero or negative count skips the setup, which would index element 0.
	if (count > 0) {
		bodies.resize(count);
		this->setupDatatype(bodies);
	}
	MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
}
// Receives bodies from any rank (tag 0) only if a message is already pending.
// Returns false, leaving `source` and `bodies` untouched, when the probe finds
// no message; otherwise clears `bodies`, writes the sender's rank to `source`
// and issues a Recv from that rank.
// NOTE(review): reserve() changes the capacity but not the size of `bodies`,
// so the vector still reports zero elements after the Recv - confirm whether
// resize() was intended.
// NOTE(review): neither a return statement for the success path nor a closing
// brace for this definition is visible in this view.
bool MpiBodyComm::recvUnblockingAnySource(int& source, vector<Body>& bodies) {
MPI::Status status;
int count;
int actualSource;
//nothing pending from any rank: report that no receive happened
if (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) {
return false;
}
bodies.clear();
//get number of bodies to reserve memory and determine source
count = status.Get_count(this->commBodyType);
actualSource = status.Get_source();
bodies.reserve(count);
//set source
source = actualSource;
this->setupDatatype(bodies);
MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
// Tells whether a new send may be started on this object: true when no request
// is stored, otherwise the result of testing the stored request.
bool MpiBodyComm::sendReady() {
	//no request stored, so nothing can still be in flight
	if (this->request == MPI::REQUEST_NULL) {
		return true;
	}
	//otherwise ask MPI whether the last non-blocking send has completed
	return this->request.Test();
}
// Tells whether a message with tag 0 is pending on MPI::COMM_WORLD from
// `source`, which may be a specific rank or MPI::ANY_SOURCE. Nothing is received.
bool MpiBodyComm::recvReady(int source) {
	const bool messagePending = MPI::COMM_WORLD.Iprobe(source, 0);
	return messagePending;
}