Newer
Older
#include "MpiBodyComm.hpp"
namespace nbody {
// Constructor taking a pointer to an MPI datatype (presumably the datatype
// describing a Body on the wire — confirm against the header).
// NOTE(review): no closing brace for this definition is visible before
// MpiBodyComm::cleanup() begins, and bodyDatatype is not used in the lines
// shown here. The body looks incomplete in this view; confirm against the
// full file before changing it.
MpiBodyComm::MpiBodyComm(MPI::Datatype* bodyDatatype) {
//wait for termination of a possibly still open request
this->request.Wait();
//TODO: check why request cannot be freed
//this->request.Free();
/**
 * Blocks until the request held by this communicator has completed.
 *
 * Does nothing when the stored request equals MPI::REQUEST_NULL.
 */
void MpiBodyComm::cleanup() {
    const bool hasPendingRequest = (this->request != MPI::REQUEST_NULL);
    if (!hasPendingRequest) {
        return;
    }
    // Wait for the outstanding operation to complete.
    this->request.Wait();
    //this->request.Free();
}
void MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
MPI::COMM_WORLD.Send(&bodies[0], bodies.size(), *this->datatype, target, 0);
}
void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
MPI::Status status;
int currentSource;
int currentSize;
//check for message beforehand to get number of bodies transmitted
//check for actual source because input parameter could be MPI::ANY_SOURCE
bodies.resize(currentSize);
MPI::COMM_WORLD.Recv(&bodies[0], currentSize, *this->datatype, currentSource, 0);
bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
//unblocking send still not finished
return false;
}
//TODO: check if works for reuse of a MPIBodyComm
this->buffer.resize(bodies.size());
std::copy(bodies.begin(), bodies.end(), this->buffer.begin());
this->request = MPI::COMM_WORLD.Isend(&this->buffer[0], this->buffer.size(), *this->datatype, target, 0);
}
bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) {
MPI::Status status;
//check for message beforehand to get number of bodies transmitted
//check for actual source because input parameter could be MPI::ANY_SOURCE
if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
return false;
}
currentSource = status.Get_source();
currentSize = status.Get_count(*this->datatype);
//allocate correct vector size for incoming data
bodies.resize(currentSize);
MPI::COMM_WORLD.Recv(&bodies[0], currentSize, *this->datatype, currentSource, 0);