#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Returns a freshly malloc'd copy of the NUL-terminated string s, or NULL
 * when the allocation fails. The caller owns the result and must free() it.
 */
static char *xstrdup(const char *s)
{
    const size_t nbytes = strlen(s) + 1; /* include the terminator */
    char *dup = malloc(nbytes);

    if (dup != NULL) {
        memcpy(dup, s, nbytes);
    }
    return dup;
}

/* Size in bytes of the buffer main fills with each read() from stdin. */
#define READ_BUF_SIZE 4096
/* Size in bytes of main's line buffer and of the per-line word buffer. */
#define LINE_BUF_SIZE 4096

/* Growable array of heap-allocated word strings; main keeps one per reducer. */
typedef struct {
    char **words;   /* slots pointing at word copies; resized by push_word */
    size_t count;   /* number of slots currently filled */
    size_t cap;     /* number of slots allocated in words */
} word_vec_t;

/*
 * Writes exactly len bytes from buf to fd, looping over short writes.
 *
 * A write interrupted by a signal (EINTR) is retried; any other write
 * error terminates the process immediately with _exit(1).
 */
static void write_all(int fd, const char *buf, size_t len)
{
    const char *cursor = buf;
    size_t remaining = len;

    while (remaining != 0) {
        ssize_t written = write(fd, cursor, remaining);

        if (written >= 0) {
            cursor += written;
            remaining -= (size_t)written;
        } else if (errno != EINTR) {
            _exit(1);
        }
    }
}

/* Writes the NUL-terminated string s (without its terminator) to fd. */
static void write_str(int fd, const char *s)
{
    const size_t nbytes = strlen(s);

    write_all(fd, s, nbytes);
}

/*
 * Prints the usage line to standard error.
 *
 * progname: program name to show in the message (main passes argv[0]).
 *
 * The message is built in a fixed 512-byte buffer; if progname is too long
 * the message is truncated to what fits.
 */
static void log_usage(const char *progname)
{
    char buf[512];
    int n = snprintf(buf, sizeof(buf),
                     "Uso: %s <nreducers> <write_fd_0> [write_fd_1 ...]\n",
                     progname);

    if (n <= 0) {
        return;
    }
    /*
     * snprintf reports the length the untruncated message would have had,
     * so clamp it to what was actually stored to avoid reading past buf.
     */
    if ((size_t)n >= sizeof(buf)) {
        n = (int)(sizeof(buf) - 1);
    }
    write_all(STDERR_FILENO, buf, (size_t)n);
}

/*
 * Prints "<prefix>: <strerror(errno)>\n" to standard error.
 *
 * prefix: short label for the operation that failed.
 *
 * Reads the current errno, so call it before anything else can overwrite
 * the value set by the failing call. The message is built in a fixed
 * 512-byte buffer and truncated if it does not fit.
 */
static void log_errno_msg(const char *prefix)
{
    char buf[512];
    int n = snprintf(buf, sizeof(buf), "%s: %s\n", prefix, strerror(errno));

    if (n <= 0) {
        return;
    }
    /*
     * snprintf reports the length the untruncated message would have had,
     * so clamp it to what was actually stored to avoid reading past buf.
     */
    if ((size_t)n >= sizeof(buf)) {
        n = (int)(sizeof(buf) - 1);
    }
    write_all(STDERR_FILENO, buf, (size_t)n);
}

/*
 * Hashes the NUL-terminated string s: starting from 5381, each byte
 * (taken as unsigned char) updates the state as h = h * 33 + byte.
 * Arithmetic is unsigned long and wraps on overflow.
 */
static unsigned long hash_word(const char *s)
{
    unsigned long acc = 5381UL;
    const char *p;

    for (p = s; *p != '\0'; ++p) {
        acc = acc * 33UL + (unsigned char)*p;
    }
    return acc;
}

/*
 * qsort comparator for an array of C strings: a and b each point at a
 * char * element, and the pointed-to strings are ordered with strcmp.
 */
static int cmp_words(const void *a, const void *b)
{
    const char *const *lhs = a;
    const char *const *rhs = b;

    return strcmp(*lhs, *rhs);
}

/*
 * Appends a private copy of word to vec.
 *
 * When the vector is full its slot array is grown first (128 slots
 * initially, then doubling). Returns 0 on success and -1 if either the
 * slot array or the word copy cannot be allocated; on failure the
 * existing contents of vec remain valid.
 */
static int push_word(word_vec_t *vec, const char *word)
{
    char *dup;

    if (vec->cap == vec->count) {
        size_t grown = 128;
        char **resized;

        if (vec->cap != 0) {
            grown = vec->cap * 2;
        }
        resized = (char **)realloc(vec->words, grown * sizeof(*resized));
        if (resized == NULL) {
            return -1;
        }
        vec->cap = grown;
        vec->words = resized;
    }

    dup = xstrdup(word);
    if (dup == NULL) {
        return -1;
    }

    vec->words[vec->count] = dup;
    vec->count += 1;
    return 0;
}

