This commit is contained in:
hujun5 2025-01-12 16:51:56 +08:00 committed by GitHub
commit 2fb6684fee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 106 additions and 67 deletions

View file

@ -3215,8 +3215,6 @@ static void sam_callback(void *arg)
ret = work_cancel(LPWORK, &priv->cbwork); ret = work_cancel(LPWORK, &priv->cbwork);
if (ret < 0) if (ret < 0)
{ {
/* NOTE: Currently, work_cancel only returns success */
lcderr("ERROR: Failed to cancel work: %d\n", ret); lcderr("ERROR: Failed to cancel work: %d\n", ret);
} }
@ -3225,8 +3223,6 @@ static void sam_callback(void *arg)
priv->cbarg, 0); priv->cbarg, 0);
if (ret < 0) if (ret < 0)
{ {
/* NOTE: Currently, work_queue only returns success */
lcderr("ERROR: Failed to schedule work: %d\n", ret); lcderr("ERROR: Failed to schedule work: %d\n", ret);
} }
} }

View file

@ -3355,8 +3355,6 @@ static void sam_callback(void *arg)
ret = work_cancel(LPWORK, &priv->cbwork); ret = work_cancel(LPWORK, &priv->cbwork);
if (ret < 0) if (ret < 0)
{ {
/* NOTE: Currently, work_cancel only returns success */
mcerr("ERROR: Failed to cancel work: %d\n", ret); mcerr("ERROR: Failed to cancel work: %d\n", ret);
} }
@ -3365,8 +3363,6 @@ static void sam_callback(void *arg)
priv->cbarg, 0); priv->cbarg, 0);
if (ret < 0) if (ret < 0)
{ {
/* NOTE: Currently, work_queue only returns success */
mcerr("ERROR: Failed to schedule work: %d\n", ret); mcerr("ERROR: Failed to schedule work: %d\n", ret);
} }
} }

View file

@ -659,8 +659,6 @@ static void automount_timeout(wdparm_t arg)
ret = work_queue(LPWORK, &priv->work, automount_worker, priv, 0); ret = work_queue(LPWORK, &priv->work, automount_worker, priv, 0);
if (ret < 0) if (ret < 0)
{ {
/* NOTE: Currently, work_queue only returns success */
ferr("ERROR: Failed to schedule work: %d\n", ret); ferr("ERROR: Failed to schedule work: %d\n", ret);
} }
} }
@ -772,8 +770,6 @@ static int automount_interrupt(FAR const struct automount_lower_s *lower,
priv->lower->ddelay); priv->lower->ddelay);
if (ret < 0) if (ret < 0)
{ {
/* NOTE: Currently, work_queue only returns success */
ferr("ERROR: Failed to schedule work: %d\n", ret); ferr("ERROR: Failed to schedule work: %d\n", ret);
} }
else else
@ -848,8 +844,6 @@ FAR void *automount_initialize(FAR const struct automount_lower_s *lower)
priv->lower->ddelay); priv->lower->ddelay);
if (ret < 0) if (ret < 0)
{ {
/* NOTE: Currently, work_queue only returns success */
ferr("ERROR: Failed to schedule work: %d\n", ret); ferr("ERROR: Failed to schedule work: %d\n", ret);
} }

View file

