Skip to content
Snippets Groups Projects
mpicomm.c 10.2 KiB
Newer Older
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>

#include "mpicomm.h"
#include "mpitypes.h"

// --------------------------------------------- Helper function declarations

int prepare_receive(part_t *particles, int n_particles_to_receive);
int receive_particles(sim_t *s, const MPI_Status *status);

void sim_info_reduce_op(void *invec, void *inoutvec, int *len, MPI_Datatype *type);

// ==========================================================================

void mpi_broadcast_configuration(conf_t *configuration)
{
   MPI_Datatype conf_mpitype;
   mpitype_conf_init(&conf_mpitype);
   MPI_Bcast(configuration, 1, conf_mpitype, 0, MPI_COMM_WORLD);
   mpitype_conf_free(&conf_mpitype);
}

void mpi_reduce_sim_info(const sim_info_t *local_info, sim_info_t *info)
{
   MPI_Datatype info_mpitype;
   MPI_Op operation;
   int commutative = 1;

   mpitype_sim_info_init(&info_mpitype);
   MPI_Op_create(sim_info_reduce_op, commutative, &operation);

   MPI_Reduce(local_info, info, 1, info_mpitype, operation, 0, MPI_COMM_WORLD);

   mpitype_sim_info_free(&info_mpitype);
   MPI_Op_free(&operation);
}

void sim_info_reduce_op(void *invec, void *inoutvec, int *len, MPI_Datatype *type)
{
   int i;
   const sim_info_t *other = invec;
   sim_info_t *result = inoutvec;

   for(i = 0; i < *len; ++i) {
      result->n_particles_total += other->n_particles_total;

      if(result->min_particles_per_cell > other->min_particles_per_cell) {
         result->min_particles_per_cell = other->min_particles_per_cell;
      }

      if(result->max_particles_per_cell < other->max_particles_per_cell) {
         result->max_particles_per_cell = other->max_particles_per_cell;
      }

      if(result->max_velocity < other->max_velocity) {
         result->max_velocity = other->max_velocity;
      }
      ++other;
      ++result;
   }
}

void mpi_communicate_particles_dynamic(sim_t *s)
{
#if MPI_VERSION >= 3
   // -----------------------------------------------------------------------
   // PHASE 1: Post nonblocking synchronous sends for all outgoing particles
   // -----------------------------------------------------------------------
   MPI_Datatype *send_types;
   int *send_counts, *receiver_ranks;
   int n_receivers, i;
   int tag = 0;
   MPI_Request *send_requests;

   // Create MPI Send Types
   mpitype_part_init_send(&s->leaving_particles, &send_types, &send_counts, &receiver_ranks, &n_receivers);
   send_requests = malloc(n_receivers * sizeof(*send_requests));
   for(i = 0; i < n_receivers; i++) {
      MPI_Issend(
            MPI_BOTTOM, 1, send_types[i], receiver_ranks[i],
            tag, s->communicator, &send_requests[i]
      );
   }

   // Free MPI Send Types
   mpitype_part_free_send(&send_types, &send_counts, &receiver_ranks, &n_receivers);

   // -----------------------------------------------------------------------
   // PHASE 2: Receive messages until all outgoing particles are transmitted
   // -----------------------------------------------------------------------
   int local_sends_finished = 0;
   int message_pending = 0;
   MPI_Status status;

   while(!local_sends_finished) {
      // Check for next pending message and receive
      MPI_Iprobe(MPI_ANY_SOURCE, tag, s->communicator, &message_pending, &status);
      if(message_pending) {
         receive_particles(s, &status);
      }

      // Check for completion of local sends
      MPI_Testall(n_receivers, send_requests, &local_sends_finished, MPI_STATUSES_IGNORE);
   }
   free(send_requests);

   // -----------------------------------------------------------------------
   // PHASE 3: Signal completion of local sends with non-blocking barrier
   // -----------------------------------------------------------------------
   MPI_Request barrier_request;
   MPI_Ibarrier(s->communicator, &barrier_request);

   // -----------------------------------------------------------------------
   // PHASE 4: Receive messages until other processes have finished sending too
   // -----------------------------------------------------------------------
   int all_sends_finished = 0;

   while(!all_sends_finished) {
      // Check for next pending message and receive
      MPI_Iprobe(MPI_ANY_SOURCE, tag, s->communicator, &message_pending, &status);
      if(message_pending) {
         receive_particles(s, &status);
      }
      // Check for completion of other processes' sends
      MPI_Test(&barrier_request, &all_sends_finished, MPI_STATUS_IGNORE);
   }
#else
   #pragma message("MPI_VERSION < 3 => Dynamic transmission mode will not be supported.")
   fprintf(stderr, "MPI_VERSION < 3 => Cannot use dynamic transmission mode.\n");
   MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
#endif
}

