Skip to content
Snippets Groups Projects
MpiBodyComm.cpp 4.63 KiB
Newer Older
#include "MpiBodyComm.hpp"

namespace nbody {
	MpiBodyComm::MpiBodyComm() {
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		//setup initial data type and initialize request for tracking status of unblocking send
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		this->buffer.push_back(Body());
		this->buffer.back().mass = 0.0;
		this->datatypes[0] = MPI::DOUBLE;
		this->datatypes[1] = MPI::DOUBLE;
		this->datatypes[2] = MPI::DOUBLE;
		this->blocklengths[0] = 3;
		this->blocklengths[1] = 3;
		this->blocklengths[2] = 1;
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		this->displacements[0] = MPI::Get_address(&(this->buffer[0].position[0]));
		this->displacements[1] = MPI::Get_address(&(this->buffer[0].velocity[0]));
		this->displacements[2] = MPI::Get_address(&(this->buffer[0].mass));
		this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
		this->commBodyType.Commit();
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		this->request = MPI::REQUEST_NULL;
	}

	MpiBodyComm::~MpiBodyComm() {
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		if (this->request != MPI::REQUEST_NULL) {
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
			//wait for termination of possibly open request
			this->request.Wait();
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
			this->request.Free();
		}
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		this->commBodyType.Free();
	}

	void MpiBodyComm::setupDatatype(vector<Body> bodies) {
		//re-setup data type for current input data's memory locations
		this->commBodyType.Free();
		this->displacements[0] = MPI::Get_address(&(bodies[0].position[0]));
		this->displacements[1] = MPI::Get_address(&(bodies[0].velocity[0]));
		this->displacements[2] = MPI::Get_address(&(bodies[0].mass));
		this->commBodyType = this->commBodyType.Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
		this->commBodyType.Commit();
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
	bool MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
			//unblocking send still not finished
			return false;
		}
		this->setupDatatype(bodies);
		MPI::COMM_WORLD.Send(&bodies[0], bodies.size(), this->commBodyType, target, 0);
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		return true;
	}

	bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
			//unblocking send still not finished
			return false;
		}
		//need buffering; method will return before send is finished
		this->buffer = bodies;
		this->setupDatatype(this->buffer);
		this->request = MPI::COMM_WORLD.Isend(&this->buffer[0], this->buffer.size(), this->commBodyType, target, 0);
		return true;
	}

	void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
		MPI::Status status;
		int count;

		bodies.clear();
		//wait for incoming message to determine number of bodies received
		while (!MPI::COMM_WORLD.Iprobe(source, 0, status));
		//get number of bodies to reserve memory
		count = status.Get_count(this->commBodyType);
		bodies.reserve(count);
		MPI::COMM_WORLD.Recv(&bodies[0], count, this->commBodyType, source, 0);
	}

	bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) {
		MPI::Status status;
		int count;

		//if message is not here return
		if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
			return false;
		}
		bodies.clear();
		//get number of bodies to reserve memory
		count = status.Get_count(this->commBodyType);
		bodies.reserve(count);
		MPI::COMM_WORLD.Recv(&bodies[0], count, this->commBodyType, source, 0);
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		return true;
	}
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed

	void MpiBodyComm::recvBlockingAnySource(int& source, vector<Body>& bodies) {
		MPI::Status status;
		int count;
		int actualSource;

		bodies.clear();
		//wait for incoming message to determine number of bodies received
		while (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status));
		//get number of bodies to reserve memory and determine source
		count = status.Get_count(this->commBodyType);
		actualSource = status.Get_source();
		bodies.reserve(count);
		//set source
		source = actualSource;
		MPI::COMM_WORLD.Recv(&bodies[0], count, this->commBodyType, source, 0);
	}

	bool MpiBodyComm::recvUnblockingAnySource(int& source, vector<Body>& bodies) {
		MPI::Status status;
		int count;
		int actualSource;

		//if message is not here return
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		if (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) {
			return false;
		}
		bodies.clear();
		//get number of bodies to reserve memory and determine source
		count = status.Get_count(this->commBodyType);
		actualSource = status.Get_source();
		bodies.reserve(count);
		//set source
		source = actualSource;
		MPI::COMM_WORLD.Recv(&bodies[0], count, this->commBodyType, source, 0);
		return true;
	}

	bool MpiBodyComm::sendReady() {
		//check if the last non-blocking send on this object is out of the door
		return this->request == MPI::REQUEST_NULL || this->request.Test();
	}

	bool MpiBodyComm::recvReady(int source) {
		//check if there has been data received from a specific source or MPI::ANY_SOURCE
		return MPI::COMM_WORLD.Iprobe(source, 0);
	}