From: Mathieu Desnoyers
Ring buffer backend: page allocation, data read/write API, and CPU hotplug
management.
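
As an illustration only (not part of the patch itself), here is a minimal
sketch of how the frontend read path might use the backend helpers, assuming
"config", "bufb" and the consumed position "consumed" have already been
obtained through the frontend get_subbuf()/put_subbuf() API posted separately:

        char dest[256];
        size_t data_size, copied;

        /* Exclusive reader access to the sub-buffer must be held here. */
        data_size = subbuffer_get_read_data_size(config, bufb);
        /* 'copied' is the number of bytes actually read out of the buffer. */
        copied = ring_buffer_read(bufb, consumed, dest,
                                  min_t(size_t, data_size, sizeof(dest)));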

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers(a)efficios.com>
---
include/linux/ringbuffer/backend.h | 141 +++++
include/linux/ringbuffer/backend_internal.h | 418 +++++++++++++++
include/linux/ringbuffer/backend_types.h | 80 ++
lib/ringbuffer/Makefile | 1
lib/ringbuffer/ring_buffer_backend.c | 755 ++++++++++++++++++++++++++++
5 files changed, 1395 insertions(+)

Index: linux.trees.git/lib/ringbuffer/Makefile
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/lib/ringbuffer/Makefile 2010-07-09 18:09:01.000000000 -0400
@@ -0,0 +1 @@
+obj-y += ring_buffer_backend.o
Index: linux.trees.git/include/linux/ringbuffer/backend.h
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/backend.h 2010-07-09 18:11:39.000000000 -0400
@@ -0,0 +1,141 @@
+#ifndef _LINUX_RING_BUFFER_BACKEND_H
+#define _LINUX_RING_BUFFER_BACKEND_H
+
+/*
+ * linux/ringbuffer/backend.h
+ *
+ * Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers(a)efficios.com>
+ *
+ * Ring buffer backend (API).
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ *
+ * Credits to Steven Rostedt for proposing to use an extra-subbuffer owned by
+ * the reader in flight recorder mode.
+ */
+
+#include <linux/types.h>
+#include <linux/sched.h>
+#include <linux/timer.h>
+#include <linux/wait.h>
+#include <linux/poll.h>
+#include <linux/list.h>
+#include <linux/fs.h>
+#include <linux/mm.h>
+
+/* Internal helpers */
+#include <linux/ringbuffer/backend_internal.h>
+#include <linux/ringbuffer/frontend_internal.h>
+
+/* Ring buffer backend API */
+
+/* Ring buffer backend access (read/write) */
+
+extern size_t ring_buffer_read(struct ring_buffer_backend *bufb,
+ size_t offset, void *dest, size_t len);
+
+extern int __ring_buffer_copy_to_user(struct ring_buffer_backend *bufb,
+ size_t offset, void __user *dest,
+ size_t len);
+
+extern int ring_buffer_read_cstr(struct ring_buffer_backend *bufb,
+ size_t offset, void *dest, size_t len);
+
+extern struct page **
+ring_buffer_read_get_page(struct ring_buffer_backend *bufb, size_t offset,
+ void ***virt);
+
+/*
+ * Return the address where a given offset is located.
+ * Should be used to get the current subbuffer header pointer. Given we know
+ * it's never on a page boundary, it's safe to write directly to this address,
+ * as long as the write is never bigger than a page size.
+ */
+extern void *
+ring_buffer_offset_address(struct ring_buffer_backend *bufb,
+ size_t offset);
+extern void *
+ring_buffer_read_offset_address(struct ring_buffer_backend *bufb,
+ size_t offset);
+
+/**
+ * ring_buffer_write - write data to a buffer backend
+ * @config: ring buffer instance configuration
+ * @ctx: ring buffer context (input arguments only)
+ * @src: source pointer to copy from
+ * @len: length of data to copy
+ *
+ * This function copies "len" bytes of data from a source pointer to a buffer
+ * backend, at the current context offset. This is more or less a buffer
+ * backend-specific memcpy() operation. Calls the slow path
+ * (_ring_buffer_write) if the copy crosses a page boundary.
+ */
+static inline
+void ring_buffer_write(const struct ring_buffer_config *config,
+ struct ring_buffer_ctx *ctx,
+ const void *src, size_t len)
+{
+ struct ring_buffer_backend *bufb = &ctx->buf->backend;
+ struct channel_backend *chanb = &ctx->chan->backend;
+ size_t sbidx, index;
+ size_t offset = ctx->buf_offset;
+ ssize_t pagecpy;
+ struct ring_buffer_backend_pages *rpages;
+ unsigned long sb_bindex, id;
+
+ offset &= chanb->buf_size - 1;
+ sbidx = offset >> chanb->subbuf_size_order;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK);
+ id = bufb->buf_wsb[sbidx].id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ rpages = bufb->array[sb_bindex];
+ CHAN_WARN_ON(ctx->chan,
+ config->mode == RING_BUFFER_OVERWRITE
+ && subbuffer_id_is_noref(config, id));
+ if (likely(pagecpy == len))
+ ring_buffer_do_copy(config,
+ rpages->p[index].virt
+ + (offset & ~PAGE_MASK),
+ src, len);
+ else
+ _ring_buffer_write(bufb, offset, src, len, 0);
+ ctx->buf_offset += len;
+}
+
+/*
+ * This accessor counts the number of unread records in a buffer.
+ * It only provides a consistent value if neither reads nor writes are
+ * performed concurrently.
+ */
+static inline
+unsigned long ring_buffer_get_records_unread(
+ const struct ring_buffer_config *config,
+ struct ring_buffer *buf)
+{
+ struct ring_buffer_backend *bufb = &buf->backend;
+ struct ring_buffer_backend_pages *pages;
+ unsigned long records_unread = 0, sb_bindex, id;
+ unsigned int i;
+
+ for (i = 0; i < bufb->chan->backend.num_subbuf; i++) {
+ id = bufb->buf_wsb[i].id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ pages = bufb->array[sb_bindex];
+ records_unread += v_read(config, &pages->records_unread);
+ }
+ if (config->mode == RING_BUFFER_OVERWRITE) {
+ id = bufb->buf_rsb.id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ pages = bufb->array[sb_bindex];
+ records_unread += v_read(config, &pages->records_unread);
+ }
+ return records_unread;
+}
+
+ssize_t ring_buffer_file_splice_read(struct file *in, loff_t *ppos,
+ struct pipe_inode_info *pipe, size_t len,
+ unsigned int flags);
+loff_t ring_buffer_no_llseek(struct file *file, loff_t offset, int origin);
+
+#endif /* _LINUX_RING_BUFFER_BACKEND_H */
Index: linux.trees.git/include/linux/ringbuffer/backend_internal.h
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/backend_internal.h 2010-07-09 18:11:58.000000000 -0400
@@ -0,0 +1,418 @@
+#ifndef _LINUX_RING_BUFFER_BACKEND_INTERNAL_H
+#define _LINUX_RING_BUFFER_BACKEND_INTERNAL_H
+
+/*
+ * linux/ringbuffer/backend_internal.h
+ *
+ * Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers(a)efficios.com>
+ *
+ * Ring buffer backend (internal helpers).
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/config.h>
+#include <linux/ringbuffer/backend_types.h>
+#include <linux/ringbuffer/frontend_types.h>
+#include <linux/string.h>
+
+/* Ring buffer backend API presented to the frontend */
+
+/* Ring buffer and channel backend create/free */
+
+int ring_buffer_backend_create(struct ring_buffer_backend *bufb,
+ struct channel_backend *chan,
+ int cpu);
+void channel_backend_unregister_notifiers(struct channel_backend *chanb);
+void ring_buffer_backend_free(struct ring_buffer_backend *bufb);
+int channel_backend_init(struct channel_backend *chanb,
+ const char *name,
+ const struct ring_buffer_config *config,
+ void *priv, size_t subbuf_size,
+ size_t num_subbuf);
+void channel_backend_free(struct channel_backend *chanb);
+
+void ring_buffer_backend_reset(struct ring_buffer_backend *bufb);
+void channel_backend_reset(struct channel_backend *chanb);
+
+int ring_buffer_backend_init(void);
+void ring_buffer_backend_exit(void);
+
+extern void _ring_buffer_write(struct ring_buffer_backend *bufb,
+ size_t offset, const void *src, size_t len,
+ ssize_t pagecpy);
+
+/*
+ * Subbuffer ID bits for overwrite mode. Need to fit within a single word to be
+ * exchanged atomically.
+ *
+ * The top half of the word, except its lowest bit, holds the "offset", which
+ * is used to count the produced subbuffers. For overwrite mode, this provides
+ * the consumer with the capacity to read subbuffers in order, handling the
+ * situation where producers would write up to 2^15 buffers (or 2^31 for 64-bit
+ * systems) concurrently with a single execution of get_subbuf (between offset
+ * sampling and subbuffer ID exchange).
+ */
+
+#define HALF_ULONG_BITS (BITS_PER_LONG >> 1)
+
+#define SB_ID_OFFSET_SHIFT (HALF_ULONG_BITS + 1)
+#define SB_ID_OFFSET_COUNT (1UL << SB_ID_OFFSET_SHIFT)
+#define SB_ID_OFFSET_MASK (~(SB_ID_OFFSET_COUNT - 1))
+/*
+ * The lowest bit of the top half-word is the noref flag, used only for
+ * overwrite mode.
+ */
+#define SB_ID_NOREF_SHIFT (SB_ID_OFFSET_SHIFT - 1)
+#define SB_ID_NOREF_COUNT (1UL << SB_ID_NOREF_SHIFT)
+#define SB_ID_NOREF_MASK SB_ID_NOREF_COUNT
+/*
+ * In overwrite mode: lowest half of word is used for index.
+ * Limit of 2^16 subbuffers per buffer on 32-bit, 2^32 on 64-bit.
+ * In producer-consumer mode: whole word used for index.
+ */
+#define SB_ID_INDEX_SHIFT 0
+#define SB_ID_INDEX_COUNT (1UL << SB_ID_INDEX_SHIFT)
+#define SB_ID_INDEX_MASK (SB_ID_NOREF_COUNT - 1)
+
+/*
+ * Construct the subbuffer id from offset, index and noref. Use only the index
+ * for producer-consumer mode (offset and noref are only used in overwrite
+ * mode).
+ */
+static inline
+unsigned long subbuffer_id(const struct ring_buffer_config *config,
+ unsigned long offset, unsigned long noref,
+ unsigned long index)
+{
+ if (config->mode == RING_BUFFER_OVERWRITE)
+ return (offset << SB_ID_OFFSET_SHIFT)
+ | (noref << SB_ID_NOREF_SHIFT)
+ | index;
+ else
+ return index;
+}
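+
+/*
+ * Worked example (32-bit system, overwrite mode): HALF_ULONG_BITS = 16, so
+ * bits 17..31 hold the offset count, bit 16 holds the noref flag and bits
+ * 0..15 hold the subbuffer index. subbuffer_id(config, 2, 1, 3) therefore
+ * returns (2 << 17) | (1 << 16) | 3 = 0x00050003.
+ */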
+
+/*
+ * Compare offset with the offset contained within id. Return 1 if the offset
+ * bits are identical, else 0.
+ */
+static inline
+int subbuffer_id_compare_offset(const struct ring_buffer_config *config,
+ unsigned long id, unsigned long offset)
+{
+ return (id & SB_ID_OFFSET_MASK) == (offset << SB_ID_OFFSET_SHIFT);
+}
+
+static inline
+unsigned long subbuffer_id_get_index(const struct ring_buffer_config *config,
+ unsigned long id)
+{
+ if (config->mode == RING_BUFFER_OVERWRITE)
+ return id & SB_ID_INDEX_MASK;
+ else
+ return id;
+}
+
+static inline
+unsigned long subbuffer_id_is_noref(const struct ring_buffer_config *config,
+ unsigned long id)
+{
+ if (config->mode == RING_BUFFER_OVERWRITE)
+ return !!(id & SB_ID_NOREF_MASK);
+ else
+ return 1;
+}
+
+/*
+ * Only used by the reader on a subbuffer ID it has exclusive access to. No
+ * volatile access needed.
+ */
+static inline
+void subbuffer_id_set_noref(const struct ring_buffer_config *config,
+ unsigned long *id)
+{
+ if (config->mode == RING_BUFFER_OVERWRITE)
+ *id |= SB_ID_NOREF_MASK;
+}
+
+static inline
+void subbuffer_id_set_noref_offset(const struct ring_buffer_config *config,
+ unsigned long *id, unsigned long offset)
+{
+ unsigned long tmp;
+
+ if (config->mode == RING_BUFFER_OVERWRITE) {
+ tmp = *id;
+ tmp &= ~SB_ID_OFFSET_MASK;
+ tmp |= offset << SB_ID_OFFSET_SHIFT;
+ tmp |= SB_ID_NOREF_MASK;
+ /* Volatile store, read concurrently by readers. */
+ ACCESS_ONCE(*id) = tmp;
+ }
+}
+
+/* No volatile access, since already used locally */
+static inline
+void subbuffer_id_clear_noref(const struct ring_buffer_config *config,
+ unsigned long *id)
+{
+ if (config->mode == RING_BUFFER_OVERWRITE)
+ *id &= ~SB_ID_NOREF_MASK;
+}
+
+/*
+ * For overwrite mode, cap the number of subbuffers per buffer to:
+ * 2^16 on 32-bit architectures
+ * 2^32 on 64-bit architectures
+ * This is required to fit in the index part of the ID. Return 0 on success,
+ * -EPERM on failure.
+ */
+static inline
+int subbuffer_id_check_index(const struct ring_buffer_config *config,
+ unsigned long num_subbuf)
+{
+ if (config->mode == RING_BUFFER_OVERWRITE)
+ return (num_subbuf > (1UL << HALF_ULONG_BITS)) ? -EPERM : 0;
+ else
+ return 0;
+}
+
+static inline
+void subbuffer_count_record(const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb,
+ unsigned long idx)
+{
+ unsigned long sb_bindex;
+
+ sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+ v_inc(config, &bufb->array[sb_bindex]->records_commit);
+}
+
+/*
+ * Reader has exclusive subbuffer access for record consumption. No need to
+ * perform the decrement atomically.
+ */
+static inline
+void subbuffer_consume_record(const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb)
+{
+ unsigned long sb_bindex;
+
+ sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
+ CHAN_WARN_ON(bufb->chan,
+ !v_read(config, &bufb->array[sb_bindex]->records_unread));
+ /* Non-atomic decrement protected by exclusive subbuffer access */
+ _v_dec(config, &bufb->array[sb_bindex]->records_unread);
+ v_inc(config, &bufb->records_read);
+}
+
+static inline
+unsigned long subbuffer_get_records_count(
+ const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb,
+ unsigned long idx)
+{
+ unsigned long sb_bindex;
+
+ sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+ return v_read(config, &bufb->array[sb_bindex]->records_commit);
+}
+
+/*
+ * Must be executed at subbuffer delivery when the writer has _exclusive_
+ * subbuffer access. See ring_buffer_check_deliver() for details.
+ * subbuffer_get_records_count() must be called to get the records count
+ * before this function, because it resets the records_commit count.
+ */
+static inline
+unsigned long subbuffer_count_records_overrun(
+ const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb,
+ unsigned long idx)
+{
+ struct ring_buffer_backend_pages *pages;
+ unsigned long overruns, sb_bindex;
+
+ sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+ pages = bufb->array[sb_bindex];
+ overruns = v_read(config, &pages->records_unread);
+ v_set(config, &pages->records_unread,
+ v_read(config, &pages->records_commit));
+ v_set(config, &pages->records_commit, 0);
+
+ return overruns;
+}
+
+static inline
+void subbuffer_set_data_size(const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb,
+ unsigned long idx,
+ unsigned long data_size)
+{
+ struct ring_buffer_backend_pages *pages;
+ unsigned long sb_bindex;
+
+ sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+ pages = bufb->array[sb_bindex];
+ pages->data_size = data_size;
+}
+
+static inline
+unsigned long subbuffer_get_read_data_size(
+ const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb)
+{
+ struct ring_buffer_backend_pages *pages;
+ unsigned long sb_bindex;
+
+ sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
+ pages = bufb->array[sb_bindex];
+ return pages->data_size;
+}
+
+static inline
+unsigned long subbuffer_get_data_size(
+ const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb,
+ unsigned long idx)
+{
+ struct ring_buffer_backend_pages *pages;
+ unsigned long sb_bindex;
+
+ sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+ pages = bufb->array[sb_bindex];
+ return pages->data_size;
+}
+
+/**
+ * ring_buffer_clear_noref - Clear the noref subbuffer flag, called by writer.
+ */
+static inline
+void ring_buffer_clear_noref(const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb,
+ unsigned long idx)
+{
+ unsigned long id, new_id;
+
+ if (config->mode != RING_BUFFER_OVERWRITE)
+ return;
+
+ /*
+ * Perform a volatile access to read the subbuffer id, because we want to
+ * read a coherent version of the index and the associated noref flag.
+ */
+ id = ACCESS_ONCE(bufb->buf_wsb[idx].id);
+ for (;;) {
+ /* This check is called on the fast path for each record. */
+ if (likely(!subbuffer_id_is_noref(config, id))) {
+ /*
+ * The store-after-load dependency, which orders the writes
+ * to the subbuffer after the load and test of the noref
+ * flag, matches the memory barrier implied by the cmpxchg()
+ * in update_read_sb_index().
+ */
+ return; /* Already writing to this buffer */
+ }
+ new_id = id;
+ subbuffer_id_clear_noref(config, &new_id);
+ new_id = cmpxchg(&bufb->buf_wsb[idx].id, id, new_id);
+ if (likely(new_id == id))
+ break;
+ id = new_id;
+ }
+}
+
+/**
+ * ring_buffer_set_noref_offset - Set the noref subbuffer flag and offset,
+ * called by writer.
+ */
+static inline
+void ring_buffer_set_noref_offset(const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb,
+ unsigned long idx,
+ unsigned long offset)
+{
+ if (config->mode != RING_BUFFER_OVERWRITE)
+ return;
+
+ /*
+ * Because ring_buffer_set_noref_offset() is only called by a single thread
+ * (the one which updated the cc_sb value), there are no concurrent
+ * updates to take care of: other writers have not updated cc_sb, so
+ * they cannot set the noref flag, and concurrent readers cannot modify
+ * the pointer because the noref flag is not set yet.
+ * The smp_wmb() in ring_buffer_commit() takes care of ordering writes
+ * to the subbuffer before this set noref operation.
+ * subbuffer_id_set_noref_offset() uses a volatile store to deal with
+ * concurrent readers of the noref flag.
+ */
+ CHAN_WARN_ON(bufb->chan,
+ subbuffer_id_is_noref(config, bufb->buf_wsb[idx].id));
+ /*
+ * Memory barrier that ensures counter stores are ordered before set
+ * noref and offset.
+ */
+ smp_mb();
+ subbuffer_id_set_noref_offset(config, &bufb->buf_wsb[idx].id, offset);
+}
+
+/**
+ * update_read_sb_index - Read-side subbuffer index update.
+ */
+static inline
+int update_read_sb_index(const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb,
+ struct channel_backend *chanb,
+ unsigned long consumed_idx,
+ unsigned long consumed_count)
+{
+ unsigned long old_id, new_id;
+
+ if (config->mode == RING_BUFFER_OVERWRITE) {
+ /*
+ * Exchange the target writer subbuffer with our own unused
+ * subbuffer. No need to use ACCESS_ONCE() here to read the
+ * old_id, because the value read will be confirmed by the
+ * following cmpxchg().
+ */
+ old_id = bufb->buf_wsb[consumed_idx].id;
+ if (unlikely(!subbuffer_id_is_noref(config, old_id)))
+ return -EAGAIN;
+ /*
+ * Make sure the offset count we are expecting matches the one
+ * indicated by the writer.
+ */
+ if (unlikely(!subbuffer_id_compare_offset(config, old_id,
+ consumed_count)))
+ return -EAGAIN;
+ CHAN_WARN_ON(bufb->chan,
+ !subbuffer_id_is_noref(config, bufb->buf_rsb.id));
+ new_id = cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id,
+ bufb->buf_rsb.id);
+ if (unlikely(old_id != new_id))
+ return -EAGAIN;
+ bufb->buf_rsb.id = new_id;
+ subbuffer_id_clear_noref(config, &bufb->buf_rsb.id);
+ } else {
+ /* No page exchange, use the writer page directly */
+ bufb->buf_rsb.id = bufb->buf_wsb[consumed_idx].id;
+ subbuffer_id_clear_noref(config, &bufb->buf_rsb.id);
+ }
+ return 0;
+}
+
+/*
+ * Use the architecture-specific memcpy implementation for constant-sized
+ * inputs, but rely on an inline memcpy when the length is not statically
+ * known. A function call to memcpy is just way too expensive for a fast
+ * path.
+ */
+#define ring_buffer_do_copy(config, dest, src, len) \
+do { \
+ size_t __len = (len); \
+ if (__builtin_constant_p(len)) \
+ memcpy((dest), (src), __len); \
+ else \
+ inline_memcpy((dest), (src), __len); \
+} while (0)
+
+#endif /* _LINUX_RING_BUFFER_BACKEND_INTERNAL_H */
Index: linux.trees.git/lib/ringbuffer/ring_buffer_backend.c
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/lib/ringbuffer/ring_buffer_backend.c 2010-07-09 18:13:38.000000000 -0400
@@ -0,0 +1,755 @@
+/*
+ * ring_buffer_backend.c
+ *
+ * Copyright (C) 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers(a)efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/vmalloc.h>
+#include <linux/stddef.h>
+#include <linux/module.h>
+#include <linux/string.h>
+#include <linux/bitops.h>
+#include <linux/delay.h>
+#include <linux/errno.h>
+#include <linux/slab.h>
+#include <linux/cpu.h>
+#include <linux/mm.h>
+
+#include <linux/ringbuffer/config.h>
+#include <linux/ringbuffer/backend.h>
+#include <linux/ringbuffer/frontend.h>
+
+/**
+ * ring_buffer_backend_allocate - allocate a channel buffer
+ * @config: ring buffer instance configuration
+ * @bufb: the buffer backend struct
+ * @size: total size of the buffer
+ * @num_subbuf: number of subbuffers
+ * @extra_reader_sb: need extra subbuffer for reader
+ */
+static
+int ring_buffer_backend_allocate(const struct ring_buffer_config *config,
+ struct ring_buffer_backend *bufb,
+ size_t size, size_t num_subbuf,
+ int extra_reader_sb)
+{
+ struct channel_backend *chanb = &bufb->chan->backend;
+ unsigned long j, num_pages, num_pages_per_subbuf, page_idx = 0;
+ unsigned long subbuf_size, mmap_offset = 0;
+ unsigned long num_subbuf_alloc;
+ struct page **pages;
+ void **virt;
+ unsigned long i;
+
+ num_pages = size >> PAGE_SHIFT;
+ num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf);
+ subbuf_size = chanb->subbuf_size;
+ num_subbuf_alloc = num_subbuf;
+
+ if (extra_reader_sb) {
+ num_pages += num_pages_per_subbuf; /* Add pages for reader */
+ num_subbuf_alloc++;
+ }
+
+ pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages,
+ 1 << INTERNODE_CACHE_SHIFT),
+ GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+ if (unlikely(!pages))
+ goto pages_error;
+
+ virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages,
+ 1 << INTERNODE_CACHE_SHIFT),
+ GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+ if (unlikely(!virt))
+ goto virt_error;
+
+ bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array)
+ * num_subbuf_alloc,
+ 1 << INTERNODE_CACHE_SHIFT),
+ GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+ if (unlikely(!bufb->array))
+ goto array_error;
+
+ for (i = 0; i < num_pages; i++) {
+ pages[i] = alloc_pages_node(cpu_to_node(max(bufb->cpu, 0)),
+ GFP_KERNEL | __GFP_ZERO, 0);
+ if (unlikely(!pages[i]))
+ goto depopulate;
+ virt[i] = page_address(pages[i]);
+ }
+ bufb->num_pages_per_subbuf = num_pages_per_subbuf;
+
+ /* Allocate backend pages array elements */
+ for (i = 0; i < num_subbuf_alloc; i++) {
+ bufb->array[i] =
+ kzalloc_node(ALIGN(
+ sizeof(struct ring_buffer_backend_pages) +
+ sizeof(struct ring_buffer_backend_page)
+ * num_pages_per_subbuf,
+ 1 << INTERNODE_CACHE_SHIFT),
+ GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+ if (!bufb->array[i])
+ goto free_array;
+ }
+
+ /* Allocate write-side subbuffer table */
+ bufb->buf_wsb = kzalloc_node(ALIGN(
+ sizeof(struct ring_buffer_backend_subbuffer)
+ * num_subbuf,
+ 1 << INTERNODE_CACHE_SHIFT),
+ GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+ if (unlikely(!bufb->buf_wsb))
+ goto free_array;
+
+ for (i = 0; i < num_subbuf; i++)
+ bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
+
+ /* Assign read-side subbuffer table */
+ if (extra_reader_sb)
+ bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
+ num_subbuf_alloc - 1);
+ else
+ bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);
+
+ /* Assign pages to page index */
+ for (i = 0; i < num_subbuf_alloc; i++) {
+ for (j = 0; j < num_pages_per_subbuf; j++) {
+ CHAN_WARN_ON(chanb, page_idx > num_pages);
+ bufb->array[i]->p[j].virt = virt[page_idx];
+ bufb->array[i]->p[j].page = pages[page_idx];
+ page_idx++;
+ }
+ if (config->output == RING_BUFFER_MMAP) {
+ bufb->array[i]->mmap_offset = mmap_offset;
+ mmap_offset += subbuf_size;
+ }
+ }
+
+ /*
+ * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
+ * will not fault.
+ */
+ vmalloc_sync_all();
+ kfree(virt);
+ kfree(pages);
+ return 0;
+
+free_array:
+ for (i = 0; (i < num_subbuf_alloc && bufb->array[i]); i++)
+ kfree(bufb->array[i]);
+depopulate:
+ /* Free all allocated pages */
+ for (i = 0; (i < num_pages && pages[i]); i++)
+ __free_page(pages[i]);
+ kfree(bufb->array);
+array_error:
+ kfree(virt);
+virt_error:
+ kfree(pages);
+pages_error:
+ return -ENOMEM;
+}
+
+int ring_buffer_backend_create(struct ring_buffer_backend *bufb,
+ struct channel_backend *chanb,
+ int cpu)
+{
+ const struct ring_buffer_config *config = chanb->config;
+
+ bufb->chan = container_of(chanb, struct channel, backend);
+ bufb->cpu = cpu;
+
+ return ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
+ chanb->num_subbuf,
+ chanb->extra_reader_sb);
+}
+
+void ring_buffer_backend_free(struct ring_buffer_backend *bufb)
+{
+ struct channel_backend *chanb = &bufb->chan->backend;
+ unsigned long i, j, num_subbuf_alloc;
+
+ num_subbuf_alloc = chanb->num_subbuf;
+ if (chanb->extra_reader_sb)
+ num_subbuf_alloc++;
+
+ kfree(bufb->buf_wsb);
+ for (i = 0; i < num_subbuf_alloc; i++) {
+ for (j = 0; j < bufb->num_pages_per_subbuf; j++)
+ __free_page(bufb->array[i]->p[j].page);
+ kfree(bufb->array[i]);
+ }
+ kfree(bufb->array);
+ bufb->allocated = 0;
+}
+
+void ring_buffer_backend_reset(struct ring_buffer_backend *bufb)
+{
+ struct channel_backend *chanb = &bufb->chan->backend;
+ const struct ring_buffer_config *config = chanb->config;
+ unsigned long num_subbuf_alloc;
+ unsigned int i;
+
+ num_subbuf_alloc = chanb->num_subbuf;
+ if (chanb->extra_reader_sb)
+ num_subbuf_alloc++;
+
+ for (i = 0; i < chanb->num_subbuf; i++)
+ bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
+ if (chanb->extra_reader_sb)
+ bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
+ num_subbuf_alloc - 1);
+ else
+ bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);
+
+ for (i = 0; i < num_subbuf_alloc; i++) {
+ /* Don't reset mmap_offset */
+ v_set(config, &bufb->array[i]->records_commit, 0);
+ v_set(config, &bufb->array[i]->records_unread, 0);
+ bufb->array[i]->data_size = 0;
+ /* Don't reset backend page and virt addresses */
+ }
+ /* Don't reset num_pages_per_subbuf, cpu, allocated */
+ v_set(config, &bufb->records_read, 0);
+}
+
+/*
+ * The frontend is responsible for also calling ring_buffer_backend_reset for
+ * each buffer when calling channel_backend_reset.
+ */
+void channel_backend_reset(struct channel_backend *chanb)
+{
+ struct channel *chan = container_of(chanb, struct channel, backend);
+ const struct ring_buffer_config *config = chanb->config;
+
+ /*
+ * Don't reset buf_size, subbuf_size, subbuf_size_order,
+ * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
+ * priv, notifiers, config, cpumask and name.
+ */
+ chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
+}
+
+#ifdef CONFIG_HOTPLUG_CPU
+/**
+ * ring_buffer_cpu_hp_callback - CPU hotplug callback
+ * @nb: notifier block
+ * @action: hotplug action to take
+ * @hcpu: CPU number
+ *
+ * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
+ */
+static
+int __cpuinit ring_buffer_cpu_hp_callback(struct notifier_block *nb,
+ unsigned long action,
+ void *hcpu)
+{
+ unsigned int cpu = (unsigned long)hcpu;
+ struct channel_backend *chanb = container_of(nb, struct channel_backend,
+ cpu_hp_notifier);
+ const struct ring_buffer_config *config = chanb->config;
+ struct ring_buffer *buf;
+ int ret;
+
+ CHAN_WARN_ON(chanb, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+ switch (action) {
+ case CPU_UP_PREPARE:
+ case CPU_UP_PREPARE_FROZEN:
+ buf = per_cpu_ptr(chanb->buf, cpu);
+ ret = ring_buffer_create(buf, chanb, cpu);
+ if (ret) {
+ printk(KERN_ERR
+ "ring_buffer_cpu_hp_callback: cpu %d "
+ "buffer creation failed\n", cpu);
+ return NOTIFY_BAD;
+ }
+ break;
+ case CPU_DEAD:
+ case CPU_DEAD_FROZEN:
+ /*
+ * No need to do a buffer switch here, because it will happen
+ * when tracing is stopped, or will be done by switch timer CPU
+ * DEAD callback.
+ */
+ break;
+ }
+ return NOTIFY_OK;
+}
+#endif
+
+/**
+ * channel_backend_init - initialize a channel backend
+ * @chanb: channel backend
+ * @name: channel name
+ * @config: client ring buffer configuration
+ * @priv: client private data
+ * @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2)
+ * @num_subbuf: number of sub-buffers (power of 2)
+ *
+ * Returns 0 on success, a negative error value otherwise.
+ *
+ * Creates per-cpu channel buffers using the sizes and attributes
+ * specified.
+ *
+ * Called with CPU hotplug disabled.
+ */
+int channel_backend_init(struct channel_backend *chanb,
+ const char *name,
+ const struct ring_buffer_config *config,
+ void *priv, size_t subbuf_size,
+ size_t num_subbuf)
+{
+ struct channel *chan = container_of(chanb, struct channel, backend);
+ unsigned int i;
+ int ret;
+
+ if (!name)
+ return -EPERM;
+
+ if (!(subbuf_size && num_subbuf))
+ return -EPERM;
+
+ /* Check that the subbuffer size is larger than a page. */
+ CHAN_WARN_ON(chanb, subbuf_size < PAGE_SIZE);
+
+ /*
+ * Make sure the number of subbuffers and the subbuffer size are powers of 2.
+ */
+ CHAN_WARN_ON(chanb, hweight32(subbuf_size) != 1);
+ CHAN_WARN_ON(chanb, hweight32(num_subbuf) != 1);
+
+ ret = subbuffer_id_check_index(config, num_subbuf);
+ if (ret)
+ return ret;
+
+ chanb->priv = priv;
+ chanb->buf_size = num_subbuf * subbuf_size;
+ chanb->subbuf_size = subbuf_size;
+ chanb->buf_size_order = get_count_order(chanb->buf_size);
+ chanb->subbuf_size_order = get_count_order(subbuf_size);
+ chanb->num_subbuf_order = get_count_order(num_subbuf);
+ chanb->extra_reader_sb =
+ (config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
+ chanb->num_subbuf = num_subbuf;
+ strlcpy(chanb->name, name, NAME_MAX);
+ chanb->config = config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ if (!zalloc_cpumask_var(&chanb->cpumask, GFP_KERNEL))
+ return -ENOMEM;
+ }
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ /* Allocating the buffer per-cpu structures */
+ chanb->buf = alloc_percpu(struct ring_buffer);
+ if (!chanb->buf)
+ goto free_cpumask;
+
+ /*
+ * When CPU hotplug is not available, if the ring buffer is
+ * allocated in an early initcall, it will not be notified of
+ * secondary cpus. In that case, we need to allocate for all
+ * possible cpus.
+ */
+#ifdef CONFIG_HOTPLUG_CPU
+ /*
+ * buf->backend.allocated test takes care of concurrent CPU
+ * hotplug.
+ * Priority higher than frontend, so we create the ring buffer
+ * before we start the timer.
+ */
+ chanb->cpu_hp_notifier.notifier_call =
+ ring_buffer_cpu_hp_callback;
+ chanb->cpu_hp_notifier.priority = 5;
+ register_hotcpu_notifier(&chanb->cpu_hp_notifier);
+
+ get_online_cpus();
+ for_each_online_cpu(i) {
+ ret = ring_buffer_create(per_cpu_ptr(chanb->buf, i),
+ chanb, i);
+ if (ret)
+ goto free_bufs; /* cpu hotplug locked */
+ }
+ put_online_cpus();
+#else
+ for_each_possible_cpu(i) {
+ ret = ring_buffer_create(per_cpu_ptr(chanb->buf, i),
+ chanb, i);
+ if (ret)
+ goto free_bufs; /* cpu hotplug locked */
+ }
+#endif
+ } else {
+ chanb->buf = kzalloc(sizeof(struct ring_buffer), GFP_KERNEL);
+ if (!chanb->buf)
+ goto free_cpumask;
+ ret = ring_buffer_create(chanb->buf, chanb, -1);
+ if (ret)
+ goto free_bufs;
+ }
+ chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
+
+ return 0;
+
+free_bufs:
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ for_each_possible_cpu(i) {
+ struct ring_buffer *buf = per_cpu_ptr(chanb->buf, i);
+
+ if (!buf->backend.allocated)
+ continue;
+ ring_buffer_free(buf);
+ }
+#ifdef CONFIG_HOTPLUG_CPU
+ put_online_cpus();
+#endif
+ free_percpu(chanb->buf);
+ } else
+ kfree(chanb->buf);
+free_cpumask:
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ free_cpumask_var(chanb->cpumask);
+ return -ENOMEM;
+}
+
+/**
+ * channel_backend_unregister_notifiers - unregister notifiers
+ * @chanb: the channel backend
+ *
+ * Holds CPU hotplug.
+ */
+void channel_backend_unregister_notifiers(struct channel_backend *chanb)
+{
+ const struct ring_buffer_config *config = chanb->config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ unregister_hotcpu_notifier(&chanb->cpu_hp_notifier);
+}
+
+/**
+ * channel_backend_free - destroy the channel
+ * @chanb: the channel backend
+ *
+ * Destroys all channel buffers and frees the channel backend.
+ */
+void channel_backend_free(struct channel_backend *chanb)
+{
+ const struct ring_buffer_config *config = chanb->config;
+ unsigned int i;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ for_each_possible_cpu(i) {
+ struct ring_buffer *buf = per_cpu_ptr(chanb->buf, i);
+
+ if (!buf->backend.allocated)
+ continue;
+ ring_buffer_free(buf);
+ }
+ free_cpumask_var(chanb->cpumask);
+ free_percpu(chanb->buf);
+ } else {
+ struct ring_buffer *buf = chanb->buf;
+
+ CHAN_WARN_ON(chanb, !buf->backend.allocated);
+ ring_buffer_free(buf);
+ kfree(buf);
+ }
+}
+
+/**
+ * _ring_buffer_write - write data to a buffer backend (slow path)
+ * @bufb: buffer backend
+ * @offset: offset within the buffer
+ * @src: source address
+ * @len: length to write
+ * @pagecpy: number of bytes copied so far
+ */
+void _ring_buffer_write(struct ring_buffer_backend *bufb, size_t offset,
+ const void *src, size_t len, ssize_t pagecpy)
+{
+ struct channel_backend *chanb = &bufb->chan->backend;
+ const struct ring_buffer_config *config = chanb->config;
+ size_t sbidx, index;
+ struct ring_buffer_backend_pages *rpages;
+ unsigned long sb_bindex, id;
+
+ do {
+ len -= pagecpy;
+ src += pagecpy;
+ offset += pagecpy;
+ sbidx = offset >> chanb->subbuf_size_order;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+
+ /*
+ * Underlying layer should never ask for writes across
+ * subbuffers.
+ */
+ CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
+
+ pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
+ id = bufb->buf_wsb[sbidx].id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ rpages = bufb->array[sb_bindex];
+ CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+ && subbuffer_id_is_noref(config, id));
+ ring_buffer_do_copy(config,
+ rpages->p[index].virt
+ + (offset & ~PAGE_MASK),
+ src, pagecpy);
+ } while (unlikely(len != pagecpy));
+}
+EXPORT_SYMBOL_GPL(_ring_buffer_write);
+
+/**
+ * ring_buffer_read - read data from a buffer backend
+ * @bufb: buffer backend
+ * @offset: offset within the buffer
+ * @dest: destination address
+ * @len: length to copy to destination
+ *
+ * Should be protected by get_subbuf/put_subbuf.
+ * Returns the length copied.
+ */
+size_t ring_buffer_read(struct ring_buffer_backend *bufb, size_t offset,
+ void *dest, size_t len)
+{
+ struct channel_backend *chanb = &bufb->chan->backend;
+ const struct ring_buffer_config *config = chanb->config;
+ size_t index;
+ ssize_t pagecpy, orig_len;
+ struct ring_buffer_backend_pages *rpages;
+ unsigned long sb_bindex, id;
+
+ orig_len = len;
+ offset &= chanb->buf_size - 1;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ if (unlikely(!len))
+ return 0;
+ for (;;) {
+ pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
+ id = bufb->buf_rsb.id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ rpages = bufb->array[sb_bindex];
+ CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+ && subbuffer_id_is_noref(config, id));
+ memcpy(dest, rpages->p[index].virt + (offset & ~PAGE_MASK),
+ pagecpy);
+ len -= pagecpy;
+ if (likely(!len))
+ break;
+ dest += pagecpy;
+ offset += pagecpy;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ /*
+ * Underlying layer should never ask for reads across
+ * subbuffers.
+ */
+ CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
+ }
+ return orig_len;
+}
+EXPORT_SYMBOL_GPL(ring_buffer_read);
+
+/**
+ * __ring_buffer_copy_to_user - read data from a buffer backend to userspace
+ * @bufb: buffer backend
+ * @offset: offset within the buffer
+ * @dest: destination userspace address
+ * @len: length to copy to destination
+ *
+ * Should be protected by get_subbuf/put_subbuf.
+ * access_ok() must have been performed on dest addresses prior to calling
+ * this function.
+ * Returns -EFAULT on error, 0 if ok.
+ */
+int __ring_buffer_copy_to_user(struct ring_buffer_backend *bufb,
+ size_t offset, void __user *dest, size_t len)
+{
+ struct channel_backend *chanb = &bufb->chan->backend;
+ const struct ring_buffer_config *config = chanb->config;
+ size_t index;
+ ssize_t pagecpy, orig_len;
+ struct ring_buffer_backend_pages *rpages;
+ unsigned long sb_bindex, id;
+
+ orig_len = len;
+ offset &= chanb->buf_size - 1;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ if (unlikely(!len))
+ return 0;
+ for (;;) {
+ pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
+ id = bufb->buf_rsb.id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ rpages = bufb->array[sb_bindex];
+ CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+ && subbuffer_id_is_noref(config, id));
+ if (__copy_to_user(dest,
+ rpages->p[index].virt + (offset & ~PAGE_MASK),
+ pagecpy))
+ return -EFAULT;
+ len -= pagecpy;
+ if (likely(!len))
+ break;
+ dest += pagecpy;
+ offset += pagecpy;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ /*
+ * Underlying layer should never ask for reads across
+ * subbuffers.
+ */
+ CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
+ }
+ return 0;
+}
+EXPORT_SYMBOL_GPL(__ring_buffer_copy_to_user);
+
+/**
+ * ring_buffer_read_cstr - read a C-style string from a buffer backend
+ * @bufb: buffer backend
+ * @offset: offset within the buffer
+ * @dest: destination address
+ * @len: destination's length
+ *
+ * Returns the string's length.
+ * Should be protected by get_subbuf/put_subbuf.
+ */
+int ring_buffer_read_cstr(struct ring_buffer_backend *bufb, size_t offset,
+ void *dest, size_t len)
+{
+ struct channel_backend *chanb = &bufb->chan->backend;
+ const struct ring_buffer_config *config = chanb->config;
+ size_t index;
+ ssize_t pagecpy, pagelen, strpagelen, orig_offset;
+ char *str;
+ struct ring_buffer_backend_pages *rpages;
+ unsigned long sb_bindex, id;
+
+ offset &= chanb->buf_size - 1;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ orig_offset = offset;
+ for (;;) {
+ id = bufb->buf_rsb.id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ rpages = bufb->array[sb_bindex];
+ CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+ && subbuffer_id_is_noref(config, id));
+ str = (char *)rpages->p[index].virt + (offset & ~PAGE_MASK);
+ pagelen = PAGE_SIZE - (offset & ~PAGE_MASK);
+ strpagelen = strnlen(str, pagelen);
+ if (len) {
+ pagecpy = min_t(size_t, len, strpagelen);
+ if (dest) {
+ memcpy(dest, str, pagecpy);
+ dest += pagecpy;
+ }
+ len -= pagecpy;
+ }
+ offset += strpagelen;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ if (strpagelen < pagelen)
+ break;
+ /*
+ * Underlying layer should never ask for reads across
+ * subbuffers.
+ */
+ CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
+ }
+ if (dest && len)
+ ((char *)dest)[0] = 0;
+ return offset - orig_offset;
+}
+EXPORT_SYMBOL_GPL(ring_buffer_read_cstr);
+
+/**
+ * ring_buffer_read_get_page - Get a whole page to read from
+ * @bufb: buffer backend
+ * @offset: offset within the buffer
+ * @virt: pointer to page address (output)
+ *
+ * Should be protected by get_subbuf/put_subbuf.
+ * Returns the pointer to the page struct pointer.
+ */
+struct page **ring_buffer_read_get_page(struct ring_buffer_backend *bufb,
+ size_t offset, void ***virt)
+{
+ size_t index;
+ struct ring_buffer_backend_pages *rpages;
+ struct channel_backend *chanb = &bufb->chan->backend;
+ const struct ring_buffer_config *config = chanb->config;
+ unsigned long sb_bindex, id;
+
+ offset &= chanb->buf_size - 1;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ id = bufb->buf_rsb.id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ rpages = bufb->array[sb_bindex];
+ CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+ && subbuffer_id_is_noref(config, id));
+ *virt = &rpages->p[index].virt;
+ return &rpages->p[index].page;
+}
+EXPORT_SYMBOL_GPL(ring_buffer_read_get_page);
+
+/**
+ * ring_buffer_read_offset_address - get address of a location within the buffer
+ * @bufb: buffer backend
+ * @offset: offset within the buffer.
+ *
+ * Return the address where a given offset is located (for read).
+ * Should be used to get the current subbuffer header pointer. Given we know
+ * it's never on a page boundary, it's safe to write directly to this address,
+ * as long as the write is never bigger than a page size.
+ */
+void *ring_buffer_read_offset_address(struct ring_buffer_backend *bufb,
+ size_t offset)
+{
+ size_t index;
+ struct ring_buffer_backend_pages *rpages;
+ struct channel_backend *chanb = &bufb->chan->backend;
+ const struct ring_buffer_config *config = chanb->config;
+ unsigned long sb_bindex, id;
+
+ offset &= chanb->buf_size - 1;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ id = bufb->buf_rsb.id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ rpages = bufb->array[sb_bindex];
+ CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+ && subbuffer_id_is_noref(config, id));
+ return rpages->p[index].virt + (offset & ~PAGE_MASK);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_read_offset_address);
+
+/**
+ * ring_buffer_offset_address - get address of a location within the buffer
+ * @bufb: buffer backend
+ * @offset: offset within the buffer.
+ *
+ * Return the address where a given offset is located.
+ * Should be used to get the current subbuffer header pointer. Given we know
+ * it's always at the beginning of a page, it's safe to write directly to this
+ * address, as long as the write is never bigger than a page size.
+ */
+void *ring_buffer_offset_address(struct ring_buffer_backend *bufb,
+ size_t offset)
+{
+ size_t sbidx, index;
+ struct ring_buffer_backend_pages *rpages;
+ struct channel_backend *chanb = &bufb->chan->backend;
+ const struct ring_buffer_config *config = chanb->config;
+ unsigned long sb_bindex, id;
+
+ offset &= chanb->buf_size - 1;
+ sbidx = offset >> chanb->subbuf_size_order;
+ index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ id = bufb->buf_wsb[sbidx].id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ rpages = bufb->array[sb_bindex];
+ CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
+ && subbuffer_id_is_noref(config, id));
+ return rpages->p[index].virt + (offset & ~PAGE_MASK);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_offset_address);
Index: linux.trees.git/include/linux/ringbuffer/backend_types.h
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/backend_types.h 2010-07-09 18:09:01.000000000 -0400
@@ -0,0 +1,80 @@
+#ifndef _LINUX_RING_BUFFER_BACKEND_TYPES_H
+#define _LINUX_RING_BUFFER_BACKEND_TYPES_H
+
+/*
+ * linux/ringbuffer/backend_types.h
+ *
+ * Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers(a)efficios.com>
+ *
+ * Ring buffer backend (types).
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/cpumask.h>
+#include <linux/types.h>
+
+struct ring_buffer_backend_page {
+ void *virt; /* page virtual address (cached) */
+ struct page *page; /* pointer to page structure */
+};
+
+struct ring_buffer_backend_pages {
+ unsigned long mmap_offset; /* offset of the subbuffer in mmap */
+ union v_atomic records_commit; /* current records committed count */
+ union v_atomic records_unread; /* records to read */
+ unsigned long data_size; /* Amount of data to read from subbuf */
+ struct ring_buffer_backend_page p[];
+};
+
+struct ring_buffer_backend_subbuffer {
+ /* Identifier for subbuf backend pages. Exchanged atomically. */
+ unsigned long id; /* backend subbuffer identifier */
+};
+
+/*
+ * Forward declaration of frontend-specific channel and ring_buffer.
+ */
+struct channel;
+struct ring_buffer;
+
+struct ring_buffer_backend {
+ /* Array of ring_buffer_backend_subbuffer for writer */
+ struct ring_buffer_backend_subbuffer *buf_wsb;
+ /* ring_buffer_backend_subbuffer for reader */
+ struct ring_buffer_backend_subbuffer buf_rsb;
+ /*
+ * Pointer array of backend pages, for whole buffer.
+ * Indexed by ring_buffer_backend_subbuffer identifier (id) index.
+ */
+ struct ring_buffer_backend_pages **array;
+ unsigned int num_pages_per_subbuf;
+
+ struct channel *chan; /* Associated channel */
+ int cpu; /* This buffer's cpu. -1 if global. */
+ union v_atomic records_read; /* Number of records read */
+ unsigned int allocated:1; /* Bool: is buffer allocated ? */
+};
+
+struct channel_backend {
+ unsigned long buf_size; /* Size of the buffer */
+ unsigned long subbuf_size; /* Sub-buffer size */
+ unsigned int subbuf_size_order; /* Order of sub-buffer size */
+ unsigned int num_subbuf_order; /*
+ * Order of number of sub-buffers/buffer
+ * for writer.
+ */
+ unsigned int buf_size_order; /* Order of buffer size */
+ unsigned int extra_reader_sb:1; /* Bool: has extra reader subbuffer */
+ struct ring_buffer *buf; /* Channel per-cpu buffers */
+
+ unsigned long num_subbuf; /* Number of sub-buffers for writer */
+ u64 start_tsc; /* Channel creation TSC value */
+ void *priv; /* Client-specific information */
+ struct notifier_block cpu_hp_notifier; /* CPU hotplug notifier */
+ const struct ring_buffer_config *config; /* Ring buffer configuration */
+ cpumask_var_t cpumask; /* Allocated per-cpu buffers cpumask */
+ char name[NAME_MAX]; /* Channel name */
+};
+
+#endif /* _LINUX_RING_BUFFER_BACKEND_TYPES_H */
