Newer
Older
#include "MpiBodyComm.hpp"
namespace nbody {
this->datatypes[0] = MPI::DOUBLE;
this->datatypes[1] = MPI::DOUBLE;
this->datatypes[2] = MPI::DOUBLE;
this->blocklengths[0] = 3;
this->blocklengths[1] = 3;
this->blocklengths[2] = 1;
this->sendBodyType = MPI::DATATYPE_NULL;
this->recvBodyType = MPI::DATATYPE_NULL;
//wait for termination of possibly open request
this->request.Wait();
if (this->sendBodyType != MPI::DATATYPE_NULL) {
this->sendBodyType.Free();
}
if (this->recvBodyType != MPI::DATATYPE_NULL) {
this->recvBodyType.Free();
void MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
MPI::Datatype btype = MPI::DATATYPE_NULL;
MPI::Aint displ[3];
displ[0] = MPI::Get_address(bodies[0].position);
displ[1] = MPI::Get_address(bodies[0].velocity);
displ[2] = MPI::Get_address(&(bodies[0].mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
btype.Commit();
MPI::COMM_WORLD.Send(MPI::BOTTOM, bodies.size(), btype, target, 0);
btype.Free();
}
void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
MPI::Status status;
Body templateBody;
MPI::Datatype btype = MPI::DATATYPE_NULL;
MPI::Aint displ[3];
displ[0] = MPI::Get_address(templateBody.position);
displ[1] = MPI::Get_address(templateBody.velocity);
displ[2] = MPI::Get_address(&(templateBody.mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
btype.Commit();
while (!MPI::COMM_WORLD.Iprobe(source, 0, status));
cout << "BRG " << status.Get_count(btype) << endl;
bodies.reserve(status.Get_count(btype));
btype.Free();
displ[0] = MPI::Get_address(bodies[0].position);
displ[1] = MPI::Get_address(bodies[0].velocity);
displ[2] = MPI::Get_address(&(bodies[0].mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
btype.Commit();
MPI::COMM_WORLD.Recv(MPI::BOTTOM, status.Get_count(btype), btype, currentSource, 0);
bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
MPI::Aint displ[3];
if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
//unblocking send still not finished
return false;
}
if (this->request != MPI::REQUEST_NULL) {
this->request.Free();
}
this->buffer.clear();
this->buffer = bodies;
if (this->sendBodyType != MPI::DATATYPE_NULL) {
this->sendBodyType.Free();
}
displ[0] = MPI::Get_address(bodies[0].position);
displ[1] = MPI::Get_address(bodies[0].velocity);
displ[2] = MPI::Get_address(&(bodies[0].mass));
this->sendBodyType = this->sendBodyType.Create_struct(3, this->blocklengths, displ, this->datatypes);
this->sendBodyType.Commit();
this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, this->buffer.size(), this->sendBodyType, target, 0);
return true;
void MpiBodyComm::setupDatatype(vector<Body> bodies) {
//re-setup data type for current input data's memory locations
if (this->commBodyType != MPI::DATATYPE_NULL) {
this->commBodyType.Free();
}
this->displacements[0] = MPI::Get_address(bodies[0].position);
this->displacements[1] = MPI::Get_address(bodies[0].velocity);
this->displacements[2] = MPI::Get_address(&(bodies[0].mass));
this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
this->commBodyType.Commit();
int MpiBodyComm::getReceivedNumberBodies(int source) {
MPI::Datatype btype = MPI::DATATYPE_NULL;
MPI::Aint displ[3];
Body templateBody;
MPI::Status status;
if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
return 0;
}
displ[0] = MPI::Get_address(templateBody.position);
displ[1] = MPI::Get_address(templateBody.velocity);
displ[2] = MPI::Get_address(&(templateBody.mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
btype.Commit();
int result = status.Get_count(btype);
btype.Free();
return result;
}
bool MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
//unblocking send still not finished
return false;
}
this->setupDatatype(bodies);
MPI::COMM_WORLD.Send(MPI::BOTTOM, bodies.size(), this->commBodyType, target, 0);
return true;
}
bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
//unblocking send still not finished
return false;
}
//need buffering; method will return before send is finished
this->buffer = bodies;
this->setupDatatype(this->buffer);
this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, this->buffer.size(), this->commBodyType, target, 0);
return true;
}
void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
MPI::Status status;
int count;
bodies.clear();
while ((count = this->getReceivedNumberBodies(source)) == 0);
//wait for incoming message to determine number of bodies received
//while (!MPI::COMM_WORLD.Iprobe(source, 0, status));
//count = status.Get_count(this->commBodyType);
this->setupDatatype(bodies);
MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
}
bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) {
MPI::Status status;
int count;
//if message is not here return
if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
return false;
}
bodies.clear();
//get number of bodies to reserve memory
count = status.Get_count(this->commBodyType);
bodies.reserve(count);
this->setupDatatype(bodies);
MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
void MpiBodyComm::recvBlockingAnySource(int& source, vector<Body>& bodies) {
MPI::Status status;
int count;
int actualSource;
bodies.clear();
//wait for incoming message to determine number of bodies received
while (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status));
//get number of bodies to reserve memory and determine source
count = status.Get_count(this->commBodyType);
actualSource = status.Get_source();
bodies.reserve(count);
//set source
source = actualSource;
this->setupDatatype(bodies);
MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
}
bool MpiBodyComm::recvUnblockingAnySource(int& source, vector<Body>& bodies) {
MPI::Status status;
int count;
int actualSource;
if (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) {
return false;
}
bodies.clear();
//get number of bodies to reserve memory and determine source
count = status.Get_count(this->commBodyType);
actualSource = status.Get_source();
bodies.reserve(count);
//set source
source = actualSource;
this->setupDatatype(bodies);
MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
bool MpiBodyComm::sendReady() {
//check if the last non-blocking send on this object is out of the door
return this->request == MPI::REQUEST_NULL || this->request.Test();
}
bool MpiBodyComm::recvReady(int source) {
//check if there has been data received from a specific source or MPI::ANY_SOURCE
return MPI::COMM_WORLD.Iprobe(source, 0);
}
void MpiBodyComm::testing(vector<Body>& data, int myRank) {
MPI::Datatype btype = MPI::DATATYPE_NULL;
MPI::COMM_WORLD.Bcast(&bodySize, 1, MPI::INT, 0);
if (myRank != 0) {
data.clear();
data.reserve(bodySize);
}
displ[0] = MPI::Get_address(data[0].position);
displ[1] = MPI::Get_address(data[0].velocity);
displ[2] = MPI::Get_address(&(data[0].mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
btype.Commit();
//MPI::COMM_WORLD.Send(MPI::BOTTOM, bodySize, btype, 1, 0);
MPI::COMM_WORLD.Send(MPI::BOTTOM, data.size(), btype, 1, 0);
btype.Free();
displ[0] = MPI::Get_address(templateBody.position);
displ[1] = MPI::Get_address(templateBody.velocity);
displ[2] = MPI::Get_address(&(templateBody.mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
while (!MPI::COMM_WORLD.Iprobe(0, 0, status));
cout << "RECVED " << status.Get_count(btype) << endl;
data.clear();
data.reserve(status.Get_count(btype));
btype.Free();
displ[0] = MPI::Get_address(data[0].position);
displ[1] = MPI::Get_address(data[0].velocity);
displ[2] = MPI::Get_address(&(data[0].mass));
btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
MPI::COMM_WORLD.Recv(MPI::BOTTOM, status.Get_count(btype), btype, 0, 0);
btype.Free();
//MPI::COMM_WORLD.Recv(MPI::BOTTOM, bodySize, btype, 0, 0);
//cout << "RECVED " << bodySize << " " << status.Get_count(btype) << endl;