#include <iostream> #include "MpiBodyComm.hpp" namespace nbody { using namespace std; MpiBodyComm::MpiBodyComm() { //setup initial data type and initialize request for tracking status of unblocking send this->datatypes[0] = MPI::DOUBLE; this->datatypes[1] = MPI::DOUBLE; this->datatypes[2] = MPI::DOUBLE; this->blocklengths[0] = 3; this->blocklengths[1] = 3; this->blocklengths[2] = 1; this->request = MPI::REQUEST_NULL; this->commBodyType = MPI::DATATYPE_NULL; } MpiBodyComm::~MpiBodyComm() { if (this->request != MPI::REQUEST_NULL) { //wait for termination of possibly open request this->request.Wait(); this->request.Free(); } if (this->commBodyType != MPI::DATATYPE_NULL) { this->commBodyType.Free(); } } void MpiBodyComm::setupDatatype(vector<Body> bodies) { //re-setup data type for current input data's memory locations if (this->commBodyType != MPI::DATATYPE_NULL) { this->commBodyType.Free(); } this->displacements[0] = MPI::Get_address(bodies[0].position); this->displacements[1] = MPI::Get_address(bodies[0].velocity); this->displacements[2] = MPI::Get_address(&(bodies[0].mass)); this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes); this->commBodyType.Commit(); } bool MpiBodyComm::sendBlocking(int target, vector<Body> bodies) { if (this->request != MPI::REQUEST_NULL && !this->request.Test()) { //unblocking send still not finished return false; } this->setupDatatype(bodies); MPI::COMM_WORLD.Send(MPI::BOTTOM, bodies.size(), this->commBodyType, target, 0); return true; } bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) { if (this->request != MPI::REQUEST_NULL && !this->request.Test()) { //unblocking send still not finished return false; } //need buffering; method will return before send is finished this->buffer = bodies; this->setupDatatype(this->buffer); this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, this->buffer.size(), this->commBodyType, target, 0); return true; } void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) { MPI::Status status; int count; bodies.clear(); //wait for incoming message to determine number of bodies received while (!MPI::COMM_WORLD.Iprobe(source, 0, status)); //get number of bodies to reserve memory count = status.Get_count(this->commBodyType); cout << "RECVED " << count << endl; bodies.reserve(count); this->setupDatatype(bodies); MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0); } bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) { MPI::Status status; int count; //if message is not here return if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) { return false; } bodies.clear(); //get number of bodies to reserve memory count = status.Get_count(this->commBodyType); bodies.reserve(count); this->setupDatatype(bodies); MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0); return true; } void MpiBodyComm::recvBlockingAnySource(int& source, vector<Body>& bodies) { MPI::Status status; int count; int actualSource; bodies.clear(); //wait for incoming message to determine number of bodies received while (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)); //get number of bodies to reserve memory and determine source count = status.Get_count(this->commBodyType); actualSource = status.Get_source(); bodies.reserve(count); //set source source = actualSource; this->setupDatatype(bodies); MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0); } bool MpiBodyComm::recvUnblockingAnySource(int& source, vector<Body>& bodies) { MPI::Status status; int count; int actualSource; //if message is not here return if (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) { return false; } bodies.clear(); //get number of bodies to reserve memory and determine source count = status.Get_count(this->commBodyType); actualSource = status.Get_source(); bodies.reserve(count); //set source source = actualSource; this->setupDatatype(bodies); MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0); return true; } bool MpiBodyComm::sendReady() { //check if the last non-blocking send on this object is out of the door return this->request == MPI::REQUEST_NULL || this->request.Test(); } bool MpiBodyComm::recvReady(int source) { //check if there has been data received from a specific source or MPI::ANY_SOURCE return MPI::COMM_WORLD.Iprobe(source, 0); } void MpiBodyComm::testing(vector<Body>& data, int myRank) { MPI::Datatype btype = MPI::DATATYPE_NULL; int bodySize = data.size(); MPI::COMM_WORLD.Bcast(&bodySize, 1, MPI::INT, 0); if (myRank != 0) { data.clear(); data.reserve(bodySize); } this->displacements[0] = MPI::Get_address(data[0].position); this->displacements[1] = MPI::Get_address(data[0].velocity); this->displacements[2] = MPI::Get_address(&(data[0].mass)); btype = btype.Create_struct(3, this->blocklengths, this->displacements, this->datatypes); btype.Commit(); MPI::COMM_WORLD.Bcast(MPI::BOTTOM, bodySize, btype, 0); btype.Free(); } }