omx_base_sink.c

Go to the documentation of this file.
00001 
00031 #include <omxcore.h>
00032 
00033 #include <omx_base_sink.h>
00034 
00035 OMX_ERRORTYPE omx_base_sink_Constructor(OMX_COMPONENTTYPE *openmaxStandComp,OMX_STRING cComponentName) {
00036   OMX_ERRORTYPE err = OMX_ErrorNone;    
00037   omx_base_sink_PrivateType* omx_base_sink_Private;
00038 
00039   if (openmaxStandComp->pComponentPrivate) {
00040     omx_base_sink_Private = (omx_base_sink_PrivateType*)openmaxStandComp->pComponentPrivate;
00041   } else {
00042     omx_base_sink_Private = malloc(sizeof(omx_base_sink_PrivateType));
00043     if (!omx_base_sink_Private) {
00044       return OMX_ErrorInsufficientResources;
00045     }
00046   }
00047 
00048   // we could create our own port structures here
00049   // fixme maybe the base class could use a "port factory" function pointer?    
00050   err = omx_base_component_Constructor(openmaxStandComp,cComponentName);
00051 
00052   /* here we can override whatever defaults the base_component constructor set
00053   * e.g. we can override the function pointers in the private struct  */
00054   omx_base_sink_Private = openmaxStandComp->pComponentPrivate;
00055 
00056   omx_base_sink_Private->sPortTypesParam.nPorts = 1;
00057   omx_base_sink_Private->sPortTypesParam.nStartPortNumber = 0;  
00058 
00059   omx_base_sink_Private->BufferMgmtFunction = omx_base_sink_BufferMgmtFunction;
00060 
00061   return err;
00062 }
00063 
00064 OMX_ERRORTYPE omx_base_sink_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
00065 {
00066   return omx_base_component_Destructor(openmaxStandComp);
00067 }
00068 
00074 void* omx_base_sink_BufferMgmtFunction (void* param) {
00075   OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
00076   omx_base_component_PrivateType* omx_base_component_Private=(omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
00077   omx_base_sink_PrivateType* omx_base_sink_Private = (omx_base_sink_PrivateType*)omx_base_component_Private;
00078   omx_base_PortType *pInPort=(omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX];
00079   tsem_t* pInputSem = pInPort->pBufferSem;
00080   queue_t* pInputQueue = pInPort->pBufferQueue;
00081   OMX_BUFFERHEADERTYPE* pInputBuffer=NULL;
00082   OMX_COMPONENTTYPE* target_component;
00083   OMX_BOOL isInputBufferNeeded=OMX_TRUE;
00084   int inBufExchanged=0;
00085 
00086   DEBUG(DEB_LEV_FUNCTION_NAME, "In %s \n", __func__);
00087   while(omx_base_component_Private->state == OMX_StateIdle || omx_base_component_Private->state == OMX_StateExecuting ||  omx_base_component_Private->state == OMX_StatePause || 
00088     omx_base_component_Private->transientState == OMX_TransStateLoadedToIdle){
00089 
00090     /*Wait till the ports are being flushed*/
00091     pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
00092     while( PORT_IS_BEING_FLUSHED(pInPort)) {
00093       pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
00094 
00095       if(isInputBufferNeeded==OMX_FALSE) {
00096         pInPort->ReturnBufferFunction(pInPort,pInputBuffer);
00097         inBufExchanged--;
00098         pInputBuffer=NULL;
00099         isInputBufferNeeded=OMX_TRUE;
00100         DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning input buffer\n");
00101       }
00102       DEBUG(DEB_LEV_FULL_SEQ, "In %s signalling flush all condition \n", __func__);
00103       
00104       pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
00105       pthread_cond_signal(&omx_base_sink_Private->flush_all_condition);
00106       pthread_cond_wait(&omx_base_sink_Private->flush_condition,&omx_base_sink_Private->flush_mutex);
00107     }
00108     pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
00109 
00110     /*No buffer to process. So wait here*/
00111     if((pInputSem->semval==0 && isInputBufferNeeded==OMX_TRUE ) && 
00112       (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid)) {
00113       DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for input buffer \n");
00114       tsem_down(omx_base_sink_Private->bMgmtSem);
00115     }
00116 
00117     if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
00118       DEBUG(DEB_LEV_FULL_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00119       break;
00120     }
00121 
00122     DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for input buffer semval=%d \n",pInputSem->semval);
00123     if(pInputSem->semval>0 && isInputBufferNeeded==OMX_TRUE ) {
00124       tsem_down(pInputSem);
00125       if(pInputQueue->nelem>0){
00126         inBufExchanged++;
00127         isInputBufferNeeded=OMX_FALSE;
00128         pInputBuffer = dequeue(pInputQueue);
00129         if(pInputBuffer == NULL){
00130           DEBUG(DEB_LEV_ERR, "Had NULL input buffer!!\n");
00131           break;
00132         }
00133       }
00134     }
00135     
00136     if(isInputBufferNeeded==OMX_FALSE) {
00137       if(pInputBuffer->nFlags==OMX_BUFFERFLAG_EOS) {
00138         DEBUG(DEB_LEV_SIMPLE_SEQ, "Detected EOS flags in input buffer\n");
00139         
00140         (*(omx_base_component_Private->callbacks->EventHandler))
00141           (openmaxStandComp,
00142           omx_base_component_Private->callbackData,
00143           OMX_EventBufferFlag, /* The command was completed */
00144           0, /* The commands was a OMX_CommandStateSet */
00145           pInputBuffer->nFlags, /* The state has been changed in message->messageParam2 */
00146           NULL);
00147         pInputBuffer->nFlags=0;
00148       }
00149       if(omx_base_sink_Private->pMark!=NULL){
00150          omx_base_sink_Private->pMark=NULL;
00151       }
00152       target_component=(OMX_COMPONENTTYPE*)pInputBuffer->hMarkTargetComponent;
00153       if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
00154         /*Clear the mark and generate an event*/
00155         (*(omx_base_component_Private->callbacks->EventHandler))
00156           (openmaxStandComp,
00157           omx_base_component_Private->callbackData,
00158           OMX_EventMark, /* The command was completed */
00159           1, /* The commands was a OMX_CommandStateSet */
00160           0, /* The state has been changed in message->messageParam2 */
00161           pInputBuffer->pMarkData);
00162       } else if(pInputBuffer->hMarkTargetComponent!=NULL){
00163         /*If this is not the target component then pass the mark*/
00164                 DEBUG(DEB_LEV_FULL_SEQ, "Can't Pass Mark. This is a Sink!!\n");
00165       }
00166       if (omx_base_sink_Private->BufferMgmtCallback && pInputBuffer->nFilledLen != 0) {
00167         (*(omx_base_sink_Private->BufferMgmtCallback))(openmaxStandComp, pInputBuffer);
00168       }
00169       else {
00170         /*It no buffer management call back the explicitly consume input buffer*/
00171         pInputBuffer->nFilledLen = 0;
00172       }
00173       /*Input Buffer has been completely consumed. So, get new input buffer*/
00174 
00175       if(omx_base_sink_Private->state==OMX_StatePause && !PORT_IS_BEING_FLUSHED(pInPort)) {
00176         /*Waiting at paused state*/
00177         tsem_wait(omx_base_sink_Private->bStateSem);
00178       }
00179 
00180       /*Input Buffer has been completely consumed. So, return input buffer*/
00181       if(pInputBuffer->nFilledLen==0) {
00182         pInPort->ReturnBufferFunction(pInPort,pInputBuffer);
00183         inBufExchanged--;
00184         pInputBuffer=NULL;
00185         isInputBufferNeeded = OMX_TRUE;
00186       }
00187 
00188     }
00189   }
00190   DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
00191   return NULL;
00192 }

Generated for OpenMAX Bellagio rel. 0.3.5-svn by  doxygen 1.5.1
SourceForge.net Logo