From: Rafael J. Wysocki on
On Tuesday 08 December 2009, Rafael J. Wysocki wrote:
> On Tuesday 08 December 2009, Alan Stern wrote:
> > On Tue, 8 Dec 2009, Rafael J. Wysocki wrote:
> >
> > > However, the parent can be on a different bus type than the children, so it
> > > looks like we can only start the asynchronous path at the core level.
> >
> > Agreed.
> >
> > > > Unless you want to do _all_ of the async logic in generic code and
> > > > re-introduce the "dev->async_suspend" flag.
> > >
> > > Quite frankly, I would like to.
> > >
> > > > I would be ok with that now that the infrastructure seems so simple.
> > >
> > > Well, perhaps I should dig out my original async suspend/resume patches
> > > that didn't contain all of the non-essential stuff and post them here for
> > > discussion, after all ...
> >
> > That seems like a very good idea. IIRC they were quite similar to what
> > we have been discussing.
>
> There you go.

Below is the suspend part. It contains some extra code for rolling back the
suspend if one of the asynchronous callbacks returns an error code, but apart
from this it's completely analogous to the resume part.

[This patch has only been slightly tested.]

---
drivers/base/power/main.c | 113 ++++++++++++++++++++++++++++++++++++++++++----
1 file changed, 104 insertions(+), 9 deletions(-)

Index: linux-2.6/drivers/base/power/main.c
===================================================================
--- linux-2.6.orig/drivers/base/power/main.c
+++ linux-2.6/drivers/base/power/main.c
@@ -202,6 +202,17 @@ static void dpm_wait(struct device *dev,
wait_event(dev->power.wait_queue, !!dev->power.op_complete);
}

+static int device_pm_wait_fn(struct device *dev, void *async_ptr)
+{
+ dpm_wait(dev, *((bool *)async_ptr));
+ return 0;
+}
+
+static void dpm_wait_for_children(struct device *dev, bool async)
+{
+ device_for_each_child(dev, &async, device_pm_wait_fn);
+}
+
/**
* dpm_synchronize - Wait for PM callbacks of all devices to complete.
*/
@@ -638,6 +649,8 @@ static void dpm_complete(pm_message_t st
mutex_unlock(&dpm_list_mtx);
}

