libzypp  17.34.1
provideworker.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 \---------------------------------------------------------------------*/
9 
10 #include "provideworker.h"
11 #include <zypp-core/base/DtorReset>
12 #include <zypp-core/AutoDispose.h>
13 #include <zypp-core/Url.h>
14 #include <zypp-core/Date.h>
15 #include <zypp-core/zyppng/pipelines/AsyncResult>
17 #include <zypp-core/fs/PathInfo.h>
18 #include <zypp-core/fs/TmpPath.h>
20 #include <zypp-core/zyppng/base/AutoDisconnect>
21 #include <zypp-core/zyppng/base/EventDispatcher>
22 #include <zypp-media/MediaConfig>
23 #include <ostream>
24 #include <fstream>
25 
27 
28 #undef ZYPP_BASE_LOGGER_LOGGROUP
29 #define ZYPP_BASE_LOGGER_LOGGROUP "ProvideWorker"
30 
31 namespace zyppng::worker {
32 
33  using namespace zyppng::operators;
34 
35  RequestCancelException::RequestCancelException() : zypp::media::MediaException ("Request was cancelled")
36  { }
37 
38  ProvideWorker::ProvideWorker(std::string_view workerName) : _workerName(workerName)
39  {
40  // do not change the order of these calls, otherwise showing the threadname does not work
41  // enableLogForwardingMode will initialize the log which would override the current thread name
43  ThreadData::current().setName( workerName );
44 
45  // we use a singleshot timer that triggers message handling
47  _msgAvail->setSingleShot(true);
48 
49  // another timer to trigger a delayed shutdown
52  }, *this );
53  _delayedShutdown->setSingleShot(true);
54  }
55 
57  { }
58 
60  {
61  return _stream;
62  }
63 
64  expected<void> ProvideWorker::run( int recv, int send )
65  {
66  // reentry not supported
67  assert ( !_isRunning );
68 
70  _isRunning = true;
71 
72  initLog();
73 
74  zypp::OnScopeExit cleanup([&](){
75  _stream.reset();
76  _controlIO.reset();
77  _loop.reset();
78  });
79 
81  if ( !_controlIO->openFds( { recv }, send ) ) {
82  return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to open control FDs")) );
83  }
84 
87 
89 
90  return executeHandshake () | and_then( [&]() {
91  AutoDisconnect disC[] = {
94  };
95  _loop->run();
96  if ( _fatalError )
98  return expected<void>::success();
99  });
100  }
101 
102  std::deque<ProvideWorkerItemRef> &ProvideWorker::requestQueue()
103  {
104  return _pendingProvides;
105  }
106 
108  return _provNotificationMode;
109  }
110 
113  }
114 
116  {
117  // by default we log to stderr; if user code wants to change that it can override this function
119  }
120 
121  ProvideWorkerItemRef ProvideWorker::makeItem( ProvideMessage &&spec )
122  {
123  return std::make_shared<ProvideWorkerItem>( std::move(spec) );
124  }
125 
126  void ProvideWorker::provideStart(const uint32_t id, const zypp::Url &url, const zypp::filesystem::Pathname &localFile, const zypp::Pathname &stagingFile )
127  {
128  if ( !_stream->sendMessage( ProvideMessage::createProvideStarted ( id
129  , url
130  , localFile.empty () ? std::optional<std::string>() : localFile.asString ()
131  , stagingFile.empty () ? std::optional<std::string>() : stagingFile.asString ()
132  ) ) ) {
133  ERR << "Failed to send ProvideStart message" << std::endl;
134  }
135  }
136 
137  void ProvideWorker::provideSuccess(const uint32_t id, bool cacheHit, const zypp::filesystem::Pathname &localFile, const HeaderValueMap extra )
138  {
139  MIL_PRV << "Sending provideSuccess for id " << id << " file " << localFile << std::endl;
140  auto msg = ProvideMessage::createProvideFinished( id ,localFile.asString() ,cacheHit);
141  for ( auto i = extra.beginList (); i != extra.endList(); i++ ) {
142  for ( const auto &val : i->second )
143  msg.addValue( i->first, val );
144  }
145  if ( !_stream->sendMessage( msg ) ) {
146  ERR << "Failed to send ProvideSuccess message" << std::endl;
147  }
148  }
149 
150  void ProvideWorker::provideFailed(const uint32_t id, const uint code, const std::string &reason, const bool transient, const HeaderValueMap extra )
151  {
152  MIL_PRV << "Sending provideFailed for request " << id << " err: " << reason << std::endl;
153  auto msg = ProvideMessage::createErrorResponse ( id, code, reason, transient );
154  for ( auto i = extra.beginList (); i != extra.endList(); i++ ) {
155  for ( const auto &val : i->second )
156  msg.addValue( i->first, val );
157  }
158  if ( !_stream->sendMessage( msg ) ) {
159  ERR << "Failed to send ProvideFailed message" << std::endl;
160  }
161  }
162 
163 
164  void ProvideWorker::provideFailed ( const uint32_t id, const uint code, const bool transient, const zypp::Exception &e )
165  {
167  if ( !e.historyEmpty() ) {
169  }
170  provideFailed( id
171  , code
172  , e.asUserString()
173  , transient
174  , extra );
175  }
176 
177 
178  void ProvideWorker::attachSuccess(const uint32_t id, const std::optional<std::string> &localMountPoint)
179  {
180  MIL_PRV << "Sending attachSuccess for request " << id << std::endl;
181  if ( !_stream->sendMessage( ProvideMessage::createAttachFinished ( id, localMountPoint ) ) ) {
182  ERR << "Failed to send AttachFinished message" << std::endl;
183  } else {
184  MIL << "Sent back attach success" << std::endl;
185  }
186  }
187 
188  void ProvideWorker::detachSuccess(const uint32_t id)
189  {
190  MIL_PRV << "Sending detachSuccess for request " << id << std::endl;
191  if ( !_stream->sendMessage( ProvideMessage::createDetachFinished ( id ) ) ) {
192  ERR << "Failed to send DetachFinished message" << std::endl;
193  }
194  }
195 
196  expected<ProvideMessage> ProvideWorker::sendAndWaitForResponse( const ProvideMessage &request , const std::vector<uint> &responseCodes )
197  {
198  // make sure immediateShutdown is not called while we are blocking here
199  zypp::DtorReset delayedReset( _inControllerRequest );
200  _inControllerRequest = true;
201 
202  if ( !_stream->sendMessage( request ) )
203  return expected<ProvideMessage>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to send message")) );
204 
205  // flush the io device, this will block until all bytes are written
206  _controlIO->flush();
207 
208  while ( !_fatalError ) {
209 
210  const auto &msg = _stream->nextMessageWait() | [&]( auto &&nextMessage ) {
211  if ( !nextMessage ) {
212  if ( _fatalError )
214  else
215  return expected<RpcMessage>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to wait for response")) );
216  }
217  return expected<RpcMessage>::success( std::move(*nextMessage) );
218  } | and_then ( [&]( auto && m) {
219  return parseReceivedMessage(m);
220  } );
221 
222  if ( !msg ) {
223  ERR << "Failed to receive message" << std::endl;
224  return msg;
225  }
226 
227  if ( std::find( responseCodes.begin (), responseCodes.end(), msg->code() ) != responseCodes.end() ) {
228  return msg;
229  }
230 
231  // remember other messages for later
232  MIL << "Remembering message for later: " << msg->code () << std::endl;
233  _pendingMessages.push_back(*msg);
234  _msgAvail->start(0);
235  }
237  }
238 
239  ProvideWorker::MediaChangeRes ProvideWorker::requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector<std::string> &devices, const std::optional<std::string> &desc )
240  {
242  | [&]( expected<ProvideMessage> &&m ) {
243  if ( !m ) {
244  MIL << "Failed to wait for message, aborting the request " << std::endl;
245  return ProvideWorker::MediaChangeRes::ABORT;
246  }
247  MIL << "Wait finished, with messages still pending: " << this->_pendingMessages.size() << " and provs still pending: " << this->_pendingProvides.size() << std::endl;
248  if ( m->code() == ProvideMessage::Code::MediaChanged )
249  return ProvideWorker::MediaChangeRes::SUCCESS;
250  else if ( m->code() == ProvideMessage::Code::MediaChangeSkip )
251  return ProvideWorker::MediaChangeRes::SKIP;
252  else
253  return ProvideWorker::MediaChangeRes::ABORT;
254  };
255  }
256 
257  expected<AuthInfo> ProvideWorker::requireAuthorization( const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername, const int64_t lastTimestamp, const std::map<std::string, std::string> &extraFields )
258  {
259  return sendAndWaitForResponse( ProvideMessage::createAuthDataRequest( id, url, lastTriedUsername, lastTimestamp, extraFields ), { ProvideMessage::Code::AuthInfo, ProvideMessage::Code::NoAuthData } )
260  | and_then( [&]( ProvideMessage &&m ) {
261  if ( m.code() == ProvideMessage::Code::AuthInfo ) {
262 
263  AuthInfo inf;
264  m.forEachVal( [&]( const std::string &name, const ProvideMessage::FieldVal &val ) {
265  if ( name == AuthInfoMsgFields::Username ) {
266  inf.username = val.asString();
267  } else if ( name == AuthInfoMsgFields::Password ) {
268  inf.password = val.asString();
269  } else if ( name == AuthInfoMsgFields::AuthTimestamp ) {
270  inf.last_auth_timestamp = val.asInt64();
271  } else {
272  if ( !val.isString() ) {
273  ERR << "Ignoring invalid extra value, " << name << " is not of type string" << std::endl;
274  }
275  inf.extraKeys[name] = val.asString();
276  }
277  return true;
278  });
279  return expected<AuthInfo>::success(inf);
280 
281  }
283  });
284  }
285 
287  {
288  return *_controlIO.get();
289  }
290 
292  {
293  const auto &helo = _stream->nextMessageWait();
294  if ( !helo ) {
295  ERR << "Could not receive a handshake message, aborting" << std::endl;
296  return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to receive handshake message")) );;
297  }
298 
299  auto exp = _stream->parseMessage<zyppng::worker::Configuration>( *helo );
300  if ( !exp ) {
301  invalidMessageReceived( exp.error() );
302  return expected<void>::error(exp.error());
303  }
304 
305  return std::move(*exp) | [&]( auto &&conf ) {
306 
307  _workerConf = std::move(conf);
308 
309  auto &mediaConf = zypp::MediaConfig::instance();
310  for( const auto &[key,value] : _workerConf ) {
311  zypp::Url keyUrl( key );
312  if ( keyUrl.getScheme() == "zconfig" && keyUrl.getAuthority() == "main" ) {
313  mediaConf.setConfigValue( keyUrl.getAuthority(), zypp::Pathname(keyUrl.getPathName()).basename(), value );
314  }
315  }
316 
317  return initialize( _workerConf ) | and_then([&]( WorkerCaps &&caps ){
318 
319  caps.set_worker_name( _workerName.data() );
320 
321  caps.set_cfg_flags ( WorkerCaps::Flags(caps.cfg_flags() | WorkerCaps::ZyppLogFormat) );
322  if ( !_stream->sendMessage ( caps ) ) {
323  return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to send capabilities")) );
324  }
325  return expected<void>::success ();
326  });
327  };
328  }
329 
331  {
332  if ( _fatalError )
333  return;
334 
335  while ( _pendingMessages.size () ) {
336  auto m = _pendingMessages.front ();
337  _pendingMessages.pop_front ();
339  }
340 
341  if ( !_fatalError && _pendingProvides.size() ) {
342  provide();
343  }
344 
345  // keep poking until there are no provides anymore
346  if ( !_fatalError && ( _pendingMessages.size() || ( _pendingProvides.size () && _provNotificationMode == QUEUE_NOT_EMTPY ) ) ) {
347  _msgAvail->start(0);
348  }
349 
350  }
351 
353  {
354  if ( _inControllerRequest ) {
355  _delayedShutdown->start(0);
356  return;
357  }
358 
360  _loop->quit ();
361  }
362 
364  {
365  MIL << "Read FD closed, exiting." << std::endl;
367  }
368 
370  {
371  MIL << "Write FD closed, exiting." << std::endl;
373  }
374 
376  {
377  while ( auto message = _stream->nextMessage() ) {
378  if ( _fatalError )
379  break;
380  pushSingleMessage(*message);
381  }
382  }
383 
385  {
386  invalidMessageReceived( std::exception_ptr() );
387  }
388 
389  void ProvideWorker::invalidMessageReceived( std::exception_ptr p )
390  {
391  ERR << "Received a invalid message on the input stream, aborting" << std::endl;
392  if ( p )
393  _fatalError = p;
394  else
397  _loop->quit();
398  }
399 
401  {
402  const auto code = provide.code();
403  // we only accept requests here
405 
406  MIL_PRV << "Received request: " << code << std::endl;
407 
408  if ( code == ProvideMessage::Code::Cancel ) {
409  const auto &i = std::find_if( _pendingProvides.begin (), _pendingProvides.end(), [ id = provide.requestId() ]( const auto &it ){ return it->_spec.requestId() == id; } );
410  if ( i != _pendingProvides.end() ) {
411  switch ( (*i)->_state ) {
413  _stream->sendMessage ( ProvideMessage::createErrorResponse ( provide.requestId (), ProvideMessage::Code::Cancelled, "Cancelled by user." ) );
414  _pendingProvides.erase(i);
415  break;
417  cancel(i);
418  break;
420  break;
421  }
422  MIL << "Received Cancel for unknown request: " << provide.requestId() << ", ignoring!" << std::endl;
423  }
424  return;
425  }
426 
428  return;
429  }
430  ERR << "Unsupported request with code: " << code << " received!" << std::endl;
431  }
432 
434  {
435  const auto &handle = [&]( const RpcMessage &message ){
436  const auto &msgTypeName = message.messagetypename();
437  if ( msgTypeName == ProvideMessage::staticTypeName() ) {
438  return parseReceivedMessage( message )
439  | and_then( [&]( ProvideMessage &&provide ){
440  _pendingMessages.push_back(provide);
441  _msgAvail->start(0);
442  return expected<void>::success();
443  });
444  }
445  return expected<void>::error( ZYPP_EXCPT_PTR( std::invalid_argument(zypp::str::Str()<<"Unknown message received: " << message.messagetypename())) );
446  };
447 
448  const auto &exp = handle( message );
449  if ( !exp ) {
450  try {
451  std::rethrow_exception ( exp.error () );
452  } catch ( const zypp::Exception &e ) {
453  ERR << "Catched exception during message handling: " << e << std::endl;
454  } catch ( const std::exception &e ) {
455  ERR << "Catched exception during message handling: " << e.what()<< std::endl;
456  } catch ( ... ) {
457  ERR << "Unknown Exception during message handling" << std::endl;
458  }
459  }
460  }
461 
463  {
464  auto exp = ProvideMessage::create(m);
465  if ( !exp )
466  invalidMessageReceived( exp.error() );
467  return exp;
468  }
469 }
std::string getScheme() const
Returns the scheme name of the URL.
Definition: Url.cc:537
void enableLogForwardingMode(bool enable=true)
Definition: LogControl.cc:903
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
#define MIL
Definition: Logger.h:98
const std::string & asString() const
ProvideNotificatioMode provNotificationMode() const
ProvideNotificatioMode _provNotificationMode
void pushSingleMessage(const RpcMessage &msg)
std::exception_ptr _fatalError
SignalProxy< void(AsyncDataSource::ChannelCloseReason)> sigWriteFdClosed()
virtual void cancel(const std::deque< ProvideWorkerItemRef >::iterator &request)=0
static Ptr create(IODevice::Ptr iostr)
expected< ProvideMessage > parseReceivedMessage(const RpcMessage &m)
SignalProxy< void()> sigInvalidMessageReceived()
ValueMap::iterator endList()
expected< void > run(int recv=STDIN_FILENO, int send=STDOUT_FILENO)
RpcMessageStream::Ptr _stream
void readFdClosed(uint, AsyncDataSource::ChannelCloseReason)
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
void provideStart(const uint32_t id, const zypp::Url &url, const zypp::Pathname &localFile, const zypp::Pathname &stagingFile={})
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:433
void addValue(const std::string &name, const FieldVal &value)
SignalProxy< void()> sigMessageReceived()
void provideFailed(const uint32_t id, const uint code, const std::string &reason, const bool transient, const HeaderValueMap extra={})
MediaChangeRes requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc={})
bool isString() const
SignalProxy< void(uint, AsyncDataSource::ChannelCloseReason)> sigReadFdClosed()
#define ERR
Definition: Logger.h:100
static ProvideMessage createProvideStarted(const uint32_t reqId, const zypp::Url &url, const std::optional< std::string > &localFilename={}, const std::optional< std::string > &stagingFilename={})
constexpr std::string_view Password("password")
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition: timer.cc:120
void invalidMessageReceived(std::exception_ptr p)
constexpr std::string_view Username("username")
int64_t asInt64() const
ProviderConfiguration _workerConf
static LogControl instance()
Singleton access.
Definition: LogControl.h:102
expected< ProvideMessage > sendAndWaitForResponse(const ProvideMessage &request, const std::vector< uint > &responseCodes)
Assign a variable a certain value when going out of scope.
Definition: dtorreset.h:49
void logToStdErr()
Log to std::err.
Definition: LogControl.cc:916
bool empty() const
Test for an empty path.
Definition: Pathname.h:116
void setProvNotificationMode(const ProvideNotificatioMode &provNotificationMode)
virtual void immediateShutdown()
Definition: provideworker.h:86
Convenient building of std::string via std::ostringstream Basically a std::ostringstream autoconverti...
Definition: String.h:211
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition: base.h:142
ProvideWorker(std::string_view workerName)
const std::string & asString() const
String representation.
Definition: Pathname.h:93
Just inherits Exception to separate media exceptions.
std::string asUserString() const
Translated error message as string suitable for the user.
Definition: Exception.cc:101
The Timer class provides repetitive and single-shot timers.
Definition: timer.h:44
static ProvideMessage createAuthDataRequest(const uint32_t reqId, const zypp::Url &effectiveUrl, const std::string &lastTriedUser="", const std::optional< int64_t > &lastAuthTimestamp={}, const std::map< std::string, std::string > &extraValues={})
virtual ProvideWorkerItemRef makeItem(ProvideMessage &&spec)
bool historyEmpty() const
Whether the history list is empty.
Definition: Exception.h:263
AsyncDataSource & controlIO()
std::string historyAsString() const
The history as string.
Definition: Exception.cc:165
const std::string & messagetypename() const
expected< void > executeHandshake()
RpcMessageStream::Ptr messageStream() const
std::string getAuthority() const
Returns the encoded authority component of the URL.
Definition: Url.cc:545
ValueMap::iterator beginList()
static expected success(ConsParams &&...params)
Definition: expected.h:115
#define MIL_PRV
Definition: providedbg_p.h:35
std::deque< ProvideWorkerItemRef > & requestQueue()
constexpr std::string_view AuthTimestamp("auth_timestamp")
std::map< std::string, std::string > extraKeys
Definition: provideworker.h:39
static ThreadData & current()
Definition: threaddata.cc:16
void attachSuccess(const uint32_t id, const std::optional< std::string > &localMountPoint={})
Base class for Exception.
Definition: Exception.h:146
static auto connectFunc(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, ReceiverFunc &&rFunc, const Tracker &...trackers)
Definition: base.h:163
void handleSingleMessage(const ProvideMessage &provide)
std::string getPathName(EEncoding eflag=zypp::url::E_DECODED) const
Returns the path name from the URL.
Definition: Url.cc:608
static ProvideMessage createProvideFinished(const uint32_t reqId, const std::string &localFilename, bool cacheHit)
AsyncDataSource::Ptr _controlIO
void detachSuccess(const uint32_t id)
constexpr std::string_view History("history")
ResultType and_then(const expected< T, E > &exp, Function &&f)
Definition: expected.h:367
expected< AuthInfo > requireAuthorization(const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername="", const int64_t lastTimestamp=-1, const std::map< std::string, std::string > &extraFields={})
void writeFdClosed(AsyncDataSource::ChannelCloseReason)
void setName(T &&name)
Definition: threaddata_p.h:17
std::deque< ProvideWorkerItemRef > _pendingProvides
Easy-to use interface to the ZYPP dependency resolver.
Definition: Application.cc:19
void provideSuccess(const uint32_t id, bool cacheHit, const zypp::Pathname &localFile, const HeaderValueMap extra={})
static ProvideMessage createDetachFinished(const uint32_t reqId)
std::deque< ProvideMessage > _pendingMessages
bool provide(const Pathname &delta_r, const Pathname &new_r, const Progress &report_r)
Apply a binary delta to on-disk data to re-create a new rpm.
Url manipulation class.
Definition: Url.h:91
virtual expected< WorkerCaps > initialize(const Configuration &conf)=0
static ProvideMessage createMediaChangeRequest(const uint32_t reqId, const std::string &label, int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)
static MediaConfig & instance()
Definition: mediaconfig.cc:46
std::shared_ptr< RpcMessageStream > Ptr
static ProvideMessage createAttachFinished(const uint32_t reqId, const std::optional< std::string > &localMountPoint={})