@ -58,23 +58,20 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
* new work is typically added to the work queue from interrupt handlers. * new work is typically added to the work queue from interrupt handlers.
*/ */
flags = enter_critical_section(); flags = spin_lock_irqsave(&wqueue->lock);
if (work->worker != NULL) if (work->worker != NULL)
{ {
/* Remove the entry from the work queue and make sure that it is /* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified). * marked as available (i.e., the worker field is nullified).
*/ */
if (WDOG_ISACTIVE(&work->u.timer)) work->worker = NULL;
{ wd_cancel(&work->u.timer);
wd_cancel(&work->u.timer); if (dq_inqueue((FAR dq_entry_t *)work, &wqueue->q))
}
else
{ {
dq_rem((FAR dq_entry_t *)work, &wqueue->q); dq_rem((FAR dq_entry_t *)work, &wqueue->q);
} }
work->worker = NULL;
ret = OK; ret = OK;
} }
else if (!up_interrupt_context() && !sched_idletask() && sync) else if (!up_interrupt_context() && !sched_idletask() && sync)
@ -86,14 +83,15 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
if (wqueue->worker[wndx].work == work && if (wqueue->worker[wndx].work == work &&
wqueue->worker[wndx].pid != nxsched_gettid()) wqueue->worker[wndx].pid != nxsched_gettid())
{ {
wqueue->worker[wndx].wait_count--;
spin_unlock_irqrestore(&wqueue->lock, flags);
nxsem_wait_uninterruptible(&wqueue->worker[wndx].wait); nxsem_wait_uninterruptible(&wqueue->worker[wndx].wait);
ret = 1; return 1;
break;
} }
} }
} }
leave_critical_section(flags); spin_unlock_irqrestore(&wqueue->lock, flags);
return ret; return ret;
} }

View file

@ -73,6 +73,8 @@ struct work_notifier_entry_s
* Private Data * Private Data
****************************************************************************/ ****************************************************************************/
static spinlock_t g_work_notifier_lock;
/* This is a doubly linked list of free notifications. */ /* This is a doubly linked list of free notifications. */
static dq_queue_t g_notifier_free; static dq_queue_t g_notifier_free;
@ -160,23 +162,34 @@ static void work_notifier_worker(FAR void *arg)
(FAR struct work_notifier_entry_s *)arg; (FAR struct work_notifier_entry_s *)arg;
irqstate_t flags; irqstate_t flags;
/* Forward to the real worker */
notifier->info.worker(notifier->info.arg);
/* Disable interrupts very briefly. */ /* Disable interrupts very briefly. */
flags = enter_critical_section(); flags = spin_lock_irqsave(&g_work_notifier_lock);
/* Remove the notification from the pending list */ /* Remove the notification from the pending list */
dq_rem(&notifier->entry, &g_notifier_pending); notifier = work_notifier_find(notifier->key);
if (notifier != NULL)
{
/* Forward to the real worker */
/* Put the notification to the free list */ spin_unlock_irqrestore(&g_work_notifier_lock, flags);
dq_addlast(&notifier->entry, &g_notifier_free); notifier->info.worker(notifier->info.arg);
leave_critical_section(flags); flags = spin_lock_irqsave(&g_work_notifier_lock);
notifier = work_notifier_find(notifier->key);
if (notifier != NULL)
{
dq_rem(&notifier->entry, &g_notifier_pending);
/* Put the notification to the free list */
dq_addlast(&notifier->entry, &g_notifier_free);
}
}
spin_unlock_irqrestore(&g_work_notifier_lock, flags);
} }
/**************************************************************************** /****************************************************************************
@ -213,14 +226,14 @@ int work_notifier_setup(FAR struct work_notifier_s *info)
/* Disable interrupts very briefly. */ /* Disable interrupts very briefly. */
flags = enter_critical_section(); flags = spin_lock_irqsave(&g_work_notifier_lock);
/* Try to get the entry from the free list */ /* Try to get the entry from the free list */
notifier = (FAR struct work_notifier_entry_s *) notifier = (FAR struct work_notifier_entry_s *)
dq_remfirst(&g_notifier_free); dq_remfirst(&g_notifier_free);
leave_critical_section(flags); spin_unlock_irqrestore(&g_work_notifier_lock, flags);
if (notifier == NULL) if (notifier == NULL)
{ {
@ -245,7 +258,7 @@ int work_notifier_setup(FAR struct work_notifier_s *info)
/* Disable interrupts very briefly. */ /* Disable interrupts very briefly. */
flags = enter_critical_section(); flags = spin_lock_irqsave(&g_work_notifier_lock);
/* Generate a unique key for this notification */ /* Generate a unique key for this notification */
@ -262,7 +275,7 @@ int work_notifier_setup(FAR struct work_notifier_s *info)
dq_addlast(&notifier->entry, &g_notifier_pending); dq_addlast(&notifier->entry, &g_notifier_pending);
ret = notifier->key; ret = notifier->key;
leave_critical_section(flags); spin_unlock_irqrestore(&g_work_notifier_lock, flags);
} }
return ret; return ret;
@ -293,7 +306,7 @@ void work_notifier_teardown(int key)
/* Disable interrupts very briefly. */ /* Disable interrupts very briefly. */
flags = enter_critical_section(); flags = spin_lock_irqsave(&g_work_notifier_lock);
/* Find the entry matching this key in the g_notifier_pending list. We /* Find the entry matching this key in the g_notifier_pending list. We
* assume that there is only one. * assume that there is only one.
@ -304,19 +317,18 @@ void work_notifier_teardown(int key)
{ {
/* Cancel the work, this may be waiting */ /* Cancel the work, this may be waiting */
if (work_cancel_sync(notifier->info.qid, &notifier->work) != 1) work_cancel(notifier->info.qid, &notifier->work);
{
/* Remove the notification from the pending list */
dq_rem(&notifier->entry, &g_notifier_pending); /* Remove the notification from the pending list */
/* Put the notification to the free list */ dq_rem(&notifier->entry, &g_notifier_pending);
dq_addlast(&notifier->entry, &g_notifier_free); /* Put the notification to the free list */
}
dq_addlast(&notifier->entry, &g_notifier_free);
} }
leave_critical_section(flags); spin_unlock_irqrestore(&g_work_notifier_lock, flags);
} }
/**************************************************************************** /****************************************************************************
@ -352,7 +364,7 @@ void work_notifier_signal(enum work_evtype_e evtype,
* the notifications have been sent. * the notifications have been sent.
*/ */
flags = enter_critical_section(); flags = spin_lock_irqsave(&g_work_notifier_lock);
sched_lock(); sched_lock();
/* Process the notification at the head of the pending list until the /* Process the notification at the head of the pending list until the
@ -397,7 +409,7 @@ void work_notifier_signal(enum work_evtype_e evtype,
} }
sched_unlock(); sched_unlock();
leave_critical_section(flags); spin_unlock_irqrestore(&g_work_notifier_lock, flags);
} }
#endif /* CONFIG_WQUEUE_NOTIFIER */ #endif /* CONFIG_WQUEUE_NOTIFIER */

