#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>

#include "mpicomm.h"
#include "mpitypes.h"

// --------------------------------------------- Helper function declarations

int prepare_receive(part_t *particles, int n_particles_to_receive);
int receive_particles(sim_t *s, const MPI_Status *status);
void sim_info_reduce_op(void *invec, void *inoutvec, int *len,
                        MPI_Datatype *type);

// ==========================================================================

void mpi_broadcast_configuration(conf_t *configuration)
{
    MPI_Datatype conf_mpitype;

    mpitype_conf_init(&conf_mpitype);
    MPI_Bcast(configuration, 1, conf_mpitype, 0, MPI_COMM_WORLD);
    mpitype_conf_free(&conf_mpitype);
}

void mpi_reduce_sim_info(const sim_info_t *local_info, sim_info_t *info)
{
    MPI_Datatype info_mpitype;
    MPI_Op operation;
    int commutative = 1;

    mpitype_sim_info_init(&info_mpitype);
    MPI_Op_create(sim_info_reduce_op, commutative, &operation);

    MPI_Reduce(local_info, info, 1, info_mpitype, operation, 0,
               MPI_COMM_WORLD);

    mpitype_sim_info_free(&info_mpitype);
    MPI_Op_free(&operation);
}

void sim_info_reduce_op(void *invec, void *inoutvec, int *len,
                        MPI_Datatype *type)
{
    int i;
    const sim_info_t *other = invec;
    sim_info_t *result = inoutvec;

    for(i = 0; i < *len; ++i) {
        result->n_particles_total += other->n_particles_total;
        if(result->min_particles_per_cell > other->min_particles_per_cell) {
            result->min_particles_per_cell = other->min_particles_per_cell;
        }
        if(result->max_particles_per_cell < other->max_particles_per_cell) {
            result->max_particles_per_cell = other->max_particles_per_cell;
        }
        if(result->max_velocity < other->max_velocity) {
            result->max_velocity = other->max_velocity;
        }
        ++other;
        ++result;
    }
}

void mpi_communicate_particles_dynamic(sim_t *s)
{
#if MPI_VERSION >= 3
    // -----------------------------------------------------------------------
    // PHASE 1: Post nonblocking synchronous sends for all outgoing particles
    // -----------------------------------------------------------------------
    MPI_Datatype *send_types;
    int *send_counts, *receiver_ranks;
    int n_receivers, i;
    int tag = 0;
    MPI_Request *send_requests;

    // Create MPI Send Types
    mpitype_part_init_send(&s->leaving_particles, &send_types, &send_counts,
                           &receiver_ranks, &n_receivers);

    send_requests = malloc(n_receivers * sizeof(*send_requests));
    for(i = 0; i < n_receivers; i++) {
        MPI_Issend(
            MPI_BOTTOM, 1, send_types[i], receiver_ranks[i], tag,
            s->communicator, &send_requests[i]
        );
    }

    // Free MPI Send Types
    mpitype_part_free_send(&send_types, &send_counts, &receiver_ranks,
                           &n_receivers);

    // -----------------------------------------------------------------------
    // PHASE 2: Receive messages until all outgoing particles are transmitted
    // -----------------------------------------------------------------------
    int local_sends_finished = 0;
    int message_pending = 0;
    MPI_Status status;

    while(!local_sends_finished) {
        // Check for next pending message and receive
        MPI_Iprobe(MPI_ANY_SOURCE, tag, s->communicator, &message_pending,
                   &status);
        if(message_pending) {
            receive_particles(s, &status);
        }

        // Check for completion of local sends
        MPI_Testall(n_receivers, send_requests, &local_sends_finished,
                    MPI_STATUSES_IGNORE);
    }
    free(send_requests);

    // -----------------------------------------------------------------------
    // PHASE 3: Signal completion of local sends with non-blocking barrier
    // -----------------------------------------------------------------------
    MPI_Request barrier_request;
    MPI_Ibarrier(s->communicator, &barrier_request);
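
    // Note on termination: the synchronous sends (MPI_Issend) above complete
    // only once the matching receive has started, so a process enters the
    // non-blocking barrier only after all of its outgoing messages have been
    // matched. When the barrier completes on every rank (PHASE 4), no message
    // can still be in flight. This is the usual non-blocking consensus (NBX)
    // pattern for dynamic sparse data exchange.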
    // -----------------------------------------------------------------------
    // PHASE 4: Receive messages until other processes have finished sending too
    // -----------------------------------------------------------------------
    int all_sends_finished = 0;

    while(!all_sends_finished) {
        // Check for next pending message and receive
        MPI_Iprobe(MPI_ANY_SOURCE, tag, s->communicator, &message_pending,
                   &status);
        if(message_pending) {
            receive_particles(s, &status);
        }

        // Check for completion of other processes' sends
        MPI_Test(&barrier_request, &all_sends_finished, MPI_STATUS_IGNORE);
    }
#else
#pragma message("MPI_VERSION < 3 => Dynamic transmission mode will not be supported.")
    fprintf(stderr, "MPI_VERSION < 3 => Cannot use dynamic transmission mode.\n");
    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
#endif
}

void mpi_communicate_particles_rma(sim_t *s)
{
    // Create MPI datatypes for sending particle data
    MPI_Datatype *send_types;
    MPI_Request *send_requests;
    int *send_counts, *receiver_ranks;
    int n_receivers;

    mpitype_part_init_send(&s->leaving_particles, &send_types, &send_counts,
                           &receiver_ranks, &n_receivers);

    // Let other processes increase local recv_count (using remote memory access)
    static MPI_Win recv_count_window = MPI_WIN_NULL;
    static int recv_count;
    int i, tag = 0;

    if(recv_count_window == MPI_WIN_NULL) {
        MPI_Aint window_size = sizeof(recv_count);
        int displacement_unit = sizeof(int);
        MPI_Win_create(
            &recv_count, window_size, displacement_unit,
            MPI_INFO_NULL, s->communicator, &recv_count_window
        );
    }

    recv_count = 0;
    MPI_Win_fence(0, recv_count_window);
    for(i = 0; i < n_receivers; i++) {
        MPI_Aint target_displacement = 0;
        MPI_Accumulate(
            &send_counts[i], 1, MPI_INT,        // send buffer
            receiver_ranks[i],                  // target rank
            target_displacement, 1, MPI_INT,    // target buffer
            MPI_SUM, recv_count_window          // operation and window object
        );
    }
    MPI_Win_fence(0, recv_count_window);

    // Post non-blocking sends
    send_requests = malloc(n_receivers * sizeof(*send_requests));
    for(i = 0; i < n_receivers; i++) {
        MPI_Isend(
            MPI_BOTTOM, 1, send_types[i], receiver_ranks[i], tag,
            s->communicator, &send_requests[i]
        );
    }

    // Receive particles
    MPI_Status status;
    while(recv_count > 0) {
        MPI_Probe(MPI_ANY_SOURCE, tag, s->communicator, &status);
        recv_count -= receive_particles(s, &status);
    }

    // Finish sends
    MPI_Waitall(n_receivers, send_requests, MPI_STATUSES_IGNORE);

    // Cleanup
    mpitype_part_free_send(&send_types, &send_counts, &receiver_ranks,
                           &n_receivers);
    free(send_requests);
}

int receive_particles(sim_t *s, const MPI_Status *status)
{
    part_t *particles = &s->local_particles;

    // Reserve space for new particles
    int recv_count = mpitype_part_get_count(status);
    int recv_offset = prepare_receive(particles, recv_count);

    // Create MPI Datatype and receive
    MPI_Datatype recv_type;
    mpitype_part_init_recv(particles, recv_count, recv_offset, &recv_type);
    MPI_Recv(
        MPI_BOTTOM, 1, recv_type, status->MPI_SOURCE, status->MPI_TAG,
        s->communicator, MPI_STATUS_IGNORE
    );

    // Free MPI Datatype
    mpitype_part_free_recv(&recv_type);

    return recv_count;
}

void mpi_communicate_particles_collective(sim_t *s)
{
    // Create MPI datatypes for sending particle data
    MPI_Datatype *send_types_compact;
    int *send_counts_compact, *receiver_ranks;
    int n_receivers;

    mpitype_part_init_send(&s->leaving_particles, &send_types_compact,
                           &send_counts_compact, &receiver_ranks, &n_receivers);

    // Prepare argument arrays for MPI_Alltoall{w}
    int nprocs, rank, i, j, recv_offset, recv_count;
    MPI_Datatype *send_types, *recv_types;
    int *send_counts, *recv_counts;
    int *displacements;
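    // Every argument array for MPI_Alltoallw needs one entry per rank in the
    // communicator. Ranks we exchange no particles with keep a zero count and
    // the dummy type MPI_BYTE; since the particle datatypes address memory via
    // MPI_BOTTOM (absolute addresses), all displacement entries stay zero.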
    MPI_Comm_size(s->communicator, &nprocs);
    MPI_Comm_rank(s->communicator, &rank);

    send_types = malloc(nprocs * sizeof(*send_types));
    recv_types = malloc(nprocs * sizeof(*recv_types));
    send_counts = malloc(nprocs * sizeof(*send_counts));
    recv_counts = malloc(nprocs * sizeof(*recv_counts));
    displacements = malloc(nprocs * sizeof(*displacements));

    for(i = 0; i < nprocs; i++) {
        send_types[i] = MPI_BYTE;
        recv_types[i] = MPI_BYTE;
        send_counts[i] = 0;
        recv_counts[i] = 0;
        displacements[i] = 0;
    }
    for(i = 0; i < n_receivers; i++) {
        j = receiver_ranks[i];
        send_types[j] = send_types_compact[i];
        send_counts[j] = send_counts_compact[i];
    }

    // Transfer particle counts (determine number of particles to receive)
    MPI_Alltoall(send_counts, 1, MPI_INT, recv_counts, 1, MPI_INT,
                 s->communicator);

    // Count particles to receive and reserve space if needed
    part_t *particles = &s->local_particles;
    recv_count = 0;
    for(i = 0; i < nprocs; i++) {
        recv_count += recv_counts[i];
    }
    recv_offset = prepare_receive(particles, recv_count);

    // Create MPI Datatypes for receiving particle data;
    // adjust send/receive counts (transfer only one element of compound datatype)
    for(i = 0; i < nprocs; i++) {
        if(send_counts[i] > 0) {
            send_counts[i] = 1;
        }
        if(recv_counts[i] > 0) {
            mpitype_part_init_recv(particles, recv_counts[i], recv_offset,
                                   &recv_types[i]);
            recv_offset += recv_counts[i];
            recv_counts[i] = 1;
        }
    }

    // Transfer particle data
    MPI_Alltoallw(
        MPI_BOTTOM, send_counts, displacements, send_types,
        MPI_BOTTOM, recv_counts, displacements, recv_types,
        s->communicator
    );

    // Free MPI Datatypes
    mpitype_part_free_send(&send_types_compact, &send_counts_compact,
                           &receiver_ranks, &n_receivers);
    for(i = 0; i < nprocs; i++) {
        if(recv_counts[i] > 0) {
            mpitype_part_free_recv(&recv_types[i]);
        }
    }

    // Cleanup
    free(send_types);
    free(recv_types);
    free(send_counts);
    free(recv_counts);
    free(displacements);
}

// --------------------------------------------------------- Helper functions

int prepare_receive(part_t *particles, int n_particles_to_receive)
{
    int recv_offset = particles->count;
    int new_count = recv_offset + n_particles_to_receive;

    if(particles->capacity < new_count) {
        int new_capacity = new_count + new_count / 8;
        part_reserve(particles, new_capacity);
    }
    part_resize(particles, new_count);

    return recv_offset;
}
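
// ------------------------------------------------------------------ Usage note
//
// Illustrative call sequence (sketch only): how these routines might be driven
// from an application's main loop. read_configuration(), sim_setup(),
// sim_move_particles(), sim_collect_info() and the n_steps field are
// hypothetical placeholders for code outside this file; only the mpi_*
// functions above are defined here.
//
//     conf_t conf;
//     sim_t sim;
//     sim_info_t local_info, global_info;
//
//     MPI_Init(&argc, &argv);
//     if(rank == 0) {
//         read_configuration(&conf);                 // hypothetical helper
//     }
//     mpi_broadcast_configuration(&conf);            // every rank gets the config
//     sim_setup(&sim, &conf);                        // hypothetical helper
//
//     for(step = 0; step < conf.n_steps; ++step) {
//         sim_move_particles(&sim);                  // fills sim.leaving_particles
//         mpi_communicate_particles_dynamic(&sim);   // or _rma / _collective
//         sim_collect_info(&sim, &local_info);       // hypothetical helper
//         mpi_reduce_sim_info(&local_info, &global_info);  // result on rank 0
//     }
//
//     MPI_Finalize();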