libzypp  17.34.1
eventdispatcher_glib.cc
Go to the documentation of this file.
1 #include "timer.h"
3 #include "private/threaddata_p.h"
4 
8 #include <zypp-core/zyppng/base/UnixSignalSource>
9 
10 namespace zyppng {
11 
12 static int inline readMask () {
13  return ( G_IO_IN | G_IO_HUP );
14 }
15 
16 static int inline writeMask () {
17  return ( G_IO_OUT );
18 }
19 
20 static int inline excpMask () {
21  return ( G_IO_PRI );
22 }
23 
24 static int inline evModeToMask ( int mode ) {
25  int cond = 0;
26  if ( mode & AbstractEventSource::Read ) {
27  cond = readMask() | G_IO_ERR;
28  }
29  if ( mode & AbstractEventSource::Write ) {
30  cond = cond | writeMask() | G_IO_ERR;
31  }
32  if ( mode & AbstractEventSource::Exception ) {
33  cond = cond | excpMask() | G_IO_ERR;
34  }
35  return cond;
36 }
37 
// Converts the GIOCondition bits reported for a descriptor (rEvents) into an event
// value, looking only at condition groups that are set in both rEvents and requestedEvs.
// Returns 0 when the two masks share no bit.
// NOTE(review): the embedded listing numbers skip 42, 44, 46 and 48, so the statement
// guarded by each inner `if` is absent from this listing; presumably each one ORs an
// event type into `ev` - restore them from the upstream file before compiling.
38 static int inline gioConditionToEventTypes ( const GIOCondition rEvents, const int requestedEvs ) {
39  int ev = 0;
40  if ( ( rEvents & requestedEvs ) != 0 ) {
41  if ( ( rEvents & readMask() ) && ( requestedEvs & readMask() ) )
43  if ( ( rEvents & writeMask() ) && ( requestedEvs & writeMask() ) )
45  if ( ( rEvents & excpMask()) && ( requestedEvs & excpMask() ) )
47  if ( ( rEvents & G_IO_ERR) && ( requestedEvs & G_IO_ERR ) )
49  }
50  return ev;
51 }
52 
// GSourceFuncs table handed to g_source_new() when a GAbstractEventSource is allocated.
// NOTE(review): embedded listing lines 54-56 are absent here; presumably they hold the
// leading prepare/check/dispatch entries of the table - confirm against the upstream file.
53 static GSourceFuncs abstractEventSourceFuncs = {
57  nullptr,
58  nullptr,
59  nullptr
60 };
61 
// NOTE(review): the signature line (embedded listing line 62) is absent from this listing;
// the index at the end of the listing shows
// `static GAbstractEventSource * create(EventDispatcherPrivate *ev)`, which presumably belongs here.
// Allocates a GSource of sizeof(GAbstractEventSource) via g_source_new(), clears
// eventSource, stores `ev` in _ev and returns the new source.
63  GAbstractEventSource *src = nullptr;
64  src = reinterpret_cast<GAbstractEventSource *>(g_source_new(&abstractEventSourceFuncs, sizeof(GAbstractEventSource)));
// the storage comes from g_source_new(), so the std::vector member is constructed in place
65  (void) new (&src->pollfds) std::vector<GUnixPollFD>();
66 
67  src->eventSource = nullptr;
68  src->_ev = ev;
69  return src;
70 }
71 
// NOTE(review): the signature line (embedded listing line 72) is absent from this listing;
// the index at the end of the listing shows `static void destruct(GAbstractEventSource *src)`,
// which presumably belongs here.
// Removes every unix fd watch that still has a tag, tears down the pollfds vector,
// then destroys the GSource and drops one reference to it.
73 {
74  for ( GUnixPollFD &fd : src->pollfds ) {
75  if ( fd.tag )
76  g_source_remove_unix_fd( &src->source, fd.tag );
77  }
78 
79  src->pollfds.clear();
// explicit destructor call on the vector member (the vector is placement-constructed elsewhere in this file)
80  src->pollfds.std::vector< GUnixPollFD >::~vector();
81  g_source_destroy( &src->source );
82  g_source_unref( &src->source );
83 }
84 
85 gboolean GAbstractEventSource::prepare(GSource *, gint *timeout)
86 {
87  //we can not yet determine if the GSource is ready, polling FDs also have no
88  //timeout, so lets continue
89  if ( timeout )
90  *timeout = -1;
91  return false;
92 }
93 
94 //here we need to figure out which FDs are pending
95 gboolean GAbstractEventSource::check( GSource *source )
96 {
97  GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
98 
99  //check for pending and remove orphaned entries
100  bool hasPending = false;
101 
102  for ( auto fdIt = src->pollfds.begin(); fdIt != src->pollfds.end(); ) {
103  if ( fdIt->tag == nullptr ) {
104  //this pollfd was removed, clear it from the list
105  //for now keep the object in the sources list if the pollfd list gets empty, if it does not register new events until
106  //next check it is removed for good
107  fdIt = src->pollfds.erase( fdIt );
108  } else {
109  GIOCondition pendEvents = g_source_query_unix_fd( source, fdIt->tag );
110  if ( pendEvents & G_IO_NVAL ){
111  //that poll is broken, do we need to do more????
112  fdIt = src->pollfds.erase( fdIt );
113  } else {
114  hasPending = hasPending || ( pendEvents & fdIt->reqEvents );
115  fdIt++;
116  }
117  }
118  }
119 
120  //if the pollfds are empty trigger dispatch so this source can be removed
121  return hasPending || src->pollfds.empty();
122 }
123 
// GSource "dispatch" hook: notifies the AbstractEventSource about every watched
// descriptor that has one of its requested events pending.
// Returns G_SOURCE_REMOVE when `source` is null or when the watch list is empty and the
// source was found in the dispatcher's _eventSources list; G_SOURCE_CONTINUE otherwise.
// NOTE(review): embedded listing line 138 is absent from this listing (the numbers jump
// from 137 to 139); presumably it releases `src` before the list entry is erased -
// confirm against the upstream file.
// NOTE(review): onFdReady() runs while the range-for below iterates src->pollfds;
// updateEventSource() push_back()s into that same vector, which would invalidate the
// iteration - confirm handlers never register a new fd from inside onFdReady().
124 //Trigger all event sources that have been activated
125 gboolean GAbstractEventSource::dispatch(GSource *source, GSourceFunc, gpointer)
126 {
127  GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
128 
129  if ( !src )
130  return G_SOURCE_REMOVE;
131 
132  //sources are only removed here so we do not accidentally mess with the pollfd iterator in the next loop
133  //where we trigger all ready FDs
134  if ( src->pollfds.empty() ) {
135  auto it = std::find( src->_ev->_eventSources.begin(), src->_ev->_eventSources.end(), src );
136 
137  if ( it != src->_ev->_eventSources.end() ) {
139  src->_ev->_eventSources.erase( it );
140  return G_SOURCE_REMOVE;
141  }
142  }
143 
144  for ( const GUnixPollFD &pollfd : src->pollfds ) {
145  //do not trigger orphaned ones
146  if ( pollfd.tag != nullptr ) {
147  GIOCondition pendEvents = g_source_query_unix_fd( source, pollfd.tag );
148 
149  if ( (pendEvents & pollfd.reqEvents ) != 0 ) {
150  int ev = gioConditionToEventTypes( pendEvents, pollfd.reqEvents );
151  // we require all event objects to be used in shared_ptr form, by doing this we make sure that the object is not destroyed
152  // while we still use it. However this WILL throw in case of using the EventSource outside of shared_ptr bounds
153  auto eventSourceLocked = src->eventSource->shared_this<AbstractEventSource>();
154  eventSourceLocked->onFdReady( pollfd.pollfd, ev );
155  }
156  }
157  }
158 
159  return G_SOURCE_CONTINUE;
160 }
161 
// GSourceFuncs table handed to g_source_new() when a GLibTimerSource is allocated.
// NOTE(review): embedded listing lines 163-165 are absent here; presumably they hold the
// leading prepare/check/dispatch entries of the table - confirm against the upstream file.
162 static GSourceFuncs glibTimerSourceFuncs = {
166  nullptr,
167  nullptr,
168  nullptr
169 };
170 
171 //check when this timer expires and set the correct timeout
172 gboolean GLibTimerSource::prepare(GSource *src, gint *timeout)
173 {
174  GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
175  if ( !source )
176  return false; //not ready for dispatch
177 
178  if ( !source->_t )
179  return false;
180 
181  uint64_t nextTimeout = source->_t->remaining();
182  if ( timeout ) {
183  //this would be a really looong timeout, but be safe
184  if ( nextTimeout > G_MAXINT )
185  *timeout = G_MAXINT;
186  else
187  *timeout = static_cast<gint>( nextTimeout );
188  }
189  return ( nextTimeout == 0 );
190 }
191 
192 //this is essentially the same as prepare
193 gboolean GLibTimerSource::check(GSource *source)
194 {
195  return prepare( source, nullptr );
196 }
197 
198 //emit the expired timers, restart timers that are no single shots
199 gboolean GLibTimerSource::dispatch(GSource *src, GSourceFunc, gpointer)
200 {
201  GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
202  if ( !source )
203  return true;
204 
205  if ( source->_t == nullptr )
206  return true;
207  //this will emit the expired signal and reset the timer
208  //or stop it in case its a single shot timer
209  source->_t->shared_this<Timer>()->expire();
210  return true;
211 }
212 
// NOTE(review): the signature line (embedded listing line 213) is absent from this listing;
// the index at the end of the listing shows `static GLibTimerSource * create()`, which
// presumably belongs here.
// Allocates a GSource of sizeof(GLibTimerSource) via g_source_new() using
// glibTimerSourceFuncs and returns it with no Timer attached (_t == nullptr).
214 {
215  GLibTimerSource *src = nullptr;
216  src = reinterpret_cast<GLibTimerSource *>(g_source_new(&glibTimerSourceFuncs, sizeof(GLibTimerSource)));
217  src->_t = nullptr;
218  return src;
219 }
220 
// NOTE(review): the signature line (embedded listing line 221) is absent from this listing;
// the index at the end of the listing shows `static void destruct(GLibTimerSource *src)`,
// which presumably belongs here.
// Destroys the GSource embedded in `src` and drops one reference to it.
222 {
223  g_source_destroy( &src->source );
224  g_source_unref( &src->source );
225 }
226 
230 static gboolean eventLoopIdleFunc ( gpointer user_data )
231 {
232  auto dPtr = reinterpret_cast<EventDispatcherPrivate *>( user_data );
233  if ( dPtr ) {
234  if( dPtr->runIdleTasks() ) {
235  return G_SOURCE_CONTINUE;
236  }
237 
238  g_source_unref ( dPtr->_idleSource );
239  dPtr->_idleSource = nullptr;
240  }
241  return G_SOURCE_REMOVE;
242 }
243 
// NOTE(review): the signature line (embedded listing line 244) is absent from this listing;
// judging by the body and by `GlibWaitPIDData data ( pid )` in trackChildProcess() this is
// presumably the GlibWaitPIDData constructor taking the pid - confirm against upstream.
// Creates a GLib child watch source for `pid` and stores it in `source`.
245 {
246  source = g_child_watch_source_new( pid );
247 }
248 
// NOTE(review): the signature line (embedded listing line 249) is absent from this listing;
// the initializer list suggests this is the GlibWaitPIDData move constructor.
// Takes over tag, source and callback from `other` and resets other.source so that only
// one object refers to the GSource afterwards.
250  : tag( other.tag )
251  , source( other.source )
252  , callback( std::move( other.callback ) )
253 {
254  other.source = nullptr;
255 }
256 
// NOTE(review): the signature line (embedded listing line 257) is absent from this listing;
// presumably this is the GlibWaitPIDData destructor - confirm against upstream.
// Destroys the GSource and drops one reference to it, but only if `source` is still set.
258 {
259  if ( source ) {
260  g_source_destroy( source );
261  g_source_unref( source );
262  }
263 }
264 
// NOTE(review): the signature line (embedded listing line 265) is absent from this listing;
// the index at the end of the listing shows
// `GlibWaitPIDData & operator=(GlibWaitPIDData &&other) noexcept`, which presumably belongs here.
// Takes over tag, source and callback from `other` and resets other.source.
// NOTE(review): the previous value of `source` is overwritten without being destroyed or
// unreferenced, and `this == &other` is not checked (it would null the own source) -
// verify whether an assignment target can already hold a source.
266 {
267  tag = other.tag;
268  source = other.source;
269  callback = std::move( other.callback );
270  other.source = nullptr;
271  return *this;
272 }
273 
// NOTE(review): the signature line (embedded listing line 274) is absent from this listing;
// the index at the end of the listing shows
// `EventDispatcherPrivate(GMainContext *ctx, EventDispatcher &p)`, which presumably belongs here.
// Records the id of the constructing thread and sets up _ctx: a passed-in context is
// shared (an extra reference is taken), otherwise a new GMainContext is created.
275 {
276  _myThreadId = std::this_thread::get_id();
277 
278  //if we get a context specified ( usually when created for main thread ) we use it
279  //otherwise we create our own
280  if ( ctx ) {
281  _ctx = ctx;
282  g_main_context_ref ( _ctx );
283  } else {
284  _ctx = g_main_context_new();
285  }
286  // Enable this again once we switch to a full async API that requires an event loop before calling any zypp functions
287  // g_main_context_push_thread_default( _ctx );
288 }
289 
// NOTE(review): the signature line (embedded listing line 290) is absent from this listing;
// presumably this is the EventDispatcherPrivate destructor - confirm against upstream.
// NOTE(review): embedded listing lines 293 and 296 (the bodies of the two lambdas) are
// absent as well; presumably they release each timer / event source.
// Visits all running timers and event sources, empties both lists, destroys and
// unreferences the idle source if one exists and finally drops the reference to _ctx.
291 {
292  std::for_each ( _runningTimers.begin(), _runningTimers.end(), []( GLibTimerSource *src ){
294  });
295  std::for_each ( _eventSources.begin(), _eventSources.end(), []( GAbstractEventSource *src ){
297  });
298  _runningTimers.clear();
299  _eventSources.clear();
300 
301  if ( _idleSource ) {
302  g_source_destroy( _idleSource );
303  g_source_unref ( _idleSource );
304  }
305 
306  //g_main_context_pop_thread_default( _ctx );
307  g_main_context_unref( _ctx );
308 }
309 
// NOTE(review): the signature line (embedded listing line 310) is absent from this listing;
// presumably this is EventDispatcherPrivate::runIdleTasks(), which eventLoopIdleFunc() calls.
// Runs every queued idle function once and afterwards releases the objects parked in
// _unrefLater. Returns true while idle functions or parked objects remain.
311 {
312  //run all user defined idle functions
313  //if they return true, they are executed again in the next idle run
// work on a swapped-out copy: _idleFuncs then only receives what is pushed from here on
314  decltype ( _idleFuncs ) runQueue;
315  runQueue.swap( _idleFuncs );
316 
317  while ( runQueue.size() ) {
318  EventDispatcher::IdleFunction fun( std::move( runQueue.front() ) );
319  runQueue.pop();
320  if ( fun() )
321  _idleFuncs.push( std::move(fun) );
322  }
323 
324  //keep this as the last thing to call after all user code was executed
325  if ( _unrefLater.size() )
326  _unrefLater.clear();
327 
328  return _idleFuncs.size() || _unrefLater.size();
329 }
330 
// NOTE(review): the signature line (embedded listing line 331) is absent from this listing;
// presumably this is EventDispatcherPrivate::enableIdleSource(), which invokeOnIdleImpl()
// and unrefLaterImpl() call.
// Creates the idle GSource on first use, installs eventLoopIdleFunc with `this` as user
// data and attaches the source to _ctx. Does nothing if an idle source already exists.
332 {
333  if ( !_idleSource ) {
334  _idleSource = g_idle_source_new ();
335  g_source_set_callback ( _idleSource, eventLoopIdleFunc, this, nullptr );
336  g_source_attach ( _idleSource, _ctx );
337  }
338 }
339 
340 std::shared_ptr<EventDispatcher> EventDispatcherPrivate::create()
341 {
342  return std::shared_ptr<EventDispatcher>( new EventDispatcher() );
343 }
344 
345 void EventDispatcherPrivate::waitPidCallback( GPid pid, gint status, gpointer user_data )
346 {
347  EventDispatcherPrivate *that = reinterpret_cast<EventDispatcherPrivate *>( user_data );
348 
349  try {
350  auto data = std::move( that->_waitPIDs.at(pid) );
351  that->_waitPIDs.erase( pid );
352 
353  if ( data.callback )
354  data.callback( pid, status );
355 
356  g_spawn_close_pid( pid );
357 
358  // no need to take care of releasing the GSource, the event loop took care of that
359 
360  } catch ( const std::out_of_range &e ) {
361  return;
362  }
363 }
364 
366 
// NOTE(review): the signature line (embedded listing line 367) is absent from this listing;
// the initializer suggests an EventDispatcher constructor taking an opaque `ctx` handle.
// Creates the EventDispatcherPrivate (with `ctx` reinterpreted as GMainContext*) and
// passes it to the Base constructor.
368  : Base ( * new EventDispatcherPrivate( reinterpret_cast<GMainContext*>(ctx), *this ) )
369 {
370 }
371 
// NOTE(review): the signature line (embedded listing line 372) is absent from this listing;
// presumably this is the EventDispatcher destructor - confirm against upstream.
// Empty body.
373 {
374 }
375 
376 void EventDispatcher::updateEventSource( AbstractEventSource &notifier, int fd, int mode )
377 {
378  Z_D();
379  if ( notifier.eventDispatcher().lock().get() != this )
380  ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to update event source") );
381 
382  AbstractEventSource *notifyPtr = &notifier;
383 
384  GAbstractEventSource *evSrc = nullptr;
385  auto &evSrcList = d->_eventSources;
386  auto itToEvSrc = std::find_if( evSrcList.begin(), evSrcList.end(), [ notifyPtr ]( const auto elem ){ return elem->eventSource == notifyPtr; } );
387  if ( itToEvSrc == evSrcList.end() ) {
388 
389  evSrc = GAbstractEventSource::create( d );
390  evSrc->eventSource = notifyPtr;
391  evSrcList.push_back( evSrc );
392 
393  g_source_attach( &evSrc->source, d->_ctx );
394 
395  } else
396  evSrc = (*itToEvSrc);
397 
398  int cond = evModeToMask( mode );
399  auto it = std::find_if( evSrc->pollfds.begin(), evSrc->pollfds.end(), [fd]( const auto &currPollFd ) {
400  return currPollFd.pollfd == fd;
401  });
402 
403  if ( it != evSrc->pollfds.end() ) {
404  //found
405  it->reqEvents = static_cast<GIOCondition>( cond );
406  g_source_modify_unix_fd( &evSrc->source, it->tag, static_cast<GIOCondition>(cond) );
407  } else {
408  evSrc->pollfds.push_back(
409  GUnixPollFD {
410  static_cast<GIOCondition>(cond),
411  fd,
412  g_source_add_unix_fd( &evSrc->source, fd, static_cast<GIOCondition>(cond) )
413  }
414  );
415  }
416 }
417 
// NOTE(review): the signature line (embedded listing line 418) is absent from this listing;
// the index at the end of the listing shows
// `virtual void removeEventSource(AbstractEventSource &notifier, int fd=-1)`, which
// presumably belongs here.
// Stops watching `fd` for `notifier`; fd == -1 stops all watches of that notifier.
// Entries are only marked as orphaned (tag = nullptr, pollfd = -1); the list itself is
// left untouched here.
// Throws zypp::Exception if `notifier` belongs to a different dispatcher; a notifier
// without a registered source is ignored.
419 {
420  Z_D();
421 
422  AbstractEventSource *ptr = &notifier;
423 
424  if ( notifier.eventDispatcher().lock().get() != this )
425  ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to remove event source") );
426 
427  auto &evList = d->_eventSources;
428  auto it = std::find_if( evList.begin(), evList.end(), [ ptr ]( const auto elem ){ return elem->eventSource == ptr; } );
429 
430  if ( it == evList.end() )
431  return;
432 
433  auto &fdList = (*it)->pollfds;
434 
435  if ( fd == -1 ) {
436  //we clear out all unix_fd watches but do not destroy the source just yet. We currently might
437  //be in the dispatch() function of that AbstractEventSource, make sure not to break the iterator
438  //for the fd's
439  for ( auto &pFD : fdList ) {
440  if ( pFD.tag )
441  g_source_remove_unix_fd( &(*it)->source, pFD.tag );
442  pFD.pollfd = -1;
443  pFD.tag = nullptr; //mark as orphaned, do not delete the element here, this might break dispatching
444  }
445  } else {
446  auto fdIt = std::find_if( fdList.begin(), fdList.end(), [ fd ]( const auto &pFd ){ return pFd.pollfd == fd; } );
447  if ( fdIt != fdList.end() ) {
448  if ( fdIt->tag )
449  g_source_remove_unix_fd( &(*it)->source, (*fdIt).tag );
450  //also do not remove here, mark as orphaned only to not break iterating in dispatch()
451  fdIt->tag = nullptr;
452  fdIt->pollfd = -1;
453  }
454  }
455 }
456 
// NOTE(review): the signature line (embedded listing line 457) is absent from this listing;
// the index at the end of the listing shows `virtual void registerTimer(Timer &timer)`,
// which presumably belongs here.
// NOTE(review): embedded listing line 466 is absent as well; presumably it declares and
// creates `newSrc` - restore it from the upstream file.
// Binds `timer` to a timer source, remembers the source in _runningTimers and attaches
// it to the dispatcher's main context. A timer that is already registered is ignored.
458 {
459  Z_D();
460  //make sure timer is not double registered
461  for ( const GLibTimerSource *t : d->_runningTimers ) {
462  if ( t->_t == &timer )
463  return;
464  }
465 
467  newSrc->_t = &timer;
468  d->_runningTimers.push_back( newSrc );
469 
470  g_source_attach( &newSrc->source, d->_ctx );
471 }
472 
// NOTE(review): the signature line (embedded listing line 473) is absent from this listing;
// the index at the end of the listing shows `virtual void removeTimer(Timer &timer)`,
// which presumably belongs here.
// NOTE(review): embedded listing line 483 is absent as well; presumably it releases `src`
// after the list entry was erased - as shown here `src` is otherwise unused.
// Looks up the timer source bound to `timer` and removes it from _runningTimers;
// unknown timers are ignored.
474 {
475  Z_D();
476  auto it = std::find_if( d->_runningTimers.begin(), d->_runningTimers.end(), [ &timer ]( const GLibTimerSource *src ){
477  return src->_t == &timer;
478  });
479 
480  if ( it != d->_runningTimers.end() ) {
481  GLibTimerSource *src = *it;
482  d->_runningTimers.erase( it );
484  }
485 }
486 
// NOTE(review): the signature line (embedded listing line 487) is absent from this listing;
// the index at the end of the listing shows `void * nativeDispatcherHandle() const`,
// which presumably belongs here.
// Returns the main context (_ctx) this dispatcher operates on.
488 {
489  return d_func()->_ctx;
490 }
491 
492 bool EventDispatcher::waitForFdEvent( const int fd, int events , int &revents , int &timeout )
493 {
494  GPollFD pollFd;
495  pollFd.fd = fd;
496  pollFd.events = evModeToMask(events);
497 
498  bool eventTriggered = false;
499  zypp::AutoDispose<GTimer *> timer( g_timer_new(), &g_timer_destroy );
500  while ( !eventTriggered ) {
501  g_timer_start( *timer );
502  const int res = g_poll( &pollFd, 1, timeout );
503  switch ( res ) {
504  case 0: //timeout
505  timeout = 0;
506  return false;
507  case -1: { // interrupt
508  // if timeout is -1 we wait until eternity
509  if ( timeout == -1 )
510  continue;
511 
512  timeout -= g_timer_elapsed( *timer, nullptr );
513  if ( timeout < 0 ) timeout = 0;
514  if ( timeout <= 0 )
515  return false;
516 
517  if ( errno == EINTR )
518  continue;
519 
520  ERR << "g_poll error: " << strerror(errno) << std::endl;
521  return false;
522  }
523  case 1:
524  eventTriggered = true;
525  break;
526  }
527  }
528 
529  revents = gioConditionToEventTypes( (GIOCondition)pollFd.revents, evModeToMask(events) );
530  return true;
531 }
532 
533 void EventDispatcher::trackChildProcess( int pid, std::function<void (int, int)> callback )
534 {
535  Z_D();
536  GlibWaitPIDData data ( pid );
537  data.callback = std::move(callback);
538 
539  g_source_set_callback ( data.source, (GSourceFunc) &EventDispatcherPrivate::waitPidCallback , d_ptr.get(), nullptr );
540  data.tag = g_source_attach ( data.source, d->_ctx );
541  d->_waitPIDs.insert( std::make_pair( pid, std::move(data) ) );
542 }
543 
// NOTE(review): the signature line (embedded listing line 544) is absent from this listing;
// judging by the body this stops tracking a child process by `pid` and returns a bool.
// Removes the entry for `pid` from _waitPIDs.
// NOTE(review): the index at the end of the listing shows _waitPIDs as a
// std::unordered_map; erase() by key does not throw std::out_of_range, so the catch
// branch looks unreachable and true is returned even if `pid` was never tracked -
// consider checking the erase() result instead.
545 {
546  Z_D();
547  try {
548  d->_waitPIDs.erase( pid );
549  } catch ( const std::out_of_range &e ) {
550  return false;
551  }
552  return true;
553 }
554 
// NOTE(review): the signature line (embedded listing line 555) is absent from this listing;
// the index at the end of the listing shows `UnixSignalSourceRef unixSignalSource()`,
// which presumably belongs here.
// Returns the UnixSignalSource referenced by _signalSource, creating a new one if that
// reference is expired. The dispatcher only stores what is assigned to _signalSource;
// the returned reference is what keeps the object alive for the caller.
556 {
557  Z_D();
558  // lazy init
559  UnixSignalSourceRef r;
560  if ( d->_signalSource.expired ()) {
561  d->_signalSource = r = UnixSignalSource::create();
562  } else {
563  r = d->_signalSource.lock ();
564  }
565  return r;
566 }
567 
// NOTE(review): the signature line (embedded listing line 568) is absent from this listing,
// so the function name cannot be confirmed from here.
// Runs a single iteration of the dispatcher's main context without blocking
// (may_block = false) and returns the result of g_main_context_iteration().
569 {
570  return g_main_context_iteration( d_func()->_ctx, false );
571 }
572 
// NOTE(review): the signature line (embedded listing line 573) is absent from this listing;
// the index at the end of the listing shows `void invokeOnIdleImpl(IdleFunction &&callback)`,
// which presumably belongs here.
// Queues `callback` in the idle function list of the dispatcher returned by instance()
// and makes sure that dispatcher's idle source is enabled.
574 {
575  auto d = instance()->d_func();
576  d->_idleFuncs.push( std::move(callback) );
577  d->enableIdleSource();
578 }
579 
580 void EventDispatcher::unrefLaterImpl( std::shared_ptr<void> &&ptr )
581 {
582  Z_D();
583  d->_unrefLater.push_back( std::move(ptr) );
584  d->enableIdleSource();
585 }
586 
// NOTE(review): the signature line (embedded listing line 587) is absent from this listing,
// so the function name cannot be confirmed from here.
// Immediately drops everything that is parked in _unrefLater.
588 {
589  d_func()->_unrefLater.clear();
590 }
591 
// NOTE(review): the signature line (embedded listing line 592) is absent from this listing,
// so the function name and return type cannot be confirmed from here.
// Returns the number of entries in _runningTimers.
593 {
594  return d_func()->_runningTimers.size();
595 }
596 
597 std::shared_ptr<EventDispatcher> EventDispatcher::instance()
598 {
599  return ThreadData::current().dispatcher();
600 }
601 
// Sets `disp` as the dispatcher of the calling thread.
// NOTE(review): embedded listing lines 603-604 are absent from this listing, i.e. the
// opening brace and the only statement of the body are missing; the index at the end of
// the listing shows `void setDispatcher(const std::shared_ptr< EventDispatcher > &disp)`
// (threaddata.cc), which the missing statement presumably calls on ThreadData::current()
// - restore both lines from the upstream file.
602 void EventDispatcher::setThreadDispatcher(const std::shared_ptr<EventDispatcher> &disp)
603 {
605 }
606 
607 }
DlContextRefType _ctx
Definition: rpmmd.cc:66
virtual void removeTimer(Timer &timer)
std::vector< std::shared_ptr< void > > _unrefLater
std::vector< GAbstractEventSource * > _eventSources
GlibWaitPIDData & operator=(GlibWaitPIDData &&other) noexcept
#define ZYPP_THROW(EXCPT)
Drops a logline and throws the Exception.
Definition: Exception.h:429
std::function< bool()> IdleFunction
static UnixSignalSourceRef create()
static void destruct(GAbstractEventSource *src)
virtual void onFdReady(int fd, int events)=0
static int writeMask()
static std::shared_ptr< EventDispatcher > create()
static void setThreadDispatcher(const std::shared_ptr< EventDispatcher > &disp)
static gboolean check(GSource *source)
#define ERR
Definition: Logger.h:100
static GLibTimerSource * create()
#define Z_D()
Definition: zyppglobal.h:104
struct _GPollFD GPollFD
Definition: ZYppImpl.h:26
EventDispatcher::WaitPidCallback callback
static int gioConditionToEventTypes(const GIOCondition rEvents, const int requestedEvs)
static gboolean eventLoopIdleFunc(gpointer user_data)
Called when the event loop is idle, here we run cleanup tasks and call later() callbacks of the user...
void * nativeDispatcherHandle() const
Returns the native dispatcher handle if the used implementation supports it.
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
std::string strerror(int errno_r)
Return string describing the error_r code.
Definition: String.cc:54
static void destruct(GLibTimerSource *src)
EventDispatcherPrivate(GMainContext *ctx, EventDispatcher &p)
void unrefLaterImpl(std::shared_ptr< void > &&ptr)
#define nullptr
Definition: Easy.h:55
The Timer class provides repetitive and single-shot timers.
Definition: timer.h:44
static GSourceFuncs glibTimerSourceFuncs
void trackChildProcess(int pid, std::function< void(int, int)> callback)
virtual void updateEventSource(AbstractEventSource &notifier, int fd, int mode)
std::vector< GLibTimerSource * > _runningTimers
static std::shared_ptr< EventDispatcher > instance()
static GSourceFuncs abstractEventSourceFuncs
EventDispatcherPrivate * _ev
static void waitPidCallback(GPid pid, gint status, gpointer user_data)
virtual void registerTimer(Timer &timer)
static gboolean prepare(GSource *, gint *timeout)
static ThreadData & current()
Definition: threaddata.cc:16
std::vector< GUnixPollFD > pollfds
virtual void removeEventSource(AbstractEventSource &notifier, int fd=-1)
Base class for Exception.
Definition: Exception.h:146
std::queue< EventDispatcher::IdleFunction > _idleFuncs
static int excpMask()
static int evModeToMask(int mode)
ZYPP_IMPL_PRIVATE(UnixSignalSource)
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Definition: AutoDispose.h:94
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
static int readMask()
static gboolean prepare(GSource *src, gint *timeout)
static GAbstractEventSource * create(EventDispatcherPrivate *ev)
std::weak_ptr< EventDispatcher > eventDispatcher() const
std::unique_ptr< BasePrivate > d_ptr
Definition: base.h:174
std::shared_ptr< EventDispatcher > dispatcher()
Definition: threaddata_p.h:24
void invokeOnIdleImpl(IdleFunction &&callback)
void setDispatcher(const std::shared_ptr< EventDispatcher > &disp)
Definition: threaddata.cc:41
UnixSignalSourceRef unixSignalSource()
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
std::shared_ptr< T > shared_this() const
Definition: base.h:113
static gboolean check(GSource *source)
std::unordered_map< int, GlibWaitPIDData > _waitPIDs