Newer
Older
#include "MpiBodyComm.hpp"
#include <cstddef>
namespace nbody {
//Constructor: seeds the send buffer with one zero-mass body and builds the MPI
//datatype that describes the communicated fields of a Body
//(3 doubles position, 3 doubles velocity, 1 double mass).
MpiBodyComm::MpiBodyComm() {
	//the placeholder body only serves as a layout probe for the datatype
	this->buffer.push_back(Body());
	this->buffer.back().mass = 0.0;
	this->datatypes[0] = MPI::DOUBLE;
	this->datatypes[1] = MPI::DOUBLE;
	this->datatypes[2] = MPI::DOUBLE;
	this->blocklengths[0] = 3;
	this->blocklengths[1] = 3;
	this->blocklengths[2] = 1;
	//displacements are taken relative to the start of the body: all send/recv
	//calls pass &bodies[0] as buffer, and MPI adds the displacements to it
	const MPI::Aint base = MPI::Get_address(&(this->buffer[0]));
	this->displacements[0] = MPI::Get_address(&(this->buffer[0].position[0])) - base;
	this->displacements[1] = MPI::Get_address(&(this->buffer[0].velocity[0])) - base;
	this->displacements[2] = MPI::Get_address(&(this->buffer[0].mass)) - base;
	MPI::Datatype fields = MPI::Datatype::Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
	//force the extent to sizeof(Body) so element i of a vector<Body> is found
	//at i * sizeof(Body), whatever other members Body may contain
	this->commBodyType = fields.Create_resized(0, static_cast<MPI::Aint>(sizeof(Body)));
	fields.Free();
	this->commBodyType.Commit();
}

//Destructor: completes a possibly pending unblocking send, then releases the datatype.
//NOTE(review): the constructor/destructor boundary was reconstructed; this
//assumes ~MpiBodyComm() is declared in MpiBodyComm.hpp - confirm.
MpiBodyComm::~MpiBodyComm() {
	//wait for termination of possibly open request
	this->request.Wait();
	this->commBodyType.Free();
}
//Rebuilds and commits commBodyType from the memory layout of a Body.
//bodies: data about to be communicated; its first element is used as layout
//        probe (a temporary body is used when the vector is empty).
//Relies on datatypes and blocklengths having been filled in by the constructor.
void MpiBodyComm::setupDatatype(vector<Body> bodies) {
	//bodies is a private copy, so adding a probe element does not affect the caller
	if (bodies.empty()) {
		bodies.push_back(Body());
	}
	//release the previously committed type; freeing a null handle is erroneous in MPI
	if (this->commBodyType != MPI::DATATYPE_NULL) {
		this->commBodyType.Free();
	}
	//displacements relative to the start of a body stay valid after this copy is
	//destroyed and match the &bodies[0] buffer argument used by the send/recv calls
	const MPI::Aint base = MPI::Get_address(&(bodies[0]));
	this->displacements[0] = MPI::Get_address(&(bodies[0].position[0])) - base;
	this->displacements[1] = MPI::Get_address(&(bodies[0].velocity[0])) - base;
	this->displacements[2] = MPI::Get_address(&(bodies[0].mass)) - base;
	MPI::Datatype fields = MPI::Datatype::Create_struct(3, this->blocklengths, this->displacements, this->datatypes);
	//force the extent to sizeof(Body) so consecutive vector elements are addressed correctly
	this->commBodyType = fields.Create_resized(0, static_cast<MPI::Aint>(sizeof(Body)));
	fields.Free();
	this->commBodyType.Commit();
}
//Transmits bodies to rank target with a blocking MPI Send (tag 0).
//Returns false without sending while an earlier unblocking send is still
//in flight; returns true once the Send call has returned.
bool MpiBodyComm::sendBlocking(int target, vector<Body> bodies) {
	//Test() is only consulted when a request is actually outstanding
	const bool sendInFlight = this->request != MPI::REQUEST_NULL && !this->request.Test();
	if (!sendInFlight) {
		//refresh the datatype before handing the data to MPI
		this->setupDatatype(bodies);
		MPI::COMM_WORLD.Send(&bodies[0], bodies.size(), this->commBodyType, target, 0);
		return true;
	}
	//unblocking send still not finished
	return false;
}
bool MpiBodyComm::sendUnblocking(int target, vector<Body> bodies) {
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
if (this->request != MPI::REQUEST_NULL && !this->request.Test()) {
//unblocking send still not finished
return false;
}
//need buffering; method will return before send is finished
this->buffer = bodies;
this->setupDatatype(this->buffer);
this->request = MPI::COMM_WORLD.Isend(&this->buffer[0], this->buffer.size(), this->commBodyType, target, 0);
return true;
}
void MpiBodyComm::recvBlocking(int source, vector<Body>& bodies) {
MPI::Status status;
int count;
bodies.clear();
//wait for incoming message to determine number of bodies received
while (!MPI::COMM_WORLD.Iprobe(source, 0, status));
//get number of bodies to reserve memory
count = status.Get_count(this->commBodyType);
bodies.reserve(count);
MPI::COMM_WORLD.Recv(&bodies[0], count, this->commBodyType, source, 0);
}
bool MpiBodyComm::recvUnblocking(int source, vector<Body>& bodies) {
MPI::Status status;
int count;
//if message is not here return
if (!MPI::COMM_WORLD.Iprobe(source, 0, status)) {
return false;
}
bodies.clear();
//get number of bodies to reserve memory
count = status.Get_count(this->commBodyType);
bodies.reserve(count);
MPI::COMM_WORLD.Recv(&bodies[0], count, this->commBodyType, source, 0);
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
void MpiBodyComm::recvBlockingAnySource(int& source, vector<Body>& bodies) {
MPI::Status status;
int count;
int actualSource;
bodies.clear();
//wait for incoming message to determine number of bodies received
while (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status));
//get number of bodies to reserve memory and determine source
count = status.Get_count(this->commBodyType);
actualSource = status.Get_source();
bodies.reserve(count);
//set source
source = actualSource;
MPI::COMM_WORLD.Recv(&bodies[0], count, this->commBodyType, source, 0);
}
bool MpiBodyComm::recvUnblockingAnySource(int& source, vector<Body>& bodies) {
MPI::Status status;
int count;
int actualSource;
////if message is not here return
if (!MPI::COMM_WORLD.Iprobe(MPI::ANY_SOURCE, 0, status)) {
return false;
}
bodies.clear();
//get number of bodies to reserve memory and determine source
count = status.Get_count(this->commBodyType);
actualSource = status.Get_source();
bodies.reserve(count);
//set source
source = actualSource;
MPI::COMM_WORLD.Recv(&bodies[0], count, this->commBodyType, source, 0);
return true;
}