#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <unistd.h>

/* Size in bytes of the chunk buffer used for each read() of a map file. */
#define READ_BUF_SIZE 4096
/*
 * Size in bytes of the buffer that accumulates one input line (and of the
 * buffer that receives the parsed word). One byte is reserved for the
 * terminating NUL, so lines longer than LINE_BUF_SIZE - 1 bytes are rejected.
 */
#define LINE_BUF_SIZE 4096

/*
 * Write exactly `len` bytes from `buf` to `fd`.
 *
 * Short writes are resumed where they left off and writes interrupted by a
 * signal (EINTR) are retried. Any other write() failure terminates the
 * process immediately with exit status 1, so this function only returns
 * once every byte has been handed to the kernel.
 */
static void write_all(int fd, const char *buf, size_t len)
{
    const char *cursor = buf;
    size_t remaining = len;

    while (remaining != 0) {
        ssize_t written = write(fd, cursor, remaining);

        if (written >= 0) {
            /* Partial progress: advance past what was accepted. */
            cursor += written;
            remaining -= (size_t)written;
        } else if (errno != EINTR) {
            _exit(1);
        }
        /* written < 0 with EINTR: loop around and retry the same span. */
    }
}

/*
 * Write the NUL-terminated string `s` (without its terminator) to `fd`,
 * delegating to write_all() for the actual transfer.
 */
static void write_str(int fd, const char *s)
{
    size_t length = strlen(s);

    write_all(fd, s, length);
}

/*
 * Print "<prefix>: <strerror(errno)>\n" to standard error.
 *
 * The message is formatted into a fixed 512-byte buffer. snprintf() returns
 * the length the full message WOULD have had, which can exceed the buffer
 * when `prefix` is long (e.g. a near-maximum path); the length is therefore
 * clamped to what was actually stored before writing, and a truncated
 * message is still terminated with a newline.
 *
 * errno is preserved across the call so the caller can keep reporting or
 * propagating the original error.
 */
static void log_errno_msg(const char *prefix)
{
    char buf[512];
    int saved_errno = errno;
    int n = snprintf(buf, sizeof(buf), "%s: %s\n", prefix, strerror(saved_errno));

    if (n > 0) {
        size_t len = (size_t)n;

        if (len >= sizeof(buf)) {
            /* Output was truncated: only sizeof(buf) - 1 bytes are valid. */
            len = sizeof(buf) - 1;
            buf[len - 1] = '\n';
        }
        write_all(STDERR_FILENO, buf, len);
    }

    errno = saved_errno;
}

/*
 * Print the command-line usage message for `progname` to standard error.
 *
 * The message is formatted into a fixed 512-byte buffer. Because snprintf()
 * returns the untruncated length, the byte count passed to write_all() is
 * clamped to the bytes actually stored, so an unusually long program name
 * cannot cause a read past the end of the buffer. A truncated message is
 * still terminated with a newline.
 */
static void log_usage(const char *progname)
{
    char buf[512];
    int n = snprintf(buf, sizeof(buf),
                     "Uso: %s <tmp_dir> <n_map_files> <n_reducers>\n",
                     progname);

    if (n > 0) {
        size_t len = (size_t)n;

        if (len >= sizeof(buf)) {
            /* Output was truncated: only sizeof(buf) - 1 bytes are valid. */
            len = sizeof(buf) - 1;
            buf[len - 1] = '\n';
        }
        write_all(STDERR_FILENO, buf, len);
    }
}

/*
 * Block until child process `pid` terminates.
 *
 * Returns 0 only when the child exited normally with status 0.
 * Returns -1 when waitpid() fails (errno left as set by waitpid), or when
 * the child ended abnormally or with a non-zero status; in that second case
 * errno is set to ECHILD. Interrupted waits (EINTR) are retried.
 */
static int wait_child(pid_t pid)
{
    int wstatus;
    pid_t reaped;

    do {
        reaped = waitpid(pid, &wstatus, 0);
    } while (reaped < 0 && errno == EINTR);

    if (reaped < 0) {
        return -1;
    }

    if (!WIFEXITED(wstatus) || WEXITSTATUS(wstatus) != 0) {
        errno = ECHILD;
        return -1;
    }

    return 0;
}

/*
 * Compose the path "<dir>/<prefix><index>.tmp" into `out`.
 *
 * At most `out_size` bytes (including the terminating NUL) are written; a
 * result that does not fit is silently truncated, and the caller is not
 * told about it.
 */
static void build_path(char *out, size_t out_size,
                       const char *dir, const char *prefix, int index)
{
    (void)snprintf(out, out_size,
                   "%s/%s%d.tmp",
                   dir, prefix, index);
}

