#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>

#define MAX_INPUTS 128
#define MAX_REDUCERS 16

/*
 * Writes exactly `len` bytes from `buf` to `fd`, looping over short writes.
 *
 * A write interrupted by a signal (EINTR) is retried. Any other write error
 * terminates the calling process immediately via _exit(1), so this function
 * only returns once every byte has been handed to write().
 */
static void write_all(int fd, const char *buf, size_t len)
{
    const char *cursor = buf;
    size_t remaining = len;

    for (;;) {
        ssize_t written;

        if (remaining == 0) {
            return;
        }

        written = write(fd, cursor, remaining);
        if (written >= 0) {
            cursor += written;
            remaining -= (size_t)written;
            continue;
        }

        /* Only an interrupted call is worth retrying. */
        if (errno != EINTR) {
            _exit(1);
        }
    }
}

/*
 * Writes "<prefix>: <strerror(errno)>\n" to standard error.
 *
 * snprintf() returns the length the full message would have had, which can
 * exceed the buffer when `prefix` is long. The length handed to write_all()
 * is therefore clamped to what is actually stored in `buf`, and a truncated
 * message is still terminated with a newline.
 *
 * errno is captured on entry and restored on return so the caller can keep
 * inspecting it after logging.
 */
static void log_errno_msg(const char *prefix)
{
    char buf[512];
    int saved_errno = errno;
    int n = snprintf(buf, sizeof(buf), "%s: %s\n", prefix, strerror(saved_errno));

    if (n > 0) {
        size_t len = (size_t)n;

        if (len >= sizeof(buf)) {
            /* Output was truncated: only sizeof(buf) - 1 bytes are valid. */
            len = sizeof(buf) - 1;
            buf[len - 1] = '\n';
        }
        write_all(STDERR_FILENO, buf, len);
    }

    errno = saved_errno;
}

/*
 * Prints the command-line usage line to standard error.
 *
 * `progname` is inserted verbatim (callers pass argv[0]), so it can be
 * arbitrarily long. snprintf() reports the untruncated length in that case;
 * the length is clamped to the bytes really present in `buf` so write_all()
 * never reads past the end of the buffer.
 */
static void log_usage(const char *progname)
{
    char buf[1024];
    int n = snprintf(
        buf, sizeof(buf),
        "Uso: %s [-m] [-r nreducers] [-o output_file] <input_files...>\n",
        progname
    );

    if (n > 0) {
        size_t len = (size_t)n;

        if (len >= sizeof(buf)) {
            /* Truncated: keep the line newline-terminated. */
            len = sizeof(buf) - 1;
            buf[len - 1] = '\n';
        }
        write_all(STDERR_FILENO, buf, len);
    }
}

/*
 * Makes sure `path` names a directory.
 *
 * If stat() succeeds, the result is 0 for a directory and -1 with
 * errno = ENOTDIR for anything else. If stat() fails, the directory is
 * created with mode 0755.
 *
 * Returns 0 on success, or -1 with errno set (by mkdir() or to ENOTDIR).
 */
static int ensure_dir_exists(const char *path)
{
    struct stat info;
    int present = (stat(path, &info) == 0);

    if (!present) {
        return (mkdir(path, 0755) < 0) ? -1 : 0;
    }

    if (!S_ISDIR(info.st_mode)) {
        errno = ENOTDIR;
        return -1;
    }

    return 0;
}

/*
 * Helper for ensure_parent_dir(): makes sure the single path `dir` is a
 * directory, creating it with mode 0755 when stat() cannot find it.
 *
 * If mkdir() reports EEXIST (something created the entry between the stat()
 * and the mkdir()), the entry is re-examined and accepted when it is a
 * directory.
 *
 * Returns 0 on success, -1 with errno set otherwise.
 */
static int make_dir_if_missing(const char *dir)
{
    struct stat st;

    if (stat(dir, &st) == 0) {
        if (S_ISDIR(st.st_mode)) return 0;
        errno = ENOTDIR;
        return -1;
    }

    if (mkdir(dir, 0755) == 0) return 0;
    if (errno != EEXIST) return -1;

    /* Lost a creation race: fine as long as the winner made a directory. */
    if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode)) return 0;
    errno = ENOTDIR;
    return -1;
}

