00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "OStreamReceiver.hh"
00021
00022
00023
00024
00025
00026 OStreamReceiver::OStreamReceiver( const char* dest_rtsp_url,
00027 on_packet_video_received_cb* fv,
00028 const void *video_callback_data,
00029 on_packet_audio_received_cb* fa,
00030 const void *audio_callback_data,
00031 const char* receiving_iface ,
00032 unsigned int sink_buffer_size ) :
00033 f_on_packet_video_received_cb( fv ),
00034 _callback_video_data_client( video_callback_data ),
00035 f_on_packet_audio_received_cb( fa ),
00036 _callback_audio_data_client( audio_callback_data ),
00037 _sink_buffer_size( sink_buffer_size )
00038 {
00039
00040 _scheduler = BasicTaskScheduler::createNew();
00041 _env = BasicUsageEnvironment::createNew( *_scheduler );
00042
00043 _receiving_iface = strdup( receiving_iface );
00044
00045
00046 _end_time = 0;
00047 _error_status = ERROR_NORMAL_STATUS;
00048 _client_protocol_name = "RTSP";
00049 _watch_variable = 0;
00050
00051 _media_session = NULL;
00052 _arrival_check_timer_task = NULL;
00053
00054 if ( createRTSPClient() == false ) {
00055 shutdown();
00056 _error_status = ERROR_FAILED_CREATE_RTSP_CLIENT;
00057 return;
00058 }
00059
00060 if ( getSDPDescriptionFromURL( dest_rtsp_url ) < 0 ) {
00061 shutdown();
00062 _error_status = ERROR_FAILED_SDP_DESCRIPTION;
00063 return;
00064 }
00065
00066 if ( createMediaSession() == false ) {
00067 shutdown();
00068 _error_status = ERROR_FAILED_CREATE_MEDIASESSION;
00069 return;
00070 }
00071 }
00072
00073
00074
00075
00076
00077 OStreamReceiver::~OStreamReceiver() {
00078
00079 if ( _sdp_description != NULL )
00080 delete[] _sdp_description;
00081
00082 if ( _receiving_iface != NULL )
00083 free( _receiving_iface );
00084
00085
00086 _env->reclaim();
00087 delete _scheduler;
00088 }
00089
00090
00091
00092
00093
00094
00095 int OStreamReceiver::isReadyToStream()
00096 {
00097 return _error_status;
00098 }
00099
00100
00101
00102
00103
00104
00105 bool OStreamReceiver::clientStartPlayingSession( )
00106 {
00107 return _rtsp_client->playMediaSession( *_media_session ) == 1 ? true : false;
00108 }
00109
00110
00111
00112
00113
00114
00115
00116
00117 bool OStreamReceiver::clientSetupSubsession( void* mediaSubSess )
00118 {
00119 MediaSubsession* media_subsession = (MediaSubsession*)mediaSubSess;
00120 return _rtsp_client->setupMediaSubsession(*media_subsession,
00121 false, false) == 1 ? true : false;
00122 }
00123
00124
00125
00126
00127
00128
00129 void OStreamReceiver::closeMediaSinks() {
00130
00131 if ( _media_session == NULL )
00132 return;
00133
00134 MediaSubsessionIterator iter( *_media_session );
00135 MediaSubsession* _media_subsession;
00136
00137
00138 while ( ( _media_subsession = iter.next() ) != NULL ) {
00139 Medium::close( _media_subsession->sink );
00140 _media_subsession->sink = NULL;
00141 }
00142 }
00143
00144
00145
00146
00147
00148 void OStreamReceiver::shutdown( )
00149 {
00150 _env->taskScheduler().unscheduleDelayedTask( _arrival_check_timer_task );
00151 _watch_variable = 1;
00152 }
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169 int OStreamReceiver::startReceiving() {
00170
00171
00172
00173 if ( _error_status != ERROR_NORMAL_STATUS )
00174 return _error_status;
00175
00176
00177 MediaSubsessionIterator iter(*_media_session);
00178 MediaSubsession *media_subsession;
00179 bool made_progress = false;
00180
00181 while ( (media_subsession = iter.next()) != NULL) {
00182 if ( !media_subsession->initiate() ) {
00183 *_env << "Unable to create receiver for \"" << media_subsession->mediumName()
00184 << "/" << media_subsession->codecName()
00185 << "\" subsession: " << _env->getResultMsg() << "\n";
00186 _error_status = ERROR_FAILED_TO_CREATE_RECEIVER;
00187 } else {
00188 *_env << "Created receiver for \"" << media_subsession->mediumName()
00189 << "/" << media_subsession->codecName()
00190 << "\" subsession (client ports " << media_subsession->clientPortNum()
00191 << "-" << media_subsession->clientPortNum() + 1 << ")\n";
00192 made_progress = true;
00193
00194 if ( media_subsession->rtpSource() != NULL ) {
00195
00196 unsigned const thresh = 500000;
00197 media_subsession->rtpSource()->setPacketReorderingThresholdTime( thresh );
00198 }
00199 }
00200 }
00201
00202
00203 if ( !made_progress ) {
00204 shutdown();
00205 return _error_status;
00206 }
00207
00208
00209 setupStreams();
00210
00211
00212 made_progress = False;
00213
00214
00215 iter.reset();
00216 int stream_count = 0;
00217 while ((media_subsession = iter.next()) != NULL) {
00218
00219
00220 if ( media_subsession->readSource() == NULL )
00221 continue;
00222
00223 OStreamBufferedSink* buffered_sink;
00224
00225
00226 buffered_sink = OStreamBufferedSink::createNew(*_env,
00227 f_on_packet_video_received_cb, _callback_video_data_client,
00228 f_on_packet_audio_received_cb, _callback_audio_data_client,
00229 media_subsession->codecName(), stream_count,
00230 _receiving_iface,
00231 _sink_buffer_size );
00232
00233 media_subsession->sink = buffered_sink;
00234
00235 if ( media_subsession->sink == NULL ) {
00236 _error_status = ERROR_FAILED_FILE_SINK_CREATION;
00237 } else {
00238
00239
00240 stream_count++;
00241
00242
00243 media_subsession->sink->startPlaying( *(media_subsession->readSource()),
00244 subsessionAfterPlaying,
00245 media_subsession );
00246
00247
00248
00249 if (media_subsession->rtcpInstance() != NULL) {
00250 media_subsession->rtcpInstance()->setByeHandler( subsessionByeHandler,
00251 media_subsession);
00252 }
00253
00254 made_progress = true;
00255 }
00256 }
00257
00258 if ( !made_progress ) {
00259 shutdown();
00260 return _error_status;
00261 }
00262
00263
00264 startPlayingStreams( this );
00265
00266
00267
00268 _env->taskScheduler().doEventLoop( &_watch_variable );
00269
00270
00271 closeMediaSinks();
00272
00273
00274 if ( ( media_subsession != NULL) && ( _rtsp_client != NULL ) )
00275 _rtsp_client->teardownMediaSession( *_media_session );
00276
00277 Medium::close( _media_session );
00278
00279
00280 Medium::close( _rtsp_client );
00281
00282 return _error_status;
00283 }
00284
00285
00286
00287
00288
00289 void OStreamReceiver::subsessionByeHandler(void* mediaSubSess) {
00290
00291 subsessionAfterPlaying( (MediaSubsession*)mediaSubSess );
00292 }
00293
00294
00295
00296
00297
00298 void OStreamReceiver::stopReceiving()
00299 {
00300 shutdown();
00301 }
00302
00303
00304
00305
00306
00307 void OStreamReceiver::checkForPacketArrival( void* ostreamer_receiver_ptr )
00308 {
00309 OStreamReceiver *klass = (OStreamReceiver *)ostreamer_receiver_ptr;
00310
00311
00312 unsigned numSubsessionsChecked = 0;
00313 unsigned numSubsessionsWithReceivedData = 0;
00314 unsigned numSubsessionsThatHaveBeenSynced = 0;
00315
00316 MediaSubsessionIterator iter( *(klass)->_media_session);
00317 MediaSubsession* media_subsession;
00318
00319
00320
00321
00322 while (( media_subsession = iter.next()) != NULL &&
00323 klass->_error_status == ERROR_NORMAL_STATUS ) {
00324
00325 RTPSource* src = media_subsession->rtpSource();
00326
00327 if (src == NULL)
00328 continue;
00329
00330 ++numSubsessionsChecked;
00331
00332 if ( src->receptionStatsDB().numActiveSourcesSinceLastReset() > 0 ) {
00333
00334 ++numSubsessionsWithReceivedData;
00335 }
00336 if ( src->hasBeenSynchronizedUsingRTCP() ) {
00337 ++numSubsessionsThatHaveBeenSynced;
00338 }
00339 }
00340
00341 unsigned numSubsessionsToCheck = numSubsessionsChecked;
00342
00343
00344 int uSecsToDelay = 100000;
00345 klass->_arrival_check_timer_task =
00346 klass->_env->taskScheduler().scheduleDelayedTask( uSecsToDelay,
00347 (TaskFunc*)OStreamReceiver::checkForPacketArrival,
00348 klass );
00349 }
00350
00351
00352
00353
00354
00355
00356 void OStreamReceiver::startPlayingStreams( void* ostreamer_receiver_ptr )
00357 {
00358 OStreamReceiver *klass = (OStreamReceiver *)ostreamer_receiver_ptr;
00359
00360 if ( !klass->clientStartPlayingSession() ) {
00361 *klass->_env << "Failed to start playing session: " << klass->_env->getResultMsg() << "\n";
00362 klass->shutdown();
00363 return;
00364 } else {
00365 *klass->_env << "Started playing session\n";
00366 }
00367
00368
00369
00370
00371 if ( klass->_end_time == 0 )
00372 klass->_end_time = klass->_media_session->playEndTime();
00373
00374
00375 klass->checkForPacketArrival( klass );
00376 }
00377
00378
00379
00380
00381
00382
00383
00384 void OStreamReceiver::subsessionAfterPlaying( void* media_sub_sess ) {
00385
00386
00387 MediaSubsession* media_subsession = (MediaSubsession*)media_sub_sess;
00388 Medium::close( media_subsession->sink );
00389 media_subsession->sink = NULL;
00390
00391
00392 MediaSession& media_session = media_subsession->parentSession();
00393 MediaSubsessionIterator iter(media_session);
00394 while ((media_subsession = iter.next()) != NULL) {
00395 if (media_subsession->sink != NULL)
00396 return;
00397 }
00398 }
00399
00400
00401
00402
00403 void OStreamReceiver::setupStreams()
00404 {
00405 MediaSubsessionIterator iter(*_media_session);
00406 MediaSubsession *media_subsession;
00407 bool made_progress = false;
00408
00409 while ( (media_subsession = iter.next()) != NULL ) {
00410 if ( media_subsession->clientPortNum() == 0 )
00411 continue;
00412
00413 if ( !clientSetupSubsession( media_subsession ) ) {
00414 _error_status = ERROR_FAILED_SETUP_STREAMS;
00415 *_env << "Failed to setup \"" << media_subsession->mediumName()
00416 << "/" << media_subsession->codecName()
00417 << "\" subsession: " << _env->getResultMsg() << "\n";
00418 } else {
00419 *_env << "Setup \"" << media_subsession->mediumName()
00420 << "/" << media_subsession->codecName()
00421 << "\" subsession (client ports " << media_subsession->clientPortNum()
00422 << "-" << media_subsession->clientPortNum() + 1 << ")\n";
00423 made_progress = true;
00424 }
00425 }
00426 if ( !made_progress ) {
00427 shutdown();
00428 return;
00429 }
00430 }
00431
00432
00433
00434
00435
00436 int OStreamReceiver::getSDPDescriptionFromURL( char const* url ) {
00437
00438 _sdp_description = _rtsp_client->describeURL( url );
00439 if ( _sdp_description == NULL ) {
00440 *_env << "Failed to get a SDP description from URL \"" << url
00441 << "\": " << *_env->getResultMsg() << "\n";
00442 return -1;
00443 }
00444
00445 *_env << "Opened URL \"" << url
00446 << "\", returning a SDP description:\n" << _sdp_description << "\n";
00447
00448 return _rtsp_client->describeStatus();
00449 }
00450
00451
00452
00453
00454
00455 bool OStreamReceiver::createRTSPClient() {
00456
00457 _rtsp_client = RTSPClient::createNew( *_env );
00458 if ( _rtsp_client == NULL ) {
00459 *_env << "Failed to create " <<
00460 " client: " << _env->getResultMsg() << "\n";
00461 return false;
00462 }
00463
00464 return true;
00465 }
00466
00467
00468
00469
00470
00471 bool OStreamReceiver::createMediaSession() {
00472
00473
00474 _media_session = MediaSession::createNew( *_env, _sdp_description );
00475
00476 if ( _media_session == NULL ) {
00477 *_env << "Failed to create a MediaSession object from the SDP description: " << _env->getResultMsg() << "\n";
00478 return false;
00479 }
00480
00481 return true;
00482 }
00483