3232
3333
3434struct SocketWatcherData {
35- _Guarded_by_ (mutw ) struct pbpal_poll_data poll ;
35+ _Guarded_by_ (mutw ) struct pbpal_poll_data * poll ;
3636 CRITICAL_SECTION mutw ;
3737 HANDLE thread_handle ;
3838 DWORD thread_id ;
@@ -60,13 +60,13 @@ static int elapsed_ms(FILETIME prev_timspec, FILETIME timspec)
6060
6161int pbntf_watch_in_events (pubnub_t * pbp )
6262{
63- return pbpal_ntf_watch_in_events (& m_watcher .poll , pbp );
63+ return pbpal_ntf_watch_in_events (m_watcher .poll , pbp );
6464}
6565
6666
6767int pbntf_watch_out_events (pubnub_t * pbp )
6868{
69- return pbpal_ntf_watch_out_events (& m_watcher .poll , pbp );
69+ return pbpal_ntf_watch_out_events (m_watcher .poll , pbp );
7070}
7171
7272
@@ -84,25 +84,9 @@ void socket_watcher_thread(void* arg)
8484
8585 Sleep (1 );
8686
87+ pbpal_ntf_poll_away (m_watcher .poll , ms );
88+
8789 EnterCriticalSection (& m_watcher .mutw );
88- if (m_watcher .poll .size > 0 ) {
89- int rslt = WSAPoll (m_watcher .poll .apoll , m_watcher .poll .size , ms );
90- if (SOCKET_ERROR == rslt ) {
91- /* error? what to do about it? */
92- PUBNUB_LOG_WARNING ("poll size = %ud, error = %d\n" ,
93- (unsigned )m_watcher .poll .size ,
94- WSAGetLastError ());
95- }
96- else if (rslt > 0 ) {
97- size_t i ;
98- size_t apoll_size = m_watcher .poll .size ;
99- for (i = 0 ; i < apoll_size ; ++ i ) {
100- if (m_watcher .poll .apoll [i ].revents & (POLLIN | POLLOUT )) {
101- pbntf_requeue_for_processing (m_watcher .poll .apb [i ]);
102- }
103- }
104- }
105- }
10690 if (PUBNUB_TIMERS_API ) {
10791 FILETIME current_time ;
10892 int elapsed ;
@@ -123,6 +107,10 @@ int pbntf_init(void)
123107{
124108 InitializeCriticalSection (& m_watcher .mutw );
125109
110+ m_watcher .poll = pbpal_ntf_callback_poller_init ();
111+ if (NULL == m_watcher .poll ) {
112+ return -1 ;
113+ }
126114 pbpal_ntf_callback_queue_init (& m_watcher .queue );
127115
128116 m_watcher .thread_handle = (HANDLE )_beginthread (
@@ -132,6 +120,7 @@ int pbntf_init(void)
132120 errno );
133121 DeleteCriticalSection (& m_watcher .mutw );
134122 pbpal_ntf_callback_queue_deinit (& m_watcher .queue );
123+ pbpal_ntf_callback_poller_deinit (& m_watcher .poll );
135124 return -1 ;
136125 }
137126 m_watcher .thread_id = GetThreadId (m_watcher .thread_handle );
@@ -158,7 +147,7 @@ int pbntf_got_socket(pubnub_t* pb, pb_socket_t sockt)
158147
159148 PUBNUB_UNUSED (sockt );
160149
161- pbpal_ntf_callback_save_socket (& m_watcher .poll , pb );
150+ pbpal_ntf_callback_save_socket (m_watcher .poll , pb );
162151 if (PUBNUB_TIMERS_API ) {
163152 m_watcher .timer_head = pubnub_timer_list_add (m_watcher .timer_head , pb );
164153 }
@@ -177,7 +166,7 @@ void pbntf_lost_socket(pubnub_t* pb, pb_socket_t sockt)
177166
178167 PUBNUB_UNUSED (sockt );
179168
180- pbpal_ntf_callback_remove_socket (& m_watcher .poll , pb );
169+ pbpal_ntf_callback_remove_socket (m_watcher .poll , pb );
181170 pbpal_remove_timer_safe (pb , & m_watcher .timer_head );
182171 pbpal_ntf_callback_remove_from_queue (& m_watcher .queue , pb );
183172
@@ -191,7 +180,7 @@ void pbntf_update_socket(pubnub_t* pb, pb_socket_t socket)
191180
192181 EnterCriticalSection (& m_watcher .mutw );
193182
194- pbpal_ntf_callback_update_socket (& m_watcher .poll , pb );
183+ pbpal_ntf_callback_update_socket (m_watcher .poll , pb );
195184
196185 LeaveCriticalSection (& m_watcher .mutw );
197186}
0 commit comments