python - Python3 Windows multiprocessing passing socket to process

Post by Hanne Mølgaard Plasc

Problem



I am trying to get a multiprocessing ServerApp to work on Windows. I assume the problem is the missing os.fork() function, so I will have to pass the socket to the new process somehow, and a socket is not pickleable (?!).


I have seen that this might be possible using reduce_handle and rebuild_handle from multiprocessing.reduction as shown here, but these methods are not available in Python 3 (?!). Although I do have duplicate and steal_handle available, I cannot find an example of how to use them, or whether I need them at all.


I would also like to know whether logging is going to be a problem when creating a new process.


Here is my ServerApp sample:


import logging
import socket

from select import select
from threading import Thread
from multiprocessing import Queue
from multiprocessing import Process
from sys import stdout
from time import sleep


class ServerApp(object):

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(stdout)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)


    def conn_handler(self, connection, address, buffer):

        self.logger.info("[%d] - Connection from %s:%d", self.id, address[0], address[1])

        try:
            while True:

                command = None
                received_data = b''
                readable, writable, exceptional = select([connection], [], [], 0)  # Check for client commands

                if readable:
                    # Get Command  ... There is more code here
                    command = 'Something'


                if command == 'Something':
                    connection.sendall(command_response)
                else:
                    print(':(')

        except Exception as e:
            print(e)
        finally:
            connection.close()
            self.client_buffers.remove(buffer)
            self.logger.info("[%d] - Connection from %s:%d has been closed.", self.id, address[0], address[1])


    def join(self):

        while self.listener.is_alive():
            self.listener.join(0.5)


    def acceptor(self):

        while True:
            self.logger.info("[%d] - Waiting for connection on %s:%d", self.id, self.ip, self.port)

            # Accept a connection on the bound socket and fork a child process to handle it.
            conn, address = self.socket.accept()

            # Create a Queue that acts as the buffer for this specific client and add it to the list of all client buffers
            buffer = Queue()
            self.client_buffers.append(buffer)

            process = Process(target=self.conn_handler, args=(conn, address, buffer))
            process.daemon = True
            process.start()
            self.clients.append(process)

            # Close the connection fd in the parent, since the child process has its own reference.
            conn.close()


    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp', buffer_size=2048):

        self.id = id
        self.port = port
        self.ip = ip

        self.socket = None
        self.listener = None
        self.buffer_size = buffer_size

        # Additional attributes here....

        self.clients = []
        self.client_buffers = []


    def run(self):

        # Create TCP socket, bind port and listen for incoming connections
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((self.ip, self.port))
        self.socket.listen(5)

        self.listener = Thread(target=self.acceptor)  # Run acceptor thread to handle new connection
        self.listener.daemon = True
        self.listener.start()

Best reference


To allow pickling of connections (including sockets) in python3, you should use multiprocessing.allow_connection_pickling. It registers reducers for sockets in the ForkingPickler. For example:


import socket
import multiprocessing as mp
mp.allow_connection_pickling()


def _test_connection(conn):
    msg = conn.recv(2)
    conn.send(msg)
    conn.close()
    print("ok")

if __name__ == '__main__':
    server, client = socket.socketpair()

    p = mp.Process(target=_test_connection, args=(server,))
    p.start()

    client.settimeout(5)

    msg = b'42'
    client.send(msg)
    assert client.recv(2) == msg

    p.join()
    assert p.exitcode == 0

    client.close()
    server.close()


I also noticed that you have some other issues unrelated to the pickling of the socket.



  • When you use self.conn_handler as the target, multiprocessing will try to pickle the whole object self. This is a problem because your object contains a Thread, which cannot be pickled. You should therefore remove self from the closure of your target function. This can be done by using the @staticmethod decorator and removing every mention of self in the function.

  • The logging module is also not designed to handle multiple processes. Basically, all the logs from the launched Process will be lost with your current code. To fix this, you can either set up logging again when the second Process starts (at the beginning of conn_handler) or use the multiprocessing logging utility.
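The first point can be checked without starting any process. Below is a minimal sketch, assuming the child is started with spawn (so the target has to be pickled); `Server`, `bound_handler`, `static_handler` and `is_picklable` are hypothetical names used only for illustration and are not part of the original ServerApp.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (TypeError, AttributeError, pickle.PicklingError):
        return False
    return True


class Server:
    """Hypothetical stand-in for ServerApp: it holds a Thread, like self.listener."""

    def __init__(self):
        self.listener = threading.Thread(target=print)

    def bound_handler(self, address):
        # Bound method: pickling it means pickling the instance it belongs to.
        return address

    @staticmethod
    def static_handler(address):
        # Static method: a plain function, no reference to any instance.
        return address


if __name__ == "__main__":
    app = Server()
    # The bound method drags `app` (and therefore its Thread) along.
    print("bound method picklable: ", is_picklable(app.bound_handler))
    # The static method is pickled by name only.
    print("static method picklable:", is_picklable(Server.static_handler))
```

Anything the handler still needs from the instance (such as the id) then has to be passed explicitly through args.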



This could give something like this:


import logging
import socket

from select import select
from threading import Thread
from multiprocessing import util, get_context
from sys import stdout
from time import sleep

util.log_to_stderr(20)
ctx = get_context("spawn")


class ServerApp(object):

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(stdout)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    def __init__(self, id, port=4545, ip='127.0.0.1', method='tcp',
                 buffer_size=2048):

        self.id = id
        self.port = port
        self.ip = ip

        self.socket = None
        self.listener = None
        self.buffer_size = buffer_size

        # Additional attributes here....

        self.clients = []
        self.client_buffers = []

    @staticmethod
    def conn_handler(id, connection, address, buffer):

        print("test")
        util.info("[%d] - Connection from %s:%d", id, address[0], address[1])

        try:
            while True:

                command = None
                received_data = b''
                # Check for client commands
                readable, writable, exceptional = select([connection], [], [],
                                                        0)

                if readable:
                    # Get Command  ... There is more code here
                    command = 'Something'

                if command == 'Something':
                    connection.sendall(b"Coucouc")
                    break
                else:
                    print(':(')
                sleep(.1)

        except Exception as e:
            print(e)
        finally:
            connection.close()
            util.info("[%d] - Connection from %s:%d has been closed.", id,
                      address[0], address[1])
            print("Close")

    def join(self):

        while self.listener.is_alive():
            self.listener.join(0.5)

    def acceptor(self):

        while True:
            self.logger.info("[%d] - Waiting for connection on %s:%d", self.id,
                             self.ip, self.port)

            # Accept a connection on the bound socket and fork a child process
            # to handle it.
            conn, address = self.socket.accept()

            # Create a Queue that acts as the buffer for this specific client
            # and add it to the list of all client buffers
            buffer = ctx.Queue()
            self.client_buffers.append(buffer)

            process = ctx.Process(target=self.conn_handler,
                                  args=(self.id, conn, address, buffer))
            process.daemon = True
            process.start()
            self.clients.append(process)

            # Close the connection fd in the parent, since the child process
            # has its own reference.
            conn.close()

    def run(self):

        # Create TCP socket, bind port and listen for incoming connections
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind((self.ip, self.port))
        self.socket.listen(5)

        # Run acceptor thread to handle new connection
        self.listener = Thread(target=self.acceptor)
        self.listener.daemon = True
        self.listener.start()

        self.listener.join()


def main():
    app = ServerApp(0)
    app.run()


if __name__ == '__main__':
    main()


I only tested it on Unix with python3.6, but it should not behave too differently, since I use the spawn context, which should behave like Process on Windows.
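For the other logging option mentioned earlier (setting up logging again at the start of the child), here is a minimal sketch that also uses the spawn context to approximate Windows behaviour on Unix. `setup_child_logger`, `handle_client` and the logger name `serverapp.worker` are hypothetical and only serve the illustration; this is not a drop-in replacement for conn_handler.

```python
import logging
import multiprocessing as mp
import sys


def setup_child_logger(name="serverapp.worker", level=logging.DEBUG):
    """Configure a logger in the current process; safe to call repeatedly."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:  # avoid stacking duplicate handlers
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(processName)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


def handle_client(client_id):
    # First thing in the child: build the logging setup from scratch,
    # because a spawned process does not inherit the parent's handlers.
    logger = setup_child_logger()
    logger.info("[%d] - handler started", client_id)


if __name__ == "__main__":
    # "spawn" starts a fresh interpreter, as Windows always does.
    ctx = mp.get_context("spawn")
    process = ctx.Process(target=handle_client, args=(0,))
    process.start()
    process.join()
```

The guard against adding a second handler matters if the helper is called more than once in the same process; without it every message would be printed several times.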