Skip to content
markovchain.c 10.5 KiB
Newer Older
#include <stdio.h> 
#include <stdlib.h>
#include <stdint.h>

#include "mpi.h" 


// adjust BITS_PER_STATE and STATES_PER_PATTERN according to the problem
// NOTE: (BITS_PER_STATE * STATES_PER_PATTERN) should not exceed 30 bits

// number of bits to represent a state
#define BITS_PER_STATE 3
// number of states to represent a pattern
#define STATES_PER_PATTERN 4


// number of unique states
#define NUM_STATES (1 << BITS_PER_STATE)
// number of unique patterns
#define NUM_PATTERNS (1 << (BITS_PER_STATE * STATES_PER_PATTERN))
// number of preceding states that result in the last state o the pattern
// *history: extra states to read before establishing first pattern
#define LEN_HISTORY (STATES_PER_PATTERN - 1)
// default state for unknown state symbols
#define UNKNOWN_STATE 0
// bitmask where only bits for a single state is set
#define MASK_STATE (NUM_STATES - 1)
// bitmask where only bits for a complete pattern is set
#define MASK_PATTERN (NUM_PATTERNS - 1)


uint32_t stateID(char c) {
    // get the state id for this state symbol

    // if this is an unknown symbol, return UNKNOWN_STATE (i. e. 0)
    if (c < 'a') return UNKNOWN_STATE;
    if (c >= ('a' + NUM_STATES)) return UNKNOWN_STATE;

    // return 1 for a, 2 for b, ... 
    return (c - 'a' + 1);
}	



char stateSYM(uint32_t s) {
    // get the state symbol for this state id

    // truncate overflowing bits (if any) from state
    s = s & MASK_STATE;

    // if this is an unknown symbol, return '_'
    if (s == UNKNOWN_STATE) return '_';

    // return a for 1, b for 2, ... 
    return ('a' + s - 1);
}	



uint32_t add_state(uint32_t pattern, uint32_t state) {
    // drop oldest state in pattern
    // shift pattern by one state
    // place state as the newest state in pattern

    // truncate overflowing bits (if any) from state
    state = state & MASK_STATE;
    // shift the pattern to clear space for the newest state
    pattern = pattern << BITS_PER_STATE;
    // truncate the pattern to drop the oldest state
    pattern = pattern & MASK_PATTERN;
    // add the given state as the newest state in the pattern
    pattern = pattern | state;
    // return the altered pattern
    return pattern;
}


char * print_pattern(char *buf, uint32_t pattern) {
    // print the given pattern onto the given buffer

    // for each state in pattern
    for (int i=0; i<STATES_PER_PATTERN; i++) {
        // print state for the current symbol (moving right to left)
        buf[STATES_PER_PATTERN-i-1] = stateSYM(pattern & MASK_STATE);
        // drop the newest symbol (moving new to old)
        pattern = pattern >> BITS_PER_STATE;
    }
    // shift final state
    buf[STATES_PER_PATTERN] = buf[STATES_PER_PATTERN-1];
    // put a seperator before final state
    buf[STATES_PER_PATTERN-1] = ':';
    // terminate the string
    buf[STATES_PER_PATTERN+1] = '\0';

    // return buffer
    return buf;
}



char *buffer_from_file(char *filename, int *bytes_read) {
    // creates a buffer on each process
    // reads states (single char symbols) from file (named filename) into buffer
    // sets bytes_read to the number of states read into buffer

    MPI_File filehandle;    // handle for referring to opened file
    char *buffer;           // buffer to hold states read from file
    int id_me;              // id of this process
    int id_max;             // number of processes
    MPI_Offset size_file;   // number of states in the file
    MPI_Offset size_chunk;  // number of states per process (excluding history*)
    MPI_Offset size_buffer; // number of states the buffer can hold
    MPI_Offset size_skip;   // number of states to skip before reading into buffer
    MPI_Status status;      // status to hold the result of the reading attempt

    // get the id of this process
    MPI_Comm_rank(MPI_COMM_WORLD, &id_me);
    // get the number of processes
    MPI_Comm_size(MPI_COMM_WORLD, &id_max);


    if (MPI_File_open(           // open the file ...
                MPI_COMM_WORLD,  // ... for all processes in MPI_COMM_WORLD
                filename,        // ... named filename
                MPI_MODE_RDONLY, // ... as read only
                MPI_INFO_NULL,   // ... using default hints (see man page)
                &filehandle      // ... and refer to it as filehandle
                ) != MPI_SUCCESS) {
        // if the operation is not successfull
        printf("ERROR: could not read from input file\n");
        return NULL;
    }

    // get the number of states in the file
    MPI_File_get_size(filehandle, &size_file);

    // split the number of states (excluding states) to processes
    size_chunk = (size_file - LEN_HISTORY) / id_max;
    // each buffer should be able to hold a chunk AND history*
    size_buffer = size_chunk + LEN_HISTORY;
    // the last process should be able to hold up to id_max excess states
    // excess states come from the remainder of "file / process" division,
    // ... thus, may nox exceed id_max
    if (id_me == (id_max - 1)) size_buffer += id_max;

    // get the number of states to skip before starting reading
    size_skip = id_me * size_chunk;

    // allocate memory for buffer
    buffer = (char *) malloc(sizeof(char) * ((size_t) size_buffer));
    if (buffer == NULL) {
        // if the allocation fails
        printf("ERROR: could not allocate memory for buffer\n");
        return NULL;
    }

    MPI_File_seek(         // reposition the cursor in file ...
            filehandle,    // ... using filehandle
            size_skip,     // ... skipping size_skip states
            MPI_SEEK_SET); // ... using absolute position (in contrast to relative position) in file

    MPI_File_read(       // read states from file ...
            filehandle,  // ... using filehandle
            buffer,      // ... into buffer
            size_buffer, // ... up to size_buffer states
            MPI_BYTE,    // ... where each state is a MPI_BYTE
            &status);    // ... recording the result in status variable

    // get the number of states actually read from file
    MPI_Get_count(&status, MPI_BYTE, bytes_read);

    // place a sentinel value at the end of the buffer
    buffer[*bytes_read] = '\0';

    // close the file
    MPI_File_close(&filehandle); 

    // return the address of the buffer
    return buffer;
}



