 #include <linux/trace_clock.h>
 #include <linux/trace_seq.h>
 #include <linux/spinlock.h>
+#include <linux/irq_work.h>
 #include <linux/debugfs.h>
 #include <linux/uaccess.h>
 #include <linux/hardirq.h>
@@ -442,6 +443,12 @@ int ring_buffer_print_page_header(struct trace_seq *s)
 	return ret;
 }
 
+struct rb_irq_work {
+	struct irq_work			work;
+	wait_queue_head_t		waiters;
+	bool				waiters_pending;
+};
+
 /*
  * head_page == tail_page && head == tail then buffer is empty.
  */
@@ -476,6 +483,8 @@ struct ring_buffer_per_cpu {
 	struct list_head		new_pages; /* new pages to add */
 	struct work_struct		update_pages_work;
 	struct completion		update_done;
+
+	struct rb_irq_work		irq_work;
 };
 
 struct ring_buffer {
@@ -495,6 +504,8 @@ struct ring_buffer {
 	struct notifier_block		cpu_notify;
 #endif
 	u64				(*clock)(void);
+
+	struct rb_irq_work		irq_work;
 };
 
 struct ring_buffer_iter {
@@ -506,6 +517,118 @@ struct ring_buffer_iter {
 	u64				read_stamp;
 };
 
+/*
+ * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
+ *
+ * Schedules a delayed work to wake up any task that is blocked on the
+ * ring buffer waiters queue.
+ */
+static void rb_wake_up_waiters(struct irq_work *work)
+{
+	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
+
+	wake_up_all(&rbwork->waiters);
+}
+
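For context, irq_work lets code that may run with interrupts disabled (as the ring-buffer writers can) defer a wake-up into a context where waking tasks is safe. A minimal sketch of that pattern, separate from this patch; the my_* names are illustrative only:

#include <linux/irq_work.h>
#include <linux/wait.h>

/* Hypothetical example: defer wake_up_all() out of a no-sleep context. */
static DECLARE_WAIT_QUEUE_HEAD(my_waiters);
static struct irq_work my_work;

static void my_wake(struct irq_work *work)
{
        /* Runs later from a safe (interrupt) context, so waking is allowed. */
        wake_up_all(&my_waiters);
}

static void my_init(void)
{
        init_irq_work(&my_work, my_wake);
}

static void my_event_from_atomic_context(void)
{
        /* Cannot call wake_up_all() here; queue the irq_work instead. */
        irq_work_queue(&my_work);
}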
+/**
+ * ring_buffer_wait - wait for input to the ring buffer
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on
+ *
+ * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
+ * as data is added to any of the @buffer's cpu buffers. Otherwise
+ * it will wait for data to be added to a specific cpu buffer.
+ */
+void ring_buffer_wait(struct ring_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	DEFINE_WAIT(wait);
+	struct rb_irq_work *work;
+
+	/*
+	 * Depending on what the caller is waiting for, either any
+	 * data in any cpu buffer, or a specific buffer, put the
+	 * caller on the appropriate wait queue.
+	 */
+	if (cpu == RING_BUFFER_ALL_CPUS)
+		work = &buffer->irq_work;
+	else {
+		cpu_buffer = buffer->buffers[cpu];
+		work = &cpu_buffer->irq_work;
+	}
+
+
+	prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
+
+	/*
+	 * The events can happen in critical sections where
+	 * checking a work queue can cause deadlocks.
+	 * After adding a task to the queue, this flag is set
+	 * only to notify events to try to wake up the queue
+	 * using irq_work.
+	 *
+	 * We don't clear it even if the buffer is no longer
+	 * empty. The flag only causes the next event to run
+	 * irq_work to do the work queue wake up. The worst
+	 * that can happen if we race with !trace_empty() is that
+	 * an event will cause an irq_work to try to wake up
+	 * an empty queue.
+	 *
+	 * There's no reason to protect this flag either, as
+	 * the work queue and irq_work logic will do the necessary
+	 * synchronization for the wake ups. The only thing
+	 * that is necessary is that the wake up happens after
+	 * a task has been queued. It's OK for spurious wake ups.
+	 */
+	work->waiters_pending = true;
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && ring_buffer_empty_cpu(buffer, cpu)))
+		schedule();
+
+	finish_wait(&work->waiters, &wait);
+}
+
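A hedged usage sketch of the new API: a blocking reader could loop on ring_buffer_wait() and then drain events with ring_buffer_consume(). This is illustrative only (the in-tree callers are in the tracing code); it assumes a specific cpu buffer and elides error handling:

#include <linux/ring_buffer.h>
#include <linux/sched.h>

/* Illustrative consumer loop, not part of this patch. */
static void consume_events(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_event *event;
        unsigned long lost;
        u64 ts;

        for (;;) {
                /* Sleep until a writer commits something to this cpu buffer. */
                ring_buffer_wait(buffer, cpu);

                while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
                        void *data = ring_buffer_event_data(event);
                        /* ... process data ... */
                        (void)data;
                }

                /* ring_buffer_wait() slept TASK_INTERRUPTIBLE */
                if (signal_pending(current))
                        break;
        }
}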
+/**
+ * ring_buffer_poll_wait - poll on buffer input
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on
+ * @filp: the file descriptor
+ * @poll_table: The poll descriptor
+ *
+ * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
+ * as data is added to any of the @buffer's cpu buffers. Otherwise
+ * it will wait for data to be added to a specific cpu buffer.
+ *
+ * Returns POLLIN | POLLRDNORM if data exists in the buffers,
+ * zero otherwise.
+ */
+int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
+			  struct file *filp, poll_table *poll_table)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	struct rb_irq_work *work;
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+		return POLLIN | POLLRDNORM;
+
+	if (cpu == RING_BUFFER_ALL_CPUS)
+		work = &buffer->irq_work;
+	else {
+		cpu_buffer = buffer->buffers[cpu];
+		work = &cpu_buffer->irq_work;
+	}
+
+	work->waiters_pending = true;
+	poll_wait(filp, &work->waiters, poll_table);
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+		return POLLIN | POLLRDNORM;
+	return 0;
+}
+
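ring_buffer_poll_wait() is intended to be called from a file_operations .poll handler. A minimal sketch, with the hypothetical helpers get_buffer() and get_cpu_id() standing in for however a real caller recovers its buffer and cpu from the file:

#include <linux/fs.h>
#include <linux/poll.h>
#include <linux/ring_buffer.h>

/* Illustrative .poll handler, not part of this patch. */
static unsigned int my_poll(struct file *filp, poll_table *pt)
{
        struct ring_buffer *buffer = get_buffer(filp);  /* hypothetical */
        int cpu = get_cpu_id(filp);     /* hypothetical, or RING_BUFFER_ALL_CPUS */

        /* Registers filp on the waiters queue and reports readability. */
        return ring_buffer_poll_wait(buffer, cpu, filp, pt);
}

static const struct file_operations my_fops = {
        .poll   = my_poll,
        /* .open/.read/.release elided */
};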
 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
 #define RB_WARN_ON(b, cond)						\
 	({								\
@@ -1061,6 +1184,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
 	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
 	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
 	init_completion(&cpu_buffer->update_done);
+	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
 
 	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
 			    GFP_KERNEL, cpu_to_node(cpu));
@@ -1156,6 +1280,8 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
 	buffer->clock = trace_clock_local;
 	buffer->reader_lock_key = key;
 
+	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
+
 	/* need at least two pages */
 	if (nr_pages < 2)
 		nr_pages = 2;
@@ -2610,6 +2736,22 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
 	rb_end_commit(cpu_buffer);
 }
 
+static __always_inline void
+rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
+{
+	if (buffer->irq_work.waiters_pending) {
+		buffer->irq_work.waiters_pending = false;
+		/* irq_work_queue() supplies its own memory barriers */
+		irq_work_queue(&buffer->irq_work.work);
+	}
+
+	if (cpu_buffer->irq_work.waiters_pending) {
+		cpu_buffer->irq_work.waiters_pending = false;
+		/* irq_work_queue() supplies its own memory barriers */
+		irq_work_queue(&cpu_buffer->irq_work.work);
+	}
+}
+
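The waiters_pending handshake between ring_buffer_wait() and rb_wakeups() can be shown in isolation. A minimal sketch under the same assumptions as the patch (a race only yields a spurious irq_work and wake-up, which is harmless); the example_* names are illustrative:

#include <linux/irq_work.h>
#include <linux/sched.h>
#include <linux/wait.h>

static bool example_pending;
static DECLARE_WAIT_QUEUE_HEAD(example_waiters);
static struct irq_work example_work;	/* assumed init_irq_work()'d elsewhere */

/* Waiter side: advertise the waiter before re-checking for data. */
static void example_waiter(bool buffer_empty)
{
        DEFINE_WAIT(wait);

        prepare_to_wait(&example_waiters, &wait, TASK_INTERRUPTIBLE);
        example_pending = true;         /* step 1: ask for a wake-up */
        if (buffer_empty)               /* step 2: re-check, then sleep */
                schedule();
        finish_wait(&example_waiters, &wait);
}

/* Writer side: runs after an event has been committed. */
static void example_writer(void)
{
        if (example_pending) {
                example_pending = false;
                irq_work_queue(&example_work);  /* wake from a safe context */
        }
}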
 /**
  * ring_buffer_unlock_commit - commit a reserved
  * @buffer: The buffer to commit to
@@ -2629,6 +2771,8 @@ int ring_buffer_unlock_commit(struct ring_buffer *buffer,
 
 	rb_commit(cpu_buffer, event);
 
+	rb_wakeups(buffer, cpu_buffer);
+
 	trace_recursive_unlock();
 
 	preempt_enable_notrace();
@@ -2801,6 +2945,8 @@ int ring_buffer_write(struct ring_buffer *buffer,
 
 	rb_commit(cpu_buffer, event);
 
+	rb_wakeups(buffer, cpu_buffer);
+
 	ret = 0;
  out:
 	preempt_enable_notrace();