windows - Hvordan kan en interprocess producent forbrugerbesked passerer mekanisme være beskyttet mod korruption på grund af en side nedbrud?

Indlæg af Hanne Mølgaard Plasc

Problem



Jeg har implementeret en interprocess message kø i delt hukommelse for en producent og en forbruger på Windows.





Jeg bruger en navngivet semafor til at tælle tomme slots, en navngivet semafor for at tælle fulde slots og en navngivet mutex for at beskytte datastrukturen i delt hukommelse.





Overvej for eksempel forbrugersiden. Producentens side er ens.
Først venter det på den fulde semafor derefter (1) det tager en besked fra køen under mutexen og så signalerer den tomme semaforen (2)





Problemet:





Hvis forbrugerprocessen går ned mellem (1) og (2) så er antallet af slots i køen, der kan bruges af processen, effektivt reduceret med en.
Antag, at mens forbrugeren er nede, kan producenten håndtere køen, der bliver fyldt op. (det kan enten angive en timeout, når du venter på den tomme semafor eller endda angive 0 for ingen ventetid).





Når forbrugeren genstarter, kan den fortsætte med at læse data fra køen. Dataene vil ikke være overskredet, men selv efter at det tømmer alle fulde slots, vil producenten have en mindre tom plads til brug.





Efter flere sådanne genstart vil køen ikke have slots, der kan bruges, og der kan ikke sendes meddelelser.





Spørgsmål:





Hvordan kan denne situation undgås eller gendannes fra?

Bedste reference


Her er en oversigt over en simpel tilgang, der bruger begivenheder frem for semaforer:


DWORD increment\_offset(DWORD offset)
{
    offset++;
    if (offset == QUEUE\_LENGTH*2) offset = 0;
    return offset;
}

void consumer(void)
{
    for (;;)
    {
        DWORD current\_write\_offset = InterlockedCompareExchange(write\_offset, 0, 0);

        if ((current\_write\_offset != *read\_offset + QUEUE\_LENGTH) && 
            (current\_write\_offset + QUEUE\_LENGTH != *read\_offset))
        {
            // Queue is not full, make sure producer is awake
            SetEvent(signal\_producer\_event);
        }

        if (*read\_offset == current\_write\_offset)
        {
            // Queue is empty, wait for producer to add a message
            WaitForSingleObject(signal\_consumer\_event, INFINITE);
            continue;
        }

        MemoryBarrier();
        \_ReadWriteBarrier;

        consume((*read\_offset) \% QUEUE\_LENGTH);

        InterlockedExchange(read\_offset, increment\_offset(*read\_offset));
    }
}

void producer(void)
{
    for (;;)
    {
        DWORD current\_read\_offset = InterlockedCompareExchange(read\_offset, 0, 0);

        if (current\_read\_offset != *write\_offset)
        {
            // Queue is not empty, make sure consumer is awake
            SetEvent(signal\_consumer\_event);
        }

        if ((*write\_offset == current\_read\_offset + QUEUE\_LENGTH) ||
            (*write\_offset + QUEUE\_LENGTH == current\_read\_offset))
        {
            // Queue is full, wait for consumer to remove a message
            WaitForSingleObject(signal\_producer\_event, INFINITE);
            continue;
        }

        produce((*write\_offset) \% QUEUE\_LENGTH);

        MemoryBarrier();
        \_ReadWriteBarrier;

        InterlockedExchange(write\_offset, increment\_offset(*write\_offset));
    }
}


