OpenCV multithreading (Windows/.NET) forsinker flere sekunder fra videooptagelse

Indlæg af Hanne Mølgaard Plasc

Problem



Jeg har et multithreaded openCV-program, der bruger 4 tråde til at gøre følgende:


Tråd 1-> kalder cvQueryFrame(), som rammer rammebillederne fra kameraet en efter en og gemmer dem til en std::vector inputBuffer


Tråd 2-> udfører tærskelværdi på inputBuffer[0], eksemplarer resulterer i en anden std::vector kaldet filterOutputBuffer


Tråd 3-> udfører optisk strømningsalgoritme/tegner strømningsfelt for de to første elementer i filterOutputBuffer, kopier resulterer til en anden std::vector kaldet ofOutputBuffer.


Tråd 4-> viser billedet ved hjælp af cvShowImage(ofOutputBuffer[0])


Så i det væsentlige forestillede jeg hver tråd at udføre opgaven på det første element i den tilsvarende input vektor/buffer og lagre resultatet bag på den tilsvarende output vektor. Sort af som 3 fabriksarbejdere gør deres del på samlebåndet, så kaster slutresultatet i en spand til den næste fyr.


Jeg konfigurerer mutexes for alle buffere og programmet virker, kun udgangen forsinkes flere sekunder fra live-kamera streamen.


Jeg kørte en ikke-multithreaded version af det samme program (der brugte en kæmpe mens (ægte) loop), og det løb i realtid med kun lejlighedsvis stotter.


Hvorfor er min samtidige gennemførelse forsinket i præstation så meget?


Nedenfor er trådfunktionerne:


    void writeBuffer()
    {
        cout << "Thread " << GetCurrentThreadId() << ": Capturing frame from camera!" << endl;
        CvCapture *capture = 0;
        IplImage *frame = 0;
        DWORD waitResult;

        if (!(capture = cvCaptureFromCAM(0)))
            cout << "Cannot initialize camera!" << endl;

        //now start grabbing frames and storing into the vector inputBuffer
        while (true)
        {
            //cout << "Thread " << GetCurrentThreadId() << ": Waiting for mutex to write to input buffer!..." << endl;
            waitResult = WaitForSingleObject(hMutex, INFINITE);
            switch(waitResult) 
            {
                // The thread got ownership of the mutex
                case WAIT\_OBJECT\_0:
                    frame = cvQueryFrame(capture); //store the image into frame
                    if(!frame)
                    {
                        cout << "Thread " << GetCurrentThreadId() << ": Error capturing frame from camera!" << endl;
                    }
                    //cout << "Thread " << GetCurrentThreadId() << ": Getting Frame..." << endl;
                    inputBuffer.push\_back(*frame);
                break; 
                default:
                    cout << "Thread " << GetCurrentThreadId() << ": Error acquiring mutex..." << endl;
            }
            if(!ReleaseMutex(hMutex)) 
            { 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing mutex..." << endl;
            }
            //else cout << "Thread " << GetCurrentThreadId() << ": Done writing to input buffer, Mutex Released!" << endl;
            //signal hDoneGettingFrame
            PulseEvent(hDoneGettingFrame);
        }
            cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
    }


    void opticalFlow()
    {
    ...
        DWORD waitResult;

        //start grabbing frames from the vector inputBuffer
        cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl;
        while(true)
        {
            waitResult = WaitForSingleObject(fMutex, INFINITE);
            switch(waitResult) 
            {
                // The thread got ownership of the mutex
                case WAIT\_OBJECT\_0: 
                    //grab first two frames from buffer (inputBuffer[0-1]) and process them
                    if(filterOutputBuffer.size() > 1)
                    {   
                        frame1 = filterOutputBuffer[0];
                        frame2 = filterOutputBuffer[1];
                        filterOutputBuffer.erase(filterOutputBuffer.begin());
                    }
                    else 
                    {
                        if(!ReleaseMutex(fMutex)) 
                            cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl;
                        //else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl;
                        continue;
                    }
                break; 
                default:
                    cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl;
                    continue;
            }
            if(!ReleaseMutex(fMutex)) 
            { 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
            }
    ...
    //Do optical flow stuff
    ...
    waitResult = WaitForSingleObject(oMutex, INFINITE);
            switch(waitResult)
            {
                // The thread got ownership of the mutex
                case WAIT\_OBJECT\_0:
                    //cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl;
                    ofOutputBuffer.push\_back(*frame1\_3C);
                break;
                default:
                    cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl;
            }
            if(!ReleaseMutex(oMutex)) 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl;
    }
        cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
    }

    void filterImage()
{
    DWORD waitResult;
...

    //start grabbing frames from the vector inputBuffer
    cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl;
    while(true)
    {
        waitResult = WaitForSingleObject(hMutex, INFINITE);
        switch(waitResult) 
        {
            // The thread got ownership of the mutex
            case WAIT\_OBJECT\_0: 
                //grab first frame and then release mutex
                if(inputBuffer.size() > 0)
                {   
                    frame = inputBuffer[0];
                    inputBuffer.erase(inputBuffer.begin());
                }
                else 
                {
                    if(!ReleaseMutex(hMutex)) 
                        cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
                    //else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl;
                    continue;
                }
            break; 
            default:
                cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl;
                continue;
        }
        if(!ReleaseMutex(hMutex)) 
        { 
            cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
        }
...
//Tresholding Image Stuff
...
        //cout << "Thread " << GetCurrentThreadId() << ": Waiting to write to output buffer..." << endl;
        waitResult = WaitForSingleObject(fMutex, INFINITE);
        switch(waitResult)
        {
            // The thread got ownership of the mutex
            case WAIT\_OBJECT\_0:
                //cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl;
                filterOutputBuffer.push\_back(*out);
            break;
            default:
                cout << "Thread " << GetCurrentThreadId() << ": Error acquiring filter mutex..." << endl;
        }
        if(!ReleaseMutex(fMutex)) 
            cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl;

    }
}

