 #include <linux/trace_clock.h>
 #include <linux/trace_seq.h>
 #include <linux/spinlock.h>
+#include <linux/irq_work.h>
 #include <linux/debugfs.h>
 #include <linux/uaccess.h>
 #include <linux/hardirq.h>
@@ -442,6 +443,12 @@ int ring_buffer_print_page_header(struct trace_seq *s)
 	return ret;
 }
 
+struct rb_irq_work {
+	struct irq_work			work;
+	wait_queue_head_t		waiters;
+	bool				waiters_pending;
+};
+
 /*
  * head_page == tail_page && head == tail then buffer is empty.
  */
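To see the pattern that struct rb_irq_work bundles together, here is a minimal, self-contained sketch (not part of this commit): a wait queue for sleeping readers, an irq_work that defers the actual wake up out of contexts where waking directly is unsafe, and a pending flag so the hot path only queues the irq_work when somebody is actually waiting. All demo_* names are hypothetical.

#include <linux/irq_work.h>
#include <linux/wait.h>

static DECLARE_WAIT_QUEUE_HEAD(demo_waiters);
static struct irq_work demo_work;
static bool demo_waiters_pending;
static bool demo_data_ready;

/* irq_work callback: runs in a context where waking up is safe */
static void demo_wake(struct irq_work *work)
{
	wake_up_all(&demo_waiters);
}

static void demo_init(void)
{
	init_irq_work(&demo_work, demo_wake);
}

/* producer side: may run in critical sections that must not wake directly */
static void demo_produce(void)
{
	demo_data_ready = true;
	if (demo_waiters_pending) {
		demo_waiters_pending = false;
		irq_work_queue(&demo_work);	/* defer the wake up */
	}
}

/* consumer side: process context, allowed to sleep */
static int demo_consume(void)
{
	demo_waiters_pending = true;
	return wait_event_interruptible(demo_waiters, demo_data_ready);
}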
@@ -476,6 +483,8 @@ struct ring_buffer_per_cpu {
 	struct list_head		new_pages; /* new pages to add */
 	struct work_struct		update_pages_work;
 	struct completion		update_done;
+
+	struct rb_irq_work		irq_work;
 };
 
 struct ring_buffer {
@@ -495,6 +504,8 @@ struct ring_buffer {
 	struct notifier_block		cpu_notify;
 #endif
 	u64				(*clock)(void);
+
+	struct rb_irq_work		irq_work;
 };
 
 struct ring_buffer_iter {
@@ -506,6 +517,118 @@ struct ring_buffer_iter {
 	u64				read_stamp;
 };
 
+/*
+ * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
+ *
+ * irq_work callback that wakes up any task that is blocked on the
+ * ring buffer waiters queue.
+ */
+static void rb_wake_up_waiters(struct irq_work *work)
+{
+	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
+
+	wake_up_all(&rbwork->waiters);
+}
+
+/**
+ * ring_buffer_wait - wait for input to the ring buffer
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on
+ *
+ * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
+ * as data is added to any of the @buffer's cpu buffers. Otherwise
+ * it will wait for data to be added to a specific cpu buffer.
+ */
+void ring_buffer_wait(struct ring_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	DEFINE_WAIT(wait);
+	struct rb_irq_work *work;
+
+	/*
+	 * Depending on what the caller is waiting for, either any
+	 * data in any cpu buffer, or a specific buffer, put the
+	 * caller on the appropriate wait queue.
+	 */
+	if (cpu == RING_BUFFER_ALL_CPUS)
+		work = &buffer->irq_work;
+	else {
+		cpu_buffer = buffer->buffers[cpu];
+		work = &cpu_buffer->irq_work;
+	}
+
+
+	prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
+
+	/*
+	 * Events can happen in critical sections where
+	 * waking up a wait queue can cause deadlocks.
+	 * After adding a task to the queue, this flag is set
+	 * only to notify events to try to wake up the queue
+	 * using irq_work.
+	 *
+	 * We don't clear it even if the buffer is no longer
+	 * empty. The flag only causes the next event to run
+	 * irq_work to do the wait queue wake up. The worst
+	 * that can happen if we race with !trace_empty() is that
+	 * an event will cause an irq_work to try to wake up
+	 * an empty queue.
+	 *
+	 * There's no reason to protect this flag either, as
+	 * the wait queue and irq_work logic will do the necessary
+	 * synchronization for the wake ups. The only thing
+	 * that is necessary is that the wake up happens after
+	 * a task has been queued. Spurious wake ups are OK.
+	 */
+	work->waiters_pending = true;
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && ring_buffer_empty_cpu(buffer, cpu)))
+		schedule();
+
+	finish_wait(&work->waiters, &wait);
+}
+
+/**
+ * ring_buffer_poll_wait - poll on buffer input
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on
+ * @filp: the file descriptor
+ * @poll_table: The poll descriptor
+ *
+ * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
+ * as data is added to any of the @buffer's cpu buffers. Otherwise
+ * it will wait for data to be added to a specific cpu buffer.
+ *
+ * Returns POLLIN | POLLRDNORM if data exists in the buffers,
+ * zero otherwise.
+ */
+int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
+			  struct file *filp, poll_table *poll_table)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	struct rb_irq_work *work;
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+		return POLLIN | POLLRDNORM;
+
+	if (cpu == RING_BUFFER_ALL_CPUS)
+		work = &buffer->irq_work;
+	else {
+		cpu_buffer = buffer->buffers[cpu];
+		work = &cpu_buffer->irq_work;
+	}
+
+	work->waiters_pending = true;
+	poll_wait(filp, &work->waiters, poll_table);
+
+	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
+	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+		return POLLIN | POLLRDNORM;
+	return 0;
+}
+
 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
 #define RB_WARN_ON(b, cond)						\
 	({								\
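The two entry points above are meant to be called by the code that consumes the ring buffer (in mainline, the tracing read and poll paths such as trace_pipe). As a rough illustration only, a hypothetical consumer might use them as below; the my_* helpers and the loop structure are assumptions for this write-up, not code from the commit.

#include <linux/ring_buffer.h>
#include <linux/poll.h>
#include <linux/sched.h>

/* Block until the chosen cpu buffer likely has data, or a signal arrives. */
static int my_read_wait(struct ring_buffer *buffer, int cpu)
{
	while (ring_buffer_empty_cpu(buffer, cpu)) {
		if (signal_pending(current))
			return -EINTR;
		ring_buffer_wait(buffer, cpu);
	}
	return 0;
}

/* Register @filp on the waiters queue and report current readiness. */
static unsigned int my_poll(struct file *filp, poll_table *pt,
			    struct ring_buffer *buffer)
{
	return ring_buffer_poll_wait(buffer, RING_BUFFER_ALL_CPUS, filp, pt);
}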
@@ -1061,6 +1184,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
 	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
 	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
 	init_completion(&cpu_buffer->update_done);
+	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
 
 	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
 			    GFP_KERNEL, cpu_to_node(cpu));
@@ -1156,6 +1280,8 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
 	buffer->clock = trace_clock_local;
 	buffer->reader_lock_key = key;
 
+	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
+
 	/* need at least two pages */
 	if (nr_pages < 2)
 		nr_pages = 2;
@@ -2610,6 +2736,22 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
 	rb_end_commit(cpu_buffer);
 }
 
+static __always_inline void
+rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
+{
+	if (buffer->irq_work.waiters_pending) {
+		buffer->irq_work.waiters_pending = false;
+		/* irq_work_queue() supplies its own memory barriers */
+		irq_work_queue(&buffer->irq_work.work);
+	}
+
+	if (cpu_buffer->irq_work.waiters_pending) {
+		cpu_buffer->irq_work.waiters_pending = false;
+		/* irq_work_queue() supplies its own memory barriers */
+		irq_work_queue(&cpu_buffer->irq_work.work);
+	}
+}
+
 /**
  * ring_buffer_unlock_commit - commit a reserved
  * @buffer: The buffer to commit to
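The waiters_pending flag is deliberately left unsynchronized: the only requirement is that the wake up happens after a task has been queued, and a spurious wake up of an empty queue is harmless. The interleaving below illustrates the intended ordering between ring_buffer_wait() and rb_wakeups(); it is an aid added for this write-up, not code from the commit.

/*
 * Illustrative interleaving (not from the commit):
 *
 *   waiter (ring_buffer_wait)             writer (rb_commit + rb_wakeups)
 *   -------------------------             -------------------------------
 *   prepare_to_wait(&work->waiters, ...)
 *   work->waiters_pending = true;
 *                                         <event committed to the buffer>
 *                                         if (irq_work.waiters_pending) {
 *                                                 waiters_pending = false;
 *                                                 irq_work_queue(&irq_work.work);
 *                                         }
 *   if (ring_buffer_empty(buffer))
 *           schedule();                   irq_work fires -> wake_up_all()
 *   finish_wait(&work->waiters, &wait);
 *
 * If the writer instead sees a stale waiters_pending, the worst case is
 * an irq_work that wakes an empty wait queue, which is harmless.
 */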
@@ -2629,6 +2771,8 @@ int ring_buffer_unlock_commit(struct ring_buffer *buffer,
 
 	rb_commit(cpu_buffer, event);
 
+	rb_wakeups(buffer, cpu_buffer);
+
 	trace_recursive_unlock();
 
 	preempt_enable_notrace();
@@ -2801,6 +2945,8 @@ int ring_buffer_write(struct ring_buffer *buffer,
 
 	rb_commit(cpu_buffer, event);
 
+	rb_wakeups(buffer, cpu_buffer);
+
 	ret = 0;
  out:
 	preempt_enable_notrace();
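On the writer side, the effect of the two rb_wakeups() calls is that any successful write, whether through ring_buffer_write() or a reserve/commit pair ending in ring_buffer_unlock_commit(), now also kicks sleeping readers and pollers. A minimal, hypothetical writer (the demo_* name is an assumption, not part of the commit):

#include <linux/ring_buffer.h>

/* Copy a value into the ring buffer; waiters are woken via rb_wakeups(). */
static int demo_log_value(struct ring_buffer *buffer, u64 val)
{
	/* Reserve + copy + commit in one call; 0 on success, non-zero on failure. */
	return ring_buffer_write(buffer, sizeof(val), &val);
}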