net/local: add nonblock connect(2) support

Signed-off-by: chao.an <anchao@xiaomi.com>
This commit is contained in:
chao.an 2021-08-13 20:53:38 +08:00 committed by Xiang Xiao
parent ee3980abd7
commit 1a55d933ef
8 changed files with 77 additions and 46 deletions

View file

@ -88,6 +88,7 @@ enum local_state_s
/* SOCK_STREAM peers only */
LOCAL_STATE_ACCEPT, /* Client waiting for a connection */
LOCAL_STATE_CONNECTING, /* Non-blocking connect */
LOCAL_STATE_CONNECTED, /* Peer connected */
LOCAL_STATE_DISCONNECTED /* Peer disconnected */
};
@ -149,12 +150,13 @@ struct local_conn_s
/* SOCK_STREAM fields common to both client and server */
sem_t lc_waitsem; /* Use to wait for a connection to be accepted */
FAR struct socket *lc_psock; /* A reference to the socket structure */
/* The following is a list if poll structures of threads waiting for
* socket events.
*/
struct pollfd *lc_accept_fds[LOCAL_NPOLLWAITERS];
struct pollfd *lc_event_fds[LOCAL_NPOLLWAITERS];
struct pollfd lc_inout_fds[2*LOCAL_NPOLLWAITERS];
/* Union of fields unique to SOCK_STREAM client, server, and connected
@ -617,11 +619,11 @@ int local_open_sender(FAR struct local_conn_s *conn, FAR const char *path,
#endif
/****************************************************************************
* Name: local_accept_pollnotify
* Name: local_event_pollnotify
****************************************************************************/
void local_accept_pollnotify(FAR struct local_conn_s *conn,
pollevent_t eventset);
void local_event_pollnotify(FAR struct local_conn_s *conn,
pollevent_t eventset);
/****************************************************************************
* Name: local_pollsetup

View file

@ -163,6 +163,7 @@ int local_accept(FAR struct socket *psock, FAR struct sockaddr *addr,
conn->lc_proto = SOCK_STREAM;
conn->lc_type = LOCAL_TYPE_PATHNAME;
conn->lc_state = LOCAL_STATE_CONNECTED;
conn->lc_psock = psock;
strncpy(conn->lc_path, client->lc_path, UNIX_PATH_MAX - 1);
conn->lc_path[UNIX_PATH_MAX - 1] = '\0';
@ -228,6 +229,13 @@ int local_accept(FAR struct socket *psock, FAR struct sockaddr *addr,
/* Signal the client with the result of the connection */
client->u.client.lc_result = ret;
if (client->lc_state == LOCAL_STATE_CONNECTING)
{
client->lc_state = LOCAL_STATE_CONNECTED;
_SO_SETERRNO(client->lc_psock, ret);
local_event_pollnotify(client, POLLOUT);
}
nxsem_post(&client->lc_waitsem);
return ret;
}

View file