void displayImage()
{
    DWORD waitResult;
    IplImage final;
    int c;
    cvNamedWindow("Image", CV\_WINDOW\_AUTOSIZE);
    //start grabbing frames from the vector ouputBuffer
    cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from output buffer..." << endl;
    while (true)
    {
            waitResult = WaitForSingleObject(oMutex, INFINITE);
            switch(waitResult) 
            {
                    // The thread got ownership of the mutex
                    case WAIT\_OBJECT\_0:
                        if(ofOutputBuffer.size() > 0)
                        {
                            //cout << "Thread " << GetCurrentThreadId() << ": Reading output buffer..." << endl;
                            final = ofOutputBuffer[0];
                            ofOutputBuffer.erase(ofOutputBuffer.begin());
                        }
                        else 
                        {
                            if(!ReleaseMutex(oMutex)) 
                                cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl;
                            //else cout << "Thread " << GetCurrentThreadId() << ": Output Buffer is empty!" << endl;
                            continue;
                        }
                    break;
                    default:
                        cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl;
                        continue;
            }
            if(!ReleaseMutex(oMutex)) 
                cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;
            //else cout << "Thread " << GetCurrentThreadId() << ": Done reading output buffer, mutex Released!" << endl;

            //cout << "Thread " << GetCurrentThreadId() << ": Displaying Image..." << endl;
            cvShowImage("Image", &final);
            c = cvWaitKey(1);
    }
    cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl;
}


Her er hovedfunktionen:


