From: Michael S. Tsirkin on
> +/* The structure to notify the virtqueue for async socket */
> +struct vhost_notifier {
> + struct list_head list;
> + struct vhost_virtqueue *vq;
> + int head;
> + int size;
> + int log;
> + void *ctrl;
> + void (*dtor)(struct vhost_notifier *);
> +};
> +

So IMO, this is not the best interface between vhost
and your driver, exposing them to each other unnecessarily.

If you think about it, your driver should not care about this structure.
It could get e.g. a kiocb (sendmsg already gets one), and call ki_dtor
on completion. vhost could save its state in ki_user_data. If your
driver needs to add more data to do more tracking, I think it can put
skb pointer in the private pointer.
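
To make the split of responsibilities concrete, here is a minimal userspace
sketch (not kernel code). struct mock_kiocb and the submitter_*/driver_*
helpers are hypothetical names made up for illustration; only the field
names private, ki_user_data and ki_dtor follow the ones mentioned above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct kiocb: only the three fields
 * discussed above are modelled. */
struct mock_kiocb {
	void *private;                        /* driver-owned, e.g. an skb pointer */
	uint64_t ki_user_data;                /* submitter-owned, e.g. a descriptor head */
	void (*ki_dtor)(struct mock_kiocb *); /* called by the driver on completion */
};

/* Submitter (vhost-like) state that the driver never looks at. */
static int last_completed_head = -1;

/* Completion hook: recovers the submitter's state from the request. */
static void submitter_complete(struct mock_kiocb *iocb)
{
	last_completed_head = (int)iocb->ki_user_data;
}

/* Submitter side: stash its own state in the request before handing it over. */
static void submitter_prepare(struct mock_kiocb *iocb, int head)
{
	iocb->private = NULL;
	iocb->ki_user_data = (uint64_t)head;
	iocb->ki_dtor = submitter_complete;
}

/* Driver side: tracks its buffer through ->private and reports completion
 * through ->ki_dtor, without interpreting ->ki_user_data. */
static void driver_send(struct mock_kiocb *iocb, void *skb)
{
	iocb->private = skb;
	/* ... the actual transmission would happen here ... */
	if (iocb->ki_dtor)
		iocb->ki_dtor(iocb);
}

/* Demo: returns the head that was reported back to the submitter. */
int demo_opaque_request(void)
{
	struct mock_kiocb iocb;
	char fake_skb[16];

	submitter_prepare(&iocb, 7);
	driver_send(&iocb, fake_skb);
	return last_completed_head;
}
```

The point of the sketch is that neither side needs the other's structure
definitions: the request is the whole interface.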

--
MST
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo(a)vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/
From: Xin, Xiaohui on
>> +/* The structure to notify the virtqueue for async socket */
>> +struct vhost_notifier {
>> + struct list_head list;
>> + struct vhost_virtqueue *vq;
>> + int head;
>> + int size;
>> + int log;
>> + void *ctrl;
>> + void (*dtor)(struct vhost_notifier *);
>> +};
>> +

>So IMO, this is not the best interface between vhost
>and your driver, exposing them to each other unnecessarily.
>
>If you think about it, your driver should not care about this structure.
>It could get e.g. a kiocb (sendmsg already gets one), and call ki_dtor
>on completion. vhost could save it's state in ki_user_data. If your
>driver needs to add more data to do more tracking, I think it can put
>skb pointer in the private pointer.

Then if I remove struct vhost_notifier and just use struct kiocb, but
instead of the one passed in by sendmsg or recvmsg I use one allocated
within the page_info structure, and I don't implement any aio logic for it,
is that OK?

Sorry, I have made a patch, but I don't know how to send a well-formatted patch in a reply here.

Thanks
Xiaohui
From: Michael S. Tsirkin on
On Mon, Mar 15, 2010 at 04:46:50PM +0800, Xin, Xiaohui wrote:
> >> +/* The structure to notify the virtqueue for async socket */
> >> +struct vhost_notifier {
> >> + struct list_head list;
> >> + struct vhost_virtqueue *vq;
> >> + int head;
> >> + int size;
> >> + int log;
> >> + void *ctrl;
> >> + void (*dtor)(struct vhost_notifier *);
> >> +};
> >> +
>
> >So IMO, this is not the best interface between vhost
> >and your driver, exposing them to each other unnecessarily.
> >
> >If you think about it, your driver should not care about this structure.
> >It could get e.g. a kiocb (sendmsg already gets one), and call ki_dtor
> >on completion. vhost could save it's state in ki_user_data. If your
> >driver needs to add more data to do more tracking, I think it can put
> >skb pointer in the private pointer.
>
> Then if I remove the struct vhost_notifier, and just use struct kiocb, but don't use the one got from sendmsg or recvmsg, but allocated within the page_info structure, and don't implement any aio logic related to it, is that ok?

Hmm, not sure I understand. It seems both cleaner and easier to use the
iocb passed to sendmsg/recvmsg. No? I am not saying you necessarily must
implement full aio directly.

> Sorry, I made a patch, but don't know how to reply mail with a good formatted patch here....
>
> Thanks
> Xiaohui

Maybe Documentation/email-clients.txt will help?
Generally you do it like this (at start of mail):

Subject: one line patch summary (overrides mail subject)

multiline patch description

Signed-off-by: <...>

---

Free text comes after the --- delimiter, before the patch.

diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index a140dad..e830b30 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -106,22 +106,41 @@ static void handle_tx(struct vhost_net *net)




--
MST
From: Michael S. Tsirkin on
On Tue, Mar 16, 2010 at 05:32:17PM +0800, Xin Xiaohui wrote:
> The vhost-net backend now only supports synchronous send/recv
> operations. The patch provides multiple submits and asynchronous
> notifications. This is needed for zero-copy case.
>
> Signed-off-by: Xin Xiaohui <xiaohui.xin(a)intel.com>
> ---
>
> Michael,
> I don't use the kiocb comes from the sendmsg/recvmsg,
> since I have embeded the kiocb in page_info structure,
> and allocate it when page_info allocated.

So what I suggested was that vhost allocates and tracks the iocbs, and
passes them to your device with sendmsg/recvmsg calls. This way your
device won't need to share structures and locking strategy with vhost:
you get an iocb, handle it, invoke a callback to notify vhost about
completion.

This also gets rid of the 'receiver' callback.
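
As a rough userspace illustration of that ownership model (not kernel code,
and not the actual vhost implementation): the owner below allocates requests
from its own pool, hands one to the device per sendmsg-like call, and learns
about completion only through the callback. All names here (struct request,
struct owner, owner_alloc, device_sendmsg) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define POOL_SIZE 4

struct request;
typedef void (*complete_fn)(struct request *req, int bytes);

/* Hypothetical request handed to the device; it plays the role of the iocb. */
struct request {
	int head;             /* descriptor index, meaningful only to the owner */
	int in_flight;        /* owner bookkeeping */
	complete_fn complete; /* the device calls this when it is done */
};

/* Owner (vhost-like) state: the pool and the used list are private to it. */
struct owner {
	struct request pool[POOL_SIZE];
	int used_heads[POOL_SIZE];
	int used_count;
};

static struct owner the_owner;

/* Completion callback: record the finished head and free the slot. */
static void owner_on_complete(struct request *req, int bytes)
{
	(void)bytes;
	the_owner.used_heads[the_owner.used_count++] = req->head;
	req->in_flight = 0;
}

/* Allocate a request from the owner's pool, or NULL if all are in flight. */
static struct request *owner_alloc(int head)
{
	for (int i = 0; i < POOL_SIZE; i++) {
		struct request *req = &the_owner.pool[i];

		if (!req->in_flight) {
			req->in_flight = 1;
			req->head = head;
			req->complete = owner_on_complete;
			return req;
		}
	}
	return NULL;
}

/* Device side: the sendmsg-like entry point sees only the request. */
static int device_sendmsg(struct request *req, int len)
{
	req->complete(req, len);
	return len;
}

/* Demo: two submissions completing out of order; returns completions seen. */
int demo_owner_tracks_requests(void)
{
	struct request *a, *b;

	the_owner = (struct owner){0};
	a = owner_alloc(3);
	b = owner_alloc(5);
	if (!a || !b)
		return -1;
	device_sendmsg(b, 100);
	device_sendmsg(a, 200);
	return the_owner.used_count;
}
```

Because the device only ever calls req->complete(), it shares neither data
structures nor locks with the owner.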

> Please have a review and thanks for the instruction
> for replying email which helps me a lot.
>
> Thanks,
> Xiaohui
>
> drivers/vhost/net.c | 159 +++++++++++++++++++++++++++++++++++++++++++++++--
> drivers/vhost/vhost.h | 12 ++++
> 2 files changed, 166 insertions(+), 5 deletions(-)
>
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index 22d5fef..5483848 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -17,11 +17,13 @@
> #include <linux/workqueue.h>
> #include <linux/rcupdate.h>
> #include <linux/file.h>
> +#include <linux/aio.h>
>
> #include <linux/net.h>
> #include <linux/if_packet.h>
> #include <linux/if_arp.h>
> #include <linux/if_tun.h>
> +#include <linux/mpassthru.h>
>
> #include <net/sock.h>
>
> @@ -91,6 +93,12 @@ static void tx_poll_start(struct vhost_net *net, struct socket *sock)
> net->tx_poll_state = VHOST_NET_POLL_STARTED;
> }
>
> +static void handle_async_rx_events_notify(struct vhost_net *net,
> + struct vhost_virtqueue *vq);
> +
> +static void handle_async_tx_events_notify(struct vhost_net *net,
> + struct vhost_virtqueue *vq);
> +

A couple of style comments:

- It's better to arrange functions in such order that forward declarations
aren't necessary. Since we don't have recursion, this should always be
possible.

- continuation lines should be indented at least at the position of '('
on the previous line.
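
A small standalone example of both points, with made-up function names
(clamp_len and total_len are not from the patch): the helper is defined
before its caller, so no forward declaration is needed, and continuation
lines start under the character following the '('.

```c
#include <assert.h>

/* Defined first, so the caller below needs no forward declaration. */
static int clamp_len(int len,
		     int max_len)
{
	return len > max_len ? max_len : len;
}

/* Sum of the lengths, each capped at max_len. */
int total_len(const int *lens, int count,
	      int max_len)
{
	int total = 0;

	for (int i = 0; i < count; i++)
		total += clamp_len(lens[i],
				   max_len);
	return total;
}
```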

> /* Expects to be always run from workqueue - which acts as
> * read-size critical section for our kind of RCU. */
> static void handle_tx(struct vhost_net *net)
> @@ -124,6 +132,8 @@ static void handle_tx(struct vhost_net *net)
> tx_poll_stop(net);
> hdr_size = vq->hdr_size;
>
> + handle_async_tx_events_notify(net, vq);
> +
> for (;;) {
> head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
> ARRAY_SIZE(vq->iov),
> @@ -151,6 +161,12 @@ static void handle_tx(struct vhost_net *net)
> /* Skip header. TODO: support TSO. */
> s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, out);
> msg.msg_iovlen = out;
> +
> + if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
> + vq->head = head;
> + msg.msg_control = (void *)vq;

So here a device gets a pointer to the vhost_virtqueue structure. If it
got an iocb and invoked a callback instead, it would not need to care
about vhost internals.

> + }
> +
> len = iov_length(vq->iov, out);
> /* Sanity check */
> if (!len) {
> @@ -166,6 +182,10 @@ static void handle_tx(struct vhost_net *net)
> tx_poll_start(net, sock);
> break;
> }
> +
> + if (vq->link_state == VHOST_VQ_LINK_ASYNC)
> + continue;
> +
> if (err != len)
> pr_err("Truncated TX packet: "
> " len %d != %zd\n", err, len);
> @@ -177,6 +197,8 @@ static void handle_tx(struct vhost_net *net)
> }
> }
>
> + handle_async_tx_events_notify(net, vq);
> +
> mutex_unlock(&vq->mutex);
> unuse_mm(net->dev.mm);
> }
> @@ -206,7 +228,8 @@ static void handle_rx(struct vhost_net *net)
> int err;
> size_t hdr_size;
> struct socket *sock = rcu_dereference(vq->private_data);
> - if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
> + if (!sock || (skb_queue_empty(&sock->sk->sk_receive_queue) &&
> + vq->link_state == VHOST_VQ_LINK_SYNC))
> return;
>
> use_mm(net->dev.mm);
> @@ -214,9 +237,18 @@ static void handle_rx(struct vhost_net *net)
> vhost_disable_notify(vq);
> hdr_size = vq->hdr_size;
>
> - vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
> + /* In async cases, for write logging, the simple way is to get
> + * the log info always, and really logging is decided later.
> + * Thus, when logging enabled, we can get log, and when logging
> + * disabled, we can get log disabled accordingly.
> + */
> +


This adds overhead and might be one of the reasons
your patch does not perform that well. A better way
would be to flush outstanding requests or reread the vq
when logging is enabled.

> + vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) |
> + (vq->link_state == VHOST_VQ_LINK_ASYNC) ?
> vq->log : NULL;
>
> + handle_async_rx_events_notify(net, vq);
> +
> for (;;) {
> head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
> ARRAY_SIZE(vq->iov),
> @@ -245,6 +277,11 @@ static void handle_rx(struct vhost_net *net)
> s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, in);
> msg.msg_iovlen = in;
> len = iov_length(vq->iov, in);
> + if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
> + vq->head = head;
> + vq->_log = log;
> + msg.msg_control = (void *)vq;
> + }
> /* Sanity check */
> if (!len) {
> vq_err(vq, "Unexpected header len for RX: "
> @@ -259,6 +296,10 @@ static void handle_rx(struct vhost_net *net)
> vhost_discard_vq_desc(vq);
> break;
> }
> +
> + if (vq->link_state == VHOST_VQ_LINK_ASYNC)
> + continue;
> +
> /* TODO: Should check and handle checksum. */
> if (err > len) {
> pr_err("Discarded truncated rx packet: "
> @@ -284,10 +325,85 @@ static void handle_rx(struct vhost_net *net)
> }
> }
>
> + handle_async_rx_events_notify(net, vq);
> +
> mutex_unlock(&vq->mutex);
> unuse_mm(net->dev.mm);
> }
>
> +struct kiocb *notify_dequeue(struct vhost_virtqueue *vq)
> +{
> + struct kiocb *iocb = NULL;
> + unsigned long flags;
> +
> + spin_lock_irqsave(&vq->notify_lock, flags);
> + if (!list_empty(&vq->notifier)) {
> + iocb = list_first_entry(&vq->notifier,
> + struct kiocb, ki_list);
> + list_del(&iocb->ki_list);
> + }
> + spin_unlock_irqrestore(&vq->notify_lock, flags);
> + return iocb;
> +}
> +
> +static void handle_async_rx_events_notify(struct vhost_net *net,
> + struct vhost_virtqueue *vq)
> +{
> + struct kiocb *iocb = NULL;
> + struct vhost_log *vq_log = NULL;
> + int rx_total_len = 0;
> + int log, size;
> +
> + if (vq->link_state != VHOST_VQ_LINK_ASYNC)
> + return;
> + if (vq != &net->dev.vqs[VHOST_NET_VQ_RX])
> + return;
> +
> + if (vq->receiver)
> + vq->receiver(vq);
> + vq_log = unlikely(vhost_has_feature(
> + &net->dev, VHOST_F_LOG_ALL)) ? vq->log : NULL;
> + while ((iocb = notify_dequeue(vq)) != NULL) {
> + vhost_add_used_and_signal(&net->dev, vq,
> + iocb->ki_pos, iocb->ki_nbytes);
> + log = (int)iocb->ki_user_data;
> + size = iocb->ki_nbytes;
> + rx_total_len += iocb->ki_nbytes;
> + if (iocb->ki_dtor)
> + iocb->ki_dtor(iocb);
> + if (unlikely(vq_log))
> + vhost_log_write(vq, vq_log, log, size);
> + if (unlikely(rx_total_len >= VHOST_NET_WEIGHT)) {
> + vhost_poll_queue(&vq->poll);
> + break;
> + }
> + }
> +}
> +
> +static void handle_async_tx_events_notify(struct vhost_net *net,
> + struct vhost_virtqueue *vq)
> +{
> + struct kiocb *iocb = NULL;
> + int tx_total_len = 0;
> +
> + if (vq->link_state != VHOST_VQ_LINK_ASYNC)
> + return;
> + if (vq != &net->dev.vqs[VHOST_NET_VQ_TX])
> + return;
> +

It is hard to see why the second check would be necessary.

> + while ((iocb = notify_dequeue(vq)) != NULL) {
> + vhost_add_used_and_signal(&net->dev, vq,
> + iocb->ki_pos, 0);
> + tx_total_len += iocb->ki_nbytes;
> + if (iocb->ki_dtor)
> + iocb->ki_dtor(iocb);
> + if (unlikely(tx_total_len >= VHOST_NET_WEIGHT)) {
> + vhost_poll_queue(&vq->poll);
> + break;
> + }
> + }
> +}
> +
> static void handle_tx_kick(struct work_struct *work)
> {
> struct vhost_virtqueue *vq;
> @@ -462,7 +578,19 @@ static struct socket *get_tun_socket(int fd)
> return sock;
> }
>
> -static struct socket *get_socket(int fd)
> +static struct socket *get_mp_socket(int fd)
> +{
> + struct file *file = fget(fd);
> + struct socket *sock;
> + if (!file)
> + return ERR_PTR(-EBADF);
> + sock = mp_get_socket(file);
> + if (IS_ERR(sock))
> + fput(file);
> + return sock;
> +}
> +
> +static struct socket *get_socket(struct vhost_virtqueue *vq, int fd)
> {
> struct socket *sock;
> if (fd == -1)
> @@ -473,9 +601,26 @@ static struct socket *get_socket(int fd)
> sock = get_tun_socket(fd);
> if (!IS_ERR(sock))
> return sock;
> + sock = get_mp_socket(fd);
> + if (!IS_ERR(sock)) {
> + vq->link_state = VHOST_VQ_LINK_ASYNC;
> + return sock;
> + }
> return ERR_PTR(-ENOTSOCK);
> }
>
> +static void vhost_init_link_state(struct vhost_net *n, int index)
> +{
> + struct vhost_virtqueue *vq = n->vqs + index;
> +
> + WARN_ON(!mutex_is_locked(&vq->mutex));
> + if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
> + vq->receiver = NULL;
> + INIT_LIST_HEAD(&vq->notifier);
> + spin_lock_init(&vq->notify_lock);
> + }
> +}
> +
> static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
> {
> struct socket *sock, *oldsock;
> @@ -493,12 +638,15 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
> }
> vq = n->vqs + index;
> mutex_lock(&vq->mutex);
> - sock = get_socket(fd);
> + vq->link_state = VHOST_VQ_LINK_SYNC;
> + sock = get_socket(vq, fd);
> if (IS_ERR(sock)) {
> r = PTR_ERR(sock);
> goto err;
> }
>
> + vhost_init_link_state(n, index);
> +
> /* start polling new socket */
> oldsock = vq->private_data;
> if (sock == oldsock)
> @@ -507,8 +655,8 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
> vhost_net_disable_vq(n, vq);
> rcu_assign_pointer(vq->private_data, sock);
> vhost_net_enable_vq(n, vq);
> - mutex_unlock(&vq->mutex);
> done:
> + mutex_unlock(&vq->mutex);
> mutex_unlock(&n->dev.mutex);
> if (oldsock) {
> vhost_net_flush_vq(n, index);
> @@ -516,6 +664,7 @@ done:
> }
> return r;
> err:
> + mutex_unlock(&vq->mutex);
> mutex_unlock(&n->dev.mutex);
> return r;
> }
> diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
> index d1f0453..297af1c 100644
> --- a/drivers/vhost/vhost.h
> +++ b/drivers/vhost/vhost.h
> @@ -43,6 +43,11 @@ struct vhost_log {
> u64 len;
> };
>
> +enum vhost_vq_link_state {
> + VHOST_VQ_LINK_SYNC = 0,
> + VHOST_VQ_LINK_ASYNC = 1,
> +};
> +
> /* The virtqueue structure describes a queue attached to a device. */
> struct vhost_virtqueue {
> struct vhost_dev *dev;
> @@ -96,6 +101,13 @@ struct vhost_virtqueue {
> /* Log write descriptors */
> void __user *log_base;
> struct vhost_log log[VHOST_NET_MAX_SG];
> + /*Differiate async socket for 0-copy from normal*/
> + enum vhost_vq_link_state link_state;
> + int head;
> + int _log;
> + struct list_head notifier;
> + spinlock_t notify_lock;
> + void (*receiver)(struct vhost_virtqueue *);
> };
>
> struct vhost_dev {
> --
> 1.5.4.4
From: Xin, Xiaohui on
>> Michael,
>> I don't use the kiocb comes from the sendmsg/recvmsg,
>> since I have embeded the kiocb in page_info structure,
>> and allocate it when page_info allocated.

>So what I suggested was that vhost allocates and tracks the iocbs, and
>passes them to your device with sendmsg/ recvmsg calls. This way your
>device won't need to share structures and locking strategy with vhost:
>you get an iocb, handle it, invoke a callback to notify vhost about
>completion.

>This also gets rid of the 'receiver' callback

I'm not sure the receiver callback can be removed here.
The patch implements a work flow like this:
netif_receive_skb() gets the packet; it does nothing but queue the skb
and wake up handle_rx() in vhost. handle_rx() then calls the receiver
callback to process the skb and put the necessary notify info into a list.
vhost owns the list and uses it, in the same handle_rx() context, to
complete the request.

We use the "receiver" callback because only handle_rx() is woken up from
netif_receive_skb(), and we need the mp device context to deal with the skb
and the notify info attached to it. We also take a lock in the callback
function.

If I remove the receiver callback, I can only deal with the skb and notify
info in netif_receive_skb(), but that function runs in interrupt context,
where I think taking the lock is not allowed. And I cannot remove the lock.
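
The work flow described above can be modelled in userspace roughly as
follows. This is only a single-threaded sketch under simplifying
assumptions: struct packet, struct ring, struct vq_model and the model_*
functions are hypothetical stand-ins for the skb, the kernel lists, the
virtqueue and the real handlers, and the locking itself is left out.

```c
#include <assert.h>
#include <stddef.h>

#define QLEN 8

/* Hypothetical packet and queue types standing in for skbs and lists. */
struct packet { int len; int processed; };

struct ring {
	struct packet *slot[QLEN];
	int head, tail;
};

static int ring_push(struct ring *r, struct packet *p)
{
	if (r->tail - r->head == QLEN)
		return -1; /* full */
	r->slot[r->tail++ % QLEN] = p;
	return 0;
}

static struct packet *ring_pop(struct ring *r)
{
	if (r->head == r->tail)
		return NULL; /* empty */
	return r->slot[r->head++ % QLEN];
}

struct vq_model {
	struct ring raw;    /* filled from the "interrupt" side */
	struct ring notify; /* filled by the receiver callback */
	int wakeups;
	void (*receiver)(struct vq_model *);
};

/* Interrupt-side stand-in: only queue the packet and request a wakeup. */
static void model_netif_receive(struct vq_model *vq, struct packet *p)
{
	if (ring_push(&vq->raw, p) == 0)
		vq->wakeups++;
}

/* Device-provided callback, run from the worker side. */
static void model_receiver(struct vq_model *vq)
{
	struct packet *p;

	while ((p = ring_pop(&vq->raw)) != NULL) {
		p->processed = 1; /* device-specific work would go here */
		ring_push(&vq->notify, p);
	}
}

/* Worker-side stand-in for handle_rx(): returns the bytes completed. */
static int model_handle_rx(struct vq_model *vq)
{
	struct packet *p;
	int total = 0;

	if (vq->receiver)
		vq->receiver(vq);
	while ((p = ring_pop(&vq->notify)) != NULL)
		total += p->len;
	return total;
}

/* Demo: two packets arrive, then one worker pass completes both. */
int demo_deferred_rx(void)
{
	struct vq_model vq = { .receiver = model_receiver };
	struct packet a = { .len = 60 }, b = { .len = 1500 };

	model_netif_receive(&vq, &a);
	model_netif_receive(&vq, &b);
	return model_handle_rx(&vq);
}
```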


>> Please have a review and thanks for the instruction
>> for replying email which helps me a lot.
>>
>> Thanks,
>> Xiaohui
>>
>> drivers/vhost/net.c | 159 +++++++++++++++++++++++++++++++++++++++++++++++--
>> drivers/vhost/vhost.h | 12 ++++
>> 2 files changed, 166 insertions(+), 5 deletions(-)
>>
>> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
>> index 22d5fef..5483848 100644
>> --- a/drivers/vhost/net.c
>> +++ b/drivers/vhost/net.c
>> @@ -17,11 +17,13 @@
>> #include <linux/workqueue.h>
>> #include <linux/rcupdate.h>
>> #include <linux/file.h>
>> +#include <linux/aio.h>
>>
>> #include <linux/net.h>
>> #include <linux/if_packet.h>
>> #include <linux/if_arp.h>
>> #include <linux/if_tun.h>
>> +#include <linux/mpassthru.h>
>>
>> #include <net/sock.h>
>>
>> @@ -91,6 +93,12 @@ static void tx_poll_start(struct vhost_net *net, struct socket *sock)
>> net->tx_poll_state = VHOST_NET_POLL_STARTED;
>> }
>>
>> +static void handle_async_rx_events_notify(struct vhost_net *net,
>> + struct vhost_virtqueue *vq);
>> +
>> +static void handle_async_tx_events_notify(struct vhost_net *net,
>> + struct vhost_virtqueue *vq);
>> +

>A couple of style comments:
>
>- It's better to arrange functions in such order that forward declarations
>aren't necessary. Since we don't have recursion, this should always be
>possible.

>- continuation lines should be idented at least at the position of '('
>on the previous line.

Thanks. I'll correct that.

>> /* Expects to be always run from workqueue - which acts as
>> * read-size critical section for our kind of RCU. */
>> static void handle_tx(struct vhost_net *net)
>> @@ -124,6 +132,8 @@ static void handle_tx(struct vhost_net *net)
>> tx_poll_stop(net);
>> hdr_size = vq->hdr_size;
>>
>> + handle_async_tx_events_notify(net, vq);
>> +
>> for (;;) {
>> head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
>> ARRAY_SIZE(vq->iov),
>> @@ -151,6 +161,12 @@ static void handle_tx(struct vhost_net *net)
>> /* Skip header. TODO: support TSO. */
>> s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, out);
>> msg.msg_iovlen = out;
>> +
>> + if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
>> + vq->head = head;
>> + msg.msg_control = (void *)vq;

>So here a device gets a pointer to vhost_virtqueue structure. If it gets
>an iocb and invokes a callback, it would not care about vhost internals.

>> + }
>> +
>> len = iov_length(vq->iov, out);
>> /* Sanity check */
>> if (!len) {
>> @@ -166,6 +182,10 @@ static void handle_tx(struct vhost_net *net)
>> tx_poll_start(net, sock);
>> break;
>> }
>> +
>> + if (vq->link_state == VHOST_VQ_LINK_ASYNC)
>> + continue;
>> +
>> if (err != len)
>> pr_err("Truncated TX packet: "
>> " len %d != %zd\n", err, len);
>> @@ -177,6 +197,8 @@ static void handle_tx(struct vhost_net *net)
>> }
>> }
>>
>> + handle_async_tx_events_notify(net, vq);
>> +
>> mutex_unlock(&vq->mutex);
>> unuse_mm(net->dev.mm);
>> }
>> @@ -206,7 +228,8 @@ static void handle_rx(struct vhost_net *net)
>> int err;
>> size_t hdr_size;
>> struct socket *sock = rcu_dereference(vq->private_data);
>> - if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
>> + if (!sock || (skb_queue_empty(&sock->sk->sk_receive_queue) &&
>> + vq->link_state == VHOST_VQ_LINK_SYNC))
>> return;
>>
>> use_mm(net->dev.mm);
>> @@ -214,9 +237,18 @@ static void handle_rx(struct vhost_net *net)
>> vhost_disable_notify(vq);
>> hdr_size = vq->hdr_size;
>>
>> - vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
>> + /* In async cases, for write logging, the simple way is to get
>> + * the log info always, and really logging is decided later.
>> + * Thus, when logging enabled, we can get log, and when logging
>> + * disabled, we can get log disabled accordingly.
>> + */
>> +


>This adds overhead and might be one of the reasons
>your patch does not perform that well. A better way
>would be to flush outstanding requests or reread the vq
>when logging is enabled.

Since the guest may submit a lot of buffers and the h/w may already have
used them to allocate host skbs, it is difficult to know how many requests
are outstanding and which ones they are; they are not only in the notifier
list or sk->receive_queue.

But what does "reread" mean?