/*
 * Hash the NUL-terminated string `s`.
 *
 * Starts from 5381 and, for every byte, multiplies the running value by 33
 * and adds the byte (taken as unsigned char). Arithmetic is on unsigned
 * long, so overflow wraps. The empty string hashes to 5381.
 */
static unsigned long hash_word(const char *s)
{
    unsigned long acc = 5381UL;
    const char *p;

    for (p = s; *p != '\0'; ++p) {
        acc = acc * 33UL + (unsigned char)*p;
    }

    return acc;
}

/*
 * Route one complete input line to the batch file of its reducer.
 *
 * `line` holds `line_len` bytes and has room for a terminating NUL, which
 * is written here. The line is forwarded (followed by a newline) only when
 * it parses as "<word> <integer>"; anything else is dropped silently. The
 * reducer is chosen as hash_word(word) modulo `n_reducers`.
 */
static void dispatch_record(const int *batch_fds, int n_reducers,
                            char *line, size_t line_len)
{
    /* Same size as the line buffer, so an unbounded %s cannot overflow it. */
    char word[LINE_BUF_SIZE];
    int count = 0; /* parsed only to validate the record's shape */
    int reducer_id;

    line[line_len] = '\0';

    if (sscanf(line, "%s %d", word, &count) != 2) {
        return;
    }

    reducer_id = (int)(hash_word(word) % (unsigned long)n_reducers);
    write_all(batch_fds[reducer_id], line, line_len);
    write_str(batch_fds[reducer_id], "\n");
}

/*
 * Split the mapper outputs into one batch file per reducer.
 *
 * Reads "<tmp_dir>/_map_<i>.tmp" for i in [0, n_map_files) line by line and
 * appends every well-formed "<word> <count>" line to
 * "<tmp_dir>/_batch_<r>.tmp", where r is the word's hash modulo
 * `n_reducers`. Batch files are created/truncated up front. Empty lines and
 * lines that do not parse are skipped; a final line without a trailing
 * newline is still processed.
 *
 * Returns 0 on success. Returns -1 with errno describing the first failure:
 * EINVAL for n_reducers <= 0, ENOMEM for allocation failure or for an input
 * line longer than LINE_BUF_SIZE - 1 bytes, otherwise the errno of the
 * failing open/read/close. Every descriptor opened here is closed on all
 * paths, and cleanup never overwrites the reported errno. Write failures on
 * batch files terminate the process inside write_all().
 */