/*
 * Makes sure the directory that will contain `path` exists.
 *
 * The directory part is everything before the last '/' in `path`. A path
 * without a '/', or whose only '/' is the leading one, needs no work and
 * yields 0. Otherwise every missing component of the directory part is
 * created in order (mode 0755), so nested output locations such as
 * "a/b/out.txt" work even when "a" does not exist yet.
 *
 * Returns 0 on success. Returns -1 with errno set on failure:
 * ENAMETOOLONG when `path` does not fit the 512-byte working buffer,
 * ENOTDIR when an existing component is not a directory, or whatever
 * mkdir() reported.
 */
static int ensure_parent_dir(const char *path)
{
    char dirbuf[512];
    char *slash;
    char *p;
    size_t len = strlen(path);

    if (len >= sizeof(dirbuf)) {
        errno = ENAMETOOLONG;
        return -1;
    }
    memcpy(dirbuf, path, len + 1);

    slash = strrchr(dirbuf, '/');
    if (!slash) return 0;
    *slash = '\0';
    if (dirbuf[0] == '\0') return 0;

    /*
     * Walk the directory part left to right, temporarily cutting the string
     * at each separator so every prefix is ensured before its children.
     * Starting at index 1 skips a leading '/' (the root always exists).
     */
    for (p = dirbuf + 1; *p != '\0'; ++p) {
        if (*p != '/') continue;
        *p = '\0';
        if (make_dir_if_missing(dirbuf) < 0) return -1;
        *p = '/';
    }

    return make_dir_if_missing(dirbuf);
}

/*
 * Reports on standard error that a child process was started.
 *
 * stage: pipeline stage label placed after "WordCount:" in the message.
 * cmd:   command the child runs.
 * pid:   process id of the child.
 *
 * snprintf() returns the untruncated length, so the byte count passed to
 * write_all() is clamped to the contents of `buf` in case `stage` or `cmd`
 * are longer than the buffer allows.
 */
static void log_created(const char *stage, const char *cmd, pid_t pid)
{
    char buf[256];
    int n = snprintf(buf, sizeof(buf),
                     "[WordCount:%s] Created %s process %d.\n",
                     stage, cmd, (int)pid);

    if (n > 0) {
        size_t len = (size_t)n;

        if (len >= sizeof(buf)) {
            /* Truncated: keep the line newline-terminated. */
            len = sizeof(buf) - 1;
            buf[len - 1] = '\n';
        }
        write_all(STDERR_FILENO, buf, len);
    }
}

/*
 * Blocks until child `pid` terminates, retrying waitpid() on EINTR.
 *
 * Returns 0 and stores the child's exit code in *exit_status when the child
 * exited normally. Returns -1 when waitpid() fails (errno set,
 * *exit_status untouched) or when the child did not exit normally (in which
 * case *exit_status is set to -1).
 */
static int wait_for_pid(pid_t pid, int *exit_status)
{
    int raw_status;
    pid_t reaped;

    do {
        reaped = waitpid(pid, &raw_status, 0);
    } while (reaped < 0 && errno == EINTR);

    if (reaped < 0) {
        return -1;
    }

    if (!WIFEXITED(raw_status)) {
        *exit_status = -1;
        return -1;
    }

    *exit_status = WEXITSTATUS(raw_status);
    return 0;
}

/*
 * Writes a NUL-terminated message to standard error. The length is computed
 * from the text itself so it can never disagree with the literal.
 */
static void log_text(const char *msg)
{
    write_all(STDERR_FILENO, msg, strlen(msg));
}

/*
 * Parses the argument of -r as a decimal integer in [1, MAX_REDUCERS].
 * Rejects empty input, trailing garbage and out-of-range values.
 * Returns 0 and stores the value in *count on success, -1 otherwise.
 */
static int parse_reducer_count(const char *text, int *count)
{
    char *end;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE) return -1;
    if (value < 1 || value > MAX_REDUCERS) return -1;

    *count = (int)value;
    return 0;
}

