diff options
author | David Härdeman <david@hardeman.nu> | 2020-06-23 20:56:22 +0200 |
---|---|---|
committer | David Härdeman <david@hardeman.nu> | 2020-06-23 20:56:22 +0200 |
commit | ea053d96f7e89e053d4af8d39b04c5428760345f (patch) | |
tree | 8182ca73675ad3933b0f38cb48a99c69101309b4 /minecproxy | |
parent | 8c27290245b7bcc7cd2f72f3b4a7562294b43bbe (diff) |
Big renaming, move some more functionality to shared lib
Diffstat (limited to 'minecproxy')
-rw-r--r-- | minecproxy/announce.c | 116 | ||||
-rw-r--r-- | minecproxy/announce.h | 14 | ||||
-rw-r--r-- | minecproxy/config-parser.c | 490 | ||||
-rw-r--r-- | minecproxy/config-parser.h | 59 | ||||
-rw-r--r-- | minecproxy/idle.c | 378 | ||||
-rw-r--r-- | minecproxy/idle.h | 13 | ||||
-rw-r--r-- | minecproxy/igmp.c | 587 | ||||
-rw-r--r-- | minecproxy/igmp.h | 10 | ||||
-rw-r--r-- | minecproxy/main.c | 741 | ||||
-rw-r--r-- | minecproxy/main.h | 107 | ||||
-rw-r--r-- | minecproxy/meson.build | 36 | ||||
-rw-r--r-- | minecproxy/misc.c | 281 | ||||
-rw-r--r-- | minecproxy/misc.h | 36 | ||||
-rw-r--r-- | minecproxy/ptimer.c | 223 | ||||
-rw-r--r-- | minecproxy/ptimer.h | 33 | ||||
-rw-r--r-- | minecproxy/server-config.c | 580 | ||||
-rw-r--r-- | minecproxy/server-config.h | 10 | ||||
-rw-r--r-- | minecproxy/server-proxy.c | 578 | ||||
-rw-r--r-- | minecproxy/server-proxy.h | 51 | ||||
-rw-r--r-- | minecproxy/server-rcon.c | 227 | ||||
-rw-r--r-- | minecproxy/server-rcon.h | 12 | ||||
-rw-r--r-- | minecproxy/server.c | 836 | ||||
-rw-r--r-- | minecproxy/server.h | 128 | ||||
-rw-r--r-- | minecproxy/signal-handler.c | 195 | ||||
-rw-r--r-- | minecproxy/signal-handler.h | 8 | ||||
-rw-r--r-- | minecproxy/systemd.c | 219 | ||||
-rw-r--r-- | minecproxy/systemd.h | 14 | ||||
-rw-r--r-- | minecproxy/uring.c | 759 | ||||
-rw-r--r-- | minecproxy/uring.h | 73 |
29 files changed, 6814 insertions, 0 deletions
diff --git a/minecproxy/announce.c b/minecproxy/announce.c new file mode 100644 index 0000000..13ef423 --- /dev/null +++ b/minecproxy/announce.c @@ -0,0 +1,116 @@ +#include <inttypes.h> +#include <sys/socket.h> +#include <netinet/ip.h> +#include <arpa/inet.h> +#include <string.h> +#include <unistd.h> + +#include "main.h" +#include "uring.h" +#include "announce.h" +#include "server.h" +#include "ptimer.h" + +struct announce { + uint64_t value; + struct uring_task task; + struct ptimer_task ptask; + int mcast_fd; +}; + +static void +announce_cb(struct ptimer_task *ptask) +{ + struct announce *announce = container_of(ptask, struct announce, ptask); + struct server *server; + + assert_return(ptask); + assert_task_alive(DBG_ANN, &announce->task); + + debug(DBG_ANN, "announcing servers"); + list_for_each_entry(server, &cfg->servers, list) + server_announce(server, announce->mcast_fd); +} + +static void +announce_free(struct uring_task *task) +{ + struct announce *announce = container_of(task, struct announce, task); + + assert_return(task); + debug(DBG_ANN, "task %p, announce 0x%p, mcast_fd: %i", + task, announce, announce->mcast_fd); + close(announce->mcast_fd); + xfree(announce); +} + +void +announce_refdump() +{ + assert_return_silent(cfg->announce); + + uring_task_refdump(&cfg->announce->task); +} + +void +announce_delete() +{ + assert_return_silent(cfg->announce); + + debug(DBG_ANN, "called"); + announce_stop(); + uring_task_destroy(&cfg->announce->task); + cfg->announce = NULL; +} + +void +announce_stop() +{ + struct announce *announce = cfg->announce; + + assert_return_silent(announce); + + ptimer_del_task(&announce->ptask); +} + +void +announce_start(unsigned duration) +{ + struct announce *announce = cfg->announce; + unsigned times; + + assert_return_silent(announce); + + if (duration == 0) + times = 0; + else + times = MAX(announce->ptask.times, + DIV_ROUND_UP(duration, cfg->announce_interval)); + + announce->ptask.times = times; + ptimer_add_task(&announce->ptask); +} + +void +announce_init() +{ + struct announce *announce; + int sfd; + + assert_return(!cfg->announce); + assert_return_silent(cfg->announce_interval > 0); + + announce = zmalloc(sizeof(*announce)); + if (!announce) + die("malloc: %m"); + + sfd = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0); + if (sfd < 0) + die("socket: %m"); + + uring_task_init(&announce->task, "announce", uring_parent(), announce_free); + ptask_init(&announce->ptask, cfg->announce_interval, 0, announce_cb); + announce->mcast_fd = sfd; + cfg->announce = announce; +} + diff --git a/minecproxy/announce.h b/minecproxy/announce.h new file mode 100644 index 0000000..77a36f2 --- /dev/null +++ b/minecproxy/announce.h @@ -0,0 +1,14 @@ +#ifndef fooannouncehfoo +#define fooannouncehfoo + +void announce_refdump(); + +void announce_delete(); + +void announce_stop(); + +void announce_start(unsigned duration); + +void announce_init(); + +#endif diff --git a/minecproxy/config-parser.c b/minecproxy/config-parser.c new file mode 100644 index 0000000..9c89cf2 --- /dev/null +++ b/minecproxy/config-parser.c @@ -0,0 +1,490 @@ +#define _GNU_SOURCE +#include <stdio.h> +#include <ctype.h> +#include <string.h> +#include <stdbool.h> +#include <errno.h> +#include <unistd.h> +#include <arpa/inet.h> +#include <inttypes.h> + +#include "utils.h" +#include "config-parser.h" + +static void +eat_whitespace_and_comments(char **pos) +{ + assert_return(pos && *pos); + + while (true) { + while (isspace(**pos)) + (*pos)++; + + if (**pos != '#') + return; + + while (**pos != '\r' && **pos != '\n' && **pos != '\0') + (*pos)++; + } +} + +static char * +get_line(char **pos) +{ + char *begin, *end; + + assert_return(pos && *pos, NULL); + + begin = *pos; + while (isspace(*begin)) + begin++; + + if (*begin == '\0') + return NULL; + + end = begin; + while (*end != '\n' && *end != '\0') + end++; + + if (*end == '\0') + *pos = end; + else + *pos = end + 1; + + while (isspace(*end)) { + *end = '\0'; + end--; + } + + return begin; +} + +static bool +dnslookup(const char *name, uint16_t port, struct cfg_value *rvalue, bool async) +{ + struct sockaddr_in *in4; + struct sockaddr_in6 *in6; + struct dns_async tmp; + struct dns_async *dns; + int mode = async ? GAI_NOWAIT : GAI_WAIT; + struct addrinfo *results = NULL, *ai; + struct saddr *saddr = NULL; + bool rv = false; + int r; + + assert_return(!empty_str(name) && strlen(name) < sizeof(dns->name) && port > 0 && rvalue, false); + + if (async) { + rvalue->type = CFG_VAL_TYPE_ASYNC_ADDRS; + rvalue->dns_async = NULL; + dns = zmalloc(sizeof(*dns)); + if (!dns) { + error("async DNS lookup of %s failed: %m", name); + goto out; + } + debug(DBG_DNS, "doing async DNS lookup of %s: %p", name, dns); + } else { + memset(&tmp, 0, sizeof(tmp)); + dns = &tmp; + debug(DBG_DNS, "doing sync DNS lookup of %s", name); + } + + sprintf(dns->name, "%s", name); + sprintf(dns->port, "%" PRIu16, port); + + dns->req.ai_family = AF_UNSPEC; + dns->req.ai_socktype = SOCK_STREAM; + dns->req.ai_protocol = 0; + dns->req.ai_flags = AI_NUMERICSERV; + + dns->sev.sigev_notify = SIGEV_SIGNAL; + dns->sev.sigev_signo = SIGUSR1; + dns->sev.sigev_value.sival_ptr = dns; + + dns->gcb.ar_name = dns->name; + dns->gcb.ar_service = dns->port; + dns->gcb.ar_request = &dns->req; + + struct gaicb *gcbs[] = { &dns->gcb }; + + r = getaddrinfo_a(mode, gcbs, ARRAY_SIZE(gcbs), &dns->sev); + if (r != 0) { + error("getaddrinfo(%s:%" PRIu16 "): %s", name, port, gai_strerror(r)); + goto out; + } + + if (async) { + rvalue->dns_async = dns; + rv = true; + goto out; + } + + results = dns->gcb.ar_result; + + for (ai = results; ai; ai = ai->ai_next) { + saddr = zmalloc(sizeof(*saddr)); + if (!saddr) { + error("sync DNS lookup of %s failed: %m", name); + goto out; + } + + switch (ai->ai_family) { + case AF_INET: + in4 = (struct sockaddr_in *)ai->ai_addr; + saddr_set_ipv4(saddr, in4->sin_addr.s_addr, in4->sin_port); + error("addrstr: %s", saddr->addrstr); + list_add(&saddr->list, &rvalue->saddrs); + break; + + case AF_INET6: + in6 = (struct sockaddr_in6 *)ai->ai_addr; + saddr_set_ipv6(saddr, &in6->sin6_addr, in6->sin6_port); + error("addrstr: %s", saddr->addrstr); + list_add(&saddr->list, &rvalue->saddrs); + break; + + default: + error("getaddrinfo(%s:%s): unknown address family (%i)", + dns->name, dns->port, ai->ai_family); + xfree(saddr); + break; + } + } + + rv = true; + +out: + freeaddrinfo(results); + return rv; +} + +static bool +strtosockaddrs(const char *str, struct cfg_value *rvalue, bool async) +{ + struct saddr *saddr; + uint16_t port; + char *tmp; + struct list_head *list; + unsigned naddrs = 0; + + assert_return(!empty_str(str) && rvalue, false); + + rvalue->type = CFG_VAL_TYPE_ADDRS; + list = &rvalue->saddrs; + list_init(list); + + if (*str == '[') { + /* IPv6, [a:b:c...h]:p or [*]:p */ + debug(DBG_CFG, "attempting to parse IPv6 addr (%s)", str); + + str++; + tmp = strchr(str, ']'); + if (!tmp) + goto error; + *tmp = '\0'; + + saddr = zmalloc(sizeof(*saddr)); + if (!saddr) + goto error; + + /* early list_add to make sure saddr is free():d on error */ + list_add(&saddr->list, list); + + if (streq(str, "*")) + saddr->in6.sin6_addr = in6addr_any; + else if (inet_pton(AF_INET6, str, &saddr->in6.sin6_addr) <= 0) + goto error; + + tmp++; + if (*tmp != ':') + goto error; + + tmp++; + if (strtou16_strict(tmp, &port) < 0) + goto error; + + saddr_set_ipv6(saddr, NULL, htons(port)); + naddrs++; + + } else if (*str == '*') { + /* IPv4, *:p */ + debug(DBG_CFG, "attempting to parse IPv4 addr (%s)", str); + + str++; + if (*str != ':') + goto error; + + str++; + if (strtou16_strict(str, &port) < 0) + goto error; + + saddr = zmalloc(sizeof(*saddr)); + if (!saddr) + goto error; + + saddr_set_ipv4(saddr, INADDR_ANY, htons(port)); + list_add(&saddr->list, list); + naddrs++; + + } else if ((tmp = strchr(str, ':'))) { + /* IPv4, a.b.c.d:p or IPv4/6 hostname:p */ + debug(DBG_CFG, "attempting to parse IPv4 addr or hostname (%s)", str); + + *tmp = '\0'; + tmp++; + if (strtou16_strict(tmp, &port) < 0) + goto error; + + saddr = zmalloc(sizeof(*saddr)); + if (!saddr) + goto error; + + if (inet_pton(AF_INET, str, &saddr->in4.sin_addr) > 0) { + debug(DBG_CFG, "got an IPv4:port (%s:%" PRIu16 ")", str, port); + saddr_set_ipv4(saddr, saddr->in4.sin_addr.s_addr, htons(port)); + list_add(&saddr->list, list); + naddrs++; + goto success; + } + + xfree(saddr); + debug(DBG_CFG, "maybe got a hostname:port (%s:%" PRIu16 ")", str, port); + if (!dnslookup(str, port, rvalue, async)) + goto error; + + } else if (strtou16_strict(tmp, &port) == 0) { + /* Port */ + debug(DBG_CFG, "attempting to parse a port number (%s)", str); + + saddr = zmalloc(sizeof(*saddr)); + if (!saddr) + goto error; + + saddr_set_ipv6(saddr, &in6addr_any, htons(port)); + list_add(&saddr->list, list); + naddrs++; + + saddr = zmalloc(sizeof(*saddr)); + if (!saddr) + goto error; + + saddr_set_ipv4(saddr, INADDR_ANY, htons(port)); + list_add(&saddr->list, list); + naddrs++; + + } else { + /* Unknown */ + error("unable to parse address: %s", str); + goto error; + } + +success: + switch (rvalue->type) { + case CFG_VAL_TYPE_ADDRS: + if (list_empty(list) || naddrs == 0) { + error("empty address list"); + return false; + } + + debug(DBG_CFG, "parsed to %u addresses", naddrs); + return true; + + case CFG_VAL_TYPE_ASYNC_ADDRS: + debug(DBG_CFG, "looking up address asynchronously"); + return true; + + default: + error("invalid rvalue type"); + rvalue->type = CFG_VAL_TYPE_INVALID; + break; + } + +error: + if (rvalue->type == CFG_VAL_TYPE_ADDRS && !list_empty(list)) { + struct saddr *tmp; + + list_for_each_entry_safe(saddr, tmp, list, list) { + list_del(&saddr->list); + xfree(saddr); + } + } + return false; +} + +/* Returns true if there's data left to parse in buf */ +bool +config_parse_line(const char *filename, char **buf, + struct cfg_key_value_map *kvmap, int *rkey, + const char **rkeyname, struct cfg_value *rvalue) +{ + char *line, *tmp, *key; + int i; + + assert_return(buf && *buf && kvmap && rkey && rkeyname && rvalue, false); + + eat_whitespace_and_comments(buf); + line = get_line(buf); + if (!line) + return false; + + debug(DBG_CFG, "%s: parsing config line: %s", filename, line); + + tmp = line; + while (isspace(*tmp)) + tmp++; + + if (*tmp == '\0') + goto error; + + key = tmp; + while (*tmp != '\0' && !isspace(*tmp)) + tmp++; + + if (*tmp == '\0') + goto error; + + *tmp = '\0'; + tmp++; + + while (isspace(*tmp)) + tmp++; + + if (*tmp != '=') + goto error; + + tmp++; + while (isspace(*tmp)) + tmp++; + + if (*tmp == '\0') + goto error; + + for (i = 0; kvmap[i].key_name; i++) { + if (!streq(kvmap[i].key_name, key)) + continue; + + switch (kvmap[i].value_type) { + + case CFG_VAL_TYPE_STRING: + rvalue->type = CFG_VAL_TYPE_STRING; + rvalue->str = tmp; + break; + + case CFG_VAL_TYPE_UINT16: { + uint16_t v; + + if (strtou16_strict(tmp, &v) < 0) + goto error; + + rvalue->type = CFG_VAL_TYPE_UINT16; + rvalue->uint16 = v; + break; + } + + case CFG_VAL_TYPE_ADDRS: + if (!strtosockaddrs(tmp, rvalue, false)) + goto error; + + if (rvalue->type != CFG_VAL_TYPE_ADDRS) { + error("invalid type returned from strtosockaddrs"); + goto error; + } + + if (list_empty(&rvalue->saddrs)) { + error("empty address list"); + goto error; + } + break; + + case CFG_VAL_TYPE_ASYNC_ADDRS: + if (!strtosockaddrs(tmp, rvalue, true)) + goto error; + + switch (rvalue->type) { + case CFG_VAL_TYPE_ADDRS: + if (list_empty(&rvalue->saddrs)) { + error("empty address list"); + goto error; + } + break; + + case CFG_VAL_TYPE_ASYNC_ADDRS: + if (!rvalue->dns_async) { + error("dns_async not set"); + goto error; + } + break; + + default: + error("invalid type returned from strtosockaddrs"); + goto error; + } + + break; + + case CFG_VAL_TYPE_BOOL: + if (strcaseeq(tmp, "yes") || strcaseeq(tmp, "true")) { + rvalue->type = CFG_VAL_TYPE_BOOL; + rvalue->boolean = true; + } else if (strcaseeq(tmp, "no") || strcaseeq(tmp, "false")) { + rvalue->type = CFG_VAL_TYPE_BOOL; + rvalue->boolean = false; + } else { + error("invalid boolean value (%s)", tmp); + goto error; + } + break; + + case CFG_VAL_TYPE_INVALID: + /* fall through */ + default: + goto error; + } + + /* sanity check */ + if ((rvalue->type != kvmap[i].value_type) && + ((kvmap[i].value_type != CFG_VAL_TYPE_ASYNC_ADDRS) && + (rvalue->type != CFG_VAL_TYPE_ADDRS))) { + error("rvalue->type != kvmap->type"); + goto error; + } + + *rkey = kvmap[i].key_value; + *rkeyname = kvmap[i].key_name; + return true; + } + +error: + /* FIXME: the line is already mangled here, a line number would be nice */ + error("%s: invalid config line: %s", filename, line); + rvalue->type = CFG_VAL_TYPE_INVALID; + *rkey = 0; + *rkeyname = NULL; + return true; +} + +bool +config_parse_header(const char *filename, const char *title, char **buf) +{ + char *line; + + assert_return(!empty_str(filename) && !empty_str(title) && buf && *buf, false); + + eat_whitespace_and_comments(buf); + + line = get_line(buf); + if (!line) { + error("%s: missing header in configuration file", filename); + return false; + } else { + char titlehdr[strlen(title) + 3]; + + sprintf(titlehdr, "[%s]", title); + if (!streq(line, titlehdr)) { + error("%s: incorrect header in configuration file", filename); + return false; + } + } + + return true; +} diff --git a/minecproxy/config-parser.h b/minecproxy/config-parser.h new file mode 100644 index 0000000..3a117a3 --- /dev/null +++ b/minecproxy/config-parser.h @@ -0,0 +1,59 @@ +#ifndef fooconfigparserhfoo +#define fooconfigparserhfoo + +#define _GNU_SOURCE +#include <sys/types.h> +#include <sys/socket.h> +#include <netdb.h> +#include <signal.h> + +enum cfg_value_type { + CFG_VAL_TYPE_INVALID, + CFG_VAL_TYPE_STRING, + CFG_VAL_TYPE_UINT16, + CFG_VAL_TYPE_ADDRS, + CFG_VAL_TYPE_ASYNC_ADDRS, + CFG_VAL_TYPE_BOOL, +}; + +struct dns_async; + +typedef void (dns_cb_t)(struct dns_async *); + +struct dns_async { + char name[FQDN_STR_LEN + 1]; + char port[PORT_STR_LEN + 1]; + struct addrinfo req; + struct gaicb gcb; + struct sigevent sev; + dns_cb_t *cb; + void *priv; + struct list_head list; +}; + +struct cfg_key_value_map { + const char *key_name; + int key_value; + enum cfg_value_type value_type; +}; + +struct cfg_value { + enum cfg_value_type type; + union { + const char *str; + uint16_t uint16; + struct list_head saddrs; + struct dns_async *dns_async; + bool boolean; + }; +}; + +bool config_parse_line(const char *filename, char **buf, + struct cfg_key_value_map *kvmap, + int *rkey, const char **rkeyname, + struct cfg_value *rvalue); + +bool config_parse_header(const char *filename, + const char *title, char **buf); + +#endif diff --git a/minecproxy/idle.c b/minecproxy/idle.c new file mode 100644 index 0000000..c49846d --- /dev/null +++ b/minecproxy/idle.c @@ -0,0 +1,378 @@ +#define _GNU_SOURCE +#include <inttypes.h> +#include <sys/socket.h> +#include <netinet/ip.h> +#include <arpa/inet.h> +#include <string.h> +#include <ctype.h> +#include <errno.h> + +#include "main.h" +#include "uring.h" +#include "server.h" +#include "idle.h" +#include "ptimer.h" + +struct idle { + struct ptimer_task ptask; + struct uring_task task; +}; + +static inline void +write_byte(char **pos, char byte) +{ + assert_return(pos && *pos); + + **pos = byte; + (*pos)++; +} + +#define MC_HELO 0x00 +#define MC_NEXT_STATE_STATUS 0x01 +#define MC_GET_STATUS 0x00 +#define MC_VARINT_MAX_BYTES 5 +#define MC_STATUS_REPLY 0x00 +#define MC_UNDEFINED_VERSION -1 + +static inline void +write_varint(char **pos, int32_t orig) +{ + assert_return(pos && *pos); + + uint32_t val = (uint32_t)orig; + + while (val) { + **pos = val & 0x7f; + val >>= 7; + if (val > 0) + **pos |= 0x80; + (*pos)++; + } +} + +/* + * return value: + * positive = varint parsed + * zero = need more bytes + * negative = error + */ +static inline int +read_varint(char **pos, size_t *remain, int32_t *res) +{ + unsigned consumed; + uint32_t val = 0; + + assert_return(pos && *pos && remain && res, -1); + + for (consumed = 1; consumed <= *remain; consumed++) { + uint32_t tmp; + + tmp = **pos & 0x7f; + val += (tmp << (7 * (consumed - 1))); + (*pos)++; + + if (!(tmp & 0x80)) + break; + } + + if (consumed > *remain) + return 0; + else if (consumed > MC_VARINT_MAX_BYTES) + return -1; + + *remain -= consumed; + *res = (int32_t)val; + return 1; +} + +static inline void +write_bytes(char **pos, const char *bytes, size_t n) +{ + assert_return(pos && *pos && bytes && n > 0); + + memcpy(*pos, bytes, n); + *pos += n; +} + +static inline void +write_str(char **pos, const char *str) +{ + size_t len; + + assert_return(pos && *pos && !empty_str(str)); + + len = strlen(str); + write_varint(pos, len); + write_bytes(pos, str, len); +} + +static inline void +write_cmd(char **pos, const char *begin, const char *end) +{ + assert_return(pos && *pos && begin && end && end > begin); + + write_varint(pos, end - begin); + write_bytes(pos, begin, end - begin); +} + +static int +idle_check_handshake_complete(struct uring_task *task, int res) +{ + size_t remain; + char *pos; + int32_t mclen; + int r; + + assert_return(task, -EINVAL); + assert_task_alive_or(DBG_IDLE, task, return -EINTR); + + remain = task->tbuf->len; + pos = task->tbuf->buf; + + r = read_varint(&pos, &remain, &mclen); + if (r < 0) { + error("failed to parse message length"); + return -EINVAL; + } else if (r == 0) { + return 0; + } else if (mclen < 2) { + error("short MC message"); + return -EINVAL; + } + + if (mclen < remain) { + debug(DBG_IDLE, "short MC message - len: %" PRIi32 ", remain: %zu", + mclen, remain); + return 0; + } + + debug(DBG_IDLE, "Complete message"); + return 1; +} + +#define ONLINE_NEEDLE "\"online\"" +static int +get_player_count(const char *pos, size_t remain) +{ + /* + * Example JSON (line breaks added): + * {"description":{ + * "text":"A Minecraft Server"}, + * "players":{"max":20,"online":0}, + * "version":{"name":"1.15.2","protocol":578} + * } + */ + char *online; + char *end; + unsigned count; + + assert_return(pos && remain > 0, -1); + + online = memmem(pos, remain, ONLINE_NEEDLE, strlen(ONLINE_NEEDLE)); + if (!online) { + error("could not find online count in JSON"); + return -1; + } + + remain -= (online - pos); + + end = memchr(online, '}', remain); + if (!end) { + error("could not parse JSON (no end)"); + return -1; + } + *end = '\0'; + + if (sscanf(online, ONLINE_NEEDLE " : %u", &count) != 1) { + error("could not parse JSON (online count)"); + return -1; + } + + return count; +} + +static void +idle_check_handshake_reply(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, idle_task); + int32_t mclen; + int32_t jsonlen; + char *pos; + size_t remain; + int player_count = -1; + int r; + + assert_return(task); + assert_task_alive(DBG_IDLE, task); + + debug(DBG_IDLE, "res: %i", res); + if (res < 0) + goto out; + + /* + fprintf(stderr, "Received MC message (%i bytes):", res); + for (int i = 0; i < res; i++) + fprintf(stderr, "0x%02hhx ", idle->remotebuf[i]); + fprintf(stderr, "n"); + */ + + remain = server->idle_buf.len; + pos = server->idle_buf.buf; + + r = read_varint(&pos, &remain, &mclen); + if (r <= 0 || mclen < 2 || mclen < remain) { + /* Should not happen since the msg has been checked already */ + error("invalid message"); + goto out; + } + + debug(DBG_IDLE, "MC message - len: %" PRIi32 ", remain: %zu", + mclen, remain); + + if (*pos != MC_STATUS_REPLY) { + error("unknown server reply (0x%02hhx)", *pos); + goto out; + } + + pos++; + remain--; + + r = read_varint(&pos, &remain, &jsonlen); + if (r <= 0) { + error("could not read JSON length"); + goto out; + } + + debug(DBG_IDLE, "MC - json len: %" PRIi32 ", remain: %zu", + jsonlen, remain); + + if (jsonlen < remain) { + error("invalid JSON length"); + goto out; + } + + /* + fprintf(stderr, "JSON: "); + for (int i = 0; i < jsonlen; i++) + fprintf(stderr, "%c", pos[i]); + */ + + player_count = get_player_count(pos, remain); + +out: + uring_task_close_fd(task); + server_set_active_players(server, player_count); + return; +} + +static void +idle_check_handshake_sent(struct uring_task *task, int res) +{ + assert_return(task); + assert_task_alive(DBG_IDLE, task); + + debug(DBG_IDLE, "sent %i bytes", res); + if (res < 0) { + uring_task_close_fd(task); + return; + } + + uring_tbuf_read_until(task, + idle_check_handshake_complete, + idle_check_handshake_reply); +} + +void +idle_check_get_player_count(struct server *server, struct connection *conn) +{ + char buf[1024]; + char *pos; + char *cmdbuf = server->idle_buf.buf; + uint16_t port; + char hostname[INET6_ADDRSTRLEN]; + + assert_return(server && conn && server->idle_task.priv); + + port = saddr_port(&conn->remote); + saddr_addr(&conn->remote, hostname, sizeof(hostname)); + + pos = buf; + write_byte(&pos, MC_HELO); + write_varint(&pos, MC_UNDEFINED_VERSION); + write_str(&pos, hostname); + write_byte(&pos, (port >> 8) & 0xff); + write_byte(&pos, (port >> 0) & 0xff); + write_byte(&pos, MC_NEXT_STATE_STATUS); + write_cmd(&cmdbuf, buf, pos); + + pos = buf; + write_byte(&pos, MC_GET_STATUS); + write_cmd(&cmdbuf, buf, pos); + + server->idle_buf.len = (cmdbuf - server->idle_buf.buf); + debug(DBG_IDLE, "sending MC message (%zu bytes)", server->idle_buf.len); + + uring_tbuf_write(&server->idle_task, idle_check_handshake_sent); +} + +static void +idle_cb(struct ptimer_task *ptask) +{ + struct idle *idle = container_of(ptask, struct idle, ptask); + struct server *server; + + assert_return(ptask); + assert_task_alive(DBG_IDLE, &idle->task); + + debug(DBG_IDLE, "timer fired"); + + list_for_each_entry(server, &cfg->servers, list) + server_idle_check(server); +} + +static void +idle_free(struct uring_task *task) +{ + struct idle *idle = container_of(task, struct idle, task); + + assert_return(task); + debug(DBG_IDLE, "task %p, idle %p", task, idle); + xfree(idle); +} + +void +idle_refdump() +{ + assert_return_silent(cfg->idle); + + uring_task_refdump(&cfg->idle->task); +} + +void +idle_delete() +{ + assert_return(cfg->idle); + + debug(DBG_IDLE, "closing fd %i", cfg->idle->task.fd); + ptimer_del_task(&cfg->idle->ptask); + uring_task_destroy(&cfg->idle->task); + cfg->idle = NULL; +} + +void +idle_init() +{ + struct idle *idle; + + assert_return(!cfg->idle); + + idle = zmalloc(sizeof(*idle)); + if (!idle) + die("malloc: %m"); + + ptask_init(&idle->ptask, 60, 0, idle_cb); + uring_task_init(&idle->task, "idle", uring_parent(), idle_free); + ptimer_add_task(&idle->ptask); + cfg->idle = idle; +} + diff --git a/minecproxy/idle.h b/minecproxy/idle.h new file mode 100644 index 0000000..d7e4ab0 --- /dev/null +++ b/minecproxy/idle.h @@ -0,0 +1,13 @@ +#ifndef fooidlehfoo +#define fooidlehfoo + +void idle_check_get_player_count(struct server *server, + struct connection *conn); + +void idle_refdump(); + +void idle_delete(); + +void idle_init(); + +#endif diff --git a/minecproxy/igmp.c b/minecproxy/igmp.c new file mode 100644 index 0000000..dc43a9f --- /dev/null +++ b/minecproxy/igmp.c @@ -0,0 +1,587 @@ +#include <unistd.h> +#include <string.h> +#include <stdint.h> +#include <sys/socket.h> +#include <linux/if_packet.h> +#include <net/ethernet.h> +#include <net/if.h> +#include <netinet/ip.h> +#include <linux/bpf.h> +#include <linux/filter.h> +#include <arpa/inet.h> +#include <errno.h> +#include <sys/ioctl.h> + +#include "main.h" +#include "uring.h" +#include "igmp.h" +#include "announce.h" + +struct igmp { + struct uring_task task; + struct uring_task_buf tbuf; +}; + +#define ETH_HDR_LEN 14 +#define IPV4_MIN_HDR_LEN 20 +#define IGMP_MIN_LEN 8 + +struct __attribute__((packed, scalar_storage_order("big-endian"))) ipv4_hdr { + unsigned version:4; + unsigned ihl:4; + unsigned dscp:6; + unsigned ecn:2; + unsigned length:16; + unsigned id:16; + unsigned flags:3; + unsigned fragment_offset:13; + unsigned ttl:8; + unsigned protocol:8; + unsigned checksum:16; + unsigned src:32; + unsigned dst:32; + unsigned options[]; +}; + +enum igmp_type { + IGMP_MEMBERSHIP_QUERY = 0x11, + IGMP_V1_MEMBERSHIP_REPORT = 0x12, + IGMP_V2_MEMBERSHIP_REPORT = 0x16, + IGMP_V3_MEMBERSHIP_REPORT = 0x22, + IGMP_V2_LEAVE_GROUP = 0x17 +}; + +union igmp_msg { + struct __attribute__((packed, scalar_storage_order("big-endian"))) { + enum igmp_type type:8; + unsigned unknown:8; + unsigned checksum:16; + } common; + + struct __attribute__((packed, scalar_storage_order("big-endian"))) { + enum igmp_type type:8; + unsigned resptime:8; + unsigned checksum:16; + unsigned addr:32; + } v2; + + struct __attribute__((packed, scalar_storage_order("big-endian"))) { + enum igmp_type type:8; + unsigned reserved1:8; + unsigned checksum:16; + unsigned reserved2:16; + unsigned nrecs:16; + uint8_t records[]; + } v3; +}; + +enum igmp_v3_record_type { + IGMP_V3_REC_MODE_IS_INCL = 1, + IGMP_V3_REC_MODE_IS_EXCL = 2, + IGMP_V3_REC_MODE_CH_INCL = 3, + IGMP_V3_REC_MODE_CH_EXCL = 4 +}; + +union igmp_v3_record { + struct __attribute__((packed, scalar_storage_order("big-endian"))) { + enum igmp_v3_record_type type:8; + unsigned auxlen:8; + unsigned nsrcs:16; + unsigned addr:32; + uint32_t saddr[]; + }; +}; + +static inline unsigned short +from32to16(unsigned int x) +{ + /* add up 16-bit and 16-bit for 16+c bit */ + x = (x & 0xffff) + (x >> 16); + /* add up carry.. */ + x = (x & 0xffff) + (x >> 16); + return x; +} + +static unsigned int +do_csum(const unsigned char *buf, int len) +{ + int odd; + unsigned int result = 0; + + assert_return(buf && len > 0, 0); + + odd = 1 & (unsigned long)buf; + if (odd) { +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ + result += (*buf << 8); +#else + result = *buf; +#endif + len--; + buf++; + } + + if (len >= 2) { + if (2 & (unsigned long)buf) { + result += *(unsigned short *)buf; + len -= 2; + buf += 2; + } + if (len >= 4) { + const unsigned char *end = buf + ((unsigned)len & ~3); + unsigned int carry = 0; + do { + unsigned int w = *(unsigned int *)buf; + buf += 4; + result += carry; + result += w; + carry = (w > result); + } while (buf < end); + result += carry; + result = (result & 0xffff) + (result >> 16); + } + if (len & 2) { + result += *(unsigned short *)buf; + buf += 2; + } + } + + if (len & 1) +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ + result += *buf; +#else + result += (*buf << 8); +#endif + + result = from32to16(result); + if (odd) + result = ((result >> 8) & 0xff) | ((result & 0xff) << 8); + + return result; +} + +static inline bool +csum_valid(const char *buf, size_t len) +{ + assert_return(buf && len > 0, false); + + return do_csum((unsigned const char *)buf, len) == 0xffff; +} + +static void +igmp_match() +{ + debug(DBG_IGMP, "multicast request discovered"); + /* + * IGMP messages are sent with approx 120-130 sec intervals, + * so set time to 5 minutes to allow some slack. + */ + announce_start(5 * 60); +} + +static void +igmp_parse(struct igmp *igmp) +{ + char *buf; + size_t len; + struct ipv4_hdr *hdr; + size_t body_len; + union igmp_msg *igmp_msg; + + assert_return(igmp); + + buf = igmp->task.tbuf->buf; + len = igmp->task.tbuf->len; + hdr = (struct ipv4_hdr *)buf; + + if (len <= IPV4_MIN_HDR_LEN) + return; + + if (hdr->version != 4) + return; + + if (hdr->ihl * 4 < IPV4_MIN_HDR_LEN) + return; + + if (hdr->length < hdr->ihl * 4) + return; + + if (hdr->length != len) + return; + + if (hdr->fragment_offset > 0) + return; + + if (hdr->protocol != IPPROTO_IGMP) + return; + + if (!csum_valid(buf, hdr->ihl * 4)) + return; + + body_len = hdr->length - hdr->ihl * 4; + igmp_msg = (union igmp_msg *)(buf + hdr->ihl * 4); + + if (body_len < IGMP_MIN_LEN) + return; + + switch (igmp_msg->common.type) { + case IGMP_V1_MEMBERSHIP_REPORT: + debug(DBG_IGMP, "igmp_v1_membership_report"); + /* fall through */ + + case IGMP_V2_MEMBERSHIP_REPORT: { + struct in_addr src; + char src_str[INET_ADDRSTRLEN]; + struct in_addr dst; + char dst_str[INET_ADDRSTRLEN]; + struct in_addr grp; + char grp_str[INET_ADDRSTRLEN]; + + src.s_addr = htonl(hdr->src); + inet_ntop(AF_INET, &src, src_str, sizeof(src_str)); + dst.s_addr = htonl(hdr->dst); + inet_ntop(AF_INET, &dst, dst_str, sizeof(dst_str)); + grp.s_addr = htonl(igmp_msg->v2.addr); + inet_ntop(AF_INET, &grp, grp_str, sizeof(grp_str)); + + debug(DBG_IGMP, "igmp_v2_membership_report %s -> %s (%s)", + src_str, dst_str, grp_str); + + if (body_len != IGMP_MIN_LEN) { + error("IGMPv2 invalid size"); + break; + } + + if (!csum_valid((char *)igmp_msg, body_len)) { + error("IGMPv2 invalid checksum"); + break; + } + + debug(DBG_IGMP, "Inet addr: 0x%x", inet_addr("224.0.2.60")); + debug(DBG_IGMP, "Inet addr: 0x%x", cinet_addr(224,0,2,60)); + debug(DBG_IGMP, "Inet addr: 0x%x", chtobe32(cinet_addr(224,0,2,60))); + if (htonl(hdr->dst) != cinet_addr(224,0,2,60)) { + debug(DBG_IGMP, "IGMPv2 invalid dst addr"); + break; + } + + if (htonl(igmp_msg->v2.addr) != cinet_addr(224,0,2,60)) { + debug(DBG_IGMP, "IGMPv2 invalid grp addr"); + break; + } + + igmp_match(); + break; + } + + case IGMP_V3_MEMBERSHIP_REPORT: { + char *pos = (char *)igmp_msg; + struct in_addr src; + char src_str[INET_ADDRSTRLEN]; + struct in_addr dst; + char dst_str[INET_ADDRSTRLEN]; + + src.s_addr = htonl(hdr->src); + inet_ntop(AF_INET, &src, src_str, sizeof(src_str)); + dst.s_addr = htonl(hdr->dst); + inet_ntop(AF_INET, &dst, dst_str, sizeof(dst_str)); + + debug(DBG_IGMP, "igmp_v3_membership_report %s -> %s", + src_str, dst_str); + + debug(DBG_IGMP, "IGMPv3" + " type: %x," + " reserved: %u," + " csum: %u," + " reserved: %u," + " nrecs: %u," + " size: %zu\n", + igmp_msg->v3.type, + igmp_msg->v3.reserved1, + igmp_msg->v3.checksum, + igmp_msg->v3.reserved2, + igmp_msg->v3.nrecs, + sizeof(igmp_msg->v3)); + + if (!csum_valid(pos, body_len)) { + error("IGMPv3 csum invalid"); + break; + } + + if (htonl(hdr->dst) != cinet_addr(224,0,0,22)) { + debug(DBG_IGMP, "IGMPv2 invalid dst addr"); + break; + } + + body_len -= sizeof(igmp_msg->v3); + pos += sizeof(igmp_msg->v3); + + for (unsigned rec = 0; rec < igmp_msg->v3.nrecs; rec++) { + union igmp_v3_record *record = (union igmp_v3_record *)pos; + struct in_addr grp; + char grp_str[INET_ADDRSTRLEN]; + + if (body_len < sizeof(*record)) { + error("IGMPv3 too short"); + break; + } + + grp.s_addr = htonl(record->addr); + inet_ntop(AF_INET, &grp, grp_str, sizeof(grp_str)); + debug(DBG_IGMP, "received IGMPv3 record to %s", grp_str); + + debug(DBG_IGMP, "IGMPv3 rec, " + " type: %u," + " auxlen: %u," + " nsrcs: %u," + " addr: %s," + " size: %zu bytes", + record->type, + record->auxlen, + record->nsrcs, + grp_str, + sizeof(*record)); + + body_len -= sizeof(*record); + pos += sizeof(*record); + + if (body_len < record->nsrcs * sizeof(uint32_t) + record->auxlen) { + error("IGMPv3 too short"); + break; + } + + for (unsigned addr = 0; addr < record->nsrcs; addr++) { + struct in_addr grp_src; + char grp_src_str[INET_ADDRSTRLEN]; + + grp_src.s_addr = htonl(record->saddr[addr]); + inet_ntop(AF_INET, &grp_src, grp_src_str, sizeof(grp_src_str)); + debug(DBG_IGMP, "received IGMPv3 record src %s", + grp_src_str); + + body_len -= sizeof(record->saddr[addr]); + pos += sizeof(record->saddr[addr]); + } + + /* Yes, EXCL, not INCL, see RFC3376 */ + if ((htonl(record->addr) == cinet_addr(224,0,2,60)) && + ((record->type == IGMP_V3_REC_MODE_IS_EXCL) || + (record->type == IGMP_V3_REC_MODE_CH_EXCL))) + igmp_match(); + + body_len -= record->auxlen; + pos += record->auxlen; + } + + break; + } + + case IGMP_MEMBERSHIP_QUERY: + debug(DBG_IGMP, "igmp_membership_query"); + break; + + case IGMP_V2_LEAVE_GROUP: + debug(DBG_IGMP, "igmp_v2_leave_group"); + break; + + default: + debug(DBG_IGMP, "IGMP msg type %02hhx", igmp_msg->common.type); + break; + } + + buf += hdr->length; + len -= hdr->length; +} + +static void +igmp_read_cb(struct uring_task *task, int res) +{ + struct igmp *igmp = container_of(task, struct igmp, task); + + assert_return(task); + assert_task_alive(DBG_IGMP, task); + + debug(DBG_IGMP, "task %p, igmp %p, res %i", task, igmp, res); + if (res < 0) { + error("res: %i", res); + return; + } + + task->tbuf->len = res; + + if (task->saddr.storage.ss_family == AF_PACKET || + task->saddr.ll.sll_protocol == htons(ETH_P_IP)) + igmp_parse(igmp); + else + debug(DBG_IGMP, "invalid packet type received"); + + uring_tbuf_read(&igmp->task, igmp_read_cb); +} + +static void +igmp_free(struct uring_task *task) +{ + struct igmp *igmp = container_of(task, struct igmp, task); + + assert_return(task); + debug(DBG_IGMP, "task %p, igmp %p", task, igmp); + xfree(igmp); +} + +void +igmp_refdump() +{ + assert_return_silent(cfg->igmp); + + uring_task_refdump(&cfg->igmp->task); +} + +void +igmp_delete() +{ + assert_return_silent(cfg->igmp); + + debug(DBG_IGMP, "closing fd %i", cfg->igmp->task.fd); + uring_task_destroy(&cfg->igmp->task); + cfg->igmp = NULL; +} + +void +igmp_init() +{ + static const struct sock_filter filter[] = { + BPF_STMT(BPF_LD + BPF_W + BPF_LEN, 0), /* A <- packet length */ + BPF_JUMP(BPF_JMP + BPF_JGE + BPF_K, sizeof(struct iphdr), 1, 0), /* A < sizeof(iphdr) */ + BPF_STMT(BPF_RET + BPF_K, 0), /* drop packet */ + + BPF_STMT(BPF_LD + BPF_B + BPF_ABS, 0 /* iphdr[0] */), /* A <- version + ihl */ + BPF_STMT(BPF_ALU + BPF_RSH + BPF_K, 4), /* A <- A >> 4 (version) */ + BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, 0x04, 1, 0), /* A != 4 */ + BPF_STMT(BPF_RET + BPF_K, 0), /* drop packet */ + + BPF_STMT(BPF_LD + BPF_B + BPF_ABS, offsetof(struct iphdr, protocol)), /* A <- ip protocol */ + BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, IPPROTO_IGMP, 1, 0), /* A != IPPROTO_IGMP */ + BPF_STMT(BPF_RET + BPF_K, 0), /* drop packet */ + + BPF_STMT(BPF_LD + BPF_W + BPF_ABS, offsetof(struct iphdr, daddr)), /* A <- ip dst addr */ + BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, chtobe32(cinet_addr(224,0,2,60)), 2, 0), /* A != 224.0.2.60 */ + //BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, 0xe000023c, 2, 0), /* A != 224.0.2.60 */ + BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, chtobe32(cinet_addr(224,0,0,22)), 1, 0), /* A != 224.0.0.22 */ + //BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, 0xe0000016, 1, 0), /* A != 224.0.0.22 */ + BPF_STMT(BPF_RET + BPF_K, 0), /* drop packet */ + + BPF_STMT(BPF_LDX + BPF_B + BPF_MSH, 0 /* iphdr[0] */), /* X <- pkt->ihl * 4 */ + BPF_STMT(BPF_LD + BPF_IMM, 20), /* A <- 20 */ + BPF_JUMP(BPF_JMP + BPF_JGT + BPF_X, 0, 0, 1), /* A > X */ + BPF_STMT(BPF_RET + BPF_K, 0), /* drop packet */ + + BPF_STMT(BPF_LD + BPF_H + BPF_ABS, offsetof(struct iphdr, tot_len)), /* A <- ip tot_len */ + BPF_JUMP(BPF_JMP + BPF_JGT + BPF_X, 0, 1, 0), /* A <= ip->ihl * 4 */ + BPF_STMT(BPF_RET + BPF_K, 0), /* drop packet */ + BPF_STMT(BPF_ALU + BPF_SUB + BPF_X, 0), /* A <- A - X (bodylen) */ + BPF_JUMP(BPF_JMP + BPF_JGE + BPF_K, 8, 1, 0), /* A < 8 */ + BPF_STMT(BPF_RET + BPF_K, 0), /* drop packet */ + + BPF_STMT(BPF_LD + BPF_H + BPF_ABS, offsetof(struct iphdr, tot_len)), /* A <- ip->tot_len */ + BPF_STMT(BPF_MISC + BPF_TAX, 0), /* X <- A */ + BPF_STMT(BPF_LD + BPF_W + BPF_LEN, 0), /* A <- packet length */ + BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_X, 0, 1, 0), /* A != ip->tot_len */ + BPF_STMT(BPF_RET + BPF_K, 0), /* drop packet */ + + BPF_STMT(BPF_RET + BPF_K, (uint32_t) -1), /* accept packet */ + }; + static const struct sock_fprog fprog = { + .len = ARRAY_SIZE(filter), + .filter = (struct sock_filter*) filter, + }; + struct sockaddr_ll addr = { + .sll_family = AF_PACKET, + .sll_ifindex = 0, + .sll_pkttype = PACKET_MULTICAST, + }; + struct igmp *igmp; + int sfd; + int opt; + + if (!cfg->do_igmp) { + debug(DBG_IGMP, "igmp snooping disabled"); + return; + } + + assert_return(!cfg->igmp); + + igmp = zmalloc(sizeof(*igmp)); + if (!igmp) + return; + + /* + * Kernel limitation, must be ETH_P_ALL, not ETH_P_IP or we won't get + * outgoing packets, https://lkml.org/lkml/1999/12/23/112 + */ + sfd = socket(AF_PACKET, SOCK_DGRAM | SOCK_CLOEXEC, htons(ETH_P_ALL)); + if (sfd < 0) { + if (errno == EACCES || errno == EPERM) + info("Unable to do IGMP snooping, permission denied"); + else + error("socket: %m"); + goto error; + } + + if (setsockopt(sfd, SOL_SOCKET, SO_ATTACH_FILTER, &fprog, sizeof(fprog)) < 0) { + error("setsockopt(SO_ATTACH_FILTER): %m"); + goto error; + } + + if (setsockopt(sfd, SOL_SOCKET, SO_LOCK_FILTER, &opt, sizeof(opt)) < 0) { + error("setsockopt(SO_LOCK_FILTER): %m"); + goto error; + } + + if (cfg->igmp_iface) { + struct ifreq ifreq; + int r; + + r = snprintf(ifreq.ifr_name, sizeof(ifreq.ifr_name), + "%s", cfg->igmp_iface); + if (r < 0 || r >= sizeof(ifreq.ifr_name)) + die("invalid interface name: %s", cfg->igmp_iface); + + if (ioctl(sfd, SIOCGIFINDEX, &ifreq) < 0) + die("ioctl: %m"); + + debug(DBG_IGMP, "using interface %s (%i)", + cfg->igmp_iface, ifreq.ifr_ifindex); + + struct packet_mreq mreq = { + .mr_ifindex = ifreq.ifr_ifindex, + .mr_type = PACKET_MR_ALLMULTI + }; + + if (setsockopt(sfd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, + &mreq, sizeof(mreq)) < 0) { + error("setsockopt(PACKET_ADD_MEMBERSHIP): %m"); + goto error; + } + } + + /* can't set .sll_protocol to htons(ETH_P_IP), see comment above */ + if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + error("bind: %m"); + goto error; + } + + debug(DBG_IGMP, "init successful, using fd %i", sfd); + uring_task_init(&igmp->task, "igmp", uring_parent(), igmp_free); + uring_task_set_fd(&igmp->task, sfd); + uring_task_set_buf(&igmp->task, &igmp->tbuf); + igmp->task.saddr.addrlen = sizeof(igmp->task.saddr.ll); + uring_tbuf_recvmsg(&igmp->task, igmp_read_cb); + + cfg->igmp = igmp; + + return; + +error: + close(sfd); + xfree(igmp); +} diff --git a/minecproxy/igmp.h b/minecproxy/igmp.h new file mode 100644 index 0000000..80875b0 --- /dev/null +++ b/minecproxy/igmp.h @@ -0,0 +1,10 @@ +#ifndef fooigmphfoo +#define fooigmphfoo + +void igmp_refdump(); + +void igmp_delete(); + +void igmp_init(); + +#endif diff --git a/minecproxy/main.c b/minecproxy/main.c new file mode 100644 index 0000000..bbe3fad --- /dev/null +++ b/minecproxy/main.c @@ -0,0 +1,741 @@ +#define _GNU_SOURCE +#include <stdio.h> +#include <stdlib.h> +#include <stdarg.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <stdbool.h> +#include <getopt.h> +#include <systemd/sd-daemon.h> +#include <cap-ng.h> +#include <pwd.h> +#include <grp.h> +#include <sys/time.h> +#include <sys/resource.h> + +#include "main.h" +#include "signal-handler.h" +#include "uring.h" +#include "config-parser.h" +#include "server.h" +#include "server-config.h" +#include "announce.h" +#include "systemd.h" +#include "igmp.h" +#include "idle.h" +#include "ptimer.h" +#include <config.h> + +/* Global */ +struct cfg *cfg = NULL; +bool exiting = false; + +/* Local */ +static bool daemonize = false; +static FILE *log_file = NULL; +static const char *log_file_path = NULL; + +#define ANSI_RED "\x1B[0;31m" +#define ANSI_GREEN "\x1B[0;32m" +#define ANSI_YELLOW "\x1B[0;33m" +#define ANSI_BLUE "\x1B[0;34m" +#define ANSI_MAGENTA "\x1B[0;35m" +#define ANSI_GREY "\x1B[0;38;5;245m" +#define ANSI_NORMAL "\x1B[0m" + +static void +msg(enum debug_lvl lvl, const char *fmt, va_list ap) +{ + static bool first = true; + static bool use_colors = false; + static bool sd_daemon = false; + const char *color; + const char *sd_lvl; + + assert_return(lvl != 0 && !empty_str(fmt) && ap); + + while (first) { + int fd; + const char *e; + + first = false; + + /* assume we're not launched by systemd when daemonized */ + if (daemonize) { + sd_daemon = false; + use_colors = false; + break; + } + + if (log_file) { + sd_daemon = false; + use_colors = false; + break; + } + + if (getenv("NO_COLOR")) { + sd_daemon = false; + use_colors = false; + break; + } + + fd = fileno(stderr); + if (fd < 0) { + /* Umm... */ + sd_daemon = true; + use_colors = false; + break; + } + + if (!isatty(fd)) { + sd_daemon = true; + use_colors = false; + break; + } + + /* systemd wouldn't normally set TERM */ + e = getenv("TERM"); + if (!e) { + sd_daemon = true; + use_colors = false; + break; + } + + if (streq(e, "dumb")) { + sd_daemon = false; + use_colors = false; + break; + } + + sd_daemon = false; + use_colors = true; + } + + switch (lvl) { + case DBG_ERROR: + sd_lvl = SD_ERR; + color = use_colors ? ANSI_RED : NULL; + break; + case DBG_VERBOSE: + sd_lvl = SD_INFO; + color = NULL; + break; + case DBG_INFO: + sd_lvl = SD_NOTICE; + color = NULL; + break; + default: + sd_lvl = SD_DEBUG; + color = use_colors ? ANSI_GREY : NULL; + break; + } + + if (sd_daemon) + fprintf(stderr, sd_lvl); + else if (color) + fprintf(stderr, color); + + vfprintf(log_file ? log_file : stderr, fmt, ap); + + if (color) + fprintf(stderr, ANSI_NORMAL); +} + +void +__debug(enum debug_lvl lvl, const char *fmt, ...) +{ + va_list ap; + + assert_return(lvl != 0 && !empty_str(fmt)); + + va_start(ap, fmt); + msg(lvl, fmt, ap); + va_end(ap); +} + +__attribute__((noreturn)) void +__die(const char *fmt, ...) +{ + va_list ap; + + if (!empty_str(fmt)) { + va_start(ap, fmt); + msg(DBG_ERROR, fmt, ap); + va_end(ap); + } else + error("fmt not set"); + + sd_notifyf(0, "STATUS=Error, shutting down"); + exit(EXIT_FAILURE); +}; + +static void +cfg_free(struct uring_task *task) +{ + struct cfg *xcfg = container_of(task, struct cfg, task); + + assert_return(task && xcfg == cfg); + + debug(DBG_SIG, "called"); + systemd_delete(cfg); + xfree(cfg->igmp_iface); + cfg->igmp_iface = NULL; + exiting = true; + /* The cfg struct is free:d in main() */ +} + +enum mcfg_keys { + MCFG_KEY_INVALID = 0, + MCFG_KEY_IGMP, + MCFG_KEY_IGMP_IFACE, + MCFG_KEY_ANN_INTERVAL, + MCFG_KEY_PROXY_CONN_INTERVAL, + MCFG_KEY_PROXY_CONN_ATTEMPTS, + MCFG_KEY_SOCKET_DEFER, + MCFG_KEY_SOCKET_FREEBIND, + MCFG_KEY_SOCKET_KEEPALIVE, + MCFG_KEY_SOCKET_IPTOS, + MCFG_KEY_SOCKET_NODELAY, +}; + +struct cfg_key_value_map mcfg_key_map[] = { + { + .key_name = "igmp", + .key_value = MCFG_KEY_IGMP, + .value_type = CFG_VAL_TYPE_BOOL, + }, { + .key_name = "igmp_iface", + .key_value = MCFG_KEY_IGMP_IFACE, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "announce_interval", + .key_value = MCFG_KEY_ANN_INTERVAL, + .value_type = CFG_VAL_TYPE_UINT16, + }, { + .key_name = "proxy_connection_interval", + .key_value = MCFG_KEY_PROXY_CONN_INTERVAL, + .value_type = CFG_VAL_TYPE_UINT16, + }, { + .key_name = "proxy_connection_attempts", + .key_value = MCFG_KEY_PROXY_CONN_ATTEMPTS, + .value_type = CFG_VAL_TYPE_UINT16, + }, { + .key_name = "socket_defer", + .key_value = MCFG_KEY_SOCKET_DEFER, + .value_type = CFG_VAL_TYPE_BOOL, + }, { + .key_name = "socket_freebind", + .key_value = MCFG_KEY_SOCKET_FREEBIND, + .value_type = CFG_VAL_TYPE_BOOL, + }, { + .key_name = "socket_keepalive", + .key_value = MCFG_KEY_SOCKET_KEEPALIVE, + .value_type = CFG_VAL_TYPE_BOOL, + }, { + .key_name = "socket_iptos", + .key_value = MCFG_KEY_SOCKET_IPTOS, + .value_type = CFG_VAL_TYPE_BOOL, + }, { + .key_name = "socket_nodelay", + .key_value = MCFG_KEY_SOCKET_NODELAY, + .value_type = CFG_VAL_TYPE_BOOL, + }, { + .key_name = NULL, + .key_value = MCFG_KEY_INVALID, + .value_type = CFG_VAL_TYPE_INVALID, + } +}; + +static void +cfg_read() +{ + FILE *cfgfile; + const char *path; + char buf[4096]; + char *pos = buf; + size_t rd = 0; + size_t r; + + assert_return(cfg); + + if (cfg->cfg_file) + path = cfg->cfg_file; + else + path = DEFAULT_MAIN_CFG_FILE; + + cfgfile = fopen(path, "re"); + if (!cfgfile) { + /* ENOENT is only an error with an explicitly set path */ + if (errno == ENOENT && !cfg->cfg_file) + return; + else if (errno == ENOENT) + die("main config file (%s) missing", path); + else + die("fopen(%s): %m", path); + } + + debug(DBG_CFG, "opened main config file (%s)", path); + + while (rd < sizeof(buf)) { + r = fread(pos, 1, sizeof(buf) - rd - 1, cfgfile); + if (r == 0) + break; + rd += r; + pos += r; + } + + if (rd == 0) + die("main config file (%s) zero size", path); + + if (rd >= sizeof(buf)) + die("main config file (%s) too large", path); + + fclose(cfgfile); + *pos = '\0'; + pos = buf; + + if (!config_parse_header(path, "mcproxy", &pos)) + die("main config file (%s) invalid", path); + + while (true) { + int key; + const char *keyname; + struct cfg_value value; + + if (!config_parse_line(path, &pos, mcfg_key_map, + &key, &keyname, &value)) + break; + + if (key == MCFG_KEY_INVALID) + die("main config file (%s) invalid", path); + + debug(DBG_CFG, "main cfg: key %s", keyname); + + switch (key) { + + case MCFG_KEY_IGMP: + cfg->do_igmp = value.boolean; + break; + + case MCFG_KEY_IGMP_IFACE: + cfg->igmp_iface = xstrdup(value.str); + if (!cfg->igmp_iface) + die("xstrdup: %m"); + + break; + + case MCFG_KEY_ANN_INTERVAL: + cfg->announce_interval = value.uint16; + break; + + case MCFG_KEY_PROXY_CONN_INTERVAL: + cfg->proxy_connection_interval = value.uint16; + break; + + case MCFG_KEY_PROXY_CONN_ATTEMPTS: + cfg->proxy_connection_attempts = value.uint16; + break; + + case MCFG_KEY_SOCKET_DEFER: + cfg->socket_defer = value.boolean; + break; + + case MCFG_KEY_SOCKET_FREEBIND: + cfg->socket_freebind = value.boolean; + break; + + case MCFG_KEY_SOCKET_KEEPALIVE: + cfg->socket_keepalive = value.boolean; + break; + + case MCFG_KEY_SOCKET_IPTOS: + cfg->socket_iptos = value.boolean; + break; + + case MCFG_KEY_SOCKET_NODELAY: + cfg->socket_nodelay = value.boolean; + break; + + case MCFG_KEY_INVALID: + default: + die("main config file (%s) invalid", path); + } + } +} + +const struct { + const char *name; + unsigned val; +} debug_category_str[] = { + { + .name = "config", + .val = DBG_CFG, + },{ + .name = "refcount", + .val = DBG_REF, + },{ + .name = "malloc", + .val = DBG_MALLOC, + },{ + .name = "announce", + .val = DBG_ANN, + },{ + .name = "signal", + .val = DBG_SIG, + },{ + .name = "uring", + .val = DBG_UR, + },{ + .name = "server", + .val = DBG_SRV, + },{ + .name = "proxy", + .val = DBG_PROXY, + },{ + .name = "rcon", + .val = DBG_RCON, + },{ + .name = "idle", + .val = DBG_IDLE, + },{ + .name = "igmp", + .val = DBG_IGMP, + },{ + .name = "systemd", + .val = DBG_SYSD, + },{ + .name = "dns", + .val = DBG_DNS, + },{ + .name = "timer", + .val = DBG_TIMER, + },{ + .name = NULL, + .val = 0, + } +}; + +__attribute__((noreturn)) static void +usage(int argc, char **argv, bool invalid) +{ + if (invalid) + info("Invalid option(s)"); + + info("Usage: %s [OPTIONS]\n" + "\n" + "Valid options:\n" + " -c, --cfgdir=DIR\tlook for configuration files in DIR\n" + " -u, --user=USER\trun as USER\n" + " -D, --daemonize\trun in daemon mode (disables stderr output)\n" + " -l, --logfile=FILE\tlog to FILE instead of stderr\n" + " -h, --help\t\tprint this information\n" + " -v, --verbose\t\tenable verbose logging\n" + " -d, --debug=CATEGORY\tenable debugging for CATEGORY\n" + "\t\t\t(use \"list\" to see available categories,\n" + "\t\t\t or \"all\" to enable all categories)\n", + argv ? argv[0] : "mcproxy"); + + exit(invalid ? EXIT_FAILURE : EXIT_SUCCESS); +} + +static void +cfg_init(int argc, char **argv) +{ + int c; + unsigned i; + + assert_die(argc > 0 && argv, "invalid arguments"); + + cfg = zmalloc(sizeof(*cfg)); + if (!cfg) + die("malloc: %m"); + + uring_task_init(&cfg->task, "main", NULL, cfg_free); + list_init(&cfg->servers); + + cfg->cfg_dir = DEFAULT_CFG_DIR; + cfg->announce_interval = DEFAULT_ANNOUNCE_INTERVAL; + cfg->proxy_connection_interval = DEFAULT_PROXY_CONN_INTERVAL; + cfg->proxy_connection_attempts = DEFAULT_PROXY_CONN_ATTEMPTS; + cfg->socket_defer = DEFAULT_SOCKET_DEFER; + cfg->socket_freebind = DEFAULT_SOCKET_FREEBIND; + cfg->socket_keepalive = DEFAULT_SOCKET_KEEPALIVE; + cfg->socket_iptos = DEFAULT_SOCKET_IPTOS; + cfg->socket_nodelay = DEFAULT_SOCKET_NODELAY; + cfg->uid = geteuid(); + cfg->gid = getegid(); + + while (true) { + int option_index = 0; + static struct option long_options[] = { + { "cfgdir", required_argument, 0, 'c' }, + { "cfgfile", required_argument, 0, 'C' }, + { "user", required_argument, 0, 'u' }, + { "daemonize", no_argument, 0, 'D' }, + { "logfile", required_argument, 0, 'l' }, + { "help", no_argument, 0, 'h' }, + { "verbose", no_argument, 0, 'v' }, + { "debug", required_argument, 0, 'd' }, + { 0, 0, 0, 0 } + }; + + c = getopt_long(argc, argv, ":c:C:u:Dl:hvd:", + long_options, &option_index); + if (c == -1) + break; + + switch (c) { + case 'c': + cfg->cfg_dir = optarg; + break; + + case 'C': + cfg->cfg_file = optarg; + break; + + case 'v': + debug_mask |= DBG_VERBOSE; + break; + + case 'D': + daemonize = true; + break; + + case 'l': + log_file_path = optarg; + break; + + case 'u': { + struct passwd *pwd; + + errno = 0; + pwd = getpwnam(optarg); + if (!pwd) { + if (errno == 0) + errno = ESRCH; + if (errno == ESRCH) + die("failed to find user %s", optarg); + else + die("failed to find user %s (%m)", optarg); + } + + debug(DBG_CFG, "asked to execute with uid %ji gid %ji", + (intmax_t)pwd->pw_uid, + (intmax_t)pwd->pw_gid); + cfg->uid = pwd->pw_uid; + cfg->gid = pwd->pw_gid; + break; + } + + case 'd': + if (strcaseeq(optarg, "all")) { + debug_mask = ~0; + break; + } else if (strcaseeq(optarg, "list")) { + info("Debug categories:"); + info(" * all"); + for (i = 0; debug_category_str[i].name; i++) + info(" * %s", debug_category_str[i].name); + exit(EXIT_FAILURE); + } + + for (i = 0; debug_category_str[i].name; i++) { + if (strcaseeq(optarg, debug_category_str[i].name)) + break; + } + + if (!debug_category_str[i].name) + usage(argc, argv, true); + + debug_mask |= DBG_VERBOSE; + debug_mask |= debug_category_str[i].val; + break; + + case 'h': + usage(argc, argv, false); + + default: + usage(argc, argv, true); + } + + } + + if (optind < argc) + usage(argc, argv, true); +} + +static void +cfg_apply() +{ + if (cfg->uid == 0 || cfg->gid == 0) + /* This catches both -u root and running as root without -u */ + die("Execution as root is not supported (use -u <someuser>)"); + + capng_clear(CAPNG_SELECT_BOTH); + if (capng_updatev(CAPNG_ADD, + CAPNG_EFFECTIVE | CAPNG_PERMITTED, + CAP_NET_RAW, CAP_NET_BIND_SERVICE, -1)) + die("capng_updatev failed"); + + if (geteuid() != cfg->uid) { + if (capng_change_id(cfg->uid, + cfg->gid, + CAPNG_DROP_SUPP_GRP | CAPNG_CLEAR_BOUNDING)) + die("capng_change_id failed"); + } else { + /* + * This can fail if any of the caps are lacking, but it'll + * be re-checked later. + */ + capng_apply(CAPNG_SELECT_BOTH); + setgroups(0, NULL); + } + + if (daemonize) { + if (daemon(1, 0) < 0) + die("daemon() failed: %m"); + } + + if (log_file_path) { + log_file = fopen(log_file_path, "ae"); + if (!log_file) + die("fopen(%s) failed: %m", log_file_path); + } + + /* + * Do this after caps have been dropped to make sure we're not + * accessing a directory we should have permissions to. + */ + if (chdir(cfg->cfg_dir)) + die("chdir(%s): %m", cfg->cfg_dir); + + if (debug_enabled(DBG_VERBOSE)) { + char *wd; + + wd = get_current_dir_name(); + verbose("Working directory: %s", wd ? wd : "<unknown>"); + free(wd); + } +} + +void +dump_tree() +{ + struct server *server; + + if (!debug_enabled(DBG_REF)) + return; + + info("\n\n"); + info("Dumping Tree"); + info("============"); + uring_task_refdump(&cfg->task); + uring_refdump(); + signal_refdump(); + ptimer_refdump(); + idle_refdump(); + igmp_refdump(); + announce_refdump(); + server_cfg_monitor_refdump(); + list_for_each_entry(server, &cfg->servers, list) + server_refdump(server); + info("============"); + info("\n\n"); +} + +int +main(int argc, char **argv) +{ + struct server *server; + unsigned server_count; + struct rlimit old_rlimit; + + debug_mask = DBG_ERROR | DBG_INFO; + + cfg_init(argc, argv); + + cfg_apply(); + + cfg_read(); + + /* + * In the splice case we use 4 fds per proxy connection... + */ + if (prlimit(0, RLIMIT_NOFILE, NULL, &old_rlimit) == 0) { + struct rlimit new_rlimit; + + new_rlimit.rlim_cur = old_rlimit.rlim_max; + new_rlimit.rlim_max = old_rlimit.rlim_max; + + if (prlimit(0, RLIMIT_NOFILE, &new_rlimit, NULL) == 0) + debug(DBG_MALLOC, "prlimit(NOFILE): %u/%u -> %u/%u", + (unsigned)old_rlimit.rlim_cur, + (unsigned)old_rlimit.rlim_max, + (unsigned)new_rlimit.rlim_cur, + (unsigned)new_rlimit.rlim_cur); + } + + uring_init(); + + ptimer_init(); + + igmp_init(); + + /* Drop CAP_NET_RAW (if we have it), only used for igmp */ + capng_clear(CAPNG_SELECT_BOTH); + if (capng_update(CAPNG_ADD, + CAPNG_EFFECTIVE | CAPNG_PERMITTED, + CAP_NET_BIND_SERVICE)) + die("capng_update failed"); + + if (capng_apply(CAPNG_SELECT_BOTH)) { + /* Try clearing all caps, shouldn't fail */ + capng_clear(CAPNG_SELECT_BOTH); + if (capng_apply(CAPNG_SELECT_BOTH)) + die("capng_apply failed"); + } + + signal_init(); + + server_cfg_monitor_init(); + + announce_init(); + + if (!cfg->igmp) + announce_start(0); + + idle_init(); + + uring_task_put(&cfg->task); + + server_count = 0; + list_for_each_entry(server, &cfg->servers, list) + server_count++; + + sd_notifyf(0, "READY=1\n" + "STATUS=Running, %u server configurations loaded\n" + "MAINPID=%lu", + server_count, + (unsigned long)getpid()); + + info("mcproxy (%s) started, %u server configurations loaded", + VERSION, server_count); + + uring_event_loop(); + + verbose("Exiting"); + + xfree(cfg); + cfg = NULL; + + if (debug_enabled(DBG_MALLOC)) + debug_resource_usage(); + + fflush(stdout); + fflush(stderr); + exit(EXIT_SUCCESS); +} diff --git a/minecproxy/main.h b/minecproxy/main.h new file mode 100644 index 0000000..f1f5df2 --- /dev/null +++ b/minecproxy/main.h @@ -0,0 +1,107 @@ +#ifndef foomainhfoo +#define foomainhfoo + +#include <sys/socket.h> +#include <netinet/ip.h> + +struct cfg; +struct uring_task; + +#include "misc.h" +#include "utils.h" + +extern struct cfg *cfg; +extern bool exiting; + +/* +enum debug_lvl { + DBG_ERROR = (0x1 << 1), + DBG_INFO = (0x1 << 2), + DBG_VERBOSE = (0x1 << 3), + DBG_CFG = (0x1 << 4), + DBG_REF = (0x1 << 5), + DBG_MALLOC = (0x1 << 6), + DBG_ANN = (0x1 << 7), + DBG_SIG = (0x1 << 8), + DBG_UR = (0x1 << 9), + DBG_SRV = (0x1 << 10), + DBG_PROXY = (0x1 << 11), + DBG_RCON = (0x1 << 12), + DBG_IDLE = (0x1 << 13), + DBG_IGMP = (0x1 << 14), + DBG_SYSD = (0x1 << 15), + DBG_DNS = (0x1 << 16), + DBG_TIMER = (0x1 << 17), +}; +*/ + +void dump_tree(); + +/* To save typing in all the function definitions below */ +typedef void (*utask_cb_t)(struct uring_task *, int res); +typedef int (*rutask_cb_t)(struct uring_task *, int res); + +struct uring_task_buf { + char buf[4096]; + size_t len; + size_t done; + struct iovec iov; + struct msghdr msg; +}; + +struct uring_task { + const char *name; + unsigned refcount; + int fd; + struct uring_task *parent; + void (*free)(struct uring_task *); + bool dead; + struct uring_task_buf *tbuf; + + /* called once or repeatedly until is_complete_cb is satisfied */ + utask_cb_t cb; + + /* returns: 0 = not complete; < 0 = error; > 0 = complete */ + rutask_cb_t is_complete_cb; + + /* called once tbuf processing is done */ + utask_cb_t final_cb; + + /* used for recvmsg/sendmsg */ + struct saddr saddr; + void *priv; +}; + +struct cfg { + /* Options */ + uid_t uid; + gid_t gid; + const char *cfg_dir; + const char *cfg_file; + bool do_igmp; + char *igmp_iface; + bool splice_supported; + uint16_t announce_interval; + uint16_t proxy_connection_interval; + uint16_t proxy_connection_attempts; + bool socket_defer; + bool socket_freebind; + bool socket_keepalive; + bool socket_iptos; + bool socket_nodelay; + + /* Bookkeeping */ + struct uring_ev *uring; + struct server_cfg_monitor *server_cfg_monitor; + struct signal_ev *signal; + struct announce *announce; + struct ptimer *ptimer; + struct igmp *igmp; + struct idle *idle; + struct sd_bus *sd_bus; + bool sd_bus_failed; + struct uring_task task; + struct list_head servers; +}; + +#endif diff --git a/minecproxy/meson.build b/minecproxy/meson.build new file mode 100644 index 0000000..db6a31b --- /dev/null +++ b/minecproxy/meson.build @@ -0,0 +1,36 @@ +minecproxy_sources = [ + 'main.c', + 'uring.c', + 'signal-handler.c', + 'server.c', + 'server-proxy.c', + 'server-config.c', + 'server-rcon.c', + 'announce.c', + 'config-parser.c', + 'idle.c', + 'ptimer.c', + 'igmp.c', + 'systemd.c', + 'misc.c', +] + +dep_liburing = dependency('liburing') +dep_libsystemd = dependency('libsystemd') +dep_libcapng = dependency('libcap-ng') + +minecproxy_deps = [ + dep_liburing, + dep_libsystemd, + dep_libcapng, + dep_config_h, + dep_libshared, +] + +executable( + 'minecproxy', + minecproxy_sources, + link_args: [ '-lanl' ], + dependencies: minecproxy_deps, +) + diff --git a/minecproxy/misc.c b/minecproxy/misc.c new file mode 100644 index 0000000..f954618 --- /dev/null +++ b/minecproxy/misc.c @@ -0,0 +1,281 @@ +#include <stdlib.h> +#include <errno.h> +#include <stdint.h> +#include <limits.h> +#include <arpa/inet.h> +#include <string.h> +#include <sys/types.h> +#include <dirent.h> +#include <fcntl.h> +#include <unistd.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <inttypes.h> + +#include "main.h" +#include "misc.h" +#include "uring.h" + +static unsigned total_malloc_count = 0; +static int malloc_count = 0; + +LIST_HEAD(malloc_list); + +struct allocation { + const char *allocfn; + const char *callerfn; + int line; + void *ptr; + size_t size; + struct list_head list; +}; + +static void +add_allocation(const char *allocfn, const char *callerfn, int line, void *ptr, size_t size) +{ + struct allocation *a; + + assert_die(!empty_str(allocfn) && !empty_str(callerfn) && line > 0 && ptr && size > 0, "invalid arguments"); + + a = malloc(sizeof(*a)); + if (!a) + die("malloc: %m"); + a->allocfn = allocfn; + a->callerfn = callerfn; + a->line = line; + a->ptr = ptr; + a->size = size; + list_add(&a->list, &malloc_list); + total_malloc_count++; + malloc_count++; + debug(DBG_MALLOC, "called from %s:%i - %s(%zu) = %p (%p)", + callerfn, line, allocfn, size, ptr, a); +} + +void * +__zmalloc(const char *fn, int line, size_t size) +{ + void *ptr; + + assert_die(!empty_str(fn) && line > 0 && size > 0, "invalid arguments"); + + ptr = calloc(1, size); + if (ptr) + add_allocation("zmalloc", fn, line, ptr, size); + return ptr; +} + +char * +__xstrdup(const char *fn, int line, const char *s) +{ + char *ptr; + + assert_die(!empty_str(fn) && line > 0 && !empty_str(s), "invalid arguments"); + + ptr = strdup(s); + if (ptr) + add_allocation("xstrdup", fn, line, ptr, strlen(s) + 1); + return ptr; +} + +char * +__xstrndup(const char *fn, int line, const char *s, size_t n) +{ + char *ptr; + + assert_die(!empty_str(fn) && line > 0 && !empty_str(s) && n > 0, "invalid arguments"); + + ptr = strndup(s, n); + if (ptr) + add_allocation("xstrndup", fn, line, ptr, n); + return ptr; +} + +void +__xfree(const char *fn, int line, void *ptr) +{ + struct allocation *a, *tmp; + unsigned delete_count = 0; + + assert_die(!empty_str(fn) && line > 0, "invalid arguments"); + + if (!ptr) + return; + free(ptr); + malloc_count--; + + debug(DBG_MALLOC, "called from %s:%i - %p", fn, line, ptr); + + list_for_each_entry_safe(a, tmp, &malloc_list, list) { + if (a->ptr == ptr) { + list_del(&a->list); + free(a); + delete_count++; + } + } + + if (delete_count != 1) { + error("Delete count is %u for ptr 0x%p", delete_count, ptr); + exit(EXIT_FAILURE); + } +} + +void +debug_resource_usage() +{ + struct allocation *a; + DIR *dir; + struct dirent *dent; + char buf[4096]; + ssize_t r; + unsigned file_count = 0; + + debug(DBG_MALLOC, "Still malloced %i (total %u)", + malloc_count, total_malloc_count); + + list_for_each_entry(a, &malloc_list, list) { + debug(DBG_MALLOC, "* Lost allocation - %s:%i - ptr: %p, size: %zu", + a->callerfn, a->line, a->ptr, a->size); + } + + dir = opendir("/proc/self/fd"); + if (!dir) { + error("failed to open fd dir"); + return; + } + + debug(DBG_MALLOC, "Open files:"); + while ((dent = readdir(dir)) != NULL) { + if (streq(dent->d_name, ".") || streq(dent->d_name, "..")) + continue; + + r = readlinkat(dirfd(dir), dent->d_name, buf, sizeof(buf)); + if (r < 0) { + debug(DBG_MALLOC, "Failed to readlink %s", dent->d_name); + continue; + } + buf[r] = '\0'; + debug(DBG_MALLOC, " * %s -> %s", dent->d_name, buf); + file_count++; + } + closedir(dir); + + if (file_count > 4) + debug(DBG_MALLOC, "Lost file descriptor(s)"); + + debug(DBG_MALLOC, "CQEs used: %" PRIu64 ", SQEs used: %" PRIu64, + cqe_count, sqe_count); +} + +void +connection_set_local(struct connection *conn, int fd) +{ + assert_return(conn && fd >= 0); + + conn->local.addrlen = sizeof(conn->local.storage); + if (getsockname(fd, (struct sockaddr *)&conn->local.storage, + &conn->local.addrlen) < 0) + sprintf(conn->local.addrstr, "<unknown>"); + else + saddr_set_addrstr(&conn->local); +} + +void +connection_set_remote(struct connection *conn, struct saddr *remote) +{ + assert_return(conn && remote); + + conn->remote = *remote; + saddr_set_addrstr(&conn->remote); +} + +static void connect_next(struct uring_task *task, struct connection *conn); + +static void +connect_cb(struct uring_task *task, int res) +{ + struct connection *conn; + + assert_return(task && task->priv); + + conn = task->priv; + if (res < 0) { + debug(DBG_SRV, "%s: connection to %s failed", + task->name, conn->remote.addrstr); + uring_task_close_fd(task); + connect_next(task, conn); + return; + } + + connection_set_local(conn, task->fd); + + debug(DBG_SRV, "%s: connection established %s -> %s", + task->name, conn->local.addrstr, conn->remote.addrstr); + + conn->cb(conn, true); +} + +static void +connect_next(struct uring_task *task, struct connection *conn) +{ + struct saddr *remote, *tmp; + int sfd; + unsigned i; + + assert_return(task && conn && conn->cb); +again: + assert_task_alive_or(DBG_UR, task, goto out); + + i = 0; + remote = NULL; + list_for_each_entry(tmp, conn->addrs, list) { + if (i == conn->next_addr) { + remote = tmp; + break; + } + i++; + } + + if (!remote) { + debug(DBG_SRV, "%s: no more remote addresses to attempt", + task->name); + goto out; + } + + conn->next_addr++; + connection_set_remote(conn, remote); + debug(DBG_SRV, "%s: attempting to connect to %s", + task->name, conn->remote.addrstr); + + sfd = socket(conn->remote.storage.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (sfd < 0) { + error("socket: %m"); + goto again; + } + + socket_set_low_latency(sfd, cfg->socket_keepalive, + cfg->socket_iptos, cfg->socket_nodelay); + + task->priv = conn; + uring_task_set_fd(task, sfd); + uring_connect(task, &conn->remote, connect_cb); + return; + +out: + conn->cb(conn, false); +} + +void +connect_any(struct uring_task *task, + struct list_head *addrs, struct connection *conn, + connection_cb_t cb) +{ + assert_return(task && addrs && conn && cb); + + conn->next_addr = 0; + conn->addrs = addrs; + conn->cb = cb; + connect_next(task, conn); +} + diff --git a/minecproxy/misc.h b/minecproxy/misc.h new file mode 100644 index 0000000..6627913 --- /dev/null +++ b/minecproxy/misc.h @@ -0,0 +1,36 @@ +#ifndef foomischfoo +#define foomischfoo + +#include <stdio.h> +#include <string.h> +#include <stdbool.h> +#include <stdlib.h> +#include <linux/if_packet.h> + +#include "utils.h" + +void debug_resource_usage(); + +struct connection; + +typedef void(*connection_cb_t)(struct connection *, bool); + +struct connection { + struct saddr remote; + struct saddr local; + + struct list_head *addrs; + unsigned next_addr; + + connection_cb_t cb; +}; + +void connection_set_local(struct connection *conn, int fd); + +void connection_set_remote(struct connection *conn, struct saddr *remote); + +void connect_any(struct uring_task *task, + struct list_head *addrs, struct connection *conn, + connection_cb_t cb); + +#endif diff --git a/minecproxy/ptimer.c b/minecproxy/ptimer.c new file mode 100644 index 0000000..5f9cf5d --- /dev/null +++ b/minecproxy/ptimer.c @@ -0,0 +1,223 @@ +#include <inttypes.h> +#include <sys/timerfd.h> +#include <string.h> +#include <unistd.h> +#include <time.h> + +#include "main.h" +#include "uring.h" +#include "ptimer.h" + +struct ptimer { + uint64_t value; + time_t previous_time; + struct uring_task task; + unsigned task_count; + struct list_head ptasks; +}; + +static void +ptimer_set(unsigned value, unsigned interval) +{ + struct itimerspec tspec = { + .it_interval = { + .tv_sec = interval, + .tv_nsec = 0 + }, + .it_value = { + .tv_sec = value, + .tv_nsec = 0 + } + }; + + assert_return(cfg->ptimer && cfg->ptimer->task.fd >= 0); + + if (timerfd_settime(cfg->ptimer->task.fd, 0, &tspec, NULL) != 0) + error("timerfd_settime: %m"); +} + +static unsigned +gcd(unsigned a, unsigned b) +{ + if (a == 0) + return b; + return gcd(b % a, a); +} + +static unsigned +array_gcd(unsigned arr[], unsigned n) +{ + unsigned result = arr[0]; + + for (unsigned i = 1; i < n; i++) + result = gcd(arr[i], result); + + return result; +} + +static void +ptimer_tick(struct ptimer *ptimer) +{ + time_t now = time(NULL); + unsigned diff = (unsigned)(now - ptimer->previous_time); + struct ptimer_task *ptask, *ptmp; + + debug(DBG_TIMER, "got a tick of %u secs", diff); + list_for_each_entry_safe(ptask, ptmp, &ptimer->ptasks, list) { + if (ptask->remain > diff) { + ptask->remain -= diff; + continue; + } + + debug(DBG_TIMER, "triggering ptask %p (times %u)", + ptask, ptask->times); + + ptask->cb(ptask); + ptask->remain = ptask->interval; + + if (ptask->times == 0) + continue; + + ptask->times--; + if (ptask->times == 0) { + ptask->active = false; + list_del(&ptask->list); + } + } + + ptimer->previous_time = now; +} + +static void +ptimer_reconfig(struct ptimer *ptimer) +{ + struct ptimer_task *ptask; + unsigned i = 0; + unsigned lowest = ~0; + unsigned interval; + + if (list_empty(&ptimer->ptasks)) { + debug(DBG_TIMER, "no tasks"); + ptimer_set(0, 0); + return; + } + + unsigned intervals[ptimer->task_count]; + + list_for_each_entry(ptask, &ptimer->ptasks, list) { + if (ptask->remain < lowest) + lowest = ptask->remain; + intervals[i++] = ptask->interval; + } + + interval = array_gcd(intervals, i); + + debug(DBG_TIMER, "lowest: %u, gcd: %u\n", lowest, interval); + ptimer_set(lowest, interval); +} + +void +ptimer_del_task(struct ptimer_task *ptask) +{ + struct ptimer *ptimer = cfg->ptimer; + + assert_return(ptask && ptimer); + assert_return_silent(ptask->active); + assert_return(ptimer->task_count > 0); + + list_del(&ptask->list); + ptask->active = false; + ptimer->task_count--; + ptimer_tick(ptimer); + ptimer_reconfig(ptimer); + uring_task_put(&ptimer->task); +} + +void +ptimer_add_task(struct ptimer_task *ptask) +{ + struct ptimer *ptimer = cfg->ptimer; + + assert_return(ptask && ptask->interval > 0 && ptask->cb && ptimer); + assert_return_silent(!ptask->active); + + uring_task_get(&ptimer->task); + ptask->active = true; + ptask->remain = ptask->interval; + ptimer_tick(ptimer); + list_add(&ptask->list, &ptimer->ptasks); + ptimer->task_count++; + ptimer_reconfig(ptimer); +} + +void +ptimer_refdump() +{ + assert_return(cfg->ptimer); + + uring_task_refdump(&cfg->ptimer->task); +} + +static void +ptimer_free(struct uring_task *task) +{ + struct ptimer *ptimer = container_of(task, struct ptimer, task); + + assert_return(task); + + debug(DBG_TIMER, "task %p, ptimer %p", task, ptimer); + xfree(ptimer); + cfg->ptimer = NULL; +} + +void +ptimer_delete() +{ + assert_return(cfg->ptimer); + + debug(DBG_TIMER, "closing fd %i", cfg->ptimer->task.fd); + uring_task_destroy(&cfg->ptimer->task); +} + +static void +ptimer_cb(struct uring_task *task, int res) +{ + struct ptimer *ptimer = container_of(task, struct ptimer, task); + + assert_return(task); + assert_task_alive(DBG_IGMP, task); + + if (res != sizeof(ptimer->value)) { + error("timerfd_read: res: %i, %m", res); + return; + } + + ptimer_tick(ptimer); + uring_read(&ptimer->task, &ptimer->value, sizeof(ptimer->value), ptimer_cb); +} + +void +ptimer_init() +{ + struct ptimer *ptimer; + int tfd; + + assert_return(!cfg->ptimer); + + ptimer = zmalloc(sizeof(*ptimer)); + if (!ptimer) + die("malloc: %m"); + + tfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC); + if (tfd < 0) + die("timerfd_create: %m"); + + ptimer->task_count = 0; + ptimer->previous_time = time(NULL); + list_init(&ptimer->ptasks); + uring_task_init(&ptimer->task, "ptimer", uring_parent(), ptimer_free); + uring_task_set_fd(&ptimer->task, tfd); + cfg->ptimer = ptimer; + uring_read(&ptimer->task, &ptimer->value, sizeof(ptimer->value), ptimer_cb); +} + diff --git a/minecproxy/ptimer.h b/minecproxy/ptimer.h new file mode 100644 index 0000000..0b53590 --- /dev/null +++ b/minecproxy/ptimer.h @@ -0,0 +1,33 @@ +#ifndef fooptimerhfoo +#define fooptimerhfoo + +struct ptimer_task { + unsigned interval; + unsigned times; + void (*cb)(struct ptimer_task *); + bool active; + unsigned remain; + struct list_head list; +}; + +static inline void +ptask_init(struct ptimer_task *ptask, unsigned interval, + unsigned times, void(*cb)(struct ptimer_task *)) +{ + ptask->interval = interval; + ptask->times = times; + ptask->cb = cb; + ptask->active = false; +} + +void ptimer_del_task(struct ptimer_task *ptask); + +void ptimer_add_task(struct ptimer_task *ptask); + +void ptimer_refdump(); + +void ptimer_delete(); + +void ptimer_init(); + +#endif diff --git a/minecproxy/server-config.c b/minecproxy/server-config.c new file mode 100644 index 0000000..549cf16 --- /dev/null +++ b/minecproxy/server-config.c @@ -0,0 +1,580 @@ +#define _GNU_SOURCE +#include <stdio.h> +#include <ctype.h> +#include <string.h> +#include <stdbool.h> +#include <sys/inotify.h> +#include <dirent.h> +#include <errno.h> +#include <unistd.h> +#include <arpa/inet.h> +#include <inttypes.h> + +#include "main.h" +#include "uring.h" +#include "config-parser.h" +#include "server.h" +#include "server-config.h" + +static void +scfg_dns_cb(struct dns_async *dns, bool (*server_cb)(struct server *, struct saddr *)) +{ + struct server *server; + struct sockaddr_in *in4; + struct sockaddr_in6 *in6; + struct saddr *saddr; + struct addrinfo *results = NULL, *ai; + int r; + + assert_return(dns && dns->priv && server_cb); + + server = dns->priv; + debug(DBG_DNS, "called, dns: %p, name: %s, server: %p, server->name: %s", + dns, dns->name, server, server->name); + + r = gai_error(&dns->gcb); + if (r == EAI_INPROGRESS) { + /* This shouldn't happen, assume we'll get called again */ + error("called with request in progress"); + return; + } else if (r == EAI_CANCELED) { + /* The server must be in the process of going away */ + goto out; + } else if (r < 0) { + error("DNS lookup of %s:%s failed: %s", + dns->name, dns->port, gai_strerror(r)); + goto out; + } + + results = dns->gcb.ar_result; + + for (ai = results; ai; ai = ai->ai_next) { + saddr = zmalloc(sizeof(*saddr)); + if (!saddr) { + error("DNS lookup of %s:%s failed: %m", dns->name, dns->port); + goto out; + } + + switch (ai->ai_family) { + case AF_INET: + in4 = (struct sockaddr_in *)ai->ai_addr; + saddr_set_ipv4(saddr, in4->sin_addr.s_addr, in4->sin_port); + server_cb(server, saddr); + break; + + case AF_INET6: + in6 = (struct sockaddr_in6 *)ai->ai_addr; + saddr_set_ipv6(saddr, &in6->sin6_addr, in6->sin6_port); + server_cb(server, saddr); + break; + + default: + error("getaddrinfo(%s:%s): unknown address family (%i)", + dns->name, dns->port, ai->ai_family); + xfree(saddr); + break; + } + } + +out: + freeaddrinfo(results); + list_del(&dns->list); + xfree(dns); + uring_task_put(&server->task); + server_commit(server); +} + +static void +scfg_local_dns_cb(struct dns_async *dns) +{ + assert_return(dns); + + scfg_dns_cb(dns, server_add_local); +} + +static void +scfg_remote_dns_cb(struct dns_async *dns) +{ + assert_return(dns); + + scfg_dns_cb(dns, server_add_remote); +} + +static void +scfg_rcon_dns_cb(struct dns_async *dns) +{ + assert_return(dns); + + scfg_dns_cb(dns, server_add_rcon); +} + +enum scfg_keys { + SCFG_KEY_INVALID = 0, + SCFG_KEY_TYPE, + SCFG_KEY_NAME, + SCFG_KEY_PORT, + SCFG_KEY_LOCAL, + SCFG_KEY_REMOTE, + SCFG_KEY_IDLE_TIMEOUT, + SCFG_KEY_STOP_METHOD, + SCFG_KEY_START_METHOD, + SCFG_KEY_STOP_EXEC, + SCFG_KEY_START_EXEC, + SCFG_KEY_RCON, + SCFG_KEY_RCON_PASSWORD, + SCFG_KEY_SYSTEMD_SERVICE, +}; + +struct cfg_key_value_map scfg_key_map[] = { + { + .key_name = "type", + .key_value = SCFG_KEY_TYPE, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "name", + .key_value = SCFG_KEY_NAME, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "port", + .key_value = SCFG_KEY_PORT, + .value_type = CFG_VAL_TYPE_UINT16, + }, { + .key_name = "local", + .key_value = SCFG_KEY_LOCAL, + .value_type = CFG_VAL_TYPE_ASYNC_ADDRS, + }, { + .key_name = "remote", + .key_value = SCFG_KEY_REMOTE, + .value_type = CFG_VAL_TYPE_ASYNC_ADDRS, + }, { + .key_name = "idle_timeout", + .key_value = SCFG_KEY_IDLE_TIMEOUT, + .value_type = CFG_VAL_TYPE_UINT16, + }, { + .key_name = "stop_method", + .key_value = SCFG_KEY_STOP_METHOD, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "start_method", + .key_value = SCFG_KEY_START_METHOD, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "stop_exec", + .key_value = SCFG_KEY_STOP_EXEC, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "start_exec", + .key_value = SCFG_KEY_START_EXEC, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "rcon", + .key_value = SCFG_KEY_RCON, + .value_type = CFG_VAL_TYPE_ASYNC_ADDRS, + }, { + .key_name = "rcon_password", + .key_value = SCFG_KEY_RCON_PASSWORD, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = "systemd_service", + .key_value = SCFG_KEY_SYSTEMD_SERVICE, + .value_type = CFG_VAL_TYPE_STRING, + }, { + .key_name = NULL, + .key_value = SCFG_KEY_INVALID, + .value_type = CFG_VAL_TYPE_INVALID, + } +}; + +static bool +handle_dns(struct server *server, const char *type, + struct cfg_value *value, dns_cb_t *async_cb, + bool (*sync_cb)(struct server *, struct saddr *)) +{ + struct saddr *saddr, *tmp; + struct dns_async *dns; + + assert_return(server && type && value && async_cb && sync_cb, false); + + switch (value->type) { + case CFG_VAL_TYPE_ADDRS: + debug(DBG_DNS, "%s: got immediate addrs", type); + + list_for_each_entry_safe(saddr, tmp, &value->saddrs, list) { + list_del(&saddr->list); + sync_cb(server, saddr); + } + return true; + + case CFG_VAL_TYPE_ASYNC_ADDRS: + debug(DBG_DNS, "%s: doing async lookup of DNS record: %p", + type, value->dns_async); + + dns = value->dns_async; + dns->cb = async_cb; + dns->priv = server; + list_add(&dns->list, &server->dnslookups); + uring_task_get(&server->task); + return true; + + default: + return false; + } +} + +static void +scfg_parse(struct server *server) +{ + char *pos; + + assert_return(server); + + pos = server->tbuf.buf; + + if (!config_parse_header(server->name, "server", &pos)) + return; + + while (true) { + int key; + const char *keyname; + struct cfg_value value; + + if (!config_parse_line(server->name, &pos, scfg_key_map, + &key, &keyname, &value)) + break; + + if (key == SCFG_KEY_INVALID) + break; + + debug(DBG_CFG, "%s: key %s", server->name, keyname); + + switch (key) { + + case SCFG_KEY_TYPE: + if (streq(value.str, "proxy")) { + if (!server_set_type(server, SERVER_TYPE_PROXY)) + return; + } else if (streq(value.str, "announce")) { + if (!server_set_type(server, SERVER_TYPE_ANNOUNCE)) + return; + } + break; + + case SCFG_KEY_NAME: + if (!server_set_pretty_name(server, value.str)) + return; + break; + + case SCFG_KEY_PORT: + if (!server_set_port(server, value.uint16)) + return; + break; + + case SCFG_KEY_LOCAL: + if (!handle_dns(server, "local", &value, + scfg_local_dns_cb, server_add_local)) + return; + break; + + case SCFG_KEY_REMOTE: + if (!handle_dns(server, "remote", &value, + scfg_remote_dns_cb, server_add_remote)) + return; + break; + + case SCFG_KEY_IDLE_TIMEOUT: + if (!server_set_idle_timeout(server, value.uint16)) + return; + break; + + case SCFG_KEY_STOP_METHOD: + if (streq(value.str, "exec")) { + if (server_set_stop_method(server, SERVER_STOP_METHOD_EXEC)) + break; + } else if (streq(value.str, "rcon")) { + if (server_set_stop_method(server, SERVER_STOP_METHOD_RCON)) + break; + } else if (streq(value.str, "systemd")) { + if (server_set_stop_method(server, SERVER_STOP_METHOD_SYSTEMD)) + break; + } + return; + + case SCFG_KEY_START_METHOD: + if (streq(value.str, "exec")) { + if (server_set_start_method(server, SERVER_START_METHOD_EXEC)) + break; + } else if (streq(value.str, "systemd")) { + if (server_set_start_method(server, SERVER_START_METHOD_SYSTEMD)) + break; + } + return; + + case SCFG_KEY_STOP_EXEC: + if (!server_set_stop_exec(server, value.str)) + return; + break; + + case SCFG_KEY_START_EXEC: + if (!server_set_start_exec(server, value.str)) + return; + break; + + case SCFG_KEY_RCON: + if (!handle_dns(server, "rcon", &value, + scfg_rcon_dns_cb, server_add_rcon)) + return; + break; + + case SCFG_KEY_RCON_PASSWORD: + if (!server_set_rcon_password(server, value.str)) + return; + break; + + case SCFG_KEY_SYSTEMD_SERVICE: + if (!server_set_systemd_service(server, value.str)) + return; + break; + + case SCFG_KEY_INVALID: + default: + break; + } + } +} + +static void +scfg_read_cb(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, task); + + assert_return(task); + assert_task_alive(DBG_CFG, task); + + if (res <= 0) { + error("error reading config file for %s: %s", + server->name, strerror(-res)); + server_delete(server); + } + + debug(DBG_CFG, "%s: parsing cfg (%i bytes)", server->name, res); + uring_task_close_fd(&server->task); + scfg_parse(server); + server_commit(server); +} + +static void +scfg_open_cb(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, task); + + assert_return(task); + assert_task_alive(DBG_CFG, task); + + if (res < 0) { + error("open(%s) failed: %s", server->name, strerror(-res)); + server_delete(server); + return; + } + + debug(DBG_CFG, "reading server cfg %s (fd %i)", server->name, res); + uring_task_set_fd(&server->task, res); + uring_tbuf_read_until_eof(&server->task, scfg_read_cb); +} + +static bool +scfg_valid_filename(const char *name) +{ + const char *suffix; + + if (empty_str(name)) + return false; + if (name[0] == '.') + return false; + if ((suffix = strrchr(name, '.')) == NULL) + return false; + if (!streq(suffix, ".server")) + return false; + + return true; +} + +struct server_cfg_monitor { + struct uring_task task; + char buf[4096] __attribute__((aligned(__alignof__(struct inotify_event)))); +}; + +static void +scfgm_free(struct uring_task *task) +{ + struct server_cfg_monitor *scfgm = container_of(task, + struct server_cfg_monitor, + task); + + assert_return(task); + + debug(DBG_CFG, "called"); + xfree(scfgm); + cfg->server_cfg_monitor = NULL; +} + +static void +inotify_event_dump(const struct inotify_event *event) +{ + assert_return(event); + + debug(DBG_CFG, "inotify event:"); + debug(DBG_CFG, " * WD : %i", event->wd); + debug(DBG_CFG, " * Cookie : %" PRIu32, event->cookie); + debug(DBG_CFG, " * Length : %" PRIu32, event->len); + debug(DBG_CFG, " * Name : %s", event->name); + debug(DBG_CFG, " * Mask : %" PRIu32, event->mask); + if (event->mask & IN_ACCESS) + debug(DBG_CFG, "\tIN_ACCESS"); + else if(event->mask & IN_MODIFY) + debug(DBG_CFG, "\tIN_MODIFY"); + else if(event->mask & IN_ATTRIB) + debug(DBG_CFG, "\tIN_ATTRIB"); + else if(event->mask & IN_CLOSE_WRITE) + debug(DBG_CFG, "\tIN_CLOSE_WRITE"); + else if(event->mask & IN_CLOSE_NOWRITE) + debug(DBG_CFG, "\tIN_CLOSE_NOWRITE"); + else if(event->mask & IN_OPEN) + debug(DBG_CFG, "\tIN_OPEN"); + else if(event->mask & IN_MOVED_FROM) + debug(DBG_CFG, "\tIN_MOVED_FROM"); + else if(event->mask & IN_MOVED_TO) + debug(DBG_CFG, "\tIN_MOVED_TO"); + else if(event->mask & IN_CREATE) + debug(DBG_CFG, "\tIN_CREATE"); + else if(event->mask & IN_DELETE) + debug(DBG_CFG, "\tIN_DELETE"); + else if(event->mask & IN_DELETE_SELF) + debug(DBG_CFG, "\tIN_DELETE_SELF"); + else if(event->mask & IN_MOVE_SELF) + debug(DBG_CFG, "\tIN_MOVE_SELF"); + else if(event->mask & IN_UNMOUNT) + debug(DBG_CFG, "\tIN_UNMOUNT"); + else if(event->mask & IN_Q_OVERFLOW) + debug(DBG_CFG, "\tIN_Q_OVERFLOW"); + else if(event->mask & IN_IGNORED) + debug(DBG_CFG, "\tIN_IGNORED"); +} + +static void +inotify_cb(struct uring_task *task, int res) +{ + struct server_cfg_monitor *scfgm = container_of(task, + struct server_cfg_monitor, + task); + const struct inotify_event *event; + char *ptr; + struct server *server; + + assert_return(task); + assert_task_alive(DBG_CFG, task); + + if (res <= 0) { + error("inotify_read: %i", res); + return; + } + + for (ptr = scfgm->buf; ptr < scfgm->buf + res; ptr += sizeof(struct inotify_event) + event->len) { + event = (const struct inotify_event *)ptr; + + if (debug_enabled(DBG_CFG)) + inotify_event_dump(event); + + if (event->mask & (IN_IGNORED | IN_MOVE_SELF | IN_DELETE_SELF | IN_UNMOUNT)) + die("configuration directory gone, exiting"); + + if (event->mask & IN_Q_OVERFLOW) { + error("inotify queue overflow"); + continue; + } + + if (!scfg_valid_filename(event->name)) + continue; + + if (event->mask & (IN_MOVED_FROM | IN_DELETE)) + server_delete_by_name(event->name); + else if (event->mask & (IN_MOVED_TO | IN_CREATE | IN_CLOSE_WRITE)) { + server = server_new(event->name); + verbose("New server config file detected: %s", server->name); + uring_openat(&server->task, server->name, scfg_open_cb); + } else + error("inotify: unknown event: 0x%08x", event->mask); + } + + uring_read(&scfgm->task, scfgm->buf, sizeof(scfgm->buf), inotify_cb); +} + +void +server_cfg_monitor_refdump() +{ + assert_return_silent(cfg->server_cfg_monitor); + + uring_task_refdump(&cfg->server_cfg_monitor->task); +} + +void +server_cfg_monitor_delete() +{ + assert_return(cfg->server_cfg_monitor); + + debug(DBG_CFG, "closing fd %i", cfg->server_cfg_monitor->task.fd); + uring_task_destroy(&cfg->server_cfg_monitor->task); + cfg->server_cfg_monitor = NULL; +} + +void +server_cfg_monitor_init() +{ + int ifd; + int iwd; + struct server_cfg_monitor *scfgm; + DIR *dir; + struct dirent *dent; + struct server *server; + + assert_return(!cfg->server_cfg_monitor); + + scfgm = zmalloc(sizeof(*scfgm)); + if (!scfgm) + die("malloc: %m"); + + ifd = inotify_init1(IN_CLOEXEC); + if (ifd < 0) + die("inotify_init1: %m"); + + /* ln = IN_CREATE, cp/vi/mv = IN_CREATE, IN_OPEN, IN_CLOSE_WRITE */ + iwd = inotify_add_watch(ifd, ".", + IN_CLOSE_WRITE | IN_DELETE | IN_CREATE | + IN_DELETE_SELF | IN_MOVE_SELF | IN_MOVED_TO | + IN_MOVED_FROM | IN_DONT_FOLLOW | + IN_EXCL_UNLINK | IN_ONLYDIR ); + if (iwd < 0) + die("inotify_add_watch: %m"); + + uring_task_init(&scfgm->task, "server-config-monitor", uring_parent(), scfgm_free); + uring_task_set_fd(&scfgm->task, ifd); + cfg->server_cfg_monitor = scfgm; + uring_read(&scfgm->task, scfgm->buf, sizeof(scfgm->buf), inotify_cb); + + dir = opendir("."); + if (!dir) + die("opendir(%s): %m", cfg->cfg_dir); + + while ((dent = readdir(dir)) != NULL) { + if (dent->d_type != DT_REG && dent->d_type != DT_UNKNOWN) + continue; + if (!scfg_valid_filename(dent->d_name)) + continue; + + server = server_new(dent->d_name); + if (server) + uring_openat(&server->task, server->name, scfg_open_cb); + } + + closedir(dir); +} + diff --git a/minecproxy/server-config.h b/minecproxy/server-config.h new file mode 100644 index 0000000..590dae0 --- /dev/null +++ b/minecproxy/server-config.h @@ -0,0 +1,10 @@ +#ifndef fooserverconfighfoo +#define fooserverconfighfoo + +void server_cfg_monitor_delete(); + +void server_cfg_monitor_refdump(); + +void server_cfg_monitor_init(); + +#endif diff --git a/minecproxy/server-proxy.c b/minecproxy/server-proxy.c new file mode 100644 index 0000000..d8ff0cf --- /dev/null +++ b/minecproxy/server-proxy.c @@ -0,0 +1,578 @@ +#define _GNU_SOURCE +#include <stdio.h> +#include <unistd.h> +#include <time.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <fcntl.h> +#include <unistd.h> + +#include "main.h" +#include "uring.h" +#include "ptimer.h" +#include "server.h" +#include "server-proxy.h" + +static void +format_bytes(char *buf, size_t len, uint64_t val) +{ + uint64_t tmp; + const char *suffix = "B"; + + assert_return(buf && len > 0); + + tmp = val * 10; + if (val > 1152921504606846976ULL) { + tmp = val / 115292150460684697ULL; + suffix= "EiB"; + } else if (val > 1125899906842624ULL) { + tmp /= 1125899906842624ULL; + suffix = "PiB"; + } else if (val > 1099511627776ULL) { + tmp /= 1099511627776ULL; + suffix = "TiB"; + } else if (val > 1073741824ULL) { + tmp /= 1073741824ULL; + suffix = "GiB"; + } else if (val > 1048576) { + tmp /= 1048576; + suffix = "MiB"; + } else if (val > 1024) { + tmp /= 1024; + suffix = "KiB"; + } + + snprintf(buf, len, "%lu.%lu %s", tmp / 10, tmp % 10, suffix); +} + +static void +format_time(char *buf, size_t len, time_t diff) +{ + unsigned hh, mm, ss; + + assert_return(buf && len > 0); + + hh = diff / 3600; + diff %= 3600; + mm = diff / 60; + diff %= 60; + ss = diff; + + snprintf(buf, len, "%02u:%02u:%02u", hh, mm, ss); +} + +static void +proxy_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, task); + char cts[100]; + char stc[100]; + char duration[100]; + + assert_return(task); + + debug(DBG_PROXY, "server: %s, src: %s, dst: %s", + proxy->server->name, + proxy->client_conn.remote.addrstr, + proxy->server_conn.remote.addrstr); + + if (proxy->begin > 0) { + format_time(duration, sizeof(duration), time(NULL) - proxy->begin); + format_bytes(cts, sizeof(cts), proxy->client_bytes); + format_bytes(stc, sizeof(stc), proxy->server_bytes); + + info("%s: proxy connection %s -> %s closed " + "(CtS: %s, StC: %s), duration %s", + proxy->server->name, + proxy->client_conn.remote.addrstr, + proxy->server_conn.remote.addrstr, + cts, stc, duration); + } + + list_del(&proxy->list); + xfree(proxy); +} + +static void +proxy_client_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + assert_return(task); + + debug(DBG_PROXY, "%s: client connection closed", proxy->server->name); +} + +static void +proxy_server_free(struct uring_task *task) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + + debug(DBG_PROXY, "%s: server connection closed", proxy->server->name); +} + +void +proxy_delete(struct server_proxy *proxy) +{ + debug(DBG_PROXY, "%s: shutting down proxy %p", proxy->server->name, proxy); + + assert_return(proxy); + + ptimer_del_task(&proxy->ptask); + + if (cfg->splice_supported) { + uring_close(&proxy->server->task, proxy->cpipe[PIPE_RD]); + uring_close(&proxy->server->task, proxy->cpipe[PIPE_WR]); + uring_close(&proxy->server->task, proxy->spipe[PIPE_RD]); + uring_close(&proxy->server->task, proxy->spipe[PIPE_WR]); + } + + uring_task_set_fd(&proxy->servertask, proxy->sfd); + uring_task_destroy(&proxy->servertask); + uring_task_set_fd(&proxy->clienttask, proxy->cfd); + uring_task_destroy(&proxy->clienttask); + uring_task_destroy(&proxy->task); +} + +/* + * These four functions provide the fallback read-write mode + */ +static void proxy_client_read(struct uring_task *task, int res); + +static void +proxy_client_written(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + proxy->client_bytes += res; + uring_task_set_fd(&proxy->clienttask, proxy->cfd); + uring_tbuf_read(task, proxy_client_read); +} + +static void +proxy_client_read(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_task_set_fd(&proxy->clienttask, proxy->sfd); + uring_tbuf_write(task, proxy_client_written); +} + +static void proxy_server_read(struct uring_task *task, int res); + +static void +proxy_server_written(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + proxy->server_bytes += res; + uring_task_set_fd(&proxy->servertask, proxy->sfd); + uring_tbuf_read(&proxy->servertask, proxy_server_read); +} + +static void +proxy_server_read(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_task_set_fd(&proxy->servertask, proxy->cfd); + uring_tbuf_write(task, proxy_server_written); +} + +/* + * These four functions provide the splice fd->pipe->fd mode + */ +static void proxy_client_spliced_in(struct uring_task *task, int res); + +static void +proxy_client_spliced_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); +} + +static void +proxy_client_spliced_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->cpipe[PIPE_RD], proxy->sfd, proxy_client_spliced_out); +} + +static void proxy_server_spliced_in(struct uring_task *task, int res); + +static void +proxy_server_spliced_out(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); +} + +static void +proxy_server_spliced_in(struct uring_task *task, int res) +{ + struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + if (res <= 0) { + debug(DBG_PROXY, "%s: res: %i", proxy->server->name, res); + proxy_delete(proxy); + return; + } + + uring_splice(task, proxy->spipe[PIPE_RD], proxy->cfd, proxy_server_spliced_out); +} + +static void +proxy_connected_cb(struct connection *conn, bool connected) +{ + struct server_proxy *proxy = container_of(conn, struct server_proxy, server_conn); + + assert_return(conn); + assert_task_alive(DBG_PROXY, &proxy->clienttask); + assert_task_alive(DBG_PROXY, &proxy->servertask); + + proxy->connecting = false; + if (!connected) { + error("%s: proxy connection to remote server failed", + proxy->server->name); + if (!proxy->ptask.active) + proxy_delete(proxy); + return; + } + + ptimer_del_task(&proxy->ptask); + + proxy->sfd = proxy->servertask.fd; + verbose("%s: proxy connection %s -> %s opened", + proxy->server->name, + proxy->client_conn.remote.addrstr, + proxy->server_conn.remote.addrstr); + proxy->begin = time(NULL); + + if (cfg->splice_supported) { + debug(DBG_PROXY, "handling proxy connection with splice"); + uring_splice(&proxy->clienttask, proxy->cfd, proxy->cpipe[PIPE_WR], proxy_client_spliced_in); + uring_splice(&proxy->servertask, proxy->sfd, proxy->spipe[PIPE_WR], proxy_server_spliced_in); + } else { + debug(DBG_PROXY, "handling proxy connection with read-write"); + uring_tbuf_read(&proxy->clienttask, proxy_client_read); + uring_tbuf_read(&proxy->servertask, proxy_server_read); + } +} + +void +proxy_refdump(struct server_proxy *proxy) +{ + assert_return(proxy); + + uring_task_refdump(&proxy->task); + uring_task_refdump(&proxy->clienttask); + uring_task_refdump(&proxy->servertask); +} + +static void +proxy_connect_timer_cb(struct ptimer_task *ptask) +{ + struct server_proxy *proxy = container_of(ptask, struct server_proxy, ptask); + + assert_return(ptask); + + if (proxy->connecting) + return; + + proxy->connecting = true; + connect_any(&proxy->servertask, &proxy->server->remotes, + &proxy->server_conn, proxy_connected_cb); +} + +struct server_proxy * +proxy_new(struct server *server, struct saddr *client, int fd) +{ + struct server_proxy *proxy; + + assert_return(server && client && fd > 0, NULL); + + proxy = zmalloc(sizeof(*proxy)); + if (!proxy) { + error("malloc: %m"); + goto out; + } + + if (cfg->splice_supported) { + if (pipe2(proxy->cpipe, O_CLOEXEC) < 0) { + error("pipe2: %m"); + goto out_free; + } + + if (pipe2(proxy->spipe, O_CLOEXEC) < 0) { + error("pipe2: %m"); + goto out_close_cpipe; + } + } + + proxy->sfd = -1; + proxy->cfd = fd; + proxy->server = server; + uring_task_init(&proxy->task, "proxy", &server->task, proxy_free); + + connection_set_local(&proxy->client_conn, fd); + connection_set_remote(&proxy->client_conn, client); + + uring_task_init(&proxy->clienttask, "proxy_client", &proxy->task, + proxy_client_free); + uring_task_set_buf(&proxy->clienttask, &proxy->clientbuf); + uring_task_set_fd(&proxy->clienttask, fd); + + uring_task_init(&proxy->servertask, "proxy_server", &proxy->task, + proxy_server_free); + uring_task_set_buf(&proxy->servertask, &proxy->serverbuf); + + list_add(&proxy->list, &server->proxys); + + if (server->state != SERVER_STATE_RUNNING) { + if (server_start(server) && + cfg->proxy_connection_interval > 0 && + cfg->proxy_connection_attempts > 0) { + ptask_init(&proxy->ptask, + cfg->proxy_connection_interval, + cfg->proxy_connection_attempts, + proxy_connect_timer_cb); + ptimer_add_task(&proxy->ptask); + } + } + + proxy->connecting = true; + connect_any(&proxy->servertask, &server->remotes, + &proxy->server_conn, proxy_connected_cb); + + return proxy; + +out_close_cpipe: + uring_close(&server->task, proxy->cpipe[PIPE_RD]); + uring_close(&server->task, proxy->cpipe[PIPE_WR]); +out_free: + xfree(proxy); +out: + return NULL; +} + +static void +local_accept(struct uring_task *task, int res) +{ + struct server_local *local = container_of(task, struct server_local, task); + struct server *server = container_of(task->parent, struct server, task); + struct server_proxy *proxy; + + assert_return(task); + assert_task_alive(DBG_PROXY, task); + + debug(DBG_PROXY, "task %p, res %i, server %s", task, res, server->name); + + if (res < 0) { + error("res: %i", res); + goto out; + } + + saddr_set_addrstr(&local->client); + + verbose("%s: incoming proxy connection: %s -> %s", + server->name, local->client.addrstr, local->local.addrstr); + + if (list_empty(&server->remotes)) { + /* This shouldn't be possible, checked before opening local */ + error("server->remotes empty!"); + uring_close(&local->task, res); + goto out; + } + + proxy = proxy_new(server, &local->client, res); + if (!proxy) + uring_close(&local->task, res); + +out: + uring_accept(&local->task, &local->client, local_accept); +} + +bool +local_open(struct server_local *local) +{ + int sfd; + int option; + int r; + + assert_return(local && local->server, false); + + sfd = socket(local->local.storage.ss_family, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (sfd < 0) { + error("socket: %m"); + goto error; + } + + option = true; + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0) { + error("setsockopt: %m"); + goto error; + } + + /* The MC protocol expects the client to send data first */ + if (cfg->socket_defer) { + option = true; + if (setsockopt(sfd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &option, sizeof(option)) < 0) + error("setsockopt: %m"); + } + + /* + * This has the advantage that interfaces don't need to be up but + * it means that cfg errors will not be caught. + */ + if (cfg->socket_freebind) { + option = true; + if (setsockopt(sfd, IPPROTO_IP, IP_FREEBIND, &option, sizeof(option)) < 0) + error("setsockopt: %m"); + } + + socket_set_low_latency(sfd, cfg->socket_keepalive, + cfg->socket_iptos, cfg->socket_nodelay); + + r = bind(sfd, (struct sockaddr *)&local->local.storage, local->local.addrlen); + if (r < 0) { + error("bind: %m"); + goto error; + } + + r = listen(sfd, 100); + if (r < 0) { + error("listen: %m"); + goto error; + } + + uring_task_set_fd(&local->task, sfd); + uring_accept(&local->task, &local->client, local_accept); + return true; + +error: + if (sfd >= 0) + uring_close(&local->task, sfd); + return false; +} + +void +local_refdump(struct server_local *local) +{ + assert_return(local); + + uring_task_refdump(&local->task); +} + +static void +local_free(struct uring_task *task) +{ + struct server_local *local = container_of(task, struct server_local, task); + + assert_return(task); + + debug(DBG_PROXY, "task %p, local %p", task, local); + list_del(&local->list); + xfree(local); +} + +void +local_delete(struct server_local *local) +{ + assert_return(local); + + uring_task_destroy(&local->task); +} + +struct server_local * +local_new(struct server *server, struct saddr *saddr) +{ + struct server_local *local; + + assert_return(server && saddr, NULL); + + local = zmalloc(sizeof(*local)); + if (!local) { + error("malloc: %m"); + return NULL; + } + + debug(DBG_PROXY, "%s adding local: %s", server->name, saddr->addrstr); + local->local = *saddr; + local->server = server; + uring_task_init(&local->task, "local", &server->task, local_free); + xfree(saddr); + return local; +} + diff --git a/minecproxy/server-proxy.h b/minecproxy/server-proxy.h new file mode 100644 index 0000000..ee3bda3 --- /dev/null +++ b/minecproxy/server-proxy.h @@ -0,0 +1,51 @@ +#ifndef fooserverproxyhfoo +#define fooserverproxyhfoo + +struct server_proxy { + struct connection client_conn; + struct uring_task_buf clientbuf; + struct uring_task clienttask; + uint64_t client_bytes; + int cpipe[2]; + int cfd; + + struct connection server_conn; + struct uring_task_buf serverbuf; + struct uring_task servertask; + uint64_t server_bytes; + int spipe[2]; + int sfd; + + bool connecting; + time_t begin; + struct ptimer_task ptask; + struct uring_task task; + struct server *server; + struct list_head list; +}; + +void proxy_refdump(struct server_proxy *proxy); + +void proxy_delete(struct server_proxy *proxy); + +struct server_proxy *proxy_new(struct server *server, struct saddr *client, + int fd); + +struct server_local { + struct saddr local; + struct saddr client; + struct uring_task task; + + struct server *server; + struct list_head list; +}; + +bool local_open(struct server_local *local); + +void local_refdump(struct server_local *local); + +void local_delete(struct server_local *local); + +struct server_local *local_new(struct server *server, struct saddr *saddr); + +#endif diff --git a/minecproxy/server-rcon.c b/minecproxy/server-rcon.c new file mode 100644 index 0000000..1f8ef70 --- /dev/null +++ b/minecproxy/server-rcon.c @@ -0,0 +1,227 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/ip.h> +#include <arpa/inet.h> +#include <stdint.h> +#include <inttypes.h> +#include <errno.h> + +#include "main.h" +#include "uring.h" +#include "server.h" +#include "server-rcon.h" +#include "rcon-protocol.h" + +static int +rcon_packet_complete(struct uring_task *task, int res) +{ + assert_return(task, -EINVAL); + assert_task_alive_or(DBG_RCON, task, return -EINTR); + + return rcon_protocol_packet_complete(task->tbuf->buf, task->tbuf->len); +} + +static bool +rcon_check_reply(struct uring_task *task, int res, int32_t *id, + int32_t *type, const char **msg) +{ + struct server *server = container_of(task, struct server, rcon_task); + const char *error; + + if (res < 0) { + error("rcon(%s): reading reply failed, res: %i", + server->name, res); + goto error; + } + + if (!rcon_protocol_read_packet(task->tbuf->buf, task->tbuf->len, + id, type, msg, &error)) { + error("rcon(%s): failed to parse packet: %s", + server->name, error); + goto error; + } + + return true; + +error: + uring_task_close_fd(task); + return false; +} + +static void +rcon_stop_reply(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, rcon_task); + int32_t id; + int32_t type; + const char *msg; + + assert_return(task); + assert_task_alive(DBG_RCON, task); + + if (!rcon_check_reply(task, res, &id, &type, &msg)) + return; + + if (id != 2) { + error("rcon(%s): stop cmd failed, reply id (%" PRIi32 ")", + server->name, id); + goto out; + } else if (type != RCON_PACKET_RESPONSE) { + error("rcon(%s): stop cmd failed, reply type (%" PRIi32 ")", + server->name, type); + goto out; + } + + verbose("rcon(%s): stop command sent, reply: %s", server->name, msg); + +out: + uring_task_close_fd(task); +} + +static void +rcon_stop_sent(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, rcon_task); + + assert_return(task); + assert_task_alive(DBG_RCON, task); + + if (res != task->tbuf->len) { + error("rcon(%s): sending stop cmd failed, res: %i", + server->name, res); + uring_task_close_fd(task); + return; + } + + debug(DBG_RCON, "rcon(%s): stop cmd sent", server->name); + uring_tbuf_read_until(task, rcon_packet_complete, rcon_stop_reply); +} + +static void +rcon_login_reply(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, rcon_task); + int32_t id; + int32_t type; + const char *msg; + + assert_return(task); + assert_task_alive(DBG_RCON, task); + + if (!rcon_check_reply(task, res, &id, &type, &msg)) + return; + + if (id != 1) { + error("rcon(%s): login failed, reply id (%" PRIi32 ")", + server->name, id); + goto error; + } else if (type == RCON_PACKET_LOGIN_FAIL) { + error("rcon(%s): login failed, incorrect password", + server->name); + goto error; + } else if (type != RCON_PACKET_LOGIN_OK) { + error("rcon(%s): login failed, reply type (%" PRIi32 ")", + server->name, type); + goto error; + } + + debug(DBG_RCON, "rcon(%s): login successful", server->name); + rcon_protocol_create_packet(task->tbuf->buf, sizeof(task->tbuf->buf), + &task->tbuf->len, 2, RCON_PACKET_COMMAND, + "stop"); + uring_tbuf_write(task, rcon_stop_sent); + return; + +error: + uring_task_close_fd(task); +} + +static void +rcon_login_sent(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, rcon_task); + + assert_return(task); + assert_task_alive(DBG_RCON, task); + + if (res != task->tbuf->len) { + error("rcon(%s): sending login failed, res: %i", + server->name, res); + uring_task_close_fd(task); + return; + } + + debug(DBG_RCON, "rcon(%s): login sent", server->name); + uring_tbuf_read_until(task, rcon_packet_complete, rcon_login_reply); +} + +static void +rcon_connected_cb(struct connection *conn, bool connected) +{ + struct server *server = container_of(conn, struct server, rcon_conn); + + assert_return(conn); + assert_task_alive(DBG_RCON, &server->rcon_task); + + if (!connected) { + error("rcon (%s): connection failed", server->name); + return; + } + + rcon_protocol_create_packet(server->rcon_tbuf.buf, + sizeof(server->rcon_tbuf.buf), + &server->rcon_tbuf.len, 1, + RCON_PACKET_LOGIN, + server->rcon_password); + uring_tbuf_write(&server->rcon_task, rcon_login_sent); +} + +static void +rcon_free(struct uring_task *task) +{ + struct server *server = container_of(task, struct server, rcon_task); + + assert_return(task); + + debug(DBG_RCON, "task %p, server %s (%p)", task, server->name, server); +} + +void +rcon_stop(struct server *server) +{ + assert_return(server && !list_empty(&server->rcons) && !empty_str(server->rcon_password)); + assert_task_alive(DBG_RCON, &server->rcon_task); + + connect_any(&server->rcon_task, &server->rcons, &server->rcon_conn, rcon_connected_cb); +} + +void +rcon_refdump(struct server *server) +{ + assert_return(server); + + uring_task_refdump(&server->rcon_task); +} + +void +rcon_delete(struct server *server) +{ + assert_return(server); + + debug(DBG_RCON, "closing fd %i", server->rcon_task.fd); + uring_task_destroy(&server->rcon_task); +} + +void +rcon_init(struct server *server) +{ + assert_return(server); + + uring_task_init(&server->rcon_task, "rcon", &server->task, rcon_free); + uring_task_set_buf(&server->rcon_task, &server->rcon_tbuf); +} + diff --git a/minecproxy/server-rcon.h b/minecproxy/server-rcon.h new file mode 100644 index 0000000..6625f25 --- /dev/null +++ b/minecproxy/server-rcon.h @@ -0,0 +1,12 @@ +#ifndef fooserverrconhfoo +#define fooserverrconhfoo + +void rcon_stop(struct server *server); + +void rcon_refdump(struct server *server); + +void rcon_delete(struct server *server); + +void rcon_init(struct server *server); + +#endif diff --git a/minecproxy/server.c b/minecproxy/server.c new file mode 100644 index 0000000..534ceca --- /dev/null +++ b/minecproxy/server.c @@ -0,0 +1,836 @@ +#define _GNU_SOURCE +#include <stdlib.h> +#include <string.h> +#include <arpa/inet.h> +#include <unistd.h> +#include <inttypes.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <sched.h> +#include <poll.h> +#include <errno.h> +#include <sched.h> + +#include "main.h" +#include "uring.h" +#include "ptimer.h" +#include "server.h" +#include "server-proxy.h" +#include "server-rcon.h" +#include "config-parser.h" +#include "idle.h" +#include "systemd.h" + +static bool +set_property(struct server *server, char **property, const char *value) +{ + assert_return(server && !*property && !empty_str(value), false); + + *property = xstrdup(value); + if (!*property) { + error("strdup: %m"); + return false; + } + + return true; +} + +void +server_refdump(struct server *server) +{ + struct server_local *local; + struct server_proxy *proxy; + + assert_return(server); + + uring_task_refdump(&server->task); + uring_task_refdump(&server->exec_task); + uring_task_refdump(&server->ann_task); + uring_task_refdump(&server->idle_task); + list_for_each_entry(local, &server->locals, list) + local_refdump(local); + list_for_each_entry(proxy, &server->proxys, list) + proxy_refdump(proxy); + rcon_refdump(server); +} + +static void +server_free(struct uring_task *task) +{ + struct server *server = container_of(task, struct server, task); + + assert_return(task); + + debug(DBG_SRV, "freeing server %s (%p)", server->name, server); + list_del(&server->list); + xfree(server->pretty_name); + xfree(server->start_exec); + xfree(server->stop_exec); + xfree(server->systemd_service); + xfree(server->systemd_obj); + xfree(server->rcon_password); + xfree(server->name); + xfree(server); +} + +void +server_delete(struct server *server) +{ + struct server_local *local, *ltmp; + struct server_proxy *proxy, *ptmp; + struct saddr *remote; + struct saddr *rcon; + struct saddr *tmp; + struct dns_async *dns, *dtmp; + + assert_return(server); + + verbose("Removing server %s", server->name); + server->state = SERVER_STATE_DEAD; + + rcon_delete(server); + + list_for_each_entry_safe(local, ltmp, &server->locals, list) + local_delete(local); + + list_for_each_entry_safe(proxy, ptmp, &server->proxys, list) + proxy_delete(proxy); + + list_for_each_entry_safe(rcon, tmp, &server->rcons, list) { + list_del(&rcon->list); + xfree(rcon); + } + + list_for_each_entry_safe(remote, tmp, &server->remotes, list) { + list_del(&remote->list); + xfree(remote); + } + + list_for_each_entry_safe(dns, dtmp, &server->dnslookups, list) + gai_cancel(&dns->gcb); + + uring_task_destroy(&server->idle_task); + uring_poll_cancel(&server->exec_task); + uring_task_put(&server->exec_task); + uring_task_destroy(&server->task); + uring_task_put(&server->ann_task); +} + +void +server_delete_by_name(const char *name) +{ + struct server *server; + + assert_return(!empty_str(name)); + + list_for_each_entry(server, &cfg->servers, list) { + if (streq(server->name, name)) { + server_delete(server); + return; + } + } +} + +static void +server_dump(struct server *server) +{ + struct server_local *local; + struct saddr *remote; + struct saddr *rcon; + + assert_return(server); + + verbose("Server %s:", server->name); + switch (server->type) { + case SERVER_TYPE_ANNOUNCE: + verbose(" * Type: announce"); + break; + case SERVER_TYPE_PROXY: + verbose(" * Type: proxy"); + break; + default: + verbose(" * Type: unknown"); + break; + } + verbose(" * Name: %s", server->pretty_name ? server->pretty_name : "<undefined>"); + verbose(" * Announce port: %" PRIu16, server->announce_port); + + if (!list_empty(&server->locals)) { + verbose(" * Local:"); + list_for_each_entry(local, &server->locals, list) + verbose(" * %s", local->local.addrstr); + } + + if (!list_empty(&server->remotes)) { + verbose(" * Remote:"); + list_for_each_entry(remote, &server->remotes, list) + verbose(" * %s", remote->addrstr); + } + + if (!list_empty(&server->rcons)) { + verbose(" * RCon:"); + list_for_each_entry(rcon, &server->rcons, list) + verbose(" * %s", rcon->addrstr); + } + + verbose(""); +} + +static void +server_exec_free(struct uring_task *task) +{ + assert_return(task); + + debug(DBG_SRV, "called"); +} + +#ifndef P_PIDFD +#define P_PIDFD 3 +#endif + +/* FIXME: update states */ +static void +server_exec_done(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, exec_task); + int r; + siginfo_t info; + + assert_return(task); + assert_task_alive_or(DBG_SRV, task, goto out); + /* Should we leave child processes running? */ + + if (!(res & POLLIN)) { + error("unexpected result: %i", res); + goto out; + } + + r = waitid(P_PIDFD, server->exec_task.fd, &info, WEXITED); + if (r < 0) { + error("waitid: %m"); + goto out; + } + + if (info.si_status == 0) + debug(DBG_SRV, "command successfully executed"); + else + error("command failed: %i", info.si_status); + +out: + uring_task_close_fd(&server->exec_task); +} + +static int +server_exec_child(void *ptr) +{ + const char *cmd = ptr; + + assert_return(ptr, EINVAL); + + execl(cmd, cmd, NULL); + return errno; +} + +#ifndef CLONE_PIDFD +#define CLONE_PIDFD 0x00001000 +#endif + +static bool +server_exec(struct server *server, const char *cmd) +{ + char stack[4096]; /* Beautiful/horrible hack :) */ + int pidfd; + int r; + + assert_return(server && cmd && server->exec_task.fd < 1, false); + + r = clone(server_exec_child, stack + sizeof(stack), + CLONE_VM | CLONE_VFORK | CLONE_PIDFD | SIGCHLD, + (void *)cmd, &pidfd); + if (r < 0) { + error("clone: %m: %i", r); + return false; + } + + uring_task_set_fd(&server->exec_task, pidfd); + uring_poll(&server->exec_task, POLLIN, server_exec_done); + return true; +} + +static bool +server_check_running(struct server *server) +{ + assert_return(server, false); + + /* FIXME: other methods, rcon? */ + if (server->systemd_service) { + verbose("%s: checking if systemd service is running", server->name); + if (systemd_service_running(server)) { + server->state = SERVER_STATE_RUNNING; + return true; + } else { + server->state = SERVER_STATE_STOPPED; + return false; + } + } + + return false; +} + +bool +server_start(struct server *server) +{ + assert_return(server, false); + assert_task_alive_or(DBG_SRV, &server->task, return false); + + switch (server->start_method) { + + case SERVER_START_METHOD_EXEC: + verbose("Starting server %s via external cmd", server->name); + return server_exec(server, server->start_exec); + + case SERVER_START_METHOD_SYSTEMD: + verbose("Starting server %s via systemd (%s)", + server->name, server->systemd_service); + + if (systemd_service_start(server)) { + server->state = SERVER_STATE_RUNNING; + return true; + } else + return server_check_running(server); + + case SERVER_START_METHOD_UNDEFINED: + default: + break; + } + + return false; +} + +bool +server_stop(struct server *server) +{ + assert_return(server, false); + assert_task_alive_or(DBG_SRV, &server->task, return false); + + switch (server->stop_method) { + + case SERVER_STOP_METHOD_EXEC: + verbose("Stopping server %s via external cmd", server->name); + return server_exec(server, server->stop_exec); + + case SERVER_STOP_METHOD_SYSTEMD: + verbose("Stopping server %s via systemd (%s)", + server->name, server->systemd_service); + if (systemd_service_stop(server)) { + server->state = SERVER_STATE_STOPPED; + return true; + } else + return server_check_running(server); + + case SERVER_STOP_METHOD_RCON: + verbose("Stopping server %s via rcon", server->name); + rcon_stop(server); + return true; + + case SERVER_STOP_METHOD_UNDEFINED: + default: + break; + } + + return false; +} + +static void +server_idle_free(struct uring_task *task) +{ + assert_return(task); + + debug(DBG_ANN, "called"); +} + +void +server_set_active_players(struct server *server, int count) +{ + assert_return(server); + assert_task_alive(DBG_IDLE, &server->idle_task); + + debug(DBG_IDLE, "%s: currently %i active players", + server->name, count); + + if (count < 0) + return; + + server->state = SERVER_STATE_RUNNING; + if (count > 0) + server->idle_count = 0; + else if (count == 0) + server->idle_count++; + + if (server->idle_count > server->idle_timeout) { + verbose("stopping idle server %s", server->name); + server_stop(server); + } +} + +static void +server_idle_connected_cb(struct connection *conn, bool connected) +{ + struct server *server = container_of(conn, struct server, idle_conn); + + assert_return(conn); + assert_task_alive(DBG_IDLE, &server->idle_task); + + if (!connected) { + debug(DBG_IDLE, + "idle check connection to remote server (%s) failed", + server->name); + server->idle_count = 0; + server->state = SERVER_STATE_STOPPED; + return; + } + + debug(DBG_IDLE, "connected to remote %s\n", conn->remote.addrstr); + idle_check_get_player_count(server, conn); +} + +bool +server_idle_check(struct server *server) +{ + assert_return(server, false); + + if (server->state == SERVER_STATE_INIT || + server->state == SERVER_STATE_DEAD) + return false; + + if (server->idle_timeout < 1) + return false; + + if (list_empty(&server->remotes)) + return false; + + if (!list_empty(&server->proxys)) { + server->idle_count = 0; + return true; + } + + connect_any(&server->idle_task, &server->remotes, + &server->idle_conn, server_idle_connected_cb); + return true; +} + +static void +server_announce_free(struct uring_task *task) +{ + assert_return(task); + + debug(DBG_ANN, "called"); +} + +static void +server_announce_cb(struct uring_task *task, int res) +{ + struct server *server = container_of(task, struct server, ann_task); + + assert_return(task); + + if (res < 0) + error("%s: failure %i", server->name, res); + else if (res == server->ann_buf.len) + debug(DBG_ANN, "%s: ok (%i)", server->name, res); + else + debug(DBG_ANN, "%s: unexpected result: %i", server->name, res); + + uring_task_set_fd(&server->ann_task, -1); +} + +bool +server_announce(struct server *server, int fd) +{ + assert_return(server && fd >= 0, false); + + if (server->state == SERVER_STATE_INIT || + server->state == SERVER_STATE_DEAD) + return false; + + debug(DBG_ANN, "announcing server: %s", server->name); + uring_task_set_fd(&server->ann_task, fd); + uring_tbuf_sendmsg(&server->ann_task, server_announce_cb); + return true; +} + +bool +server_commit(struct server *server) +{ + struct server_local *local; + uint16_t port; + int r; + + assert_return(server && server->name, false); + assert_task_alive_or(DBG_SRV, &server->task, return false); + + if (server->state != SERVER_STATE_INIT) { + error("called in wrong state"); + return false; + } + + if (!list_empty(&server->proxys)) { + error("%s: proxys not empty?", server->name); + return false; + } + + if (!list_empty(&server->dnslookups)) { + debug(DBG_SRV, "called with pending DNS requests"); + return true; + } + + if (server->stop_method == SERVER_STOP_METHOD_RCON && + list_empty(&server->rcons)) { + error("%s: rcon stop method missing rcon address", + server->name); + return false; + } + + if (server->stop_method == SERVER_STOP_METHOD_RCON && + !server->rcon_password) { + error("%s: rcon stop method missing rcon password", + server->name); + return false; + } + + if ((server->start_method == SERVER_START_METHOD_SYSTEMD || + server->stop_method == SERVER_STOP_METHOD_SYSTEMD) && + !server->systemd_service) { + error("%s: systemd start/stop method missing systemd service", + server->name); + return false; + } + + if (server->systemd_service && !server->systemd_obj) { + server->systemd_obj = systemd_object_path(server->systemd_service); + if (!server->systemd_obj) { + error("%s: failed to create systemd object path (%s)", + server->name, server->systemd_service); + return false; + } + } + + if (server->idle_timeout > 0 && + server->stop_method == SERVER_STOP_METHOD_UNDEFINED) { + error("%s: idle_timeout set but missing stop method", server->name); + return false; + } + + switch (server->type) { + case SERVER_TYPE_ANNOUNCE: + if (server->announce_port < 1) { + error("%s: missing announce port", server->name); + return false; + } + + if (server->start_method != SERVER_START_METHOD_UNDEFINED) { + error("%s: can't set start_method for announce server", server->name); + return false; + } + + if (!list_empty(&server->locals)) { + error("%s: can't set local addresses for announce server", server->name); + return false; + } + + if (!list_empty(&server->remotes)) { + error("%s: can't set remote addresses for announce server", server->name); + return false; + } + + break; + + case SERVER_TYPE_PROXY: + if (server->announce_port >= 1) { + error("%s: can't set announce port for proxy server", server->name); + return false; + } + + if (list_empty(&server->locals)) { + error("%s: missing local addresses for proxy server", server->name); + return false; + } + + if (list_empty(&server->remotes)) { + error("%s: missing remote addresses for proxy server", server->name); + return false; + } + + list_for_each_entry(local, &server->locals, list) { + port = saddr_port(&local->local); + + if (port == 0) { + error("%s: invalid local port", server->name); + return false; + } + + if (server->announce_port < 1) + server->announce_port = port; + + if (server->announce_port != port) { + error("%s: multiple local ports", server->name); + return false; + } + } + + if (server->announce_port < 1) { + error("%s: can't determine which port to announce", server->name); + return false; + } + + break; + + default: + error("%s: can't determine server type", server->name); + return false; + } + + if (!server->pretty_name) { + char *suffix; + + suffix = strrchr(server->name, '.'); + if (!suffix || suffix == server->name) { + error("invalid server name: %s", server->name); + return false; + } + + server->pretty_name = xstrndup(server->name, suffix - server->name); + if (!server->pretty_name) { + error("failed to create display name: %s", server->name); + return false; + } + } + + r = snprintf(server->ann_buf.buf, sizeof(server->ann_buf.buf), + "[MOTD]%s[/MOTD][AD]%" PRIu16 "[/AD]", + server->pretty_name, server->announce_port); + if (r < 1 || r >= sizeof(server->ann_buf.buf)) { + error("%s: unable to create announce msg: %i\n", server->name, r); + return false; + } + server->ann_buf.len = r; + + /* FIXME: config, dont reread config if server running, make sure fd is available before this is called */ + server_dump(server); + + list_for_each_entry(local, &server->locals, list) + local_open(local); + + server->state = SERVER_STATE_CFG_OK; + + server_check_running(server); + + debug(DBG_SRV, "success"); + return true; +} + +bool +server_add_remote(struct server *server, struct saddr *remote) +{ + assert_return(server && remote, false); + assert_task_alive_or(DBG_SRV, &server->task, return false); + + debug(DBG_SRV, "adding remote: %s", remote->addrstr); + list_add(&remote->list, &server->remotes); + return true; +} + +bool +server_add_local(struct server *server, struct saddr *saddr) +{ + struct server_local *local; + + assert_return(server && saddr, false); + assert_task_alive_or(DBG_SRV, &server->task, return false); + + local = local_new(server, saddr); + if (!local) + return false; + + list_add(&local->list, &server->locals); + return true; +} + +bool +server_add_rcon(struct server *server, struct saddr *rcon) +{ + assert_return(server && rcon, false); + assert_task_alive_or(DBG_SRV, &server->task, return false); + + debug(DBG_SRV, "adding rcon: %s", rcon->addrstr); + list_add(&rcon->list, &server->rcons); + return true; +} + +bool +server_set_rcon_password(struct server *server, const char *password) +{ + assert_return(server && !empty_str(password), false); + + return set_property(server, &server->rcon_password, password); +} + +bool +server_set_systemd_service(struct server *server, const char *service) +{ + const char *suffix; + char *tmp; + + assert_return(server && !empty_str(service) && !server->systemd_service, false); + + suffix = strrchr(service, '.'); + if (!suffix || !streq(suffix, ".service")) { + tmp = zmalloc(strlen(service) + strlen(".service") + 1); + if (tmp) + sprintf(tmp, "%s.service", service); + } else + tmp = xstrdup(service); + + if (!tmp) { + error("malloc/strdup: %m"); + return false; + } + + server->systemd_service = tmp; + return true; +} + +bool +server_set_stop_method(struct server *server, + enum server_stop_method stop_method) +{ + assert_return(server->stop_method == SERVER_STOP_METHOD_UNDEFINED && + stop_method != SERVER_STOP_METHOD_UNDEFINED, false); + + server->stop_method = stop_method; + return true; +} + +bool +server_set_start_method(struct server *server, + enum server_start_method start_method) +{ + assert_return(server->start_method == SERVER_START_METHOD_UNDEFINED && + start_method != SERVER_START_METHOD_UNDEFINED, false); + + server->start_method = start_method; + return true; +} + +bool +server_set_stop_exec(struct server *server, const char *cmd) +{ + assert_return(server && !empty_str(cmd), false); + + return set_property(server, &server->stop_exec, cmd); +} + +bool +server_set_start_exec(struct server *server, const char *cmd) +{ + assert_return(server && !empty_str(cmd), false); + + return set_property(server, &server->start_exec, cmd); +} + +bool +server_set_idle_timeout(struct server *server, uint16_t timeout) +{ + assert_return(server && timeout > 0 && server->idle_timeout == 0, false); + + server->idle_timeout = timeout; + return true; +} + +bool +server_set_port(struct server *server, uint16_t port) +{ + assert_return(server && port > 0 && server->announce_port == 0, false); + + server->announce_port = htons(port); + return true; +} + +bool +server_set_type(struct server *server, enum server_type type) +{ + assert_return(server && type != SERVER_TYPE_UNDEFINED, false); + + switch (type) { + case SERVER_TYPE_ANNOUNCE: + server->type = SERVER_TYPE_ANNOUNCE; + break; + case SERVER_TYPE_PROXY: + server->type = SERVER_TYPE_PROXY; + break; + default: + return false; + } + + return true; +} + +bool +server_set_pretty_name(struct server *server, const char *pretty_name) +{ + assert_return(server && !empty_str(pretty_name), false); + + return set_property(server, &server->pretty_name, pretty_name); +} + +struct server * +server_new(const char *name) +{ + struct server *server; + + assert_return(!empty_str(name), NULL); + + list_for_each_entry(server, &cfg->servers, list) { + if (!streq(name, server->name)) + continue; + error("attempt to add duplicate server: %s", name); + return server; + } + + verbose("Adding server %s", name); + server = zmalloc(sizeof(*server)); + if (!server) { + error("malloc: %m"); + return NULL; + } + + server->state = SERVER_STATE_INIT; + server->type = SERVER_TYPE_UNDEFINED; + server->name = xstrdup(name); + server->stop_method = SERVER_STOP_METHOD_UNDEFINED; + server->start_method = SERVER_START_METHOD_UNDEFINED; + server->idle_timeout = 0; + + uring_task_init(&server->task, server->name, uring_parent(), server_free); + uring_task_set_buf(&server->task, &server->tbuf); + + uring_task_init(&server->ann_task, "announce", &server->task, server_announce_free); + uring_task_set_buf(&server->ann_task, &server->ann_buf); + saddr_set_ipv4(&server->ann_task.saddr, cinet_addr(224,0,2,60), htons(4445)); + + uring_task_init(&server->exec_task, "exec", &server->task, server_exec_free); + + uring_task_init(&server->idle_task, "idle", &server->task, server_idle_free); + uring_task_set_buf(&server->idle_task, &server->idle_buf); + + rcon_init(server); + + list_init(&server->remotes); + list_init(&server->locals); + list_init(&server->proxys); + list_init(&server->rcons); + list_init(&server->dnslookups); + list_add(&server->list, &cfg->servers); + + return server; +} diff --git a/minecproxy/server.h b/minecproxy/server.h new file mode 100644 index 0000000..ff4c28e --- /dev/null +++ b/minecproxy/server.h @@ -0,0 +1,128 @@ +#ifndef fooserverhfoo +#define fooserverhfoo + +enum server_state { + SERVER_STATE_INIT = 0, + SERVER_STATE_CFG_OK = 1, + SERVER_STATE_RUNNING = 2, + SERVER_STATE_STOPPED = 3, + SERVER_STATE_DEAD = 4, +}; + +enum server_type { + SERVER_TYPE_UNDEFINED, + SERVER_TYPE_ANNOUNCE, + SERVER_TYPE_PROXY, +}; + +enum server_stop_method { + SERVER_STOP_METHOD_UNDEFINED, + SERVER_STOP_METHOD_RCON, + SERVER_STOP_METHOD_SYSTEMD, + SERVER_STOP_METHOD_EXEC, +}; + +enum server_start_method { + SERVER_START_METHOD_UNDEFINED, + SERVER_START_METHOD_SYSTEMD, + SERVER_START_METHOD_EXEC, +}; + +struct server { + enum server_type type; + char *name; + char *pretty_name; + uint16_t announce_port; + struct list_head locals; + struct list_head remotes; + struct list_head proxys; + struct list_head rcons; + struct list_head dnslookups; + enum server_state state; + + enum server_stop_method stop_method; + enum server_start_method start_method; + + /* For calling external start/stop executables */ + char *stop_exec; + char *start_exec; + struct uring_task exec_task; + + /* For systemd services */ + char *systemd_service; + char *systemd_obj; + + /* For rcon connections */ + char *rcon_password; + struct connection rcon_conn; + struct uring_task rcon_task; + struct uring_task_buf rcon_tbuf; + + /* For announce messages */ + struct uring_task ann_task; + struct uring_task_buf ann_buf; + + /* For checking idle status */ + struct uring_task idle_task; + struct connection idle_conn; + struct uring_task_buf idle_buf; + unsigned idle_timeout; + unsigned idle_count; + + /* For reading config files */ + struct uring_task task; + struct uring_task_buf tbuf; + + struct list_head list; +}; + +void server_refdump(struct server *server); + +void server_delete(struct server *server); + +void server_delete_by_name(const char *name); + +bool server_start(struct server *server); + +bool server_stop(struct server *server); + +void server_set_active_players(struct server *server, int count); + +bool server_idle_check(struct server *server); + +bool server_announce(struct server *server, int fd); + +bool server_commit(struct server *server); + +bool server_add_remote(struct server *server, struct saddr *remote); + +bool server_add_local(struct server *server, struct saddr *saddr); + +bool server_add_rcon(struct server *server, struct saddr *rcon); + +bool server_set_rcon_password(struct server *server, const char *password); + +bool server_set_systemd_service(struct server *server, const char *service); + +bool server_set_stop_method(struct server *server, + enum server_stop_method stop_method); + +bool server_set_start_method(struct server *server, + enum server_start_method start_method); + +bool server_set_stop_exec(struct server *server, const char *cmd); + +bool server_set_start_exec(struct server *server, const char *cmd); + +bool server_set_idle_timeout(struct server *server, uint16_t timeout); + +bool server_set_port(struct server *server, uint16_t port); + +bool server_set_type(struct server *server, enum server_type type); + +bool server_set_pretty_name(struct server *server, const char *pretty_name); + +struct server *server_new(const char *name); + +#endif + diff --git a/minecproxy/signal-handler.c b/minecproxy/signal-handler.c new file mode 100644 index 0000000..67c2e0b --- /dev/null +++ b/minecproxy/signal-handler.c @@ -0,0 +1,195 @@ +#define _GNU_SOURCE +#include <sys/signalfd.h> +#include <signal.h> +#include <fcntl.h> +#include <unistd.h> +#include <systemd/sd-daemon.h> + +#include "main.h" +#include "signal-handler.h" +#include "uring.h" +#include "server.h" +#include "server-config.h" +#include "config-parser.h" +#include "igmp.h" +#include "announce.h" +#include "idle.h" +#include "ptimer.h" + +struct signal_ev { + struct uring_task task; + struct uring_task_buf tbuf; + int pipe[2]; +}; + +static void +signal_delete() +{ + assert_return(cfg->signal); + + uring_task_destroy(&cfg->signal->task); +} + +static void +signalfd_read(struct uring_task *task, int res) +{ + struct signal_ev *signal = container_of(task, struct signal_ev, task); + struct server *server, *stmp; + static unsigned count = 0; + siginfo_t *si; + + assert_return(task); + assert_task_alive(DBG_SIG, task); + + si = (siginfo_t *)task->tbuf->buf; + if (res != sizeof(*si)) + die("error in signalfd (%i)", res); + + switch (si->si_signo) { + case SIGUSR1: { + struct dns_async *dns; + + debug(DBG_SIG, "Got a SIGUSR1"); + if (si->si_code != SI_ASYNCNL || !si->si_ptr) { + error("SIGUSR1: unexpected values in siginfo"); + goto out; + } + + dns = si->si_ptr; + if (!dns->cb) { + error("DNS callback not set"); + goto out; + } + + debug(DBG_DNS, "DNS lookup complete, dns: %p, dns->cb: %p", + dns, dns->cb); + dns->cb(dns); + break; + } + + case SIGUSR2: + debug(DBG_SIG, "got a SIGUSR2"); + dump_tree(); + break; + + case SIGTERM: + debug(DBG_SIG, "Got a SIGINT/SIGHUP"); + verbose("got a signal to quit"); + sd_notifyf(0, "STOPPING=1\nSTATUS=Received signal, exiting"); + exit(EXIT_SUCCESS); + break; + + case SIGINT: + case SIGHUP: + count++; + if (count > 5) { + dump_tree(); + exit(EXIT_FAILURE); + } + + verbose("got a signal to dump tree"); + sd_notifyf(0, "STOPPING=1\nSTATUS=Received signal, exiting"); + dump_tree(); + signal_delete(); + ptimer_delete(); + igmp_delete(); + announce_delete(); + idle_delete(); + server_cfg_monitor_delete(); + list_for_each_entry_safe(server, stmp, &cfg->servers, list) + server_delete(server); + uring_delete(); + return; + + default: + error("got an unknown signal: %i", si->si_signo); + break; + } + +out: + uring_tbuf_read(&signal->task, signalfd_read); +} + +static void +hack_signal_handler(int signum, siginfo_t *si, void *ucontext) +{ + ssize_t r; + + assert_return(signum > 0 && si); + + r = write(cfg->signal->pipe[PIPE_WR], si, sizeof(*si)); + if (r != sizeof(*si)) + error("write: %zi\n", r); + +} + +static void +signal_free(struct uring_task *task) +{ + struct signal_ev *signal = container_of(task, struct signal_ev, task); + + assert_return(task); + + debug(DBG_SIG, "called"); + close(signal->pipe[PIPE_WR]); + cfg->signal = NULL; + xfree(signal); +} + +void +signal_refdump() +{ + assert_return(cfg->signal); + + uring_task_refdump(&cfg->signal->task); +} + +void +signal_init() +{ + //sigset_t mask; + struct signal_ev *signal; + + assert_return(!cfg->signal); + + signal = zmalloc(sizeof(*signal)); + if (!signal) + die("malloc: %m"); + + if (pipe2(signal->pipe, O_CLOEXEC) < 0) + die("pipe2: %m"); + + /* + sigfillset(&mask); + if (sigprocmask(SIG_BLOCK, &mask, NULL) < 0) + die("sigprocmask: %m"); + + sfd = signalfd(-1, &mask, SFD_CLOEXEC); + if (sfd < 0) + die("signalfd: %m"); + */ + + struct sigaction action; + sigfillset(&action.sa_mask); + action.sa_sigaction = hack_signal_handler; + action.sa_flags = SA_SIGINFO; + + sigaction(SIGINT, &action, NULL); + sigaction(SIGHUP, &action, NULL); + sigaction(SIGTERM, &action, NULL); + sigaction(SIGUSR1, &action, NULL); + sigaction(SIGUSR2, &action, NULL); + + action.sa_handler = SIG_IGN; + action.sa_flags = 0; + sigaction(SIGPIPE, &action, NULL); + + debug(DBG_SIG, "using pipe fds %i -> %i", + signal->pipe[PIPE_WR], signal->pipe[PIPE_RD]); + uring_task_init(&signal->task, "signal", uring_parent(), signal_free); + uring_task_set_fd(&signal->task, signal->pipe[PIPE_RD]); + uring_task_set_buf(&signal->task, &signal->tbuf); + cfg->signal = signal; + uring_tbuf_read(&signal->task, signalfd_read); +} + diff --git a/minecproxy/signal-handler.h b/minecproxy/signal-handler.h new file mode 100644 index 0000000..e0140b3 --- /dev/null +++ b/minecproxy/signal-handler.h @@ -0,0 +1,8 @@ +#ifndef foosignalhfoo +#define foosignalhfoo + +void signal_refdump(); + +void signal_init(); + +#endif diff --git a/minecproxy/systemd.c b/minecproxy/systemd.c new file mode 100644 index 0000000..a44b0d8 --- /dev/null +++ b/minecproxy/systemd.c @@ -0,0 +1,219 @@ +#include <string.h> +#include <stdlib.h> +#include <systemd/sd-bus.h> + +#include "main.h" +#include "server.h" +#include "systemd.h" + +#define SYSTEMD_DBUS_SERVICE "org.freedesktop.systemd1" +#define SYSTEMD_DBUS_INTERFACE "org.freedesktop.systemd1.Unit" +#define SYSTEMD_DBUS_PATH_PREFIX "/org/freedesktop/systemd1/unit/" + +static inline char +tohex(uint8_t val) +{ + static const char hex[] = "0123456789abcdef"; + + return hex[val & 0x0f]; +} + +/* + * Creates an escaped D-Bus object path for a given systemd service + * + * Escaping rules are documented here: + * https://dbus.freedesktop.org/doc/dbus-specification.html + * + * Essentially, everyting but a-z, A-Z, 0-9 is replaced by _xx where xx is + * the hexadecimal value of the character. + * + * Example: minecraft@world1.service -> minecraft_40world1_2eservice + */ +char * +systemd_object_path(const char *service) +{ + char *r; + char *d; + const char *s; + + assert_return(service && !empty_str(service), NULL); + + r = zmalloc(strlen(SYSTEMD_DBUS_PATH_PREFIX) + strlen(service) * 3 + 1); + if (!r) + return NULL; + + memcpy(r, SYSTEMD_DBUS_PATH_PREFIX, strlen(SYSTEMD_DBUS_PATH_PREFIX)); + d = r + strlen(SYSTEMD_DBUS_PATH_PREFIX); + + for (s = service; *s; s++) { + if ((*s >= 'a' && *s <= 'z') || + (*s >= 'A' && *s <= 'Z') || + (*s >= '0' && *s <= '9')) { + *(d++) = *s; + continue; + } + + *(d++) = '_'; + *(d++) = tohex(*s >> 4); + *(d++) = tohex(*s); + } + + *d = '\0'; + return r; +} + +void +systemd_delete() +{ + assert_return_silent(cfg->sd_bus); + + sd_bus_unref(cfg->sd_bus); + cfg->sd_bus = NULL; +} + +static sd_bus * +get_bus() +{ + int r; + + if (cfg->sd_bus_failed) + return NULL; + + if (!cfg->sd_bus) { + r = sd_bus_open_user(&cfg->sd_bus); + if (r < 0) { + error("failed to connect to user system bus: %s", strerror(-r)); + cfg->sd_bus_failed = true; + return NULL; + } + } + + return cfg->sd_bus; +} + +/* + * Check if a systemd service is running. + * + * This is equivalent to (assuming service minecraft@world1): + * gdbus call --session + * --dest org.freedesktop.systemd1 + * --object-path /org/freedesktop/systemd1/unit/minecraft_40world1_2eservice + * --method org.freedesktop.DBus.Properties.Get + * "org.freedesktop.systemd1.Unit" + * "ActiveState" + */ +bool +systemd_service_running(struct server *server) +{ + sd_bus *bus = get_bus(); + sd_bus_error error = SD_BUS_ERROR_NULL; + char *status = NULL; + bool running = false; + int r; + + assert_return(server && bus && server->systemd_service && server->systemd_obj, false); + + r = sd_bus_get_property_string(bus, + SYSTEMD_DBUS_SERVICE, + server->systemd_obj, + SYSTEMD_DBUS_INTERFACE, + "ActiveState", + &error, + &status); + if (r < 0) { + error("failed to get status for service %s (%s): %s", + server->systemd_service, server->systemd_obj, error.message); + goto out; + } + + if (streq(status, "active")) { + running = true; + debug(DBG_SYSD, "systemd service %s (%s) is active", + server->systemd_service, server->systemd_obj); + } else + debug(DBG_SYSD, "systemd service %s (%s) is not active", + server->systemd_service, server->systemd_obj); + +out: + free(status); + sd_bus_error_free(&error); + return running; +} + +static bool +systemd_service_action(struct server *server, const char *action) +{ + sd_bus_error error = SD_BUS_ERROR_NULL; + sd_bus_message *m = NULL; + sd_bus *bus = get_bus(); + const char *path; + bool performed = false; + int r; + + assert_return(server && bus && server->systemd_service && server->systemd_obj && action, false); + + r = sd_bus_call_method(bus, + SYSTEMD_DBUS_SERVICE, + server->systemd_obj, + SYSTEMD_DBUS_INTERFACE, + action, + &error, + &m, + "s", + "fail"); + if (r < 0) { + error("failed to perform action %s on systemd service %s: %s", + action, server->systemd_service, error.message); + goto out; + } + + r = sd_bus_message_read(m, "o", &path); + if (r < 0) { + error("failed to parse response message: %s", strerror(-r)); + goto out; + } + + verbose("action %s queued for service %s", + action, server->systemd_service); + performed = true; + +out: + sd_bus_error_free(&error); + sd_bus_message_unref(m); + return performed; +} + +/* + * Stop systemd service. + * + * This is equivalent to (assuming service minecraft@world1): + * gdbus call --session + * --dest org.freedesktop.systemd1 + * --object-path /org/freedesktop/systemd1/unit/minecraft_40world1_2eservice + * --method org.freedesktop.systemd1.Unit.Stop "fail" + */ +bool +systemd_service_stop(struct server *server) +{ + assert_return(server, false); + + return systemd_service_action(server, "Stop"); +} + +/* + * Start systemd service. + * + * This is equivalent to (assuming service minecraft@world1): + * gdbus call --session + * --dest org.freedesktop.systemd1 + * --object-path /org/freedesktop/systemd1/unit/minecraft_40world1_2eservice + * --method org.freedesktop.systemd1.Unit.Start "fail" + */ +bool +systemd_service_start(struct server *server) +{ + assert_return(server, false); + + return systemd_service_action(server, "Start"); +} + diff --git a/minecproxy/systemd.h b/minecproxy/systemd.h new file mode 100644 index 0000000..d455044 --- /dev/null +++ b/minecproxy/systemd.h @@ -0,0 +1,14 @@ +#ifndef foosystemdhfoo +#define foosystemdhfoo + +char *systemd_object_path(const char *service); + +void systemd_delete(); + +bool systemd_service_running(struct server *server); + +bool systemd_service_stop(struct server *server); + +bool systemd_service_start(struct server *server); + +#endif diff --git a/minecproxy/uring.c b/minecproxy/uring.c new file mode 100644 index 0000000..e979471 --- /dev/null +++ b/minecproxy/uring.c @@ -0,0 +1,759 @@ +#define _GNU_SOURCE +#include <liburing.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include "main.h" +#include "uring.h" + +struct uring_ev { + struct io_uring uring; + struct io_uring_params uring_params; + struct uring_task task; + + /* for testing if the kernel supports splice */ + int pipe[2]; + int tfd; +}; + +enum cqe_type { + CQE_TYPE_NORMAL = 0x0, + CQE_TYPE_CANCEL = 0x1, + CQE_TYPE_CLOSE = 0x2, + CQE_TYPE_POLL_CANCEL = 0x3 +}; + +#define CQE_TYPE_PTR_MASK 0x3 + +uint64_t sqe_count = 0; +uint64_t cqe_count = 0; + +static struct io_uring_sqe * +get_sqe(struct uring_task *task) +{ + struct io_uring_sqe *sqe; + + assert_die(task, "invalid arguments"); + + sqe = io_uring_get_sqe(&cfg->uring->uring); + if (!sqe) { + io_uring_submit(&cfg->uring->uring); + sqe = io_uring_get_sqe(&cfg->uring->uring); + if (!sqe) + die("failed to get an sqe!"); + } + + sqe_count++; + uring_task_get(task); + return sqe; +} + +void +uring_task_refdump(struct uring_task *task) +{ + char buf[4096]; + struct uring_task *tmp; + + assert_return(task); + + if (!debug_enabled(DBG_REF)) + return; + + buf[0] = '\0'; + for (tmp = task; tmp; tmp = tmp->parent) { + size_t prefix; + char *dst; + + if (tmp->parent) + prefix = strlen("->") + strlen(tmp->name); + else + prefix = strlen(tmp->name); + + memmove(&buf[prefix], &buf[0], strlen(buf) + 1); + + dst = &buf[0]; + if (tmp->parent) { + *dst++ = '-'; + *dst++ = '>'; + } + + memcpy(dst, tmp->name, strlen(tmp->name)); + } + + info("%s (0x%p parent 0x%p free 0x%p fd %i ref %u)", + buf, task, task->parent, task->free, task->fd, + task->refcount); +} + +/* + * Similar to uring_task_put, but can be called from other tasks + * while the task is active. + */ +void +uring_task_destroy(struct uring_task *task) +{ + assert_return(task); + assert_return_silent(!task->dead); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + if (task->fd >= 0) { + struct io_uring_sqe *sqe; + + sqe = get_sqe(task); + io_uring_prep_cancel(sqe, task, 0); + io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CANCEL)); + } + + task->dead = true; + uring_task_put(task); +} + +void +uring_task_put(struct uring_task *task) +{ + struct uring_task *parent; + + assert_return(task); + + debug(DBG_REF, "task %s (%p), refcount %u -> %u", + task->name, task, task->refcount, task->refcount - 1); + + task->refcount--; + + if (task->refcount > 0) + return; + + if (task->refcount < 0) + error("Negative refcount!"); + + if (task->fd >= 0) { + uring_task_close_fd(task); + /* We'll be called again once the fd is closed */ + return; + } + + parent = task->parent; + if (task->free) + task->free(task); + + if (parent) { + debug(DBG_REF, "putting parent %s (%p)", parent->name, parent); + uring_task_put(parent); + } +} + +void +uring_task_get(struct uring_task *task) +{ + assert_return(task); + + debug(DBG_REF, "task %s (%p), refcount %u -> %u", + task->name, task, task->refcount, task->refcount + 1); + + if (task->refcount < 0) + error("Negative refcount!"); + + task->refcount++; +} + +void +uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf) +{ + assert_return(task && tbuf); + + debug(DBG_UR, "task %s (%p), buf %p, refcount %u", + task->name, task, tbuf, task->refcount); + + /* iov_len and msg_namelen are set at send/receive time */ + tbuf->iov.iov_base = tbuf->buf; + tbuf->msg.msg_name = &task->saddr.storage; + tbuf->msg.msg_iov = &tbuf->iov; + tbuf->msg.msg_iovlen = 1; + tbuf->msg.msg_control = NULL; + tbuf->msg.msg_controllen = 0; + tbuf->msg.msg_flags = 0; + task->tbuf = tbuf; +} + +void +uring_task_set_fd(struct uring_task *task, int fd) +{ + assert_return(task); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, fd, task->refcount); + + task->fd = fd; +} + +void +uring_task_close_fd(struct uring_task *task) +{ + assert_return(task); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + if (task->fd < 0) + return; + + uring_close(task, task->fd); + task->fd = -1; +} + +struct uring_task * +uring_parent() +{ + assert_die(cfg->uring, "invalid arguments"); + + return &cfg->uring->task; +} + +void +uring_task_init(struct uring_task *task, const char *name, + struct uring_task *parent, void (*free)(struct uring_task *)) +{ + static bool first = true; + + assert_die(task && !empty_str(name) && free, "invalid arguments"); + + if (first) + first = false; + else + assert_die(parent, "called without a parent task"); + + task->refcount = 1; + task->fd = -1; + task->parent = parent; + task->free = free; + task->dead = false; + task->name = name; + task->tbuf = NULL; + + if (task->parent) { + debug(DBG_REF, "task %s (%p), refcount %u, " + "getting parent %s (%p), refcount %u", + task->name, task, task->refcount, + task->parent->name, task->parent, task->parent->refcount); + uring_task_get(task->parent); + } +} + +void +uring_close(struct uring_task *task, int fd) +{ + struct io_uring_sqe *sqe; + + assert_return(task && fd >= 0); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + io_uring_prep_close(sqe, fd); + io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_CLOSE)); +} + +static void +uring_tbuf_write_cb(struct uring_task *task, int res) +{ + int r; + + assert_return(task && task->tbuf && task->final_cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + if (res < 0) { + r = res; + goto finished; + } + + /* We wrote some more data */ + task->tbuf->done += res; + if (task->tbuf->done >= task->tbuf->len || res == 0) { + r = task->tbuf->len; + goto finished; + } + + uring_write(task, task->tbuf->buf + task->tbuf->done, + task->tbuf->len - task->tbuf->done, + uring_tbuf_write_cb); + return; + +finished: + task->final_cb(task, r); +} + +void +uring_tbuf_write(struct uring_task *task, utask_cb_t final_cb) +{ + assert_return(task && task->fd >= 0 && task->tbuf && task->tbuf->len > 0); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + task->tbuf->done = 0; + task->final_cb = final_cb; + uring_write(task, &task->tbuf->buf, task->tbuf->len, uring_tbuf_write_cb); +} + +void +uring_write(struct uring_task *task, void *buf, size_t len, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && buf && len > 0 && cb && task->fd >= 0); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_write(sqe, task->fd, buf, len, 0); + io_uring_sqe_set_data(sqe, task); +} + +static void +uring_tbuf_read_until_cb(struct uring_task *task, int res) +{ + int r; + + assert_return(task && task->tbuf && task->final_cb && task->is_complete_cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + if (res < 0) { + r = res; + goto finished; + } + + task->tbuf->len += res; + r = task->is_complete_cb(task, res); + if (r < 0) { + r = res; + goto finished; + } else if (r > 0) { + r = task->tbuf->len; + goto finished; + } + + /* Need to read more */ + if (task->tbuf->len >= sizeof(task->tbuf->buf)) { + r = E2BIG; + goto finished; + } + + uring_read_offset(task, task->tbuf->buf + task->tbuf->len, + sizeof(task->tbuf->buf) - task->tbuf->len, + task->tbuf->len, uring_tbuf_read_until_cb); + return; + +finished: + task->final_cb(task, r); +} + +void +uring_tbuf_read_until(struct uring_task *task, + rutask_cb_t is_complete_cb, utask_cb_t final_cb) +{ + assert_return(task && task->fd >= 0 && task->tbuf && is_complete_cb && final_cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + task->tbuf->len = 0; + task->is_complete_cb = is_complete_cb; + task->final_cb = final_cb; + uring_read(task, &task->tbuf->buf, sizeof(task->tbuf->buf), + uring_tbuf_read_until_cb); +} + +static int +uring_tbuf_eof(struct uring_task *task, int res) +{ + assert_return(task && task->tbuf, -EINVAL); + assert_task_alive_or(DBG_UR, task, return -EINTR); + + if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf)) + return -E2BIG; + + if (res > 0) + return 0; + + task->tbuf->buf[task->tbuf->len] = '\0'; + task->tbuf->len++; + return 1; +} + +void +uring_tbuf_read_until_eof(struct uring_task *task, + utask_cb_t final_cb) +{ + assert_return(task && task->tbuf && final_cb); + + uring_tbuf_read_until(task, uring_tbuf_eof, final_cb); +} + +static int +uring_tbuf_have_data(struct uring_task *task, int res) +{ + assert_return(task, -EINVAL); + + if (res < 0) + return res; + else + return 1; +} + +void +uring_tbuf_read(struct uring_task *task, utask_cb_t final_cb) +{ + assert_return(task && final_cb); + + uring_tbuf_read_until(task, uring_tbuf_have_data, final_cb); +} + +void +uring_read_offset(struct uring_task *task, void *buf, size_t len, off_t offset, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && buf && len > 0 && task->fd >= 0); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_read(sqe, task->fd, buf, len, offset); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_openat(struct uring_task *task, const char *path, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && !empty_str(path) && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_openat(sqe, AT_FDCWD, path, O_RDONLY | O_CLOEXEC, 0); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_tbuf_recvmsg(struct uring_task *task, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && task->fd >= 0 && task->tbuf && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->tbuf->done = 0; + task->tbuf->len = 0; + task->tbuf->iov.iov_len = sizeof(task->tbuf->buf); + task->tbuf->msg.msg_namelen = task->saddr.addrlen; + task->cb = cb; + io_uring_prep_recvmsg(sqe, task->fd, &task->tbuf->msg, 0); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_tbuf_sendmsg(struct uring_task *task, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && task->fd >= 0 && task->tbuf && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->tbuf->done = 0; + task->tbuf->iov.iov_len = task->tbuf->len; + task->tbuf->msg.msg_namelen = task->saddr.addrlen; + task->cb = cb; + io_uring_prep_sendmsg(sqe, task->fd, &task->tbuf->msg, 0); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_connect(struct uring_task *task, struct saddr *saddr, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && task->fd >= 0 && saddr && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_connect(sqe, task->fd, (struct sockaddr *)&saddr->storage, saddr->addrlen); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && task->fd >= 0 && saddr && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + saddr->addrlen = sizeof(saddr->storage); + task->cb = cb; + io_uring_prep_accept(sqe, task->fd, (struct sockaddr *)&saddr->storage, &saddr->addrlen, SOCK_CLOEXEC); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && fd_in >= 0 && fd_out >= 0 && cb); + + debug(DBG_UR, "task %s (%p), fd_in %i, fd_out %i, refcount %u", + task->name, task, fd_in, fd_out, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_splice(sqe, fd_in, -1, fd_out, -1, 4096, SPLICE_F_MOVE); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb) +{ + struct io_uring_sqe *sqe; + + assert_return(task && task->fd >= 0 && poll_mask && cb); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->cb = cb; + io_uring_prep_poll_add(sqe, task->fd, poll_mask); + io_uring_sqe_set_data(sqe, task); +} + +void +uring_poll_cancel(struct uring_task *task) +{ + struct io_uring_sqe *sqe; + + assert_return(task); + + if (task->fd < 0) { + /* not an error, no need to print error msg */ + return; + } + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + sqe = get_sqe(task); + task->dead = true; + io_uring_prep_poll_remove(sqe, task); + io_uring_sqe_set_data(sqe, (void *)((uintptr_t)task | CQE_TYPE_POLL_CANCEL)); +} + +static void +uring_free(struct uring_task *task) +{ + struct uring_ev *uring = container_of(task, struct uring_ev, task); + + assert_return(task); + + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + io_uring_queue_exit(&uring->uring); + cfg->uring = NULL; + xfree(uring); +} + +void +uring_refdump() +{ + assert_return(cfg->uring); + + uring_task_refdump(&cfg->uring->task); +} + +void +uring_delete() +{ + struct uring_task *task; + + assert_return(cfg->uring); + + task = &cfg->uring->task; + debug(DBG_UR, "task %s (%p), fd %i, refcount %u", + task->name, task, task->fd, task->refcount); + + uring_task_put(task); +} + +static void +uring_splice_test_cb(struct uring_task *task, int res) +{ + struct uring_ev *uring = container_of(task, struct uring_ev, task); + + assert_die(task && uring == cfg->uring, "splice test failed"); + + uring_close(task, uring->tfd); + uring_close(task, uring->pipe[PIPE_RD]); + uring_close(task, uring->pipe[PIPE_WR]); + + uring->tfd = -1; + uring->pipe[PIPE_RD] = -1; + uring->pipe[PIPE_WR] = -1; + + if (res >= 0) { + cfg->splice_supported = true; + debug(DBG_UR, "splice supported"); + } else if (res == -EINVAL) + debug(DBG_UR, "splice not supported"); + else + error("splice check failed: %i\n", res); +} + +void +uring_init() +{ + struct uring_ev *uring; + + assert_return(!cfg->uring); + + uring = zmalloc(sizeof(*uring)); + if (!uring) + die("malloc: %m"); + + if (io_uring_queue_init_params(4096, &uring->uring, &uring->uring_params) < 0) + die("io_uring_queue_init_params"); + + debug(DBG_UR, "uring initialized, features: 0x%08x", + uring->uring_params.features); + + uring_task_init(&uring->task, "io_uring", &cfg->task, uring_free); + cfg->uring = uring; + + /* splice check, a bit convoluted, but seems to be no simpler way */ + cfg->splice_supported = false; + if (pipe2(uring->pipe, O_CLOEXEC) < 0) + die("pipe2: %m"); + uring->tfd = open("/dev/null", O_RDONLY | O_CLOEXEC | O_NOCTTY); + if (uring->tfd < 0) + die("open(\"/dev/null\"): %m"); + uring_splice(&uring->task, uring->tfd, uring->pipe[PIPE_WR], uring_splice_test_cb); +} + +static inline void +uring_print_cqe(const char *type, struct uring_task *task, + struct io_uring_cqe *cqe) +{ + assert_return(!empty_str(type) && task && cqe); + + debug(DBG_UR, "got CQE " + "(type: %s, res: %i (%s), task: %s (%p), fd: %i, cb: %p)", + type, + cqe->res, + cqe->res < 0 ? strerror(-cqe->res) : "ok", + task->name ? task->name : "<none>", + task, + task->fd, + task->cb); +} + +void +uring_event_loop() +{ + while (true) { + struct io_uring_cqe *cqe; + unsigned nr, head; + int r; + + io_uring_submit(&cfg->uring->uring); + r = io_uring_wait_cqe(&cfg->uring->uring, &cqe); + if (r < 0) { + if (errno == EINTR) + continue; + else + die("io_uring_wait_cqe: %i", r); + } + + nr = 0; + io_uring_for_each_cqe(&cfg->uring->uring, head, cqe) { + struct uring_task *task = io_uring_cqe_get_data(cqe); + bool do_cb; + enum cqe_type cqe_type; + + cqe_count++; + + cqe_type = ((uintptr_t)task & CQE_TYPE_PTR_MASK); + task = (void *)((uintptr_t)task & ~CQE_TYPE_PTR_MASK); + + if (!task) + die("null task"); + + switch (cqe_type) { + case CQE_TYPE_CANCEL: + uring_print_cqe("cancel", task, cqe); + do_cb = false; + break; + + case CQE_TYPE_CLOSE: + uring_print_cqe("close", task, cqe); + do_cb = false; + break; + + case CQE_TYPE_POLL_CANCEL: + uring_print_cqe("poll_cancel", task, cqe); + do_cb = false; + break; + + case CQE_TYPE_NORMAL: + uring_print_cqe("standard", task, cqe); + do_cb = true; + break; + + default: + die("unknown CQE type"); + } + + if (do_cb && task->cb) + task->cb(task, cqe->res); + + uring_task_put(task); + + if (exiting) + return; + + nr++; + } + + io_uring_cq_advance(&cfg->uring->uring, nr); + } +} + diff --git a/minecproxy/uring.h b/minecproxy/uring.h new file mode 100644 index 0000000..9c33104 --- /dev/null +++ b/minecproxy/uring.h @@ -0,0 +1,73 @@ +#ifndef foouringhfoo +#define foouringhfoo + +extern uint64_t sqe_count; +extern uint64_t cqe_count; + +void uring_task_refdump(struct uring_task *task); + +void uring_task_destroy(struct uring_task *task); + +void uring_task_put(struct uring_task *task); + +void uring_task_get(struct uring_task *task); + +void uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf); + +void uring_task_set_fd(struct uring_task *task, int fd); + +void uring_task_close_fd(struct uring_task *task); + +struct uring_task *uring_parent(); + +void uring_task_init(struct uring_task *task, const char *name, + struct uring_task *parent, + void (*free)(struct uring_task *)); + +void uring_close(struct uring_task *task, int fd); + +void uring_tbuf_write(struct uring_task *task, utask_cb_t final_cb); + +void uring_write(struct uring_task *task, void *buf, size_t len, utask_cb_t cb); + +void uring_tbuf_read_until(struct uring_task *task, + rutask_cb_t is_complete_cb, utask_cb_t final_cb); + +void uring_tbuf_read_until_eof(struct uring_task *task, utask_cb_t final_cb); + +void uring_tbuf_read(struct uring_task *task, utask_cb_t final_cb); + +void uring_read_offset(struct uring_task *task, void *buf, + size_t len, off_t offset, utask_cb_t cb); + +static inline void +uring_read(struct uring_task *task, void *buf, size_t len, utask_cb_t cb) +{ + uring_read_offset(task, buf, len, 0, cb); +} + +void uring_openat(struct uring_task *task, const char *path, utask_cb_t cb); + +void uring_tbuf_recvmsg(struct uring_task *task, utask_cb_t cb); + +void uring_tbuf_sendmsg(struct uring_task *task, utask_cb_t cb); + +void uring_connect(struct uring_task *task, struct saddr *saddr, utask_cb_t cb); + +void uring_accept(struct uring_task *task, struct saddr *saddr, utask_cb_t cb); + +void uring_splice(struct uring_task *task, int fd_in, int fd_out, utask_cb_t cb); + +void uring_poll(struct uring_task *task, short poll_mask, utask_cb_t cb); + +void uring_poll_cancel(struct uring_task *task); + +void uring_delete(); + +void uring_refdump(); + +void uring_init(); + +void uring_event_loop(); + +#endif |