#include <iostream> #include "MpiBodyComm.hpp" namespace nbody { using namespace std; MpiBodyComm::MpiBodyComm(MPI::Datatype* bodyDatatype) { this->datatype = bodyDatatype; this->request = MPI::REQUEST_NULL; } MpiBodyComm::~MpiBodyComm() { if (this->request != MPI::REQUEST_NULL) { //wait for termination of possibly open request this->request.Wait(); this->request.Free(); } } void MpiBodyComm::sendBlocking(int target, vector<Body> bodies) { MPI::COMM_WORLD.Send(&bodies[0], bodies.size(), *(this->datatype), target, 0); } void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) { MPI::Status status; int currentSource; int currentSize; bodies.clear(); while (!MPI::COMM_WORLD.Iprobe(source, 0, status)); currentSource = status.Get_source(); currentSize = status.Get_count(*(this->datatype)); bodies.reserve(currentSize); MPI::COMM_WORLD.Recv(&bodies[0], currentSize, *(this->datatype), currentSource, 0); /* MPI::Status status; Body templateBody; MPI::Datatype btype = MPI::DATATYPE_NULL; MPI::Aint displ[3]; int currentSource; displ[0] = MPI::Get_address(templateBody.position); displ[1] = MPI::Get_address(templateBody.velocity); displ[2] = MPI::Get_address(&(templateBody.mass)); btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes); btype.Commit(); while (!MPI::COMM_WORLD.Iprobe(source, 0, status)); currentSource = status.Get_source(); bodies.clear(); cout << "BRG " << status.Get_count(btype) << endl; bodies.reserve(status.Get_count(btype)); btype.Free(); displ[0] = MPI::Get_address(bodies[0].position); displ[1] = MPI::Get_address(bodies[0].velocity); displ[2] = MPI::Get_address(&(bodies[0].mass)); btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes); btype.Commit(); MPI::COMM_WORLD.Recv(MPI::BOTTOM, status.Get_count(btype), btype, currentSource, 0); btype.Free(); */ } /* bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) { MPI::Aint displ[3]; if (this->request != MPI::REQUEST_NULL && !this->request.Test()) { //unblocking send still not finished return false; } if (this->request != MPI::REQUEST_NULL) { this->request.Free(); } this->buffer.clear(); this->buffer = bodies; if (this->sendBodyType != MPI::DATATYPE_NULL) { this->sendBodyType.Free(); } displ[0] = MPI::Get_address(bodies[0].position); displ[1] = MPI::Get_address(bodies[0].velocity); displ[2] = MPI::Get_address(&(bodies[0].mass)); this->sendBodyType = this->sendBodyType.Create_struct(3, this->blocklengths, displ, this->datatypes); this->sendBodyType.Commit(); this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, this->buffer.size(), this->sendBodyType, target, 0); return true; } */ /* void MpiBodyComm::setupDatatype(vector<Body> bodies) { //re-setup data type for current input data's memory locations if (this->commBodyType != MPI::DATATYPE_NULL) { this->commBodyType.Free(); } this->displacements[0] = MPI::Get_address(bodies[0].position); this->displacements[1] = MPI::Get_address(bodies[0].velocity); this->displacements[2] = MPI::Get_address(&(bodies[0].mass)); this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes); this->commBodyType.Commit(); } int MpiBodyComm::getReceivedNumberBodies(int source) { MPI::Datatype btype = MPI::DATATYPE_NULL; MPI::Aint displ[3]; Body templateBody; MPI::Status status; if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) { return 0; } displ[0] = MPI::Get_address(templateBody.position); displ[1] = MPI::Get_address(templateBody.velocity); displ[2] = MPI::Get_address(&(templateBody.mass)); btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes); btype.Commit(); int result = status.Get_count(btype); btype.Free(); return result; } bool MpiBodyComm::sendBlocking(int target, vector<Body> bodies) { if (this->request != MPI::REQUEST_NULL && !this->request.Test()) { //unblocking send still not finished return false; } this->setupDatatype(bodies); MPI::COMM_WORLD.Send(MPI::BOTTOM, bodies.size(), this->commBodyType, target, 0); return true; } bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) { if (this->request != MPI::REQUEST_NULL && !this->request.Test()) { //unblocking send still not finished return false; } //need buffering; method will return before send is finished this->buffer = bodies; this->setupDatatype(this->buffer); this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, this->buffer.size(), this->commBodyType, target, 0); return true; } void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) { MPI::Status status; int count; bodies.clear(); while ((count = this->getReceivedNumberBodies(source)) == 0); //wait for incoming message to determine number of bodies received //while (!MPI::COMM_WORLD.Iprobe(source, 0, status)); //get number of bodies to reserve memory //count = status.Get_count(this->commBodyType); cout << "RECVED " << count << endl; bodies.reserve(count); this->setupDatatype(bodies); MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0); } bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) { MPI::Status status; int count; //if message is not here return if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) { return false; } bodies.clear(); //get number of bodies to reserve memory count = status.Get_count(this->commBodyType); bodies.reserve(count); this->setupDatatype(bodies); MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0); return true; } void MpiBodyComm::recvBlockingAnySource(int& source, vector<Body>& bodies) { MPI::Status status; int count; int actualSource; bodies.clear(); //wait for incoming message to determine number of bodies received while (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)); //get number of bodies to reserve memory and determine source count = status.Get_count(this->commBodyType); actualSource = status.Get_source(); bodies.reserve(count); //set source source = actualSource; this->setupDatatype(bodies); MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0); } bool MpiBodyComm::recvUnblockingAnySource(int& source, vector<Body>& bodies) { MPI::Status status; int count; int actualSource; //if message is not here return if (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) { return false; } bodies.clear(); //get number of bodies to reserve memory and determine source count = status.Get_count(this->commBodyType); actualSource = status.Get_source(); bodies.reserve(count); //set source source = actualSource; this->setupDatatype(bodies); MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0); return true; } bool MpiBodyComm::sendReady() { //check if the last non-blocking send on this object is out of the door return this->request == MPI::REQUEST_NULL || this->request.Test(); } bool MpiBodyComm::recvReady(int source) { //check if there has been data received from a specific source or MPI::ANY_SOURCE return MPI::COMM_WORLD.Iprobe(source, 0); } */ void MpiBodyComm::testing(vector<Body>& data, int myRank) { Body b[4]; vector<Body> vb; for (int i = 0; i < 4; i++) { vb.push_back(b[i]); } if (myRank == 0) { //MPI::COMM_WORLD.Send(b, 4, *this->datatype, 1, 0); MPI::COMM_WORLD.Send(&vb[0], 4, *this->datatype, 1, 0); } else if (myRank == 1) { MPI::COMM_WORLD.Recv(b, 4, *this->datatype, 0, 0); } //MPI::Datatype btype = MPI::DATATYPE_NULL; //int bodySize = data.size(); /* MPI::COMM_WORLD.Bcast(&bodySize, 1, MPI::INT, 0); if (myRank != 0) { data.clear(); data.reserve(bodySize); } */ //MPI::COMM_WORLD.Bcast(MPI::BOTTOM, bodySize, btype, 0); /* MPI::Aint displ[3]; if (myRank == 0) { displ[0] = MPI::Get_address(data[0].position); displ[1] = MPI::Get_address(data[0].velocity); displ[2] = MPI::Get_address(&(data[0].mass)); btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes); btype.Commit(); //MPI::COMM_WORLD.Send(MPI::BOTTOM, bodySize, btype, 1, 0); MPI::COMM_WORLD.Send(MPI::BOTTOM, data.size(), btype, 1, 0); btype.Free(); } else if (myRank == 1) { MPI::Status status; Body templateBody; displ[0] = MPI::Get_address(templateBody.position); displ[1] = MPI::Get_address(templateBody.velocity); displ[2] = MPI::Get_address(&(templateBody.mass)); btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes); btype.Commit(); while (!MPI::COMM_WORLD.Iprobe(0, 0, status)); cout << "RECVED " << status.Get_count(btype) << endl; data.clear(); data.reserve(status.Get_count(btype)); btype.Free(); displ[0] = MPI::Get_address(data[0].position); displ[1] = MPI::Get_address(data[0].velocity); displ[2] = MPI::Get_address(&(data[0].mass)); btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes); btype.Commit(); MPI::COMM_WORLD.Recv(MPI::BOTTOM, status.Get_count(btype), btype, 0, 0); btype.Free(); //MPI::COMM_WORLD.Recv(MPI::BOTTOM, bodySize, btype, 0, 0); //cout << "RECVED " << bodySize << " " << status.Get_count(btype) << endl; } */ } }