/*
 * Child-side helper: duplicates `fd` onto `target`. On failure it logs
 * `what` with the errno text and terminates the child with status 127.
 */
static void redirect_or_die(int fd, int target, const char *what)
{
    if (dup2(fd, target) < 0) {
        log_errno_msg(what);
        _exit(127);
    }
}

/*
 * Entry point: launches a map -> shuffle -> reduce pipeline of helper
 * programs and waits for it to finish.
 *
 * Command line: [-m] [-r nreducers] [-o output_file] <input_files...>
 *   -m            start one ./WordCountMap process per input file instead of
 *                 a single process that receives every input file.
 *   -r nreducers  number of ./WordCountReduce processes (1..MAX_REDUCERS,
 *                 default 1).
 *   -o file       output file (default ./Result/wc_pipe.txt).
 *
 * Wiring:
 *   - every mapper writes its stdout into one shared pipe;
 *   - ./WordCountSuffle reads that pipe on stdin and is given, as argv, the
 *     reducer count followed by the descriptor number of the write end of
 *     each reducer pipe (those descriptors stay open across exec);
 *   - reducer i reads its own pipe on stdin, writes stdout to the output
 *     file and receives its index as argv[1].
 *
 * Returns 0 when the shuffle process and every reducer exit with status 0,
 * and 1 on any usage, setup, spawn or wait error. Mapper exit codes are
 * logged but do not affect the result.
 */
