OStreamReceiver.cpp

Go to the documentation of this file.
00001 /*
00002  *  Copyright (C) Massimo Cora' 2006 <maxcvs@email.it>
00003  *
00004  *  This program is free software; you can redistribute it and/or modify
00005  *  it under the terms of the GNU General Public License as published by
00006  *  the Free Software Foundation; either version 2 of the License, or
00007  *  (at your option) any later version.
00008  *
00009  *  This program is distributed in the hope that it will be useful,
00010  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  *  GNU Library General Public License for more details.
00013  *
00014  *  You should have received a copy of the GNU General Public License
00015  *  along with this program; if not, write to the Free Software
00016  *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00017  */
00018 
00019 
00020 #include "OStreamReceiver.hh"
00021 
00022 
00023 //--------------------------------------------------------------------------
00024 // constructor
00025 
00026 OStreamReceiver::OStreamReceiver( const char* dest_rtsp_url, 
00027                                                                  on_packet_video_received_cb* fv, 
00028                                                                  const void *video_callback_data, 
00029                                                                  on_packet_audio_received_cb* fa, 
00030                                                                  const void *audio_callback_data, 
00031                                                                  const char* receiving_iface /* = 0.0.0.0 */,
00032                                                                  unsigned int sink_buffer_size /* = 200000 */ ) :
00033                                                           f_on_packet_video_received_cb( fv ),
00034                                                           _callback_video_data_client( video_callback_data ),
00035                                                           f_on_packet_audio_received_cb( fa ),
00036                                                           _callback_audio_data_client( audio_callback_data ),
00037                                                           _sink_buffer_size( sink_buffer_size )
00038 {
00039 
00040         _scheduler = BasicTaskScheduler::createNew();
00041         _env = BasicUsageEnvironment::createNew( *_scheduler );
00042 
00043         _receiving_iface = strdup( receiving_iface );
00044 
00045         // set up the endtime [0 means to play following the SDP time]
00046         _end_time = 0;
00047         _error_status = ERROR_NORMAL_STATUS;
00048         _client_protocol_name = "RTSP";
00049         _watch_variable = 0;
00050         
00051         _media_session = NULL;
00052         _arrival_check_timer_task = NULL;
00053 
00054         if ( createRTSPClient() == false ) {
00055                 shutdown();
00056                 _error_status = ERROR_FAILED_CREATE_RTSP_CLIENT;
00057                 return;
00058         }
00059 
00060         if ( getSDPDescriptionFromURL( dest_rtsp_url ) < 0 ) {
00061                 shutdown();
00062                 _error_status = ERROR_FAILED_SDP_DESCRIPTION;
00063                 return;
00064         }
00065 
00066         if ( createMediaSession() == false ) {
00067                 shutdown();
00068                 _error_status = ERROR_FAILED_CREATE_MEDIASESSION;
00069                 return;
00070         }
00071 }
00072 
00073 
00074 //--------------------------------------------------------------------------
00075 //
00076 
00077 OStreamReceiver::~OStreamReceiver() {
00078 
00079         if ( _sdp_description != NULL )
00080                 delete[] _sdp_description;
00081 
00082         if ( _receiving_iface != NULL )
00083                 free( _receiving_iface );
00084 
00085 
00086         _env->reclaim();
00087         delete _scheduler;
00088 }
00089 
00090 
00091 //--------------------------------------------------------------------------
00092 // simply return an error level identified by an int
00093 // return 0 on "ok, ready to stream"
00094 
00095 int OStreamReceiver::isReadyToStream() 
00096 {
00097         return _error_status;
00098 }
00099 
00100 
00101 
00102 //--------------------------------------------------------------------------
00103 // start playing the global media session
00104 
00105 bool OStreamReceiver::clientStartPlayingSession( ) 
00106 {
00107         return _rtsp_client->playMediaSession( *_media_session ) == 1 ? true : false;
00108 }
00109 
00110 
00111 
00112 
00113 
00114 //--------------------------------------------------------------------------
00115 // setup a single media subsession
00116 
00117 bool OStreamReceiver::clientSetupSubsession( void* mediaSubSess ) 
00118 {
00119         MediaSubsession* media_subsession = (MediaSubsession*)mediaSubSess;
00120         return _rtsp_client->setupMediaSubsession(*media_subsession, 
00121                                 false, false) == 1 ? true : false;
00122 }
00123 
00124 
00125 
00126 //--------------------------------------------------------------------------
00127 //
00128 
00129 void OStreamReceiver::closeMediaSinks() {
00130 
00131         if ( _media_session == NULL ) 
00132                 return;
00133 
00134         MediaSubsessionIterator iter( *_media_session );
00135         MediaSubsession* _media_subsession;
00136         
00137         // let's close its subsession
00138         while ( ( _media_subsession = iter.next() ) != NULL ) {
00139                 Medium::close( _media_subsession->sink );
00140                 _media_subsession->sink = NULL;
00141         }
00142 }
00143 
00144 
00145 //--------------------------------------------------------------------------
00146 // unschedule the pending packet-arrival check and tell the event loop to exit
00147 
00148 void OStreamReceiver::shutdown( ) 
00149 {
00150         _env->taskScheduler().unscheduleDelayedTask( _arrival_check_timer_task );
00151         _watch_variable = 1;
00152 }
00153 
00154 
00155 //--------------------------------------------------------------------------
00156 //
00157 /*
00158 void OStreamReceiver::retrieve_wh_from_session_name( char* name, int* width, int* height )
00159 {
00160         // ok it's just a matter of string parsing
00161         _media_session->sessionName();
00162 }
00163 */
00164 
00165 //--------------------------------------------------------------------------
00166 // start receiving RTP packets from the rtsp://host.com/stream provided URL
00167 // return: error_status
00168 
00169 int OStreamReceiver::startReceiving() {
00170 
00171         // first of all check that no errors occurred in stream and class
00172         // initialization
00173         if ( _error_status != ERROR_NORMAL_STATUS )
00174                 return _error_status;
00175 
00176         // Then, setup the "RTPSource"s for the session:
00177         MediaSubsessionIterator iter(*_media_session);
00178         MediaSubsession *media_subsession;
00179         bool made_progress = false;
00180 
00181         while ( (media_subsession = iter.next()) != NULL) {
00182                 if ( !media_subsession->initiate() ) {
00183                         *_env << "Unable to create receiver for \"" << media_subsession->mediumName()
00184                                                         << "/" << media_subsession->codecName()
00185                                                         << "\" subsession: " << _env->getResultMsg() << "\n";
00186                         _error_status = ERROR_FAILED_TO_CREATE_RECEIVER;
00187                 } else {
00188                         *_env << "Created receiver for \"" << media_subsession->mediumName()
00189                                                         << "/" << media_subsession->codecName()
00190                                                         << "\" subsession (client ports " << media_subsession->clientPortNum()
00191                                                         << "-" << media_subsession->clientPortNum() + 1 << ")\n";
00192                         made_progress = true;
00193                         
00194                         if ( media_subsession->rtpSource() != NULL ) {
00195                                 // let some time for reordering the packets
00196                                 unsigned const thresh = 500000; // 0.5 seconds
00197                                 media_subsession->rtpSource()->setPacketReorderingThresholdTime( thresh );
00198                         }
00199                 }        
00200         }
00201         
00202         // are we able to continue?
00203         if ( !made_progress )  {
00204                 shutdown();
00205                 return _error_status;
00206         }
00207 
00208         // Perform additional 'setup' on each subsession, before playing them:
00209         setupStreams();
00210 
00211         // Create and start "OStreamBufferedSink"s for each subsession:
00212         made_progress = False;
00213         
00214 
00215         iter.reset();
00216         int stream_count = 0;
00217         while ((media_subsession = iter.next()) != NULL) {
00218                 
00219                 // was it initialized?
00220                 if ( media_subsession->readSource() == NULL )
00221                         continue; 
00222 
00223                 OStreamBufferedSink* buffered_sink;
00224 
00225                 // Normal case:
00226                 buffered_sink = OStreamBufferedSink::createNew(*_env, 
00227                                                         f_on_packet_video_received_cb, _callback_video_data_client,
00228                                                         f_on_packet_audio_received_cb, _callback_audio_data_client,
00229                                                         media_subsession->codecName(), stream_count,
00230                                                         _receiving_iface,
00231                                                         _sink_buffer_size );
00232 
00233                 media_subsession->sink = buffered_sink;
00234 
00235                 if ( media_subsession->sink == NULL ) {
00236                         _error_status = ERROR_FAILED_FILE_SINK_CREATION;
00237                 } else {
00238                         
00239                         // we have a new stream, increment them.
00240                         stream_count++;
00241 
00242                         // let's start it!
00243                         media_subsession->sink->startPlaying( *(media_subsession->readSource()),
00244                                                                                                         subsessionAfterPlaying,
00245                                                                                                         media_subsession );
00246 
00247                         // Also set a handler to be called if a RTCP "BYE" arrives
00248                         // for this subsession:
00249                         if (media_subsession->rtcpInstance() != NULL) {
00250                                 media_subsession->rtcpInstance()->setByeHandler( subsessionByeHandler,
00251                                                                       media_subsession);
00252                         }
00253 
00254                         made_progress = true;
00255                 }
00256         }
00257 
00258         if ( !made_progress ) {
00259                 shutdown();
00260                 return _error_status;
00261         }
00262 
00263         // Finally, start playing each subsession, to start the data flow:
00264         startPlayingStreams( this );
00265         
00266         // returns only after a setting to watchvariable = 1
00267         // it's a blocking call.
00268         _env->taskScheduler().doEventLoop( &_watch_variable ); 
00269 
00270         // Close our output sinks:
00271         closeMediaSinks();
00272 
00273         // Teardown, then shutdown, any outstanding RTP/RTCP subsessions
00274         if ( ( media_subsession != NULL) && ( _rtsp_client != NULL ) )
00275                 _rtsp_client->teardownMediaSession( *_media_session );
00276 
00277         Medium::close( _media_session );
00278 
00279         // Finally, shut down our client:
00280         Medium::close( _rtsp_client );
00281 
00282         return _error_status;
00283 }
00284 
00285 
00286 //--------------------------------------------------------------------------
00287 //
00288 
00289 void OStreamReceiver::subsessionByeHandler(void* mediaSubSess) {
00290         // Act now as if the subsession had closed:
00291         subsessionAfterPlaying( (MediaSubsession*)mediaSubSess );
00292 }
00293 
00294 
00295 //--------------------------------------------------------------------------
00296 //
00297 
00298 void OStreamReceiver::stopReceiving() 
00299 {
00300         shutdown();
00301 }
00302 
00303 
00304 //--------------------------------------------------------------------------
00305 //
00306 
00307 void OStreamReceiver::checkForPacketArrival( void* ostreamer_receiver_ptr ) 
00308 {
00309         OStreamReceiver *klass = (OStreamReceiver *)ostreamer_receiver_ptr;
00310 
00311         // Check each mediaSubsession, to see whether it has received data packets:
00312         unsigned numSubsessionsChecked = 0;
00313         unsigned numSubsessionsWithReceivedData = 0;
00314         unsigned numSubsessionsThatHaveBeenSynced = 0;
00315 
00316         MediaSubsessionIterator iter( *(klass)->_media_session);
00317         MediaSubsession* media_subsession;
00318         
00319         // we will loop between all the subsession of the media session
00320         // i.e. in multi-stream mode we will have a subsession for each face
00321         // streamed
00322         while (( media_subsession = iter.next()) != NULL &&
00323                         klass->_error_status == ERROR_NORMAL_STATUS ) {
00324                                 
00325                 RTPSource* src = media_subsession->rtpSource();
00326                                 
00327                 if (src == NULL) 
00328                         continue;
00329                                                                 
00330                 ++numSubsessionsChecked;
00331 
00332                 if ( src->receptionStatsDB().numActiveSourcesSinceLastReset() > 0 ) {
00333                         // At least one data packet has arrived
00334                         ++numSubsessionsWithReceivedData;
00335                 }
00336                 if ( src->hasBeenSynchronizedUsingRTCP() ) {
00337                         ++numSubsessionsThatHaveBeenSynced;
00338                 }
00339         }
00340 
00341         unsigned numSubsessionsToCheck = numSubsessionsChecked;
00342 
00343         // No luck, so reschedule this check again, after a delay:
00344         int uSecsToDelay = 100000; // 100 ms
00345         klass->_arrival_check_timer_task = 
00346                 klass->_env->taskScheduler().scheduleDelayedTask( uSecsToDelay,
00347                                                                                                                 (TaskFunc*)OStreamReceiver::checkForPacketArrival, 
00348                                                                                                                 klass );
00349 }
00350 
00351 
00352 
00353 //--------------------------------------------------------------------------
00354 //
00355 
00356 void OStreamReceiver::startPlayingStreams( void* ostreamer_receiver_ptr ) 
00357 {
00358         OStreamReceiver *klass = (OStreamReceiver *)ostreamer_receiver_ptr;
00359 
00360         if ( !klass->clientStartPlayingSession() ) {
00361                 *klass->_env << "Failed to start playing session: " << klass->_env->getResultMsg() << "\n";
00362                 klass->shutdown();
00363                 return;
00364         } else {
00365                 *klass->_env << "Started playing session\n";
00366         }
00367 
00368         // Figure out how long to delay (if at all) before shutting down, or
00369         // repeating the playing
00370 
00371         if ( klass->_end_time == 0 ) 
00372                 klass->_end_time = klass->_media_session->playEndTime(); // use SDP end time
00373 
00374         // Watch for incoming packets (if desired):
00375         klass->checkForPacketArrival( klass );
00376 }
00377 
00378 
00379 
00380 
00381 //--------------------------------------------------------------------------
00382 //
00383 
00384 void OStreamReceiver::subsessionAfterPlaying( void* media_sub_sess ) {
00385 
00386         // Begin by closing this media subsession:
00387         MediaSubsession* media_subsession = (MediaSubsession*)media_sub_sess;
00388         Medium::close( media_subsession->sink );
00389         media_subsession->sink = NULL;
00390 
00391         // Next, check whether *all* subsessions have now been closed:
00392         MediaSession& media_session = media_subsession->parentSession();
00393         MediaSubsessionIterator iter(media_session);
00394         while ((media_subsession = iter.next()) != NULL) {
00395                 if (media_subsession->sink != NULL) 
00396                         return; // this subsession is still active
00397         }
00398 }
00399 
00400 //--------------------------------------------------------------------------
00401 //
00402 
00403 void OStreamReceiver::setupStreams() 
00404 {
00405         MediaSubsessionIterator iter(*_media_session);
00406         MediaSubsession *media_subsession;
00407         bool made_progress = false;
00408 
00409         while ( (media_subsession = iter.next()) != NULL ) {
00410                 if ( media_subsession->clientPortNum() == 0 ) 
00411                         continue; // port # was not set
00412 
00413                 if ( !clientSetupSubsession( media_subsession ) ) {
00414                         _error_status = ERROR_FAILED_SETUP_STREAMS;
00415                         *_env << "Failed to setup \"" << media_subsession->mediumName()
00416                                                         << "/" << media_subsession->codecName()
00417                                                         << "\" subsession: " << _env->getResultMsg() << "\n";
00418                 } else {
00419                         *_env << "Setup \"" << media_subsession->mediumName()
00420                                 << "/" << media_subsession->codecName()
00421                                 << "\" subsession (client ports " << media_subsession->clientPortNum()
00422                                 << "-" << media_subsession->clientPortNum() + 1 << ")\n";
00423                         made_progress = true;
00424                 }
00425         }
00426         if ( !made_progress ) {
00427                 shutdown();
00428                 return;
00429         }
00430 }
00431 
00432 
00433 //--------------------------------------------------------------------------
00434 // on error returns -1, else the rtsp describe status
00435 
00436 int OStreamReceiver::getSDPDescriptionFromURL( char const* url ) {
00437 
00438     _sdp_description = _rtsp_client->describeURL( url );
00439         if ( _sdp_description == NULL ) {
00440                 *_env << "Failed to get a SDP description from URL \"" << url
00441                         << "\": " << *_env->getResultMsg() << "\n";
00442             return -1;
00443         }
00444 
00445         *_env << "Opened URL \"" << url
00446                 << "\", returning a SDP description:\n" << _sdp_description << "\n";
00447 
00448         return _rtsp_client->describeStatus();
00449 }
00450 
00451 
00452 //--------------------------------------------------------------------------
00453 //
00454 
00455 bool OStreamReceiver::createRTSPClient() {
00456 
00457         _rtsp_client = RTSPClient::createNew( *_env );
00458         if ( _rtsp_client == NULL ) {
00459                 *_env << "Failed to create " << 
00460                                 " client: " << _env->getResultMsg() << "\n";
00461                 return false;
00462         }
00463 
00464         return true;
00465 }
00466 
00467 
00468 //--------------------------------------------------------------------------
00469 // Create a media session object from this SDP description
00470 
00471 bool OStreamReceiver::createMediaSession() {
00472         
00473         // new media session object
00474         _media_session = MediaSession::createNew( *_env, _sdp_description );
00475 
00476         if ( _media_session == NULL ) {
00477                 *_env << "Failed to create a MediaSession object from the SDP description: " << _env->getResultMsg() << "\n";
00478                 return false;
00479         }
00480         
00481         return true;
00482 }
00483 

Generated on Tue Dec 26 10:32:38 2006 for Omnimeeting by  doxygen 1.4.7