From 9daf6a0e6b461c6c2a16f810f722b9d10504bf90 Mon Sep 17 00:00:00 2001 From: David Härdeman Date: Wed, 10 Jun 2020 13:28:41 +0200 Subject: Introduce a common task buffer and convert cfgdir and rcon to use it --- cfgdir.c | 34 +++++--------- main.h | 10 +++++ rcon.c | 155 ++++++++++++++++++++++----------------------------------------- server.c | 1 + server.h | 9 ++-- uring.c | 140 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- uring.h | 10 +++++ 7 files changed, 226 insertions(+), 133 deletions(-) diff --git a/cfgdir.c b/cfgdir.c index 9fa0c78..8793486 100644 --- a/cfgdir.c +++ b/cfgdir.c @@ -98,7 +98,7 @@ struct cfg_key_value_map scfg_key_map[] = { static void scfg_parse(struct cfg *cfg, struct server *scfg) { - char *pos = &scfg->buf[0]; + char *pos = &scfg->tbuf.buf[0]; if (!config_parse_header(cfg, "server", &pos)) return; @@ -219,8 +219,6 @@ scfg_parse(struct cfg *cfg, struct server *scfg) break; } } - - //printf("Cfg:\n%s\n\n", pos); } static void @@ -228,27 +226,16 @@ scfg_read_cb(struct cfg *cfg, struct uring_task *task, int res) { struct server *scfg = container_of(task, struct server, task); - printf("Asked to parse server cfg %s (bytes %i)\n", scfg->name, res); - if (res < 0) { - perrordie("read"); - } else if (res > 0) { - scfg->len += res; - if (scfg->len + 1 >= sizeof(scfg->buf)) { - fprintf(stderr, "Server config too large\n"); - server_delete(cfg, scfg); - return; - } - - uring_read(cfg, &scfg->task, scfg->buf + scfg->len, sizeof(scfg->buf) - scfg->len, scfg->len, scfg_read_cb); - return; - } else { - /* EOF */ - scfg->buf[scfg->len] = '\0'; - uring_task_close_fd(cfg, &scfg->task); - scfg_parse(cfg, scfg); - server_commit(cfg, scfg); + fprintf(stderr, "%s: error reading config file for %s: %s\n", + __func__, scfg->name, strerror(-res)); + server_delete(cfg, scfg); } + + printf("Asked to parse server cfg %s (bytes %i)\n", scfg->name, res); + uring_task_close_fd(cfg, &scfg->task); + scfg_parse(cfg, scfg); + server_commit(cfg, scfg); } static void @@ -265,8 +252,7 @@ scfg_open_cb(struct cfg *cfg, struct uring_task *task, int res) printf("Asked to read server cfg %s (fd %i)\n", scfg->name, res); uring_task_set_fd(&scfg->task, res); - scfg->len = 0; - uring_read(cfg, &scfg->task, scfg->buf, sizeof(scfg->buf), 0, scfg_read_cb); + uring_tbuf_read_until_eof(cfg, &scfg->task, scfg_read_cb); } static bool diff --git a/main.h b/main.h index 086cd92..a3b5512 100644 --- a/main.h +++ b/main.h @@ -24,6 +24,13 @@ struct uring_task; /* To save typing in all the function definitions below */ typedef void (*callback_t)(struct cfg *, struct uring_task *, int res); +typedef int (*rcallback_t)(struct cfg *, struct uring_task *, int res); + +struct uring_task_buf { + char buf[4096]; + size_t len; + size_t done; +}; struct uring_task { const char *name; @@ -32,7 +39,10 @@ struct uring_task { void *parent; void (*free)(struct uring_task *); bool dead; + struct uring_task_buf *tbuf; callback_t callback; + rcallback_t complete_callback; /* to check if tbuf processing is done */ + callback_t final_callback; /* once tbuf processing is done */ }; struct cfg { diff --git a/rcon.c b/rcon.c index 756d480..a9a6328 100644 --- a/rcon.c +++ b/rcon.c @@ -21,9 +21,7 @@ struct rcon { unsigned next_rcon; struct sockaddr_in46 rcon; char rconstr[ADDRSTRLEN]; - char rconbuf[4096]; - size_t rconbuflen; - size_t rconbufdone; + struct uring_task_buf tbuf; }; static void @@ -129,44 +127,46 @@ enum rcon_packet_type { static void create_packet(struct cfg *cfg, struct rcon *rcon, int32_t reqid, enum rcon_packet_type type, const char *msg) { - char *pos = &rcon->rconbuf[4]; + char *pos = &rcon->tbuf.buf[4]; - rcon->rconbufdone = 0; - rcon->rconbuflen = 4; - write_int(&pos, &rcon->rconbuflen, reqid); - write_int(&pos, &rcon->rconbuflen, type); - write_str(&pos, &rcon->rconbuflen, msg); - write_end(&pos, &rcon->rconbuflen); - pos = &rcon->rconbuf[0]; - write_int(&pos, NULL, rcon->rconbuflen - 4); + /* Body */ + rcon->tbuf.len = 4; + write_int(&pos, &rcon->tbuf.len, reqid); + write_int(&pos, &rcon->tbuf.len, type); + write_str(&pos, &rcon->tbuf.len, msg); + write_end(&pos, &rcon->tbuf.len); + + /* Header (length of body) */ + pos = &rcon->tbuf.buf[0]; + write_int(&pos, NULL, rcon->tbuf.len - 4); fprintf(stderr, "Created packet (reqid: %" PRIi32 ", type %" PRIi32 ", len %zu, payload: %s)\n", - reqid, type, rcon->rconbuflen, msg); + reqid, type, rcon->tbuf.len, msg); } -static bool -packet_complete(struct cfg *cfg, struct rcon *rcon) +static int +packet_complete(struct cfg *cfg, struct uring_task *task, int res) { - char *pos = rcon->rconbuf; - size_t len = rcon->rconbufdone; + char *pos = task->tbuf->buf; + size_t len = task->tbuf->len; int32_t plen; - if (rcon->rconbufdone < 14) - return false; + if (task->tbuf->len < 14) + return 0; plen = read_int(&pos, &len); - fprintf(stderr, "Reply is %zu bytes, packet size %" PRIi32 "\n", rcon->rconbufdone, plen + 4); - if (rcon->rconbufdone < plen + 4) - return false; + fprintf(stderr, "Reply is %zu bytes, packet size %" PRIi32 "\n", task->tbuf->len, plen + 4); + if (task->tbuf->len < plen + 4) + return 0; else - return true; + return 1; } static bool read_packet(struct cfg *cfg, struct rcon *rcon, int32_t *id, int32_t *type, char **rmsg) { - char *pos = rcon->rconbuf; - size_t len = rcon->rconbufdone; + char *pos = rcon->tbuf.buf; + size_t len = rcon->tbuf.len; int32_t plen; plen = read_int(&pos, &len); @@ -181,18 +181,7 @@ read_packet(struct cfg *cfg, struct rcon *rcon, int32_t *id, int32_t *type, char fprintf(stderr, "Remaining = %zu\n", len); if (len > 2) { - char *msg; - - msg = malloc(len - 1); - if (!msg) { - perror("malloc"); - return false; - } - - memcpy(msg, pos, len - 2); - msg[len - 2] = '\0'; - - *rmsg = msg; + *rmsg = pos; pos += len - 2; len = 2; } @@ -226,27 +215,16 @@ rcon_stop_reply(struct cfg *cfg, struct uring_task *task, int res) if (res < 0) goto out; - rcon->rconbufdone += res; - - /* FIXME: could be multiple packets */ - if (packet_complete(cfg, rcon)) { - fprintf(stderr, "Packet complete\n"); - read_packet(cfg, rcon, &id, &type, &msg); - if (id != 2) { - fprintf(stderr, "RCon stop cmd failed - unexpected reply id (%" PRIi32 ")\n", id); - goto out; - } else if (type != RCON_PACKET_RESPONSE) { - fprintf(stderr, "RCon stop cmd failed - unexpected reply type (%" PRIi32 ")\n", type); - goto out; - } - fprintf(stderr, "RCon stop successful (%s)\n", msg); - free(msg); - - } else { - fprintf(stderr, "Packet not complete\n"); - uring_read(cfg, &rcon->task, &rcon->rconbuf + rcon->rconbufdone, - sizeof(rcon->rconbuf) - rcon->rconbufdone, 0, rcon_stop_reply); + fprintf(stderr, "Packet complete\n"); + read_packet(cfg, rcon, &id, &type, &msg); + if (id != 2) { + fprintf(stderr, "RCon stop cmd failed - unexpected reply id (%" PRIi32 ")\n", id); + goto out; + } else if (type != RCON_PACKET_RESPONSE) { + fprintf(stderr, "RCon stop cmd failed - unexpected reply type (%" PRIi32 ")\n", type); + goto out; } + fprintf(stderr, "RCon stop successful (%s)\n", msg); out: uring_task_put(cfg, &rcon->task); @@ -263,17 +241,8 @@ rcon_stop_sent(struct cfg *cfg, struct uring_task *task, int res) return; } - rcon->rconbufdone += res; - if (rcon->rconbufdone < rcon->rconbuflen) { - uring_write(cfg, &rcon->task, rcon->rconbuf + rcon->rconbufdone, - rcon->rconbuflen - rcon->rconbufdone, rcon_stop_sent); - return; - } - fprintf(stderr, "%s: stop cmd sent\n", __func__); - rcon->rconbufdone = 0; - rcon->rconbuflen = 0; - uring_read(cfg, &rcon->task, &rcon->rconbuf, sizeof(rcon->rconbuf), 0, rcon_stop_reply); + uring_tbuf_read_until(cfg, &rcon->task, packet_complete, rcon_stop_reply); } static void @@ -288,32 +257,22 @@ rcon_login_reply(struct cfg *cfg, struct uring_task *task, int res) if (res < 0) goto out; - rcon->rconbufdone += res; - - if (packet_complete(cfg, rcon)) { - fprintf(stderr, "Packet complete\n"); - read_packet(cfg, rcon, &id, &type, &msg); - if (id != 1) { - fprintf(stderr, "RCon login failed - unexpected reply id (%" PRIi32 ")\n", id); - goto out; - } else if (type == RCON_PACKET_LOGIN_FAIL) { - fprintf(stderr, "RCon login failed - incorrect password\n"); - goto out; - } else if (type != RCON_PACKET_LOGIN_OK) { - fprintf(stderr, "RCon login failed - unexpected reply type (%" PRIi32 ")\n", type); - goto out; - } - free(msg); - fprintf(stderr, "RCon login successful\n"); - create_packet(cfg, rcon, 2, RCON_PACKET_COMMAND, "stop"); - uring_write(cfg, &rcon->task, rcon->rconbuf, rcon->rconbuflen, rcon_stop_sent); - - } else { - fprintf(stderr, "Packet not complete\n"); - uring_read(cfg, &rcon->task, &rcon->rconbuf + rcon->rconbufdone, - sizeof(rcon->rconbuf) - rcon->rconbufdone, 0, rcon_login_reply); + fprintf(stderr, "Packet complete\n"); + read_packet(cfg, rcon, &id, &type, &msg); + if (id != 1) { + fprintf(stderr, "RCon login failed - unexpected reply id (%" PRIi32 ")\n", id); + goto out; + } else if (type == RCON_PACKET_LOGIN_FAIL) { + fprintf(stderr, "RCon login failed - incorrect password\n"); + goto out; + } else if (type != RCON_PACKET_LOGIN_OK) { + fprintf(stderr, "RCon login failed - unexpected reply type (%" PRIi32 ")\n", type); + goto out; } + fprintf(stderr, "RCon login successful\n"); + create_packet(cfg, rcon, 2, RCON_PACKET_COMMAND, "stop"); + uring_tbuf_write(cfg, &rcon->task, rcon_stop_sent); return; out: @@ -331,17 +290,8 @@ rcon_login_sent(struct cfg *cfg, struct uring_task *task, int res) return; } - rcon->rconbufdone += res; - if (rcon->rconbufdone < rcon->rconbuflen) { - uring_write(cfg, &rcon->task, rcon->rconbuf + rcon->rconbufdone, - rcon->rconbuflen - rcon->rconbufdone, rcon_login_sent); - return; - } - fprintf(stderr, "%s: login sent\n", __func__); - rcon->rconbufdone = 0; - rcon->rconbuflen = 0; - uring_read(cfg, &rcon->task, &rcon->rconbuf, sizeof(rcon->rconbuf), 0, rcon_login_reply); + uring_tbuf_read_until(cfg, &rcon->task, packet_complete, rcon_login_reply); } static void rcon_connect_next_rcon(struct cfg *cfg, struct rcon *rcon); @@ -358,7 +308,7 @@ rcon_connected(struct cfg *cfg, struct uring_task *task, int res) } create_packet(cfg, rcon, 1, RCON_PACKET_LOGIN, rcon->server->rcon_password); - uring_write(cfg, &rcon->task, rcon->rconbuf, rcon->rconbuflen, rcon_login_sent); + uring_tbuf_write(cfg, &rcon->task, rcon_login_sent); } /* FIXME: Parts of this could be shared with proxy.c, probably in server.c */ @@ -418,6 +368,7 @@ rcon_init(struct cfg *cfg, struct server *server) perrordie("malloc"); uring_task_init(&rcon->task, "rcon", &server->task, rcon_free); + uring_task_set_buf(&rcon->task, &rcon->tbuf); rcon->server = server; server->rcon = rcon; diff --git a/server.c b/server.c index ee93a24..4102d99 100644 --- a/server.c +++ b/server.c @@ -695,6 +695,7 @@ server_new(struct cfg *cfg, const char *name) scfg->stop_method = SERVER_STOP_METHOD_UNDEFINED; scfg->start_method = SERVER_START_METHOD_UNDEFINED; uring_task_init(&scfg->task, "scfg", uring_parent(cfg), server_free); + uring_task_set_buf(&scfg->task, &scfg->tbuf); uring_task_init(&scfg->exec_task, "exec", &scfg->task, server_exec_free); list_init(&scfg->remotes); list_init(&scfg->locals); diff --git a/server.h b/server.h index 8e0b15f..fbaa3fa 100644 --- a/server.h +++ b/server.h @@ -37,6 +37,7 @@ struct server { /* For calling external start/stop executables */ char *stop_exec; char *start_exec; + struct uring_task exec_task; /* For systemd services */ char *systemd_service; @@ -56,12 +57,10 @@ struct server { unsigned idle_timeout; unsigned idle_count; - /* For config files */ - char buf[4096]; - size_t len; - - struct uring_task exec_task; + /* For reading config files */ + struct uring_task_buf tbuf; struct uring_task task; + struct list_head list; }; diff --git a/uring.c b/uring.c index d74b590..561be25 100644 --- a/uring.c +++ b/uring.c @@ -69,7 +69,8 @@ uring_task_put(struct cfg *cfg, struct uring_task *task) { struct uring_task *parent = task->parent; - fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount); + fprintf(stderr, "%s: called with task %s (0x%p) and refcount %u\n", + __func__, task->name, task, task->refcount); task->refcount--; if (task->refcount > 0) @@ -94,7 +95,8 @@ uring_task_put(struct cfg *cfg, struct uring_task *task) void uring_task_get(struct cfg *cfg, struct uring_task *task) { - fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount); + fprintf(stderr, "%s: called with task %s (0x%p) and refcount %u\n", + __func__, task->name, task, task->refcount); if (task->refcount < 0) error("Negative refcount!\n"); @@ -102,6 +104,12 @@ uring_task_get(struct cfg *cfg, struct uring_task *task) task->refcount++; } +void +uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf) +{ + task->tbuf = tbuf; +} + void uring_task_set_fd(struct uring_task *task, int fd) { @@ -143,6 +151,7 @@ uring_task_init(struct uring_task *task, const char *name, struct uring_task *pa task->free = free; task->dead = false; task->name = name; + task->tbuf = NULL; if (task->parent) uring_task_get(NULL, task->parent); @@ -165,6 +174,49 @@ uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callbac fprintf(stderr, "%s: done\n", __func__); } +static void +uring_tbuf_write_cb(struct cfg *cfg, struct uring_task *task, int res) +{ + int r; + + if (!task || !task->tbuf || !task->final_callback) + perrordie("%s: missing parameters\n"); + + if (res < 0) { + r = res; + goto finished; + } + + /* We wrote some more data */ + task->tbuf->done += res; + if (task->tbuf->done >= task->tbuf->len) { + r = task->tbuf->len; + goto finished; + } + + uring_write(cfg, task, task->tbuf->buf + task->tbuf->done, + task->tbuf->len - task->tbuf->done, + uring_tbuf_write_cb); + return; + +finished: + task->final_callback(cfg, task, r); + return; +} + +void +uring_tbuf_write(struct cfg *cfg, struct uring_task *task, callback_t callback) +{ + if (!task || task->fd < 0 || !task->tbuf || task->tbuf->len < 0) { + error("%s invalid parameters\n", __func__); + return; + } + + task->tbuf->done = 0; + task->final_callback = callback; + uring_write(cfg, task, &task->tbuf->buf, task->tbuf->len, uring_tbuf_write_cb); +} + void uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback) { @@ -182,6 +234,90 @@ uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, cal io_uring_sqe_set_data(sqe, task); } +static int +uring_tbuf_complete_eof(struct cfg *cfg, struct uring_task *task, int res) +{ + if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf)) + return -E2BIG; + + if (res > 0) + return 1; + + task->tbuf->buf[task->tbuf->len] = '\0'; + task->tbuf->len++; + return 0; +} + +static void +uring_tbuf_read_until_cb(struct cfg *cfg, struct uring_task *task, int res) +{ + int r; + + if (!task || !task->tbuf || !task->final_callback || !task->complete_callback) { + error("%s: invalid parameters\n", __func__); + return; + } + + if (res < 0) { + r = res; + goto finished; + } + + task->tbuf->len += res; + r = task->complete_callback(cfg, task, res); + if (r < 0) { + r = res; + goto finished; + } else if (r > 0) { + r = task->tbuf->len; + goto finished; + } + + /* Need to read more */ + if (task->tbuf->len >= sizeof(task->tbuf->buf)) { + r = E2BIG; + goto finished; + } + + uring_read(cfg, task, task->tbuf->buf + task->tbuf->len, + sizeof(task->tbuf->buf) - task->tbuf->len, + task->tbuf->len, uring_tbuf_read_until_cb); + return; + +finished: + task->final_callback(cfg, task, r); + return; +} + +void +uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task, + rcallback_t complete, callback_t callback) +{ + if (!task) + error("task\n"); + if (task->fd < 0) + error("task->fd\n"); + if (!task->tbuf) + error("task->tbuf\n"); + if (!complete) + error("complete\n"); + if (!task || task->fd < 0 || !task->tbuf || !complete) { + error("%s: invalid parameters\n", __func__); + return; + } + + task->tbuf->len = 0; + task->complete_callback = complete; + task->final_callback = callback; + uring_read(cfg, task, &task->tbuf->buf, sizeof(task->tbuf->buf), 0, uring_tbuf_read_until_cb); +} + +void +uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task, callback_t callback) +{ + uring_tbuf_read_until(cfg, task, uring_tbuf_complete_eof, callback); +} + void uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback) { diff --git a/uring.h b/uring.h index 83119a1..935720d 100644 --- a/uring.h +++ b/uring.h @@ -7,6 +7,8 @@ void uring_task_put(struct cfg *cfg, struct uring_task *task); void uring_task_get(struct cfg *cfg, struct uring_task *task); +void uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf); + void uring_task_set_fd(struct uring_task *task, int fd); void uring_task_close_fd(struct cfg *cfg, struct uring_task *task); @@ -20,9 +22,17 @@ void uring_task_init(struct uring_task *task, const char *name, void uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callback); +void uring_tbuf_write(struct cfg *cfg, struct uring_task *task, callback_t callback); + void uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback); +void uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task, + rcallback_t complete, callback_t callback); + +void uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task, + callback_t callback); + void uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback); -- cgit v1.2.3