Skip to content
Snippets Groups Projects
MpiBodyComm.cpp 9.27 KiB
Newer Older
#include <iostream>
#include "MpiBodyComm.hpp"

namespace nbody {
	using namespace std;

	// Sets up a communicator object for bodies.
	// bodyDatatype: pointer stored as-is in `datatype`; it is dereferenced later by
	// the send/receive methods that pass `*datatype` to MPI.
	MpiBodyComm::MpiBodyComm(MPI::Datatype* bodyDatatype) {
		//a fresh object has no non-blocking operation in flight
		request = MPI::REQUEST_NULL;
		datatype = bodyDatatype;
	}

	// Completes a possibly still pending non-blocking operation before the object is destroyed.
	MpiBodyComm::~MpiBodyComm() {
		if (this->request != MPI::REQUEST_NULL) {
			//wait for termination of possibly open request
			this->request.Wait();
			//a completed non-persistent request is released by Wait() and the handle is
			//reset to MPI::REQUEST_NULL; freeing a null request is an error, so only
			//free when a handle is still left over
			if (this->request != MPI::REQUEST_NULL) {
				this->request.Free();
			}
		}
	}

	// Sends all elements of `bodies` to rank `target` with tag 0 using a blocking send
	// and the datatype handed to the constructor.
	// target: destination rank in MPI::COMM_WORLD
	// bodies: bodies to transmit; an empty vector results in a zero-length message
	void MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
		//&bodies[0] is only valid when the vector holds at least one element
		const void* payload = MPI::BOTTOM;
		if (!bodies.empty()) {
			payload = &bodies[0];
		}
		//MPI takes the element count as int
		const int count = static_cast<int>(bodies.size());
		MPI::COMM_WORLD.Send(payload, count, *(this->datatype), target, 0);
	}

	// Waits (by polling Iprobe) for a tag-0 message from `source` and receives it into `bodies`.
	// NOTE(review): this span declares `status` and `currentSource` twice and contains two
	// separate probe/receive sequences; it looks like two revisions of the function fused
	// together - confirm which one is intended.
	// NOTE(review): reserve() only changes the capacity, so `&bodies[0]` / `bodies[0]` below
	// index an empty vector and bodies.size() is still 0 after the receive.
	void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
		MPI::Status status;
		int currentSource;
		int currentSize;

		bodies.clear();
		// busy-wait until a matching message is available
		while (!MPI::COMM_WORLD.Iprobe(source, 0, status));
		currentSource = status.Get_source();
		currentSize = status.Get_count(*(this->datatype));
		bodies.reserve(currentSize);
		MPI::COMM_WORLD.Recv(&bodies[0], currentSize, *(this->datatype), currentSource, 0);
		MPI::Status status;
		Body templateBody;
		MPI::Datatype btype = MPI::DATATYPE_NULL;
		MPI::Aint displ[3];
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		int currentSource;

		// struct type built from the absolute addresses of a local body; used below for Get_count
		displ[0] = MPI::Get_address(templateBody.position);
		displ[1] = MPI::Get_address(templateBody.velocity);
		displ[2] = MPI::Get_address(&(templateBody.mass));
		btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
		btype.Commit();
		while (!MPI::COMM_WORLD.Iprobe(source, 0, status));
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		currentSource = status.Get_source();
		bodies.clear();

Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		cout << "BRG " << status.Get_count(btype) << endl;

		bodies.reserve(status.Get_count(btype));
		btype.Free();
		// rebuild the type with the absolute addresses of the destination vector's first element
		displ[0] = MPI::Get_address(bodies[0].position);
		displ[1] = MPI::Get_address(bodies[0].velocity);
		displ[2] = MPI::Get_address(&(bodies[0].mass));
		btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
		btype.Commit();
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		// absolute addresses are encoded in btype, hence MPI::BOTTOM as the buffer
		MPI::COMM_WORLD.Recv(MPI::BOTTOM, status.Get_count(btype), btype, currentSource, 0);
		btype.Free();
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
	}
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
	// Starts a non-blocking send of `bodies` to rank `target` (tag 0).
	// target: destination rank in MPI::COMM_WORLD
	// bodies: bodies to transmit; copied into the member `buffer` before sending
	// returns: false (nothing sent) while the previous non-blocking send is still
	//          in flight, true once the new send has been started
	bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
		MPI::Aint displ[3];

		if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
			//unblocking send still not finished
			return false;
		}
		if (this->request != MPI::REQUEST_NULL) {
			this->request.Free();
		}
		//keep a copy that outlives this call; Isend returns before the data has been read
		this->buffer = bodies;
		if (this->sendBodyType != MPI::DATATYPE_NULL) {
			this->sendBodyType.Free();
		}
		if (this->buffer.empty()) {
			//no element to take addresses from; transmit a zero-length message
			this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, 0, MPI::BYTE, target, 0);
			return true;
		}
		//the absolute addresses must refer to the member buffer: `bodies` is a by-value
		//parameter and is destroyed when this method returns, while the send may still be running
		displ[0] = MPI::Get_address(this->buffer[0].position);
		displ[1] = MPI::Get_address(this->buffer[0].velocity);
		displ[2] = MPI::Get_address(&(this->buffer[0].mass));
		this->sendBodyType = this->sendBodyType.Create_struct(3, this->blocklengths, displ, this->datatypes);
		this->sendBodyType.Commit();
		//addresses are encoded in the datatype, hence MPI::BOTTOM as the buffer
		this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, static_cast<int>(this->buffer.size()), this->sendBodyType, target, 0);
		return true;
	}
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
	// Frees a previously built `commBodyType` and rebuilds it as a 3-block struct type
	// from the absolute addresses of position, velocity and mass of bodies[0].
	// NOTE(review): `bodies` is taken by value, so the addresses recorded here belong to
	// this function's own copy of the vector, which is destroyed on return - callers that
	// afterwards communicate via MPI::BOTTOM + commBodyType reference released memory.
	// NOTE(review): bodies[0] is indexed without checking for an empty vector.
	// NOTE(review): no closing brace for this function is visible before the next definition.
	void MpiBodyComm::setupDatatype(vector<Body> bodies) {
		//re-setup data type for current input data's memory locations
		if (this->commBodyType != MPI::DATATYPE_NULL) {
			this->commBodyType.Free();
		}
		this->displacements[0] = MPI::Get_address(bodies[0].position);
		this->displacements[1] = MPI::Get_address(bodies[0].velocity);
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		this->displacements[2] = MPI::Get_address(&(bodies[0].mass));
		this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
		// the type must be committed before it can be used in communication
		this->commBodyType.Commit();
	int MpiBodyComm::getReceivedNumberBodies(int source) {
		MPI::Datatype btype = MPI::DATATYPE_NULL;
		MPI::Aint displ[3];
		Body templateBody;
		MPI::Status status;

		if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
			return 0;
		}
		displ[0] = MPI::Get_address(templateBody.position);
		displ[1] = MPI::Get_address(templateBody.velocity);
		displ[2] = MPI::Get_address(&(templateBody.mass));
		btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
		btype.Commit();
		int result = status.Get_count(btype);
		btype.Free();
		return result;
	}

Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
	// Sends `bodies` to rank `target` (tag 0) with a blocking send.
	// target: destination rank in MPI::COMM_WORLD
	// bodies: bodies to transmit; an empty vector results in a zero-length message
	// returns: false (nothing sent) while a previous non-blocking send of this object
	//          is still in flight, true after the message has been handed to MPI
	bool MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
		if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
			//unblocking send still not finished
			return false;
		}
		if (bodies.empty()) {
			//no element to take addresses from; transmit a zero-length message
			MPI::COMM_WORLD.Send(MPI::BOTTOM, 0, MPI::BYTE, target, 0);
			return true;
		}
		//build the type directly from `bodies`: setupDatatype() receives its argument by
		//value, so the addresses it records belong to a copy that no longer exists on return
		if (this->commBodyType != MPI::DATATYPE_NULL) {
			this->commBodyType.Free();
		}
		this->displacements[0] = MPI::Get_address(bodies[0].position);
		this->displacements[1] = MPI::Get_address(bodies[0].velocity);
		this->displacements[2] = MPI::Get_address(&(bodies[0].mass));
		this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
		this->commBodyType.Commit();
		//addresses are encoded in the datatype, hence MPI::BOTTOM as the buffer
		MPI::COMM_WORLD.Send(MPI::BOTTOM, static_cast<int>(bodies.size()), this->commBodyType, target, 0);
		return true;
	}

	// Starts a non-blocking send of `bodies` to rank `target` (tag 0).
	// target: destination rank in MPI::COMM_WORLD
	// bodies: bodies to transmit; copied into the member `buffer` before sending
	// returns: false (nothing sent) while the previous non-blocking send is still
	//          in flight, true once the new send has been started
	bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
		if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
			//unblocking send still not finished
			return false;
		}
		//need buffering; method will return before send is finished
		this->buffer = bodies;
		if (this->buffer.empty()) {
			//no element to take addresses from; transmit a zero-length message
			this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, 0, MPI::BYTE, target, 0);
			return true;
		}
		//build the type directly from the member buffer: setupDatatype() receives its
		//argument by value, so the addresses it records belong to a copy that no longer
		//exists while the send is running
		if (this->commBodyType != MPI::DATATYPE_NULL) {
			this->commBodyType.Free();
		}
		this->displacements[0] = MPI::Get_address(this->buffer[0].position);
		this->displacements[1] = MPI::Get_address(this->buffer[0].velocity);
		this->displacements[2] = MPI::Get_address(&(this->buffer[0].mass));
		this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
		this->commBodyType.Commit();
		//addresses are encoded in the datatype, hence MPI::BOTTOM as the buffer
		this->request = MPI::COMM_WORLD.Isend(MPI::BOTTOM, static_cast<int>(this->buffer.size()), this->commBodyType, target, 0);
		return true;
	}

	void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
		MPI::Status status;
		int count;

		bodies.clear();
		while ((count = this->getReceivedNumberBodies(source)) == 0);

Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		//wait for incoming message to determine number of bodies received
		//while (!MPI::COMM_WORLD.Iprobe(source, 0, status));
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		//get number of bodies to reserve memory
		//count = status.Get_count(this->commBodyType);
		cout << "RECVED " << count << endl;
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		bodies.reserve(count);
		this->setupDatatype(bodies);
		MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
	}

	bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) {
		MPI::Status status;
		int count;

		//if message is not here return
		if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
			return false;
		}
		bodies.clear();
		//get number of bodies to reserve memory
		count = status.Get_count(this->commBodyType);
		bodies.reserve(count);
		this->setupDatatype(bodies);
		MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		return true;
	}
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed

	void MpiBodyComm::recvBlockingAnySource(int& source, vector<Body>& bodies) {
		MPI::Status status;
		int count;
		int actualSource;

		bodies.clear();
		//wait for incoming message to determine number of bodies received
		while (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status));
		//get number of bodies to reserve memory and determine source
		count = status.Get_count(this->commBodyType);
		actualSource = status.Get_source();
		bodies.reserve(count);
		//set source
		source = actualSource;
		this->setupDatatype(bodies);
		MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
	}

	bool MpiBodyComm::recvUnblockingAnySource(int& source, vector<Body>& bodies) {
		MPI::Status status;
		int count;
		int actualSource;

		//if message is not here return
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		if (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) {
			return false;
		}
		bodies.clear();
		//get number of bodies to reserve memory and determine source
		count = status.Get_count(this->commBodyType);
		actualSource = status.Get_source();
		bodies.reserve(count);
		//set source
		source = actualSource;
		this->setupDatatype(bodies);
		MPI::COMM_WORLD.Recv(MPI::BOTTOM, count, this->commBodyType, source, 0);
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		return true;
	}

	// Tells whether a new non-blocking send may be started on this object.
	// returns: true when no request is stored or the stored request tests as complete
	bool MpiBodyComm::sendReady() {
		//no request stored: nothing is in flight
		if (this->request == MPI::REQUEST_NULL) {
			return true;
		}
		//otherwise ask MPI whether the last non-blocking send is out of the door
		return this->request.Test();
	}

	// Tells whether a tag-0 message is waiting to be received.
	// source: a specific rank or MPI::ANY_SOURCE
	// returns: the result of a non-blocking probe on MPI::COMM_WORLD
	bool MpiBodyComm::recvReady(int source) {
		const bool messagePending = MPI::COMM_WORLD.Iprobe(source, 0);
		return messagePending;
	}
	void MpiBodyComm::testing(vector<Body>& data, int myRank) {
		Body b[4];
		vector<Body> vb;

		for (int i = 0; i < 4; i++) {
			vb.push_back(b[i]);
		}
		if (myRank == 0) {
			//MPI::COMM_WORLD.Send(b, 4, *this->datatype, 1, 0);
			MPI::COMM_WORLD.Send(&vb[0], 4, *this->datatype, 1, 0);
		} else if (myRank == 1) {
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
			//MPI::COMM_WORLD.Recv(b, 4, *this->datatype, 0, 0);
			MPI::COMM_WORLD.Recv(&vb[0], 4, *this->datatype, 0, 0);
		//MPI::Datatype btype = MPI::DATATYPE_NULL;
		//int bodySize = data.size();
		MPI::COMM_WORLD.Bcast(&bodySize, 1, MPI::INT, 0);
		if (myRank != 0) {
			data.clear();
			data.reserve(bodySize);
		}
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		//MPI::COMM_WORLD.Bcast(MPI::BOTTOM, bodySize, btype, 0);
		MPI::Aint displ[3];

Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		if (myRank == 0) {
			displ[0] = MPI::Get_address(data[0].position);
			displ[1] = MPI::Get_address(data[0].velocity);
			displ[2] = MPI::Get_address(&(data[0].mass));
			btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
			btype.Commit();

			//MPI::COMM_WORLD.Send(MPI::BOTTOM, bodySize, btype, 1, 0);
			MPI::COMM_WORLD.Send(MPI::BOTTOM, data.size(), btype, 1, 0);
			btype.Free();
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		} else if (myRank == 1) {
			MPI::Status status;
			Body templateBody;

			displ[0] = MPI::Get_address(templateBody.position);
			displ[1] = MPI::Get_address(templateBody.velocity);
			displ[2] = MPI::Get_address(&(templateBody.mass));
			btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
			btype.Commit();

			while (!MPI::COMM_WORLD.Iprobe(0, 0, status));
			cout << "RECVED " << status.Get_count(btype) << endl;
			data.clear();
			data.reserve(status.Get_count(btype));

			btype.Free();
			displ[0] = MPI::Get_address(data[0].position);
			displ[1] = MPI::Get_address(data[0].velocity);
			displ[2] = MPI::Get_address(&(data[0].mass));
			btype = btype.Create_struct(3, this->blocklengths, displ, this->datatypes);
			btype.Commit();
			MPI::COMM_WORLD.Recv(MPI::BOTTOM, status.Get_count(btype), btype, 0, 0);

			btype.Free();
			//MPI::COMM_WORLD.Recv(MPI::BOTTOM, bodySize, btype, 0, 0);
			//cout << "RECVED " << bodySize << " " << status.Get_count(btype) << endl;
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		}