c ++ - Implementering af en simpel trådpulje

Indlæg af Hanne Mølgaard Plasc

Problem



Jeg har i øjeblikket brug for en enkel og effektiv trådpuljeimplementering. Jeg har søgt her og også på Google og fundet adskillige interessante links, men intet jeg har fundet hidtil synes at være egnet. De fleste af de implementeringer, jeg har fundet på internettet, er enten for komplicerede eller mangler nogle af de vigtigste funktioner, jeg har brug for.


Også jeg vil ikke bruge kode, som jeg ikke forstår, så jeg besluttede at kode det selv (nogle gange genopfinde hjulet hjælper mig med at skubbe mig fremad med hensyn til viden og erfaring). Jeg forstår selvfølgelig den grundlæggende idé bag trådpuljen , men nogle implementeringsdetaljer er stadig lidt uklare for mig.Det skyldes nok, at den slags trådpulje, jeg har brug for, er lidt speciel.Lad mig beskrive det .Jeg har en opgave, der er gjort hundredvis af gange på en bestemt (stor ) buffer. Jeg har målt, at ydeevnen er meget bedre, hvis jeg bruger tråde til denne opgave - bufferen er opdelt i subbuffere, og hver tråd udfører sin opgave på underbufferen og returnerer resultatet. Alle resultaterne fra alle tråde bliver derefter tilføjet sammen, hvilket giver mig den endelige løsning.


Men da dette er gjort, bliver jeg meget ofte tabt dyrebar tid på grund af at mange tråde bliver skabt (på grund af den overhead, der følger med oprettelse af tråd). Så jeg vil gerne have en pulje af tråde, der ville udføre denne opgave, i stedet for Oprettelse af et nyt sæt tråde hver gang.


For at være mere klar, er det, hvad jeg har hidtil:



  • Split bufferen i N subbuffere af samme størrelse

  • For hver underbuffer skal du oprette en tråd og køre den på underbufferen

  • Vent til alle tråde skal fuldføres (WaitForMultipleObjects), tilføj resultaterne og ødelæg trådene

  • Gentag



Det jeg gerne vil opnå er dette:



  • Split bufferen i N subbuffere af samme størrelse

  • Tildel hver underbuffer til en tråd fra trådpoolen (som har nøjagtige N-tråde)

  • Når en tråd er færdig, lad den sove, indtil en anden opgave er klar

  • Når alle tråde er færdige (og sover), tilsammen de resultater, de producerede

  • Gentag ved at vække tråderne og tildele dem nye opgaver



Som du kan se, er dette lidt af en særlig trådpulje, da jeg skal vente på, at trådene skal afslutte. I grund og grund vil jeg slippe af med overhead for at skabe tråde hele tiden, da programmet går gennem hundredtusinder af iterationer, så det kan skabe og ødelægge millioner af tråde i løbet af dets levetid. Gode ​​nyheder er, at jeg ikke har brug for nogen synkronisering overhovedet mellem tråde, de alle får deres egne data og lagerplads placerer resultaterne. Men jeg må vente til alle tråde er færdige, og jeg har den endelige løsning, fordi den næste opgave afhænger af resultaterne fra den foregående opgave.


Mit hovedproblem er med håndtering af tråde:



  • Hvordan laver jeg mine tråde 'sove' og vækker dem, når en ny opgave er klar?

  • Hvordan venter jeg på, at alle trådene er færdige?



Jeg vil være taknemmelig for enhver hjælp. Du er også velkommen til at stille spørgsmål, hvis jeg ikke var klar nok. Tak!

Bedste reference


For mig er den foretrukne måde at kommunikere med tråde via betingelsesvariabler . Fordi du kan definere den nødvendige tilstand og signal, når det ændres. I dit tilfælde kan du kombinere det med en kø , hvormed underbufferne er bestået, så hver tråd venter, mens køen er tom. Resultatet kan derefter sættes på en anden kø , hvor administrationskøen venter, indtil alle tråde har sendt resultatet til køen (referencen til denne kø sendes som en anmodning sammen med underbufferne) .

Andre referencer 1


Har du kigget på andre threadpool implementeringer? Såsom http://threadpool.sourceforge.net/for eksempel. Hvad du vil opnå er ikke helt nyt. En måde at gøre tråde venter på en ny opgave er at blokere på en mutex og blokere den mutex, når en anden opgave er klar. Du kan også få besked om, at de er færdige ved at bruge en form for underretning fra tråden tilbage til forældren. [3]


I min arbejdsgruppe har jeg brugt trådpuljer/tråde tungt og har brugt ØMQ til kommunikation på tværs af tråde, hvilket gør det muligt for tråden at blokere på en read() anmodning fra ØMQ, når den er klar til nyt arbejde.


Med lidt forskning og med lidt tid og kræfter bør du være i stand til at finde ud af, hvordan man enten bygger eller udnytter eksisterende rammer/værktøjer til at opbygge, hvad du har brug for. Så kan du komme tilbage til SO når du har nogle kode, du har problemer med.

Andre referencer 2


