#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

/* Size in bytes of the chunk buffer filled by each read() of the input file. */
#define READ_BUF_SIZE 4096
/* Capacity in bytes (terminating NUL included) of the line and word buffers. */
#define LINE_BUF_SIZE 4096

/*
 * Write exactly `len` bytes starting at `buf` to descriptor `fd`.
 *
 * Short writes are continued from where they stopped and writes interrupted
 * by a signal (EINTR) are retried. Any other write() failure terminates the
 * process immediately with exit status 1, so this function only returns
 * once everything has been written.
 */
static void write_all(int fd, const char *buf, size_t len)
{
    const char *cursor = buf;
    size_t remaining = len;

    while (remaining != 0) {
        ssize_t written = write(fd, cursor, remaining);

        if (written >= 0) {
            /* Partial progress: skip what the kernel accepted. */
            cursor += written;
            remaining -= (size_t)written;
        } else if (errno != EINTR) {
            _exit(1);
        }
        /* On EINTR nothing was consumed; loop around and retry. */
    }
}

/*
 * Write the NUL-terminated string `s` (without its terminator) to `fd`
 * by delegating to write_all().
 */
static void write_str(int fd, const char *s)
{
    size_t length = strlen(s);

    write_all(fd, s, length);
}

/*
 * Print "<prefix>: <strerror(errno)>\n" to standard error.
 *
 * The message is formatted into a fixed 512-byte buffer. snprintf() returns
 * the length the full message WOULD have had, which can exceed the buffer
 * when `prefix` is long (for example a file path); the length handed to
 * write_all() is therefore clamped to what was actually stored, and the
 * truncated message is still terminated by a newline.
 */
static void log_errno_msg(const char *prefix)
{
    char buf[512];
    size_t len;
    int n = snprintf(buf, sizeof(buf), "%s: %s\n", prefix, strerror(errno));

    if (n <= 0) {
        return;
    }

    len = (size_t)n;
    if (len >= sizeof(buf)) {
        /* Output was truncated: only sizeof(buf) - 1 bytes are valid. */
        len = sizeof(buf) - 1;
        buf[len - 1] = '\n';
    }

    write_all(STDERR_FILENO, buf, len);
}

/*
 * Print the command-line usage message for `progname` to standard error.
 *
 * snprintf() reports the untruncated length, so with a very long program
 * name its return value can exceed the 512-byte buffer. The length passed
 * to write_all() is clamped to the bytes actually stored, and a truncated
 * message is still terminated by a newline.
 */
static void log_usage(const char *progname)
{
    char buf[512];
    size_t len;
    int n = snprintf(buf, sizeof(buf),
                     "Uso: %s <tmp_dir> <partition_index> <output_file>\n",
                     progname);

    if (n <= 0) {
        return;
    }

    len = (size_t)n;
    if (len >= sizeof(buf)) {
        /* Output was truncated: only sizeof(buf) - 1 bytes are valid. */
        len = sizeof(buf) - 1;
        buf[len - 1] = '\n';
    }

    write_all(STDERR_FILENO, buf, len);
}

/*
 * Format "<dir>/<prefix><index>.tmp" into `out`.
 *
 * At most `out_size` bytes (terminator included) are stored. A result that
 * does not fit is silently truncated, because the snprintf() return value
 * is discarded and the function has no way to report it.
 */
static void build_path(char *out, size_t out_size,
                       const char *dir, const char *prefix, int index)
{
    (void)snprintf(out,
                   out_size,
                   "%s/%s%d.tmp",
                   dir,
                   prefix,
                   index);
}

/*
 * Create directory `dir` with mode 0755 unless it already exists.
 *
 * mkdir() is attempted first and the result inspected afterwards, so a
 * directory created concurrently by another process counts as success
 * instead of surfacing as EEXIST.
 *
 * Returns 0 when `dir` is a directory on return. Returns -1 with errno set
 * to ENOTDIR when the name exists but is not a directory, or to the
 * mkdir() error otherwise.
 */
static int make_dir_if_missing(const char *dir)
{
    struct stat st;
    int mkdir_errno;

    if (mkdir(dir, 0755) == 0) {
        return 0;
    }
    mkdir_errno = errno;

    if (stat(dir, &st) == 0) {
        if (S_ISDIR(st.st_mode)) {
            return 0;
        }
        errno = ENOTDIR;
        return -1;
    }

    /* Report why mkdir() failed, not why the follow-up stat() failed. */
    errno = mkdir_errno;
    return -1;
}

