summaryrefslogtreecommitdiff
path: root/kernel/padata.c
diff options
context:
space:
mode:
Diffstat (limited to 'kernel/padata.c')
-rw-r--r--kernel/padata.c137
1 files changed, 48 insertions, 89 deletions
diff --git a/kernel/padata.c b/kernel/padata.c
index 282b489a286d..c50975f43b34 100644
--- a/kernel/padata.c
+++ b/kernel/padata.c
@@ -33,6 +33,8 @@
#define MAX_OBJ_NUM 1000
+static void padata_free_pd(struct parallel_data *pd);
+
static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
{
int cpu, target_cpu;
@@ -63,15 +65,11 @@ static int padata_cpu_hash(struct parallel_data *pd)
static void padata_parallel_worker(struct work_struct *parallel_work)
{
struct padata_parallel_queue *pqueue;
- struct parallel_data *pd;
- struct padata_instance *pinst;
LIST_HEAD(local_list);
local_bh_disable();
pqueue = container_of(parallel_work,
struct padata_parallel_queue, work);
- pd = pqueue->pd;
- pinst = pd->pinst;
spin_lock(&pqueue->parallel.lock);
list_replace_init(&pqueue->parallel.list, &local_list);
@@ -134,6 +132,7 @@ int padata_do_parallel(struct padata_instance *pinst,
padata->cb_cpu = cb_cpu;
target_cpu = padata_cpu_hash(pd);
+ padata->cpu = target_cpu;
queue = per_cpu_ptr(pd->pqueue, target_cpu);
spin_lock(&queue->parallel.lock);
@@ -157,8 +156,6 @@ EXPORT_SYMBOL(padata_do_parallel);
* A pointer to the control struct of the next object that needs
* serialization, if present in one of the percpu reorder queues.
*
- * NULL, if all percpu reorder queues are empty.
- *
* -EINPROGRESS, if the next object that needs serialization will
* be parallel processed by another cpu and is not yet present in
* the cpu's reorder queue.
@@ -168,25 +165,12 @@ EXPORT_SYMBOL(padata_do_parallel);
*/
static struct padata_priv *padata_get_next(struct parallel_data *pd)
{
- int cpu, num_cpus;
- unsigned int next_nr, next_index;
struct padata_parallel_queue *next_queue;
struct padata_priv *padata;
struct padata_list *reorder;
+ int cpu = pd->cpu;
- num_cpus = cpumask_weight(pd->cpumask.pcpu);
-
- /*
- * Calculate the percpu reorder queue and the sequence
- * number of the next object.
- */
- next_nr = pd->processed;
- next_index = next_nr % num_cpus;
- cpu = padata_index_to_cpu(pd, next_index);
next_queue = per_cpu_ptr(pd->pqueue, cpu);
-
- padata = NULL;
-
reorder = &next_queue->reorder;
spin_lock(&reorder->lock);
@@ -197,7 +181,8 @@ static struct padata_priv *padata_get_next(struct parallel_data *pd)
list_del_init(&padata->list);
atomic_dec(&pd->reorder_objects);
- pd->processed++;
+ pd->cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu, -1,
+ false);
spin_unlock(&reorder->lock);
goto out;
@@ -220,6 +205,7 @@ static void padata_reorder(struct parallel_data *pd)
struct padata_priv *padata;
struct padata_serial_queue *squeue;
struct padata_instance *pinst = pd->pinst;
+ struct padata_parallel_queue *next_queue;
/*
* We need to ensure that only one cpu can work on dequeueing of
@@ -238,12 +224,11 @@ static void padata_reorder(struct parallel_data *pd)
padata = padata_get_next(pd);
/*
- * All reorder queues are empty, or the next object that needs
- * serialization is parallel processed by another cpu and is
- * still on it's way to the cpu's reorder queue, nothing to
- * do for now.
+ * If the next object that needs serialization is parallel
+ * processed by another cpu and is still on it's way to the
+ * cpu's reorder queue, nothing to do for now.
*/
- if (!padata || PTR_ERR(padata) == -EINPROGRESS)
+ if (PTR_ERR(padata) == -EINPROGRESS)
break;
/*
@@ -252,7 +237,6 @@ static void padata_reorder(struct parallel_data *pd)
* so exit immediately.
*/
if (PTR_ERR(padata) == -ENODATA) {
- del_timer(&pd->timer);
spin_unlock_bh(&pd->lock);
return;
}
@@ -271,28 +255,27 @@ static void padata_reorder(struct parallel_data *pd)
/*
* The next object that needs serialization might have arrived to
- * the reorder queues in the meantime, we will be called again
- * from the timer function if no one else cares for it.
+ * the reorder queues in the meantime.
*
- * Ensure reorder_objects is read after pd->lock is dropped so we see
- * an increment from another task in padata_do_serial. Pairs with
+ * Ensure reorder queue is read after pd->lock is dropped so we see
+ * new objects from another task in padata_do_serial. Pairs with
* smp_mb__after_atomic in padata_do_serial.
*/
smp_mb();
- if (atomic_read(&pd->reorder_objects)
- && !(pinst->flags & PADATA_RESET))
- mod_timer(&pd->timer, jiffies + HZ);
- else
- del_timer(&pd->timer);
- return;
+ next_queue = per_cpu_ptr(pd->pqueue, pd->cpu);
+ if (!list_empty(&next_queue->reorder.list))
+ queue_work(pinst->wq, &pd->reorder_work);
}
-static void padata_reorder_timer(unsigned long arg)
+static void invoke_padata_reorder(struct work_struct *work)
{
- struct parallel_data *pd = (struct parallel_data *)arg;
+ struct parallel_data *pd;
+ local_bh_disable();
+ pd = container_of(work, struct parallel_data, reorder_work);
padata_reorder(pd);
+ local_bh_enable();
}
static void padata_serial_worker(struct work_struct *serial_work)
@@ -300,6 +283,7 @@ static void padata_serial_worker(struct work_struct *serial_work)
struct padata_serial_queue *squeue;
struct parallel_data *pd;
LIST_HEAD(local_list);
+ int cnt;
local_bh_disable();
squeue = container_of(serial_work, struct padata_serial_queue, work);
@@ -309,6 +293,8 @@ static void padata_serial_worker(struct work_struct *serial_work)
list_replace_init(&squeue->serial.list, &local_list);
spin_unlock(&squeue->serial.lock);
+ cnt = 0;
+
while (!list_empty(&local_list)) {
struct padata_priv *padata;
@@ -318,9 +304,12 @@ static void padata_serial_worker(struct work_struct *serial_work)
list_del_init(&padata->list);
padata->serial(padata);
- atomic_dec(&pd->refcnt);
+ cnt++;
}
local_bh_enable();
+
+ if (atomic_sub_and_test(cnt, &pd->refcnt))
+ padata_free_pd(pd);
}
/**
@@ -333,29 +322,22 @@ static void padata_serial_worker(struct work_struct *serial_work)
*/
void padata_do_serial(struct padata_priv *padata)
{
- int cpu;
- struct padata_parallel_queue *pqueue;
- struct parallel_data *pd;
-
- pd = padata->pd;
-
- cpu = get_cpu();
- pqueue = per_cpu_ptr(pd->pqueue, cpu);
+ struct parallel_data *pd = padata->pd;
+ struct padata_parallel_queue *pqueue = per_cpu_ptr(pd->pqueue,
+ padata->cpu);
spin_lock(&pqueue->reorder.lock);
- atomic_inc(&pd->reorder_objects);
list_add_tail(&padata->list, &pqueue->reorder.list);
+ atomic_inc(&pd->reorder_objects);
spin_unlock(&pqueue->reorder.lock);
/*
- * Ensure the atomic_inc of reorder_objects above is ordered correctly
+ * Ensure the addition to the reorder list is ordered correctly
* with the trylock of pd->lock in padata_reorder. Pairs with smp_mb
* in padata_reorder.
*/
smp_mb__after_atomic();
- put_cpu();
-
padata_reorder(pd);
}
EXPORT_SYMBOL(padata_do_serial);
@@ -404,9 +386,14 @@ static void padata_init_pqueues(struct parallel_data *pd)
struct padata_parallel_queue *pqueue;
cpu_index = 0;
- for_each_cpu(cpu, pd->cpumask.pcpu) {
+ for_each_possible_cpu(cpu) {
pqueue = per_cpu_ptr(pd->pqueue, cpu);
- pqueue->pd = pd;
+
+ if (!cpumask_test_cpu(cpu, pd->cpumask.pcpu)) {
+ pqueue->cpu_index = -1;
+ continue;
+ }
+
pqueue->cpu_index = cpu_index;
cpu_index++;
@@ -440,12 +427,13 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
padata_init_pqueues(pd);
padata_init_squeues(pd);
- setup_timer(&pd->timer, padata_reorder_timer, (unsigned long)pd);
atomic_set(&pd->seq_nr, -1);
atomic_set(&pd->reorder_objects, 0);
- atomic_set(&pd->refcnt, 0);
+ atomic_set(&pd->refcnt, 1);
pd->pinst = pinst;
spin_lock_init(&pd->lock);
+ pd->cpu = cpumask_first(pd->cpumask.pcpu);
+ INIT_WORK(&pd->reorder_work, invoke_padata_reorder);
return pd;
@@ -468,31 +456,6 @@ static void padata_free_pd(struct parallel_data *pd)
kfree(pd);
}
-/* Flush all objects out of the padata queues. */
-static void padata_flush_queues(struct parallel_data *pd)
-{
- int cpu;
- struct padata_parallel_queue *pqueue;
- struct padata_serial_queue *squeue;
-
- for_each_cpu(cpu, pd->cpumask.pcpu) {
- pqueue = per_cpu_ptr(pd->pqueue, cpu);
- flush_work(&pqueue->work);
- }
-
- del_timer_sync(&pd->timer);
-
- if (atomic_read(&pd->reorder_objects))
- padata_reorder(pd);
-
- for_each_cpu(cpu, pd->cpumask.cbcpu) {
- squeue = per_cpu_ptr(pd->squeue, cpu);
- flush_work(&squeue->work);
- }
-
- BUG_ON(atomic_read(&pd->refcnt) != 0);
-}
-
static void __padata_start(struct padata_instance *pinst)
{
pinst->flags |= PADATA_INIT;
@@ -506,10 +469,6 @@ static void __padata_stop(struct padata_instance *pinst)
pinst->flags &= ~PADATA_INIT;
synchronize_rcu();
-
- get_online_cpus();
- padata_flush_queues(pinst->pd);
- put_online_cpus();
}
/* Replace the internal control structure with a new one. */
@@ -530,8 +489,8 @@ static void padata_replace(struct padata_instance *pinst,
if (!cpumask_equal(pd_old->cpumask.cbcpu, pd_new->cpumask.cbcpu))
notification_mask |= PADATA_CPU_SERIAL;
- padata_flush_queues(pd_old);
- padata_free_pd(pd_old);
+ if (atomic_dec_and_test(&pd_old->refcnt))
+ padata_free_pd(pd_old);
if (notification_mask)
blocking_notifier_call_chain(&pinst->cpumask_change_notifier,
@@ -661,8 +620,8 @@ int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
struct cpumask *serial_mask, *parallel_mask;
int err = -EINVAL;
- mutex_lock(&pinst->lock);
get_online_cpus();
+ mutex_lock(&pinst->lock);
switch (cpumask_type) {
case PADATA_CPU_PARALLEL:
@@ -680,8 +639,8 @@ int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
err = __padata_set_cpumasks(pinst, parallel_mask, serial_mask);
out:
- put_online_cpus();
mutex_unlock(&pinst->lock);
+ put_online_cpus();
return err;
}