'Som du kan se, er dette lidt af en særlig trådpulje, da jeg skal vente på, at trådene er færdige.' - ikke helt. Du vil have den tråd, der behandler den sidste opgave i dit job for at levere en meddelelse om jobafslutning. Afslutningsmeddelelse er en normal funktion af en theadPool, ellers ville den oprindelige tråd ikke kunne behandle et komplet sæt af resultater. Pools behandler ofte mere end én opgave/opgave-hierarki på samme tid, og færdiggørelsesmetoden skal derfor være tråd-agnostisk - ingen tilmeld () eller nogen af ​​de slags ting. Også, ingen WaitForMultipleObject () - bruger et synkroniseret objekt array, der er svært at administrere og er begrænset til 64 objekter.


Trådpuljer har typisk en pulje af tråde, der venter på en producent-forbrugerkø for opgaver. Opgaverne er sædvanligvis arvet fra nogle 'Ctask' -klasser, der frembyder threadpool-tjenester. En mekanisme til afslutning af deektion og anmeldelse er en af ​​dem.


En producent-forbruger kø er i det væsentlige en 'normal' kø klasse beskyttet mod flere adgang med en mutex og med en semafor for at tælle opgaverne i køen og for trådene at vente på. Puljetråderne passerer hver gang i køen, og de løber for evigt, venter på køsemorforen, popper opgaver fra den låsede kø, og derefter kører run () -metoden for de modtagne opgaver.


Hver opgave ville have trådpoolen indlæst som et data-medlem, da det sendes til puljen. Dette gør det muligt for en opgave at indsende flere opgaver, hvis det er nødvendigt.


Afslutning af hver opgave meddeles normalt et sted ved at kalde en begivenhedsmetode, der er medlem af opgaven, indlæst af den oprindelige tråd, før opgaven sendes til puljen.


En opgave skal også have et underopgave atomisk nedtællings heltal og en begivenhed til at vente på til afslutning af andre opgaver.


Hvordan kan dette fungere i dit eksempel? Du kunne have en 'primær' opgave, der sender opgaver til arraybehandling og venter på, at de alle skal færdiggøre.


Der bør være flere tråde i poolen end der er kerner. Jeg foreslår dobbelt så mange.


Arrayet skal deles, så en separat opgave bruges til hver sektion. Hvor mange opgaver - nok, så de tilgængelige kerner bliver brugt op, men ikke så mange, at der opstår overordnede kontekstomskiftere. For ethvert udvalg af rimelig størrelse kan vi sige, at 64 opgaver er en rimelig deling - mere end det typiske antal processorer, der er tilgængelige. Opgaverne skal heller ikke opdeles i rækkefølge for at undgå falsk deling.


Så denne 'primære' array-behandling opgave. Indlæs det med arrayreferencen, og angiv dens færdiggørelsesbegivenhed for at pege på en metode, som signalerer en begivenhed. Indsend opgaven til puljen, vent på begivenheden.


Opgaverne bliver lastet på en tråd. Dens run () -metode bruger to looper og skaber 32 array-behandlingsopgaver, hver med eget startindeks og længde i arrayet, men med ikke-på hinanden følgende startindekser. Opgaven bruger sin egen arvede submit () metode til at indlæse hver af de 32 nye opgaver på poolen. Ud over at udføre opgaverne til udførelse på trådene, indsender dette () også atom-trin et afslutningsantal helt tal og indstiller afslutningsbegivenheden for opgaverne til en privat færdiggørelsesbegivenhed, før købet af opgaven. Den private færdiggørelseshændelse atom-reducerer færdiggørelsestællingen og signalerer en hændelse, hvis nul. Efter indsendelse af alle 32 array-behandlingshændelser venter den primære opgave på den private afslutningshændelse.


Så løber de 32 array-behandlingsopgaver på trådene. Når hver enkelt er færdig, kalder den tråd, der kører den, sin færdiggørelsesbegivenhed, som reducerer fuldførelsestaletallet i den primære opgave. Til sidst fuldføres den sidste arraybehandlingsopgave, og fuldførelsestællingsallet tales ned til nul, så signalering af den begivenhed, som tråden kører den primære opgave, venter på. Den primære opgave kalder sin egen komplette begivenhed, så signaliserer den begivenhed, som den primære opgaveophavsmand venter på.


Den primære opgaveopkøber fortsætter med arrayet, der er fuldstændig behandlet.


.. eller du kunne bruge en threadpool klasse, der allerede virker, som foreslået af andre.

Andre referencer 3



  hvordan laver jeg mine tråde 'sove' og vækker dem, når en ny opgave er
  parat?



Du bruger en mutex eller semaphore afhængigt af din situation (i nogle tilfælde skal du muligvis bruge en betingelsesvariabel eller automatisk/manuel resetevent), for at gøre tråde vent på hinanden eller vågne op, når der sker noget .



  hvordan venter jeg på, at alle tråde bliver færdige?



Du skal bruge join() på hver tråd for at vente på, at den er færdig. Så hvis du har en samling af tråde, vil du måske gerne gå igennem og ringe til at deltage i hver tråd, der stadig kører.


Som en separat note: Trådpuljer eksisterer allerede, og du kan bare bruge ting som Boost.Threadpool i stedet for at genopfinde hjulet.