Commit 395099d0 authored by Thomas Steinreiter's avatar Thomas Steinreiter
Browse files

* refactored Communicator into P2PCommunicator and CollectiveCommunicator

parent 1917780b
...@@ -17,7 +17,7 @@ set(NAME ${DWARF_PREFIX}_wireworld) ...@@ -17,7 +17,7 @@ set(NAME ${DWARF_PREFIX}_wireworld)
if (MPI_FOUND AND Boost_FOUND) if (MPI_FOUND AND Boost_FOUND)
enable_language(CXX) enable_language(CXX)
include_directories(${MPI_INCLUDE_PATH} ${Boost_INCLUDE_DIRS}) include_directories(${MPI_INCLUDE_PATH} ${Boost_INCLUDE_DIRS})
add_executable(${NAME} main.cpp Configuration.cpp Communicator.cpp FileIO.cpp MpiEnvironment.cpp MpiSubarray.cpp MpiWireworld.cpp Tile.cpp Util.cpp) add_executable(${NAME} main.cpp Configuration.cpp Communicator.cpp P2PCommunicator.cpp CollectiveCommunicator.cpp FileIO.cpp MpiEnvironment.cpp MpiSubarray.cpp MpiWireworld.cpp Tile.cpp Util.cpp)
set(CMAKE_BUILD_TYPE RelWithDebInfo) set(CMAKE_BUILD_TYPE RelWithDebInfo)
if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU") if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=native -Wall -Wextra") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=native -Wall -Wextra")
......
#include "CollectiveCommunicator.hpp"
void CollectiveCommunicator::Communicate(State* model) {
if (_commDistGraph == MPI_COMM_NULL)
MpiReportErrorAbort("Communicator not initialized");
MPI_Neighbor_alltoallw(model, // sendbuf
_sizes.data(), // sendcounts
_sendDisplacements.data(), // sdispl
_sendTypes.data(), // sendtypes
model, // recvbuf
_sizes.data(), // recvcounts
_recvDisplacements.data(), // rdispls
_recvTypes.data(), // recvtypes
_commDistGraph // comm
);
}
Communicator::MpiRequest CollectiveCommunicator::AsyncCommunicate(State* model) {
if (_commDistGraph == MPI_COMM_NULL)
MpiReportErrorAbort("Communicator not initialized");
MPI_Request req;
MPI_Ineighbor_alltoallw(model, // sendbuf
_sizes.data(), // sendcounts
_sendDisplacements.data(), // sdispl
_sendTypes.data(), // sendtypes
model, // recvbuf
_sizes.data(), // recvcounts
_recvDisplacements.data(), // rdispls
_recvTypes.data(), // recvtypes
_commDistGraph, // comm
&req); // request
return MpiRequest{{req}};
}
#pragma once
#include "Communicator.hpp"
struct CollectiveCommunicator : public Communicator {
using Communicator::Communicator;
void Communicate(State* model) override;
MpiRequest AsyncCommunicate(State* model) override;
};
...@@ -149,78 +149,3 @@ Communicator& Communicator::operator=(Communicator&& other) noexcept { ...@@ -149,78 +149,3 @@ Communicator& Communicator::operator=(Communicator&& other) noexcept {
swap(*this, other); swap(*this, other);
return *this; return *this;
} }
void Communicator::Communicate(State* model) {
if (_commDistGraph == MPI_COMM_NULL)
MpiReportErrorAbort("Communicator not initialized");
switch (_commMode) {
case CommunicationMode::Collective:
MPI_Neighbor_alltoallw(model, // sendbuf
_sizes.data(), // sendcounts
_sendDisplacements.data(), // sdispl
_sendTypes.data(), // sendtypes
model, // recvbuf
_sizes.data(), // recvcounts
_recvDisplacements.data(), // rdispls
_recvTypes.data(), // recvtypes
_commDistGraph // comm
);
break;
case CommunicationMode::P2P: {
AsyncCommunicate(model).Wait();
} break;
}
}
Communicator::MpiRequest Communicator::AsyncCommunicate(State* model) {
if (_commDistGraph == MPI_COMM_NULL)
MpiReportErrorAbort("Communicator not initialized");
switch (_commMode) {
case CommunicationMode::Collective: {
MPI_Request req;
MPI_Ineighbor_alltoallw(model, // sendbuf
_sizes.data(), // sendcounts
_sendDisplacements.data(), // sdispl
_sendTypes.data(), // sendtypes
model, // recvbuf
_sizes.data(), // recvcounts
_recvDisplacements.data(), // rdispls
_recvTypes.data(), // recvtypes
_commDistGraph, // comm
&req); // request
return MpiRequest{{req}};
};
case CommunicationMode::P2P: {
Communicator::MpiRequest::DoubleVector<MPI_Request> reqs;
for (std::size_t i{0}; i < _neighbors.size(); ++i) {
{
MPI_Request req;
MPI_Isend(model + _sendDisplacements[i], // buf
1, // count
_sendTypes[i], // datatype
_neighbors[i], // dest
0, // tag
MPI_COMM_WORLD, // comm
&req); // request
reqs.push_back(req);
}
{
MPI_Request req;
MPI_Irecv(model + _recvDisplacements[i], // buf
1, // count
_recvTypes[i], // datatype
_neighbors[i], // source
0, // tag
MPI_COMM_WORLD, // comm
&req); // request
reqs.push_back(req);
}
}
return MpiRequest{reqs};
};
}
}
...@@ -39,7 +39,7 @@ class Communicator { ...@@ -39,7 +39,7 @@ class Communicator {
~MpiRequest(); ~MpiRequest();
}; };
private: protected:
CommunicationMode _commMode; CommunicationMode _commMode;
// data members for graph topology // data members for graph topology
...@@ -60,13 +60,13 @@ class Communicator { ...@@ -60,13 +60,13 @@ class Communicator {
Communicator() = default; Communicator() = default;
Communicator(const MpiEnvironment& env, CommunicationMode commMode, Communicator(const MpiEnvironment& env, CommunicationMode commMode,
const Size& gridSize, const Size& tileSize); const Size& gridSize, const Size& tileSize);
~Communicator(); virtual ~Communicator();
void swap(Communicator& first, Communicator& second); void swap(Communicator& first, Communicator& second);
Communicator(Communicator&) = delete; Communicator(Communicator&) = delete;
Communicator& operator=(Communicator&) = delete; Communicator& operator=(Communicator&) = delete;
Communicator(Communicator&& other) noexcept; Communicator(Communicator&& other) noexcept;
Communicator& operator=(Communicator&& other) noexcept; Communicator& operator=(Communicator&& other) noexcept;
void Communicate(State* model); virtual void Communicate(State* model) = 0;
MpiRequest AsyncCommunicate(State* model); virtual MpiRequest AsyncCommunicate(State* model) = 0;
}; };
...@@ -3,12 +3,14 @@ ...@@ -3,12 +3,14 @@
#include <algorithm> #include <algorithm>
#include <array> #include <array>
#include <iostream> #include <iostream>
#include <limits>
#include <mpi.h> #include <mpi.h>
#include <string> #include <string>
#include <limits>
#include <vector> #include <vector>
#include "CollectiveCommunicator.hpp"
#include "FileIO.hpp" #include "FileIO.hpp"
#include "P2PCommunicator.hpp"
#include "State.hpp" #include "State.hpp"
#include "Tile.hpp" #include "Tile.hpp"
#include "Util.hpp" #include "Util.hpp"
...@@ -64,8 +66,17 @@ void MpiWireworld::processArea(Coord start, Size size) { ...@@ -64,8 +66,17 @@ void MpiWireworld::processArea(Coord start, Size size) {
MpiWireworld::MpiWireworld(const MpiEnvironment& env, const Configuration& cfg) MpiWireworld::MpiWireworld(const MpiEnvironment& env, const Configuration& cfg)
: _tile(Tile::Read(cfg, env)), : _tile(Tile::Read(cfg, env)),
_comm(env, cfg.CommMode, cfg.Procs, _tile.tileSize()) { _comm([&]() -> std::unique_ptr<Communicator> {
_comm.Communicate(_tile.model()); switch (cfg.CommMode) {
case CommunicationMode::Collective:
return std::make_unique<CollectiveCommunicator>(
env, cfg.CommMode, cfg.Procs, _tile.tileSize());
case CommunicationMode::P2P:
return std::make_unique<P2PCommunicator>(
env, cfg.CommMode, cfg.Procs, _tile.tileSize());
}
}()) {
_comm->Communicate(_tile.model());
} }
std::ostream& operator<<(std::ostream& out, const MpiWireworld& g) { std::ostream& operator<<(std::ostream& out, const MpiWireworld& g) {
...@@ -96,7 +107,7 @@ void MpiWireworld::simulateStep() { ...@@ -96,7 +107,7 @@ void MpiWireworld::simulateStep() {
processArea({1, _tileSize.Rows}, {_tileSize.Cols, 1}); processArea({1, _tileSize.Rows}, {_tileSize.Cols, 1});
// start communication of border while processing the core // start communication of border while processing the core
auto req = _comm.AsyncCommunicate(_nextModel); auto req = _comm->AsyncCommunicate(_nextModel);
/// core /// core
processArea({2, 2}, {_tileSize.Cols - 2, _tileSize.Rows - 2}); processArea({2, 2}, {_tileSize.Cols - 2, _tileSize.Rows - 2});
......
#pragma once #pragma once
#include <cstddef> #include <cstddef>
#include <memory>
#include <ostream> #include <ostream>
#include "Communicator.hpp" #include "Communicator.hpp"
...@@ -9,7 +10,7 @@ ...@@ -9,7 +10,7 @@
class MpiWireworld { class MpiWireworld {
Tile _tile; Tile _tile;
Communicator _comm; std::unique_ptr<Communicator> _comm;
void processArea(Coord start, Size size); void processArea(Coord start, Size size);
......
#include "P2PCommunicator.hpp"
void P2PCommunicator::Communicate(State* model) {
AsyncCommunicate(model).Wait();
}
Communicator::MpiRequest P2PCommunicator::AsyncCommunicate(State* model) {
Communicator::MpiRequest::DoubleVector<MPI_Request> reqs;
for (std::size_t i{0}; i < _neighbors.size(); ++i) {
{
MPI_Request req;
MPI_Isend(model + _sendDisplacements[i], // buf
1, // count
_sendTypes[i], // datatype
_neighbors[i], // dest
0, // tag
MPI_COMM_WORLD, // comm
&req); // request
reqs.push_back(req);
}
{
MPI_Request req;
MPI_Irecv(model + _recvDisplacements[i], // buf
1, // count
_recvTypes[i], // datatype
_neighbors[i], // source
0, // tag
MPI_COMM_WORLD, // comm
&req); // request
reqs.push_back(req);
}
}
return MpiRequest{reqs};
}
#pragma once
#include "Communicator.hpp"
struct P2PCommunicator : public Communicator {
using Communicator::Communicator;
void Communicate(State* model) override;
MpiRequest AsyncCommunicate(State* model) override;
};
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment