| #include "git-compat-util.h"
 | |
| #include "gettext.h"
 | |
| #include "simple-ipc.h"
 | |
| #include "strbuf.h"
 | |
| #include "thread-utils.h"
 | |
| #include "trace2.h"
 | |
| #include "unix-socket.h"
 | |
| #include "unix-stream-server.h"
 | |
| 
 | |
| #ifndef SUPPORTS_SIMPLE_IPC
 | |
| /*
 | |
|  * This source file should only be compiled when Simple IPC is supported.
 | |
|  * See the top-level Makefile.
 | |
|  */
 | |
| #error SUPPORTS_SIMPLE_IPC not defined
 | |
| #endif
 | |
| 
 | |

enum ipc_active_state ipc_get_active_state(const char *path)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	struct ipc_client_connect_options options
		= IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct stat st;
	struct ipc_client_connection *connection_test = NULL;

	options.wait_if_busy = 0;
	options.wait_if_not_found = 0;

	if (lstat(path, &st) == -1) {
		switch (errno) {
		case ENOENT:
		case ENOTDIR:
			return IPC_STATE__NOT_LISTENING;
		default:
			return IPC_STATE__INVALID_PATH;
		}
	}

#ifdef __CYGWIN__
	/*
	 * Cygwin emulates Unix sockets by writing specially-crafted files
	 * whose `system` bit is set.
	 *
	 * If we are too fast, Cygwin might still be in the process of marking
	 * the underlying file as a system file. Until then, we will not see a
	 * Unix socket here, but a plain file instead. Just in case this is
	 * happening, wait a little and try again.
	 */
	{
		static const int delay[] = { 1, 10, 20, 40, -1 };
		int i;

		for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
			sleep_millisec(delay[i]);
			if (lstat(path, &st) == -1)
				return IPC_STATE__INVALID_PATH;
		}
	}
#endif

	/* also complain if a plain file is in the way */
	if ((st.st_mode & S_IFMT) != S_IFSOCK)
		return IPC_STATE__INVALID_PATH;

	/*
	 * Just because the filesystem has an S_IFSOCK inode at `path`
	 * doesn't mean that there is a server listening.  Ping it to
	 * be sure.
	 */
	state = ipc_client_try_connect(path, &options, &connection_test);
	ipc_client_close_connection(connection_test);

	return state;
}

/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so short that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server.  If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
	const char *path,
	int timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	int k;

	*pfd = -1;

	for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
		int fd = unix_stream_connect(path, options->uds_disallow_chdir);

		if (fd != -1) {
			*pfd = fd;
			return IPC_STATE__LISTENING;
		}

		if (errno == ENOENT) {
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;

			goto sleep_and_try_again;
		}

		if (errno == ETIMEDOUT) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		if (errno == ECONNREFUSED) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		return IPC_STATE__OTHER_ERROR;

	sleep_and_try_again:
		sleep_millisec(WAIT_STEP_MS);
	}

	return IPC_STATE__NOT_LISTENING;
}

/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests.  Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)
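
/*
 * Try to connect to the server at `path`.  On success, returns
 * IPC_STATE__LISTENING and hands back a connection in `*p_connection`;
 * the caller is responsible for releasing it with
 * ipc_client_close_connection().
 */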
enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
				  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}

void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (!connection)
		return;

	if (connection->fd != -1)
		close(connection->fd);

	free(connection);
}
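
/*
 * Send `message` to the server over an already-established connection
 * and collect the entire (possibly chunked) response in `answer`.
 * Returns 0 on success, -1 on error.
 */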
int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, size_t message_len,
	struct strbuf *answer)
{
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, message_len,
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0) {
		ret = error(_("could not send IPC command"));
		goto done;
	}

	if (read_packetized_to_strbuf(
		    connection->fd, answer,
		    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
		ret = error(_("could not read IPC response"));
		goto done;
	}

done:
	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}

int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, size_t message_len,
			    struct strbuf *answer)
{
	int ret = -1;
	enum ipc_active_state state;
	struct ipc_client_connection *connection = NULL;

	state = ipc_client_try_connect(path, options, &connection);

	if (state != IPC_STATE__LISTENING)
		return ret;

	ret = ipc_client_send_command_to_connection(connection,
						    message, message_len,
						    answer);

	ipc_client_close_connection(connection);

	return ret;
}
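
/*
 * Example: send one command and wait for the reply (a minimal sketch;
 * the socket path and "ping" message below are illustrative, not a
 * protocol defined by this layer):
 *
 *     struct ipc_client_connect_options options =
 *             IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *     struct strbuf answer = STRBUF_INIT;
 *
 *     options.wait_if_busy = 1;
 *
 *     if (!ipc_client_send_command("/path/to/socket", &options,
 *                                  "ping", 4, &answer))
 *             printf("reply: %s\n", answer.buf);
 *     strbuf_release(&answer);
 */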
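
/*
 * Set or clear O_NONBLOCK on `fd`.  Returns -1 (with errno set by
 * fcntl()) on failure.
 */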
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int flags;

	flags = fcntl(fd, F_GETFL, NULL);

	if (flags < 0)
		return -1;

	if (make_nonblocking)
		flags |= O_NONBLOCK;
	else
		flags &= ~O_NONBLOCK;

	return fcntl(fd, F_SETFL, flags);
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
	enum magic magic;
	int fd;
	struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
	enum magic magic;
	struct ipc_worker_thread_data *next_thread;
	struct ipc_server_data *server_data;
	pthread_t pthread_id;
};

struct ipc_accept_thread_data {
	enum magic magic;
	struct ipc_server_data *server_data;

	struct unix_ss_socket *server_socket;

	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id;
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread.  The worker threads wait in an idle loop for
 * a new connection, communicate with the client, relay data to/from
 * the `application_cb`, and then wait for another connection from the
 * accept-thread.  This avoids the overhead of constantly creating and
 * destroying threads.
 */
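/*
 * A sketch of the lifecycle these threads serve (assuming a caller
 * with a hypothetical `my_app_cb` application callback):
 *
 *     struct ipc_server_data *server = NULL;
 *     struct ipc_server_opts opts = { .nr_threads = 4 };
 *
 *     if (!ipc_server_init_async(&server, path, &opts, my_app_cb, NULL)) {
 *             ipc_server_start_async(server);
 *             ... serve clients until my_app_cb returns SIMPLE_IPC_QUIT
 *             or until someone calls ipc_server_stop_async() ...
 *             ipc_server_await(server);
 *             ipc_server_free(server);
 *     }
 */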
struct ipc_server_data {
	enum magic magic;
	ipc_server_application_cb *application_cb;
	void *application_data;
	struct strbuf buf_path;

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list;

	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO.  The queue is empty when the
	 * positions are equal.
	 */
	int *fifo_fds;
	int queue_size;
	int back_pos;
	int front_pos;

	int started;
	int shutdown_requested;
	int is_stopped;
};

/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
	/* ASSERT holding mutex */

	int fd;

	if (server_data->back_pos == server_data->front_pos)
		return -1;

	fd = server_data->fifo_fds[server_data->front_pos];
	server_data->fifo_fds[server_data->front_pos] = -1;

	server_data->front_pos++;
	if (server_data->front_pos == server_data->queue_size)
		server_data->front_pos = 0;

	return fd;
}

/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if the queue is already full.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
	/* ASSERT holding mutex */

	int next_back_pos;

	next_back_pos = server_data->back_pos + 1;
	if (next_back_pos == server_data->queue_size)
		next_back_pos = 0;

	if (next_back_pos == server_data->front_pos) {
		/* Queue is full. Just drop it. */
		close(fd);
		return -1;
	}

	server_data->fifo_fds[server_data->back_pos] = fd;
	server_data->back_pos = next_back_pos;

	return fd;
}
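
/*
 * Note: with the equal-positions-means-empty convention above, the
 * ring buffer holds at most `queue_size - 1` connections; the slot
 * just before `front_pos` is always left unused.
 */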

/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
	struct ipc_worker_thread_data *worker_thread_data)
{
	/* ASSERT NOT holding mutex */

	struct ipc_server_data *server_data = worker_thread_data->server_data;
	int fd = -1;

	pthread_mutex_lock(&server_data->work_available_mutex);
	for (;;) {
		if (server_data->shutdown_requested)
			break;

		fd = fifo_dequeue(server_data);
		if (fd >= 0)
			break;

		pthread_cond_wait(&server_data->work_available_cond,
				  &server_data->work_available_mutex);
	}
	pthread_mutex_unlock(&server_data->work_available_mutex);

	return fd;
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call).  The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay the application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client thru us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	return write_packetized_from_buf_no_flush(response, response_len,
						  reply_data->fd);
}

/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd.  Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connect and hangup without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	struct pollfd pollfd[1];
	int result;

	for (;;) {
		pollfd[0].fd = fd;
		pollfd[0].events = POLLIN;

		result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			goto cleanup;
		}

		if (result == 0) {
			/* a timeout */

			int in_shutdown;

			pthread_mutex_lock(&server_data->work_available_mutex);
			in_shutdown = server_data->shutdown_requested;
			pthread_mutex_unlock(&server_data->work_available_mutex);

			/*
			 * If a shutdown is already in progress and this
			 * client has not started talking yet, just drop it.
			 */
			if (in_shutdown)
				goto cleanup;
			continue;
		}

		if (pollfd[0].revents & POLLHUP)
			goto cleanup;

		if (pollfd[0].revents & POLLIN)
			return 0;

		goto cleanup;
	}

cleanup:
	close(fd);
	return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback.  The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	/* ASSERT NOT holding lock */

	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.worker_thread_data = worker_thread_data;

	reply_data.fd = fd;

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = worker_thread_data->server_data->application_cb(
			worker_thread_data->server_data->application_data,
			buf.buf, buf.len, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);
	} else {
		/*
		 * The client probably disconnected/shutdown before it
		 * could send a well-formed message.  Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}

/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
	sigset_t new_set;

	sigemptyset(&new_set);
	sigaddset(&new_set, SIGPIPE);

	sigemptyset(old_set);
	pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}

/*
 * Thread proc for an IPC worker thread.  It handles a series of
 * connections from clients.  It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
	struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	sigset_t old_set;
	int fd, io;
	int ret;

	trace2_thread_start("ipc-worker");

	thread_block_sigpipe(&old_set);

	for (;;) {
		fd = worker_thread__wait_for_connection(worker_thread_data);
		if (fd == -1)
			break; /* in shutdown */

		io = worker_thread__wait_for_io_start(worker_thread_data, fd);
		if (io == -1)
			continue; /* client hung up without sending anything */

		ret = worker_thread__do_io(worker_thread_data, fd);

		if (ret == SIMPLE_IPC_QUIT) {
			trace2_data_string("ipc-worker", NULL, "queue_stop_async",
					   "application_quit");
			/*
			 * The application layer is telling the ipc-server
			 * layer to shut down.
			 *
			 * We DO NOT have a response to send to the client.
			 *
			 * Queue an async stop (to stop the other threads) and
			 * allow this worker thread to exit now (no sense waiting
			 * for the thread-pool shutdown signal).
			 *
			 * Other non-idle worker threads are allowed to finish
			 * responding to their current clients.
			 */
			ipc_server_stop_async(server_data);
			break;
		}
	}

	trace2_thread_exit();
	return NULL;
}

/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket.  This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
	struct ipc_accept_thread_data *accept_thread_data)
{
	struct pollfd pollfd[2];
	int result;

	for (;;) {
		pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
		pollfd[0].events = POLLIN;

		pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
		pollfd[1].events = POLLIN;

		result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			return result;
		}

		if (result == 0) {
			/* a timeout */

			/*
			 * If someone deletes or force-creates a new unix
			 * domain socket at our path, all future clients
			 * will be routed elsewhere and we silently starve.
			 * If that happens, just queue a shutdown.
			 */
			if (unix_ss_was_stolen(
				    accept_thread_data->server_socket)) {
				trace2_data_string("ipc-accept", NULL,
						   "queue_stop_async",
						   "socket_stolen");
				ipc_server_stop_async(
					accept_thread_data->server_data);
			}
			continue;
		}

		if (pollfd[0].revents & POLLIN) {
			/* shutdown message queued to socketpair */
			return -1;
		}

		if (pollfd[1].revents & POLLIN) {
			/* a connection is available on server_socket */

			int client_fd =
				accept(accept_thread_data->server_socket->fd_socket,
				       NULL, NULL);
			if (client_fd >= 0)
				return client_fd;

			/*
			 * An error here is unlikely -- it probably
			 * indicates that the connecting process has
			 * already dropped the connection.
			 */
			continue;
		}

		BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
		    errno, pollfd[0].revents, pollfd[1].revents);
	}
}

/*
 * Thread proc for the IPC server "accept thread".  This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread.  This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
	struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
	struct ipc_server_data *server_data = accept_thread_data->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-accept");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int client_fd = accept_thread__wait_for_connection(
			accept_thread_data);

		pthread_mutex_lock(&server_data->work_available_mutex);
		if (server_data->shutdown_requested) {
			pthread_mutex_unlock(&server_data->work_available_mutex);
			if (client_fd >= 0)
				close(client_fd);
			break;
		}

		if (client_fd < 0) {
			/* ignore transient accept() errors */
		} else {
			fifo_enqueue(server_data, client_fd);
			pthread_cond_broadcast(&server_data->work_available_cond);
		}
		pthread_mutex_unlock(&server_data->work_available_mutex);
	}

	trace2_thread_exit();
	return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not unnecessarily time out if we can avoid it.  (The assumption is
 * that this will be used for FSMonitor and a few-second wait on a
 * connection is better than having the client time out and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)

/*
 * The backlog value for `listen(2)`.  This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)
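
/*
 * Create the listener socket at `path` and make it non-blocking so
 * that the accept-thread can poll() it without stalling.
 */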
static int create_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	struct unix_ss_socket *server_socket = NULL;
	struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
	int ret;

	uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
	uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

	ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
	if (ret)
		return ret;

	if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
		int saved_errno = errno;
		unix_ss_free(server_socket);
		errno = saved_errno;
		return -1;
	}

	*new_server_socket = server_socket;

	trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
	return 0;
}
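
/*
 * Wrap create_listener_socket() in a trace2 region, preserving errno
 * across the region-leave call for the caller.
 */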
static int setup_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	int ret, saved_errno;

	trace2_region_enter("ipc-server", "create-listener_socket", NULL);

	ret = create_listener_socket(path, ipc_opts, new_server_socket);

	saved_errno = errno;
	trace2_region_leave("ipc-server", "create-listener_socket", NULL);
	errno = saved_errno;

	return ret;
}

/*
 * Start the IPC server in a pool of background threads.
 */
int ipc_server_init_async(struct ipc_server_data **returned_server_data,
			  const char *path, const struct ipc_server_opts *opts,
			  ipc_server_application_cb *application_cb,
			  void *application_data)
{
	struct unix_ss_socket *server_socket = NULL;
	struct ipc_server_data *server_data;
	int sv[2];
	int k;
	int ret;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	/*
	 * Create a socketpair and set sv[1] to non-blocking.  This
	 * will be used to send a shutdown message to the accept-thread
	 * and allows the accept-thread to wait on EITHER a client
	 * connection or a shutdown request without spinning.
	 */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	if (set_socket_blocking_flag(sv[1], 1)) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return -1;
	}

	ret = setup_listener_socket(path, opts, &server_socket);
	if (ret) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return ret;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);

	if (nr_threads < 1)
		nr_threads = 1;

	pthread_mutex_init(&server_data->work_available_mutex, NULL);
	pthread_cond_init(&server_data->work_available_cond, NULL);

	server_data->queue_size = nr_threads * FIFO_SCALE;
	CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

	server_data->accept_thread =
		xcalloc(1, sizeof(*server_data->accept_thread));
	server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
	server_data->accept_thread->server_data = server_data;
	server_data->accept_thread->server_socket = server_socket;
	server_data->accept_thread->fd_send_shutdown = sv[0];
	server_data->accept_thread->fd_wait_shutdown = sv[1];

	/*
	 * Hold the work-available mutex so that no work can start until
	 * we unlock it.
	 */
	pthread_mutex_lock(&server_data->work_available_mutex);

	if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
			   accept_thread_proc, server_data->accept_thread))
		die_errno(_("could not start accept_thread '%s'"), path);

	for (k = 0; k < nr_threads; k++) {
		struct ipc_worker_thread_data *wtd;

		wtd = xcalloc(1, sizeof(*wtd));
		wtd->magic = MAGIC_WORKER_THREAD_DATA;
		wtd->server_data = server_data;

		if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
				   wtd)) {
			if (k == 0)
				die(_("could not start worker[0] for '%s'"),
				    path);
			/*
			 * Limp along with the thread pool that we have.
			 */
			break;
		}

		wtd->next_thread = server_data->worker_thread_list;
		server_data->worker_thread_list = wtd;
	}

	*returned_server_data = server_data;
	return 0;
}
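
/*
 * Let the server start accepting work.  ipc_server_init_async() leaves
 * `work_available_mutex` locked, so the threads it created cannot
 * consume work until we release it here.
 */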
void ipc_server_start_async(struct ipc_server_data *server_data)
{
	if (!server_data || server_data->started)
		return;

	server_data->started = 1;
	pthread_mutex_unlock(&server_data->work_available_mutex);
}

/*
 * Gently tell the IPC server threads to shut down.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	/* ASSERT NOT holding mutex */

	int fd;

	if (!server_data)
		return 0;

	trace2_region_enter("ipc-server", "server-stop-async", NULL);

	/* If we haven't started yet, we are already holding the lock. */
	if (server_data->started)
		pthread_mutex_lock(&server_data->work_available_mutex);

	server_data->shutdown_requested = 1;

	/*
	 * Write a byte to the shutdown socket pair to wake up the
	 * accept-thread.
	 */
	if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
		error_errno("could not write to fd_send_shutdown");

	/*
	 * Drain the queue of existing connections.
	 */
	while ((fd = fifo_dequeue(server_data)) != -1)
		close(fd);

	/*
	 * Gently tell the worker threads to stop processing new connections
	 * and exit.  (This does not abort in-progress conversations.)
	 */
	pthread_cond_broadcast(&server_data->work_available_cond);

	pthread_mutex_unlock(&server_data->work_available_mutex);

	trace2_region_leave("ipc-server", "server-stop-async", NULL);

	return 0;
}

/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
	pthread_join(server_data->accept_thread->pthread_id, NULL);

	if (!server_data->shutdown_requested)
		BUG("ipc-server: accept-thread stopped for '%s'",
		    server_data->buf_path.buf);

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		pthread_join(wtd->pthread_id, NULL);

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	server_data->is_stopped = 1;

	return 0;
}
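
/*
 * Release all resources owned by `server_data`.  The server must have
 * been fully stopped (see ipc_server_await()) before calling this.
 */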
void ipc_server_free(struct ipc_server_data *server_data)
{
	struct ipc_accept_thread_data *accept_thread_data;

	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	accept_thread_data = server_data->accept_thread;
	if (accept_thread_data) {
		unix_ss_free(accept_thread_data->server_socket);

		if (accept_thread_data->fd_send_shutdown != -1)
			close(accept_thread_data->fd_send_shutdown);
		if (accept_thread_data->fd_wait_shutdown != -1)
			close(accept_thread_data->fd_wait_shutdown);

		free(server_data->accept_thread);
	}

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	pthread_cond_destroy(&server_data->work_available_cond);
	pthread_mutex_destroy(&server_data->work_available_mutex);

	strbuf_release(&server_data->buf_path);

	free(server_data->fifo_fds);
	free(server_data);
}