Main Page   Modules   Class Hierarchy   Compound List   File List   Compound Members   File Members  

outchannels.cpp

00001 /* MuSE - Multiple Streaming Engine
00002  * Copyright (C) 2000-2003 Denis Rojo aka jaromil <jaromil@dyne.org>
00003  *
00004  * This source code is free software; you can redistribute it and/or
00005  * modify it under the terms of the GNU Public License as published 
00006  * by the Free Software Foundation; either version 2 of the License,
00007  * or (at your option) any later version.
00008  *
00009  * This source code is distributed in the hope that it will be useful,
00010  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
00012  * Please refer to the GNU Public License for more details.
00013  *
00014  * You should have received a copy of the GNU Public License along with
00015  * this source code; if not, write to:
00016  * Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
00017  *
00018  * "$Id: outchannels.cpp,v 1.8 2004/12/15 18:18:06 jaromil Exp $"
00019  *
00020  */
00021 
00022 #include <iostream>
00023 #include <math.h>
00024 #include <stdio.h>
00025 #include <stdlib.h>
00026 #include <string.h>
00027 #include <sys/socket.h>
00028 #include <netinet/in.h>
00029 #include <arpa/inet.h>
00030 #include <netdb.h>
00031 #include <sys/types.h>
00032 #include <sys/stat.h>
00033 #include <fcntl.h>
00034 #include <unistd.h>
00035 #include <errno.h>
00036 
00037 #include <generic.h>
00038 #include <outchannels.h>
00039 #include <jutils.h>
00040 #include <config.h>
00041 
00042 #ifdef HAVE_LAME
00043 #include <out_lame.h>
00044 #endif
00045 
00046 #ifdef HAVE_VORBIS
00047 #include <out_vorbis.h>
00048 #endif
00049 
00050 
00051 /* I think a tighter bound could be:  (mt, March 2000)
00052  * MPEG1:
00053  *    num_samples*(bitrate/8)/samplerate + 4*1152*(bitrate/8)/samplerate + 512
00054  * MPEG2:
00055  *    num_samples*(bitrate/8)/samplerate + 4*576*(bitrate/8)/samplerate + 256
00056  */
00057 
00058 /* mp3 encoder */
00059 
00060 
00061 OutChannel::OutChannel(char *myname)
00062   : Entry() {
00063   func("OutChannel::OutChannel(%s) %p",myname,this);
00064 
00065   quit = false;
00066   initialized = false;
00067   running = false;
00068   sprintf(name,"%s",myname);
00069   sprintf(version, "  ");
00070   encoded = 0;
00071   fd = NULL;
00072   _thread_initialized = false;
00073   _thread_init();
00074 
00075   idseed = 0;
00076 
00077   erbapipa = new Pipe(OUT_PIPESIZE);
00078   erbapipa->set_block(true,true);
00079   erbapipa->set_block_timeout(500,500);
00080 
00081   /* setup defaults */
00082   quality(4.0);
00083   bps(24);
00084   freq(22050);
00085   channels(1);
00086   lowpass(0);
00087   highpass(0);
00088     
00089   //  profile_changed = true;
00090 
00091 }
00092 
00093 OutChannel::~OutChannel() {
00094   func("OutChannel::~OutChannel");
00095 
00096   
00097   quit = true;
00098   unlock();// signal();  
00099 
00100   initialized = false;
00101 
00102   jsleep(0,50);
00103 
00104   Shouter *ice = (Shouter*)icelist.begin();
00105   lock_ice();
00106   while(ice) {
00107     icelist.rem(1);
00108     delete ice;
00109     ice = (Shouter*)icelist.begin();
00110   }
00111   unlock_ice();
00112 
00113   delete erbapipa;
00114 
00115   if(fd) dump_stop();
00116 
00117 
00118   /* QUAAAA */
00119   //_thread_destroy();
00120 }
00121 
00122 void OutChannel::start() {
00123   Shouter *ice = (Shouter*) icelist.begin();
00124   lock_ice();
00125   while(ice) {
00126     if( ice->apply_profile() )
00127       ice->start();
00128     ice = (Shouter*) ice->next;
00129   }
00130   unlock_ice();
00131   pthread_create(&_thread, &_attr, &kickoff, this);
00132 }
00133 
00134 void OutChannel::run() {
00135   int res;
00136   /*
00137   if(!initialized) {
00138     warning("OutChannel::run() : output channel uninitialized, thread won't start");
00139     return;
00140   }
00141   */
00142 
00143   running = true;
00144   while(!quit) {
00145 
00146     /* check if we must encode */
00147     encoding = false;
00148     if(fd) encoding = true;
00149     Shouter *ice = (Shouter*)icelist.begin();
00150     while(ice) {
00151       if(ice->running) encoding = true;
00152       ice = (Shouter*)ice->next;
00153     }
00154     if(!initialized) encoding = false;
00155     
00156     if(!encoding) {
00157       jsleep(0,50);
00158       shout(); /* in case there are waiting retries */
00159       if(quit) break;
00160       continue;
00161     } else jsleep(0,5); /* avoid a tight loop */
00162 
00163     /* erbapipa sucking is now done in instantiated classes
00164        inside the encode() method
00165        (see vorbis and lame classes)
00166     */
00167 
00168     encoded = 0;
00169 
00170     lock();
00171     encode();
00172     unlock();
00173 
00174     if(encoded<1) continue;
00175     
00176 
00177     calc_bitrate(encoded);
00178     /* stream it to the net */
00179     res = shout();
00180 
00181     /* save it on the harddisk */
00182     res = dump();
00183 
00184     /* TODO: flush when erbapipa->read != OUT_CHUNK */
00185     
00186   }
00187 
00188   running = false;
00189 }
00190 
00191 int OutChannel::create_ice() {
00192   Shouter *ice = new Shouter();
00193 
00194   if(!ice) {
00195     error("can't create icecast shouter");
00196     return -1;
00197   }
00198 
00199   /* the icecast id is the position in the linklist */
00200   lock_ice();
00201   icelist.add(ice);
00202   idseed++;
00203   ice->id = id + idseed; 
00204 
00205 
00206   switch(tipo) {
00207 
00208   case MP3:
00209     ice->format = SHOUT_FORMAT_MP3;
00210     ice->login(SHOUT_PROTOCOL_HTTP);
00211     break;
00212 
00213   case OGG:
00214     ice->format = SHOUT_FORMAT_VORBIS;
00215     ice->login(SHOUT_PROTOCOL_HTTP);
00216     break;
00217 
00218   default:
00219     error("codec is not streamable");
00220     delete ice;
00221     return -1;
00222   }
00223 
00224   ice->apply_profile();
00225 
00226   unlock_ice();
00227   func("outchannel id %i creates new shouter %p with id %i",id,ice,ice->id);
00228   return ice->id;
00229 }
00230 
00231 bool OutChannel::delete_ice(int iceid) {
00232 
00233   Shouter *ice = get_ice(iceid);
00234   if(!ice) {
00235     warning("OutChannel::delete_ice(%i) : invalid id",iceid);
00236     return false;
00237   }
00238 
00239   if(ice) {
00240     lock_ice();
00241     ice->rem();
00242     if(ice->running) ice->stop();
00243     delete ice;
00244     unlock_ice();
00245   }
00246 
00247   func("outchannel id %i deleted shouter id %i",id,iceid);
00248   return true;
00249 }
00250 
00251 Shouter *OutChannel::get_ice(int iceid) {
00252   return (Shouter*)icelist.pick_id(iceid);
00253 }
00254 
00255 bool OutChannel::apply_ice(int iceid) {
00256   bool res = false;
00257   
00258   Shouter *ice = get_ice(iceid);
00259 
00260   if(ice) {
00261     lock_ice();
00262     res = ice->apply_profile();
00263     unlock_ice();
00264   }
00265   return res;
00266 }
00267 
00268 bool OutChannel::connect_ice(int iceid, bool on) {
00269   bool res = false;
00270   func("OutChannel::connect_ice(%i,%i)",iceid,on);
00271   Shouter *ice = get_ice(iceid);
00272   if(!ice) { 
00273     error("Outchannel::connect_ice : can't find shouter with id %i",iceid);
00274     return false;
00275   }
00276 
00277   lock_ice();
00278   res = (on) ? ice->start() : ice->stop();
00279   unlock_ice();
00280   
00281   return res;
00282 }
00283 
00284 int OutChannel::shout() {
00285   int res, sentout = 0;
00286   time_t now = time(NULL);
00287   lock_ice();
00288   Shouter *ice = (Shouter*)icelist.begin();
00289   while(ice) {
00290     if(ice->running) {
00291       res = ice->send(buffer,encoded);
00292       if(res<0) { /* there is an error: -1=temporary , -2=fatal */
00293         if(res==-1) { ice = (Shouter*)ice->next; continue; }
00294         if(res==-2) {
00295           error("fatal error on stream to %s:%u",ice->host(),ice->port());
00296           ice->stop();
00297           notice("retrying to connect to %s:%u%s after %i seconds",
00298                  ice->host(), ice->port(), ice->mount(), RETRY_DELAY);
00299           ice->retry = now;
00300           //      ice = (Shouter*)ice->next;
00301           //      continue;
00302         }
00303       } else sentout += res;
00304     } else if(ice->retry>0) {
00305       if((now - ice->retry) > RETRY_DELAY) {
00306         notice("try to reconnect to %s:%u%s",
00307                ice->host(), ice->port(), ice->mount());
00308         if( ice->start() ) ice->retry = 0;
00309         else ice->retry = now;
00310       }
00311     }
00312     ice = (Shouter*)ice->next;
00313   }
00314   unlock_ice();
00315   return sentout;
00316 }
00317 
00318 bool OutChannel::dump_start(char *file) {
00319   struct stat st;
00320   char temp[MAX_PATH_SIZE];
00321   int num = 0;
00322   
00323   if(fd) {
00324     warning("%s channel allready dumping to %s",name,fd_name);
00325     return false;
00326   }
00327 
00328   /* avoid to overwrite existent files */
00329   snprintf(temp,MAX_PATH_SIZE,"%s",file);
00330   while(stat(temp, &st) != -1) {
00331     /* file EXIST */
00332     num++;
00333     snprintf(temp,MAX_PATH_SIZE,"%s.%i",file,num);
00334   }
00335 
00336   fd = fopen(temp,"wb"); /* writeonly nonblocking binary (-rw-rw-r--) */
00337   if(!fd) {
00338     error("%s channel can't open %s for writing",name,temp);
00339     act("%s",strerror(errno));
00340     return(false);
00341   }
00342 
00343   strncpy(fd_name,temp,MAX_PATH_SIZE);
00344   notice("%s channel dumping to file %s",name,fd_name);
00345 
00346   return true;
00347 }
00348 
00349 bool OutChannel::dump_stop() {
00350   func("OutChanne::dump_stop()");
00351   if(!fd) {
00352     warning("%s channel is not dumping to any file",name);
00353     return false;
00354   }
00355 
00356   fflush(fd);
00357   
00358   act("%s channel stops dumping to %s",name,fd_name);
00359   
00360   fclose(fd);
00361   fd = NULL;
00362 
00363   return true;
00364 }
00365 
00366 bool OutChannel::dump() {
00367   int res;
00368   if(!fd) return false;
00369   func("OutChannel::dump() encoded %i",encoded);
00370   fflush(fd);
00371   if(!encoded) return true;
00372   res = fwrite(buffer,1,encoded,fd);
00373   if(res != encoded)
00374     warning("skipped %u bytes dumping to file %s",encoded - res,fd_name);
00375   return true;
00376 }
00377 /*
00378 void OutChannel::bps(int in) {
00379   _bps = in;
00380   Shouter *ice = (Shouter*)icelist.begin();
00381   while(ice) {
00382     ice->_bps = in;
00383     ice = (Shouter*)ice->next;
00384   }
00385 }
00386 */
00387 bool OutChannel::calc_bitrate(int enc) {
00388   /* calcolates bitrate */
00389   bytes_accu += enc;
00390   now = dtime();
00391   if((now-prev)>1) { /* if one second passed */
00392     bitrate = (bytes_accu<<2);
00393     bytes_accu = 0;
00394     prev = now;
00395     return true;
00396   }
00397   return false;
00398 }
00399 
00400 char *OutChannel::quality(float in) {
00401   int q = (int)fabs(in);
00402   _quality = in;
00403 
00404   if(!in) {
00405     snprintf(quality_desc,256,"%uKbit/s %uHz",bps(),freq());
00406     return quality_desc;
00407   }
00408 
00409   //  if(channels()<1) channels(1);
00410   //  if(channels()>2) channels(2);
00411 
00412   switch(q) {
00413 
00414 #define BPSVAL(b,f) \
00415 bps(b); freq(f); \
00416 snprintf(quality_desc,256,"%uKbit/s %uHz",bps(),freq());
00417 //if(bps()==0) bps(b); if(freq()==0) freq(f);
00418   
00419   case 0: BPSVAL(8,11025); break;
00420   case 1: BPSVAL(16,16000); break;
00421   case 2: BPSVAL(16,22050); break;
00422   case 3: BPSVAL(24,16000); break;
00423   case 4: BPSVAL(24,22050); break;
00424   case 5: BPSVAL(48,22050); break;
00425   case 6: BPSVAL(56,22050); break;
00426   case 7: BPSVAL(64,44100); break;
00427   case 8: BPSVAL(96,44100); break;
00428   case 9: BPSVAL(128,44100); break;
00429 
00430   }
00431   return quality_desc;
00432 }
00433 
00434 void OutChannel::push(void *data, int len) {
00435   int errors = 0;
00436   /* check out if encoders are configured */
00437   if(!encoding) return;
00438   if(!initialized) return;
00439 
00440   /* push in data
00441      wait if pipe is full or occupied
00442      returns the right thing */
00443   //  func("PID %i wants to push %i bytes in %s",getpid(),len,name);
00444   //  func("pipe has %i free space",erbapipa->space());
00445   while( running && 
00446          erbapipa->write(len,data) < 0 ) {
00447     //    func("PID %i waits to push %i bytes in %s",getpid(),len,name);
00448     jsleep(0,30);
00449     errors++;
00450     if(errors>20) {
00451       warning("%s encoder is stuck, pipe is full",name);
00452       return;
00453     }
00454   }
00455   //  func("ok, %i bytes pushed succesfully in %s",len,name);
00456 }
00457 
00458 /* thread stuff */
00459 
00460 void OutChannel::_thread_init() {
00461   if(_thread_initialized) return;
00462 
00463   func("OutChannel::thread_init()");
00464   if(pthread_mutex_init (&_mutex,NULL) == -1)
00465     error("error initializing POSIX thread mutex");
00466   if(pthread_mutex_init (&_mutex_ice,NULL) == -1)
00467     error("error initializing POSIX thread mutex");
00468   if(pthread_cond_init (&_cond, NULL) == -1)
00469     error("error initializing POSIX thread condition"); 
00470   if(pthread_attr_init (&_attr) == -1)
00471     error("error initializing POSIX thread attribute");
00472   
00473   /* set the thread as detached
00474      see: man pthread_attr_init(3) */
00475   pthread_attr_setdetachstate(&_attr,PTHREAD_CREATE_DETACHED);
00476 
00477   _thread_initialized = true;
00478 }
00479 
00480 void OutChannel::_thread_destroy() {
00481   if(!_thread_initialized) return;
00482   
00483   /* we signal and then we check the thread
00484      exited by locking the conditional */
00485   if(running) {
00486     signal();
00487     lock(); unlock();
00488   }
00489 
00490   if(pthread_mutex_destroy(&_mutex) == -1)
00491     error("error destroying POSIX thread mutex");
00492   if(pthread_cond_destroy(&_cond) == -1)
00493     error("error destroying POSIX thread condition");
00494   if(pthread_attr_destroy(&_attr) == -1)
00495     error("error destroying POSIX thread attribute");
00496   _thread_initialized = false;
00497 }

Generated on Thu Dec 16 12:28:21 2004 for MuSE by doxygen1.3