void main()
{
    hMutex = CreateMutex(NULL, FALSE, NULL);
    oMutex = CreateMutex(NULL, FALSE, NULL);
    fMutex = CreateMutex(NULL, FALSE, NULL);

    hDoneGettingFrame = CreateEvent(NULL, TRUE, FALSE, NULL);
    hDoneReadingFrame = CreateEvent(NULL, TRUE, FALSE, NULL);

    TName[0]= CreateThread(NULL, 0, (LPTHREAD\_START\_ROUTINE)writeBuffer, NULL, 0, &ThreadID);
    TName[1]= CreateThread(NULL, 0, (LPTHREAD\_START\_ROUTINE)filterImage, NULL, 0, &ThreadID);
    TName[2]= CreateThread(NULL, 0, (LPTHREAD\_START\_ROUTINE)opticalFlow, NULL, 0, &ThreadID);
    TName[3]= CreateThread(NULL, 0, (LPTHREAD\_START\_ROUTINE)displayImage, NULL, 0, &ThreadID);
    WaitForMultipleObjects(4, TName, TRUE, INFINITE);
    CloseHandle(TName);
}

Bedste reference


Multithreaded applikation deler CPU gange mellem tråde. Derfor eksisterer kontekstomskiftere, når en anden tråd ville være i runde tilstand. Muligvis vil skifte mellem tråde øge CPU-tiden, hvilket resulterer i, at applikationen bliver langsommere.

Andre referencer 1


Prøv at bruge threadPool det vil minimere den tid, der forbruges fra CPU'en for at rejse mellem tråde.

Andre referencer 2


Nå for at begynde med, hvis jeg ruller loop af din første trådfunktion omkring en smule:


if(!ReleaseMutex(hMutex)){} 
PulseEvent(hDoneGettingFrame);
waitResult = WaitForSingleObject(hMutex, INFINITE);


For at sætte det på en anden måde holder din første tråd på køens mutex for næsten hele løbet af den første trådsløjfe, så det forhindrer den anden tråd i at komme overalt. Jeg gætter på, at alle de andre tråde '-koden er den samme?


Når du skubber data runde producent-forbrugerkøer i en pipeline, er ideen, at du skal holde køen låst kun for den mindste tid. Gør din behandling på et bufferobjekt, lås derefter køen, tryk på objektreferencen, og luk derefter køen straks op. Derefter signalere en semafor (eller lignende), så den næste tråd kan behandle objektet, når det kan.


Lad ikke køen være låst! Mutex bør ikke være der for den anden tråd at vente på arbejde - det er at beskytte køen fra flere adgang. Du har brug for en anden signalering for at opretholde køetællingen og for trådene at vente på arbejde. Brug ikke en begivenhed til det, uanset hvor mange andre eksempler du har set på nettet - brug en semafor, hvis du skal rulle din egen producent-forbrugerkø.


Bedre - brug en P-C køklasse, der allerede virker - se på klasserne BlockingCollections.

Andre referencer 3


Semaforer gjorde tricket! I stedet for at bruge separate mutexes, har jeg lige lavet en semafor og lad alle trådene arbejde igennem det.


Tak, det løber hurtigt og glat nu!


void main()
{
    hSemaphore = CreateSemaphore( 
        NULL,           // default security attributes
        MAX\_THREADS,  // available count (when a thread enters, it decreases)
        MAX\_THREADS,  // maximum count
        NULL);          // unnamed semaphore

    if (hSemaphore == NULL) 
    {
        printf("CreateSemaphore error: \%d
", GetLastError());
        return;
    }


    TName[0]= CreateThread(NULL, 0, (LPTHREAD\_START\_ROUTINE)writeBuffer, NULL, 0, &ThreadID);
    TName[2]= CreateThread(NULL, 0, (LPTHREAD\_START\_ROUTINE)opticalFlow, NULL, 0, &ThreadID);
    TName[3]= CreateThread(NULL, 0, (LPTHREAD\_START\_ROUTINE)displayImage, NULL, 0, &ThreadID);
    WaitForMultipleObjects(4, TName, TRUE, INFINITE);
    CloseHandle(TName);
}   


Og i trådene ...


   //instead of separate Mutexes, just wait for semaphore
    waitResult = WaitForSingleObject(hSemaphore, INFINITE);
                switch(waitResult) 
                {

                            ...

                            }
                if(!ReleaseSemaphore(hSemaphore, 1, NULL)) 
                    cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl;