omx_base_source.c

Go to the documentation of this file.
00001 
00032 #include <omxcore.h>
00033 #include <omx_base_source.h>
00034 
00035 OMX_ERRORTYPE omx_base_source_Constructor(OMX_COMPONENTTYPE *openmaxStandComp, OMX_STRING cComponentName) {
00036 
00037   OMX_ERRORTYPE err = OMX_ErrorNone;    
00038   omx_base_source_PrivateType* omx_base_source_Private;
00039 
00040   if (openmaxStandComp->pComponentPrivate) {
00041     omx_base_source_Private = (omx_base_source_PrivateType*)openmaxStandComp->pComponentPrivate;
00042   } else {
00043     omx_base_source_Private = malloc(sizeof(omx_base_source_PrivateType));
00044     if (!omx_base_source_Private) {
00045       return OMX_ErrorInsufficientResources;
00046     }
00047   }
00048 
00049   // we could create our own port structures here
00050   // fixme maybe the base class could use a "port factory" function pointer?    
00051   err = omx_base_component_Constructor(openmaxStandComp, cComponentName);
00052 
00053   /* here we can override whatever defaults the base_component constructor set
00054   * e.g. we can override the function pointers in the private struct  */
00055   omx_base_source_Private = openmaxStandComp->pComponentPrivate;
00056   omx_base_source_Private->sPortTypesParam.nPorts = 1;
00057   omx_base_source_Private->sPortTypesParam.nStartPortNumber = 0;    
00058   omx_base_source_Private->BufferMgmtFunction = omx_base_source_BufferMgmtFunction;
00059 
00060   return err;
00061 }
00062 
00063 OMX_ERRORTYPE omx_base_source_Destructor(OMX_COMPONENTTYPE *openmaxStandComp) {
00064 
00065   return omx_base_component_Destructor(openmaxStandComp);
00066 }
00067 
00073 void* omx_base_source_BufferMgmtFunction (void* param) {
00074 
00075   OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
00076   omx_base_component_PrivateType* omx_base_component_Private = (omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
00077   omx_base_source_PrivateType* omx_base_source_Private = (omx_base_source_PrivateType*)omx_base_component_Private;
00078   omx_base_PortType *pOutPort = (omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX];
00079   tsem_t* pOutputSem = pOutPort->pBufferSem;
00080   queue_t* pOutputQueue = pOutPort->pBufferQueue;
00081   OMX_BUFFERHEADERTYPE* pOutputBuffer = NULL;
00082   OMX_COMPONENTTYPE* target_component;
00083   OMX_BOOL isOutputBufferNeeded = OMX_TRUE;
00084   int outBufExchanged = 0;
00085 
00086   DEBUG(DEB_LEV_FUNCTION_NAME, "In %s \n", __func__);
00087   while(omx_base_component_Private->state == OMX_StateIdle || omx_base_component_Private->state == OMX_StateExecuting ||  
00088     omx_base_component_Private->state == OMX_StatePause || omx_base_component_Private->transientState == OMX_TransStateLoadedToIdle){
00089 
00090     /*Wait till the ports are being flushed*/
00091     pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
00092     while( PORT_IS_BEING_FLUSHED(pOutPort)) {
00093       pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
00094 
00095       if(isOutputBufferNeeded == OMX_FALSE) {
00096         pOutPort->ReturnBufferFunction(pOutPort, pOutputBuffer);
00097         outBufExchanged--;
00098         pOutputBuffer = NULL;
00099         isOutputBufferNeeded = OMX_TRUE;
00100         DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output buffer\n");
00101       }
00102       DEBUG(DEB_LEV_FULL_SEQ, "In %s signalling flush all condition \n", __func__);
00103       
00104       pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
00105       pthread_cond_signal(&omx_base_source_Private->flush_all_condition);
00106       pthread_cond_wait(&omx_base_source_Private->flush_condition, &omx_base_source_Private->flush_mutex);
00107     }
00108     pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
00109 
00110     /*No buffer to process. So wait here*/
00111     if((isOutputBufferNeeded==OMX_TRUE && pOutputSem->semval==0) && 
00112       (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid)) {
00113       DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer \n");
00114       tsem_down(omx_base_source_Private->bMgmtSem);
00115     }
00116 
00117     if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
00118       DEBUG(DEB_LEV_FULL_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00119       break;
00120     }
00121 
00122     DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer semval=%d \n",pOutputSem->semval);
00123     if(pOutputSem->semval > 0 && isOutputBufferNeeded == OMX_TRUE ) {
00124       tsem_down(pOutputSem);
00125       if(pOutputQueue->nelem>0){
00126         outBufExchanged++;
00127         isOutputBufferNeeded = OMX_FALSE;
00128         pOutputBuffer = dequeue(pOutputQueue);
00129         if(pOutputBuffer == NULL){
00130           DEBUG(DEB_LEV_ERR, "In %s Had NULL output buffer!!\n",__func__);
00131           break;
00132         }
00133       }
00134     }
00135     
00136     if(isOutputBufferNeeded == OMX_FALSE) {
00137       if(pOutputBuffer->nFlags == OMX_BUFFERFLAG_EOS) {
00138         DEBUG(DEB_LEV_SIMPLE_SEQ, "Detected EOS flags in output buffer\n");
00139         
00140         (*(omx_base_component_Private->callbacks->EventHandler))
00141           (openmaxStandComp,
00142           omx_base_component_Private->callbackData,
00143           OMX_EventBufferFlag, /* The command was completed */
00144           0, /* The commands was a OMX_CommandStateSet */
00145           pOutputBuffer->nFlags, /* The state has been changed in message->messageParam2 */
00146           NULL);
00147         pOutputBuffer->nFlags = 0;
00148       }
00149       if(omx_base_source_Private->pMark!=NULL){
00150          omx_base_source_Private->pMark=NULL;
00151       }
00152       target_component = (OMX_COMPONENTTYPE*)pOutputBuffer->hMarkTargetComponent;
00153       if(target_component == (OMX_COMPONENTTYPE *)openmaxStandComp) {
00154         /*Clear the mark and generate an event*/
00155         (*(omx_base_component_Private->callbacks->EventHandler))
00156           (openmaxStandComp,
00157           omx_base_component_Private->callbackData,
00158           OMX_EventMark, /* The command was completed */
00159           1, /* The commands was a OMX_CommandStateSet */
00160           0, /* The state has been changed in message->messageParam2 */
00161           pOutputBuffer->pMarkData);
00162       } else if(pOutputBuffer->hMarkTargetComponent != NULL) {
00163         /*If this is not the target component then pass the mark*/
00164         DEBUG(DEB_LEV_FULL_SEQ, "Can't Pass Mark. This is a Source!!\n");
00165       }
00166 
00167       if (omx_base_source_Private->BufferMgmtCallback && pOutputBuffer->nFilledLen == 0) {
00168         (*(omx_base_source_Private->BufferMgmtCallback))(openmaxStandComp, pOutputBuffer);
00169       } else {
00170         /*It no buffer management call back then don't produce any output buffer*/
00171         pOutputBuffer->nFilledLen = 0;
00172       }
00173       if(omx_base_source_Private->state == OMX_StatePause && !PORT_IS_BEING_FLUSHED(pOutPort)) {
00174         /*Waiting at paused state*/
00175         tsem_wait(omx_base_source_Private->bStateSem);
00176       }
00177 
00178       /*Output Buffer has been produced or EOS. So, return output buffer and get new buffer*/
00179       if(pOutputBuffer->nFilledLen != 0 || pOutputBuffer->nFlags == OMX_BUFFERFLAG_EOS) {
00180         pOutPort->ReturnBufferFunction(pOutPort, pOutputBuffer);
00181         outBufExchanged--;
00182         pOutputBuffer = NULL;
00183         isOutputBufferNeeded = OMX_TRUE;
00184       }
00185 
00186     }
00187   }
00188   DEBUG(DEB_LEV_SIMPLE_SEQ, "Exiting Buffer Management Thread\n");
00189   return NULL;
00190 }
00191 
00198 void* omx_base_source_twoport_BufferMgmtFunction (void* param) {
00199   OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
00200   omx_base_component_PrivateType* omx_base_component_Private=(omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
00201   omx_base_source_PrivateType* omx_base_source_Private = (omx_base_source_PrivateType*)omx_base_component_Private;
00202   omx_base_PortType *pOutPort[2];
00203   tsem_t* pOutputSem[2];
00204   queue_t* pOutputQueue[2];
00205   OMX_BUFFERHEADERTYPE* pOutputBuffer[2];
00206   OMX_COMPONENTTYPE* target_component;
00207   OMX_BOOL isOutputBufferNeeded[2];
00208   int i,outBufExchanged[2];
00209 
00210   pOutPort[0]=(omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX];
00211   pOutPort[1]=(omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX_1];
00212   pOutputSem[0] = pOutPort[0]->pBufferSem;
00213   pOutputSem[1] = pOutPort[1]->pBufferSem;
00214   pOutputQueue[0] = pOutPort[0]->pBufferQueue;
00215   pOutputQueue[1] = pOutPort[1]->pBufferQueue;
00216   pOutputBuffer[1]= pOutputBuffer[0]=NULL;
00217   isOutputBufferNeeded[0]=isOutputBufferNeeded[1]=OMX_TRUE;
00218   outBufExchanged[0]=outBufExchanged[1]=0;
00219 
00220   DEBUG(DEB_LEV_FUNCTION_NAME, "In %s\n", __func__);
00221   while(omx_base_source_Private->state == OMX_StateIdle || omx_base_source_Private->state == OMX_StateExecuting ||  omx_base_source_Private->state == OMX_StatePause || 
00222     omx_base_source_Private->transientState == OMX_TransStateLoadedToIdle){
00223 
00224     /*Wait till the ports are being flushed*/
00225     pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
00226     while( PORT_IS_BEING_FLUSHED(pOutPort[0]) || 
00227            PORT_IS_BEING_FLUSHED(pOutPort[1])) {
00228       pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
00229       
00230       DEBUG(DEB_LEV_FULL_SEQ, "In %s 1 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n", 
00231         __func__,outBufExchanged[0],isOutputBufferNeeded[0],outBufExchanged[1],isOutputBufferNeeded[1],pOutputSem[0]->semval,pOutputSem[1]->semval);
00232 
00233       if(isOutputBufferNeeded[1]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pOutPort[1])) {
00234         pOutPort[1]->ReturnBufferFunction(pOutPort[1],pOutputBuffer[1]);
00235         outBufExchanged[1]--;
00236         pOutputBuffer[1]=NULL;
00237         isOutputBufferNeeded[1]=OMX_TRUE;
00238         DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output 1 buffer\n");
00239       }
00240 
00241       if(isOutputBufferNeeded[0]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pOutPort[0])) {
00242         pOutPort[0]->ReturnBufferFunction(pOutPort[0],pOutputBuffer[0]);
00243         outBufExchanged[0]--;
00244         pOutputBuffer[0]=NULL;
00245         isOutputBufferNeeded[0]=OMX_TRUE;
00246         DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output 0 buffer\n");
00247       }
00248 
00249       DEBUG(DEB_LEV_FULL_SEQ, "In %s 2 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n", 
00250         __func__,outBufExchanged[0],isOutputBufferNeeded[0],outBufExchanged[1],isOutputBufferNeeded[1],pOutputSem[0]->semval,pOutputSem[1]->semval);
00251   
00252       pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
00253       pthread_cond_signal(&omx_base_source_Private->flush_all_condition);
00254       pthread_cond_wait(&omx_base_source_Private->flush_condition,&omx_base_source_Private->flush_mutex);
00255     }
00256     pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
00257 
00258     /*No buffer to process. So wait here*/
00259     if((isOutputBufferNeeded[0]==OMX_TRUE && pOutputSem[0]->semval==0) && 
00260       (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid)) {
00261       //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
00262       DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next output buffer 0\n");
00263       tsem_down(omx_base_source_Private->bMgmtSem);
00264       
00265     }
00266     if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
00267       DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00268       break;
00269     }
00270     if((isOutputBufferNeeded[1]==OMX_TRUE && pOutputSem[1]->semval==0) && 
00271       (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid) &&
00272        !(PORT_IS_BEING_FLUSHED(pOutPort[0]) || PORT_IS_BEING_FLUSHED(pOutPort[1]))) {
00273       //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
00274       DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next output buffer 1\n");
00275       tsem_down(omx_base_source_Private->bMgmtSem);
00276       
00277     }
00278     if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
00279       DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00280       break;
00281     }
00282  
00283     DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer 0 semval=%d \n",pOutputSem[0]->semval);
00284     if(pOutputSem[0]->semval>0 && isOutputBufferNeeded[0]==OMX_TRUE ) {
00285       tsem_down(pOutputSem[0]);
00286       if(pOutputQueue[0]->nelem>0){
00287         outBufExchanged[0]++;
00288         isOutputBufferNeeded[0]=OMX_FALSE;
00289         pOutputBuffer[0] = dequeue(pOutputQueue[0]);
00290         if(pOutputBuffer[0] == NULL){
00291           DEBUG(DEB_LEV_ERR, "Had NULL output buffer!!\n");
00292           break;
00293         }
00294       }
00295     }
00296     /*When we have input buffer to process then get one output buffer*/
00297     if(pOutputSem[1]->semval>0 && isOutputBufferNeeded[1]==OMX_TRUE) {
00298       tsem_down(pOutputSem[1]);
00299       if(pOutputQueue[1]->nelem>0){
00300         outBufExchanged[1]++;
00301         isOutputBufferNeeded[1]=OMX_FALSE;
00302         pOutputBuffer[1] = dequeue(pOutputQueue[1]);
00303         if(pOutputBuffer[1] == NULL){
00304           DEBUG(DEB_LEV_ERR, "Had NULL output buffer!! op is=%d,iq=%d\n",pOutputSem[1]->semval,pOutputQueue[1]->nelem);
00305           break;
00306         }
00307       }
00308     }
00309 
00310     for(i=0;i<omx_base_source_Private->sPortTypesParam.nPorts;i++) {
00311       /*Process Output buffer of Port i */
00312       if(isOutputBufferNeeded[i]==OMX_FALSE) {
00313         if(omx_base_source_Private->pMark!=NULL){
00314           pOutputBuffer[i]->hMarkTargetComponent=omx_base_source_Private->pMark->hMarkTargetComponent;
00315           pOutputBuffer[i]->pMarkData=omx_base_source_Private->pMark->pMarkData;
00316           omx_base_source_Private->pMark=NULL;
00317         }
00318         target_component=(OMX_COMPONENTTYPE*)pOutputBuffer[i]->hMarkTargetComponent;
00319         if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
00320           /*Clear the mark and generate an event*/
00321           (*(omx_base_source_Private->callbacks->EventHandler))
00322             (openmaxStandComp,
00323             omx_base_source_Private->callbackData,
00324             OMX_EventMark, /* The command was completed */
00325             1, /* The commands was a OMX_CommandStateSet */
00326             i, /* The state has been changed in message->messageParam2 */
00327             pOutputBuffer[i]->pMarkData);
00328         } else if(pOutputBuffer[i]->hMarkTargetComponent!=NULL){
00329           /*If this is not the target component then pass the mark*/
00330           pOutputBuffer[i]->pMarkData=NULL;
00331         }
00332         //pOutputBuffer[0]->nTimeStamp = pOutputBuffer[0]->nTimeStamp;
00333 
00334         if (omx_base_source_Private->BufferMgmtCallback && pOutputBuffer[i]->nFilledLen == 0) {
00335           //(*(omx_base_source_Private->BufferMgmtCallback))(openmaxStandComp, pOutputBuffer[0], pOutputBuffer[1]);
00336           (*(omx_base_source_Private->BufferMgmtCallback))(openmaxStandComp, pOutputBuffer[i]);
00337         } else {
00338           /*It no buffer management call back then don't produce any output buffer*/
00339           pOutputBuffer[i]->nFilledLen = 0;
00340         }
00341       
00342         if(pOutputBuffer[i]->nFlags==OMX_BUFFERFLAG_EOS && pOutputBuffer[i]->nFilledLen==0) {
00343           DEBUG(DEB_LEV_FULL_SEQ, "Detected EOS flags in input buffer filled len=%d\n", (int)pOutputBuffer[i]->nFilledLen);
00344           (*(omx_base_source_Private->callbacks->EventHandler))
00345             (openmaxStandComp,
00346             omx_base_source_Private->callbackData,
00347             OMX_EventBufferFlag, /* The command was completed */
00348             i, /* The commands was a OMX_CommandStateSet */
00349             pOutputBuffer[i]->nFlags, /* The state has been changed in message->messageParam2 */
00350             NULL);
00351         }
00352         if(omx_base_source_Private->state==OMX_StatePause && !(PORT_IS_BEING_FLUSHED(pOutPort[0]) || PORT_IS_BEING_FLUSHED(pOutPort[1]))) {
00353           /*Waiting at paused state*/
00354           tsem_wait(omx_base_component_Private->bStateSem);
00355         }
00356 
00357          /*Output Buffer has been produced or EOS. So, return output buffer and get new buffer*/
00358         if(pOutputBuffer[i]->nFilledLen!=0 || pOutputBuffer[i]->nFlags==OMX_BUFFERFLAG_EOS){
00359           pOutPort[i]->ReturnBufferFunction(pOutPort[i],pOutputBuffer[i]);
00360           outBufExchanged[i]--;
00361           pOutputBuffer[i]=NULL;
00362           isOutputBufferNeeded[i]=OMX_TRUE;
00363         }
00364       }
00365     }
00366   }
00367   DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
00368   return NULL;
00369 }

Generated for OpenMAX Bellagio rel. 0.3.5-svn by  doxygen 1.5.1
SourceForge.net Logo