00001
00031 #include <omxcore.h>
00032
00033 #include <omx_base_sink.h>
00034
00035 OMX_ERRORTYPE omx_base_sink_Constructor(OMX_COMPONENTTYPE *openmaxStandComp,OMX_STRING cComponentName) {
00036 OMX_ERRORTYPE err = OMX_ErrorNone;
00037 omx_base_sink_PrivateType* omx_base_sink_Private;
00038
00039 if (openmaxStandComp->pComponentPrivate) {
00040 omx_base_sink_Private = (omx_base_sink_PrivateType*)openmaxStandComp->pComponentPrivate;
00041 } else {
00042 omx_base_sink_Private = malloc(sizeof(omx_base_sink_PrivateType));
00043 if (!omx_base_sink_Private) {
00044 return OMX_ErrorInsufficientResources;
00045 }
00046 }
00047
00048
00049
00050 err = omx_base_component_Constructor(openmaxStandComp,cComponentName);
00051
00052
00053
00054 omx_base_sink_Private = openmaxStandComp->pComponentPrivate;
00055
00056 omx_base_sink_Private->sPortTypesParam.nPorts = 1;
00057 omx_base_sink_Private->sPortTypesParam.nStartPortNumber = 0;
00058
00059 omx_base_sink_Private->BufferMgmtFunction = omx_base_sink_BufferMgmtFunction;
00060
00061 return err;
00062 }
00063
00064 OMX_ERRORTYPE omx_base_sink_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
00065 {
00066 return omx_base_component_Destructor(openmaxStandComp);
00067 }
00068
00074 void* omx_base_sink_BufferMgmtFunction (void* param) {
00075 OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
00076 omx_base_component_PrivateType* omx_base_component_Private=(omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
00077 omx_base_sink_PrivateType* omx_base_sink_Private = (omx_base_sink_PrivateType*)omx_base_component_Private;
00078 omx_base_PortType *pInPort=(omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX];
00079 tsem_t* pInputSem = pInPort->pBufferSem;
00080 queue_t* pInputQueue = pInPort->pBufferQueue;
00081 OMX_BUFFERHEADERTYPE* pInputBuffer=NULL;
00082 OMX_COMPONENTTYPE* target_component;
00083 OMX_BOOL isInputBufferNeeded=OMX_TRUE;
00084 int inBufExchanged=0;
00085
00086 DEBUG(DEB_LEV_FUNCTION_NAME, "In %s \n", __func__);
00087 while(omx_base_component_Private->state == OMX_StateIdle || omx_base_component_Private->state == OMX_StateExecuting || omx_base_component_Private->state == OMX_StatePause ||
00088 omx_base_component_Private->transientState == OMX_TransStateLoadedToIdle){
00089
00090
00091 pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
00092 while( PORT_IS_BEING_FLUSHED(pInPort)) {
00093 pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
00094
00095 if(isInputBufferNeeded==OMX_FALSE) {
00096 pInPort->ReturnBufferFunction(pInPort,pInputBuffer);
00097 inBufExchanged--;
00098 pInputBuffer=NULL;
00099 isInputBufferNeeded=OMX_TRUE;
00100 DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning input buffer\n");
00101 }
00102 DEBUG(DEB_LEV_FULL_SEQ, "In %s signalling flush all condition \n", __func__);
00103
00104 pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
00105 pthread_cond_signal(&omx_base_sink_Private->flush_all_condition);
00106 pthread_cond_wait(&omx_base_sink_Private->flush_condition,&omx_base_sink_Private->flush_mutex);
00107 }
00108 pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
00109
00110
00111 if((pInputSem->semval==0 && isInputBufferNeeded==OMX_TRUE ) &&
00112 (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid)) {
00113 DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for input buffer \n");
00114 tsem_down(omx_base_sink_Private->bMgmtSem);
00115 }
00116
00117 if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
00118 DEBUG(DEB_LEV_FULL_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00119 break;
00120 }
00121
00122 DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for input buffer semval=%d \n",pInputSem->semval);
00123 if(pInputSem->semval>0 && isInputBufferNeeded==OMX_TRUE ) {
00124 tsem_down(pInputSem);
00125 if(pInputQueue->nelem>0){
00126 inBufExchanged++;
00127 isInputBufferNeeded=OMX_FALSE;
00128 pInputBuffer = dequeue(pInputQueue);
00129 if(pInputBuffer == NULL){
00130 DEBUG(DEB_LEV_ERR, "Had NULL input buffer!!\n");
00131 break;
00132 }
00133 }
00134 }
00135
00136 if(isInputBufferNeeded==OMX_FALSE) {
00137 if(pInputBuffer->nFlags==OMX_BUFFERFLAG_EOS) {
00138 DEBUG(DEB_LEV_SIMPLE_SEQ, "Detected EOS flags in input buffer\n");
00139
00140 (*(omx_base_component_Private->callbacks->EventHandler))
00141 (openmaxStandComp,
00142 omx_base_component_Private->callbackData,
00143 OMX_EventBufferFlag,
00144 0,
00145 pInputBuffer->nFlags,
00146 NULL);
00147 pInputBuffer->nFlags=0;
00148 }
00149 if(omx_base_sink_Private->pMark!=NULL){
00150 omx_base_sink_Private->pMark=NULL;
00151 }
00152 target_component=(OMX_COMPONENTTYPE*)pInputBuffer->hMarkTargetComponent;
00153 if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
00154
00155 (*(omx_base_component_Private->callbacks->EventHandler))
00156 (openmaxStandComp,
00157 omx_base_component_Private->callbackData,
00158 OMX_EventMark,
00159 1,
00160 0,
00161 pInputBuffer->pMarkData);
00162 } else if(pInputBuffer->hMarkTargetComponent!=NULL){
00163
00164 DEBUG(DEB_LEV_FULL_SEQ, "Can't Pass Mark. This is a Sink!!\n");
00165 }
00166 if (omx_base_sink_Private->BufferMgmtCallback && pInputBuffer->nFilledLen != 0) {
00167 (*(omx_base_sink_Private->BufferMgmtCallback))(openmaxStandComp, pInputBuffer);
00168 }
00169 else {
00170
00171 pInputBuffer->nFilledLen = 0;
00172 }
00173
00174
00175 if(omx_base_sink_Private->state==OMX_StatePause && !PORT_IS_BEING_FLUSHED(pInPort)) {
00176
00177 tsem_wait(omx_base_sink_Private->bStateSem);
00178 }
00179
00180
00181 if(pInputBuffer->nFilledLen==0) {
00182 pInPort->ReturnBufferFunction(pInPort,pInputBuffer);
00183 inBufExchanged--;
00184 pInputBuffer=NULL;
00185 isInputBufferNeeded = OMX_TRUE;
00186 }
00187
00188 }
00189 }
00190 DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
00191 return NULL;
00192 }