Bemærkninger:



  • Koden som udgivet kompilerer (givet de relevante erklæringer), men jeg har ikke ellers testet det.

  • read\_offset er en pointer til en DWORD i delt hukommelse, hvilket angiver hvilken slot der skal læses fra næste. Tilsvarende peger write\_offset på en DWORD i delt hukommelse, hvilket indikerer hvilken slot der skal skrives til næste.

  • En forskydning af QUEUE\_LENGTH + x refererer til samme slot som en forskydning af x for at disambiguere mellem en fuld kø og en tom kø. Det er derfor, at funktionen increment\_offset() kontrollerer for QUEUE\_LENGTH*2 i stedet for blot QUEUE\_LENGTH og hvorfor vi tager modulo, når vi kalder funktionerne consume() og produce(). alternativ til denne tilgang ville være at ændre producenten til aldrig at bruge den sidste ledige plads, men det spilder en slot.)

  • signal\_consumer\_event og signal\_producer\_event skal være automatisk nulstillede hændelser. Bemærk, at indstilling af en begivenhed, der allerede er indstillet, er en no-op.

  • Forbrugeren venter kun på sin begivenhed, hvis køen faktisk er tom, og producenten venter kun på sin begivenhed, hvis køen faktisk er fuld.

  • Når en af ​​processerne er vågne, skal den kontrollere køens tilstand, fordi der er en løbskondition, der kan føre til en uhyggelig vågne op.

  • Fordi jeg bruger interlocked operationer, og fordi kun én proces ad gangen bruger en bestemt slot, er der ikke brug for en mutex. Jeg har inkluderet hukommelsesbarrierer for at sikre, at de ændringer, som producenten skriver til en slot, ses af forbrugeren. Hvis du ikke er fortrolig med låsfri kode, vil du opdage, at det er trivielt at konvertere algoritmen vist til brug en mutex i stedet.

  • Bemærk at InterlockedCompareExchange(pointer, 0, 0); ser lidt kompliceret ud, men er bare en trådsikker, der svarer til *pointer, det vil sige den læser værdien ved pegeren. Tilsvarende er InterlockedExchange(pointer, value); den samme som *pointer = value; men trådssikker. Afhængig af kompilatoren og målarkitekturen er det ikke sikkert, at interlocked operationer er strengt nødvendige, men ydeevnen er ubetydelig, så jeg anbefaler programmering defensivt.



Overvej sagen, når forbrugeren styrter under (eller før) opkaldet til funktionen consume(). Når forbrugeren genstartes, vil den hente den samme besked igen og behandle den som normalt. Hvad producenten angår, er der ikke sket noget usædvanligt, bortset fra at meddelelsen tog længere tid end normalt at blive behandlet. En analog situation opstår, hvis producenten styrter under oprettelsen af ​​en meddelelse; Når den genstartede genstart bliver overskrevet, overskrives den ufuldstændige, og forbrugeren vil ikke blive påvirket.


Det er klart, at hvis crasen opstår efter opkaldet til InterlockedExchange, men før opkaldet til SetEvent i enten producenten eller forbrugeren, og hvis køen tidligere var tom eller fuld, vil den anden proces ikke være vågnede på det tidspunkt. Det bliver dog vågnet, så snart den nedbrudte proces genstartes. Du kan ikke miste spor i køen, og processerne kan ikke blokere.


Jeg tror, ​​at den enkle multiple-producent single-consumer sag ville se sådan ud:


void producer(void)
{
    for (;;)
    {
        DWORD current\_read\_offset = InterlockedCompareExchange(read\_offset, 0, 0);

        if (current\_read\_offset != *write\_offset)
        {
            // Queue is not empty, make sure consumer is awake
            SetEvent(signal\_consumer\_event);
        }

        produce\_in\_local\_cache();

        claim\_mutex();

        // read offset may have changed, re-read it
        current\_read\_offset = InterlockedCompareExchange(read\_offset, 0, 0);

        if ((*write\_offset == current\_read\_offset + QUEUE\_LENGTH) ||
            (*write\_offset + QUEUE\_LENGTH == current\_read\_offset))
        {
            // Queue is full, wait for consumer to remove a message
            WaitForSingleObject(signal\_producer\_event, INFINITE);
            continue;
        }

        copy\_from\_local\_cache\_to\_shared\_memory((*write\_offset) \% QUEUE\_LENGTH);

        MemoryBarrier();
        \_ReadWriteBarrier;

        InterlockedExchange(write\_offset, increment\_offset(*write\_offset));

        release\_mutex();
    }
}


Hvis den aktive producent går ned, bliver mutexen detekteret som forladt; du kan behandle denne sag som om mutexen blev korrekt frigivet. Hvis den nedbrudte proces har opnået så vidt som forøgelse af skriveforskydningen, vil den indsatte post blive behandlet som sædvanlig; Hvis ikke, vil den blive overskrevet af den producent, der efterfølgende påberåber mutexen. I intet tilfælde er der behov for en særlig handling.