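// NOTE: "Newer" / "Older" appear to be column labels left over from a revision-comparison view; they are not source code.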
#include "MpiBodyComm.hpp"
namespace nbody {
//Constructor: stores the pointer to the MPI datatype that other methods
//dereference when transferring Body elements.
//NOTE(review): no closing brace of this constructor is visible before the next
//definition starts, and the request wait below reads like the body of a
//destructor - confirm against the complete file.
MpiBodyComm::MpiBodyComm(MPI::Datatype* bodyDatatype) {
this->datatype = bodyDatatype;
//wait for termination of possibly open request
this->request.Wait();
//Blocking send of a list of bodies to rank `target` (message tag 0).
//The bodies are transmitted as bodies.size() contiguous elements of the
//datatype whose pointer was stored by the constructor.
//@param target rank of the receiving process in MPI::COMM_WORLD
//@param bodies bodies to transmit; an empty vector results in a zero-length message
void MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
    //an empty vector has no element 0 whose address could be taken
    const void* payload = bodies.empty() ? NULL : &bodies[0];
    //MPI counts are int; make the narrowing from size_t explicit
    const int count = static_cast<int>(bodies.size());

    MPI::COMM_WORLD.Send(payload, count, *(this->datatype), target, 0);
}
//Blocking receive of bodies from `source` (message tag 0) into `bodies`.
//NOTE(review): this definition has no visible closing brace, declares
//`MPI::Status status` twice and contains two separate probe/receive sequences
//(one using *(this->datatype), one using a locally built struct type). It looks
//like two revisions of the same function interleaved - confirm against the
//complete file before relying on anything below.
void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
MPI::Status status;
int currentSource;
int currentSize;
bodies.clear();
//busy-wait until a message with tag 0 from `source` can be probed
while (!MPI::COMM_WORLD.Iprobe(source, 0, status));
currentSource = status.Get_source();
currentSize = status.Get_count(*(this->datatype));
//reserve() only changes capacity; bodies.size() is still 0 afterwards
bodies.reserve(currentSize);
MPI::COMM_WORLD.Recv(&bodies[0], currentSize, *(this->datatype), currentSource, 0);
MPI::Status status;
Body templateBody;
MPI::Datatype btype = MPI::DATATYPE_NULL;
MPI::Aint displ[3];
//temporary struct type built on a local Body, used only to ask for the element count
displ[0] = MPI::Get_address(templateBody.position);
displ[1] = MPI::Get_address(templateBody.velocity);
displ[2] = MPI::Get_address(&(templateBody.mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
btype.Commit();
while (!MPI::COMM_WORLD.Iprobe(source, 0, status));
cout << "BRG " << status.Get_count(btype) << endl;
bodies.reserve(status.Get_count(btype));
btype.Free();
//rebuild the type on the absolute addresses of bodies[0] so MPI::BOTTOM can be the buffer
displ[0] = MPI::Get_address(bodies[0].position);
displ[1] = MPI::Get_address(bodies[0].velocity);
displ[2] = MPI::Get_address(&(bodies[0].mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
btype.Commit();
MPI::COMM_WORLD.Recv(MPI::BOTTOM, status.Get_count(btype), btype, currentSource, 0);
//Non-blocking send of `bodies` to rank `target` (message tag 0).
//Returns false without sending when the previous request of this object has
//not completed yet; otherwise copies the bodies into this->buffer, builds
//this->sendBodyType from absolute addresses, starts an Isend and returns true.
//NOTE(review): the displacements are taken from the by-value parameter `bodies`,
//not from this->buffer whose size is passed to Isend - confirm which storage
//the send is meant to read.
//NOTE(review): bodies[0] is accessed without an emptiness check.
//NOTE(review): no closing brace of this definition is visible before the next
//one starts; a second definition with the same signature appears later in the file.
bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
MPI::Aint displ[3];
if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
//unblocking send still not finished
return false;
}
//release the handle of the previous request if one is still held
if (this->request != MPI::REQUEST_NULL) {
this->request.Free();
}
this->buffer.clear();
this->buffer = bodies;
//drop the struct type of the previous send before creating a new one
if (this->sendBodyType != MPI::DATATYPE_NULL) {
this->sendBodyType.Free();
}
//absolute addresses of the three members of the first body
displ[0] = MPI::Get_address(bodies[0].position);
displ[1] = MPI::Get_address(bodies[0].velocity);
displ[2] = MPI::Get_address(&(bodies[0].mass));
this->sendBodyType = this->sendBodyType.Create_struct(3, this->blocklengths, displ, this->datatypes);
this->sendBodyType.Commit();
//MPI::BOTTOM as buffer: the displacements inside the type are absolute addresses
this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, this->buffer.size(), this->sendBodyType, target, 0);
return true;
//Rebuilds this->commBodyType as a 3-block struct type whose displacements are
//the absolute addresses of position, velocity and mass of bodies[0]; callers in
//this file use the resulting type with MPI::BOTTOM as the buffer argument.
//NOTE(review): `bodies` is passed by value, so the addresses stored here belong
//to this function's own copy of the vector - confirm they are still valid at
//the point where the type is used for communication.
//NOTE(review): bodies[0] is accessed without an emptiness check.
//NOTE(review): no closing brace of this definition is visible before the next one starts.
void MpiBodyComm::setupDatatype(vector<Body> bodies) {
//re-setup data type for current input data's memory locations
if (this->commBodyType != MPI::DATATYPE_NULL) {
this->commBodyType.Free();
}
this->displacements[0] = MPI::Get_address(bodies[0].position);
this->displacements[1] = MPI::Get_address(bodies[0].velocity);
this->displacements[2] = MPI::Get_address(&(bodies[0].mass));
this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
this->commBodyType.Commit();
int MpiBodyComm::getReceivedNumberBodies(int source) {
MPI::Datatype btype = MPI::DATATYPE_NULL;
MPI::Aint displ[3];
Body templateBody;
MPI::Status status;
if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
return 0;
}
displ[0] = MPI::Get_address(templateBody.position);
displ[1] = MPI::Get_address(templateBody.velocity);
displ[2] = MPI::Get_address(&(templateBody.mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
btype.Commit();
int result = status.Get_count(btype);
btype.Free();
return result;
}
//Blocking send of `bodies` to rank `target` (message tag 0).
//The struct type used for the transfer carries absolute addresses, so it is
//built here directly on this function's `bodies`, which stays alive until the
//blocking Send has returned. (setupDatatype() takes the vector by value and
//would record addresses inside a copy that is gone before Send runs.)
//Assumes the extent of the struct type equals sizeof(Body) so that consecutive
//elements are addressed correctly - TODO confirm against the Body layout.
//@param target rank of the receiving process in MPI::COMM_WORLD
//@param bodies bodies to transmit; an empty vector is sent as a zero-length message
//@return false if a previous non-blocking send of this object is still in
//        flight (nothing is sent), true once the message has been handed to MPI
bool MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
    if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
        //unblocking send still not finished
        return false;
    }
    if (bodies.empty()) {
        //there is no element 0 to take addresses from; a zero-count message needs no body type
        MPI::COMM_WORLD.Send(MPI::BOTTOM, 0, MPI::BYTE, target, 0);
        return true;
    }
    //re-setup data type for the memory locations of the data actually sent
    if (this->commBodyType != MPI::DATATYPE_NULL) {
        this->commBodyType.Free();
    }
    this->displacements[0] = MPI::Get_address(bodies[0].position);
    this->displacements[1] = MPI::Get_address(bodies[0].velocity);
    this->displacements[2] = MPI::Get_address(&(bodies[0].mass));
    this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
    this->commBodyType.Commit();
    //MPI::BOTTOM as buffer: the displacements inside the type are absolute addresses
    MPI::COMM_WORLD.Send(MPI::BOTTOM, static_cast<int>(bodies.size()), this->commBodyType, target, 0);
    return true;
}
//Non-blocking send of `bodies` to rank `target` (message tag 0).
//The bodies are copied into this->buffer because the call returns before the
//transfer has finished. The struct type carries absolute addresses, so it is
//built here directly on this->buffer. (setupDatatype() takes the vector by
//value and would record addresses inside a temporary copy.)
//Assumes the extent of the struct type equals sizeof(Body) so that consecutive
//elements are addressed correctly - TODO confirm against the Body layout.
//@param target rank of the receiving process in MPI::COMM_WORLD
//@param bodies bodies to transmit; an empty vector is sent as a zero-length message
//@return false if the previous non-blocking send of this object is still in
//        flight (nothing is sent), true once the new send has been started
bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
    if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
        //unblocking send still not finished
        return false;
    }
    //need buffering; method will return before send is finished
    this->buffer = bodies;
    if (this->buffer.empty()) {
        //there is no element 0 to take addresses from; a zero-count message needs no body type
        this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, 0, MPI::BYTE, target, 0);
        return true;
    }
    //re-setup data type for the memory locations of the buffered copy
    if (this->commBodyType != MPI::DATATYPE_NULL) {
        this->commBodyType.Free();
    }
    this->displacements[0] = MPI::Get_address(this->buffer[0].position);
    this->displacements[1] = MPI::Get_address(this->buffer[0].velocity);
    this->displacements[2] = MPI::Get_address(&(this->buffer[0].mass));
    this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
    this->commBodyType.Commit();
    //MPI::BOTTOM as buffer: the displacements inside the type are absolute addresses
    this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, static_cast<int>(this->buffer.size()), this->commBodyType, target, 0);
    return true;
}
//Blocking receive of bodies from rank `source` (message tag 0).
//Spins until getReceivedNumberBodies() reports a non-zero element count, sizes
//`bodies` to that count and receives straight into its elements.
//The struct type carries absolute addresses, so it is built here directly on
//the caller's vector after it has been resized. (setupDatatype() takes the
//vector by value and would record addresses inside a temporary copy.)
//Assumes the extent of the struct type equals sizeof(Body) so that consecutive
//elements are addressed correctly - TODO confirm against the Body layout.
//@param source rank to receive from on MPI::COMM_WORLD
//@param bodies output; previous content is discarded and replaced by the received bodies
void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
    int count;

    bodies.clear();
    //wait for incoming message to determine number of bodies received
    while ((count = this->getReceivedNumberBodies(source)) == 0);
    if (count > 0) {
        //the receive writes into existing elements, so they must exist first
        bodies.resize(count);
        //re-setup data type for the memory locations of the caller's vector
        if (this->commBodyType != MPI::DATATYPE_NULL) {
            this->commBodyType.Free();
        }
        this->displacements[0] = MPI::Get_address(bodies[0].position);
        this->displacements[1] = MPI::Get_address(bodies[0].velocity);
        this->displacements[2] = MPI::Get_address(&(bodies[0].mass));
        this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
        this->commBodyType.Commit();
    }
    //MPI::BOTTOM as buffer: the displacements inside the type are absolute addresses
    MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
}
//Non-blocking receive of bodies from rank `source` (message tag 0).
//Returns false immediately when no matching message can be probed; otherwise
//clears `bodies` and receives the message through this->commBodyType with
//MPI::BOTTOM as buffer.
//NOTE(review): neither a `return` for the success path nor the closing brace
//of this definition is visible before the next one starts.
//NOTE(review): the element count is read through this->commBodyType before
//setupDatatype() is called here - confirm the type is always valid at that point.
bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) {
MPI::Status status;
int count;
//if message is not here return
if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
return false;
}
bodies.clear();
//get number of bodies to reserve memory
count = status.Get_count(this->commBodyType);
//reserve() only changes capacity; bodies.size() is still 0 afterwards
bodies.reserve(count);
//setupDatatype() receives `bodies` by value (see its signature in this file)
this->setupDatatype(bodies);
MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
//Blocking receive of bodies from whichever rank has a message pending (tag 0).
//Spins on a probe for MPI::ANY_SOURCE, reports the sender through `source`,
//sizes `bodies` to the number of bodies in the message and receives straight
//into its elements.
//The element count is obtained through getReceivedNumberBodies(), as in
//recvBlocking(), so it does not depend on this->commBodyType having been set
//up by an earlier transfer. The struct type carries absolute addresses, so it
//is built here directly on the caller's vector after it has been resized.
//(setupDatatype() takes the vector by value and would record addresses inside
//a temporary copy.)
//Assumes the extent of the struct type equals sizeof(Body) so that consecutive
//elements are addressed correctly - TODO confirm against the Body layout.
//@param source output; set to the rank the message was received from
//@param bodies output; previous content is discarded and replaced by the received bodies
void MpiBodyComm::recvBlockingAnySource(int& source, vector<Body>& bodies) {
    MPI::Status status;
    int count;

    bodies.clear();
    //wait for incoming message to determine its sender
    while (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status));
    //set source
    source = status.Get_source();
    //get number of bodies in the pending message of that sender
    count = this->getReceivedNumberBodies(source);
    if (count <= 0) {
        //no bodies to store: take the message off the queue without a body type
        MPI::COMM_WORLD.Recv(MPI::BOTTOM, 0, MPI::BYTE, source, 0);
        return;
    }
    //the receive writes into existing elements, so they must exist first
    bodies.resize(count);
    //re-setup data type for the memory locations of the caller's vector
    if (this->commBodyType != MPI::DATATYPE_NULL) {
        this->commBodyType.Free();
    }
    this->displacements[0] = MPI::Get_address(bodies[0].position);
    this->displacements[1] = MPI::Get_address(bodies[0].velocity);
    this->displacements[2] = MPI::Get_address(&(bodies[0].mass));
    this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
    this->commBodyType.Commit();
    //MPI::BOTTOM as buffer: the displacements inside the type are absolute addresses
    MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
}
//Non-blocking receive of bodies from whichever rank has a message pending (tag 0).
//Returns false immediately when no message can be probed; otherwise clears
//`bodies`, writes the sender's rank to `source` and receives the message
//through this->commBodyType with MPI::BOTTOM as buffer.
//NOTE(review): neither a `return` for the success path nor the closing brace
//of this definition is visible before the next one starts.
//NOTE(review): the element count is read through this->commBodyType before
//setupDatatype() is called here - confirm the type is always valid at that point.
bool MpiBodyComm::recvUnblockingAnySource(int& source, vector<Body>& bodies) {
MPI::Status status;
int count;
int actualSource;
//if no message from any rank is pending, report that nothing was received
if (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) {
return false;
}
bodies.clear();
//get number of bodies to reserve memory and determine source
count = status.Get_count(this->commBodyType);
actualSource = status.Get_source();
//reserve() only changes capacity; bodies.size() is still 0 afterwards
bodies.reserve(count);
//set source
source = actualSource;
//setupDatatype() receives `bodies` by value (see its signature in this file)
this->setupDatatype(bodies);
MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
//Tells whether this object may start another non-blocking send.
//@return true when no request is outstanding or the outstanding one tests as
//        completed, false otherwise
bool MpiBodyComm::sendReady() {
    //no request handle held: nothing can be in flight
    if (this->request == MPI::REQUEST_NULL) {
        return true;
    }
    //otherwise ask MPI whether the last non-blocking send is out of the door
    return this->request.Test();
}
//Tells whether a message with tag 0 is waiting to be received.
//@param source rank to check, or MPI::ANY_SOURCE to accept any sender
//@return result of a non-blocking probe on MPI::COMM_WORLD
bool MpiBodyComm::recvReady(int source) {
    const bool messagePending = MPI::COMM_WORLD.Iprobe(source, 0);
    return messagePending;
}
//Experimental send/receive round trip between ranks 0 and 1.
//NOTE(review): the visible lines do not form a complete definition: the braces
//opened by the function header and by `if (myRank == 0)` are never closed, `vb`
//is used before its declaration, and `b`, `bodySize`, `displ`, `btype` and
//`templateBody` are used without any visible declaration. This looks like
//several revisions interleaved - confirm against the complete file.
void MpiBodyComm::testing(vector<Body>& data, int myRank) {
for (int i = 0; i < 4; i++) {
vb.push_back(b[i]);
}
if (myRank == 0) {
//MPI::COMM_WORLD.Send(b, 4, *this->datatype, 1, 0);
MPI::COMM_WORLD.Send(&data[0], data.size(), *this->datatype, 1, 0);
//MPI::COMM_WORLD.Recv(b, 4, *this->datatype, 0, 0);
MPI::Status status;
vector<Body> vb;
//blocking probe for a tag-0 message from rank 0, then size vb to the reported count
MPI::COMM_WORLD.Probe(0, 0, status);
vb.resize(status.Get_count(*this->datatype));
MPI::COMM_WORLD.Recv(&vb[0], status.Get_count(*this->datatype), *this->datatype, 0, 0);
for (int i = 0; i < vb.size(); i++) {
vb[i].print();
}
MPI::COMM_WORLD.Bcast(&bodySize, 1, MPI::INT, 0);
if (myRank != 0) {
data.clear();
//reserve() only changes capacity; data.size() is still 0 afterwards
data.reserve(bodySize);
}
//struct type on the absolute addresses of data[0], sent with MPI::BOTTOM as buffer
displ[0] = MPI::Get_address(data[0].position);
displ[1] = MPI::Get_address(data[0].velocity);
displ[2] = MPI::Get_address(&(data[0].mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
btype.Commit();
//MPI::COMM_WORLD.Send(MPI::BOTTOM, bodySize, btype, 1, 0);
MPI::COMM_WORLD.Send(MPI::BOTTOM, data.size(), btype, 1, 0);
btype.Free();
//struct type on a template body, used to ask the probe status for the element count
displ[0] = MPI::Get_address(templateBody.position);
displ[1] = MPI::Get_address(templateBody.velocity);
displ[2] = MPI::Get_address(&(templateBody.mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
//NOTE(review): no Commit() call is visible for this type or for the one created before the Recv below
while (!MPI::COMM_WORLD.Iprobe(0, 0, status));
cout << "RECVED " << status.Get_count(btype) << endl;
data.clear();
data.reserve(status.Get_count(btype));
btype.Free();
displ[0] = MPI::Get_address(data[0].position);
displ[1] = MPI::Get_address(data[0].velocity);
displ[2] = MPI::Get_address(&(data[0].mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
MPI::COMM_WORLD.Recv(MPI::BOTTOM, status.Get_count(btype), btype, 0, 0);
btype.Free();
//MPI::COMM_WORLD.Recv(MPI::BOTTOM, bodySize, btype, 0, 0);
//cout << "RECVED " << bodySize << " " << status.Get_count(btype) << endl;