@ -93,7 +93,6 @@ static int inline local_stream_connect(FAR struct local_conn_s *client,
if (server->lc_state != LOCAL_STATE_LISTENING ||
server->u.server.lc_pending >= server->u.server.lc_backlog)
{
net_unlock();
nerr("ERROR: Server is not listening: lc_state=%d\n",
server->lc_state);
nerr(" OR: The backlog limit was reached: %d or %d\n",
@ -114,7 +113,6 @@ static int inline local_stream_connect(FAR struct local_conn_s *client,
nerr("ERROR: Failed to create FIFOs for %s: %d\n",
client->lc_path, ret);
net_unlock();
return ret;
}
@ -128,7 +126,6 @@ static int inline local_stream_connect(FAR struct local_conn_s *client,
nerr("ERROR: Failed to open write-only FIFOs for %s: %d\n",
client->lc_path, ret);
net_unlock();
goto errout_with_fifos;
}
@ -137,35 +134,36 @@ static int inline local_stream_connect(FAR struct local_conn_s *client,
/* Set the busy "result" before giving the semaphore. */
client->u.client.lc_result = -EBUSY;
client->lc_state = LOCAL_STATE_ACCEPT;
/* Add ourself to the list of waiting connections and notify the server. */
dq_addlast(&client->lc_node, &server->u.server.lc_waiters);
client->lc_state = LOCAL_STATE_ACCEPT;
local_accept_pollnotify(server, POLLIN);
local_event_pollnotify(server, POLLIN);
if (nxsem_get_value(&server->lc_waitsem, &sval) >= 0 && sval < 1)
{
_local_semgive(&server->lc_waitsem);
}
net_unlock();
/* Wait for the server to accept the connections */
do
if (!nonblock)
{
_local_semtake(&client->lc_waitsem);
ret = client->u.client.lc_result;
}
while (ret == -EBUSY);
do
{
_local_semtake(&client->lc_waitsem);
ret = client->u.client.lc_result;
}
while (ret == -EBUSY);
/* Did we successfully connect? */
/* Did we successfully connect? */
if (ret < 0)
{
nerr("ERROR: Failed to connect: %d\n", ret);
goto errout_with_outfd;
if (ret < 0)
{
nerr("ERROR: Failed to connect: %d\n", ret);
goto errout_with_outfd;
}
}
/* Yes.. open the read-only FIFO */
@ -179,8 +177,15 @@ static int inline local_stream_connect(FAR struct local_conn_s *client,
}
DEBUGASSERT(client->lc_infile.f_inode != NULL);
client->lc_state = LOCAL_STATE_CONNECTED;
return OK;
if (!nonblock)
{
client->lc_state = LOCAL_STATE_CONNECTED;
return ret;
}
client->lc_state = LOCAL_STATE_CONNECTING;
return -EINPROGRESS;
errout_with_outfd:
file_close(&client->lc_outfile);
@ -291,6 +296,7 @@ int psock_local_connect(FAR struct socket *psock,
/* Bind the address and protocol */
client->lc_type = conn->lc_type;
client->lc_proto = conn->lc_proto;
strncpy(client->lc_path, unaddr->sun_path,
UNIX_PATH_MAX - 1);
@ -309,11 +315,8 @@ int psock_local_connect(FAR struct socket *psock,
local_stream_connect(client, conn,
_SS_ISNONBLOCK(psock->s_flags));
}
else
{
net_unlock();
}
net_unlock();
return ret;
}
}

View file

@ -37,13 +37,13 @@
#include "local/local.h"
/****************************************************************************
* Name: local_accept_pollsetup
* Name: local_event_pollsetup
****************************************************************************/
#ifdef CONFIG_NET_LOCAL_STREAM
static int local_accept_pollsetup(FAR struct local_conn_s *conn,
FAR struct pollfd *fds,
bool setup)
static int local_event_pollsetup(FAR struct local_conn_s *conn,
FAR struct pollfd *fds,
bool setup)
{
pollevent_t eventset;
int ret = OK;
@ -60,12 +60,12 @@ static int local_accept_pollsetup(FAR struct local_conn_s *conn,
{
/* Find an available slot */
if (!conn->lc_accept_fds[i])
if (!conn->lc_event_fds[i])
{
/* Bind the poll structure and this slot */
conn->lc_accept_fds[i] = fds;
fds->priv = &conn->lc_accept_fds[i];
conn->lc_event_fds[i] = fds;
fds->priv = &conn->lc_event_fds[i];
break;
}
}
@ -78,14 +78,15 @@ static int local_accept_pollsetup(FAR struct local_conn_s *conn,
}
eventset = 0;
if (dq_peek(&conn->u.server.lc_waiters) != NULL)
if (conn->lc_state == LOCAL_STATE_LISTENING &&
dq_peek(&conn->u.server.lc_waiters) != NULL)
{
eventset |= POLLIN;
}
if (eventset)
{
local_accept_pollnotify(conn, eventset);
local_event_pollnotify(conn, eventset);
}
}
else
@ -117,18 +118,18 @@ errout:
****************************************************************************/
/****************************************************************************
* Name: local_accept_pollnotify
* Name: local_event_pollnotify
****************************************************************************/
void local_accept_pollnotify(FAR struct local_conn_s *conn,
pollevent_t eventset)
void local_event_pollnotify(FAR struct local_conn_s *conn,
pollevent_t eventset)
{
#ifdef CONFIG_NET_LOCAL_STREAM
int i;
for (i = 0; i < LOCAL_NPOLLWAITERS; i++)
{
struct pollfd *fds = conn->lc_accept_fds[i];
struct pollfd *fds = conn->lc_event_fds[i];
if (fds)
{
fds->revents |= (fds->events & eventset);
@ -171,10 +172,11 @@ int local_pollsetup(FAR struct socket *psock, FAR struct pollfd *fds)
}
#ifdef CONFIG_NET_LOCAL_STREAM
if (conn->lc_state == LOCAL_STATE_LISTENING &&
conn->lc_type == LOCAL_TYPE_PATHNAME)
if ((conn->lc_state == LOCAL_STATE_LISTENING ||
conn->lc_state == LOCAL_STATE_CONNECTING) &&
conn->lc_type == LOCAL_TYPE_PATHNAME)
{
return local_accept_pollsetup(conn, fds, true);
return local_event_pollsetup(conn, fds, true);
}
if (conn->lc_state == LOCAL_STATE_DISCONNECTED)
@ -321,10 +323,11 @@ int local_pollteardown(FAR struct socket *psock, FAR struct pollfd *fds)
}
#ifdef CONFIG_NET_LOCAL_STREAM
if (conn->lc_state == LOCAL_STATE_LISTENING &&
conn->lc_type == LOCAL_TYPE_PATHNAME)
if ((conn->lc_state == LOCAL_STATE_LISTENING ||
conn->lc_state == LOCAL_STATE_CONNECTING) &&
conn->lc_type == LOCAL_TYPE_PATHNAME)
{
return local_accept_pollsetup(conn, fds, false);
return local_event_pollsetup(conn, fds, false);
}
if (conn->lc_state == LOCAL_STATE_DISCONNECTED)

View file

@ -141,6 +141,11 @@ psock_stream_recvfrom(FAR struct socket *psock, FAR void *buf, size_t len,
if (conn->lc_state != LOCAL_STATE_CONNECTED)
{
if (conn->lc_state == LOCAL_STATE_CONNECTING)
{
return -EAGAIN;
}
nerr("ERROR: not connected\n");
return -ENOTCONN;
}

View file

@ -68,6 +68,7 @@ int local_release(FAR struct local_conn_s *conn)
/* If the socket is connected (SOCK_STREAM client), then disconnect it */
if (conn->lc_state == LOCAL_STATE_CONNECTED ||
conn->lc_state == LOCAL_STATE_CONNECTING ||
conn->lc_state == LOCAL_STATE_DISCONNECTED)
{
DEBUGASSERT(conn->lc_proto == SOCK_STREAM);
@ -92,6 +93,7 @@ int local_release(FAR struct local_conn_s *conn)
{
client->u.client.lc_result = -ENOTCONN;
nxsem_post(&client->lc_waitsem);
local_event_pollnotify(client, POLLOUT);
}
conn->u.server.lc_pending = 0;

View file

@ -85,6 +85,11 @@ static ssize_t local_send(FAR struct socket *psock,
if (peer->lc_state != LOCAL_STATE_CONNECTED ||
peer->lc_outfile.f_inode == NULL)
{
if (peer->lc_state == LOCAL_STATE_CONNECTING)
{
return -EAGAIN;
}
nerr("ERROR: not connected\n");
return -ENOTCONN;
}

View file

@ -131,6 +131,9 @@ static int local_sockif_alloc(FAR struct socket *psock)
/* Save the pre-allocated connection in the socket structure */
psock->s_conn = conn;
#if defined(CONFIG_NET_LOCAL_STREAM)
conn->lc_psock = psock;
#endif
return OK;
}
#endif