Commit 95cd288e authored by Jussi Enkovaara's avatar Jussi Enkovaara
Browse files

Exercise on simple parallel I/O

parent d890f883
Copyright (C) 2018 CSC - IT Center for Science Ltd.
Licensed under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
Code is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
Copy of the GNU General Public License can be obtained from
<http://www.gnu.org/licenses/>.
## Parallel I/O with Posix
a) Write data from all MPI tasks to a single file using the spokesman
strategy. Gather data to a single MPI task and write it to a file. The
data should be kept in the order of the MPI ranks.
b) Verify the above write by reading the file using the spokesman
strategy. Use a different number of MPI tasks than in writing.
c) Implement the above write so that all the MPI tasks write into
separate files. Skeleton codes are found in
[spokesman.c](c/spokesman.c) and
[spokesman_reader.c](c/spokesman_reader.c), or in
[spokesman.F90](fortran/spokesman.F90) and
[spokesman_reader.F90](fortran/spokesman_reader.F90)
d) Rewrite exercise a) so that all MPI tasks participate in writing to
(and reading from) a single file using MPI-I/O.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <mpi.h>
#define DATASIZE 64
#define WRITER_ID 0
void mpiio_writer(int, int *, int);
/*
 * Entry point of the MPI-I/O writer.
 *
 * Splits DATASIZE integers evenly over the MPI tasks, fills each task's
 * slice with the values (localsize * my_id + 1) .. (localsize * (my_id + 1))
 * and hands the slice to mpiio_writer().
 *
 * Aborts the whole MPI job when there are more tasks than data elements,
 * when DATASIZE is not divisible by the number of tasks, or when the local
 * buffer cannot be allocated.
 */
