Skip to content
Snippets Groups Projects
MpiBodyComm.cpp 2.7 KiB
Newer Older
#include <iostream>
#include "MpiBodyComm.hpp"

namespace nbody {
	using namespace std;

	// Creates a communicator that transfers bodies using the given MPI datatype.
	// Only the pointer is stored (it is dereferenced by the send/recv methods),
	// so the pointed-to datatype has to stay valid while this object is in use.
	MpiBodyComm::MpiBodyComm(MPI_Datatype* bodyDatatype) {
		//no non-blocking send is outstanding on a fresh object
		request = MPI_REQUEST_NULL;
		//remember the datatype handle for later send/recv calls
		datatype = bodyDatatype;
	}

	// Waits for a possibly still-pending non-blocking send before the object is
	// destroyed, so that MPI no longer reads from the internal send buffer.
	MpiBodyComm::~MpiBodyComm() {
		if (this->request != MPI_REQUEST_NULL) {
			int finalized = 0;

			//MPI_Wait must not be called once MPI_Finalize has run (e.g. when the
			//object is destroyed late), so check the library state first
			MPI_Finalized(&finalized);
			if (!finalized) {
				//wait for termination of possibly open request;
				//MPI_Wait releases the request and resets it to MPI_REQUEST_NULL,
				//so no separate free of the request is required afterwards
				MPI_Wait(&this->request, MPI_STATUS_IGNORE);
			}
		}
	}

	void MpiBodyComm::cleanup() {
		if (this->request != MPI_REQUEST_NULL) {
			MPI_Status status;

			MPI_Wait(&this->request, &status);
			//this->request.Free();
		}
	}
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
	// Sends all given bodies to rank `target` with a blocking MPI_Send
	// (tag 0, MPI_COMM_WORLD). An empty vector is sent as a zero-length message.
	void MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
		//data() is well-defined for an empty vector, unlike &bodies[0];
		//MPI takes the element count as int, so convert explicitly
		MPI_Send(bodies.data(), static_cast<int>(bodies.size()), *this->datatype, target, 0, MPI_COMM_WORLD);
	}

	void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
		MPI_Status status;
		int currentSource;
		int currentSize;

Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		//check for message beforehand to get number of bodies transmitted
		//check for actual source because input parameter could be MPI_ANY_SOURCE
		MPI_Probe(source, 0, MPI_COMM_WORLD, &status);
		currentSource = status.MPI_SOURCE;
		MPI_Get_count(&status, *this->datatype, &currentSize);
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		//allocate correct vector size for incoming data
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
		bodies.resize(currentSize);
		MPI_Recv(&bodies[0], currentSize, *this->datatype, currentSource, 0, MPI_COMM_WORLD, &status);
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
	}
Paul Heinzlreiter's avatar
Paul Heinzlreiter committed
	// Starts a non-blocking send of the given bodies to rank `target`
	// (tag 0, MPI_COMM_WORLD).
	// Returns false without sending anything if the previous non-blocking send
	// of this object has not completed yet; returns true once the new send has
	// been started. The bodies are copied into an internal buffer that stays
	// alive after this call returns, because MPI_Isend may still read from it.
	bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
		if (this->request != MPI_REQUEST_NULL) {
			MPI_Status status;
			int completed = 0;

			MPI_Test(&this->request, &completed, &status);
			if (!completed) {
				//unblocking send still not finished, buffer must not be touched
				return false;
			}
			//a successful MPI_Test has already released the request and reset it
			//to MPI_REQUEST_NULL, so there is nothing left to free here
		}
		//replace the buffer contents with a copy of the bodies to send
		this->buffer.assign(bodies.begin(), bodies.end());
		//data() is well-defined for an empty buffer, unlike &buffer[0]
		MPI_Isend(this->buffer.data(), static_cast<int>(this->buffer.size()), *this->datatype, target, 0, MPI_COMM_WORLD, &this->request);
		return true;
	}

	// Receives a message with tag 0 from `source` on MPI_COMM_WORLD if one is
	// already available, without waiting for one to arrive.
	// Returns false and leaves `bodies` untouched when no message is pending;
	// otherwise resizes `bodies` to the incoming element count, receives the
	// message into it and returns true.
	bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) {
		MPI_Status status;
		int currentSize = 0;
		int ready = 0;

		//check for message beforehand to get number of bodies transmitted
		MPI_Iprobe(source, 0, MPI_COMM_WORLD, &ready, &status);
		if (!ready) {
			return false;
		}
		//check for actual source because input parameter could be MPI_ANY_SOURCE
		const int currentSource = status.MPI_SOURCE;
		//NOTE(review): MPI_Get_count can report MPI_UNDEFINED for a message that is
		//not a whole number of elements; that case is not handled here
		MPI_Get_count(&status, *this->datatype, &currentSize);
		//allocate correct vector size for incoming data
		bodies.resize(currentSize);
		//data() stays valid for a zero-length message, unlike &bodies[0]
		MPI_Recv(bodies.data(), currentSize, *this->datatype, currentSource, 0, MPI_COMM_WORLD, &status);
		return true;
	}