/*
 * Make sure the directory that will contain `path` exists.
 *
 * The directory part is everything before the last '/'. A path without
 * '/' or located directly under the root needs no work. Missing
 * directories, including missing ancestors, are created with mode 0755.
 *
 * Returns 0 on success. Returns -1 with errno set on failure:
 * ENAMETOOLONG when `path` does not fit the 512-byte work buffer, ENOTDIR
 * when a component exists but is not a directory, or the error reported by
 * stat()/mkdir().
 */
static int ensure_parent_dir(const char *path)
{
    char dirbuf[512];
    size_t path_len = strlen(path);
    char *slash;
    char *p;
    struct stat st;

    if (path_len >= sizeof(dirbuf)) {
        errno = ENAMETOOLONG;
        return -1;
    }

    memcpy(dirbuf, path, path_len + 1);
    slash = strrchr(dirbuf, '/');

    if (slash == NULL) {
        return 0;
    }

    *slash = '\0';

    if (dirbuf[0] == '\0') {
        return 0;
    }

    /* Fast path: the parent is already there. */
    if (stat(dirbuf, &st) == 0) {
        if (S_ISDIR(st.st_mode)) {
            return 0;
        }
        errno = ENOTDIR;
        return -1;
    }

    /* Only a missing entry is worth creating; anything else is an error. */
    if (errno != ENOENT) {
        return -1;
    }

    /*
     * Create every ancestor in turn by temporarily cutting the string at
     * each separator. Starting at index 1 skips a leading '/'.
     */
    for (p = dirbuf + 1; *p != '\0'; ++p) {
        if (*p != '/') {
            continue;
        }
        *p = '\0';
        if (make_dir_if_missing(dirbuf) < 0) {
            return -1;
        }
        *p = '/';
    }

    return make_dir_if_missing(dirbuf);
}

/*
 * Emit one output record, "<word> <total>\n", on descriptor `outfd`.
 *
 * The record is formatted into a buffer of LINE_BUF_SIZE + 64 bytes and
 * the length reported by snprintf() is passed to write_all() unchanged, so
 * `word` must be short enough for the whole record to fit.
 *
 * Returns 0 once the record has been handed to write_all(), or -1 when
 * snprintf() reports a formatting error.
 */
static int flush_result(int outfd, const char *word, long total)
{
    char record[LINE_BUF_SIZE + 64];
    int length;

    length = snprintf(record, sizeof(record), "%s %ld\n", word, total);
    if (length >= 0) {
        write_all(outfd, record, (size_t)length);
        return 0;
    }

    return -1;
}

/*
 * Fold one "<word> <count>" input line into the running aggregation.
 *
 * A line that does not parse as a word followed by an integer is ignored.
 * When the word equals `current_word` its count is added to
 * `*current_total`; otherwise the finished word (if any) is written to
 * `outfd` via flush_result() and the new word becomes the current one.
 *
 * `line` must be shorter than LINE_BUF_SIZE so the parsed word fits both
 * the local buffer and `current_word`.
 *
 * Returns 0 on success, -1 when flush_result() fails.
 */
static int accumulate_line(const char *line, char *current_word,
                           long *current_total, int *has_current, int outfd)
{
    char word[LINE_BUF_SIZE];
    int count = 0;

    if (sscanf(line, "%s %d", word, &count) != 2) {
        return 0;
    }

    if (*has_current) {
        if (strcmp(current_word, word) == 0) {
            *current_total += count;
            return 0;
        }
        /* A different word starts: the previous one is complete. */
        if (flush_result(outfd, current_word, *current_total) < 0) {
            return -1;
        }
    }

    strcpy(current_word, word);
    *current_total = count;
    *has_current = 1;
    return 0;
}

/*
 * Reduce the file at `suffle_path` into per-word totals written to `outfd`.
 *
 * The input is read in READ_BUF_SIZE chunks and split on '\n'. Empty lines
 * are skipped and a final line without a trailing newline is still
 * processed. Counts of consecutive lines carrying the same word are
 * summed, so equal words must be adjacent in the input to be merged into
 * one output record.
 *
 * Returns 0 on success. Returns -1 with errno describing the failure when
 * the file cannot be opened, read or closed, when a line does not fit in
 * LINE_BUF_SIZE (errno is set to ENOMEM), or when flush_result() fails.
 * On the error paths errno is preserved across the cleanup close().
 */