/*
 * Parses text as a base-10 integer that fits in a non-negative int.
 * Returns 0 and stores the value in *out on success; returns -1 if the
 * text is empty, has trailing characters, is negative, or is out of range.
 */
static int parse_nonneg_int(const char *text, int *out)
{
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE) {
        return -1;
    }
    if (value < 0 || value > INT_MAX) {
        return -1;
    }
    *out = (int)value;
    return 0;
}

/*
 * Handles one input record of the form "<word> <count>": the word is
 * appended to the partition chosen by hash_word(word) % nreducers.
 * Records that do not match the format are skipped. The parsed count is
 * only used to validate the record.
 * Returns 0 on success or skip, -1 if push_word fails.
 */
static int partition_line(const char *line, word_vec_t *parts, int nreducers)
{
    /* line holds at most LINE_BUF_SIZE - 1 characters, so %s cannot overflow. */
    char word[LINE_BUF_SIZE];
    int count = 0;
    size_t rid;

    if (sscanf(line, "%s %d", word, &count) != 2) {
        return 0;
    }
    rid = (size_t)(hash_word(word) % (unsigned long)nreducers);
    return push_word(&parts[rid], word);
}

/*
 * Entry point. Usage: <prog> <nreducers> <write_fd_0> [write_fd_1 ...]
 *
 * Reads "<word> <count>" lines from standard input, assigns each word to
 * one of nreducers partitions by hash, then for every partition writes its
 * words in sorted order as "<word> 1" lines to the matching descriptor and
 * closes it. Input lines longer than LINE_BUF_SIZE - 1 bytes are truncated.
 *
 * Returns 0 on success and 1 on invalid arguments, allocation failure, or
 * a read error. Error paths return without freeing memory because the
 * process is about to exit.
 */
int main(int argc, char *argv[])
{
    int nreducers = 0;
    int i;
    int *outfds = NULL;
    word_vec_t *parts = NULL;
    char rbuf[READ_BUF_SIZE];
    char line[LINE_BUF_SIZE];
    size_t line_len = 0;

    if (argc < 3) {
        log_usage(argv[0]);
        return 1;
    }

    /* argc >= 3 here, so argc - 2 cannot overflow (unlike nreducers + 2). */
    if (parse_nonneg_int(argv[1], &nreducers) < 0 ||
        nreducers == 0 || nreducers != argc - 2) {
        write_str(STDERR_FILENO, "Error: parámetros inválidos.\n");
        return 1;
    }

    outfds = malloc((size_t)nreducers * sizeof(*outfds));
    parts = calloc((size_t)nreducers, sizeof(*parts));
    if (!outfds || !parts) {
        log_errno_msg("malloc/calloc");
        return 1;
    }

    /* Reject malformed descriptors up front instead of writing to fd 0. */
    for (i = 0; i < nreducers; ++i) {
        if (parse_nonneg_int(argv[i + 2], &outfds[i]) < 0) {
            write_str(STDERR_FILENO, "Error: parámetros inválidos.\n");
            return 1;
        }
    }

    for (;;) {
        ssize_t nread = read(STDIN_FILENO, rbuf, sizeof(rbuf));
        ssize_t j;

        if (nread < 0) {
            /* A signal interrupting read() is not a failure; try again. */
            if (errno == EINTR) {
                continue;
            }
            log_errno_msg("read");
            return 1;
        }
        if (nread == 0) {
            break; /* end of input */
        }

        for (j = 0; j < nread; ++j) {
            char c = rbuf[j];

            if (c != '\n') {
                /* Keep one byte free for the terminator; drop the excess. */
                if (line_len + 1 < sizeof(line)) {
                    line[line_len++] = c;
                }
                continue;
            }
            if (line_len == 0) {
                continue; /* blank line */
            }

            line[line_len] = '\0';
            line_len = 0;
            if (partition_line(line, parts, nreducers) < 0) {
                log_errno_msg("push_word");
                return 1;
            }
        }
    }

    /* Final line without a trailing newline. */
    if (line_len > 0) {
        line[line_len] = '\0';
        if (partition_line(line, parts, nreducers) < 0) {
            log_errno_msg("push_word");
            return 1;
        }
    }

    for (i = 0; i < nreducers; ++i) {
        size_t k;

        qsort(parts[i].words, parts[i].count, sizeof(char *), cmp_words);

        for (k = 0; k < parts[i].count; ++k) {
            char out[LINE_BUF_SIZE + 8];
            int n = snprintf(out, sizeof(out), "%s 1\n", parts[i].words[k]);

            if (n > 0 && (size_t)n < sizeof(out)) {
                write_all(outfds[i], out, (size_t)n);
            }
            free(parts[i].words[k]);
        }

        close(outfds[i]);
        free(parts[i].words);
    }

    free(outfds);
    free(parts);

    {
        char buf[256];
        int n = snprintf(buf, sizeof(buf),
                         "[WordCount:suffle] Generated %d partition(s).\n",
                         nreducers);

        if (n > 0) {
            write_all(STDERR_FILENO, buf, (size_t)n);
        }
    }

    return 0;
}