unsigned int *hitcounts_from_buffer(char *buffer, int bytes_read) {
    // allocate a hit table for patterns
    // scan buffer and the count occurence of each pattern
    unsigned int *hits; // number of hits (indexed by pattern)
    uint32_t pattern;   // current pattern

    // allocate memory for the hit table
    hits = (unsigned int *) malloc(sizeof(unsigned int) * NUM_PATTERNS);
    if (hits == NULL) {
        // if the allocation fails
        printf("ERROR: could not allocate memory for hit table\n");
        return NULL;
    }
    // initialize the hit table
    for (int i=0; i<NUM_PATTERNS; i++) hits[i] = 0;

    // start with an empty pattern
    pattern = 0;

    // fill the pattern to obtain history*
    for (int i=0; i<LEN_HISTORY; i++) {
        // drop oldest state, add next state from buffer
        pattern = add_state(pattern, stateID(buffer[i]));
    }

    // read states from buffer and update hit table
    for (int i=LEN_HISTORY; i<bytes_read; i++) {
        // drop oldest state, add next state from buffer
        pattern = add_state(pattern, stateID(buffer[i]));
        // update the number of times this pattern is encountered
        hits[pattern] += 1;
    }

    // return hi table
    return hits;
}



void analyse_and_print_results(unsigned int * hits) {
    // gather hit counts from all processes
    // calculate frequencies
    // print results

    int id_me;               // id of this process
    unsigned int hit_count;  // hit count per history
    uint32_t pattern;        // current pattern
    char pbuf[32];           // buffer for printing pattern

    // get the id of this process
    MPI_Comm_rank(MPI_COMM_WORLD, &id_me);

    MPI_Reduce(                           // reduce local results ...
            id_me ? hits : MPI_IN_PLACE,  // ... from hits (special value MPI_IN_PLACE for root process)
            // ...   as this process overwrites its send buffer
            hits,                         // ... into hits
            NUM_PATTERNS,                 // ... for NUM_PATTERNS items
            MPI_UNSIGNED,                 // ... of unsigned int data type
            MPI_SUM,                      // ... appling sum operation
            0,                            // ... gathering in the root process
            MPI_COMM_WORLD);              // ... for all processes in MPI_COMM_WORLD

    if (id_me == 0) { // only on the main process

        for (int i=0; i < (1 << (BITS_PER_STATE * LEN_HISTORY)); i++) { // for each history pattern
            // reset hit count for this history
            hit_count = 0;

            for (int j=0; j < NUM_STATES; j++) { // for each resulting state
                // count the occurrent of this history pattern
                hit_count += hits[(i << BITS_PER_STATE) + j];
            }

            if (hit_count) { // if this history pattern occured at least once

                for (int j=0; j < NUM_STATES; j++) { // for each resulting state
                    pattern = (i << BITS_PER_STATE) + j; // calculate pattern to avoid repetition
                    if (hits[pattern]) { // only if hits for this pattern is non-zero
                        printf("%s : %d / %d\n", print_pattern(pbuf, pattern), hits[pattern], hit_count);
                    }
                }

                // insert an empty line to separate history patterns
                printf("\n");

            }
        }
    }
}


int main(int argc, char *argv[]) { 

    unsigned int *hits; // hit counts per pattern
    char *buffer;       // buffer to read from file
    int bytes_read;     // number of states read

    // initiaize MPI subsystem
    MPI_Init(&argc, &argv); 

    // print an error if the input file is not specified
    if (argc != 2) {
        printf("USAGE: %s <input_file>\n", argv[0]);
        MPI_Finalize(); 
        exit(EXIT_FAILURE);
    }

    // read a part of the input file into local buffer
    buffer = buffer_from_file(argv[1],  &bytes_read);

    // count occurences of patterns i local buffer
    hits = hitcounts_from_buffer(buffer, bytes_read);

    // free buffer as it is no longer needed
    free(buffer);

    // combine and process the data, then print the results
    analyse_and_print_results(hits);

    // free hits as it is no longer needed
    free(hits);

    // finalize the MPI subsystem
    MPI_Finalize();

    // finalize the program
    return EXIT_SUCCESS;
}