From: Mathieu Desnoyers
Implements a per-CPU buffer iterator and a channel-wide iterator, along with a
read() file operation built on top of them. Ring buffer clients can use either
the iterators directly or the read() file operation, as sketched below.
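
As an illustration only (not part of this patch), a client could hook the
read() implementation up to a debugfs file; debugfs_create_file() stores its
data pointer in inode->i_private, which is where channel_file_open() and
channel_file_read() expect to find the channel. The file name and the
create_payload_file() helper below are made up for the example:

	#include <linux/debugfs.h>

	/* Expose the channel record payloads as a read-only debugfs file. */
	static struct dentry *create_payload_file(struct channel *chan,
						  struct dentry *parent)
	{
		return debugfs_create_file("payload", 0400, parent, chan,
					   &channel_payload_file_operations);
	}

ring_buffer_payload_file_operations can be wired up the same way, passing a
per-CPU struct ring_buffer pointer as the data argument instead.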

The channel-wide iterator performs a timestamp-ordered fusion merge of the
per-CPU buffers using a priority heap, giving O(log(NR_CPUS)) complexity for
the "get next event" operation.
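
A minimal consumer sketch (again illustration only, not part of this patch),
assuming a per-CPU channel created with RING_BUFFER_ITERATOR output, already
opened with channel_iterator_open(), and a "dest" buffer large enough to hold
any record payload:

	struct ring_buffer *buf;
	ssize_t len;

	for (;;) {
		len = channel_get_next_record(chan, &buf);
		if (len == -EAGAIN)
			continue;	/* all buffers temporarily empty */
		if (len == -ENODATA)
			break;		/* all buffers empty and finalized */
		/* "len" is the payload length of the current record. */
		read_current_record(buf, dest);
		/* ... process the payload in dest ... */
	}

On -EAGAIN, a real consumer would typically block on chan->read_wait, as the
read() implementation in this patch does, rather than busy-loop.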

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers(a)efficios.com>
---
include/linux/ringbuffer/iterator.h | 70 ++
lib/ringbuffer/Makefile | 1
lib/ringbuffer/ring_buffer_iterator.c | 795 ++++++++++++++++++++++++++++++++++
3 files changed, 866 insertions(+)

Index: linux.trees.git/lib/ringbuffer/ring_buffer_iterator.c
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/lib/ringbuffer/ring_buffer_iterator.c 2010-07-09 18:35:12.000000000 -0400
@@ -0,0 +1,795 @@
+/*
+ * ring_buffer_iterator.c
+ *
+ * (C) Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers(a)efficios.com>
+ *
+ * Ring buffer and channel iterators. Get each event of a channel in order. Uses
+ * a prio heap for per-cpu buffers, giving an O(log(NR_CPUS)) algorithmic
+ * complexity for the "get next event" operation.
+ *
+ * Author:
+ * Mathieu Desnoyers <mathieu.desnoyers(a)efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/iterator.h>
+#include <linux/jiffies.h>
+#include <linux/module.h>
+
+/*
+ * Safety factor taking into account internal kernel interrupt latency.
+ * Assuming 250ms worst-case latency.
+ */
+#define MAX_SYSTEM_LATENCY 250
+
+/*
+ * Maximum delta expected between trace clocks, in nanoseconds. At most 1 jiffy delta.
+ */
+#define MAX_CLOCK_DELTA (jiffies_to_usecs(1) * 1000)
+
+/**
+ * ring_buffer_get_next_record - Get the next record in a buffer.
+ * @chan: channel
+ * @buf: buffer
+ *
+ * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
+ * buffer is empty and finalized. The buffer must already be opened for reading.
+ */
+ssize_t ring_buffer_get_next_record(struct channel *chan,
+ struct ring_buffer *buf)
+{
+ const struct ring_buffer_config *config = chan->backend.config;
+ struct ring_buffer_iter *iter = &buf->iter;
+ int ret;
+
+restart:
+ switch (iter->state) {
+ case ITER_GET_SUBBUF:
+ ret = ring_buffer_get_subbuf(buf, &iter->consumed);
+ if (ret && !ACCESS_ONCE(buf->finalized)
+ && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
+ /*
+ * Use "pull" scheme for global buffers. The reader
+ * itself flushes the buffer to "pull" data not visible
+ * to readers yet. Flush current subbuffer and re-try.
+ *
+ * Per-CPU buffers use a "push" scheme instead, because
+ * the IPI needed to flush all CPUs' buffers is too
+ * costly. In the "push" scheme, the reader waits for
+ * the writer periodic deferrable timer to flush the
+ * buffers (keeping track of a quiescent state
+ * timestamp). Therefore, the writer "pushes" data out
+ * of the buffers rather than letting the reader "pull"
+ * data from the buffer.
+ */
+ ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ ret = ring_buffer_get_subbuf(buf, &iter->consumed);
+ }
+ if (ret)
+ return ret;
+ iter->data_size = ring_buffer_get_read_data_size(config, buf);
+ iter->read_offset = iter->consumed;
+ /* skip header */
+ iter->read_offset += config->cb.subbuffer_header_size();
+ iter->state = ITER_TEST_RECORD;
+ goto restart;
+ case ITER_TEST_RECORD:
+ if (iter->read_offset - iter->consumed >= iter->data_size) {
+ iter->state = ITER_PUT_SUBBUF;
+ } else {
+ CHAN_WARN_ON(chan, !config->cb.record_get);
+ config->cb.record_get(config, chan, buf,
+ iter->read_offset,
+ &iter->header_len,
+ &iter->payload_len,
+ &iter->timestamp);
+ iter->read_offset += iter->header_len;
+ subbuffer_consume_record(config, &buf->backend);
+ iter->state = ITER_NEXT_RECORD;
+ return iter->payload_len;
+ }
+ goto restart;
+ case ITER_NEXT_RECORD:
+ iter->read_offset += iter->payload_len;
+ iter->state = ITER_TEST_RECORD;
+ goto restart;
+ case ITER_PUT_SUBBUF:
+ ring_buffer_put_subbuf(buf, iter->consumed);
+ iter->state = ITER_GET_SUBBUF;
+ goto restart;
+ default:
+ CHAN_WARN_ON(chan, 1); /* Should not happen */
+ return -EPERM;
+ }
+}
+EXPORT_SYMBOL_GPL(ring_buffer_get_next_record);
+
+static int buf_is_higher(void *a, void *b)
+{
+ struct ring_buffer *bufa = a;
+ struct ring_buffer *bufb = b;
+
+ /* Consider lowest timestamps to be at the top of the heap */
+ return (bufa->iter.timestamp < bufb->iter.timestamp);
+}
+
+static
+void ring_buffer_get_empty_buf_records(const struct ring_buffer_config *config,
+ struct channel *chan)
+{
+ struct ptr_heap *heap = &chan->iter.heap;
+ struct ring_buffer *buf, *tmp;
+ ssize_t len;
+
+ list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
+ iter.empty_node) {
+ len = ring_buffer_get_next_record(chan, buf);
+
+ /*
+ * Deal with -EAGAIN and -ENODATA.
+ * len >= 0 means record contains data.
+ * -EBUSY should never happen, because we support only one
+ * reader.
+ */
+ switch (len) {
+ case -EAGAIN:
+ /* Keep node in empty list */
+ break;
+ case -ENODATA:
+ /*
+ * Buffer is finalized. Don't add it to the list of empty
+ * buffers, because it has no more data to provide, ever.
+ */
+ list_del(&buf->iter.empty_node);
+ break;
+ case -EBUSY:
+ CHAN_WARN_ON(chan, 1);
+ break;
+ default:
+ /*
+ * Insert buffer into the heap, remove from empty buffer
+ * list. The heap should never overflow.
+ */
+ CHAN_WARN_ON(chan, len < 0);
+ list_del(&buf->iter.empty_node);
+ CHAN_WARN_ON(chan, heap_insert(heap, buf) != NULL);
+ }
+ }
+}
+
+static
+void ring_buffer_wait_for_qs(const struct ring_buffer_config *config,
+ struct channel *chan)
+{
+ u64 timestamp_qs;
+ unsigned long wait_msecs;
+
+ /*
+ * No need to wait if no empty buffers are present.
+ */
+ if (list_empty(&chan->iter.empty_head))
+ return;
+
+ timestamp_qs = config->cb.ring_buffer_clock_read(chan);
+ /*
+ * We need to consider previously empty buffers.
+ * Try to get the next record on each of them, and add them to
+ * the heap if they have data. If at least one of them
+ * doesn't have data, we need to wait for
+ * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
+ * buffers have been switched either by the timer or by idle entry)
+ * before checking them again, adding them if they have data.
+ */
+ ring_buffer_get_empty_buf_records(config, chan);
+
+ /*
+ * No need to wait if no empty buffers are present.
+ */
+ if (list_empty(&chan->iter.empty_head))
+ return;
+
+ /*
+ * We need to wait for the buffer switch timer to run. If the
+ * CPU is idle, idle entry performed the switch.
+ * TODO: we could optimize further by skipping the sleep if all
+ * empty buffers belong to idle or offline cpus.
+ */
+ wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
+ wait_msecs += MAX_SYSTEM_LATENCY;
+ msleep(wait_msecs);
+ ring_buffer_get_empty_buf_records(config, chan);
+ /*
+ * Any buffer still in the empty list here cannot possibly
+ * contain an event with a timestamp prior to "timestamp_qs".
+ * The new quiescent state timestamp is the one we grabbed
+ * before waiting for buffer data. It is therefore safe to
+ * ignore empty buffers up to last_qs timestamp for fusion
+ * merge.
+ */
+ chan->iter.last_qs = timestamp_qs;
+}
+
+/**
+ * channel_get_next_record - Get the next record in a channel.
+ * @chan: channel
+ * @ret_buf: the buffer in which the event is located (output)
+ *
+ * Returns the size of the new current event, -EAGAIN if all buffers are empty,
+ * -ENODATA if all buffers are empty and finalized. The channel must already be
+ * opened for reading.
+ */
+
+ssize_t channel_get_next_record(struct channel *chan,
+ struct ring_buffer **ret_buf)
+{
+ const struct ring_buffer_config *config = chan->backend.config;
+ struct ring_buffer *buf;
+ struct ptr_heap *heap;
+ ssize_t len;
+
+ if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
+ *ret_buf = channel_get_ring_buffer(config, chan, 0);
+ return ring_buffer_get_next_record(chan, *ret_buf);
+ }
+
+ heap = &chan->iter.heap;
+
+ /*
+ * get next record for topmost buffer.
+ */
+ buf = heap_maximum(heap);
+ if (buf) {
+ len = ring_buffer_get_next_record(chan, buf);
+ /*
+ * Deal with -EAGAIN and -ENODATA.
+ * len >= 0 means record contains data.
+ */
+ switch (len) {
+ case -EAGAIN:
+ buf->iter.timestamp = 0;
+ list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+ /* Remove topmost buffer from the heap */
+ CHAN_WARN_ON(chan, heap_remove(heap) != buf);
+ break;
+ case -ENODATA:
+ /*
+ * Buffer is finalized. Remove buffer from heap and
+ * don't add it to the list of empty buffers, because it
+ * has no more data to provide, ever.
+ */
+ CHAN_WARN_ON(chan, heap_remove(heap) != buf);
+ break;
+ case -EBUSY:
+ CHAN_WARN_ON(chan, 1);
+ break;
+ default:
+ /*
+ * Reinsert buffer into the heap. Note that heap can be
+ * partially empty, so we need to use
+ * heap_replace_max().
+ */
+ CHAN_WARN_ON(chan, len < 0);
+ CHAN_WARN_ON(chan, heap_replace_max(heap, buf) != buf);
+ break;
+ }
+ }
+
+ buf = heap_maximum(heap);
+ if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
+ /*
+ * Deal with buffers previously showing no data.
+ * Add buffers containing data to the heap, update
+ * last_qs.
+ */
+ ring_buffer_wait_for_qs(config, chan);
+ }
+
+ *ret_buf = buf = heap_maximum(heap);
+ if (buf) {
+ /*
+ * If this warning triggers, you probably need to check your
+ * system interrupt latency. Typical cause: too much printk()
+ * output going to a serial console with interrupts off.
+ * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
+ * Observed on SMP KVM setups with trace_clock().
+ */
+ if (chan->iter.last_timestamp
+ > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
+ printk(KERN_WARNING "ring_buffer: timestamps going "
+ "backward. Last time %llu ns, cpu %d, "
+ "current time %llu ns, cpu %d, "
+ "delta %llu ns.\n",
+ chan->iter.last_timestamp, chan->iter.last_cpu,
+ buf->iter.timestamp, buf->backend.cpu,
+ chan->iter.last_timestamp - buf->iter.timestamp);
+ CHAN_WARN_ON(chan, 1);
+ }
+ chan->iter.last_timestamp = buf->iter.timestamp;
+ chan->iter.last_cpu = buf->backend.cpu;
+ return buf->iter.payload_len;
+ } else {
+ /* Heap is empty */
+ if (list_empty(&chan->iter.empty_head))
+ return -ENODATA; /* All buffers finalized */
+ else
+ return -EAGAIN; /* Temporarily empty */
+ }
+}
+EXPORT_SYMBOL_GPL(channel_get_next_record);
+
+static
+void ring_buffer_iterator_init(struct channel *chan, struct ring_buffer *buf)
+{
+ if (buf->iter.allocated)
+ return;
+
+ buf->iter.allocated = 1;
+ if (chan->iter.read_open && !buf->iter.read_open) {
+ CHAN_WARN_ON(chan, ring_buffer_open_read(buf) != 0);
+ buf->iter.read_open = 1;
+ }
+
+ /* Add to list of buffers without any current record */
+ if (chan->backend.config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+}
+
+#ifdef CONFIG_HOTPLUG_CPU
+static
+int __cpuinit channel_iterator_cpu_hotplug(struct notifier_block *nb,
+ unsigned long action,
+ void *hcpu)
+{
+ unsigned int cpu = (unsigned long)hcpu;
+ struct channel *chan = container_of(nb, struct channel,
+ hp_iter_notifier);
+ struct ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+ const struct ring_buffer_config *config = chan->backend.config;
+
+ if (!chan->hp_iter_enable)
+ return NOTIFY_DONE;
+
+ CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+ switch (action) {
+ case CPU_DOWN_FAILED:
+ case CPU_DOWN_FAILED_FROZEN:
+ case CPU_ONLINE:
+ case CPU_ONLINE_FROZEN:
+ ring_buffer_iterator_init(chan, buf);
+ return NOTIFY_OK;
+ default:
+ return NOTIFY_DONE;
+ }
+}
+#endif
+
+int channel_iterator_init(struct channel *chan)
+{
+ const struct ring_buffer_config *config = chan->backend.config;
+ struct ring_buffer *buf;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ int cpu, ret;
+
+ INIT_LIST_HEAD(&chan->iter.empty_head);
+ ret = heap_init(&chan->iter.heap,
+ num_possible_cpus()
+ * sizeof(struct ring_buffer *),
+ GFP_KERNEL, buf_is_higher);
+ if (ret)
+ return ret;
+ /*
+ * Without CPU hotplug support, if the ring buffer is allocated
+ * from an early initcall, it will not be notified of secondary
+ * CPUs. In that case, we need to initialize the iterators for
+ * all possible CPUs.
+ */
+#ifdef CONFIG_HOTPLUG_CPU
+ chan->hp_iter_notifier.notifier_call =
+ channel_iterator_cpu_hotplug;
+ chan->hp_iter_notifier.priority = 10;
+ register_cpu_notifier(&chan->hp_iter_notifier);
+ get_online_cpus();
+ for_each_online_cpu(cpu) {
+ buf = per_cpu_ptr(chan->backend.buf, cpu);
+ ring_buffer_iterator_init(chan, buf);
+ }
+ chan->hp_iter_enable = 1;
+ put_online_cpus();
+#else
+ for_each_possible_cpu(cpu) {
+ buf = per_cpu_ptr(chan->backend.buf, cpu);
+ ring_buffer_iterator_init(chan, buf);
+ }
+#endif
+ } else {
+ buf = channel_get_ring_buffer(config, chan, 0);
+ ring_buffer_iterator_init(chan, buf);
+ }
+ return 0;
+}
+
+void channel_iterator_unregister_notifiers(struct channel *chan)
+{
+ const struct ring_buffer_config *config = chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ chan->hp_iter_enable = 0;
+ unregister_cpu_notifier(&chan->hp_iter_notifier);
+ }
+}
+
+void channel_iterator_free(struct channel *chan)
+{
+ const struct ring_buffer_config *config = chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ heap_free(&chan->iter.heap);
+}
+
+int ring_buffer_iterator_open(struct ring_buffer *buf)
+{
+ struct channel *chan = buf->backend.chan;
+ const struct ring_buffer_config *config = chan->backend.config;
+ CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
+ return ring_buffer_open_read(buf);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_iterator_open);
+
+/*
+ * Note: Iterators must not be mixed with other types of outputs, because an
+ * iterator can leave the buffer in "GET" state, which is not consistent with
+ * other types of output (mmap, splice, raw data read).
+ */
+void ring_buffer_iterator_release(struct ring_buffer *buf)
+{
+ ring_buffer_release_read(buf);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_iterator_release);
+
+int channel_iterator_open(struct channel *chan)
+{
+ const struct ring_buffer_config *config = chan->backend.config;
+ struct ring_buffer *buf;
+ int ret = 0, cpu;
+
+ CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ get_online_cpus();
+ /* Allow CPU hotplug to keep track of opened reader */
+ chan->iter.read_open = 1;
+ for_each_channel_cpu(cpu, chan) {
+ buf = channel_get_ring_buffer(config, chan, cpu);
+ ret = ring_buffer_iterator_open(buf);
+ if (ret)
+ goto error;
+ buf->iter.read_open = 1;
+ }
+ put_online_cpus();
+ } else {
+ buf = channel_get_ring_buffer(config, chan, 0);
+ ret = ring_buffer_iterator_open(buf);
+ }
+ return ret;
+error:
+ /* Error should always happen on CPU 0, hence no close is required. */
+ CHAN_WARN_ON(chan, cpu != 0);
+ put_online_cpus();
+ return ret;
+}
+EXPORT_SYMBOL_GPL(channel_iterator_open);
+
+void channel_iterator_release(struct channel *chan)
+{
+ const struct ring_buffer_config *config = chan->backend.config;
+ struct ring_buffer *buf;
+ int cpu;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ get_online_cpus();
+ for_each_channel_cpu(cpu, chan) {
+ buf = channel_get_ring_buffer(config, chan, cpu);
+ if (buf->iter.read_open) {
+ ring_buffer_iterator_release(buf);
+ buf->iter.read_open = 0;
+ }
+ }
+ chan->iter.read_open = 0;
+ put_online_cpus();
+ } else {
+ buf = channel_get_ring_buffer(config, chan, 0);
+ ring_buffer_iterator_release(buf);
+ }
+}
+EXPORT_SYMBOL_GPL(channel_iterator_release);
+
+void ring_buffer_iterator_reset(struct ring_buffer *buf)
+{
+ struct channel *chan = buf->backend.chan;
+
+ if (buf->iter.state != ITER_GET_SUBBUF)
+ ring_buffer_put_subbuf(buf, buf->iter.consumed);
+ buf->iter.state = ITER_GET_SUBBUF;
+ /* Remove from heap (if present). */
+ if (heap_cherrypick(&chan->iter.heap, buf))
+ list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+ buf->iter.timestamp = 0;
+ buf->iter.header_len = 0;
+ buf->iter.payload_len = 0;
+ buf->iter.consumed = 0;
+ buf->iter.read_offset = 0;
+ buf->iter.data_size = 0;
+ /* Don't reset allocated and read_open */
+}
+
+void channel_iterator_reset(struct channel *chan)
+{
+ const struct ring_buffer_config *config = chan->backend.config;
+ struct ring_buffer *buf;
+ int cpu;
+
+ /* Empty heap, put into empty_head */
+ while ((buf = heap_remove(&chan->iter.heap)) != NULL)
+ list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+
+ for_each_channel_cpu(cpu, chan) {
+ buf = channel_get_ring_buffer(config, chan, cpu);
+ ring_buffer_iterator_reset(buf);
+ }
+ /* Don't reset read_open */
+ chan->iter.last_qs = 0;
+ chan->iter.last_timestamp = 0;
+ chan->iter.last_cpu = 0;
+ chan->iter.len_left = 0;
+}
+
+/*
+ * Ring buffer payload extraction read() implementation.
+ */
+static
+ssize_t channel_ring_buffer_file_read(struct file *filp,
+ char __user *user_buf,
+ size_t count,
+ loff_t *ppos,
+ struct channel *chan,
+ struct ring_buffer *buf,
+ int fusionmerge)
+{
+ const struct ring_buffer_config *config = chan->backend.config;
+ size_t read_count = 0, read_offset;
+ ssize_t len;
+
+ might_sleep();
+ if (!access_ok(VERIFY_WRITE, user_buf, count))
+ return -EFAULT;
+
+ /* Finish copy of previous record */
+ if (*ppos != 0) {
+ if (read_count < count) {
+ len = chan->iter.len_left;
+ read_offset = *ppos;
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
+ && fusionmerge)
+ buf = heap_maximum(&chan->iter.heap);
+ CHAN_WARN_ON(chan, !buf);
+ goto skip_get_next;
+ }
+ }
+
+ while (read_count < count) {
+ size_t copy_len, space_left;
+
+ if (fusionmerge)
+ len = channel_get_next_record(chan, &buf);
+ else
+ len = ring_buffer_get_next_record(chan, buf);
+len_test:
+ if (len < 0) {
+ /*
+ * Check if buffer is finalized (end of file).
+ */
+ if (len == -ENODATA) {
+ /* A 0 read_count will tell about end of file */
+ goto nodata;
+ }
+ if (filp->f_flags & O_NONBLOCK) {
+ if (!read_count)
+ read_count = -EAGAIN;
+ goto nodata;
+ } else {
+ int error;
+
+ /*
+ * No data available at the moment, return what
+ * we got.
+ */
+ if (read_count)
+ goto nodata;
+
+ /*
+ * Wait for returned len to be >= 0 or -ENODATA.
+ */
+ if (fusionmerge)
+ error = wait_event_interruptible(
+ chan->read_wait,
+ ((len = channel_get_next_record(chan,
+ &buf)), len != -EAGAIN));
+ else
+ error = wait_event_interruptible(
+ buf->read_wait,
+ ((len = ring_buffer_get_next_record(
+ chan, buf)), len != -EAGAIN));
+ CHAN_WARN_ON(chan, len == -EBUSY);
+ if (error) {
+ read_count = error;
+ goto nodata;
+ }
+ CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
+ goto len_test;
+ }
+ }
+ read_offset = buf->iter.read_offset;
+skip_get_next:
+ space_left = count - read_count;
+ if (len <= space_left) {
+ copy_len = len;
+ chan->iter.len_left = 0;
+ *ppos = 0;
+ } else {
+ copy_len = space_left;
+ chan->iter.len_left = len - copy_len;
+ *ppos = read_offset + copy_len;
+ }
+ if (__ring_buffer_copy_to_user(&buf->backend, read_offset,
+ &user_buf[read_count],
+ copy_len)) {
+ /*
+ * Leave the len_left and ppos values at their current
+ * state, as we currently have a valid event to read.
+ */
+ return -EFAULT;
+ }
+ read_count += copy_len;
+ }
+ return read_count;
+
+nodata:
+ *ppos = 0;
+ chan->iter.len_left = 0;
+ return read_count;
+}
+
+/**
+ * ring_buffer_file_read - Read buffer record payload.
+ * @filp: file structure pointer.
+ * @user_buf: user buffer to read data into.
+ * @count: number of bytes to read.
+ * @ppos: file read position.
+ *
+ * Returns a negative value on error, or the number of bytes read on success.
+ * ppos is used to save the position _within the current record_ between calls
+ * to read().
+ */
+static
+ssize_t ring_buffer_file_read(struct file *filp,
+ char __user *user_buf,
+ size_t count,
+ loff_t *ppos)
+{
+ struct inode *inode = filp->f_dentry->d_inode;
+ struct ring_buffer *buf = inode->i_private;
+ struct channel *chan = buf->backend.chan;
+
+ return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
+ chan, buf, 0);
+}
+
+/**
+ * channel_file_read - Read channel record payload.
+ * @filp: file structure pointer.
+ * @user_buf: user buffer to read data into.
+ * @count: number of bytes to read.
+ * @ppos: file read position.
+ *
+ * Returns a negative value on error, or the number of bytes read on success.
+ * ppos is used to save the position _within the current record_ between calls
+ * to read().
+ */
+static
+ssize_t channel_file_read(struct file *filp,
+ char __user *user_buf,
+ size_t count,
+ loff_t *ppos)
+{
+ struct inode *inode = filp->f_dentry->d_inode;
+ struct channel *chan = inode->i_private;
+ const struct ring_buffer_config *config = chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ return channel_ring_buffer_file_read(filp, user_buf, count,
+ ppos, chan, NULL, 1);
+ else {
+ struct ring_buffer *buf =
+ channel_get_ring_buffer(config, chan, 0);
+ return channel_ring_buffer_file_read(filp, user_buf, count,
+ ppos, chan, buf, 0);
+ }
+}
+
+static
+int ring_buffer_file_open(struct inode *inode, struct file *file)
+{
+ struct ring_buffer *buf = inode->i_private;
+ int ret;
+
+ ret = ring_buffer_iterator_open(buf);
+ if (ret)
+ return ret;
+
+ file->private_data = buf;
+ ret = nonseekable_open(inode, file);
+ if (ret)
+ goto release_iter;
+ return 0;
+
+release_iter:
+ ring_buffer_iterator_release(buf);
+ return ret;
+}
+
+static
+int ring_buffer_file_release(struct inode *inode, struct file *file)
+{
+ struct ring_buffer *buf = inode->i_private;
+
+ ring_buffer_iterator_release(buf);
+ return 0;
+}
+
+static
+int channel_file_open(struct inode *inode, struct file *file)
+{
+ struct channel *chan = inode->i_private;
+ int ret;
+
+ ret = channel_iterator_open(chan);
+ if (ret)
+ return ret;
+
+ file->private_data = chan;
+ ret = nonseekable_open(inode, file);
+ if (ret)
+ goto release_iter;
+ return 0;
+
+release_iter:
+ channel_iterator_release(chan);
+ return ret;
+}
+
+static
+int channel_file_release(struct inode *inode, struct file *file)
+{
+ struct channel *chan = inode->i_private;
+
+ channel_iterator_release(chan);
+ return 0;
+}
+
+const struct file_operations channel_payload_file_operations = {
+ .open = channel_file_open,
+ .release = channel_file_release,
+ .read = channel_file_read,
+ .llseek = ring_buffer_no_llseek,
+};
+EXPORT_SYMBOL_GPL(channel_payload_file_operations);
+
+const struct file_operations ring_buffer_payload_file_operations = {
+ .open = ring_buffer_file_open,
+ .release = ring_buffer_file_release,
+ .read = ring_buffer_file_read,
+ .llseek = ring_buffer_no_llseek,
+};
+EXPORT_SYMBOL_GPL(ring_buffer_payload_file_operations);
Index: linux.trees.git/include/linux/ringbuffer/iterator.h
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/iterator.h 2010-07-09 18:34:50.000000000 -0400
@@ -0,0 +1,70 @@
+#ifndef _LINUX_RING_BUFFER_ITERATOR_H
+#define _LINUX_RING_BUFFER_ITERATOR_H
+
+/*
+ * linux/ringbuffer/iterator.h
+ *
+ * (C) Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers(a)efficios.com>
+ *
+ * Ring buffer and channel iterators.
+ *
+ * Author:
+ * Mathieu Desnoyers <mathieu.desnoyers(a)efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/backend.h>
+#include <linux/ringbuffer/frontend.h>
+
+/*
+ * ring_buffer_get_next_record advances the buffer read position to the next
+ * record. It returns either the size of the next record, -EAGAIN if there is
+ * currently no data available, or -ENODATA if no data is available and the
+ * buffer is finalized.
+ */
+extern ssize_t ring_buffer_get_next_record(struct channel *chan,
+ struct ring_buffer *buf);
+
+/*
+ * channel_get_next_record advances the channel read position to the next
+ * record. For per-CPU channels, records are delivered in timestamp order
+ * across the per-CPU buffers. It returns either the size of the next record,
+ * -EAGAIN if there is currently no data available, or -ENODATA if no data is
+ * available and the buffers are finalized.
+ * Returns the buffer holding the current record in ret_buf.
+ */
+extern ssize_t channel_get_next_record(struct channel *chan,
+ struct ring_buffer **ret_buf);
+
+/**
+ * read_current_record - copy the buffer's current record into dest.
+ * @buf: ring buffer
+ * @dest: destination where the record should be copied
+ *
+ * dest should be large enough to contain the record. Returns the number of
+ * bytes copied.
+ */
+static inline size_t read_current_record(struct ring_buffer *buf, void *dest)
+{
+ return ring_buffer_read(&buf->backend, buf->iter.read_offset,
+ dest, buf->iter.payload_len);
+}
+
+extern int ring_buffer_iterator_open(struct ring_buffer *buf);
+extern void ring_buffer_iterator_release(struct ring_buffer *buf);
+extern int channel_iterator_open(struct channel *chan);
+extern void channel_iterator_release(struct channel *chan);
+
+extern const struct file_operations channel_payload_file_operations;
+extern const struct file_operations ring_buffer_payload_file_operations;
+
+/*
+ * Used internally.
+ */
+int channel_iterator_init(struct channel *chan);
+void channel_iterator_unregister_notifiers(struct channel *chan);
+void channel_iterator_free(struct channel *chan);
+void channel_iterator_reset(struct channel *chan);
+void ring_buffer_iterator_reset(struct ring_buffer *buf);
+
+#endif /* _LINUX_RING_BUFFER_ITERATOR_H */
Index: linux.trees.git/lib/ringbuffer/Makefile
===================================================================
--- linux.trees.git.orig/lib/ringbuffer/Makefile 2010-07-09 18:31:10.000000000 -0400
+++ linux.trees.git/lib/ringbuffer/Makefile 2010-07-09 18:34:50.000000000 -0400
@@ -1,5 +1,6 @@
obj-y += ring_buffer_backend.o
obj-y += ring_buffer_frontend.o
+obj-y += ring_buffer_iterator.o
obj-y += ring_buffer_vfs.o
obj-y += ring_buffer_splice.o
obj-y += ring_buffer_mmap.o
