From 77071eb45391c9f0bbc593bcf2c10605ed1d5d17 Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Mon, 22 Jun 2020 12:00:05 +0200 Subject: Rename proxy and cfgdir to more descriptive names --- cfgdir.c | 576 ------------------------------------------------------- cfgdir.h | 10 - igmp.c | 2 +- main.c | 19 +- main.h | 2 +- meson.build | 4 +- proxy.c | 424 ----------------------------------------- proxy.h | 48 ----- server-config.c | 580 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ server-config.h | 10 + server-proxy.c | 424 +++++++++++++++++++++++++++++++++++++++++ server-proxy.h | 48 +++++ server.c | 2 +- uring.c | 6 +- uring.h | 2 +- 15 files changed, 1080 insertions(+), 1077 deletions(-) delete mode 100644 cfgdir.c delete mode 100644 cfgdir.h delete mode 100644 proxy.c delete mode 100644 proxy.h create mode 100644 server-config.c create mode 100644 server-config.h create mode 100644 server-proxy.c create mode 100644 server-proxy.h diff --git a/cfgdir.c b/cfgdir.c deleted file mode 100644 index ec8a0bf..0000000 --- a/cfgdir.c +++ /dev/null @@ -1,576 +0,0 @@ -#define _GNU_SOURCE -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "main.h" -#include "uring.h" -#include "cfgdir.h" -#include "config.h" -#include "server.h" - -static void -scfg_dns_cb(struct dns_async *dns, bool (*server_cb)(struct server *, struct saddr *)) -{ - struct server *server; - struct sockaddr_in *in4; - struct sockaddr_in6 *in6; - struct saddr *saddr; - struct addrinfo *results = NULL, *ai; - int r; - - assert_return(dns && dns->priv && server_cb); - - server = dns->priv; - debug(DBG_DNS, "called, dns: %p, name: %s, server: %p, server->name: %s", - dns, dns->name, server, server->name); - - r = gai_error(&dns->gcb); - if (r == EAI_INPROGRESS) { - /* This shouldn't happen, assume we'll get called again */ - error("called with request in progress"); - return; - } else if (r == EAI_CANCELED) { - /* The server must be in the process of going away */ - goto out; - } else if (r < 0) { - error("DNS lookup of %s:%s failed: %s", - dns->name, dns->port, gai_strerror(r)); - goto out; - } - - results = dns->gcb.ar_result; - - for (ai = results; ai; ai = ai->ai_next) { - saddr = zmalloc(sizeof(*saddr)); - if (!saddr) { - error("DNS lookup of %s:%s failed: %m", dns->name, dns->port); - goto out; - } - - switch (ai->ai_family) { - case AF_INET: - in4 = (struct sockaddr_in *)ai->ai_addr; - saddr_set_ipv4(saddr, in4->sin_addr.s_addr, in4->sin_port); - server_cb(server, saddr); - break; - - case AF_INET6: - in6 = (struct sockaddr_in6 *)ai->ai_addr; - saddr_set_ipv6(saddr, &in6->sin6_addr, in6->sin6_port); - server_cb(server, saddr); - break; - - default: - error("getaddrinfo(%s:%s): unknown address family (%i)", - dns->name, dns->port, ai->ai_family); - xfree(saddr); - break; - } - } - -out: - freeaddrinfo(results); - list_del(&dns->list); - xfree(dns); - uring_task_put(&server->task); - server_commit(server); -} - -static void -scfg_local_dns_cb(struct dns_async *dns) -{ - assert_return(dns); - - scfg_dns_cb(dns, server_add_local); -} - -static void -scfg_remote_dns_cb(struct dns_async *dns) -{ - assert_return(dns); - - scfg_dns_cb(dns, server_add_remote); -} - -static void -scfg_rcon_dns_cb(struct dns_async *dns) -{ - assert_return(dns); - - scfg_dns_cb(dns, server_add_rcon); -} - -enum scfg_keys { - SCFG_KEY_INVALID = 0, - SCFG_KEY_TYPE, - SCFG_KEY_NAME, - SCFG_KEY_PORT, - SCFG_KEY_LOCAL, - SCFG_KEY_REMOTE, - SCFG_KEY_IDLE_TIMEOUT, - SCFG_KEY_STOP_METHOD, - SCFG_KEY_START_METHOD, - SCFG_KEY_STOP_EXEC, - SCFG_KEY_START_EXEC, - SCFG_KEY_RCON, - SCFG_KEY_RCON_PASSWORD, - SCFG_KEY_SYSTEMD_SERVICE, -}; - -struct cfg_key_value_map scfg_key_map[] = { - { - .key_name = "type", - .key_value = SCFG_KEY_TYPE, - .value_type = CFG_VAL_TYPE_STRING, - }, { - .key_name = "name", - .key_value = SCFG_KEY_NAME, - .value_type = CFG_VAL_TYPE_STRING, - }, { - .key_name = "port", - .key_value = SCFG_KEY_PORT, - .value_type = CFG_VAL_TYPE_UINT16, - }, { - .key_name = "local", - .key_value = SCFG_KEY_LOCAL, - .value_type = CFG_VAL_TYPE_ASYNC_ADDRS, - }, { - .key_name = "remote", - .key_value = SCFG_KEY_REMOTE, - .value_type = CFG_VAL_TYPE_ASYNC_ADDRS, - }, { - .key_name = "idle_timeout", - .key_value = SCFG_KEY_IDLE_TIMEOUT, - .value_type = CFG_VAL_TYPE_UINT16, - }, { - .key_name = "stop_method", - .key_value = SCFG_KEY_STOP_METHOD, - .value_type = CFG_VAL_TYPE_STRING, - }, { - .key_name = "start_method", - .key_value = SCFG_KEY_START_METHOD, - .value_type = CFG_VAL_TYPE_STRING, - }, { - .key_name = "stop_exec", - .key_value = SCFG_KEY_STOP_EXEC, - .value_type = CFG_VAL_TYPE_STRING, - }, { - .key_name = "start_exec", - .key_value = SCFG_KEY_START_EXEC, - .value_type = CFG_VAL_TYPE_STRING, - }, { - .key_name = "rcon", - .key_value = SCFG_KEY_RCON, - .value_type = CFG_VAL_TYPE_ASYNC_ADDRS, - }, { - .key_name = "rcon_password", - .key_value = SCFG_KEY_RCON_PASSWORD, - .value_type = CFG_VAL_TYPE_STRING, - }, { - .key_name = "systemd_service", - .key_value = SCFG_KEY_SYSTEMD_SERVICE, - .value_type = CFG_VAL_TYPE_STRING, - }, { - .key_name = NULL, - .key_value = SCFG_KEY_INVALID, - .value_type = CFG_VAL_TYPE_INVALID, - } -}; - -static bool -handle_dns(struct server *server, const char *type, - struct cfg_value *value, dns_cb_t *async_cb, - bool (*sync_cb)(struct server *, struct saddr *)) -{ - struct saddr *saddr, *tmp; - struct dns_async *dns; - - assert_return(server && type && value && async_cb && sync_cb, false); - - switch (value->type) { - case CFG_VAL_TYPE_ADDRS: - debug(DBG_DNS, "%s: got immediate addrs", type); - - list_for_each_entry_safe(saddr, tmp, &value->saddrs, list) { - list_del(&saddr->list); - sync_cb(server, saddr); - } - return true; - - case CFG_VAL_TYPE_ASYNC_ADDRS: - debug(DBG_DNS, "%s: doing async lookup of DNS record: %p", - type, value->dns_async); - - dns = value->dns_async; - dns->cb = async_cb; - dns->priv = server; - list_add(&dns->list, &server->dnslookups); - uring_task_get(&server->task); - return true; - - default: - return false; - } -} - -static void -scfg_parse(struct server *server) -{ - char *pos; - - assert_return(server); - - pos = server->tbuf.buf; - - if (!config_parse_header(server->name, "server", &pos)) - return; - - while (true) { - int key; - const char *keyname; - struct cfg_value value; - - if (!config_parse_line(server->name, &pos, scfg_key_map, - &key, &keyname, &value)) - break; - - if (key == SCFG_KEY_INVALID) - break; - - debug(DBG_CFG, "%s: key %s", server->name, keyname); - - switch (key) { - - case SCFG_KEY_TYPE: - if (streq(value.str, "proxy")) { - if (!server_set_type(server, SERVER_TYPE_PROXY)) - return; - } else if (streq(value.str, "announce")) { - if (!server_set_type(server, SERVER_TYPE_ANNOUNCE)) - return; - } - break; - - case SCFG_KEY_NAME: - if (!server_set_pretty_name(server, value.str)) - return; - break; - - case SCFG_KEY_PORT: - if (!server_set_port(server, value.uint16)) - return; - break; - - case SCFG_KEY_LOCAL: - if (!handle_dns(server, "local", &value, - scfg_local_dns_cb, server_add_local)) - return; - break; - - case SCFG_KEY_REMOTE: - if (!handle_dns(server, "remote", &value, - scfg_remote_dns_cb, server_add_remote)) - return; - break; - - case SCFG_KEY_IDLE_TIMEOUT: - if (!server_set_idle_timeout(server, value.uint16)) - return; - break; - - case SCFG_KEY_STOP_METHOD: - if (streq(value.str, "exec")) { - if (server_set_stop_method(server, SERVER_STOP_METHOD_EXEC)) - break; - } else if (streq(value.str, "rcon")) { - if (server_set_stop_method(server, SERVER_STOP_METHOD_RCON)) - break; - } else if (streq(value.str, "systemd")) { - if (server_set_stop_method(server, SERVER_STOP_METHOD_SYSTEMD)) - break; - } - return; - - case SCFG_KEY_START_METHOD: - if (streq(value.str, "exec")) { - if (server_set_start_method(server, SERVER_START_METHOD_EXEC)) - break; - } else if (streq(value.str, "systemd")) { - if (server_set_start_method(server, SERVER_START_METHOD_SYSTEMD)) - break; - } - return; - - case SCFG_KEY_STOP_EXEC: - if (!server_set_stop_exec(server, value.str)) - return; - break; - - case SCFG_KEY_START_EXEC: - if (!server_set_start_exec(server, value.str)) - return; - break; - - case SCFG_KEY_RCON: - if (!handle_dns(server, "rcon", &value, - scfg_rcon_dns_cb, server_add_rcon)) - return; - break; - - case SCFG_KEY_RCON_PASSWORD: - if (!server_set_rcon_password(server, value.str)) - return; - break; - - case SCFG_KEY_SYSTEMD_SERVICE: - if (!server_set_systemd_service(server, value.str)) - return; - break; - - case SCFG_KEY_INVALID: - default: - break; - } - } -} - -static void -scfg_read_cb(struct uring_task *task, int res) -{ - struct server *server = container_of(task, struct server, task); - - assert_return(task); - assert_task_alive(DBG_CFG, task); - - if (res <= 0) { - error("error reading config file for %s: %s", - server->name, strerror(-res)); - server_delete(server); - } - - debug(DBG_CFG, "%s: parsing cfg (%i bytes)", server->name, res); - uring_task_close_fd(&server->task); - scfg_parse(server); - server_commit(server); -} - -static void -scfg_open_cb(struct uring_task *task, int res) -{ - struct server *server = container_of(task, struct server, task); - - assert_return(task); - assert_task_alive(DBG_CFG, task); - - if (res < 0) { - error("open(%s) failed: %s", server->name, strerror(-res)); - server_delete(server); - return; - } - - debug(DBG_CFG, "reading server cfg %s (fd %i)", server->name, res); - uring_task_set_fd(&server->task, res); - uring_tbuf_read_until_eof(&server->task, scfg_read_cb); -} - -static bool -scfg_valid_filename(const char *name) -{ - const char *suffix; - - if (empty_str(name)) - return false; - if (name[0] == '.') - return false; - if ((suffix = strrchr(name, '.')) == NULL) - return false; - if (!streq(suffix, ".server")) - return false; - - return true; -} - -struct inotify_ev { - struct uring_task task; - char buf[4096] __attribute__((aligned(__alignof__(struct inotify_event)))); -}; - -static void -inotify_free(struct uring_task *task) -{ - struct inotify_ev *iev = container_of(task, struct inotify_ev, task); - - assert_return(task); - - debug(DBG_CFG, "called"); - xfree(iev); - cfg->iev = NULL; -} - -static void -inotify_event_dump(const struct inotify_event *event) -{ - assert_return(event); - - debug(DBG_CFG, "inotify event:"); - debug(DBG_CFG, " * WD : %i", event->wd); - debug(DBG_CFG, " * Cookie : %" PRIu32, event->cookie); - debug(DBG_CFG, " * Length : %" PRIu32, event->len); - debug(DBG_CFG, " * Name : %s", event->name); - debug(DBG_CFG, " * Mask : %" PRIu32, event->mask); - if (event->mask & IN_ACCESS) - debug(DBG_CFG, "\tIN_ACCESS"); - else if(event->mask & IN_MODIFY) - debug(DBG_CFG, "\tIN_MODIFY"); - else if(event->mask & IN_ATTRIB) - debug(DBG_CFG, "\tIN_ATTRIB"); - else if(event->mask & IN_CLOSE_WRITE) - debug(DBG_CFG, "\tIN_CLOSE_WRITE"); - else if(event->mask & IN_CLOSE_NOWRITE) - debug(DBG_CFG, "\tIN_CLOSE_NOWRITE"); - else if(event->mask & IN_OPEN) - debug(DBG_CFG, "\tIN_OPEN"); - else if(event->mask & IN_MOVED_FROM) - debug(DBG_CFG, "\tIN_MOVED_FROM"); - else if(event->mask & IN_MOVED_TO) - debug(DBG_CFG, "\tIN_MOVED_TO"); - else if(event->mask & IN_CREATE) - debug(DBG_CFG, "\tIN_CREATE"); - else if(event->mask & IN_DELETE) - debug(DBG_CFG, "\tIN_DELETE"); - else if(event->mask & IN_DELETE_SELF) - debug(DBG_CFG, "\tIN_DELETE_SELF"); - else if(event->mask & IN_MOVE_SELF) - debug(DBG_CFG, "\tIN_MOVE_SELF"); - else if(event->mask & IN_UNMOUNT) - debug(DBG_CFG, "\tIN_UNMOUNT"); - else if(event->mask & IN_Q_OVERFLOW) - debug(DBG_CFG, "\tIN_Q_OVERFLOW"); - else if(event->mask & IN_IGNORED) - debug(DBG_CFG, "\tIN_IGNORED"); -} - -static void -inotify_cb(struct uring_task *task, int res) -{ - struct inotify_ev *iev = container_of(task, struct inotify_ev, task); - const struct inotify_event *event; - char *ptr; - struct server *server; - - assert_return(task); - assert_task_alive(DBG_CFG, task); - - if (res <= 0) { - error("inotify_read: %i", res); - return; - } - - for (ptr = iev->buf; ptr < iev->buf + res; ptr += sizeof(struct inotify_event) + event->len) { - event = (const struct inotify_event *)ptr; - - if (debug_enabled(DBG_CFG)) - inotify_event_dump(event); - - if (event->mask & (IN_IGNORED | IN_MOVE_SELF | IN_DELETE_SELF | IN_UNMOUNT)) - die("configuration directory gone, exiting"); - - if (event->mask & IN_Q_OVERFLOW) { - error("inotify queue overflow"); - continue; - } - - if (!scfg_valid_filename(event->name)) - continue; - - if (event->mask & (IN_MOVED_FROM | IN_DELETE)) - server_delete_by_name(event->name); - else if (event->mask & (IN_MOVED_TO | IN_CREATE | IN_CLOSE_WRITE)) { - server = server_new(event->name); - verbose("New server config file detected: %s", server->name); - uring_openat(&server->task, server->name, scfg_open_cb); - } else - error("inotify: unknown event: 0x%08x", event->mask); - } - - uring_read(&iev->task, iev->buf, sizeof(iev->buf), inotify_cb); -} - -void -cfgdir_refdump() -{ - assert_return_silent(cfg->iev); - - uring_task_refdump(&cfg->iev->task); -} - -void -cfgdir_delete() -{ - assert_return(cfg->iev); - - debug(DBG_CFG, "closing fd %i", cfg->iev->task.fd); - uring_task_destroy(&cfg->iev->task); - cfg->iev = NULL; -} - -void -cfgdir_init() -{ - int ifd; - int iwd; - struct inotify_ev *iev; - DIR *dir; - struct dirent *dent; - struct server *server; - - assert_return(!cfg->iev); - - iev = zmalloc(sizeof(*iev)); - if (!iev) - die("malloc: %m"); - - ifd = inotify_init1(IN_CLOEXEC); - if (ifd < 0) - die("inotify_init1: %m"); - - /* ln = IN_CREATE, cp/vi/mv = IN_CREATE, IN_OPEN, IN_CLOSE_WRITE */ - iwd = inotify_add_watch(ifd, ".", - IN_CLOSE_WRITE | IN_DELETE | IN_CREATE | - IN_DELETE_SELF | IN_MOVE_SELF | IN_MOVED_TO | - IN_MOVED_FROM | IN_DONT_FOLLOW | - IN_EXCL_UNLINK | IN_ONLYDIR ); - if (iwd < 0) - die("inotify_add_watch: %m"); - - uring_task_init(&iev->task, "server_config", uring_parent(), inotify_free); - uring_task_set_fd(&iev->task, ifd); - cfg->iev = iev; - uring_read(&iev->task, iev->buf, sizeof(iev->buf), inotify_cb); - - dir = opendir("."); - if (!dir) - die("opendir(%s): %m", cfg->homedir); - - while ((dent = readdir(dir)) != NULL) { - if (dent->d_type != DT_REG && dent->d_type != DT_UNKNOWN) - continue; - if (!scfg_valid_filename(dent->d_name)) - continue; - - server = server_new(dent->d_name); - if (server) - uring_openat(&server->task, server->name, scfg_open_cb); - } - - closedir(dir); -} - diff --git a/cfgdir.h b/cfgdir.h deleted file mode 100644 index 2fe36eb..0000000 --- a/cfgdir.h +++ /dev/null @@ -1,10 +0,0 @@ -#ifndef foocfgdirhfoo -#define foocfgdirhfoo - -void cfgdir_delete(); - -void cfgdir_refdump(); - -void cfgdir_init(); - -#endif diff --git a/igmp.c b/igmp.c index cebab40..2fb75a2 100644 --- a/igmp.c +++ b/igmp.c @@ -12,7 +12,7 @@ #include #include -/* Remove later */ +/* FIXME: Remove later */ #include #include "main.h" diff --git a/main.c b/main.c index da11e97..c240da2 100644 --- a/main.c +++ b/main.c @@ -21,7 +21,7 @@ #include "uring.h" #include "config.h" #include "server.h" -#include "cfgdir.h" +#include "server-config.h" #include "announce.h" #include "systemd.h" #include "igmp.h" @@ -568,14 +568,13 @@ dump_tree() info("Dumping Tree"); info("============"); uring_task_refdump(&cfg->task); - uring_refdump(cfg->uev); - idle_refdump(cfg->idle); if (cfg->sev) uring_task_refdump(&cfg->sev->task); - igmp_refdump(cfg->igmp); - announce_refdump(cfg->aev); - if (cfg->iev) - cfgdir_refdump(cfg->iev); + uring_refdump(); + idle_refdump(); + igmp_refdump(); + announce_refdump(); + server_cfg_monitor_refdump(); list_for_each_entry(server, &cfg->servers, list) server_refdump(server); info("============"); @@ -642,11 +641,11 @@ signalfd_read(struct uring_task *task, int res) verbose("got a signal to dump tree"); sd_notifyf(0, "STOPPING=1\nSTATUS=Received signal, exiting"); dump_tree(); - uring_task_put(&sev->task); + uring_task_destroy(&sev->task); igmp_delete(); announce_delete(); idle_delete(); - cfgdir_delete(); + server_cfg_monitor_delete(); list_for_each_entry_safe(server, stmp, &cfg->servers, list) server_delete(server); uring_delete(); @@ -753,7 +752,7 @@ main(int argc, char **argv) signalfd_init(); - cfgdir_init(); + server_cfg_monitor_init(); announce_init(); diff --git a/main.h b/main.h index 488ae8d..688dbe2 100644 --- a/main.h +++ b/main.h @@ -146,7 +146,7 @@ struct cfg { char *igmp_iface; struct uring_ev *uev; - struct inotify_ev *iev; + struct server_cfg_monitor *server_cfg_monitor; struct signalfd_ev *sev; struct announce *aev; struct igmp *igmp; diff --git a/meson.build b/meson.build index fa9199e..a4472cd 100644 --- a/meson.build +++ b/meson.build @@ -8,9 +8,9 @@ mcproxy_sources = [ 'main.c', 'uring.c', 'server.c', - 'proxy.c', + 'server-proxy.c', + 'server-config.c', 'announce.c', - 'cfgdir.c', 'config.c', 'rcon.c', 'idle.c', diff --git a/proxy.c b/proxy.c deleted file mode 100644 index 7edfffd..0000000 --- a/proxy.c +++ /dev/null @@ -1,424 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include "main.h" -#include "uring.h" -#include "server.h" -#include "proxy.h" -#include "utils.h" - -static void -format_bytes(char *buf, size_t len, uint64_t val) -{ - uint64_t tmp; - const char *suffix = "B"; - - assert_return(buf && len > 0); - - tmp = val * 10; - if (val > 1152921504606846976ULL) { - tmp = val / 115292150460684697ULL; - suffix= "EiB"; - } else if (val > 1125899906842624ULL) { - tmp /= 1125899906842624ULL; - suffix = "PiB"; - } else if (val > 1099511627776ULL) { - tmp /= 1099511627776ULL; - suffix = "TiB"; - } else if (val > 1073741824ULL) { - tmp /= 1073741824ULL; - suffix = "GiB"; - } else if (val > 1048576) { - tmp /= 1048576; - suffix = "MiB"; - } else if (val > 1024) { - tmp /= 1024; - suffix = "KiB"; - } - - snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix); -} - -static void -format_time(char *buf, size_t len, time_t diff) -{ - unsigned hh, mm, ss; - - assert_return(buf && len > 0); - - hh = diff / 3600; - diff %= 3600; - mm = diff / 60; - diff %= 60; - ss = diff; - - snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss); -} - -static void -proxy_free(struct uring_task *task) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, task); - char cts[100]; - char stc[100]; - char duration[100]; - - assert_return(task); - - debug(DBG_PROXY, "server: %s, src: %s, dst: %s", - proxy->server->name, - proxy->client_conn.remote.addrstr, - proxy->server_conn.remote.addrstr); - - if (proxy->begin > 0) { - format_time(duration, sizeof(duration), time(NULL) - proxy->begin); - format_bytes(cts, sizeof(cts), proxy->client_bytes); - format_bytes(stc, sizeof(stc), proxy->server_bytes); - - info("%s: proxy connection %s -> %s closed " - "(CtS: %s, StC: %s), duration %s", - proxy->server->name, - proxy->client_conn.remote.addrstr, - proxy->server_conn.remote.addrstr, - cts, stc, duration); - } - - list_del(&proxy->list); - xfree(proxy); -} - -static void -proxy_client_free(struct uring_task *task) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); - - assert_return(task); - - debug(DBG_PROXY, "%s: client connection closed", proxy->server->name); -} - -static void -proxy_server_free(struct uring_task *task) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - - debug(DBG_PROXY, "%s: server connection closed", proxy->server->name); -} - -void -proxy_delete(struct server_proxy *proxy) -{ - debug(DBG_PROXY, "%s: shutting down proxy %p", proxy->server->name, proxy); - - assert_return(proxy); - - uring_task_destroy(&proxy->servertask); - uring_task_destroy(&proxy->clienttask); - uring_task_destroy(&proxy->task); -} - -static void proxy_client_data_in(struct uring_task *task, int res); - -static void -proxy_client_data_out(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); - uring_task_close_fd(task); - proxy_delete(proxy); - return; - } - - proxy->client_bytes += res; - uring_task_set_fd(&proxy->clienttask, proxy->cfd); - uring_tbuf_read(task, proxy_client_data_in); -} - -static void -proxy_client_data_in(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); - uring_task_close_fd(task); - proxy_delete(proxy); - return; - } - - uring_task_set_fd(&proxy->clienttask, proxy->sfd); - uring_tbuf_write(task, proxy_client_data_out); -} - -static void proxy_server_data_in(struct uring_task *task, - int res); - -static void -proxy_server_data_out(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); - uring_task_close_fd(task); - proxy_delete(proxy); - return; - } - - proxy->server_bytes += res; - uring_task_set_fd(&proxy->servertask, proxy->sfd); - uring_tbuf_read(&proxy->servertask, proxy_server_data_in); -} - -static void -proxy_server_data_in(struct uring_task *task, int res) -{ - struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - if (res <= 0) { - debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); - uring_task_close_fd(task); - proxy_delete(proxy); - return; - } - - uring_task_set_fd(&proxy->servertask, proxy->cfd); - uring_tbuf_write(task, proxy_server_data_out); -} - -static void -proxy_connected_cb(struct connection *conn, bool connected) -{ - struct server_proxy *proxy = container_of(conn, struct server_proxy, server_conn); - - assert_return(conn); - assert_task_alive(DBG_PROXY, &proxy->clienttask); - assert_task_alive(DBG_PROXY, &proxy->servertask); - - if (!connected) { - error("%s: proxy connection to remote server failed", - proxy->server->name); - proxy_delete(proxy); - return; - } - - proxy->sfd = proxy->servertask.fd; - verbose("%s: proxy connection %s -> %s opened", - proxy->server->name, - proxy->client_conn.remote.addrstr, - proxy->server_conn.remote.addrstr); - proxy->begin = time(NULL); - - uring_tbuf_read(&proxy->clienttask, proxy_client_data_in); - uring_tbuf_read(&proxy->servertask, proxy_server_data_in); -} - -void -proxy_refdump(struct server_proxy *proxy) -{ - assert_return(proxy); - - uring_task_refdump(&proxy->task); - uring_task_refdump(&proxy->clienttask); - uring_task_refdump(&proxy->servertask); -} - -struct server_proxy * -proxy_new(struct server *server, struct saddr *client, int fd) -{ - struct server_proxy *proxy; - - assert_return(server && client && fd > 0, NULL); - - proxy = zmalloc(sizeof(*proxy)); - if (!proxy) { - error("malloc: %m"); - return NULL; - } - - proxy->sfd = -1; - proxy->cfd = fd; - proxy->server = server; - uring_task_init(&proxy->task, "proxy", &server->task, proxy_free); - - connection_set_local(&proxy->client_conn, fd); - connection_set_remote(&proxy->client_conn, client); - - uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task, - proxy_client_free); - uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf); - uring_task_set_fd(&proxy->clienttask, fd); - - uring_task_init(&proxy->servertask, "proxy_server", &proxy->task, - proxy_server_free); - uring_task_set_buf(&proxy->servertask, &proxy->serverbuf); - - list_add(&proxy->list, &server->proxys); - connect_any(&proxy->servertask, &server->remotes, - &proxy->server_conn, proxy_connected_cb); - - return proxy; -} - -static void -local_accept(struct uring_task *task, int res) -{ - struct server_local *local = container_of(task, struct server_local, task); - struct server *server = container_of(task->parent, struct server, task); - struct server_proxy *proxy; - - assert_return(task); - assert_task_alive(DBG_PROXY, task); - - debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name); - - if (res < 0) { - error("result was %i", res); - goto out; - } - - saddr_set_addrstr(&local->client); - - verbose("%s: incoming proxy connection: %s -> %s", - server->name, local->client.addrstr, local->local.addrstr); - - if (list_empty(&server->remotes)) { - /* This shouldn't be possible, checked before opening local */ - error("server->remotes empty!"); - uring_close(&local->task, res); - goto out; - } - - proxy = proxy_new(server, &local->client, res); - if (!proxy) - uring_close(&local->task, res); - -out: - uring_accept(&local->task, &local->client, local_accept); -} - -bool -local_open(struct server_local *local) -{ - int sfd; - int option; - int r; - - assert_return(local && local->server, false); - - sfd = socket(local->local.storage.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); - if (sfd < 0) { - error("socket: %m"); - goto error; - } - - option = true; - if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0) { - error("setsockopt: %m"); - goto error; - } - - /* The MC protocol expects the client to send data first */ - /* FIXME: could make this configurable */ - option = true; - if (setsockopt(sfd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option)) < 0) - error("setsockopt: %m"); - - /* FIXME: could make this configurable */ - option = true; - if (setsockopt(sfd, IPPROTO_IP, IP_FREEBIND, &option, sizeof(option)) < 0) - error("setsockopt: %m"); - - socket_set_low_latency(sfd); - - r = bind(sfd, (struct sockaddr *)&local->local.storage, local->local.addrlen); - if (r < 0) { - error("bind: %m"); - goto error; - } - - r = listen(sfd, 100); - if (r < 0) { - error("listen: %m"); - goto error; - } - - uring_task_set_fd(&local->task, sfd); - uring_accept(&local->task, &local->client, local_accept); - return true; - -error: - if (sfd >= 0) - uring_close(&local->task, sfd); - return false; -} - -void -local_refdump(struct server_local *local) -{ - assert_return(local); - - uring_task_refdump(&local->task); -} - -static void -local_free(struct uring_task *task) -{ - struct server_local *local = container_of(task, struct server_local, task); - - assert_return(task); - - debug(DBG_PROXY, "task %p, local %p", task, local); - list_del(&local->list); - xfree(local); -} - -void -local_delete(struct server_local *local) -{ - assert_return(local); - - uring_task_destroy(&local->task); -} - -struct server_local * -local_new(struct server *server, struct saddr *saddr) -{ - struct server_local *local; - - assert_return(server && saddr, NULL); - - local = zmalloc(sizeof(*local)); - if (!local) { - error("malloc: %m"); - return NULL; - } - - debug(DBG_PROXY, "%s adding local: %s", server->name, saddr->addrstr); - local->local = *saddr; - local->server = server; - uring_task_init(&local->task, "local", &server->task, local_free); - xfree(saddr); - return local; -} - diff --git a/proxy.h b/proxy.h deleted file mode 100644 index 75c078d..0000000 --- a/proxy.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef fooproxyhfoo -#define fooproxyhfoo - -struct server_proxy { - struct connection client_conn; - struct uring_task_buf clientbuf; - struct uring_task clienttask; - uint64_t client_bytes; - int cfd; - - struct connection server_conn; - struct uring_task_buf serverbuf; - struct uring_task servertask; - uint64_t server_bytes; - int sfd; - - time_t begin; - unsigned next_remote; - struct uring_task task; - struct server *server; - struct list_head list; -}; - -void proxy_refdump(struct server_proxy *proxy); - -void proxy_delete(struct server_proxy *proxy); - -struct server_proxy *proxy_new(struct server *server, struct saddr *client, - int fd); - -struct server_local { - struct saddr local; - struct saddr client; - struct uring_task task; - - struct server *server; - struct list_head list; -}; - -bool local_open(struct server_local *local); - -void local_refdump(struct server_local *local); - -void local_delete(struct server_local *local); - -struct server_local *local_new(struct server *server, struct saddr *saddr); - -#endif diff --git a/server-config.c b/server-config.c new file mode 100644 index 0000000..d42022b --- /dev/null +++ b/server-config.c @@ -0,0 +1,580 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "main.h" +#include "uring.h" +#include "config.h" +#include "server.h" +#include "server-config.h" + +static void +scfg_dns_cb(struct dns_async *dns, bool (*server_cb)(struct server *, struct saddr *)) +{ + struct server *server; + struct sockaddr_in *in4; + struct sockaddr_in6 *in6; + struct saddr *saddr; + struct addrinfo *results = NULL, *ai; + int r; + + assert_return(dns && dns->priv && server_cb); + + server = dns->priv; + debug(DBG_DNS, "called, dns: %p, name: %s, server: %p, server->name: %s", + dns, dns->name, server, server->name); + + r = gai_error(&dns->gcb); + if (r == EAI_INPROGRESS) { + /* This shouldn't happen, assume we'll get called again */ + error("called with request in progress"); + return; + } else if (r == EAI_CANCELED) { + /* The server must be in the process of going away */ + goto out; + } else if (r < 0) { + error("DNS lookup of %s:%s failed: %s", + dns->name, dns->port, gai_strerror(r)); + goto out; + } + + results = dns->gcb.ar_result; + + for (ai = results; ai; ai = ai->ai_next) { + saddr = zmalloc(sizeof(*saddr)); + if (!saddr) { + error("DNS lookup of %s:%s failed: %m", dns->name, dns->port); + goto out; + } + + switch (ai->ai_family) { + case AF_INET: + in4 = (struct sockaddr_in *)ai->ai_addr; + saddr_set_ipv4(saddr, in4->sin_addr.s_addr, in4->sin_port); + server_cb(server, saddr); + break; + + case AF_INET6: + in6 = (struct sockaddr_in6 *)ai->ai_addr; + saddr_set_ipv6(saddr, &in6->sin6_addr, in6->sin6_port); + server_cb(server, saddr); + break; + + default: + error("getaddrinfo(%s:%s): unknown address family (%i)", + dns->name, dns->port, ai->ai_family); + xfree(saddr); + break; + } + } + +out: + freeaddrinfo(results); + list_del(&dns->list); + xfree(dns); + uring_task_put(&server->task); + server_commit(server); +} + +static void +scfg_local_dns_cb(struct dns_async *dns) +{ + assert_return(dns); + + scfg_dns_cb(dns, server_add_local); +} + +static void +scfg_remote_dns_cb(struct dns_async *dns) +{ + assert_return(dns); + + scfg_dns_cb(dns, server_add_remote); +} + +static void +scfg_rcon_dns_cb(struct dns_async *dns) +{ + assert_return(dns); + + scfg_dns_cb(dns, server_add_rcon); +} + +enum scfg_keys { + SCFG_KEY_INVALID = 0, + SCFG_KEY_TYPE, + SCFG_KEY_NAME, + SCFG_KEY_PORT, + SCFG_KEY_LOCAL, + SCFG_KEY_REMOTE, + SCFG_KEY_IDLE_TIMEOUT, + SCFG_KEY_STOP_METHOD, + SCFG_KEY_START_METHOD, + SCFG_KEY_STOP_EXEC, + SCFG_KEY_START_EXEC, + SCFG_KEY_RCON, + SCFG_KEY_RCON_PASSWORD, + SCFG_KEY_SYSTEMD_SERVICE, +}; + +struct cfg_key_value_map scfg_key_map[] = { + { + .key_name = "type", + .key_value = SCFG_KEY_TYPE, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "name", + .key_value = SCFG_KEY_NAME, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "port", + .key_value = SCFG_KEY_PORT, + .value_type = CFG_VAL_TYPE_UINT16, + }, { + .key_name = "local", + .key_value = SCFG_KEY_LOCAL, + .value_type = CFG_VAL_TYPE_ASYNC_ADDRS, + }, { + .key_name = "remote", + .key_value = SCFG_KEY_REMOTE, + .value_type = CFG_VAL_TYPE_ASYNC_ADDRS, + }, { + .key_name = "idle_timeout", + .key_value = SCFG_KEY_IDLE_TIMEOUT, + .value_type = CFG_VAL_TYPE_UINT16, + }, { + .key_name = "stop_method", + .key_value = SCFG_KEY_STOP_METHOD, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "start_method", + .key_value = SCFG_KEY_START_METHOD, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "stop_exec", + .key_value = SCFG_KEY_STOP_EXEC, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "start_exec", + .key_value = SCFG_KEY_START_EXEC, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "rcon", + .key_value = SCFG_KEY_RCON, + .value_type = CFG_VAL_TYPE_ASYNC_ADDRS, + }, { + .key_name = "rcon_password", + .key_value = SCFG_KEY_RCON_PASSWORD, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "systemd_service", + .key_value = SCFG_KEY_SYSTEMD_SERVICE, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = NULL, + .key_value = SCFG_KEY_INVALID, + .value_type = CFG_VAL_TYPE_INVALID, + } +}; + +static bool +handle_dns(struct server *server, const char *type, + struct cfg_value *value, dns_cb_t *async_cb, + bool (*sync_cb)(struct server *, struct saddr *)) +{ + struct saddr *saddr, *tmp; + struct dns_async *dns; + + assert_return(server && type && value && async_cb && sync_cb, false); + + switch (value->type) { + case CFG_VAL_TYPE_ADDRS: + debug(DBG_DNS, "%s: got immediate addrs", type); + + list_for_each_entry_safe(saddr, tmp, &value->saddrs, list) { + list_del(&saddr->list); + sync_cb(server, saddr); + } + return true; + + case CFG_VAL_TYPE_ASYNC_ADDRS: + debug(DBG_DNS, "%s: doing async lookup of DNS record: %p", + type, value->dns_async); + + dns = value->dns_async; + dns->cb = async_cb; + dns->priv = server; + list_add(&dns->list, &server->dnslookups); + uring_task_get(&server->task); + return true; + + default: + return false; + } +} + +static void +scfg_parse(struct server *server) +{ + char *pos; + + assert_return(server); + + pos = server->tbuf.buf; + + if (!config_parse_header(server->name, "server", &pos)) + return; + + while (true) { + int key; + const char *keyname; + struct cfg_value value; + + if (!config_parse_line(server->name, &pos, scfg_key_map, + &key, &keyname, &value)) + break; + + if (key == SCFG_KEY_INVALID) + break; + + debug(DBG_CFG, "%s: key %s", server->name, keyname); + + switch (key) { + + case SCFG_KEY_TYPE: + if (streq(value.str, "proxy")) { + if (!server_set_type(server, SERVER_TYPE_PROXY)) + return; + } else if (streq(value.str, "announce")) { + if (!server_set_type(server, SERVER_TYPE_ANNOUNCE)) + return; + } + break; + + case SCFG_KEY_NAME: + if (!server_set_pretty_name(server, value.str)) + return; + break; + + case SCFG_KEY_PORT: + if (!server_set_port(server, value.uint16)) + return; + break; + + case SCFG_KEY_LOCAL: + if (!handle_dns(server, "local", &value, + scfg_local_dns_cb, server_add_local)) + return; + break; + + case SCFG_KEY_REMOTE: + if (!handle_dns(server, "remote", &value, + scfg_remote_dns_cb, server_add_remote)) + return; + break; + + case SCFG_KEY_IDLE_TIMEOUT: + if (!server_set_idle_timeout(server, value.uint16)) + return; + break; + + case SCFG_KEY_STOP_METHOD: + if (streq(value.str, "exec")) { + if (server_set_stop_method(server, SERVER_STOP_METHOD_EXEC)) + break; + } else if (streq(value.str, "rcon")) { + if (server_set_stop_method(server, SERVER_STOP_METHOD_RCON)) + break; + } else if (streq(value.str, "systemd")) { + if (server_set_stop_method(server, SERVER_STOP_METHOD_SYSTEMD)) + break; + } + return; + + case SCFG_KEY_START_METHOD: + if (streq(value.str, "exec")) { + if (server_set_start_method(server, SERVER_START_METHOD_EXEC)) + break; + } else if (streq(value.str, "systemd")) { + if (server_set_start_method(server, SERVER_START_METHOD_SYSTEMD)) + break; + } + return; + + case SCFG_KEY_STOP_EXEC: + if (!server_set_stop_exec(server, value.str)) + return; + break; + + case SCFG_KEY_START_EXEC: + if (!server_set_start_exec(server, value.str)) + return; + break; + + case SCFG_KEY_RCON: + if (!handle_dns(server, "rcon", &value, + scfg_rcon_dns_cb, server_add_rcon)) + return; + break; + + case SCFG_KEY_RCON_PASSWORD: + if (!server_set_rcon_password(server, value.str)) + return; + break; + + case SCFG_KEY_SYSTEMD_SERVICE: + if (!server_set_systemd_service(server, value.str)) + return; + break; + + case SCFG_KEY_INVALID: + default: + break; + } + } +} + +static void +scfg_read_cb(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, task); + + assert_return(task); + assert_task_alive(DBG_CFG, task); + + if (res <= 0) { + error("error reading config file for %s: %s", + server->name, strerror(-res)); + server_delete(server); + } + + debug(DBG_CFG, "%s: parsing cfg (%i bytes)", server->name, res); + uring_task_close_fd(&server->task); + scfg_parse(server); + server_commit(server); +} + +static void +scfg_open_cb(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, task); + + assert_return(task); + assert_task_alive(DBG_CFG, task); + + if (res < 0) { + error("open(%s) failed: %s", server->name, strerror(-res)); + server_delete(server); + return; + } + + debug(DBG_CFG, "reading server cfg %s (fd %i)", server->name, res); + uring_task_set_fd(&server->task, res); + uring_tbuf_read_until_eof(&server->task, scfg_read_cb); +} + +static bool +scfg_valid_filename(const char *name) +{ + const char *suffix; + + if (empty_str(name)) + return false; + if (name[0] == '.') + return false; + if ((suffix = strrchr(name, '.')) == NULL) + return false; + if (!streq(suffix, ".server")) + return false; + + return true; +} + +struct server_cfg_monitor { + struct uring_task task; + char buf[4096] __attribute__((aligned(__alignof__(struct inotify_event)))); +}; + +static void +scfgm_free(struct uring_task *task) +{ + struct server_cfg_monitor *scfgm = container_of(task, + struct server_cfg_monitor, + task); + + assert_return(task); + + debug(DBG_CFG, "called"); + xfree(scfgm); + cfg->server_cfg_monitor = NULL; +} + +static void +inotify_event_dump(const struct inotify_event *event) +{ + assert_return(event); + + debug(DBG_CFG, "inotify event:"); + debug(DBG_CFG, " * WD : %i", event->wd); + debug(DBG_CFG, " * Cookie : %" PRIu32, event->cookie); + debug(DBG_CFG, " * Length : %" PRIu32, event->len); + debug(DBG_CFG, " * Name : %s", event->name); + debug(DBG_CFG, " * Mask : %" PRIu32, event->mask); + if (event->mask & IN_ACCESS) + debug(DBG_CFG, "\tIN_ACCESS"); + else if(event->mask & IN_MODIFY) + debug(DBG_CFG, "\tIN_MODIFY"); + else if(event->mask & IN_ATTRIB) + debug(DBG_CFG, "\tIN_ATTRIB"); + else if(event->mask & IN_CLOSE_WRITE) + debug(DBG_CFG, "\tIN_CLOSE_WRITE"); + else if(event->mask & IN_CLOSE_NOWRITE) + debug(DBG_CFG, "\tIN_CLOSE_NOWRITE"); + else if(event->mask & IN_OPEN) + debug(DBG_CFG, "\tIN_OPEN"); + else if(event->mask & IN_MOVED_FROM) + debug(DBG_CFG, "\tIN_MOVED_FROM"); + else if(event->mask & IN_MOVED_TO) + debug(DBG_CFG, "\tIN_MOVED_TO"); + else if(event->mask & IN_CREATE) + debug(DBG_CFG, "\tIN_CREATE"); + else if(event->mask & IN_DELETE) + debug(DBG_CFG, "\tIN_DELETE"); + else if(event->mask & IN_DELETE_SELF) + debug(DBG_CFG, "\tIN_DELETE_SELF"); + else if(event->mask & IN_MOVE_SELF) + debug(DBG_CFG, "\tIN_MOVE_SELF"); + else if(event->mask & IN_UNMOUNT) + debug(DBG_CFG, "\tIN_UNMOUNT"); + else if(event->mask & IN_Q_OVERFLOW) + debug(DBG_CFG, "\tIN_Q_OVERFLOW"); + else if(event->mask & IN_IGNORED) + debug(DBG_CFG, "\tIN_IGNORED"); +} + +static void +inotify_cb(struct uring_task *task, int res) +{ + struct server_cfg_monitor *scfgm = container_of(task, + struct server_cfg_monitor, + task); + const struct inotify_event *event; + char *ptr; + struct server *server; + + assert_return(task); + assert_task_alive(DBG_CFG, task); + + if (res <= 0) { + error("inotify_read: %i", res); + return; + } + + for (ptr = scfgm->buf; ptr < scfgm->buf + res; ptr += sizeof(struct inotify_event) + event->len) { + event = (const struct inotify_event *)ptr; + + if (debug_enabled(DBG_CFG)) + inotify_event_dump(event); + + if (event->mask & (IN_IGNORED | IN_MOVE_SELF | IN_DELETE_SELF | IN_UNMOUNT)) + die("configuration directory gone, exiting"); + + if (event->mask & IN_Q_OVERFLOW) { + error("inotify queue overflow"); + continue; + } + + if (!scfg_valid_filename(event->name)) + continue; + + if (event->mask & (IN_MOVED_FROM | IN_DELETE)) + server_delete_by_name(event->name); + else if (event->mask & (IN_MOVED_TO | IN_CREATE | IN_CLOSE_WRITE)) { + server = server_new(event->name); + verbose("New server config file detected: %s", server->name); + uring_openat(&server->task, server->name, scfg_open_cb); + } else + error("inotify: unknown event: 0x%08x", event->mask); + } + + uring_read(&scfgm->task, scfgm->buf, sizeof(scfgm->buf), inotify_cb); +} + +void +server_cfg_monitor_refdump() +{ + assert_return_silent(cfg->server_cfg_monitor); + + uring_task_refdump(&cfg->server_cfg_monitor->task); +} + +void +server_cfg_monitor_delete() +{ + assert_return(cfg->server_cfg_monitor); + + debug(DBG_CFG, "closing fd %i", cfg->server_cfg_monitor->task.fd); + uring_task_destroy(&cfg->server_cfg_monitor->task); + cfg->server_cfg_monitor = NULL; +} + +void +server_cfg_monitor_init() +{ + int ifd; + int iwd; + struct server_cfg_monitor *scfgm; + DIR *dir; + struct dirent *dent; + struct server *server; + + assert_return(!cfg->server_cfg_monitor); + + scfgm = zmalloc(sizeof(*scfgm)); + if (!scfgm) + die("malloc: %m"); + + ifd = inotify_init1(IN_CLOEXEC); + if (ifd < 0) + die("inotify_init1: %m"); + + /* ln = IN_CREATE, cp/vi/mv = IN_CREATE, IN_OPEN, IN_CLOSE_WRITE */ + iwd = inotify_add_watch(ifd, ".", + IN_CLOSE_WRITE | IN_DELETE | IN_CREATE | + IN_DELETE_SELF | IN_MOVE_SELF | IN_MOVED_TO | + IN_MOVED_FROM | IN_DONT_FOLLOW | + IN_EXCL_UNLINK | IN_ONLYDIR ); + if (iwd < 0) + die("inotify_add_watch: %m"); + + uring_task_init(&scfgm->task, "server-config-monitor", uring_parent(), scfgm_free); + uring_task_set_fd(&scfgm->task, ifd); + cfg->server_cfg_monitor = scfgm; + uring_read(&scfgm->task, scfgm->buf, sizeof(scfgm->buf), inotify_cb); + + dir = opendir("."); + if (!dir) + die("opendir(%s): %m", cfg->homedir); + + while ((dent = readdir(dir)) != NULL) { + if (dent->d_type != DT_REG && dent->d_type != DT_UNKNOWN) + continue; + if (!scfg_valid_filename(dent->d_name)) + continue; + + server = server_new(dent->d_name); + if (server) + uring_openat(&server->task, server->name, scfg_open_cb); + } + + closedir(dir); +} + diff --git a/server-config.h b/server-config.h new file mode 100644 index 0000000..590dae0 --- /dev/null +++ b/server-config.h @@ -0,0 +1,10 @@ +#ifndef fooserverconfighfoo +#define fooserverconfighfoo + +void server_cfg_monitor_delete(); + +void server_cfg_monitor_refdump(); + +void server_cfg_monitor_init(); + +#endif diff --git a/server-proxy.c b/server-proxy.c new file mode 100644 index 0000000..a0affbd --- /dev/null +++ b/server-proxy.c @@ -0,0 +1,424 @@ +#include +#include +#include +#include +#include +#include + +#include "main.h" +#include "uring.h" +#include "server.h" +#include "server-proxy.h" +#include "utils.h" + +static void +format_bytes(char *buf, size_t len, uint64_t val) +{ + uint64_t tmp; + const char *suffix = "B"; + + assert_return(buf && len > 0); + + tmp = val * 10; + if (val > 1152921504606846976ULL) { + tmp = val / 115292150460684697ULL; + suffix= "EiB"; + } else if (val > 1125899906842624ULL) { + tmp /= 1125899906842624ULL; + suffix = "PiB"; + } else if (val > 1099511627776ULL) { + tmp /= 1099511627776ULL; + suffix = "TiB"; + } else if (val > 1073741824ULL) { + tmp /= 1073741824ULL; + suffix = "GiB"; + } else if (val > 1048576) { + tmp /= 1048576; + suffix = "MiB"; + } else if (val > 1024) { + tmp /= 1024; + suffix = "KiB"; + } + + snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix); +} + +static void +format_time(char *buf, size_t len, time_t diff) +{ + unsigned hh, mm, ss; + + assert_return(buf && len > 0); + + hh = diff / 3600; + diff %= 3600; + mm = diff / 60; + diff %= 60; + ss = diff; + + snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss); +} + +static void +proxy_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, task); + char cts[100]; + char stc[100]; + char duration[100]; + + assert_return(task); + + debug(DBG_PROXY, "server: %s, src: %s, dst: %s", + proxy->server->name, + proxy->client_conn.remote.addrstr, + proxy->server_conn.remote.addrstr); + + if (proxy->begin > 0) { + format_time(duration, sizeof(duration), time(NULL) - proxy->begin); + format_bytes(cts, sizeof(cts), proxy->client_bytes); + format_bytes(stc, sizeof(stc), proxy->server_bytes); + + info("%s: proxy connection %s -> %s closed " + "(CtS: %s, StC: %s), duration %s", + proxy->server->name, + proxy->client_conn.remote.addrstr, + proxy->server_conn.remote.addrstr, + cts, stc, duration); + } + + list_del(&proxy->list); + xfree(proxy); +} + +static void +proxy_client_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + assert_return(task); + + debug(DBG_PROXY, "%s: client connection closed", proxy->server->name); +} + +static void +proxy_server_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + + debug(DBG_PROXY, "%s: server connection closed", proxy->server->name); +} + +void +proxy_delete(struct server_proxy *proxy) +{ + debug(DBG_PROXY, "%s: shutting down proxy %p", proxy->server->name, proxy); + + assert_return(proxy); + + uring_task_destroy(&proxy->servertask); + uring_task_destroy(&proxy->clienttask); + uring_task_destroy(&proxy->task); +} + +static void proxy_client_data_in(struct uring_task *task, int res); + +static void +proxy_client_data_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + uring_task_close_fd(task); + proxy_delete(proxy); + return; + } + + proxy->client_bytes += res; + uring_task_set_fd(&proxy->clienttask, proxy->cfd); + uring_tbuf_read(task, proxy_client_data_in); +} + +static void +proxy_client_data_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + uring_task_close_fd(task); + proxy_delete(proxy); + return; + } + + uring_task_set_fd(&proxy->clienttask, proxy->sfd); + uring_tbuf_write(task, proxy_client_data_out); +} + +static void proxy_server_data_in(struct uring_task *task, + int res); + +static void +proxy_server_data_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + uring_task_close_fd(task); + proxy_delete(proxy); + return; + } + + proxy->server_bytes += res; + uring_task_set_fd(&proxy->servertask, proxy->sfd); + uring_tbuf_read(&proxy->servertask, proxy_server_data_in); +} + +static void +proxy_server_data_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: result was %i", proxy->server->name, res); + uring_task_close_fd(task); + proxy_delete(proxy); + return; + } + + uring_task_set_fd(&proxy->servertask, proxy->cfd); + uring_tbuf_write(task, proxy_server_data_out); +} + +static void +proxy_connected_cb(struct connection *conn, bool connected) +{ + struct server_proxy *proxy = container_of(conn, struct server_proxy, server_conn); + + assert_return(conn); + assert_task_alive(DBG_PROXY, &proxy->clienttask); + assert_task_alive(DBG_PROXY, &proxy->servertask); + + if (!connected) { + error("%s: proxy connection to remote server failed", + proxy->server->name); + proxy_delete(proxy); + return; + } + + proxy->sfd = proxy->servertask.fd; + verbose("%s: proxy connection %s -> %s opened", + proxy->server->name, + proxy->client_conn.remote.addrstr, + proxy->server_conn.remote.addrstr); + proxy->begin = time(NULL); + + uring_tbuf_read(&proxy->clienttask, proxy_client_data_in); + uring_tbuf_read(&proxy->servertask, proxy_server_data_in); +} + +void +proxy_refdump(struct server_proxy *proxy) +{ + assert_return(proxy); + + uring_task_refdump(&proxy->task); + uring_task_refdump(&proxy->clienttask); + uring_task_refdump(&proxy->servertask); +} + +struct server_proxy * +proxy_new(struct server *server, struct saddr *client, int fd) +{ + struct server_proxy *proxy; + + assert_return(server && client && fd > 0, NULL); + + proxy = zmalloc(sizeof(*proxy)); + if (!proxy) { + error("malloc: %m"); + return NULL; + } + + proxy->sfd = -1; + proxy->cfd = fd; + proxy->server = server; + uring_task_init(&proxy->task, "proxy", &server->task, proxy_free); + + connection_set_local(&proxy->client_conn, fd); + connection_set_remote(&proxy->client_conn, client); + + uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task, + proxy_client_free); + uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf); + uring_task_set_fd(&proxy->clienttask, fd); + + uring_task_init(&proxy->servertask, "proxy_server", &proxy->task, + proxy_server_free); + uring_task_set_buf(&proxy->servertask, &proxy->serverbuf); + + list_add(&proxy->list, &server->proxys); + connect_any(&proxy->servertask, &server->remotes, + &proxy->server_conn, proxy_connected_cb); + + return proxy; +} + +static void +local_accept(struct uring_task *task, int res) +{ + struct server_local *local = container_of(task, struct server_local, task); + struct server *server = container_of(task->parent, struct server, task); + struct server_proxy *proxy; + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name); + + if (res < 0) { + error("result was %i", res); + goto out; + } + + saddr_set_addrstr(&local->client); + + verbose("%s: incoming proxy connection: %s -> %s", + server->name, local->client.addrstr, local->local.addrstr); + + if (list_empty(&server->remotes)) { + /* This shouldn't be possible, checked before opening local */ + error("server->remotes empty!"); + uring_close(&local->task, res); + goto out; + } + + proxy = proxy_new(server, &local->client, res); + if (!proxy) + uring_close(&local->task, res); + +out: + uring_accept(&local->task, &local->client, local_accept); +} + +bool +local_open(struct server_local *local) +{ + int sfd; + int option; + int r; + + assert_return(local && local->server, false); + + sfd = socket(local->local.storage.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (sfd < 0) { + error("socket: %m"); + goto error; + } + + option = true; + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0) { + error("setsockopt: %m"); + goto error; + } + + /* The MC protocol expects the client to send data first */ + /* FIXME: could make this configurable */ + option = true; + if (setsockopt(sfd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option)) < 0) + error("setsockopt: %m"); + + /* FIXME: could make this configurable */ + option = true; + if (setsockopt(sfd, IPPROTO_IP, IP_FREEBIND, &option, sizeof(option)) < 0) + error("setsockopt: %m"); + + socket_set_low_latency(sfd); + + r = bind(sfd, (struct sockaddr *)&local->local.storage, local->local.addrlen); + if (r < 0) { + error("bind: %m"); + goto error; + } + + r = listen(sfd, 100); + if (r < 0) { + error("listen: %m"); + goto error; + } + + uring_task_set_fd(&local->task, sfd); + uring_accept(&local->task, &local->client, local_accept); + return true; + +error: + if (sfd >= 0) + uring_close(&local->task, sfd); + return false; +} + +void +local_refdump(struct server_local *local) +{ + assert_return(local); + + uring_task_refdump(&local->task); +} + +static void +local_free(struct uring_task *task) +{ + struct server_local *local = container_of(task, struct server_local, task); + + assert_return(task); + + debug(DBG_PROXY, "task %p, local %p", task, local); + list_del(&local->list); + xfree(local); +} + +void +local_delete(struct server_local *local) +{ + assert_return(local); + + uring_task_destroy(&local->task); +} + +struct server_local * +local_new(struct server *server, struct saddr *saddr) +{ + struct server_local *local; + + assert_return(server && saddr, NULL); + + local = zmalloc(sizeof(*local)); + if (!local) { + error("malloc: %m"); + return NULL; + } + + debug(DBG_PROXY, "%s adding local: %s", server->name, saddr->addrstr); + local->local = *saddr; + local->server = server; + uring_task_init(&local->task, "local", &server->task, local_free); + xfree(saddr); + return local; +} + diff --git a/server-proxy.h b/server-proxy.h new file mode 100644 index 0000000..dc8300d --- /dev/null +++ b/server-proxy.h @@ -0,0 +1,48 @@ +#ifndef fooserverproxyhfoo +#define fooserverproxyhfoo + +struct server_proxy { + struct connection client_conn; + struct uring_task_buf clientbuf; + struct uring_task clienttask; + uint64_t client_bytes; + int cfd; + + struct connection server_conn; + struct uring_task_buf serverbuf; + struct uring_task servertask; + uint64_t server_bytes; + int sfd; + + time_t begin; + unsigned next_remote; + struct uring_task task; + struct server *server; + struct list_head list; +}; + +void proxy_refdump(struct server_proxy *proxy); + +void proxy_delete(struct server_proxy *proxy); + +struct server_proxy *proxy_new(struct server *server, struct saddr *client, + int fd); + +struct server_local { + struct saddr local; + struct saddr client; + struct uring_task task; + + struct server *server; + struct list_head list; +}; + +bool local_open(struct server_local *local); + +void local_refdump(struct server_local *local); + +void local_delete(struct server_local *local); + +struct server_local *local_new(struct server *server, struct saddr *saddr); + +#endif diff --git a/server.c b/server.c index 9290a96..55072c0 100644 --- a/server.c +++ b/server.c @@ -14,7 +14,7 @@ #include "main.h" #include "uring.h" #include "server.h" -#include "proxy.h" +#include "server-proxy.h" #include "utils.h" #include "config.h" #include "idle.h" diff --git a/uring.c b/uring.c index 65b59f9..e923947 100644 --- a/uring.c +++ b/uring.c @@ -571,11 +571,11 @@ uring_free(struct uring_task *task) } void -uring_refdump(struct uring_ev *uev) +uring_refdump() { - assert_return(uev); + assert_return(cfg->uev); - uring_task_refdump(&uev->task); + uring_task_refdump(&cfg->uev->task); } void diff --git a/uring.h b/uring.h index 70c72e5..9f686b3 100644 --- a/uring.h +++ b/uring.h @@ -62,7 +62,7 @@ void uring_poll_cancel(struct uring_task *task); void uring_delete(); -void uring_refdump(struct uring_ev *uev); +void uring_refdump(); void uring_init(); -- cgit v1.2.3