1
0
Fork 0
forked from nuttx/nuttx-update

net/rpmsg: Minor cleanups

1.Simplify the message process in rpmsg_socket_send_single
2.Add more lock protection in rpmsg socket
3.Fix the style issue

Signed-off-by: Xiang Xiao <xiaoxiang@xiaomi.com>
This commit is contained in:
Xiang Xiao 2024-02-25 16:08:56 +08:00 committed by Xiang Xiao
parent 3a32531a05
commit cb2a6a3480

View file

@ -27,12 +27,12 @@
#include <nuttx/config.h>
#include <assert.h>
#include <debug.h>
#include <poll.h>
#include <stdio.h>
#include <string.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <debug.h>
#include <nuttx/kmalloc.h>
#include <nuttx/circbuf.h>
@ -81,7 +81,7 @@ begin_packed_struct struct rpmsg_socket_data_s
/* Act data len, don't include len itself when SOCK_DGRAM */
uint32_t len;
char data[0];
uint8_t data[0];
} end_packed_struct;
begin_packed_struct struct rpmsg_socket_shutdown_s
@ -239,9 +239,9 @@ static inline void rpmsg_socket_post(FAR sem_t *sem)
}
}
static inline void rpmsg_socket_poll_notify(
FAR struct rpmsg_socket_conn_s *conn,
pollevent_t eventset)
static inline void
rpmsg_socket_poll_notify(FAR struct rpmsg_socket_conn_s *conn,
pollevent_t eventset)
{
nxmutex_lock(&conn->polllock);
poll_notify(conn->fds, CONFIG_NET_RPMSG_NPOLLWAITERS, eventset);
@ -314,12 +314,11 @@ static int rpmsg_socket_wakeup(FAR struct rpmsg_socket_conn_s *conn)
}
nxmutex_unlock(&conn->recvlock);
return ret ? rpmsg_send(&conn->ept, &msg, sizeof(msg)) : 0;
}
static inline uint32_t rpmsg_socket_get_space(
FAR struct rpmsg_socket_conn_s *conn)
static inline uint32_t
rpmsg_socket_get_space(FAR struct rpmsg_socket_conn_s *conn)
{
return conn->sendsize - (conn->sendpos - conn->ackpos);
}
@ -340,7 +339,6 @@ static int rpmsg_socket_ept_cb(FAR struct rpmsg_endpoint *ept,
conn->cred.gid = head->gid;
conn->sconn.s_flags |= _SF_CONNECTED;
_SO_CONN_SETERRNO(conn, OK);
rpmsg_socket_post(&conn->sendsem);
@ -350,12 +348,10 @@ static int rpmsg_socket_ept_cb(FAR struct rpmsg_endpoint *ept,
else if (head->cmd == RPMSG_SOCKET_CMD_DATA)
{
FAR struct rpmsg_socket_data_s *msg = data;
FAR uint8_t *buf = (FAR uint8_t *)msg->data;
FAR uint8_t *buf = msg->data;
nxmutex_lock(&conn->sendlock);
conn->ackpos = msg->pos;
if (rpmsg_socket_get_space(conn) > 0)
{
rpmsg_socket_post(&conn->sendsem);
@ -363,19 +359,15 @@ static int rpmsg_socket_ept_cb(FAR struct rpmsg_endpoint *ept,
}
nxmutex_unlock(&conn->sendlock);
if (len > sizeof(*msg))
{
len -= sizeof(*msg);
DEBUGASSERT(len == msg->len || len == msg->len + sizeof(uint32_t));
nxmutex_lock(&conn->recvlock);
if (conn->recvdata)
{
conn->recvlen = MIN(conn->recvlen, msg->len);
if (len == msg->len)
{
/* SOCK_STREAM */
@ -421,18 +413,20 @@ static int rpmsg_socket_ept_cb(FAR struct rpmsg_endpoint *ept,
if (msg->how & SHUT_WR)
{
nxmutex_lock(&conn->recvlock);
conn->how |= SHUT_RD;
rpmsg_socket_post(&conn->recvsem);
rpmsg_socket_poll_notify(conn, POLLIN | POLLHUP);
nxmutex_unlock(&conn->recvlock);
}
if (msg->how & SHUT_RD)
{
nxmutex_lock(&conn->sendlock);
conn->how |= SHUT_WR;
rpmsg_socket_post(&conn->sendsem);
rpmsg_socket_poll_notify(conn, POLLOUT | POLLHUP);
nxmutex_unlock(&conn->sendlock);
}
}
@ -480,11 +474,6 @@ static void rpmsg_socket_ns_unbind(FAR struct rpmsg_endpoint *ept)
{
FAR struct rpmsg_socket_conn_s *conn = ept->priv;
if (!conn)
{
return;
}
nxmutex_lock(&conn->recvlock);
conn->unbind = true;
@ -546,8 +535,8 @@ static bool rpmsg_socket_ns_match(FAR struct rpmsg_device *rdev,
return false;
}
if (strlen(server->rpaddr.rp_cpu) &&
strcmp(server->rpaddr.rp_cpu, rpmsg_get_cpuname(rdev)))
if (server->rpaddr.rp_cpu[0] &&
strcmp(server->rpaddr.rp_cpu, rpmsg_get_cpuname(rdev)))
{
/* Bind specific CPU, then only listen that CPU */
@ -600,7 +589,6 @@ static void rpmsg_socket_ns_bind(FAR struct rpmsg_device *rdev,
rpmsg_socket_ns_bound(&new->ept);
nxmutex_lock(&server->recvlock);
for (tmp = server; tmp->next; tmp = tmp->next)
{
if (++cnt >= server->backlog)
@ -616,10 +604,9 @@ static void rpmsg_socket_ns_bind(FAR struct rpmsg_device *rdev,
tmp->next = new;
nxmutex_unlock(&server->recvlock);
rpmsg_socket_post(&server->recvsem);
rpmsg_socket_poll_notify(server, POLLIN);
nxmutex_unlock(&server->recvlock);
}
static int rpmsg_socket_getaddr(FAR struct rpmsg_socket_conn_s *conn,
@ -773,7 +760,7 @@ static int rpmsg_socket_connect_internal(FAR struct socket *psock)
}
ret = net_sem_timedwait(&conn->sendsem,
_SO_TIMEOUT(conn->sconn.s_rcvtimeo));
_SO_TIMEOUT(conn->sconn.s_sndtimeo));
if (ret < 0)
{
@ -885,13 +872,11 @@ static int rpmsg_socket_poll(FAR struct socket *psock,
{
FAR struct rpmsg_socket_conn_s *conn = psock->s_conn;
pollevent_t eventset = 0;
int ret = 0;
int i;
if (setup)
{
nxmutex_lock(&conn->polllock);
for (i = 0; i < CONFIG_NET_RPMSG_NPOLLWAITERS; i++)
{
/* Find an available slot */
@ -907,22 +892,23 @@ static int rpmsg_socket_poll(FAR struct socket *psock,
}
nxmutex_unlock(&conn->polllock);
if (i >= CONFIG_NET_RPMSG_NPOLLWAITERS)
{
fds->priv = NULL;
ret = -EBUSY;
goto errout;
return -EBUSY;
}
/* Immediately notify on any of the requested events */
if (_SS_ISLISTENING(conn->sconn.s_flags))
{
nxmutex_lock(&conn->recvlock);
if (conn->next)
{
eventset |= POLLIN;
}
nxmutex_unlock(&conn->recvlock);
}
else if (_SS_ISCONNECTED(conn->sconn.s_flags))
{
@ -932,7 +918,6 @@ static int rpmsg_socket_poll(FAR struct socket *psock,
}
nxmutex_lock(&conn->sendlock);
if (rpmsg_socket_get_space(conn) > 0)
{
eventset |= POLLOUT;
@ -941,7 +926,6 @@ static int rpmsg_socket_poll(FAR struct socket *psock,
nxmutex_unlock(&conn->sendlock);
nxmutex_lock(&conn->recvlock);
if (!circbuf_is_empty(&conn->recvbuf))
{
eventset |= POLLIN;
@ -955,10 +939,6 @@ static int rpmsg_socket_poll(FAR struct socket *psock,
{
eventset |= POLLHUP;
}
else
{
ret = OK;
}
}
poll_notify(&fds, 1, eventset);
@ -966,7 +946,6 @@ static int rpmsg_socket_poll(FAR struct socket *psock,
else
{
nxmutex_lock(&conn->polllock);
if (fds->priv != NULL)
{
for (i = 0; i < CONFIG_NET_RPMSG_NPOLLWAITERS; i++)
@ -983,8 +962,7 @@ static int rpmsg_socket_poll(FAR struct socket *psock,
nxmutex_unlock(&conn->polllock);
}
errout:
return ret;
return OK;
}
static uint32_t rpmsg_socket_get_iovlen(FAR const struct iovec *buf,
@ -1022,26 +1000,24 @@ static ssize_t rpmsg_socket_send_continuous(FAR struct socket *psock,
if (block == 0)
{
if (!nonblock)
{
ret = net_sem_timedwait(&conn->sendsem,
_SO_TIMEOUT(conn->sconn.s_sndtimeo));
if (!conn->ept.rdev || conn->unbind)
{
ret = -ECONNRESET;
}
if (ret < 0)
{
break;
}
}
else
if (nonblock)
{
ret = -EAGAIN;
break;
}
ret = net_sem_timedwait(&conn->sendsem,
_SO_TIMEOUT(conn->sconn.s_sndtimeo));
if (!conn->ept.rdev || conn->unbind)
{
ret = -ECONNRESET;
}
if (ret < 0)
{
break;
}
continue;
}
@ -1053,7 +1029,6 @@ static ssize_t rpmsg_socket_send_continuous(FAR struct socket *psock,
}
nxmutex_lock(&conn->sendlock);
block = MIN(len - written, rpmsg_socket_get_space(conn));
block = MIN(block, ipcsize - sizeof(*msg));
@ -1064,7 +1039,7 @@ static ssize_t rpmsg_socket_send_continuous(FAR struct socket *psock,
{
uint32_t chunk = MIN(block - block_written, buf->iov_len - offset);
memcpy(msg->data + block_written,
(FAR const char *)buf->iov_base + offset, chunk);
(FAR const uint8_t *)buf->iov_base + offset, chunk);
offset += chunk;
if (offset == buf->iov_len)
{
@ -1100,11 +1075,11 @@ static ssize_t rpmsg_socket_send_single(FAR struct socket *psock,
FAR struct rpmsg_socket_conn_s *conn = psock->s_conn;
FAR struct rpmsg_socket_data_s *msg;
uint32_t len = rpmsg_socket_get_iovlen(buf, iovcnt);
uint32_t total = len + sizeof(*msg) + sizeof(uint32_t);
uint32_t total = len + sizeof(uint32_t);
uint32_t written = 0;
uint32_t ipcsize;
uint32_t space;
char *msgpos;
FAR uint8_t *msgpos;
int ret;
if (total > conn->sendsize)
@ -1118,27 +1093,27 @@ static ssize_t rpmsg_socket_send_single(FAR struct socket *psock,
space = rpmsg_socket_get_space(conn);
nxmutex_unlock(&conn->sendlock);
if (space >= total - sizeof(*msg))
break;
if (!nonblock)
if (space >= total)
{
ret = net_sem_timedwait(&conn->sendsem,
_SO_TIMEOUT(conn->sconn.s_sndtimeo));
if (!conn->ept.rdev || conn->unbind)
{
ret = -ECONNRESET;
}
if (ret < 0)
{
return ret;
}
break;
}
else
if (nonblock)
{
return -EAGAIN;
}
ret = net_sem_timedwait(&conn->sendsem,
_SO_TIMEOUT(conn->sconn.s_sndtimeo));
if (!conn->ept.rdev || conn->unbind)
{
ret = -ECONNRESET;
}
if (ret < 0)
{
return ret;
}
}
msg = rpmsg_get_tx_payload_buffer(&conn->ept, &ipcsize, true);
@ -1148,11 +1123,9 @@ static ssize_t rpmsg_socket_send_single(FAR struct socket *psock,
}
nxmutex_lock(&conn->sendlock);
space = rpmsg_socket_get_space(conn);
total = MIN(total, space + sizeof(*msg));
total = MIN(total, ipcsize);
len = total - sizeof(*msg) - sizeof(uint32_t);
total = MIN(total, rpmsg_socket_get_space(conn));
total = MIN(total, ipcsize - sizeof(*msg));
len = total - sizeof(uint32_t);
/* SOCK_DGRAM need write len to buffer */
@ -1178,9 +1151,10 @@ static ssize_t rpmsg_socket_send_single(FAR struct socket *psock,
}
conn->lastpos = conn->recvpos;
conn->sendpos += len + sizeof(uint32_t);
conn->sendpos += total;
ret = rpmsg_sendto_nocopy(&conn->ept, msg, total, conn->ept.dest_addr);
ret = rpmsg_sendto_nocopy(&conn->ept, msg, total + sizeof(*msg),
conn->ept.dest_addr);
nxmutex_unlock(&conn->sendlock);
if (ret < 0)
{
@ -1228,7 +1202,7 @@ static ssize_t rpmsg_socket_sendmsg(FAR struct socket *psock,
}
nonblock = _SS_ISNONBLOCK(conn->sconn.s_flags) ||
(flags & MSG_DONTWAIT) != 0;
(flags & MSG_DONTWAIT) != 0;
if (psock->s_type == SOCK_STREAM)
{
@ -1243,15 +1217,16 @@ static ssize_t rpmsg_socket_sendmsg(FAR struct socket *psock,
static ssize_t rpmsg_socket_recvmsg(FAR struct socket *psock,
FAR struct msghdr *msg, int flags)
{
FAR void *buf = msg->msg_iov->iov_base;
size_t len = msg->msg_iov->iov_len;
FAR struct rpmsg_socket_conn_s *conn = psock->s_conn;
FAR struct sockaddr *from = msg->msg_name;
FAR socklen_t *fromlen = &msg->msg_namelen;
FAR struct rpmsg_socket_conn_s *conn = psock->s_conn;
FAR void *buf = msg->msg_iov->iov_base;
size_t len = msg->msg_iov->iov_len;
ssize_t ret;
if (psock->s_type != SOCK_STREAM && _SS_ISBOUND(conn->sconn.s_flags)
&& !_SS_ISCONNECTED(conn->sconn.s_flags))
if (psock->s_type != SOCK_STREAM &&
_SS_ISBOUND(conn->sconn.s_flags) &&
!_SS_ISCONNECTED(conn->sconn.s_flags))
{
ret = rpmsg_socket_connect_internal(psock);
if (ret < 0)
@ -1271,7 +1246,6 @@ static ssize_t rpmsg_socket_recvmsg(FAR struct socket *psock,
}
nxmutex_lock(&conn->recvlock);
if (psock->s_type != SOCK_STREAM)
{
uint32_t datalen;
@ -1291,7 +1265,7 @@ static ssize_t rpmsg_socket_recvmsg(FAR struct socket *psock,
else
{
ret = circbuf_read(&conn->recvbuf, buf, len);
conn->recvpos += ret > 0 ? ret : 0;
conn->recvpos += ret > 0 ? ret : 0;
}
if (ret > 0)
@ -1320,14 +1294,13 @@ static ssize_t rpmsg_socket_recvmsg(FAR struct socket *psock,
nxmutex_unlock(&conn->recvlock);
ret = net_sem_timedwait(&conn->recvsem,
_SO_TIMEOUT(conn->sconn.s_rcvtimeo));
_SO_TIMEOUT(conn->sconn.s_rcvtimeo));
if (!conn->ept.rdev || conn->unbind)
{
ret = 0;
}
nxmutex_lock(&conn->recvlock);
if (!conn->recvdata)
{
ret = conn->recvlen;
@ -1339,7 +1312,6 @@ static ssize_t rpmsg_socket_recvmsg(FAR struct socket *psock,
out:
nxmutex_unlock(&conn->recvlock);
if (ret > 0)
{
rpmsg_socket_wakeup(conn);