int main(int argc, char *argv[])
{
    int my_id, ntasks, i, localsize;
    int *localvector;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_id);

    /* More tasks than elements would leave some tasks without data. */
    if (ntasks > DATASIZE) {
        fprintf(stderr, "Maximum number of tasks is %d!\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    if (DATASIZE % ntasks != 0) {
        fprintf(stderr, "Datasize (%d) should be divisible by number "
                "of tasks.\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    localsize = DATASIZE / ntasks;
    localvector = (int *) malloc(localsize * sizeof(int));
    if (localvector == NULL) {
        fprintf(stderr, "Error: could not allocate %d integers\n", localsize);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    /* Globally the values run 1..DATASIZE in rank order. */
    for (i = 0; i < localsize; i++) {
        localvector[i] = i + 1 + localsize * my_id;
    }

    mpiio_writer(my_id, localvector, localsize);

    free(localvector);

    MPI_Finalize();
    return 0;
}
void mpiio_writer(int my_id, int *localvector, int localsize)
{
MPI_File fh;
MPI_Offset offset;
MPI_File_open(MPI_COMM_WORLD, "mpiio.dat",
MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
offset = my_id * localsize * sizeof(int);
MPI_File_write_at_all(fh, offset, localvector,
localsize, MPI_INT, MPI_STATUS_IGNORE);
MPI_File_close(&fh);
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <mpi.h>
#define DATASIZE 64
#define WRITER_ID 0
void single_writer(int, int *, int);
void many_writers(int, int *, int);
/*
 * Entry point of the "one file per task" writer.
 *
 * Splits DATASIZE integers evenly over the MPI tasks, fills each task's
 * slice with the values (localsize * my_id + 1) .. (localsize * (my_id + 1))
 * and hands the slice to many_writers().
 *
 * Aborts the whole MPI job when there are more tasks than data elements,
 * when DATASIZE is not divisible by the number of tasks, or when the local
 * buffer cannot be allocated.
 */
int main(int argc, char *argv[])
{
    int my_id, ntasks, i, localsize;
    int *localvector;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_id);

    /* More tasks than elements would leave some tasks without data. */
    if (ntasks > DATASIZE) {
        fprintf(stderr, "Maximum number of tasks is %d!\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    if (DATASIZE % ntasks != 0) {
        fprintf(stderr, "Datasize (%d) should be divisible by number "
                "of tasks.\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    localsize = DATASIZE / ntasks;
    localvector = (int *) malloc(localsize * sizeof(int));
    if (localvector == NULL) {
        fprintf(stderr, "Error: could not allocate %d integers\n", localsize);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    /* Globally the values run 1..DATASIZE in rank order. */
    for (i = 0; i < localsize; i++) {
        localvector[i] = i + 1 + localsize * my_id;
    }

    many_writers(my_id, localvector, localsize);

    free(localvector);

    MPI_Finalize();
    return 0;
}
/*
 * Spokesman write: gather every task's slice onto rank WRITER_ID and let
 * that rank alone write the DATASIZE gathered integers to singlewriter.dat
 * as raw binary.
 *
 * my_id       - rank of the calling task in MPI_COMM_WORLD
 * localvector - this task's contribution (localsize integers)
 * localsize   - number of integers contributed by each task
 *
 * Collective: every task in MPI_COMM_WORLD must call it.  Allocation,
 * open and write failures abort the MPI job.
 */
void single_writer(int my_id, int *localvector, int localsize)
{
    FILE *fp;
    int *fullvector;
    size_t nwritten;

    fullvector = (int *) malloc(DATASIZE * sizeof(int));
    if (fullvector == NULL) {
        fprintf(stderr, "Error: could not allocate gather buffer\n");
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    /* MPI_Gather stores the slices in rank order in fullvector. */
    MPI_Gather(localvector, localsize, MPI_INT, fullvector, localsize,
               MPI_INT, WRITER_ID, MPI_COMM_WORLD);

    if (my_id == WRITER_ID) {
        if ((fp = fopen("singlewriter.dat", "wb")) == NULL) {
            fprintf(stderr, "Error: %d (%s)\n", errno, strerror(errno));
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        } else {
            nwritten = fwrite(fullvector, sizeof(int), DATASIZE, fp);
            /* fclose flushes buffered data, so its result matters too. */
            if (fclose(fp) != 0 || nwritten != (size_t) DATASIZE) {
                fprintf(stderr,
                        "Error: could not write file singlewriter.dat\n");
                MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
            }
            printf("Wrote %d elements to file singlewriter.dat\n", DATASIZE);
        }
    }

    free(fullvector);
}
/*
 * Write this task's slice to its own file manywriters-<my_id>.dat as raw
 * binary.
 *
 * my_id       - rank of the calling task; used to build the file name
 * localvector - the localsize integers to write
 * localsize   - number of integers in localvector
 *
 * Open and write failures abort the MPI job.
 */
void many_writers(int my_id, int *localvector, int localsize)
{
    FILE *fp;
    char filename[64];
    size_t nwritten;

    /* Bounded formatting: never writes past the end of filename. */
    snprintf(filename, sizeof(filename), "manywriters-%d.dat", my_id);

    if ((fp = fopen(filename, "wb")) == NULL) {
        fprintf(stderr, "Error: %d (%s)\n", errno, strerror(errno));
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    } else {
        nwritten = fwrite(localvector, sizeof(int), localsize, fp);
        /* fclose flushes buffered data, so its result matters too. */
        if (fclose(fp) != 0 || nwritten != (size_t) localsize) {
            fprintf(stderr, "Error: could not write file %s\n", filename);
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        }
        printf("Wrote %d elements to file %s\n", localsize, filename);
    }
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <mpi.h>
#define DATASIZE 64
#define WRITER_ID 0
void single_writer(int, int *, int);
/*
 * Entry point of the spokesman (single writer) program.
 *
 * Splits DATASIZE integers evenly over the MPI tasks, fills each task's
 * slice with the values (localsize * my_id + 1) .. (localsize * (my_id + 1))
 * and hands the slice to single_writer().
 *
 * Aborts the whole MPI job when there are more tasks than data elements,
 * when DATASIZE is not divisible by the number of tasks, or when the local
 * buffer cannot be allocated.
 */
int main(int argc, char *argv[])
{
    int my_id, ntasks, i, localsize;
    int *localvector;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_id);

    /* More tasks than elements would leave some tasks without data. */
    if (ntasks > DATASIZE) {
        fprintf(stderr, "Maximum number of tasks is %d!\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    if (DATASIZE % ntasks != 0) {
        fprintf(stderr, "Datasize (%d) should be divisible by number "
                "of tasks.\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    localsize = DATASIZE / ntasks;
    localvector = (int *) malloc(localsize * sizeof(int));
    if (localvector == NULL) {
        fprintf(stderr, "Error: could not allocate %d integers\n", localsize);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    /* Globally the values run 1..DATASIZE in rank order. */
    for (i = 0; i < localsize; i++) {
        localvector[i] = i + 1 + localsize * my_id;
    }

    single_writer(my_id, localvector, localsize);

    free(localvector);

    MPI_Finalize();
    return 0;
}
/*
 * Spokesman write: gather every task's slice onto rank WRITER_ID and let
 * that rank alone write the DATASIZE gathered integers to singlewriter.dat
 * as raw binary.
 *
 * my_id       - rank of the calling task in MPI_COMM_WORLD
 * localvector - this task's contribution (localsize integers)
 * localsize   - number of integers contributed by each task
 *
 * Collective: every task in MPI_COMM_WORLD must call it.  Allocation,
 * open and write failures abort the MPI job.
 */
void single_writer(int my_id, int *localvector, int localsize)
{
    FILE *fp;
    int *fullvector;
    size_t nwritten;

    fullvector = (int *) malloc(DATASIZE * sizeof(int));
    if (fullvector == NULL) {
        fprintf(stderr, "Error: could not allocate gather buffer\n");
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    /* MPI_Gather stores the slices in rank order in fullvector. */
    MPI_Gather(localvector, localsize, MPI_INT, fullvector, localsize,
               MPI_INT, WRITER_ID, MPI_COMM_WORLD);

    if (my_id == WRITER_ID) {
        if ((fp = fopen("singlewriter.dat", "wb")) == NULL) {
            fprintf(stderr, "Error: %d (%s)\n", errno, strerror(errno));
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        } else {
            nwritten = fwrite(fullvector, sizeof(int), DATASIZE, fp);
            /* fclose flushes buffered data, so its result matters too. */
            if (fclose(fp) != 0 || nwritten != (size_t) DATASIZE) {
                fprintf(stderr,
                        "Error: could not write file singlewriter.dat\n");
                MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
            }
            printf("Wrote %d elements to file singlewriter.dat\n", DATASIZE);
        }
    }

    free(fullvector);
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <mpi.h>
#define DATASIZE 64
#define WRITER_ID 0
void single_reader(int, int *, int);
void ordered_print(int, int, int *, int);
/*
 * Entry point of the spokesman reader.
 *
 * Allocates room for DATASIZE / ntasks integers on every task, lets
 * single_reader() fill it, and prints the received slices with
 * ordered_print().
 *
 * Aborts the whole MPI job when there are more tasks than data elements,
 * when DATASIZE is not divisible by the number of tasks, or when the local
 * buffer cannot be allocated.
 */
int main(int argc, char *argv[])
{
    int my_id, ntasks, localsize;
    int *localvector;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_id);

    /* More tasks than elements would leave some tasks without data. */
    if (ntasks > DATASIZE) {
        fprintf(stderr, "Maximum number of tasks is %d!\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    if (DATASIZE % ntasks != 0) {
        fprintf(stderr, "Datasize (%d) should be divisible by number "
                "of tasks.\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    localsize = DATASIZE / ntasks;
    localvector = (int *) malloc(localsize * sizeof(int));
    if (localvector == NULL) {
        fprintf(stderr, "Error: could not allocate %d integers\n", localsize);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    single_reader(my_id, localvector, localsize);

    ordered_print(ntasks, my_id, localvector, localsize);

    free(localvector);

    MPI_Finalize();
    return 0;
}
/*
 * Spokesman read: rank WRITER_ID reads DATASIZE integers from
 * singlewriter.dat and scatters them in equal slices to all tasks.
 *
 * my_id       - rank of the calling task in MPI_COMM_WORLD
 * localvector - receives this task's localsize integers
 * localsize   - number of integers delivered to each task
 *
 * Collective: every task in MPI_COMM_WORLD must call it.  A failing
 * allocation or open aborts the MPI job; a short read only produces a
 * warning and the missing elements are scattered as zeros.
 */
void single_reader(int my_id, int *localvector, int localsize)
{
    FILE *fp;
    int *fullvector, nread;
    const char *fname = "singlewriter.dat";

    /* Zero-filled so that a short read never scatters uninitialized
       memory. */
    fullvector = (int *) calloc(DATASIZE, sizeof(int));
    if (fullvector == NULL) {
        fprintf(stderr, "Error: could not allocate scatter buffer\n");
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    if (my_id == WRITER_ID) {
        if ((fp = fopen(fname, "rb")) == NULL) {
            fprintf(stderr, "Error: %d (%s)\n", errno, strerror(errno));
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        } else {
            nread = (int) fread(fullvector, sizeof(int), DATASIZE, fp);
            fclose(fp);
            if (nread != DATASIZE) {
                fprintf(stderr, "Warning! The number of read elements is "
                        "incorrect.\n");
            } else {
                printf("Read %i numbers from file %s\n", nread, fname);
            }
        }
    }

    /* Slice k of fullvector goes to rank k. */
    MPI_Scatter(fullvector, localsize, MPI_INT, localvector, localsize,
                MPI_INT, WRITER_ID, MPI_COMM_WORLD);

    free(fullvector);
}
/*
 * Print every task's buffer, one task at a time in rank order.
 *
 * ntasks - number of tasks taking part
 * rank   - rank of the calling task
 * buffer - the n integers to print
 * n      - number of integers in buffer
 *
 * The tasks take turns: on each turn only the matching rank prints, and a
 * barrier on MPI_COMM_WORLD separates the turns.  Avoid this pattern
 * whenever possible; the serialized output is used here only to make
 * debugging easier.
 */
void ordered_print(int ntasks, int rank, int *buffer, int n)
{
    int turn;

    for (turn = 0; turn < ntasks; ++turn) {
        if (turn == rank) {
            int k = 0;

            printf("Task %i received:", rank);
            while (k < n) {
                printf(" %2i", buffer[k]);
                ++k;
            }
            printf("\n");
        }
        /* Everyone waits until the current turn is over. */
        MPI_Barrier(MPI_COMM_WORLD);
    }
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <mpi.h>
#define DATASIZE 64
#define WRITER_ID 0
void single_writer(int, int *, int);
/*
 * Entry point of the spokesman (single writer) skeleton.
 *
 * Splits DATASIZE integers evenly over the MPI tasks, fills each task's
 * slice with the values (localsize * my_id + 1) .. (localsize * (my_id + 1))
 * and hands the slice to single_writer().
 *
 * Aborts the whole MPI job when there are more tasks than data elements,
 * when DATASIZE is not divisible by the number of tasks, or when the local
 * buffer cannot be allocated.
 */
int main(int argc, char *argv[])
{
    int my_id, ntasks, i, localsize;
    int *localvector;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_id);

    /* More tasks than elements would leave some tasks without data. */
    if (ntasks > DATASIZE) {
        fprintf(stderr, "Maximum number of tasks is %d!\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    if (DATASIZE % ntasks != 0) {
        fprintf(stderr, "Datasize (%d) should be divisible by number "
                "of tasks.\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    localsize = DATASIZE / ntasks;
    localvector = (int *) malloc(localsize * sizeof(int));
    if (localvector == NULL) {
        fprintf(stderr, "Error: could not allocate %d integers\n", localsize);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    /* Globally the values run 1..DATASIZE in rank order. */
    for (i = 0; i < localsize; i++) {
        localvector[i] = i + 1 + localsize * my_id;
    }

    single_writer(my_id, localvector, localsize);

    free(localvector);

    MPI_Finalize();
    return 0;
}
/*
 * Exercise skeleton for the spokesman write.
 *
 * my_id       - rank of the calling task
 * localvector - this task's localsize integers
 * localsize   - number of integers held by each task
 *
 * The body is left for the reader to implement; as it stands the function
 * performs no communication and no file I/O.
 */
void single_writer(int my_id, int *localvector, int localsize)
{
    FILE *fp;
    /* Starts as NULL so the free() below is well defined even before the
       TODO has been filled in with an allocation. */
    int *fullvector = NULL;

    /* TODO: Implement a function that will write the data to file so that
       a single process does the file io. Use rank WRITER_ID as the io rank */

    free(fullvector);
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <mpi.h>
#define DATASIZE 64
#define WRITER_ID 0
void single_reader(int, int *, int);
void ordered_print(int, int, int *, int);
/*
 * Entry point of the spokesman reader skeleton.
 *
 * Allocates room for DATASIZE / ntasks integers on every task, lets
 * single_reader() fill it, and prints the received slices with
 * ordered_print().
 *
 * Aborts the whole MPI job when there are more tasks than data elements,
 * when DATASIZE is not divisible by the number of tasks, or when the local
 * buffer cannot be allocated.
 */
int main(int argc, char *argv[])
{
    int my_id, ntasks, localsize;
    int *localvector;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_id);

    /* More tasks than elements would leave some tasks without data. */
    if (ntasks > DATASIZE) {
        fprintf(stderr, "Maximum number of tasks is %d!\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    if (DATASIZE % ntasks != 0) {
        fprintf(stderr, "Datasize (%d) should be divisible by number "
                "of tasks.\n", DATASIZE);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    localsize = DATASIZE / ntasks;
    localvector = (int *) malloc(localsize * sizeof(int));
    if (localvector == NULL) {
        fprintf(stderr, "Error: could not allocate %d integers\n", localsize);
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    single_reader(my_id, localvector, localsize);

    ordered_print(ntasks, my_id, localvector, localsize);

    free(localvector);

    MPI_Finalize();
    return 0;
}
/*
 * Exercise skeleton for the spokesman read.
 *
 * my_id       - rank of the calling task
 * localvector - buffer meant to receive this task's localsize integers
 * localsize   - number of integers expected by each task
 *
 * The body is left for the reader to implement; as it stands the function
 * performs no communication and no file I/O, and localvector is left
 * untouched.
 */
void single_reader(int my_id, int *localvector, int localsize)
{
    FILE *fp;
    /* Starts as NULL so the free() below is well defined even before the
       TODO has been filled in with an allocation. */
    int *fullvector = NULL, nread;
    char *fname = "singlewriter.dat";

    /* TODO: Implement a function that will read the data from a file so that
       a single process does the file io. Use rank WRITER_ID as the io rank */

    free(fullvector);
}
/*
 * Print every task's buffer, one task at a time in rank order.
 *
 * ntasks - number of tasks taking part
 * rank   - rank of the calling task
 * buffer - the n integers to print
 * n      - number of integers in buffer
 *
 * The tasks take turns: on each turn only the matching rank prints, and a
 * barrier on MPI_COMM_WORLD separates the turns.  Avoid this pattern
 * whenever possible; the serialized output is used here only to make
 * debugging easier.
 */
void ordered_print(int ntasks, int rank, int *buffer, int n)
{
    int turn;

    for (turn = 0; turn < ntasks; ++turn) {
        if (turn == rank) {
            int k = 0;

            printf("Task %i received:", rank);
            while (k < n) {
                printf(" %2i", buffer[k]);
                ++k;
            }
            printf("\n");
        }
        /* Everyone waits until the current turn is over. */
        MPI_Barrier(MPI_COMM_WORLD);
    }
}
!> Write a distributed integer vector into the single file mpiio.dat with
!> collective MPI-I/O.
!>
!> The datasize integers are split evenly over the MPI tasks; task my_id
!> holds the values my_id*localsize + 1 .. (my_id + 1)*localsize and writes
!> them at the matching byte offset, so the file is in rank order.
program pario
  use mpi
  use, intrinsic :: iso_fortran_env, only : error_unit, output_unit
  implicit none

  integer, parameter :: datasize = 64, writer_id = 0
  integer :: rc, my_id, ntasks, localsize, i
  integer, dimension(:), allocatable :: localvector

  call mpi_init(rc)
  call mpi_comm_size(mpi_comm_world, ntasks, rc)
  call mpi_comm_rank(mpi_comm_world, my_id, rc)

  ! More tasks than elements would leave some tasks without data.
  if (ntasks > datasize) then
    write(error_unit, *) 'Maximum number of tasks is 64!'
    call mpi_abort(MPI_COMM_WORLD, -1, rc)
  end if

  if (mod(datasize, ntasks) /= 0) then
    write(error_unit,*) 'Datasize (64) should be divisible by number of tasks'
    call mpi_abort(MPI_COMM_WORLD, -1, rc)
  end if

  localsize = datasize / ntasks
  allocate(localvector(localsize))

  ! Globally the values run 1..datasize in rank order.
  localvector = [(i + my_id * localsize, i=1,localsize)]

  call mpiio_writer()

  deallocate(localvector)
  call mpi_finalize(rc)

contains

  !> Collectively write localvector into mpiio.dat at this task's offset.
  !> Uses my_id, localsize and localvector of the host program.
  !> A failing open or write aborts the MPI job.
  subroutine mpiio_writer()
    implicit none
    integer :: fh, rc, dsize
    integer(kind=MPI_OFFSET_KIND) :: offset

    call mpi_type_size(MPI_INTEGER, dsize, rc)

    ! Widen before multiplying so the product is formed in the offset kind.
    offset = int(my_id, MPI_OFFSET_KIND) * localsize * dsize

    ! File operations report errors through rc by default, so it has to be
    ! inspected explicitly.
    call mpi_file_open(MPI_COMM_WORLD, 'mpiio.dat', &
         & MPI_MODE_CREATE+MPI_MODE_WRONLY, MPI_INFO_NULL, fh, rc)
    if (rc /= MPI_SUCCESS) then
      write(error_unit, *) 'Could not open file mpiio.dat'
      call mpi_abort(MPI_COMM_WORLD, -1, rc)
    end if

    call mpi_file_write_at_all(fh, offset, localvector, localsize, &
         & MPI_INTEGER, MPI_STATUS_IGNORE, rc)
    if (rc /= MPI_SUCCESS) then
      write(error_unit, *) 'Could not write to file mpiio.dat'
      call mpi_abort(MPI_COMM_WORLD, -1, rc)
    end if

    call mpi_file_close(fh, rc)
  end subroutine mpiio_writer

end program pario
program pario
use mpi
use, intrinsic :: iso_fortran_env, only : error_unit, output_unit
implicit none
integer, parameter :: datasize = 64, writer_id = 0
integer :: rc, my_id, ntasks, localsize, i
integer, dimension(:), allocatable :: localvector
integer, dimension(datasize) :: fullvector
call mpi_init(rc)
call mpi_comm_size(mpi_comm_world, ntasks, rc)
call mpi_comm_rank(mpi_comm_world, my_id, rc)
if (ntasks > 64) then
write(error_unit, *) 'Maximum number of tasks is 64!'
call mpi_abort(MPI_COMM_WORLD, -1, rc)
end if
if (mod(datasize, ntasks) /= 0) then
write(error_unit,*) 'Datasize (64) should be divisible by number of tasks'
call mpi_abort(MPI_COMM_WORLD, -1, rc)
end if
localsize = datasize / ntasks
allocate(localvector(localsize))
localvector = [(i + my_id * localsize, i=1,localsize)]
call many_writers()
deallocate(localvector)
call mpi_finalize(rc)
contains
subroutine single_writer()
implicit none
call mpi_gather(localvector, localsize, mpi_integer, fullvector, &
& localsize, mpi_integer, writer_id, mpi_comm_world, rc)
if (my_id == writer_id) then
open(10, file='singlewriter.dat', status='replace', form='unformatted', &
& access='stream')
write(10, pos=1) fullvector
close (10)
write(output_unit,'(A,I0,A)') 'Wrote ', size(fullvector), &
& ' elements to file singlewriter.dat'
end if
end subroutine single_writer
subroutine many_writers()
implicit none
character(len=85) :: filename
write(filename, '(A,I0,A)') 'manywriters-', my_id, '.dat'
open(my_id+10, file=filename, status='replace', form='unformatted', &