int main(int argc, char *argv[])
{
    int multi_map = 0;
    int n_reducers = 1;
    const char *output_file = "./Result/wc_pipe.txt";

    char *inputs[MAX_INPUTS];
    int n_inputs = 0;
    int i;

    int map_to_suffle[2];
    int (*suffle_to_reduce)[2] = NULL;

    pid_t map_pids[MAX_INPUTS];
    pid_t reduce_pids[MAX_REDUCERS];
    pid_t suffle_pid = -1;

    int map_status[MAX_INPUTS];
    int reduce_status[MAX_REDUCERS];
    int suffle_status = 0;

    int outfd;

    if (argc < 2) {
        log_usage(argv[0]);
        return 1;
    }

    /* ---- Command-line parsing ---- */
    for (i = 1; i < argc; ++i) {
        if (strcmp(argv[i], "-m") == 0) {
            multi_map = 1;
        } else if (strcmp(argv[i], "-r") == 0) {
            if (i + 1 >= argc) {
                log_usage(argv[0]);
                return 1;
            }
            if (parse_reducer_count(argv[++i], &n_reducers) < 0) {
                log_text("Error: nreducers debe estar entre 1 y 16.\n");
                return 1;
            }
        } else if (strcmp(argv[i], "-o") == 0) {
            if (i + 1 >= argc) {
                log_usage(argv[0]);
                return 1;
            }
            output_file = argv[++i];
        } else {
            if (n_inputs >= MAX_INPUTS) {
                log_text("Error: demasiados ficheros de entrada.\n");
                return 1;
            }
            inputs[n_inputs++] = argv[i];
        }
    }

    if (n_inputs == 0) {
        log_usage(argv[0]);
        return 1;
    }

    /* ---- Output location ---- */
    /* EEXIST is tolerated: it means the directory appeared concurrently. */
    if (ensure_dir_exists("./Result") < 0 && errno != EEXIST) {
        log_errno_msg("Result");
        return 1;
    }

    if (ensure_parent_dir(output_file) < 0) {
        log_errno_msg("ensure_parent_dir");
        return 1;
    }

    outfd = open(output_file, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0644);
    if (outfd < 0) {
        log_errno_msg("open output_file");
        return 1;
    }

    /* ---- Pipes ---- */
    if (pipe(map_to_suffle) < 0) {
        log_errno_msg("pipe map_to_suffle");
        close(outfd);
        return 1;
    }

    suffle_to_reduce = malloc((size_t)n_reducers * sizeof(*suffle_to_reduce));
    if (!suffle_to_reduce) {
        log_errno_msg("malloc");
        close(map_to_suffle[0]);
        close(map_to_suffle[1]);
        close(outfd);
        return 1;
    }

    for (i = 0; i < n_reducers; ++i) {
        if (pipe(suffle_to_reduce[i]) < 0) {
            log_errno_msg("pipe suffle_to_reduce");
            /* Release the pipes that were created before the failure. */
            for (int j = 0; j < i; ++j) {
                close(suffle_to_reduce[j][0]);
                close(suffle_to_reduce[j][1]);
            }
            close(map_to_suffle[0]);
            close(map_to_suffle[1]);
            close(outfd);
            goto fail;
        }
    }

    /* ---- Reducers ---- */
    for (i = 0; i < n_reducers; ++i) {
        pid_t pid = fork();
        if (pid < 0) {
            log_errno_msg("fork reducer");
            goto fail;
        }

        if (pid == 0) {
            char partbuf[32];

            redirect_or_die(suffle_to_reduce[i][0], STDIN_FILENO,
                            "dup2 reducer stdin");
            redirect_or_die(outfd, STDOUT_FILENO, "dup2 reducer stdout");

            close(map_to_suffle[0]);
            close(map_to_suffle[1]);

            /*
             * Drop every reducer-pipe descriptor: stdin already holds this
             * reducer's read end, and a lingering write end would prevent
             * any reducer from ever seeing end-of-file.
             */
            for (int j = 0; j < n_reducers; ++j) {
                close(suffle_to_reduce[j][1]);
                close(suffle_to_reduce[j][0]);
            }
            close(outfd);

            snprintf(partbuf, sizeof(partbuf), "%d", i);
            execl("./WordCountReduce", "./WordCountReduce", partbuf, (char *)NULL);
            log_errno_msg("./WordCountReduce");
            _exit(127);
        }

        reduce_pids[i] = pid;
        log_created("reduce", "./WordCountReduce", pid);
    }

    /* ---- Shuffle ---- */
    {
        pid_t pid = fork();
        if (pid < 0) {
            log_errno_msg("fork suffle");
            goto fail;
        }

        if (pid == 0) {
            char nrbuf[32];
            char **sargv = malloc((size_t)(n_reducers + 3) * sizeof(char *));
            char **fdstrs = malloc((size_t)n_reducers * sizeof(char *));
            if (!sargv || !fdstrs) _exit(127);

            redirect_or_die(map_to_suffle[0], STDIN_FILENO, "dup2 suffle stdin");

            /* stdin now carries the mapper pipe; the originals are not needed. */
            close(map_to_suffle[0]);
            close(map_to_suffle[1]);
            close(outfd);

            for (int j = 0; j < n_reducers; ++j) {
                close(suffle_to_reduce[j][0]);
            }

            /* argv: program, reducer count, then one write-end fd per reducer. */
            sargv[0] = "./WordCountSuffle";
            snprintf(nrbuf, sizeof(nrbuf), "%d", n_reducers);
            sargv[1] = nrbuf;

            for (int j = 0; j < n_reducers; ++j) {
                fdstrs[j] = malloc(32);
                if (!fdstrs[j]) _exit(127);
                snprintf(fdstrs[j], 32, "%d", suffle_to_reduce[j][1]);
                sargv[j + 2] = fdstrs[j];
            }
            sargv[n_reducers + 2] = NULL;

            execv("./WordCountSuffle", sargv);
            log_errno_msg("./WordCountSuffle");
            _exit(127);
        }

        suffle_pid = pid;
        log_created("suffle", "./WordCountSuffle", pid);
    }

    /*
     * The parent keeps only the write end of the mapper pipe (handed to the
     * mappers below). Holding on to the other ends would keep the pipes open
     * after the real writers finish.
     */
    close(map_to_suffle[0]);
    for (i = 0; i < n_reducers; ++i) {
        close(suffle_to_reduce[i][0]);
        close(suffle_to_reduce[i][1]);
    }

    /* ---- Mappers ---- */
    if (multi_map) {
        for (i = 0; i < n_inputs; ++i) {
            pid_t pid = fork();
            if (pid < 0) {
                log_errno_msg("fork mapper");
                goto fail;
            }

            if (pid == 0) {
                /* map_to_suffle[0] was closed by the parent before this fork. */
                redirect_or_die(map_to_suffle[1], STDOUT_FILENO,
                                "dup2 mapper stdout");
                close(map_to_suffle[1]);
                close(outfd);
                execl("./WordCountMap", "./WordCountMap", inputs[i], (char *)NULL);
                log_errno_msg("./WordCountMap");
                _exit(127);
            }

            map_pids[i] = pid;
            log_created("map", "./WordCountMap", pid);
        }
    } else {
        pid_t pid = fork();
        if (pid < 0) {
            log_errno_msg("fork mapper");
            goto fail;
        }

        if (pid == 0) {
            char **margv = malloc((size_t)(n_inputs + 2) * sizeof(char *));
            if (!margv) _exit(127);

            /* map_to_suffle[0] was closed by the parent before this fork. */
            redirect_or_die(map_to_suffle[1], STDOUT_FILENO,
                            "dup2 mapper stdout");
            close(map_to_suffle[1]);
            close(outfd);

            margv[0] = "./WordCountMap";
            for (int j = 0; j < n_inputs; ++j) margv[j + 1] = inputs[j];
            margv[n_inputs + 1] = NULL;

            execv("./WordCountMap", margv);
            log_errno_msg("./WordCountMap");
            _exit(127);
        }

        map_pids[0] = pid;
        log_created("map", "./WordCountMap", pid);
    }

    /* Last parent-held write end: closing it lets the shuffle see EOF. */
    close(map_to_suffle[1]);
    close(outfd);

    /* ---- Wait for the pipeline, in stage order ---- */
    if (multi_map) {
        for (i = 0; i < n_inputs; ++i) {
            /*
             * wait_for_pid() stores -1 only when the child did not exit
             * normally; presetting 0 lets that case be told apart from a
             * waitpid() failure, where errno is meaningful.
             */
            map_status[i] = 0;
            if (wait_for_pid(map_pids[i], &map_status[i]) < 0) {
                if (map_status[i] == -1) {
                    log_text("[WordCount:map] Mapper terminated abnormally.\n");
                } else {
                    log_errno_msg("wait mapper");
                }
                goto fail;
            }
            {
                char buf[128];
                int n = snprintf(buf, sizeof(buf),
                                 "[WordCount:map] Mapper %d returned %d.\n",
                                 i, map_status[i]);
                if (n > 0) write_all(STDERR_FILENO, buf, (size_t)n);
            }
        }
    } else {
        map_status[0] = 0;
        if (wait_for_pid(map_pids[0], &map_status[0]) < 0) {
            if (map_status[0] == -1) {
                log_text("[WordCount:map] Mapper terminated abnormally.\n");
            } else {
                log_errno_msg("wait mapper");
            }
            goto fail;
        }
        {
            char buf[128];
            int n = snprintf(buf, sizeof(buf),
                             "[WordCount:map] Mapper returned %d.\n",
                             map_status[0]);
            if (n > 0) write_all(STDERR_FILENO, buf, (size_t)n);
        }
    }

    if (wait_for_pid(suffle_pid, &suffle_status) < 0 || suffle_status != 0) {
        log_text("[WordCount:suffle] Error running WordCountSuffle.\n");
        goto fail;
    }

    for (i = 0; i < n_reducers; ++i) {
        if (wait_for_pid(reduce_pids[i], &reduce_status[i]) < 0 || reduce_status[i] != 0) {
            log_text("[WordCount:reduce] Error running WordCountReduce.\n");
            goto fail;
        }
    }

    {
        char buf[256];
        int n = snprintf(buf, sizeof(buf),
                         "[WordCount:wordcountmr] WordCountMR completed processing %d file(s) with %d reducer(s).\n",
                         n_inputs, n_reducers);
        if (n > 0) write_all(STDERR_FILENO, buf, (size_t)n);
    }

    free(suffle_to_reduce);
    return 0;

fail:
    /* Descriptors still open here are released when the process exits. */
    free(suffle_to_reduce);
    return 1;
}
