#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include "mpicomm.h"
#include "mpitypes.h"
// --------------------------------------------- Helper function declarations
int prepare_receive(part_t *particles, int n_particles_to_receive);
int receive_particles(sim_t *s, const MPI_Status *status);
void sim_info_reduce_op(void *invec, void *inoutvec, int *len, MPI_Datatype *type);
// ==========================================================================
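
// Broadcasts the run configuration from rank 0 to all other ranks, using a
// derived datatype so the conf_t struct can be sent as a single element.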
void mpi_broadcast_configuration(conf_t *configuration)
{
    MPI_Datatype conf_mpitype;
    mpitype_conf_init(&conf_mpitype);
    MPI_Bcast(configuration, 1, conf_mpitype, 0, MPI_COMM_WORLD);
    mpitype_conf_free(&conf_mpitype);
}
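
// Reduces the per-rank simulation statistics onto rank 0 with a user-defined
// reduction operation (see sim_info_reduce_op below).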
void mpi_reduce_sim_info(const sim_info_t *local_info, sim_info_t *info)
{
    MPI_Datatype info_mpitype;
    MPI_Op operation;
    int commutative = 1;

    mpitype_sim_info_init(&info_mpitype);
    MPI_Op_create(sim_info_reduce_op, commutative, &operation);

    MPI_Reduce(local_info, info, 1, info_mpitype, operation, 0, MPI_COMM_WORLD);

    mpitype_sim_info_free(&info_mpitype);
    MPI_Op_free(&operation);
}
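
// User-defined MPI reduction for sim_info_t: sums the particle totals and keeps
// the minimum/maximum of the per-cell particle counts and the maximum velocity.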
void sim_info_reduce_op(void *invec, void *inoutvec, int *len, MPI_Datatype *type)
{
    int i;
    const sim_info_t *other = invec;
    sim_info_t *result = inoutvec;

    for(i = 0; i < *len; ++i) {
        result->n_particles_total += other->n_particles_total;
        if(result->min_particles_per_cell > other->min_particles_per_cell) {
            result->min_particles_per_cell = other->min_particles_per_cell;
        }
        if(result->max_particles_per_cell < other->max_particles_per_cell) {
            result->max_particles_per_cell = other->max_particles_per_cell;
        }
        if(result->max_velocity < other->max_velocity) {
            result->max_velocity = other->max_velocity;
        }
        ++other;
        ++result;
    }
}
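
// Dynamic sparse data exchange: each rank posts nonblocking synchronous sends
// (MPI_Issend) for its outgoing particles and receives incoming messages as they
// are probed. Because a synchronous send only completes once the receiver has
// started to receive it, a successful MPI_Testall means all local messages have
// been matched; a nonblocking barrier (MPI_Ibarrier) then signals this to the
// other ranks, which keep receiving until the barrier completes. Requires MPI-3.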
void mpi_communicate_particles_dynamic(sim_t *s)
{
#if MPI_VERSION >= 3
    // -----------------------------------------------------------------------
    // PHASE 1: Post nonblocking synchronous sends for all outgoing particles
    // -----------------------------------------------------------------------
    MPI_Datatype *send_types;
    int *send_counts, *receiver_ranks;
    int n_receivers, i;
    int tag = 0;
    MPI_Request *send_requests;

    // Create MPI Send Types
    mpitype_part_init_send(&s->leaving_particles, &send_types, &send_counts, &receiver_ranks, &n_receivers);

    send_requests = malloc(n_receivers * sizeof(*send_requests));
    for(i = 0; i < n_receivers; i++) {
        MPI_Issend(
            MPI_BOTTOM, 1, send_types[i], receiver_ranks[i],
            tag, s->communicator, &send_requests[i]
        );
    }

    // Free MPI Send Types
    mpitype_part_free_send(&send_types, &send_counts, &receiver_ranks, &n_receivers);

    // -----------------------------------------------------------------------
    // PHASE 2: Receive messages until all outgoing particles are transmitted
    // -----------------------------------------------------------------------
    int local_sends_finished = 0;
    int message_pending = 0;
    MPI_Status status;

    while(!local_sends_finished) {
        // Check for next pending message and receive
        MPI_Iprobe(MPI_ANY_SOURCE, tag, s->communicator, &message_pending, &status);
        if(message_pending) {
            receive_particles(s, &status);
        }
        // Check for completion of local sends
        MPI_Testall(n_receivers, send_requests, &local_sends_finished, MPI_STATUSES_IGNORE);
    }
    free(send_requests);

    // -----------------------------------------------------------------------
    // PHASE 3: Signal completion of local sends with non-blocking barrier
    // -----------------------------------------------------------------------
    MPI_Request barrier_request;
    MPI_Ibarrier(s->communicator, &barrier_request);

    // -----------------------------------------------------------------------
    // PHASE 4: Receive messages until other processes have finished sending too
    // -----------------------------------------------------------------------
    int all_sends_finished = 0;
    while(!all_sends_finished) {
        // Check for next pending message and receive
        MPI_Iprobe(MPI_ANY_SOURCE, tag, s->communicator, &message_pending, &status);
        if(message_pending) {
            receive_particles(s, &status);
        }
        // Check for completion of other processes' sends
        MPI_Test(&barrier_request, &all_sends_finished, MPI_STATUS_IGNORE);
    }
#else
#pragma message("MPI_VERSION < 3 => Dynamic transmission mode will not be supported.")
    fprintf(stderr, "MPI_VERSION < 3 => Cannot use dynamic transmission mode.\n");
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
#endif
}
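
// One-sided variant: before any particle data is sent, each rank atomically adds
// its send counts into the receivers' recv_count via MPI_Accumulate on an RMA
// window (created lazily on first use and reused across calls). After the closing
// fence, every rank knows how many particles to expect and matches that count
// with point-to-point receives.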
void mpi_communicate_particles_rma(sim_t *s)
{
    // Create MPI datatypes for sending particle data
    MPI_Datatype *send_types;
    MPI_Request *send_requests;
    int *send_counts, *receiver_ranks;
    int n_receivers;
    mpitype_part_init_send(&s->leaving_particles, &send_types, &send_counts, &receiver_ranks, &n_receivers);

    // Let other processes increase local recv_count (using remote memory access)
    static MPI_Win recv_count_window = MPI_WIN_NULL;
    static int recv_count;
    int i, tag = 0;

    if(recv_count_window == MPI_WIN_NULL) {
        MPI_Aint window_size = sizeof(recv_count);
        int displacement_unit = sizeof(int);
        MPI_Win_create(
            &recv_count, window_size, displacement_unit,
            MPI_INFO_NULL, s->communicator,
            &recv_count_window
        );
    }

    recv_count = 0;
    MPI_Win_fence(0, recv_count_window);
    for(i = 0; i < n_receivers; i++) {
        MPI_Aint target_displacement = 0;
        MPI_Accumulate(
            &send_counts[i], 1, MPI_INT,     // send buffer
            receiver_ranks[i],               // target rank
            target_displacement, 1, MPI_INT, // target buffer
            MPI_SUM, recv_count_window       // operation and window object
        );
    }
    MPI_Win_fence(0, recv_count_window);

    // Post non-blocking sends
    send_requests = malloc(n_receivers * sizeof(*send_requests));
    for(i = 0; i < n_receivers; i++) {
        MPI_Isend(
            MPI_BOTTOM, 1, send_types[i], receiver_ranks[i],
            tag, s->communicator, &send_requests[i]
        );
    }

    // Receive particles
    MPI_Status status;
    while(recv_count > 0) {
        MPI_Probe(MPI_ANY_SOURCE, tag, s->communicator, &status);
        recv_count -= receive_particles(s, &status);
    }

    // Finish sends
    MPI_Waitall(n_receivers, send_requests, MPI_STATUSES_IGNORE);

    // Cleanup
    mpitype_part_free_send(&send_types, &send_counts, &receiver_ranks, &n_receivers);
    free(send_requests);
}
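
// Receives one message of particle data announced by 'status' (obtained from a
// probe): determines the particle count from the status, grows the local particle
// array, receives in place via a derived datatype, and returns the number of
// particles received.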
int receive_particles(sim_t *s, const MPI_Status *status)
{
    part_t *particles = &s->local_particles;

    // Reserve space for new particles
    int recv_count = mpitype_part_get_count(status);
    int recv_offset = prepare_receive(particles, recv_count);

    // Create MPI Datatype and receive
    MPI_Datatype recv_type;
    mpitype_part_init_recv(particles, recv_count, recv_offset, &recv_type);
    MPI_Recv(
        MPI_BOTTOM, 1, recv_type,
        status->MPI_SOURCE, status->MPI_TAG,
        s->communicator, MPI_STATUS_IGNORE
    );

    // Free MPI Datatype
    mpitype_part_free_recv(&recv_type);
    return recv_count;
}
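
// Collective variant: the per-destination particle counts are exchanged with
// MPI_Alltoall; the particle data itself is then exchanged in a single
// MPI_Alltoallw call, with one derived (absolute-address) datatype per
// communication partner and a count of 0 or 1 per rank.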
void mpi_communicate_particles_collective(sim_t *s)
{
    // Create MPI datatypes for sending particle data
    MPI_Datatype *send_types_compact;
    int *send_counts_compact, *receiver_ranks;
    int n_receivers;
    mpitype_part_init_send(&s->leaving_particles, &send_types_compact, &send_counts_compact, &receiver_ranks, &n_receivers);

    // Prepare argument arrays for MPI_Alltoall{w}
    int nprocs, rank, i, j, recv_offset, recv_count;
    MPI_Datatype *send_types, *recv_types;
    int *send_counts, *recv_counts;
    int *displacements;

    MPI_Comm_size(s->communicator, &nprocs);
    MPI_Comm_rank(s->communicator, &rank);

    send_types = malloc(nprocs * sizeof(*send_types));
    recv_types = malloc(nprocs * sizeof(*recv_types));
    send_counts = malloc(nprocs * sizeof(*send_counts));
    recv_counts = malloc(nprocs * sizeof(*recv_counts));
    displacements = malloc(nprocs * sizeof(*displacements));

    for(i = 0; i < nprocs; i++) {
        send_types[i] = MPI_BYTE;
        recv_types[i] = MPI_BYTE;
        send_counts[i] = 0;
        recv_counts[i] = 0;
        displacements[i] = 0;
    }
    for(i = 0; i < n_receivers; i++) {
        j = receiver_ranks[i];
        send_types[j] = send_types_compact[i];
        send_counts[j] = send_counts_compact[i];
    }

    // Transfer particle counts (determine number of particles to receive)
    MPI_Alltoall(send_counts, 1, MPI_INT, recv_counts, 1, MPI_INT, s->communicator);

    // Count particles to receive and reserve space if needed
    part_t* particles = &s->local_particles;
    recv_count = 0;
    for(i = 0; i < nprocs; i++) recv_count += recv_counts[i];
    recv_offset = prepare_receive(particles, recv_count);

    // Create MPI Datatypes for receiving particle data;
    // adjust send/receive counts (transfer only one element of compound datatype)
    for(i = 0; i < nprocs; i++)
    {
        if(send_counts[i] > 0) {
            send_counts[i] = 1;
        }
        if(recv_counts[i] > 0) {
            mpitype_part_init_recv(particles, recv_counts[i], recv_offset, &recv_types[i]);
            recv_offset += recv_counts[i];
            recv_counts[i] = 1;
        }
    }

    // Transfer particle data
    MPI_Alltoallw(
        MPI_BOTTOM, send_counts, displacements, send_types,
        MPI_BOTTOM, recv_counts, displacements, recv_types,
        s->communicator
    );

    // Free MPI Datatypes
    mpitype_part_free_send(&send_types_compact, &send_counts_compact, &receiver_ranks, &n_receivers);
    for(i = 0; i < nprocs; i++) {
        if(recv_counts[i] > 0) {
            mpitype_part_free_recv(&recv_types[i]);
        }
    }

    // Cleanup
    free(send_types);
    free(recv_types);
    free(send_counts);
    free(recv_counts);
    free(displacements);
}
// --------------------------------------------------------- Helper functions
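
// Grows the local particle array to make room for n_particles_to_receive
// additional particles (over-allocating by 1/8 to amortize reallocations) and
// returns the offset at which the newly received particles will be stored.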
int prepare_receive(part_t *particles, int n_particles_to_receive)
{
    int recv_offset = particles->count;
    int new_count = recv_offset + n_particles_to_receive;

    if(particles->capacity < new_count) {
        int new_capacity = new_count + new_count / 8;
        part_reserve(particles, new_capacity);
    }
    part_resize(particles, new_count);

    return recv_offset;
}