View file

@ -47,11 +47,10 @@
#define queue_work(wqueue, work) \ #define queue_work(wqueue, work) \
do \ do \
{ \ { \
int sem_count; \
dq_addlast((FAR dq_entry_t *)(work), &(wqueue)->q); \ dq_addlast((FAR dq_entry_t *)(work), &(wqueue)->q); \
nxsem_get_value(&(wqueue)->sem, &sem_count); \ if ((wqueue)->wait_count < 0) /* There are threads waiting for sem. */ \
if (sem_count < 0) /* There are threads waiting for sem. */ \
{ \ { \
(wqueue)->wait_count++; \
nxsem_post(&(wqueue)->sem); \ nxsem_post(&(wqueue)->sem); \
} \ } \
} \ } \
@ -68,24 +67,30 @@
static void work_timer_expiry(wdparm_t arg) static void work_timer_expiry(wdparm_t arg)
{ {
FAR struct work_s *work = (FAR struct work_s *)arg; FAR struct work_s *work = (FAR struct work_s *)arg;
irqstate_t flags = enter_critical_section(); irqstate_t flags = spin_lock_irqsave(&work->wq->lock);
sched_lock();
queue_work(work->wq, work); /* We have being canceled */
leave_critical_section(flags);
if (work->worker != NULL)
{
queue_work(work->wq, work);
}
spin_unlock_irqrestore(&work->wq->lock, flags);
sched_unlock();
} }
static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads, static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads,
FAR struct work_s *work) FAR struct work_s *work)
{ {
int semcount;
int wndx; int wndx;
for (wndx = 0; wndx < nthreads; wndx++) for (wndx = 0; wndx < nthreads; wndx++)
{ {
if (kworkers[wndx].work == work) if (kworkers[wndx].work == work)
{ {
nxsem_get_value(&kworkers[wndx].wait, &semcount); if (kworkers[wndx].wait_count < 0)
if (semcount < 0)
{ {
return true; return true;
} }
@ -145,13 +150,23 @@ int work_queue_wq(FAR struct kwork_wqueue_s *wqueue,
* task logic or from interrupt handling logic. * task logic or from interrupt handling logic.
*/ */
flags = enter_critical_section(); flags = spin_lock_irqsave(&wqueue->lock);
sched_lock();
/* Remove the entry from the timer and work queue. */ /* Remove the entry from the timer and work queue. */
if (work->worker != NULL) if (work->worker != NULL)
{ {
work_cancel_wq(wqueue, work); /* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/
work->worker = NULL;
wd_cancel(&work->u.timer);
if (dq_inqueue((FAR dq_entry_t *)work, &wqueue->q))
{
dq_rem((FAR dq_entry_t *)work, &wqueue->q);
}
} }
if (work_is_canceling(wqueue->worker, wqueue->nthreads, work)) if (work_is_canceling(wqueue->worker, wqueue->nthreads, work))
@ -177,7 +192,8 @@ int work_queue_wq(FAR struct kwork_wqueue_s *wqueue,
} }
out: out:
leave_critical_section(flags); spin_unlock_irqrestore(&wqueue->lock, flags);
sched_unlock();
return ret; return ret;
} }

View file

@ -87,6 +87,9 @@ struct hp_wqueue_s g_hpwork =
SEM_INITIALIZER(0), SEM_INITIALIZER(0),
SEM_INITIALIZER(0), SEM_INITIALIZER(0),
CONFIG_SCHED_HPNTHREADS, CONFIG_SCHED_HPNTHREADS,
false,
SP_UNLOCKED,
0,
}; };
#endif /* CONFIG_SCHED_HPWORK */ #endif /* CONFIG_SCHED_HPWORK */
@ -100,6 +103,9 @@ struct lp_wqueue_s g_lpwork =
SEM_INITIALIZER(0), SEM_INITIALIZER(0),
SEM_INITIALIZER(0), SEM_INITIALIZER(0),
CONFIG_SCHED_LPNTHREADS, CONFIG_SCHED_LPNTHREADS,
false,
SP_UNLOCKED,
0,
}; };
#endif /* CONFIG_SCHED_LPWORK */ #endif /* CONFIG_SCHED_LPWORK */
@ -138,7 +144,6 @@ static int work_thread(int argc, FAR char *argv[])
worker_t worker; worker_t worker;
irqstate_t flags; irqstate_t flags;
FAR void *arg; FAR void *arg;
int semcount;
/* Get the handle from argv */ /* Get the handle from argv */
@ -147,7 +152,8 @@ static int work_thread(int argc, FAR char *argv[])
kworker = (FAR struct kworker_s *) kworker = (FAR struct kworker_s *)
((uintptr_t)strtoul(argv[2], NULL, 16)); ((uintptr_t)strtoul(argv[2], NULL, 16));
flags = enter_critical_section(); flags = spin_lock_irqsave(&wqueue->lock);
sched_lock();
/* Loop forever */ /* Loop forever */
@ -189,9 +195,12 @@ static int work_thread(int argc, FAR char *argv[])
* performed... we don't have any idea how long this will take! * performed... we don't have any idea how long this will take!
*/ */
leave_critical_section(flags); spin_unlock_irqrestore(&wqueue->lock, flags);
sched_unlock();
CALL_WORKER(worker, arg); CALL_WORKER(worker, arg);
flags = enter_critical_section(); flags = spin_lock_irqsave(&wqueue->lock);
sched_lock();
/* Mark the thread un-busy */ /* Mark the thread un-busy */
@ -199,9 +208,9 @@ static int work_thread(int argc, FAR char *argv[])
/* Check if someone is waiting, if so, wakeup it */ /* Check if someone is waiting, if so, wakeup it */
nxsem_get_value(&kworker->wait, &semcount); while (kworker->wait_count < 0)
while (semcount++ < 0)
{ {
kworker->wait_count++;
nxsem_post(&kworker->wait); nxsem_post(&kworker->wait);
} }
} }
@ -211,10 +220,17 @@ static int work_thread(int argc, FAR char *argv[])
* posted. * posted.
*/ */
wqueue->wait_count--;
spin_unlock_irqrestore(&wqueue->lock, flags);
sched_unlock();
nxsem_wait_uninterruptible(&wqueue->sem); nxsem_wait_uninterruptible(&wqueue->sem);
flags = spin_lock_irqsave(&wqueue->lock);
sched_lock();
} }
leave_critical_section(flags); spin_unlock_irqrestore(&wqueue->lock, flags);
sched_unlock();
nxsem_post(&wqueue->exsem); nxsem_post(&wqueue->exsem);
return OK; return OK;
@ -277,6 +293,7 @@ static int work_thread_create(FAR const char *name, int priority,
} }
wqueue->worker[wndx].pid = pid; wqueue->worker[wndx].pid = pid;
wqueue->worker[wndx].wait_count = 0;
} }
sched_unlock(); sched_unlock();
@ -337,6 +354,8 @@ FAR struct kwork_wqueue_s *work_queue_create(FAR const char *name,
nxsem_init(&wqueue->sem, 0, 0); nxsem_init(&wqueue->sem, 0, 0);
nxsem_init(&wqueue->exsem, 0, 0); nxsem_init(&wqueue->exsem, 0, 0);
wqueue->nthreads = nthreads; wqueue->nthreads = nthreads;
wqueue->wait_count = 0;
spin_lock_init(&wqueue->lock);
/* Create the work queue thread pool */ /* Create the work queue thread pool */

View file

@ -35,6 +35,7 @@
#include <nuttx/clock.h> #include <nuttx/clock.h>
#include <nuttx/queue.h> #include <nuttx/queue.h>
#include <nuttx/wqueue.h> #include <nuttx/wqueue.h>
#include <nuttx/spinlock.h>
#ifdef CONFIG_SCHED_WORKQUEUE #ifdef CONFIG_SCHED_WORKQUEUE
@ -58,6 +59,7 @@ struct kworker_s
pid_t pid; /* The task ID of the worker thread */ pid_t pid; /* The task ID of the worker thread */
FAR struct work_s *work; /* The work structure */ FAR struct work_s *work; /* The work structure */
sem_t wait; /* Sync waiting for worker done */ sem_t wait; /* Sync waiting for worker done */
int16_t wait_count;
}; };
/* This structure defines the state of one kernel-mode work queue */ /* This structure defines the state of one kernel-mode work queue */
@ -69,6 +71,8 @@ struct kwork_wqueue_s
sem_t exsem; /* Sync waiting for thread exit */ sem_t exsem; /* Sync waiting for thread exit */
uint8_t nthreads; /* Number of worker threads */ uint8_t nthreads; /* Number of worker threads */
bool exit; /* A flag to request the thread to exit */ bool exit; /* A flag to request the thread to exit */
spinlock_t lock;
int16_t wait_count;
struct kworker_s worker[0]; /* Describes a worker thread */ struct kworker_s worker[0]; /* Describes a worker thread */
}; };
@ -84,6 +88,8 @@ struct hp_wqueue_s
sem_t exsem; /* Sync waiting for thread exit */ sem_t exsem; /* Sync waiting for thread exit */
uint8_t nthreads; /* Number of worker threads */ uint8_t nthreads; /* Number of worker threads */
bool exit; /* A flag to request the thread to exit */ bool exit; /* A flag to request the thread to exit */
spinlock_t lock;
int16_t wait_count;
/* Describes each thread in the high priority queue's thread pool */ /* Describes each thread in the high priority queue's thread pool */
@ -103,6 +109,8 @@ struct lp_wqueue_s
sem_t exsem; /* Sync waiting for thread exit */ sem_t exsem; /* Sync waiting for thread exit */
uint8_t nthreads; /* Number of worker threads */ uint8_t nthreads; /* Number of worker threads */
bool exit; /* A flag to request the thread to exit */ bool exit; /* A flag to request the thread to exit */
spinlock_t lock;
int16_t wait_count;
/* Describes each thread in the low priority queue's thread pool */ /* Describes each thread in the low priority queue's thread pool */