mirror of
https://github.com/apache/nuttx.git
synced 2025-01-13 10:58:49 +08:00
mqueue: add poll support
Change-Id: I7e908f6a6c00158c0946587dd79ae3dc5d279d37 Signed-off-by: ligd <liguiding1@xiaomi.com>
This commit is contained in:
parent
8b73e30185
commit
48d49e5a7c
6 changed files with 136 additions and 4 deletions
|
@ -11,4 +11,10 @@ config FS_MQUEUE_MPATH
|
|||
---help---
|
||||
The path to where POSIX message queues will exist in the VFS namespace.
|
||||
|
||||
config FS_MQUEUE_NPOLLWAITERS
|
||||
int "Maximum number of poll waiters"
|
||||
default 4
|
||||
---help---
|
||||
The maximum number of waiters for the poll operation.
|
||||
|
||||
endif # !DISABLE_MQUEUE
|
||||
|
|
|
@ -43,6 +43,8 @@
|
|||
****************************************************************************/
|
||||
|
||||
static int nxmq_file_close(FAR struct file *filep);
|
||||
static int nxmq_file_poll(FAR struct file *filep,
|
||||
struct pollfd *fds, bool setup);
|
||||
|
||||
/****************************************************************************
|
||||
* Private Data
|
||||
|
@ -56,7 +58,7 @@ static const struct file_operations g_nxmq_fileops =
|
|||
NULL, /* write */
|
||||
NULL, /* seek */
|
||||
NULL, /* ioctl */
|
||||
NULL, /* poll */
|
||||
nxmq_file_poll, /* poll */
|
||||
#ifndef CONFIG_DISABLE_PSEUDOFS_OPERATIONS
|
||||
NULL, /* unlink */
|
||||
#endif
|
||||
|
@ -84,6 +86,76 @@ static int nxmq_file_close(FAR struct file *filep)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int nxmq_file_poll(FAR struct file *filep,
|
||||
struct pollfd *fds, bool setup)
|
||||
{
|
||||
FAR struct inode *inode = filep->f_inode;
|
||||
FAR struct mqueue_inode_s *msgq = inode->i_private;
|
||||
pollevent_t eventset = 0;
|
||||
irqstate_t flags;
|
||||
int ret = 0;
|
||||
int i;
|
||||
|
||||
flags = enter_critical_section();
|
||||
|
||||
if (setup)
|
||||
{
|
||||
for (i = 0; i < CONFIG_FS_MQUEUE_NPOLLWAITERS; i++)
|
||||
{
|
||||
/* Find an available slot */
|
||||
|
||||
if (!msgq->fds[i])
|
||||
{
|
||||
/* Bind the poll structure and this slot */
|
||||
|
||||
msgq->fds[i] = fds;
|
||||
fds->priv = &msgq->fds[i];
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (i >= CONFIG_FS_MQUEUE_NPOLLWAITERS)
|
||||
{
|
||||
fds->priv = NULL;
|
||||
ret = -EBUSY;
|
||||
goto errout;
|
||||
}
|
||||
|
||||
/* Immediately notify on any of the requested events */
|
||||
|
||||
if (msgq->nmsgs < msgq->maxmsgs)
|
||||
{
|
||||
eventset |= (fds->events & POLLOUT);
|
||||
}
|
||||
|
||||
if (msgq->nmsgs)
|
||||
{
|
||||
eventset |= (fds->events & POLLIN);
|
||||
}
|
||||
|
||||
if (eventset)
|
||||
{
|
||||
nxmq_pollnotify(msgq, eventset);
|
||||
}
|
||||
}
|
||||
else if (fds->priv != NULL)
|
||||
{
|
||||
for (i = 0; i < CONFIG_FS_MQUEUE_NPOLLWAITERS; i++)
|
||||
{
|
||||
if (fds == msgq->fds[i])
|
||||
{
|
||||
msgq->fds[i] = NULL;
|
||||
fds->priv = NULL;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
errout:
|
||||
leave_critical_section(flags);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int file_mq_vopen(FAR struct file *mq, FAR const char *mq_name,
|
||||
int oflags, va_list ap, int *created)
|
||||
{
|
||||
|
@ -287,6 +359,32 @@ static mqd_t nxmq_vopen(FAR const char *mq_name, int oflags, va_list ap)
|
|||
* Public Functions
|
||||
****************************************************************************/
|
||||
|
||||
void nxmq_pollnotify(FAR struct mqueue_inode_s *msgq, pollevent_t eventset)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < CONFIG_FS_MQUEUE_NPOLLWAITERS; i++)
|
||||
{
|
||||
FAR struct pollfd *fds = msgq->fds[i];
|
||||
|
||||
if (fds)
|
||||
{
|
||||
fds->revents |= (fds->events & eventset);
|
||||
|
||||
if (fds->revents != 0)
|
||||
{
|
||||
int semcount;
|
||||
|
||||
nxsem_get_value(fds->sem, &semcount);
|
||||
if (semcount < 1)
|
||||
{
|
||||
nxsem_post(fds->sem);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/****************************************************************************
|
||||
* Name: file_mq_open
|
||||
*
|
||||
|
|
|
@ -308,7 +308,8 @@ int file_poll(FAR struct file *filep, FAR struct pollfd *fds, bool setup)
|
|||
* If not, return -ENOSYS
|
||||
*/
|
||||
|
||||
if ((INODE_IS_DRIVER(inode) || INODE_IS_SOCKET(inode)) &&
|
||||
if ((INODE_IS_DRIVER(inode) || INODE_IS_MQUEUE(inode) ||
|
||||
INODE_IS_SOCKET(inode)) &&
|
||||
inode->u.i_ops != NULL && inode->u.i_ops->poll != NULL)
|
||||
{
|
||||
/* Yes, it does... Setup the poll */
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
#include <stdbool.h>
|
||||
#include <mqueue.h>
|
||||
#include <queue.h>
|
||||
#include <poll.h>
|
||||
|
||||
#if CONFIG_MQ_MAXMSGSIZE > 0
|
||||
|
||||
|
@ -99,6 +100,7 @@ struct mqueue_inode_s
|
|||
pid_t ntpid; /* Notification: Receiving Task's PID */
|
||||
struct sigevent ntevent; /* Notification description */
|
||||
struct sigwork_s ntwork; /* Notification work */
|
||||
FAR struct pollfd *fds[CONFIG_FS_MQUEUE_NPOLLWAITERS];
|
||||
};
|
||||
|
||||
/****************************************************************************
|
||||
|
@ -394,6 +396,24 @@ void nxmq_free_msgq(FAR struct mqueue_inode_s *msgq);
|
|||
FAR struct mqueue_inode_s *nxmq_alloc_msgq(mode_t mode,
|
||||
FAR struct mq_attr *attr);
|
||||
|
||||
/****************************************************************************
|
||||
* Name: nxmq_pollnotify
|
||||
*
|
||||
* Description:
|
||||
* pollnotify, used for notify the poll
|
||||
*
|
||||
* Input Parameters:
|
||||
* msgq - Named essage queue
|
||||
* eventset - evnet
|
||||
*
|
||||
* Returned Value:
|
||||
* The allocated and initialized message queue structure or NULL in the
|
||||
* event of a failure.
|
||||
*
|
||||
****************************************************************************/
|
||||
|
||||
void nxmq_pollnotify(FAR struct mqueue_inode_s *msgq, pollevent_t eventset);
|
||||
|
||||
/****************************************************************************
|
||||
* Name: file_mq_open
|
||||
*
|
||||
|
|
|
@ -203,7 +203,10 @@ int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq,
|
|||
|
||||
if (newmsg)
|
||||
{
|
||||
msgq->nmsgs--;
|
||||
if (msgq->nmsgs-- == msgq->maxmsgs)
|
||||
{
|
||||
nxmq_pollnotify(msgq, POLLOUT);
|
||||
}
|
||||
}
|
||||
|
||||
*rcvmsg = newmsg;
|
||||
|
|
|
@ -365,7 +365,11 @@ int nxmq_do_send(FAR struct mqueue_inode_s *msgq,
|
|||
|
||||
/* Increment the count of messages in the queue */
|
||||
|
||||
msgq->nmsgs++;
|
||||
if (msgq->nmsgs++ == 0)
|
||||
{
|
||||
nxmq_pollnotify(msgq, POLLIN);
|
||||
}
|
||||
|
||||
leave_critical_section(flags);
|
||||
|
||||
/* Check if we need to notify any tasks that are attached to the
|
||||
|
|
Loading…
Reference in a new issue