diff options
| -rw-r--r-- | proxy.c | 154 | ||||
| -rw-r--r-- | proxy.h | 13 | ||||
| -rw-r--r-- | uring.c | 19 | ||||
| -rw-r--r-- | uring.h | 3 | 
4 files changed, 166 insertions, 23 deletions
@@ -7,41 +7,140 @@  void  proxy_refdump(struct server_proxy *proxy)  { -	uring_task_refdump(&proxy->task); +	uring_task_refdump(&proxy->clienttask); +	uring_task_refdump(&proxy->servertask);  }  static void -proxy_free(struct uring_task *task) +proxy_client_free(struct uring_task *task)  { -	struct server_proxy *proxy = container_of(task, struct server_proxy, task); +	struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); +	fprintf(stderr, "%s: %s\n", __func__, proxy->scfg->name); +	/*  	list_del(&proxy->list);  	free(proxy); +	*/  }  static void -proxy_connected(struct cfg *cfg, struct uring_task *task, int res) +proxy_server_free(struct uring_task *task)  { -	//struct server_proxy *proxy = container_of(task, struct server_proxy, task); +	struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + +	fprintf(stderr, "%s: %s\n", __func__, proxy->scfg->name); +	/* +	list_del(&proxy->list); +	free(proxy); +	*/ +} + +static void proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res); + +static void +proxy_client_data_out(struct cfg *cfg, struct uring_task *task, int res) +{ +	struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + +	fprintf(stderr, "%s: result was %i\n", __func__, res); +	if (res <= 0) +		return; + +	if (res != proxy->clientlen) { +		fprintf(stderr, "%s: short write\n", __func__); +		return; +	} + +	uring_read(cfg, task, proxy->clientbuf, sizeof(proxy->clientbuf), 0, proxy_client_data_in); +} + +static void +proxy_client_data_in(struct cfg *cfg, struct uring_task *task, int res) +{ +	struct server_proxy *proxy = container_of(task, struct server_proxy, clienttask); + +	fprintf(stderr, "%s: result was %i\n", __func__, res); +	if (res <= 0) +		return; + +	proxy->clientlen = res; +	uring_write(cfg, task, proxy->clientbuf, res, proxy_client_data_out); +} + +static void proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res); + +static void +proxy_server_data_out(struct cfg *cfg, struct uring_task *task, int res) +{ +	struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + +	fprintf(stderr, "%s: result was %i\n", __func__, res); +	if (res <= 0) +		return; + +	if (res != proxy->serverlen) { +		fprintf(stderr, "%s: short write\n", __func__); +		return; +	} + +	uring_read(cfg, task, proxy->serverbuf, sizeof(proxy->serverbuf), 0, proxy_server_data_in); +} + +static void +proxy_server_data_in(struct cfg *cfg, struct uring_task *task, int res) +{ +	struct server_proxy *proxy = container_of(task, struct server_proxy, servertask); + +	fprintf(stderr, "%s: result was %i\n", __func__, res); +	if (res <= 0) +		return; + +	proxy->serverlen = res; +	uring_write(cfg, task, proxy->serverbuf, res, proxy_server_data_out); +} + +static void proxy_connect_next_remote(struct cfg *cfg, struct server_proxy *proxy); + +static void +proxy_server_connected(struct cfg *cfg, struct uring_task *task, int res) +{ +	struct server_proxy *proxy = container_of(task, struct server_proxy, servertask);  	fprintf(stderr, "%s: connected %i\n", __func__, res); -	return; +	if (res < 0) { +		proxy_connect_next_remote(cfg, proxy); +		return; +	} + +	uring_read(cfg, &proxy->clienttask, proxy->clientbuf, sizeof(proxy->clientbuf), 0, proxy_client_data_in); +	uring_read(cfg, &proxy->servertask, proxy->serverbuf, sizeof(proxy->serverbuf), 0, proxy_server_data_in);  } -struct server_proxy * -proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, int fd) +static void +proxy_connect_next_remote(struct cfg *cfg, struct server_proxy *proxy)  { -	struct server_proxy *proxy; -	struct sockaddr_in46 *remote; +	struct sockaddr_in46 *remote, *tmp; +	struct server *scfg = proxy->scfg;  	int sfd; +	unsigned i = 0; -	proxy = zmalloc(sizeof(*proxy)); -	if (!proxy) { -		perror("malloc"); -		return NULL; +again: +	remote = NULL; +	list_for_each_entry(tmp, &scfg->remotes, list) { +		if (i == proxy->next_remote) { +			remote = tmp; +			break; +		} +		i++; +	} + +	if (!remote) { +		fprintf(stderr, "No more remote addresses to attempt\n"); +		/* FIXME: put tasks */ +		return;  	} -	remote = list_first_entry(&scfg->remotes, struct sockaddr_in46, list); +	proxy->next_remote++;  	proxy->server = *remote;  	sockaddr_to_str(&proxy->server, proxy->serverstr, sizeof(proxy->serverstr));  	fprintf(stderr, "%s: attempting proxy connection to %s (len %u)\n", @@ -50,17 +149,32 @@ proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, in  	sfd = socket(proxy->server.storage.ss_family, SOCK_STREAM, 0);  	if (sfd < 0) {  		perror("socket"); -		uring_close(cfg, NULL, fd, NULL); -		free(proxy); +		goto again; +	} + +	uring_task_set_fd(&proxy->servertask, sfd); +	uring_connect(cfg, &proxy->servertask, &proxy->server, proxy_server_connected); +} + +struct server_proxy * +proxy_new(struct cfg *cfg, struct server *scfg, struct sockaddr_in46 *client, int fd) +{ +	struct server_proxy *proxy; + +	proxy = zmalloc(sizeof(*proxy)); +	if (!proxy) { +		perror("malloc");  		return NULL;  	} +	proxy->scfg = scfg;  	proxy->client = *client;  	sockaddr_to_str(&proxy->client, proxy->clientstr, sizeof(proxy->clientstr)); -	uring_task_init(&proxy->task, "server_proxy", &scfg->task, proxy_free); -	uring_task_set_fd(&proxy->task, sfd); +	uring_task_init(&proxy->clienttask, "proxy_client", &scfg->task, proxy_client_free); +	uring_task_init(&proxy->servertask, "proxy_server", &scfg->task, proxy_server_free); +	uring_task_set_fd(&proxy->clienttask, fd);  	list_add(&proxy->list, &scfg->proxys); -	uring_connect(cfg, &proxy->task, &proxy->server, proxy_connected); +	proxy_connect_next_remote(cfg, proxy);  	return proxy;  } @@ -4,11 +4,18 @@  struct server_proxy {  	struct sockaddr_in46 client;  	char clientstr[ADDRSTRLEN]; +	char clientbuf[4096]; +	size_t clientlen; +	struct uring_task clienttask; +  	struct sockaddr_in46 server;  	char serverstr[ADDRSTRLEN]; -	struct uring_task task; -	char buf[4096]; -	size_t len; +	char serverbuf[4096]; +	size_t serverlen; +	struct uring_task servertask; +	 +	unsigned next_remote; +	struct server *scfg;  	struct list_head list;  }; @@ -142,6 +142,25 @@ uring_close(struct cfg *cfg, struct uring_task *task, int fd, callback_t callbac  }  void +uring_write(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, callback_t callback) +{ +	struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); + +	if (!sqe) +		perrordie("io_uring_sqe"); + +	if (task->fd < 0) { +		error("uring_write called with no fd set\n"); +		return; +	} + +	uring_task_get(cfg, task); +	task->callback = callback; +	io_uring_prep_write(sqe, task->fd, buf, len, 0); +	io_uring_sqe_set_data(sqe, task); +} + +void  uring_read(struct cfg *cfg, struct uring_task *task, void *buf, size_t len, off_t offset, callback_t callback)  {  	struct io_uring_sqe *sqe = io_uring_get_sqe(&cfg->uev->uring); @@ -18,6 +18,9 @@ void uring_task_init(struct uring_task *task, const char *name,  void uring_close(struct cfg *cfg, struct uring_task *task, int fd,  		 callback_t callback); +void uring_write(struct cfg *cfg, struct uring_task *task, void *buf, +		 size_t len, callback_t callback); +  void uring_read(struct cfg *cfg, struct uring_task *task, void *buf,  		size_t len, off_t offset, callback_t callback);  | 
