00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <iostream>
00023 #include <math.h>
00024 #include <stdio.h>
00025 #include <stdlib.h>
00026 #include <string.h>
00027 #include <sys/socket.h>
00028 #include <netinet/in.h>
00029 #include <arpa/inet.h>
00030 #include <netdb.h>
00031 #include <sys/types.h>
00032 #include <sys/stat.h>
00033 #include <fcntl.h>
00034 #include <unistd.h>
00035 #include <errno.h>
00036
00037 #include <generic.h>
00038 #include <outchannels.h>
00039 #include <jutils.h>
00040 #include <config.h>
00041
00042 #ifdef HAVE_LAME
00043 #include <out_lame.h>
00044 #endif
00045
00046 #ifdef HAVE_VORBIS
00047 #include <out_vorbis.h>
00048 #endif
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061 OutChannel::OutChannel(char *myname)
00062 : Entry() {
00063 func("OutChannel::OutChannel(%s) %p",myname,this);
00064
00065 quit = false;
00066 initialized = false;
00067 running = false;
00068 sprintf(name,"%s",myname);
00069 sprintf(version, " ");
00070 encoded = 0;
00071 fd = NULL;
00072 _thread_initialized = false;
00073 _thread_init();
00074
00075 idseed = 0;
00076
00077 erbapipa = new Pipe(OUT_PIPESIZE);
00078 erbapipa->set_block(true,true);
00079 erbapipa->set_block_timeout(500,500);
00080
00081
00082 quality(4.0);
00083 bps(24);
00084 freq(22050);
00085 channels(1);
00086 lowpass(0);
00087 highpass(0);
00088
00089
00090
00091 }
00092
00093 OutChannel::~OutChannel() {
00094 func("OutChannel::~OutChannel");
00095
00096
00097 quit = true;
00098 unlock();
00099
00100 initialized = false;
00101
00102 jsleep(0,50);
00103
00104 Shouter *ice = (Shouter*)icelist.begin();
00105 lock_ice();
00106 while(ice) {
00107 icelist.rem(1);
00108 delete ice;
00109 ice = (Shouter*)icelist.begin();
00110 }
00111 unlock_ice();
00112
00113 delete erbapipa;
00114
00115 if(fd) dump_stop();
00116
00117
00118
00119
00120 }
00121
00122 void OutChannel::start() {
00123 Shouter *ice = (Shouter*) icelist.begin();
00124 lock_ice();
00125 while(ice) {
00126 if( ice->apply_profile() )
00127 ice->start();
00128 ice = (Shouter*) ice->next;
00129 }
00130 unlock_ice();
00131 pthread_create(&_thread, &_attr, &kickoff, this);
00132 }
00133
00134 void OutChannel::run() {
00135 int res;
00136
00137
00138
00139
00140
00141
00142
00143 running = true;
00144 while(!quit) {
00145
00146
00147 encoding = false;
00148 if(fd) encoding = true;
00149 Shouter *ice = (Shouter*)icelist.begin();
00150 while(ice) {
00151 if(ice->running) encoding = true;
00152 ice = (Shouter*)ice->next;
00153 }
00154 if(!initialized) encoding = false;
00155
00156 if(!encoding) {
00157 jsleep(0,50);
00158 shout();
00159 if(quit) break;
00160 continue;
00161 } else jsleep(0,5);
00162
00163
00164
00165
00166
00167
00168 encoded = 0;
00169
00170 lock();
00171 encode();
00172 unlock();
00173
00174 if(encoded<1) continue;
00175
00176
00177 calc_bitrate(encoded);
00178
00179 res = shout();
00180
00181
00182 res = dump();
00183
00184
00185
00186 }
00187
00188 running = false;
00189 }
00190
00191 int OutChannel::create_ice() {
00192 Shouter *ice = new Shouter();
00193
00194 if(!ice) {
00195 error("can't create icecast shouter");
00196 return -1;
00197 }
00198
00199
00200 lock_ice();
00201 icelist.add(ice);
00202 idseed++;
00203 ice->id = id + idseed;
00204
00205
00206 switch(tipo) {
00207
00208 case MP3:
00209 ice->format = SHOUT_FORMAT_MP3;
00210 ice->login(SHOUT_PROTOCOL_HTTP);
00211 break;
00212
00213 case OGG:
00214 ice->format = SHOUT_FORMAT_VORBIS;
00215 ice->login(SHOUT_PROTOCOL_HTTP);
00216 break;
00217
00218 default:
00219 error("codec is not streamable");
00220 delete ice;
00221 return -1;
00222 }
00223
00224 ice->apply_profile();
00225
00226 unlock_ice();
00227 func("outchannel id %i creates new shouter %p with id %i",id,ice,ice->id);
00228 return ice->id;
00229 }
00230
00231 bool OutChannel::delete_ice(int iceid) {
00232
00233 Shouter *ice = get_ice(iceid);
00234 if(!ice) {
00235 warning("OutChannel::delete_ice(%i) : invalid id",iceid);
00236 return false;
00237 }
00238
00239 if(ice) {
00240 lock_ice();
00241 ice->rem();
00242 if(ice->running) ice->stop();
00243 delete ice;
00244 unlock_ice();
00245 }
00246
00247 func("outchannel id %i deleted shouter id %i",id,iceid);
00248 return true;
00249 }
00250
00251 Shouter *OutChannel::get_ice(int iceid) {
00252 return (Shouter*)icelist.pick_id(iceid);
00253 }
00254
00255 bool OutChannel::apply_ice(int iceid) {
00256 bool res = false;
00257
00258 Shouter *ice = get_ice(iceid);
00259
00260 if(ice) {
00261 lock_ice();
00262 res = ice->apply_profile();
00263 unlock_ice();
00264 }
00265 return res;
00266 }
00267
00268 bool OutChannel::connect_ice(int iceid, bool on) {
00269 bool res = false;
00270 func("OutChannel::connect_ice(%i,%i)",iceid,on);
00271 Shouter *ice = get_ice(iceid);
00272 if(!ice) {
00273 error("Outchannel::connect_ice : can't find shouter with id %i",iceid);
00274 return false;
00275 }
00276
00277 lock_ice();
00278 res = (on) ? ice->start() : ice->stop();
00279 unlock_ice();
00280
00281 return res;
00282 }
00283
00284 int OutChannel::shout() {
00285 int res, sentout = 0;
00286 time_t now = time(NULL);
00287 lock_ice();
00288 Shouter *ice = (Shouter*)icelist.begin();
00289 while(ice) {
00290 if(ice->running) {
00291 res = ice->send(buffer,encoded);
00292 if(res<0) {
00293 if(res==-1) { ice = (Shouter*)ice->next; continue; }
00294 if(res==-2) {
00295 error("fatal error on stream to %s:%u",ice->host(),ice->port());
00296 ice->stop();
00297 notice("retrying to connect to %s:%u%s after %i seconds",
00298 ice->host(), ice->port(), ice->mount(), RETRY_DELAY);
00299 ice->retry = now;
00300
00301
00302 }
00303 } else sentout += res;
00304 } else if(ice->retry>0) {
00305 if((now - ice->retry) > RETRY_DELAY) {
00306 notice("try to reconnect to %s:%u%s",
00307 ice->host(), ice->port(), ice->mount());
00308 if( ice->start() ) ice->retry = 0;
00309 else ice->retry = now;
00310 }
00311 }
00312 ice = (Shouter*)ice->next;
00313 }
00314 unlock_ice();
00315 return sentout;
00316 }
00317
00318 bool OutChannel::dump_start(char *file) {
00319 struct stat st;
00320 char temp[MAX_PATH_SIZE];
00321 int num = 0;
00322
00323 if(fd) {
00324 warning("%s channel allready dumping to %s",name,fd_name);
00325 return false;
00326 }
00327
00328
00329 snprintf(temp,MAX_PATH_SIZE,"%s",file);
00330 while(stat(temp, &st) != -1) {
00331
00332 num++;
00333 snprintf(temp,MAX_PATH_SIZE,"%s.%i",file,num);
00334 }
00335
00336 fd = fopen(temp,"wb");
00337 if(!fd) {
00338 error("%s channel can't open %s for writing",name,temp);
00339 act("%s",strerror(errno));
00340 return(false);
00341 }
00342
00343 strncpy(fd_name,temp,MAX_PATH_SIZE);
00344 notice("%s channel dumping to file %s",name,fd_name);
00345
00346 return true;
00347 }
00348
00349 bool OutChannel::dump_stop() {
00350 func("OutChanne::dump_stop()");
00351 if(!fd) {
00352 warning("%s channel is not dumping to any file",name);
00353 return false;
00354 }
00355
00356 fflush(fd);
00357
00358 act("%s channel stops dumping to %s",name,fd_name);
00359
00360 fclose(fd);
00361 fd = NULL;
00362
00363 return true;
00364 }
00365
00366 bool OutChannel::dump() {
00367 int res;
00368 if(!fd) return false;
00369 func("OutChannel::dump() encoded %i",encoded);
00370 fflush(fd);
00371 if(!encoded) return true;
00372 res = fwrite(buffer,1,encoded,fd);
00373 if(res != encoded)
00374 warning("skipped %u bytes dumping to file %s",encoded - res,fd_name);
00375 return true;
00376 }
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387 bool OutChannel::calc_bitrate(int enc) {
00388
00389 bytes_accu += enc;
00390 now = dtime();
00391 if((now-prev)>1) {
00392 bitrate = (bytes_accu<<2);
00393 bytes_accu = 0;
00394 prev = now;
00395 return true;
00396 }
00397 return false;
00398 }
00399
00400 char *OutChannel::quality(float in) {
00401 int q = (int)fabs(in);
00402 _quality = in;
00403
00404 if(!in) {
00405 snprintf(quality_desc,256,"%uKbit/s %uHz",bps(),freq());
00406 return quality_desc;
00407 }
00408
00409
00410
00411
00412 switch(q) {
00413
00414 #define BPSVAL(b,f) \
00415 bps(b); freq(f); \
00416 snprintf(quality_desc,256,"%uKbit/s %uHz",bps(),freq());
00417
00418
00419 case 0: BPSVAL(8,11025); break;
00420 case 1: BPSVAL(16,16000); break;
00421 case 2: BPSVAL(16,22050); break;
00422 case 3: BPSVAL(24,16000); break;
00423 case 4: BPSVAL(24,22050); break;
00424 case 5: BPSVAL(48,22050); break;
00425 case 6: BPSVAL(56,22050); break;
00426 case 7: BPSVAL(64,44100); break;
00427 case 8: BPSVAL(96,44100); break;
00428 case 9: BPSVAL(128,44100); break;
00429
00430 }
00431 return quality_desc;
00432 }
00433
00434 void OutChannel::push(void *data, int len) {
00435 int errors = 0;
00436
00437 if(!encoding) return;
00438 if(!initialized) return;
00439
00440
00441
00442
00443
00444
00445 while( running &&
00446 erbapipa->write(len,data) < 0 ) {
00447
00448 jsleep(0,30);
00449 errors++;
00450 if(errors>20) {
00451 warning("%s encoder is stuck, pipe is full",name);
00452 return;
00453 }
00454 }
00455
00456 }
00457
00458
00459
00460 void OutChannel::_thread_init() {
00461 if(_thread_initialized) return;
00462
00463 func("OutChannel::thread_init()");
00464 if(pthread_mutex_init (&_mutex,NULL) == -1)
00465 error("error initializing POSIX thread mutex");
00466 if(pthread_mutex_init (&_mutex_ice,NULL) == -1)
00467 error("error initializing POSIX thread mutex");
00468 if(pthread_cond_init (&_cond, NULL) == -1)
00469 error("error initializing POSIX thread condition");
00470 if(pthread_attr_init (&_attr) == -1)
00471 error("error initializing POSIX thread attribute");
00472
00473
00474
00475 pthread_attr_setdetachstate(&_attr,PTHREAD_CREATE_DETACHED);
00476
00477 _thread_initialized = true;
00478 }
00479
00480 void OutChannel::_thread_destroy() {
00481 if(!_thread_initialized) return;
00482
00483
00484
00485 if(running) {
00486 signal();
00487 lock(); unlock();
00488 }
00489
00490 if(pthread_mutex_destroy(&_mutex) == -1)
00491 error("error destroying POSIX thread mutex");
00492 if(pthread_cond_destroy(&_cond) == -1)
00493 error("error destroying POSIX thread condition");
00494 if(pthread_attr_destroy(&_attr) == -1)
00495 error("error destroying POSIX thread attribute");
00496 _thread_initialized = false;
00497 }