summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Härdeman <david@hardeman.nu>2020-06-10 13:28:41 +0200
committerDavid Härdeman <david@hardeman.nu>2020-06-10 13:28:41 +0200
commit9daf6a0e6b461c6c2a16f810f722b9d10504bf90 (patch)
tree4ec8bcdce547abb458257ad1a24cf0249de17a9a
parentfae819296598100e41646e0bebc7d0bce45614f7 (diff)
Introduce a common task buffer and convert cfgdir and rcon to use it
-rw-r--r--cfgdir.c34
-rw-r--r--main.h10
-rw-r--r--rcon.c155
-rw-r--r--server.c1
-rw-r--r--server.h9
-rw-r--r--uring.c140
-rw-r--r--uring.h10
7 files changed, 226 insertions, 133 deletions
diff --git a/cfgdir.c b/cfgdir.c
index 9fa0c78..8793486 100644
--- a/cfgdir.c
+++ b/cfgdir.c
@@ -98,7 +98,7 @@ struct cfg_key_value_map scfg_key_map[] = {
static void
scfg_parse(struct cfg *cfg, struct server *scfg)
{
- char *pos = &scfg->buf[0];
+ char *pos = &scfg->tbuf.buf[0];
if (!config_parse_header(cfg, "server", &pos))
return;
@@ -219,8 +219,6 @@ scfg_parse(struct cfg *cfg, struct server *scfg)
break;
}
}
-
- //printf("Cfg:\n%s\n\n", pos);
}
static void
@@ -228,27 +226,16 @@ scfg_read_cb(struct cfg *cfg, struct uring_task *task, int res)
{
struct server *scfg = container_of(task, struct server, task);
- printf("Asked to parse server cfg %s (bytes %i)\n", scfg->name, res);
-
if (res < 0) {
- perrordie("read");
- } else if (res > 0) {
- scfg->len += res;
- if (scfg->len + 1 >= sizeof(scfg->buf)) {
- fprintf(stderr, "Server config too large\n");
- server_delete(cfg, scfg);
- return;
- }
-
- uring_read(cfg, &scfg->task, scfg->buf + scfg->len, sizeof(scfg->buf) - scfg->len, scfg->len, scfg_read_cb);
- return;
- } else {
- /* EOF */
- scfg->buf[scfg->len] = '\0';
- uring_task_close_fd(cfg, &scfg->task);
- scfg_parse(cfg, scfg);
- server_commit(cfg, scfg);
+ fprintf(stderr, "%s: error reading config file for %s: %s\n",
+ __func__, scfg->name, strerror(-res));
+ server_delete(cfg, scfg);
}
+
+ printf("Asked to parse server cfg %s (bytes %i)\n", scfg->name, res);
+ uring_task_close_fd(cfg, &scfg->task);
+ scfg_parse(cfg, scfg);
+ server_commit(cfg, scfg);
}
static void
@@ -265,8 +252,7 @@ scfg_open_cb(struct cfg *cfg, struct uring_task *task, int res)
printf("Asked to read server cfg %s (fd %i)\n", scfg->name, res);
uring_task_set_fd(&scfg->task, res);
- scfg->len = 0;
- uring_read(cfg, &scfg->task, scfg->buf, sizeof(scfg->buf), 0, scfg_read_cb);
+ uring_tbuf_read_until_eof(cfg, &scfg->task, scfg_read_cb);
}
static bool
diff --git a/main.h b/main.h
index 086cd92..a3b5512 100644
--- a/main.h
+++ b/main.h
@@ -24,6 +24,13 @@ struct uring_task;
/* To save typing in all the function definitions below */
typedef void (*callback_t)(struct cfg *, struct uring_task *, int res);
+typedef int (*rcallback_t)(struct cfg *, struct uring_task *, int res);
+
+struct uring_task_buf {
+ char buf[4096];
+ size_t len;
+ size_t done;
+};
struct uring_task {
const char *name;
@@ -32,7 +39,10 @@ struct uring_task {
void *parent;
void (*free)(struct uring_task *);
bool dead;
+ struct uring_task_buf *tbuf;
callback_t callback;
+ rcallback_t complete_callback; /* to check if tbuf processing is done */
+ callback_t final_callback; /* once tbuf processing is done */
};
struct cfg {
diff --git a/rcon.c b/rcon.c
index 756d480..a9a6328 100644
--- a/rcon.c
+++ b/rcon.c
@@ -21,9 +21,7 @@ struct rcon {
unsigned next_rcon;
struct sockaddr_in46 rcon;
char rconstr[ADDRSTRLEN];
- char rconbuf[4096];
- size_t rconbuflen;
- size_t rconbufdone;
+ struct uring_task_buf tbuf;
};
static void
@@ -129,44 +127,46 @@ enum rcon_packet_type {
static void
create_packet(struct cfg *cfg, struct rcon *rcon, int32_t reqid, enum rcon_packet_type type, const char *msg)
{
- char *pos = &rcon->rconbuf[4];
+ char *pos = &rcon->tbuf.buf[4];
- rcon->rconbufdone = 0;
- rcon->rconbuflen = 4;
- write_int(&pos, &rcon->rconbuflen, reqid);
- write_int(&pos, &rcon->rconbuflen, type);
- write_str(&pos, &rcon->rconbuflen, msg);
- write_end(&pos, &rcon->rconbuflen);
- pos = &rcon->rconbuf[0];
- write_int(&pos, NULL, rcon->rconbuflen - 4);
+ /* Body */
+ rcon->tbuf.len = 4;
+ write_int(&pos, &rcon->tbuf.len, reqid);
+ write_int(&pos, &rcon->tbuf.len, type);
+ write_str(&pos, &rcon->tbuf.len, msg);
+ write_end(&pos, &rcon->tbuf.len);
+
+ /* Header (length of body) */
+ pos = &rcon->tbuf.buf[0];
+ write_int(&pos, NULL, rcon->tbuf.len - 4);
fprintf(stderr, "Created packet (reqid: %" PRIi32 ", type %" PRIi32 ", len %zu, payload: %s)\n",
- reqid, type, rcon->rconbuflen, msg);
+ reqid, type, rcon->tbuf.len, msg);
}
-static bool
-packet_complete(struct cfg *cfg, struct rcon *rcon)
+static int
+packet_complete(struct cfg *cfg, struct uring_task *task, int res)
{
- char *pos = rcon->rconbuf;
- size_t len = rcon->rconbufdone;
+ char *pos = task->tbuf->buf;
+ size_t len = task->tbuf->len;
int32_t plen;
- if (rcon->rconbufdone < 14)
- return false;
+ if (task->tbuf->len < 14)
+ return 0;
plen = read_int(&pos, &len);
- fprintf(stderr, "Reply is %zu bytes, packet size %" PRIi32 "\n", rcon->rconbufdone, plen + 4);
- if (rcon->rconbufdone < plen + 4)
- return false;
+ fprintf(stderr, "Reply is %zu bytes, packet size %" PRIi32 "\n", task->tbuf->len, plen + 4);
+ if (task->tbuf->len < plen + 4)
+ return 0;
else
- return true;
+ return 1;
}
static bool
read_packet(struct cfg *cfg, struct rcon *rcon, int32_t *id, int32_t *type, char **rmsg)
{
- char *pos = rcon->rconbuf;
- size_t len = rcon->rconbufdone;
+ char *pos = rcon->tbuf.buf;
+ size_t len = rcon->tbuf.len;
int32_t plen;
plen = read_int(&pos, &len);
@@ -181,18 +181,7 @@ read_packet(struct cfg *cfg, struct rcon *rcon, int32_t *id, int32_t *type, char
fprintf(stderr, "Remaining = %zu\n", len);
if (len > 2) {
- char *msg;
-
- msg = malloc(len - 1);
- if (!msg) {
- perror("malloc");
- return false;
- }
-
- memcpy(msg, pos, len - 2);
- msg[len - 2] = '\0';
-
- *rmsg = msg;
+ *rmsg = pos;
pos += len - 2;
len = 2;
}
@@ -226,27 +215,16 @@ rcon_stop_reply(struct cfg *cfg, struct uring_task *task, int res)
if (res < 0)
goto out;
- rcon->rconbufdone += res;
-
- /* FIXME: could be multiple packets */
- if (packet_complete(cfg, rcon)) {
- fprintf(stderr, "Packet complete\n");
- read_packet(cfg, rcon, &id, &type, &msg);
- if (id != 2) {
- fprintf(stderr, "RCon stop cmd failed - unexpected reply id (%" PRIi32 ")\n", id);
- goto out;
- } else if (type != RCON_PACKET_RESPONSE) {
- fprintf(stderr, "RCon stop cmd failed - unexpected reply type (%" PRIi32 ")\n", type);
- goto out;
- }
- fprintf(stderr, "RCon stop successful (%s)\n", msg);
- free(msg);
-
- } else {
- fprintf(stderr, "Packet not complete\n");
- uring_read(cfg, &rcon->task, &rcon->rconbuf + rcon->rconbufdone,
- sizeof(rcon->rconbuf) - rcon->rconbufdone, 0, rcon_stop_reply);
+ fprintf(stderr, "Packet complete\n");
+ read_packet(cfg, rcon, &id, &type, &msg);
+ if (id != 2) {
+ fprintf(stderr, "RCon stop cmd failed - unexpected reply id (%" PRIi32 ")\n", id);
+ goto out;
+ } else if (type != RCON_PACKET_RESPONSE) {
+ fprintf(stderr, "RCon stop cmd failed - unexpected reply type (%" PRIi32 ")\n", type);
+ goto out;
}
+ fprintf(stderr, "RCon stop successful (%s)\n", msg);
out:
uring_task_put(cfg, &rcon->task);
@@ -263,17 +241,8 @@ rcon_stop_sent(struct cfg *cfg, struct uring_task *task, int res)
return;
}
- rcon->rconbufdone += res;
- if (rcon->rconbufdone < rcon->rconbuflen) {
- uring_write(cfg, &rcon->task, rcon->rconbuf + rcon->rconbufdone,
- rcon->rconbuflen - rcon->rconbufdone, rcon_stop_sent);
- return;
- }
-
fprintf(stderr, "%s: stop cmd sent\n", __func__);
- rcon->rconbufdone = 0;
- rcon->rconbuflen = 0;
- uring_read(cfg, &rcon->task, &rcon->rconbuf, sizeof(rcon->rconbuf), 0, rcon_stop_reply);
+ uring_tbuf_read_until(cfg, &rcon->task, packet_complete, rcon_stop_reply);
}
static void
@@ -288,32 +257,22 @@ rcon_login_reply(struct cfg *cfg, struct uring_task *task, int res)
if (res < 0)
goto out;
- rcon->rconbufdone += res;
-
- if (packet_complete(cfg, rcon)) {
- fprintf(stderr, "Packet complete\n");
- read_packet(cfg, rcon, &id, &type, &msg);
- if (id != 1) {
- fprintf(stderr, "RCon login failed - unexpected reply id (%" PRIi32 ")\n", id);
- goto out;
- } else if (type == RCON_PACKET_LOGIN_FAIL) {
- fprintf(stderr, "RCon login failed - incorrect password\n");
- goto out;
- } else if (type != RCON_PACKET_LOGIN_OK) {
- fprintf(stderr, "RCon login failed - unexpected reply type (%" PRIi32 ")\n", type);
- goto out;
- }
- free(msg);
- fprintf(stderr, "RCon login successful\n");
- create_packet(cfg, rcon, 2, RCON_PACKET_COMMAND, "stop");
- uring_write(cfg, &rcon->task, rcon->rconbuf, rcon->rconbuflen, rcon_stop_sent);
-
- } else {
- fprintf(stderr, "Packet not complete\n");
- uring_read(cfg, &rcon->task, &rcon->rconbuf + rcon->rconbufdone,
- sizeof(rcon->rconbuf) - rcon->rconbufdone, 0, rcon_login_reply);
+ fprintf(stderr, "Packet complete\n");
+ read_packet(cfg, rcon, &id, &type, &msg);
+ if (id != 1) {
+ fprintf(stderr, "RCon login failed - unexpected reply id (%" PRIi32 ")\n", id);
+ goto out;
+ } else if (type == RCON_PACKET_LOGIN_FAIL) {
+ fprintf(stderr, "RCon login failed - incorrect password\n");
+ goto out;
+ } else if (type != RCON_PACKET_LOGIN_OK) {
+ fprintf(stderr, "RCon login failed - unexpected reply type (%" PRIi32 ")\n", type);
+ goto out;
}
+ fprintf(stderr, "RCon login successful\n");
+ create_packet(cfg, rcon, 2, RCON_PACKET_COMMAND, "stop");
+ uring_tbuf_write(cfg, &rcon->task, rcon_stop_sent);
return;
out:
@@ -331,17 +290,8 @@ rcon_login_sent(struct cfg *cfg, struct uring_task *task, int res)
return;
}
- rcon->rconbufdone += res;
- if (rcon->rconbufdone < rcon->rconbuflen) {
- uring_write(cfg, &rcon->task, rcon->rconbuf + rcon->rconbufdone,
- rcon->rconbuflen - rcon->rconbufdone, rcon_login_sent);
- return;
- }
-
fprintf(stderr, "%s: login sent\n", __func__);
- rcon->rconbufdone = 0;
- rcon->rconbuflen = 0;
- uring_read(cfg, &rcon->task, &rcon->rconbuf, sizeof(rcon->rconbuf), 0, rcon_login_reply);
+ uring_tbuf_read_until(cfg, &rcon->task, packet_complete, rcon_login_reply);
}
static void rcon_connect_next_rcon(struct cfg *cfg, struct rcon *rcon);
@@ -358,7 +308,7 @@ rcon_connected(struct cfg *cfg, struct uring_task *task, int res)
}
create_packet(cfg, rcon, 1, RCON_PACKET_LOGIN, rcon->server->rcon_password);
- uring_write(cfg, &rcon->task, rcon->rconbuf, rcon->rconbuflen, rcon_login_sent);
+ uring_tbuf_write(cfg, &rcon->task, rcon_login_sent);
}
/* FIXME: Parts of this could be shared with proxy.c, probably in server.c */
@@ -418,6 +368,7 @@ rcon_init(struct cfg *cfg, struct server *server)
perrordie("malloc");
uring_task_init(&rcon->task, "rcon", &server->task, rcon_free);
+ uring_task_set_buf(&rcon->task, &rcon->tbuf);
rcon->server = server;
server->rcon = rcon;
diff --git a/server.c b/server.c
index ee93a24..4102d99 100644
--- a/server.c
+++ b/server.c
@@ -695,6 +695,7 @@ server_new(struct cfg *cfg, const char *name)
scfg->stop_method = SERVER_STOP_METHOD_UNDEFINED;
scfg->start_method = SERVER_START_METHOD_UNDEFINED;
uring_task_init(&scfg->task, "scfg", uring_parent(cfg), server_free);
+ uring_task_set_buf(&scfg->task, &scfg->tbuf);
uring_task_init(&scfg->exec_task, "exec", &scfg->task, server_exec_free);
list_init(&scfg->remotes);
list_init(&scfg->locals);
diff --git a/server.h b/server.h
index 8e0b15f..fbaa3fa 100644
--- a/server.h
+++ b/server.h
@@ -37,6 +37,7 @@ struct server {
/* For calling external start/stop executables */
char *stop_exec;
char *start_exec;
+ struct uring_task exec_task;
/* For systemd services */
char *systemd_service;
@@ -56,12 +57,10 @@ struct server {
unsigned idle_timeout;
unsigned idle_count;
- /* For config files */
- char buf[4096];
- size_t len;
-
- struct uring_task exec_task;
+ /* For reading config files */
+ struct uring_task_buf tbuf;
struct uring_task task;
+
struct list_head list;
};
diff --git a/uring.c b/uring.c
index d74b590..561be25 100644
--- a/uring.c
+++ b/uring.c
@@ -69,7 +69,8 @@ uring_task_put(struct cfg *cfg, struct uring_task *task)
{
struct uring_task *parent = task->parent;
- fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount);
+ fprintf(stderr, "%s: called with task %s (0x%p) and refcount %u\n",
+ __func__, task->name, task, task->refcount);
task->refcount--;
if (task->refcount > 0)
@@ -94,7 +95,8 @@ uring_task_put(struct cfg *cfg, struct uring_task *task)
void
uring_task_get(struct cfg *cfg, struct uring_task *task)
{
- fprintf(stderr, "%s: called with task 0x%p and refcount %u\n", __func__, task, task->refcount);
+ fprintf(stderr, "%s: called with task %s (0x%p) and refcount %u\n",
+ __func__, task->name, task, task->refcount);
if (task->refcount < 0)
error("Negative refcount!\n");
@@ -103,6 +105,12 @@ uring_task_get(struct cfg *cfg, struct uring_task *task)
}
void
+uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf)
+{
+ task->tbuf = tbuf;
+}
+
+void
uring_task_set_fd(struct uring_task *task, int fd)
{
task->fd = fd;
@@ -143,6 +151,7 @@ uring_task_init(struct uring_task *task, const char *name, struct uring_task *pa
task->free = free;
task->dead = false;
task->name = name;
+ task->tbuf = NULL;
if (task->parent)
uring_task_get(NULL, task->parent);
@@ -165,6 +174,49 @@ uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callbac
fprintf(stderr, "%s: done\n", __func__);
}
+static void
+uring_tbuf_write_cb(struct cfg *cfg, struct uring_task *task, int res)
+{
+ int r;
+
+ if (!task || !task->tbuf || !task->final_callback)
+ perrordie("%s: missing parameters\n");
+
+ if (res < 0) {
+ r = res;
+ goto finished;
+ }
+
+ /* We wrote some more data */
+ task->tbuf->done += res;
+ if (task->tbuf->done >= task->tbuf->len) {
+ r = task->tbuf->len;
+ goto finished;
+ }
+
+ uring_write(cfg, task, task->tbuf->buf + task->tbuf->done,
+ task->tbuf->len - task->tbuf->done,
+ uring_tbuf_write_cb);
+ return;
+
+finished:
+ task->final_callback(cfg, task, r);
+ return;
+}
+
+void
+uring_tbuf_write(struct cfg *cfg, struct uring_task *task, callback_t callback)
+{
+ if (!task || task->fd < 0 || !task->tbuf || task->tbuf->len < 0) {
+ error("%s invalid parameters\n", __func__);
+ return;
+ }
+
+ task->tbuf->done = 0;
+ task->final_callback = callback;
+ uring_write(cfg, task, &task->tbuf->buf, task->tbuf->len, uring_tbuf_write_cb);
+}
+
void
uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback)
{
@@ -182,6 +234,90 @@ uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, cal
io_uring_sqe_set_data(sqe, task);
}
+static int
+uring_tbuf_complete_eof(struct cfg *cfg, struct uring_task *task, int res)
+{
+ if (task->tbuf->len + 1 >= sizeof(task->tbuf->buf))
+ return -E2BIG;
+
+ if (res > 0)
+ return 1;
+
+ task->tbuf->buf[task->tbuf->len] = '\0';
+ task->tbuf->len++;
+ return 0;
+}
+
+static void
+uring_tbuf_read_until_cb(struct cfg *cfg, struct uring_task *task, int res)
+{
+ int r;
+
+ if (!task || !task->tbuf || !task->final_callback || !task->complete_callback) {
+ error("%s: invalid parameters\n", __func__);
+ return;
+ }
+
+ if (res < 0) {
+ r = res;
+ goto finished;
+ }
+
+ task->tbuf->len += res;
+ r = task->complete_callback(cfg, task, res);
+ if (r < 0) {
+ r = res;
+ goto finished;
+ } else if (r > 0) {
+ r = task->tbuf->len;
+ goto finished;
+ }
+
+ /* Need to read more */
+ if (task->tbuf->len >= sizeof(task->tbuf->buf)) {
+ r = E2BIG;
+ goto finished;
+ }
+
+ uring_read(cfg, task, task->tbuf->buf + task->tbuf->len,
+ sizeof(task->tbuf->buf) - task->tbuf->len,
+ task->tbuf->len, uring_tbuf_read_until_cb);
+ return;
+
+finished:
+ task->final_callback(cfg, task, r);
+ return;
+}
+
+void
+uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task,
+ rcallback_t complete, callback_t callback)
+{
+ if (!task)
+ error("task\n");
+ if (task->fd < 0)
+ error("task->fd\n");
+ if (!task->tbuf)
+ error("task->tbuf\n");
+ if (!complete)
+ error("complete\n");
+ if (!task || task->fd < 0 || !task->tbuf || !complete) {
+ error("%s: invalid parameters\n", __func__);
+ return;
+ }
+
+ task->tbuf->len = 0;
+ task->complete_callback = complete;
+ task->final_callback = callback;
+ uring_read(cfg, task, &task->tbuf->buf, sizeof(task->tbuf->buf), 0, uring_tbuf_read_until_cb);
+}
+
+void
+uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task, callback_t callback)
+{
+ uring_tbuf_read_until(cfg, task, uring_tbuf_complete_eof, callback);
+}
+
void
uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback)
{
diff --git a/uring.h b/uring.h
index 83119a1..935720d 100644
--- a/uring.h
+++ b/uring.h
@@ -7,6 +7,8 @@ void uring_task_put(struct cfg *cfg, struct uring_task *task);
void uring_task_get(struct cfg *cfg, struct uring_task *task);
+void uring_task_set_buf(struct uring_task *task, struct uring_task_buf *tbuf);
+
void uring_task_set_fd(struct uring_task *task, int fd);
void uring_task_close_fd(struct cfg *cfg, struct uring_task *task);
@@ -20,9 +22,17 @@ void uring_task_init(struct uring_task *task, const char *name,
void uring_close(struct cfg *cfg, struct uring_task *task, int fd,
callback_t callback);
+void uring_tbuf_write(struct cfg *cfg, struct uring_task *task, callback_t callback);
+
void uring_write(struct cfg *cfg, struct uring_task *task, void *buf,
size_t len, callback_t callback);
+void uring_tbuf_read_until(struct cfg *cfg, struct uring_task *task,
+ rcallback_t complete, callback_t callback);
+
+void uring_tbuf_read_until_eof(struct cfg *cfg, struct uring_task *task,
+ callback_t callback);
+
void uring_read(struct cfg *cfg, struct uring_task *task, void *buf,
size_t len, off_t offset, callback_t callback);