static int partition_map_files(const char *tmp_dir, int n_map_files, int n_reducers)
{
    int *batch_fds;
    int n_open = 0;   /* batch_fds[0 .. n_open) are open descriptors */
    int infd = -1;    /* currently open map file, or -1 */
    int saved_errno = 0;
    int rc = 0;
    int i;

    if (n_reducers <= 0) {
        /* Guards the modulo in dispatch_record() and the allocation size. */
        errno = EINVAL;
        return -1;
    }

    /* calloc checks the count * size multiplication for overflow. */
    batch_fds = calloc((size_t)n_reducers, sizeof *batch_fds);
    if (batch_fds == NULL) {
        return -1;
    }

    for (i = 0; i < n_reducers; ++i) {
        char batch_path[512];

        build_path(batch_path, sizeof(batch_path), tmp_dir, "_batch_", i);

        batch_fds[i] = open(batch_path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (batch_fds[i] < 0) {
            /* The shared cleanup closes the descriptors opened so far. */
            goto error;
        }
        n_open = i + 1;
    }

    for (i = 0; i < n_map_files; ++i) {
        char map_path[512];
        char rbuf[READ_BUF_SIZE];
        char line[LINE_BUF_SIZE];
        size_t line_len = 0;
        ssize_t nread;

        build_path(map_path, sizeof(map_path), tmp_dir, "_map_", i);

        infd = open(map_path, O_RDONLY);
        if (infd < 0) {
            /* Logging performs I/O; keep the open() error for the caller. */
            saved_errno = errno;
            log_errno_msg(map_path);
            errno = saved_errno;
            goto error;
        }

        for (;;) {
            ssize_t j;

            nread = read(infd, rbuf, sizeof(rbuf));
            if (nread < 0 && errno == EINTR) {
                /* Interrupted before any data arrived: just retry. */
                continue;
            }
            if (nread <= 0) {
                break;
            }

            for (j = 0; j < nread; ++j) {
                char c = rbuf[j];

                if (c == '\n') {
                    if (line_len > 0) {
                        dispatch_record(batch_fds, n_reducers, line, line_len);
                        line_len = 0;
                    }
                    continue;
                }

                /* Always keep one byte free for the terminating NUL. */
                if (line_len + 1 >= sizeof(line)) {
                    errno = ENOMEM;
                    goto error;
                }
                line[line_len++] = c;
            }
        }

        if (nread < 0) {
            goto error;
        }

        /* Last line of the file may lack a trailing newline. */
        if (line_len > 0) {
            dispatch_record(batch_fds, n_reducers, line, line_len);
        }

        if (close(infd) < 0) {
            /* The descriptor is gone even on failure; do not close it again. */
            infd = -1;
            goto error;
        }
        infd = -1;
    }

    /* Close every batch file, reporting the first close() failure if any. */
    while (n_open > 0) {
        if (close(batch_fds[--n_open]) < 0 && rc == 0) {
            saved_errno = errno;
            rc = -1;
        }
    }

    free(batch_fds);
    if (rc < 0) {
        errno = saved_errno;
    }
    return rc;

error:
    /* Preserve the original failure reason across best-effort cleanup. */
    saved_errno = errno;
    if (infd >= 0) {
        close(infd);
    }
    while (n_open > 0) {
        close(batch_fds[--n_open]);
    }
    free(batch_fds);
    errno = saved_errno;
    return -1;
}

/*
 * Sort each reducer's batch file into its corresponding output file.
 *
 * For every r in [0, n_reducers), forks a child that redirects its standard
 * output to "<tmp_dir>/_suffle_<r>.tmp" (created/truncated, mode 0644) and
 * executes `sort "<tmp_dir>/_batch_<r>.tmp"` via PATH lookup. The parent
 * waits for each child before starting the next one.
 *
 * Returns 0 when every child exits with status 0, or -1 as soon as fork()
 * fails or wait_child() reports a failure. A child that cannot set up its
 * output or exec `sort` exits with status 127.
 */
static int sort_batches(const char *tmp_dir, int n_reducers)
{
    int idx = 0;

    while (idx < n_reducers) {
        char src_path[512];
        char dst_path[512];
        pid_t child;

        build_path(src_path, sizeof(src_path), tmp_dir, "_batch_", idx);
        build_path(dst_path, sizeof(dst_path), tmp_dir, "_suffle_", idx);

        child = fork();

        if (child == 0) {
            /* Child: make the destination file our stdout, then become sort. */
            int fd = open(dst_path, O_WRONLY | O_CREAT | O_TRUNC, 0644);

            if (fd < 0 || dup2(fd, STDOUT_FILENO) < 0) {
                _exit(127);
            }

            close(fd);

            execlp("sort", "sort", src_path, (char *)NULL);
            /* Only reached when exec itself failed. */
            _exit(127);
        }

        /* Parent: fork failure or an unsuccessful child aborts the stage. */
        if (child < 0 || wait_child(child) < 0) {
            return -1;
        }

        ++idx;
    }

    return 0;
}

/*
 * Parse `text` as a strictly positive base-10 integer that fits in an int.
 *
 * Returns 0 and stores the value in *out on success. Returns -1 (leaving
 * *out untouched) for empty input, trailing non-numeric characters, values
 * that are zero or negative, or values outside the range of int.
 */
static int parse_positive_int(const char *text, int *out)
{
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);

    if (end == text || *end != '\0' || errno == ERANGE) {
        return -1;
    }
    if (value <= 0 || value > INT_MAX) {
        return -1;
    }

    *out = (int)value;
    return 0;
}

/*
 * Entry point: <tmp_dir> <n_map_files> <n_reducers>.
 *
 * Partitions the map files found in <tmp_dir> into one batch file per
 * reducer, then sorts each batch. Both counts must be strictly positive
 * decimal integers; unlike atoi(), malformed or out-of-range values are
 * rejected instead of being silently misread. Returns 0 on success and 1 on
 * a usage error, invalid parameters, or a failure in either stage (the
 * failing stage is reported on standard error together with errno's text).
 */
int main(int argc, char *argv[])
{
    const char *tmp_dir;
    int n_map_files = 0;
    int n_reducers = 0;

    if (argc != 4) {
        log_usage(argv[0]);
        return 1;
    }

    tmp_dir = argv[1];

    if (parse_positive_int(argv[2], &n_map_files) < 0 ||
        parse_positive_int(argv[3], &n_reducers) < 0) {
        write_str(STDERR_FILENO, "Error: parámetros inválidos.\n");
        return 1;
    }

    if (partition_map_files(tmp_dir, n_map_files, n_reducers) < 0) {
        log_errno_msg("partition_map_files");
        return 1;
    }

    if (sort_batches(tmp_dir, n_reducers) < 0) {
        log_errno_msg("sort_batches");
        return 1;
    }

    {
        char buf[256];
        int n = snprintf(buf, sizeof(buf),
                         "[WordCount:suffle] Generated %d partition(s).\n",
                         n_reducers);
        if (n > 0) {
            write_all(STDOUT_FILENO, buf, (size_t)n);
        }
    }

    return 0;
}