void mpi_communicate_particles_rma(sim_t *s)
{
   // Create MPI datatypes for sending particle data
   MPI_Datatype *send_types;
   MPI_Request *send_requests;
   int *send_counts, *receiver_ranks;
   int n_receivers;

   mpitype_part_init_send(&s->leaving_particles, &send_types, &send_counts, &receiver_ranks, &n_receivers);

   // Let other processes increase local recv_count (using remote memory access)
   static MPI_Win recv_count_window = MPI_WIN_NULL;
   static int recv_count;
   int i, tag = 0;

   if(recv_count_window == MPI_WIN_NULL) {
      MPI_Aint window_size = sizeof(recv_count);
      int displacement_unit = sizeof(int);

      MPI_Win_create(
            &recv_count, window_size, displacement_unit,
            MPI_INFO_NULL, s->communicator,
            &recv_count_window
      );
   }
   recv_count = 0;
   MPI_Win_fence(0, recv_count_window);
   for(i = 0; i < n_receivers; i++) {
      MPI_Aint target_displacement = 0;
      MPI_Accumulate(
            &send_counts[i], 1, MPI_INT,     // send buffer
            receiver_ranks[i],               // target rank
            target_displacement, 1, MPI_INT, // target buffer
            MPI_SUM, recv_count_window       // operation and window object
      );
   }
   MPI_Win_fence(0, recv_count_window);

   // Post non-blocking sends
   send_requests = malloc(n_receivers * sizeof(*send_requests));
   for(i = 0; i < n_receivers; i++) {
      MPI_Isend(
            MPI_BOTTOM, 1, send_types[i], receiver_ranks[i],
            tag, s->communicator, &send_requests[i]
      );
   }

   // Receive particles
   MPI_Status status;
   while(recv_count > 0) {
      MPI_Probe(MPI_ANY_SOURCE, tag, s->communicator, &status);
      recv_count -= receive_particles(s, &status);
   }

   // Finish sends
   MPI_Waitall(n_receivers, send_requests, MPI_STATUSES_IGNORE);

   // Cleanup
   mpitype_part_free_send(&send_types, &send_counts, &receiver_ranks, &n_receivers);
   free(send_requests);
}

int receive_particles(sim_t *s, const MPI_Status *status)
{
   part_t *particles = &s->local_particles;

   // Reserve space for new particles
   int recv_count = mpitype_part_get_count(status);
   int recv_offset = prepare_receive(particles, recv_count);

   // Create MPI Datatype and receive
   MPI_Datatype recv_type;
   mpitype_part_init_recv(particles, recv_count, recv_offset, &recv_type);
         MPI_BOTTOM, 1, recv_type,
         status->MPI_SOURCE, status->MPI_TAG,
         s->communicator, MPI_STATUS_IGNORE
   );

   // Free MPI Datatype
   mpitype_part_free_recv(&recv_type);
}

void mpi_communicate_particles_collective(sim_t *s)
{
   // Create MPI datatypes for sending particle data
   MPI_Datatype *send_types_compact;
   int *send_counts_compact, *receiver_ranks;
   int n_receivers;

   mpitype_part_init_send(&s->leaving_particles, &send_types_compact, &send_counts_compact, &receiver_ranks, &n_receivers);

   // Prepare argument arrays for MPI_Alltoall{w}
   int nprocs, rank, i, j, recv_offset, recv_count;
   MPI_Datatype *send_types, *recv_types;
   int *send_counts, *recv_counts;
   int *displacements;

   MPI_Comm_size(s->communicator, &nprocs);
   MPI_Comm_rank(s->communicator, &rank);

   send_types = malloc(nprocs * sizeof(*send_types));
   recv_types = malloc(nprocs * sizeof(*recv_types));
   send_counts = malloc(nprocs * sizeof(*send_counts));
   recv_counts = malloc(nprocs * sizeof(*recv_counts));
   displacements = malloc(nprocs * sizeof(*displacements));

   for(i = 0; i < nprocs; i++) {
      send_types[i] = MPI_BYTE;
      recv_types[i] = MPI_BYTE;
      send_counts[i] = 0;
      recv_counts[i] = 0;
      displacements[i] = 0;
   }

   for(i = 0; i < n_receivers; i++) {
      j = receiver_ranks[i];
      send_types[j] = send_types_compact[i];
      send_counts[j] = send_counts_compact[i];
   }

   // Transfer particle counts (determine number of particles to receive)
   MPI_Alltoall(send_counts, 1, MPI_INT, recv_counts, 1, MPI_INT, s->communicator);

   // Count particles to receive and reserve space if needed
   part_t* particles = &s->local_particles;
   recv_count = 0;
   for(i = 0; i < nprocs; i++) recv_count += recv_counts[i];
   recv_offset = prepare_receive(particles, recv_count);
   // Create MPI Datatypes for receiving particle data;
   // adjust send/receive counts (transfer only one element of compound datatype)
   for(i = 0; i < nprocs; i++)
   {
      if(send_counts[i] > 0) {
         send_counts[i] = 1;
      }
      if(recv_counts[i] > 0) {
         mpitype_part_init_recv(particles, recv_counts[i], recv_offset, &recv_types[i]);
         recv_offset += recv_counts[i];
         recv_counts[i] = 1;
      }
   }

   // Transfer particle data
   MPI_Alltoallw(
         MPI_BOTTOM, send_counts, displacements, send_types,
         MPI_BOTTOM, recv_counts, displacements, recv_types,
         s->communicator
   );

   // Free MPI Datatypes
   mpitype_part_free_send(&send_types_compact, &send_counts_compact, &receiver_ranks, &n_receivers);
   for(i = 0; i < nprocs; i++) {
      if(recv_counts[i] > 0) {
         mpitype_part_free_recv(&recv_types[i]);
      }
   }

   // Cleanup
   free(send_types);
   free(recv_types);
   free(send_counts);
   free(recv_counts);
   free(displacements);
}

// --------------------------------------------------------- Helper functions

int prepare_receive(part_t *particles, int n_particles_to_receive)
   int recv_offset = particles->count;
   int new_count = recv_offset + n_particles_to_receive;

   if(particles->capacity < new_count) {
      int new_capacity = new_count + new_count / 8;
      part_reserve(particles, new_capacity);
   }
   part_resize(particles, new_count);