468 lines
9.9 KiB
C
468 lines
9.9 KiB
C
/* $OpenBSD: kern_task.c,v 1.34 2023/07/29 06:52:08 anton Exp $ */
|
|
|
|
/*
|
|
* Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
|
|
*
|
|
* Permission to use, copy, modify, and distribute this software for any
|
|
* purpose with or without fee is hereby granted, provided that the above
|
|
* copyright notice and this permission notice appear in all copies.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
*/
|
|
|
|
#include <sys/param.h>
|
|
#include <sys/systm.h>
|
|
#include <sys/malloc.h>
|
|
#include <sys/mutex.h>
|
|
#include <sys/kthread.h>
|
|
#include <sys/task.h>
|
|
#include <sys/proc.h>
|
|
#include <sys/witness.h>
|
|
|
|
#include "kcov.h"
|
|
#if NKCOV > 0
|
|
#include <sys/kcov.h>
|
|
#endif
|
|
|
|
#ifdef WITNESS
|
|
|
|
static struct lock_type taskq_lock_type = {
|
|
.lt_name = "taskq"
|
|
};
|
|
|
|
#define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
|
|
(LO_CLASS_RWLOCK << LO_CLASSSHIFT)
|
|
|
|
#endif /* WITNESS */
|
|
|
|
struct taskq_thread {
|
|
SLIST_ENTRY(taskq_thread)
|
|
tt_entry;
|
|
struct proc *tt_thread;
|
|
};
|
|
SLIST_HEAD(taskq_threads, taskq_thread);
|
|
|
|
struct taskq {
|
|
enum {
|
|
TQ_S_CREATED,
|
|
TQ_S_RUNNING,
|
|
TQ_S_DESTROYED
|
|
} tq_state;
|
|
unsigned int tq_running;
|
|
unsigned int tq_nthreads;
|
|
unsigned int tq_flags;
|
|
const char *tq_name;
|
|
|
|
struct mutex tq_mtx;
|
|
struct task_list tq_worklist;
|
|
|
|
struct taskq_threads tq_threads;
|
|
unsigned int tq_barriers;
|
|
unsigned int tq_bgen;
|
|
unsigned int tq_bthreads;
|
|
|
|
#ifdef WITNESS
|
|
struct lock_object tq_lock_object;
|
|
#endif
|
|
};
|
|
|
|
static const char taskq_sys_name[] = "systq";
|
|
|
|
struct taskq taskq_sys = {
|
|
.tq_state = TQ_S_CREATED,
|
|
.tq_running = 0,
|
|
.tq_nthreads = 1,
|
|
.tq_flags = 0,
|
|
.tq_name = taskq_sys_name,
|
|
.tq_mtx = MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
|
|
taskq_sys_name, 0),
|
|
.tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),
|
|
|
|
.tq_threads = SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads),
|
|
.tq_barriers = 0,
|
|
.tq_bgen = 0,
|
|
.tq_bthreads = 0,
|
|
|
|
#ifdef WITNESS
|
|
.tq_lock_object = {
|
|
.lo_name = taskq_sys_name,
|
|
.lo_flags = TASKQ_LOCK_FLAGS,
|
|
},
|
|
#endif
|
|
};
|
|
|
|
static const char taskq_sys_mp_name[] = "systqmp";
|
|
|
|
struct taskq taskq_sys_mp = {
|
|
.tq_state = TQ_S_CREATED,
|
|
.tq_running = 0,
|
|
.tq_nthreads = 1,
|
|
.tq_flags = TASKQ_MPSAFE,
|
|
.tq_name = taskq_sys_mp_name,
|
|
.tq_mtx = MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
|
|
taskq_sys_mp_name, 0),
|
|
.tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),
|
|
|
|
.tq_threads = SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads),
|
|
.tq_barriers = 0,
|
|
.tq_bgen = 0,
|
|
.tq_bthreads = 0,
|
|
|
|
#ifdef WITNESS
|
|
.tq_lock_object = {
|
|
.lo_name = taskq_sys_mp_name,
|
|
.lo_flags = TASKQ_LOCK_FLAGS,
|
|
},
|
|
#endif
|
|
};
|
|
|
|
struct taskq *const systq = &taskq_sys;
|
|
struct taskq *const systqmp = &taskq_sys_mp;
|
|
|
|
void taskq_init(void); /* called in init_main.c */
|
|
void taskq_create_thread(void *);
|
|
void taskq_barrier_task(void *);
|
|
int taskq_sleep(const volatile void *, struct mutex *, int,
|
|
const char *, int);
|
|
int taskq_next_work(struct taskq *, struct task *);
|
|
void taskq_thread(void *);
|
|
|
|
void
|
|
taskq_init(void)
|
|
{
|
|
WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
|
|
kthread_create_deferred(taskq_create_thread, systq);
|
|
WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
|
|
kthread_create_deferred(taskq_create_thread, systqmp);
|
|
}
|
|
|
|
struct taskq *
|
|
taskq_create(const char *name, unsigned int nthreads, int ipl,
|
|
unsigned int flags)
|
|
{
|
|
struct taskq *tq;
|
|
|
|
tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
|
|
if (tq == NULL)
|
|
return (NULL);
|
|
|
|
tq->tq_state = TQ_S_CREATED;
|
|
tq->tq_running = 0;
|
|
tq->tq_nthreads = nthreads;
|
|
tq->tq_name = name;
|
|
tq->tq_flags = flags;
|
|
|
|
mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
|
|
TAILQ_INIT(&tq->tq_worklist);
|
|
|
|
SLIST_INIT(&tq->tq_threads);
|
|
tq->tq_barriers = 0;
|
|
tq->tq_bgen = 0;
|
|
tq->tq_bthreads = 0;
|
|
|
|
#ifdef WITNESS
|
|
memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
|
|
tq->tq_lock_object.lo_name = name;
|
|
tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
|
|
witness_init(&tq->tq_lock_object, &taskq_lock_type);
|
|
#endif
|
|
|
|
/* try to create a thread to guarantee that tasks will be serviced */
|
|
kthread_create_deferred(taskq_create_thread, tq);
|
|
|
|
return (tq);
|
|
}
|
|
|
|
void
|
|
taskq_destroy(struct taskq *tq)
|
|
{
|
|
mtx_enter(&tq->tq_mtx);
|
|
switch (tq->tq_state) {
|
|
case TQ_S_CREATED:
|
|
/* tq is still referenced by taskq_create_thread */
|
|
tq->tq_state = TQ_S_DESTROYED;
|
|
mtx_leave(&tq->tq_mtx);
|
|
return;
|
|
|
|
case TQ_S_RUNNING:
|
|
tq->tq_state = TQ_S_DESTROYED;
|
|
break;
|
|
|
|
default:
|
|
panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
|
|
}
|
|
|
|
while (tq->tq_running > 0) {
|
|
wakeup(tq);
|
|
msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy",
|
|
INFSLP);
|
|
}
|
|
mtx_leave(&tq->tq_mtx);
|
|
|
|
free(tq, M_DEVBUF, sizeof(*tq));
|
|
}
|
|
|
|
void
|
|
taskq_create_thread(void *arg)
|
|
{
|
|
struct taskq *tq = arg;
|
|
int rv;
|
|
|
|
mtx_enter(&tq->tq_mtx);
|
|
|
|
switch (tq->tq_state) {
|
|
case TQ_S_DESTROYED:
|
|
mtx_leave(&tq->tq_mtx);
|
|
free(tq, M_DEVBUF, sizeof(*tq));
|
|
return;
|
|
|
|
case TQ_S_CREATED:
|
|
tq->tq_state = TQ_S_RUNNING;
|
|
break;
|
|
|
|
default:
|
|
panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
|
|
}
|
|
|
|
do {
|
|
tq->tq_running++;
|
|
mtx_leave(&tq->tq_mtx);
|
|
|
|
rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);
|
|
|
|
mtx_enter(&tq->tq_mtx);
|
|
if (rv != 0) {
|
|
printf("unable to create thread for \"%s\" taskq\n",
|
|
tq->tq_name);
|
|
|
|
tq->tq_running--;
|
|
/* could have been destroyed during kthread_create */
|
|
if (tq->tq_state == TQ_S_DESTROYED &&
|
|
tq->tq_running == 0)
|
|
wakeup_one(&tq->tq_running);
|
|
break;
|
|
}
|
|
} while (tq->tq_running < tq->tq_nthreads);
|
|
|
|
mtx_leave(&tq->tq_mtx);
|
|
}
|
|
|
|
void
|
|
taskq_barrier_task(void *p)
|
|
{
|
|
struct taskq *tq = p;
|
|
unsigned int gen;
|
|
|
|
mtx_enter(&tq->tq_mtx);
|
|
tq->tq_bthreads++;
|
|
wakeup(&tq->tq_bthreads);
|
|
|
|
gen = tq->tq_bgen;
|
|
do {
|
|
msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
|
|
PWAIT, "tqbarend", INFSLP);
|
|
} while (gen == tq->tq_bgen);
|
|
mtx_leave(&tq->tq_mtx);
|
|
}
|
|
|
|
static void
|
|
taskq_do_barrier(struct taskq *tq)
|
|
{
|
|
struct task t = TASK_INITIALIZER(taskq_barrier_task, tq);
|
|
struct proc *thread = curproc;
|
|
struct taskq_thread *tt;
|
|
|
|
mtx_enter(&tq->tq_mtx);
|
|
tq->tq_barriers++;
|
|
|
|
/* is the barrier being run from a task inside the taskq? */
|
|
SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) {
|
|
if (tt->tt_thread == thread) {
|
|
tq->tq_bthreads++;
|
|
wakeup(&tq->tq_bthreads);
|
|
break;
|
|
}
|
|
}
|
|
|
|
while (tq->tq_bthreads < tq->tq_nthreads) {
|
|
/* shove the task into the queue for a worker to pick up */
|
|
SET(t.t_flags, TASK_ONQUEUE);
|
|
TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry);
|
|
wakeup_one(tq);
|
|
|
|
msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx,
|
|
PWAIT, "tqbar", INFSLP);
|
|
|
|
/*
|
|
* another thread running a barrier might have
|
|
* done this work for us.
|
|
*/
|
|
if (ISSET(t.t_flags, TASK_ONQUEUE))
|
|
TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry);
|
|
}
|
|
|
|
if (--tq->tq_barriers == 0) {
|
|
/* we're the last one out */
|
|
tq->tq_bgen++;
|
|
wakeup(&tq->tq_bgen);
|
|
tq->tq_bthreads = 0;
|
|
} else {
|
|
unsigned int gen = tq->tq_bgen;
|
|
do {
|
|
msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
|
|
PWAIT, "tqbarwait", INFSLP);
|
|
} while (gen == tq->tq_bgen);
|
|
}
|
|
mtx_leave(&tq->tq_mtx);
|
|
}
|
|
|
|
void
|
|
taskq_barrier(struct taskq *tq)
|
|
{
|
|
WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);
|
|
|
|
taskq_do_barrier(tq);
|
|
}
|
|
|
|
void
|
|
taskq_del_barrier(struct taskq *tq, struct task *t)
|
|
{
|
|
WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);
|
|
|
|
if (task_del(tq, t))
|
|
return;
|
|
|
|
taskq_do_barrier(tq);
|
|
}
|
|
|
|
void
|
|
task_set(struct task *t, void (*fn)(void *), void *arg)
|
|
{
|
|
t->t_func = fn;
|
|
t->t_arg = arg;
|
|
t->t_flags = 0;
|
|
}
|
|
|
|
int
|
|
task_add(struct taskq *tq, struct task *w)
|
|
{
|
|
int rv = 0;
|
|
|
|
if (ISSET(w->t_flags, TASK_ONQUEUE))
|
|
return (0);
|
|
|
|
mtx_enter(&tq->tq_mtx);
|
|
if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
|
|
rv = 1;
|
|
SET(w->t_flags, TASK_ONQUEUE);
|
|
TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
|
|
#if NKCOV > 0
|
|
if (!kcov_cold)
|
|
w->t_process = curproc->p_p;
|
|
#endif
|
|
}
|
|
mtx_leave(&tq->tq_mtx);
|
|
|
|
if (rv)
|
|
wakeup_one(tq);
|
|
|
|
return (rv);
|
|
}
|
|
|
|
int
|
|
task_del(struct taskq *tq, struct task *w)
|
|
{
|
|
int rv = 0;
|
|
|
|
if (!ISSET(w->t_flags, TASK_ONQUEUE))
|
|
return (0);
|
|
|
|
mtx_enter(&tq->tq_mtx);
|
|
if (ISSET(w->t_flags, TASK_ONQUEUE)) {
|
|
rv = 1;
|
|
CLR(w->t_flags, TASK_ONQUEUE);
|
|
TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
|
|
}
|
|
mtx_leave(&tq->tq_mtx);
|
|
|
|
return (rv);
|
|
}
|
|
|
|
int
|
|
taskq_next_work(struct taskq *tq, struct task *work)
|
|
{
|
|
struct task *next;
|
|
|
|
mtx_enter(&tq->tq_mtx);
|
|
while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
|
|
if (tq->tq_state != TQ_S_RUNNING) {
|
|
mtx_leave(&tq->tq_mtx);
|
|
return (0);
|
|
}
|
|
|
|
msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP);
|
|
}
|
|
|
|
TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
|
|
CLR(next->t_flags, TASK_ONQUEUE);
|
|
|
|
*work = *next; /* copy to caller to avoid races */
|
|
|
|
next = TAILQ_FIRST(&tq->tq_worklist);
|
|
mtx_leave(&tq->tq_mtx);
|
|
|
|
if (next != NULL && tq->tq_nthreads > 1)
|
|
wakeup_one(tq);
|
|
|
|
return (1);
|
|
}
|
|
|
|
void
|
|
taskq_thread(void *xtq)
|
|
{
|
|
struct taskq_thread self = { .tt_thread = curproc };
|
|
struct taskq *tq = xtq;
|
|
struct task work;
|
|
int last;
|
|
|
|
if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
|
|
KERNEL_UNLOCK();
|
|
|
|
mtx_enter(&tq->tq_mtx);
|
|
SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry);
|
|
mtx_leave(&tq->tq_mtx);
|
|
|
|
WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);
|
|
|
|
while (taskq_next_work(tq, &work)) {
|
|
WITNESS_LOCK(&tq->tq_lock_object, 0);
|
|
#if NKCOV > 0
|
|
kcov_remote_enter(KCOV_REMOTE_COMMON, work.t_process);
|
|
#endif
|
|
(*work.t_func)(work.t_arg);
|
|
#if NKCOV > 0
|
|
kcov_remote_leave(KCOV_REMOTE_COMMON, work.t_process);
|
|
#endif
|
|
WITNESS_UNLOCK(&tq->tq_lock_object, 0);
|
|
sched_pause(yield);
|
|
}
|
|
|
|
mtx_enter(&tq->tq_mtx);
|
|
SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry);
|
|
last = (--tq->tq_running == 0);
|
|
mtx_leave(&tq->tq_mtx);
|
|
|
|
if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
|
|
KERNEL_LOCK();
|
|
|
|
if (last)
|
|
wakeup_one(&tq->tq_running);
|
|
|
|
kthread_exit(0);
|
|
}
|