static int reduce_partition(const char *suffle_path, int outfd)
{
    char rbuf[READ_BUF_SIZE];
    char line[LINE_BUF_SIZE];
    char current_word[LINE_BUF_SIZE];
    size_t line_len = 0;
    long current_total = 0;
    int has_current = 0;
    int saved_errno;
    int infd;

    infd = open(suffle_path, O_RDONLY);
    if (infd < 0) {
        return -1;
    }

    current_word[0] = '\0';

    for (;;) {
        ssize_t nread = read(infd, rbuf, sizeof(rbuf));
        ssize_t i;

        if (nread < 0) {
            /* An interrupted read is retried, as write_all() does for writes. */
            if (errno == EINTR) {
                continue;
            }
            goto fail;
        }
        if (nread == 0) {
            break;
        }

        for (i = 0; i < nread; ++i) {
            char c = rbuf[i];

            if (c != '\n') {
                /* Keep one byte free for the terminating NUL. */
                if (line_len + 1 >= sizeof(line)) {
                    errno = ENOMEM;
                    goto fail;
                }
                line[line_len++] = c;
                continue;
            }

            if (line_len == 0) {
                continue;
            }

            line[line_len] = '\0';
            line_len = 0;

            if (accumulate_line(line, current_word, &current_total,
                                &has_current, outfd) < 0) {
                goto fail;
            }
        }
    }

    /* Input that does not end in '\n' leaves a pending last line. */
    if (line_len > 0) {
        line[line_len] = '\0';
        if (accumulate_line(line, current_word, &current_total,
                            &has_current, outfd) < 0) {
            goto fail;
        }
    }

    if (has_current &&
        flush_result(outfd, current_word, current_total) < 0) {
        goto fail;
    }

    if (close(infd) < 0) {
        return -1;
    }

    return 0;

fail:
    /* close() may overwrite errno; the caller logs the original cause. */
    saved_errno = errno;
    close(infd);
    errno = saved_errno;
    return -1;
}

/*
 * Entry point of the reduce step.
 *
 * Usage: <prog> <tmp_dir> <partition_index> <output_file>
 *
 * Reads "<tmp_dir>/_suffle_<partition_index>.tmp", sums the counts of
 * adjacent equal words and writes the totals to <output_file>, creating
 * the output file's parent directory when needed. The output file is
 * created/truncated before the input file is opened.
 *
 * <partition_index> must be a complete non-negative decimal integer that
 * fits in an int. Non-numeric text, trailing characters and out-of-range
 * values are rejected rather than being silently read as 0 or truncated.
 *
 * Returns 0 on success and 1 on any error, after printing a diagnostic to
 * standard error.
 */
int main(int argc, char *argv[])
{
    const char *tmp_dir;
    const char *output_file;
    char *end = NULL;
    long parsed_index;
    int partition_index;
    int outfd;
    char suffle_path[512];

    if (argc != 4) {
        log_usage(argv[0]);
        return 1;
    }

    tmp_dir = argv[1];
    output_file = argv[3];

    /* strtol() reports both parse failures (via end) and overflow (via errno). */
    errno = 0;
    parsed_index = strtol(argv[2], &end, 10);

    if (end == argv[2] || *end != '\0' || errno == ERANGE ||
        parsed_index < 0 || parsed_index > INT_MAX) {
        write_str(STDERR_FILENO, "Error: partition_index inválido.\n");
        return 1;
    }

    partition_index = (int)parsed_index;

    if (ensure_parent_dir(output_file) < 0) {
        log_errno_msg("ensure_parent_dir");
        return 1;
    }

    outfd = open(output_file, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (outfd < 0) {
        log_errno_msg("open output_file");
        return 1;
    }

    build_path(suffle_path, sizeof(suffle_path), tmp_dir, "_suffle_", partition_index);

    if (reduce_partition(suffle_path, outfd) < 0) {
        /* Log before close() so errno still describes the reduce failure. */
        log_errno_msg(suffle_path);
        close(outfd);
        return 1;
    }

    if (close(outfd) < 0) {
        log_errno_msg("close output_file");
        return 1;
    }

    {
        char buf[256];
        int n = snprintf(buf, sizeof(buf),
                         "[WordCount:reduce] Reduced partition %d.\n",
                         partition_index);
        if (n > 0) {
            write_all(STDOUT_FILENO, buf, (size_t)n);
        }
    }

    return 0;
}