+static int async_error;
+
/**
* dpm_resume_end - Execute "resume" callbacks and complete system transition.
* @state: PM transition of the system being carried out.
@@ -685,20 +698,52 @@ static pm_message_t resume_event(pm_mess
* The driver of @dev will not receive interrupts while this function is being
* executed.
*/
-static int device_suspend_noirq(struct device *dev, pm_message_t state)
+static int __device_suspend_noirq(struct device *dev, pm_message_t state)
{
int error = 0;

- if (!dev->bus)
- return 0;
-
- if (dev->bus->pm) {
+ if (dev->bus && dev->bus->pm) {
pm_dev_dbg(dev, state, "LATE ");
error = pm_noirq_op(dev, dev->bus->pm, state);
}
+
+ dpm_finish(dev);
+
return error;
}

+static void async_suspend_noirq(void *data, async_cookie_t cookie)
+{
+ struct device *dev = (struct device *)data;
+ int error = async_error;
+
+ if (error)
+ return;
+
+ dpm_wait_for_children(dev, true);
+ error = __device_suspend_noirq(dev, pm_transition);
+ if (error) {
+ pm_dev_err(dev, pm_transition, " async LATE", error);
+ dev->power.status = DPM_OFF;
+ }
+ put_device(dev);
+
+ if (error && !async_error)
+ async_error = error;
+}
+
+static int device_suspend_noirq(struct device *dev)
+{
+ if (dev->power.async_suspend) {
+ get_device(dev);
+ async_schedule(async_suspend_noirq, dev);
+ return 0;
+ }
+
+ dpm_wait_for_children(dev, false);
+ return __device_suspend_noirq(dev, pm_transition);
+}
+
/**
* dpm_suspend_noirq - Execute "late suspend" callbacks for non-sysdev devices.
* @state: PM transition of the system being carried out.
@@ -713,14 +758,21 @@ int dpm_suspend_noirq(pm_message_t state

suspend_device_irqs();
mutex_lock(&dpm_list_mtx);
+ pm_transition = state;
list_for_each_entry_reverse(dev, &dpm_list, power.entry) {
- error = device_suspend_noirq(dev, state);
+ dev->power.status = DPM_OFF_IRQ;
+ error = device_suspend_noirq(dev);
if (error) {
pm_dev_err(dev, state, " late", error);
+ dev->power.status = DPM_OFF;
+ break;
+ }
+ if (async_error) {
+ error = async_error;
break;
}
- dev->power.status = DPM_OFF_IRQ;
}
+ dpm_synchronize();
mutex_unlock(&dpm_list_mtx);
if (error)
dpm_resume_noirq(resume_event(state));
@@ -733,7 +785,7 @@ EXPORT_SYMBOL_GPL(dpm_suspend_noirq);
* @dev: Device to handle.
* @state: PM transition of the system being carried out.
*/
-static int device_suspend(struct device *dev, pm_message_t state)
+static int __device_suspend(struct device *dev, pm_message_t state)
{
int error = 0;

@@ -773,10 +825,45 @@ static int device_suspend(struct device
}
End:
up(&dev->sem);
+ dpm_finish(dev);

return error;
}

+static void async_suspend(void *data, async_cookie_t cookie)
+{
+ struct device *dev = (struct device *)data;
+ int error = async_error;
+
+ if (error)
+ goto End;
+
+ dpm_wait_for_children(dev, true);
+ error = __device_suspend(dev, pm_transition);
+ if (error) {
+ pm_dev_err(dev, pm_transition, " async", error);
+
+ dev->power.status = DPM_SUSPENDING;
+ if (!async_error)
+ async_error = error;
+ }
+
+ End:
+ put_device(dev);
+}
+
+static int device_suspend(struct device *dev, pm_message_t state)
+{
+ if (dev->power.async_suspend) {
+ get_device(dev);
+ async_schedule(async_suspend, dev);
+ return 0;
+ }
+
+ dpm_wait_for_children(dev, false);
+ return __device_suspend(dev, pm_transition);
+}
+
/**
* dpm_suspend - Execute "suspend" callbacks for all non-sysdev devices.
* @state: PM transition of the system being carried out.
@@ -788,10 +875,12 @@ static int dpm_suspend(pm_message_t stat

INIT_LIST_HEAD(&list);
mutex_lock(&dpm_list_mtx);
+ pm_transition = state;
while (!list_empty(&dpm_list)) {
struct device *dev = to_device(dpm_list.prev);

get_device(dev);
+ dev->power.status = DPM_OFF;
mutex_unlock(&dpm_list_mtx);

error = device_suspend(dev, state);
@@ -799,16 +888,21 @@ static int dpm_suspend(pm_message_t stat
mutex_lock(&dpm_list_mtx);
if (error) {
pm_dev_err(dev, state, "", error);
+ dev->power.status = DPM_SUSPENDING;
put_device(dev);
break;
}
- dev->power.status = DPM_OFF;
if (!list_empty(&dev->power.entry))
list_move(&dev->power.entry, &list);
put_device(dev);
+ if (async_error)
+ break;
}
list_splice(&list, dpm_list.prev);
+ dpm_synchronize();
mutex_unlock(&dpm_list_mtx);
+ if (!error)
+ error = async_error;
return error;
}

@@ -867,6 +961,7 @@ static int dpm_prepare(pm_message_t stat
INIT_LIST_HEAD(&list);
mutex_lock(&dpm_list_mtx);
transition_started = true;
+ async_error = 0;
while (!list_empty(&dpm_list)) {
struct device *dev = to_device(dpm_list.next);

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo(a)vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/
From: Linus Torvalds on


On Tue, 8 Dec 2009, Alan Stern wrote:
>
> The semantics needed for this kind of lock aren't really the same as
> for an rwsem (although obviously an rwsem will do the job). Basically
> it needs to have the capability for multiple users to lock it (no
> blocking when acquiring a lock) and the capability for a user to wait
> until it is totally unlocked. It could be implemented trivially using
> an atomic_t counter and a waitqueue head.
>
> Is this a standard sort of lock?

Yes it is.

It's called a rwlock. The counter is for readers, the exclusion is for
writers.

Really.

And the thing is, you actually do want the rwlock semantics, because on
the resume side you want the parent to lock it for writing first (so that
the children can wait for the parent to have completed its resume).

So we actually _want_ the full rwlock semantics.

See the code I posted earlier. Here condensed into one email:

- resume:

usb_node_resume(node)
{
// Wait for parent to finish resume
down_read(node->parent->lock);
// .. before resuming ourselves
node->resume(node)

// Now we're all done
up_read(node->parent->lock);
up_write(node->lock);
}

/* caller: */
..
// This won't block, because we resume parents before children,
// and the children will take the read lock.
down_write(leaf->lock);
// Do the blocking part asynchronously
async_schedule(usb_node_resume, leaf);
..

- suspend:

usb_node_suspend(node)
{
// Start our suspend. This will block if we have any
// children that are still busy suspending (they will
// have done a down_read() in their suspend).
down_write(node->lock);
node->suspend(node);
up_write(node->lock);

// This lets our parent continue
up_read(node->parent->lock);
}

/* caller: */

// This won't block, because we suspend nodes before parents
down_read(node->parent->lock);
// Do the part that may block asynchronously
async_schedule(do_usb_node_suspend, node);

It really should be that simple. Nothing more, nothing less. And with the
above, finishing the suspend (or resume) from interrupts is fine, and you
don't have any new lock that has undefined memory ordering issues etc.

Linus
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo(a)vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/
From: Alan Stern on
On Tue, 8 Dec 2009, Linus Torvalds wrote:

> On Tue, 8 Dec 2009, Alan Stern wrote:
> >
> > The semantics needed for this kind of lock aren't really the same as
> > for an rwsem (although obviously an rwsem will do the job). Basically
> > it needs to have the capability for multiple users to lock it (no
> > blocking when acquiring a lock) and the capability for a user to wait
> > until it is totally unlocked. It could be implemented trivially using
> > an atomic_t counter and a waitqueue head.
> >
> > Is this a standard sort of lock?
>
> Yes it is.
>
> It's called a rwlock. The counter is for readers, the exclusion is for
> writers.
>
> Really.
>
> And the thing is, you actually do want the rwlock semantics, because on
> the resume side you want the parent to lock it for writing first (so that
> the children can wait for the parent to have completed its resume).
>
> So we actually _want_ the full rwlock semantics.

I'm not convinced. Condense the description a little further:

Suspend: Children lock the parent first. When they are
finished they unlock the parent, allowing it to
proceed.

Resume: Parent locks itself first. When it is finished
it unlocks itself, allowing the children to proceed.

The whole readers vs. writers thing is a non-sequitur. (For instance,
this never uses the fact that writers exclude each other.) In each
case a lock is taken and eventually released, allowing someone else to
stop waiting and move forward. In the suspend case we have multiple
lockers and one waiter, whereas in the resume case we have one locker
and multiple waiters.

The simplest generalization is to allow both multiple lockers and
multiple waiters. Call it a waitlock, for want of a better name:

wait_lock(wl)
{
atomic_inc(&wl->count);
}

wait_unlock(wl)
{
if (atomic_dec_and_test(&wl->count)) {
smp_mb__after_atomic_dec();
wake_up_all(wl->wqh);
}
}

wait_for_lock(wl)
{
wait_event(wl->wqh, atomic_read(&wl->count) == 0);
smp_rmb();
}

Note that both wait_lock() and wait_unlock() can be called
in_interrupt.

> See the code I posted earlier. Here condensed into one email:
>
> - resume:
>
> usb_node_resume(node)
> {
> // Wait for parent to finish resume
> down_read(node->parent->lock);
> // .. before resuming ourselves
> node->resume(node)
>
> // Now we're all done
> up_read(node->parent->lock);
> up_write(node->lock);
> }
>
> /* caller: */
> ..
> // This won't block, because we resume parents before children,
> // and the children will take the read lock.
> down_write(leaf->lock);
> // Do the blocking part asynchronously
> async_schedule(usb_node_resume, leaf);
> ..

This becomes:

usb_node_resume(node)
{
// Wait for parent to finish resume
wait_for_lock(node->parent->lock);
// .. before resuming ourselves
node->resume(node)

// Now we're all done
wait_unlock(node->lock);
}

/* caller: */
..
// This can't block, because wait_lock() is non-blocking.
wait_lock(node->lock);
// Do the blocking part asynchronously
async_schedule(usb_node_resume, leaf);
..

> - suspend:
>
> usb_node_suspend(node)
> {
> // Start our suspend. This will block if we have any
> // children that are still busy suspending (they will
> // have done a down_read() in their suspend).
> down_write(node->lock);
> node->suspend(node);
> up_write(node->lock);
>
> // This lets our parent continue
> up_read(node->parent->lock);
> }
>
> /* caller: */
>
> // This won't block, because we suspend nodes before parents
> down_read(node->parent->lock);
> // Do the part that may block asynchronously
> async_schedule(do_usb_node_suspend, node);

usb_node_suspend(node)
{
// Start our suspend. This will block if we have any
// children that are still busy suspending (they will
// have done a wait_lock() in their suspend).
wait_for_lock(node->lock);
node->suspend(node);

// This lets our parent continue
wait_unlock(node->parent->lock);
}

/* caller: */
..
// This can't block, because wait_lock is non-blocking.
wait_lock(node->parent->lock);
// Do the part that may block asynchronously
async_schedule(do_usb_node_suspend, node);
..

> It really should be that simple. Nothing more, nothing less. And with the
> above, finishing the suspend (or resume) from interrupts is fine, and you
> don't have any new lock that has undefined memory ordering issues etc.

Aren't waitlocks simpler than rwsems? Not as generally useful,
perhaps. But just as correct in this situation.

Alan Stern

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo(a)vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/
From: Linus Torvalds on


On Tue, 8 Dec 2009, Alan Stern wrote:
> >
> > So we actually _want_ the full rwlock semantics.
>
> I'm not convinced. Condense the description a little further:
>
> Suspend: Children lock the parent first. When they are
> finished they unlock the parent, allowing it to
> proceed.
>
> Resume: Parent locks itself first. When it is finished
> it unlocks itself, allowing the children to proceed.

Yes. You can implement it with a simple lock with a count. Nobody debates
that.

But a simple counting lock _is_ a rwlock. Really. They are 100%
semantically equivalent. There is no difference.

> The whole readers vs. writers thing is a non-sequitur.

No it's not.

It's a 100% equivalent problem. It's purely a change of wording. The end
result is the same.

> The simplest generalization is to allow both multiple lockers and
> multiple waiters. Call it a waitlock, for want of a better name:

But we have that. It _has_ a better name: rwlocks.

And the reason the name is better is because now the name describes all
the semantics to anybody who has ever taken a course in operating systems
or in parallelism.

It's also a better implementation, because it actually _works_.

> wait_lock(wl)
> {
> atomic_inc(&wl->count);
> }
>
> wait_unlock(wl)
> {
> if (atomic_dec_and_test(&wl->count)) {
> smp_mb__after_atomic_dec();
> wake_up_all(wl->wqh);
> }
> }
>
> wait_for_lock(wl)
> {
> wait_event(wl->wqh, atomic_read(&wl->count) == 0);
> smp_rmb();
> }
>
> Note that both wait_lock() and wait_unlock() can be called
> in_interrupt.

And note how even though you sprinkled random memory barriers around, you
still got it wrong.

So you just implemented a buggy lock, and for what gain? Tell me exactly
why your buggy lock (assuming you'd know enough about memory ordering to
actually fix it) is better than just using the existing one?

It's certainly not smaller. It's not faster. It doesn't have support for
lockdep. And it's BUGGY.

Really. Tell me why you want to re-implement an existing lock - badly.

[ Hint: you need a smp_mb() *before* the atomic_dec() in wait-unlock, so
that anybody else who sees the new value will be guaranteed to have seen
anything else the unlocker did.

You also need a smp_mb() in the wait_for_lock(), not a smp_rmb(). Can't
allow writes to migrate up either. 'atomic_read()' does not imply any
barriers.

But most architectures can optimize these things for their particular
memory ordering model, and do so in their rwsem implementation. ]

> This becomes:
>
> usb_node_resume(node)
> {
> // Wait for parent to finish resume
> wait_for_lock(node->parent->lock);
> // .. before resuming ourselves
> node->resume(node)
>
> // Now we're all done
> wait_unlock(node->lock);
> }
>
> /* caller: */
> ..
> // This can't block, because wait_lock() is non-blocking.
> wait_lock(node->lock);
> // Do the blocking part asynchronously
> async_schedule(usb_node_resume, leaf);
> ..

Umm? Same thing, different words?

That "wait_for_lock()" is equivalent to a 'read_lock()+read_unlock()'. We
_could_ expose such a mechanism for rwsems too, but why do it? It's
actually nicer to use a real read-lock - and do it _around_ the operation,
because now the locking also automatically gets things like overlapping
suspends and resumes right.

(Which you'd obviously hope never happens, but it's nice from a conceptual
standpoint to know that the locking is robust).

> Aren't waitlocks simpler than rwsems? Not as generally useful,
> perhaps. But just as correct in this situation.

NO!

Dammit. I started this whole rant with this comment to Rafael:

"The fact is, any time anybody makes up a new locking mechanism, THEY
ALWAYS GET IT WRONG. Don't do it."

Take heed. You got it wrong. Admit it. Locking is _hard_. SMP memory
ordering is HARD.

So leave locking to the pros. They _also_ got it wrong, but they got it
wrong several years ago, and fixed up their sh*t.

This is why you use generic locking. ALWAYS.

Linus


--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo(a)vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/
From: Linus Torvalds on


On Tue, 8 Dec 2009, Linus Torvalds wrote:
>
> [ Hint: you need a smp_mb() *before* the atomic_dec() in wait-unlock, so
> that anybody else who sees the new value will be guaranteed to have seen
> anything else the unlocker did.
>
> You also need a smp_mb() in the wait_for_lock(), not a smp_rmb(). Can't
> allow writes to migrate up either. 'atomic_read()' does not imply any
> barriers.
>
> But most architectures can optimize these things for their particular
> memory ordering model, and do so in their rwsem implementation. ]

Side note: if this were a real lock, you'd also need an smp_wmb() in the
'wait_lock()' path after the atomic_inc(), to make sure that the atomic
lock is seen by other people before the suspend starts.

In your usage scenario, I don't think it would ever be noticeable, since
the other users are always going to start running from the same thread
that did the wait_lock(), so even if they run on other CPUs, we'll have
scheduled _to_ those other CPUs and done enough memory ordering to
guarantee that they will see the thing.

So it would be ok in this situation, simply because it acts as an
initializer and never sees any real SMP issues.

But it's an example of how you now don't just depend on the locking
primitives themselves doing the right thing, you end up depending very
subtly on exactly how the lock is used. The standard locks do have the
same kind of issue for initializers, but we avoid it elsewhere because
it's so risky.

Linus
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo(a)vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/