
How do I refactor existing socket server code into a Python class structure?

Refactoring Your Socket Server into a Class-Based Structure

Let's refactor your working socket server into a clean, maintainable class-based structure. All of the logic moves into a Server class, the former globals become instance attributes, and the core workflow stays intact. Here's the step-by-step implementation:

1. Attribute Mapping (Global → Class)

First, let's map your original global variables to instance attributes, distinguishing between public (for external interaction) and private (internal state only):

  • Public Instance Attribute:
    • server_socket: Exposed so you can call bind() and listen() as required
  • Private Instance Attributes (prefixed with _ to signal internal use):
    • _open_sockets: Tracks connected client sockets
    • _sockets_names: Maps sockets to user names
    • _admins: List of admin sockets
    • _muted_sockets: List of muted client sockets
    • _public_msgs: Queue for broadcast messages
    • _private_msgs: Queue for direct/private messages
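
To illustrate why this state is created inside __init__ rather than on the class body, here is a minimal sketch. SharedState and MiniServer are hypothetical stand-ins, not part of the refactored server: a mutable object defined at class level is shared by every instance, while one assigned in __init__ belongs to a single instance.

```python
class SharedState:
    # Class-level list: ONE object shared by every instance (usually a bug)
    open_sockets = []


class MiniServer:
    def __init__(self):
        # Instance-level list: each server object gets its own
        self._open_sockets = []


a, b = SharedState(), SharedState()
a.open_sockets.append("client-1")
shared_leak = b.open_sockets      # b sees a's client as well

x, y = MiniServer(), MiniServer()
x._open_sockets.append("client-1")
isolated = y._open_sockets        # y is unaffected
```

The underscore prefix is a naming convention only; Python does not stop outside code from touching x._open_sockets.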

2. Method Refactoring

All your original functions will become methods of the class:

  • Static methods for utility functions that don't need instance state (like get_current_time)
  • Instance methods for everything else, using self to access instance state
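
The difference between the two kinds of method can be sketched with a small, hypothetical NameBook class (the class and its method names are illustrative, not taken from the server code): the static method depends only on its argument, while the instance method reads state through self.

```python
class NameBook:
    def __init__(self):
        self._names = {}    # key -> display name
        self._admins = []   # keys that have admin rights

    @staticmethod
    def pad_length(data):
        # No self parameter: the result depends only on the argument
        return str(len(data)).zfill(3)

    def display_name(self, key):
        # Needs self: looks up per-instance state
        name = self._names[key]
        return "@" + name if key in self._admins else name


# A static method can be called without creating an instance
header = NameBook.pad_length("hello")

book = NameBook()
book._names["s1"] = "alice"
book._admins.append("s1")
shown = book.display_name("s1")
```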

3. Complete Class Implementation

Here's the full refactored code:

import socket
import select
import datetime

class Server:
    def __init__(self):
        # Initialize core socket
        self.server_socket = socket.socket()
        
        # Internal state attributes (private)
        self._open_sockets = []
        self._sockets_names = {}
        self._admins = []
        self._muted_sockets = []
        self._public_msgs = []
        self._private_msgs = []

    @staticmethod
    def get_current_time():
        """Returns the current time as zero-padded hh:mm (e.g. 09:05)"""
        return datetime.datetime.now().strftime("%H:%M")

    def get_name(self, user_socket):
        """Returns the user's name, adds @ prefix if they're an admin"""
        name = self._sockets_names[user_socket]
        if user_socket in self._admins:
            name = "@" + name
        return name

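    @staticmethod
    def get_data_length(data):
        """Returns the payload's byte length as a 3-digit string for consistent transmission"""
        # Count encoded bytes, not characters, so non-ASCII text gets a correct header
        payload = data.encode('utf-8') if isinstance(data, str) else data
        # Note: a 3-digit header can only describe payloads of up to 999 bytes
        return str(len(payload)).zfill(3)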

    def get_socket_by_name(self, name):
        """Finds and returns the socket associated with a username, or None if not found"""
        for socket_pair, socket_name in self._sockets_names.items():
            if name == socket_name:
                return socket_pair
        return None

    def get_admins_as_string(self):
        """Returns a comma-separated list of admin names"""
        admins_names_lst = [self._sockets_names[admin] for admin in self._admins]
        return ", ".join(admins_names_lst)

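    def _remove_socket(self, removed_socket):
        """Internal helper to clean up state for a disconnected client (safe to call twice)"""
        if removed_socket not in self._open_sockets:
            return  # Already cleaned up
        self._open_sockets.remove(removed_socket)
        if removed_socket in self._admins:
            self._admins.remove(removed_socket)
        if removed_socket in self._muted_sockets:
            self._muted_sockets.remove(removed_socket)
        self._sockets_names.pop(removed_socket, None)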

    def handle_new_connection(self, rlist):
        """Handles incoming client connections and initializes their state"""
        for new_connection in rlist:
            if new_connection is self.server_socket:
                new_socket, address = self.server_socket.accept()
                self._open_sockets.append(new_socket)
                self._sockets_names[new_socket] = "Anonymous"
                # First connected user becomes the default admin
                if not self._admins:
                    self._admins.append(new_socket)
                    print("New Connection And Admin")
                else:
                    print("New Connection")

    def handle_receive_data(self, rlist):
        """Processes data received from connected clients"""
        for current_socket in rlist:
            if current_socket is not self.server_socket:
                try:
                    # Read fixed-length header and actual data
                    data_length = int(current_socket.recv(3).decode('utf-8'))
                    data = current_socket.recv(data_length).decode('utf-8')
                except (ConnectionResetError, ValueError):
                    # Handle abrupt client disconnects gracefully
                    self._private_msgs.append((current_socket, "bye"))
                    continue

                if data.startswith('/'):
                    self.handle_commands(current_socket, data[1:])
                else:
                    # Queue user's message for private echo and public broadcast (if not muted)
                    self._private_msgs.append((current_socket, f"You: {data}"))
                    if current_socket in self._muted_sockets:
                        self._private_msgs.append(
                            (current_socket, "You are muted and so can't send msgs to everyone. You can ask one of the admins to unmute you in a private msg.")
                        )
                    else:
                        self._public_msgs.append((current_socket, f"{self.get_name(current_socket)}: {data}"))

    def handle_sending_msgs(self, wlist):
        """Sends all queued public and private messages to appropriate clients"""
        # Process public broadcast messages
        for message in list(self._public_msgs):
            sender_socket, data = message
            # Encode once so the header describes the bytes actually sent
            payload = f"{self.get_current_time()} {data}".encode('utf-8')
            header = self.get_data_length(payload).encode('utf-8')
            for receiver_socket in wlist:
                if receiver_socket is not sender_socket:
                    try:
                        receiver_socket.sendall(header + payload)
                    except OSError:
                        # Receiver vanished; it is cleaned up when its read fails
                        pass
            self._public_msgs.remove(message)

        # Process private direct messages
        for message in list(self._private_msgs):
            receiver_socket, data = message
            self._private_msgs.remove(message)
            if receiver_socket not in self._open_sockets:
                continue  # Receiver was already removed; drop the message
            if receiver_socket in wlist:
                payload = f"{self.get_current_time()} {data}".encode('utf-8')
                header = self.get_data_length(payload).encode('utf-8')
                try:
                    receiver_socket.sendall(header + payload)
                except OSError:
                    # Clean up if client disconnected mid-transmission
                    self._remove_socket(receiver_socket)
                    continue
            # Only an exact "bye" is the exit signal, not any message starting with it
            if data == "bye":
                self._remove_socket(receiver_socket)

    def handle_commands(self, current_socket, data):
        """Processes client commands starting with /"""
        parts = data.split(' ', 1)
        command = parts[0].lower()
        cmd_data = parts[1] if len(parts) > 1 else ""

        if command == "exit":
            self._public_msgs.append((current_socket, f"{self.get_name(current_socket)} left the chat."))
            self._private_msgs.append((current_socket, "bye"))
            print(f"Connection with {self.get_name(current_socket)} closed.")
        elif command in ['rename', 'setname']:
            if cmd_data not in self._sockets_names.values():
                if cmd_data.lower() not in ["you", "server"] and not cmd_data.startswith('@'):
                    self._sockets_names[current_socket] = cmd_data
                    self._private_msgs.append((current_socket, f"Your name has been successfully changed to {cmd_data}."))
                else:
                    self._private_msgs.append((current_socket, f"{cmd_data} is not a valid name."))
            else:
                self._private_msgs.append((current_socket, "This name is already taken."))
        elif command in ['setadmin', 'promote']:
            if current_socket in self._admins:
                if cmd_data not in self._sockets_names.values():
                    self._private_msgs.append((current_socket, "This name doesn't exist in this server."))
                else:
                    new_admin_socket = self.get_socket_by_name(cmd_data)
                    self._admins.append(new_admin_socket)
                    self._private_msgs.append((current_socket, f"{cmd_data} has been promoted to admin."))
                    self._public_msgs.append((current_socket, f"{self.get_name(current_socket)} promoted {cmd_data} to admin."))
            else:
                self._private_msgs.append((current_socket, "You don't have access to this command."))
        elif command in ['kick', 'remove']:
            if current_socket in self._admins:
                if cmd_data not in self._sockets_names.values():
                    self._private_msgs.append((current_socket, "This name doesn't exist in this server."))
                else:
                    kicked_socket = self.get_socket_by_name(cmd_data)
                    self._private_msgs.append((current_socket, f"{cmd_data} has been successfully kicked and removed."))
                    self._public_msgs.append((current_socket, f"{self.get_name(current_socket)} kicked and removed {cmd_data}"))
                    self._private_msgs.append((kicked_socket, f"{self.get_name(current_socket)} kicked you."))
                    self._private_msgs.append((kicked_socket, "bye"))
            else:
                # Non-admins previously got no reply at all for this command
                self._private_msgs.append((current_socket, "You don't have access to this command."))
        elif command == 'mute':
            if current_socket in self._admins:
                if cmd_data not in self._sockets_names.values():
                    self._private_msgs.append((current_socket, "This name doesn't exist in this server."))
                else:
                    muted_socket = self.get_socket_by_name(cmd_data)
                    self._muted_sockets.append(muted_socket)
                    self._private_msgs.append((current_socket, f"{cmd_data} has been successfully muted."))
                    self._public_msgs.append((current_socket, f"{self.get_name(current_socket)} muted {cmd_data}"))
                    self._private_msgs.append((muted_socket, f"{self.get_name(current_socket)} muted you."))
            else:
                self._private_msgs.append((current_socket, "You are not an admin and so you have no such permissions."))
        elif command == 'unmute':
            if current_socket in self._admins:
                if cmd_data not in self._sockets_names.values():
                    self._private_msgs.append((current_socket, "This name doesn't exist in this server."))
                else:
                    unmute_socket = self.get_socket_by_name(cmd_data)
                    if unmute_socket not in self._muted_sockets:
                        self._private_msgs.append((current_socket, "This user isn't muted."))
                    else:
                        self._muted_sockets.remove(unmute_socket)
                        self._private_msgs.append((current_socket, f"{cmd_data} has been successfully unmuted."))
                        self._public_msgs.append((current_socket, f"{self.get_name(current_socket)} unmuted {cmd_data}"))
                        self._private_msgs.append((unmute_socket, f"{self.get_name(current_socket)} unmuted you."))
            else:
                self._private_msgs.append((current_socket, "You are not an admin and so you have no such permissions."))
        elif command in ['msg', 'message', 'prvmsg', 'privatemessage']:
            msg_parts = cmd_data.split(' ', 1)
            if len(msg_parts) < 2:
                self._private_msgs.append((current_socket, "Usage: /msg <user> <message>"))
                return
            send_to_name, msg_content = msg_parts
            if send_to_name not in self._sockets_names.values():
                self._private_msgs.append((current_socket, "This name doesn't exist in this server."))
            else:
                send_to_socket = self.get_socket_by_name(send_to_name)
                self._private_msgs.append((current_socket, f"You -> {send_to_name}: {msg_content}"))
                self._private_msgs.append((send_to_socket, f"{self.get_name(current_socket)} -> {send_to_name}: {msg_content}"))
        elif command in ['admin', 'admins', 'adminlist', 'adminslist']:
            self._private_msgs.append((current_socket, f"Admins: {self.get_admins_as_string()}"))
        elif command in ['users', 'userslist', 'user', 'userlist']:
            user_list = ", ".join(self._sockets_names.values())
            self._private_msgs.append((current_socket, f"Users: {user_list}"))
        elif command in ['help', '?']:
            commands = """\
/rename <name> - Change your name.
/msg <user> <msg> - Send a private message to <user>.
/users - List all connected users.
/admins - List all server admins.
/exit - Disconnect from the server.

Admins Only Commands:
/kick <user> - Kick <user> from the server.
/promote <user> - Promote <user> to admin.
/mute <user> - Prevent <user> from sending public messages.
/unmute <user> - Allow <user> to send public messages again."""
            self._private_msgs.append((current_socket, f"\nCommands:\n{commands}\n"))
        else:
            self._private_msgs.append((current_socket, f"{command} is not a valid command."))

    def run(self):
        """Starts the server's main event loop to handle connections and messages"""
        try:
            while True:
                # Wait for socket activity using select()
                rlist, wlist, xlist = select.select(
                    [self.server_socket] + self._open_sockets,
                    self._open_sockets,
                    []
                )
                # Process events in order
                self.handle_new_connection(rlist)
                self.handle_receive_data(rlist)
                self.handle_sending_msgs(wlist)
        finally:
            # Ensure cleanup runs even if loop exits unexpectedly
            self.stop()

    def stop(self):
        """Gracefully shuts down the server and cleans up resources"""
        # Drop pending messages, then queue a final exit notification
        self._public_msgs.clear()
        self._private_msgs.clear()
        self._public_msgs.append((None, "bye"))
        # Send the notification to a snapshot of the remaining clients
        self.handle_sending_msgs(list(self._open_sockets))
        # Close every client socket; a second stop() call then has nothing to resend
        for client_socket in self._open_sockets:
            client_socket.close()
        self._open_sockets.clear()
        # Close the server socket
        self.server_socket.close()
        print("Server shut down successfully.")

if __name__ == '__main__':
    # Create server instance and execute your requested core workflow
    server = Server()
    # Core steps as specified
    server.server_socket.bind(('0.0.0.0', 2303))
    server.server_socket.listen(5)
    try:
        server.run()
    except KeyboardInterrupt:
        # run() already calls stop() in its finally block, so it is not repeated here
        print("\nServer interrupted by user.")
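
The 3-digit length header used by get_data_length and handle_receive_data can be tried in isolation. The sketch below is an assumption-laden illustration, not part of the original answer: frame, recv_exact, and read_frame are hypothetical helper names, and socket.socketpair() stands in for a real client connection. It also shows one way to cope with recv() returning fewer bytes than requested, which the server code above does not handle.

```python
import socket

HEADER_SIZE = 3  # Same 3-digit header width the server uses


def frame(text):
    """Encode text and prefix it with its zero-padded byte length."""
    payload = text.encode("utf-8")
    if len(payload) > 999:
        raise ValueError("payload does not fit in a 3-digit length header")
    return str(len(payload)).zfill(HEADER_SIZE).encode("ascii") + payload


def recv_exact(sock, count):
    """Keep calling recv() until exactly count bytes have arrived."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("peer closed the connection mid-message")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_frame(sock):
    """Read one header plus payload and return the decoded text."""
    length = int(recv_exact(sock, HEADER_SIZE).decode("ascii"))
    return recv_exact(sock, length).decode("utf-8")


# A connected pair of sockets stands in for server and client
left, right = socket.socketpair()
try:
    left.sendall(frame("héllo"))
    left.sendall(frame("/users"))
    first = read_frame(right)
    second = read_frame(right)
finally:
    left.close()
    right.close()
```

Because the header counts bytes rather than characters, "héllo" is framed with a length of 006, and two back-to-back messages are split apart correctly on the receiving side.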

4. Key Refactoring Notes

  • Encapsulation: Internal state lives in underscore-prefixed attributes. Python does not enforce privacy, but the prefix signals that outside code should leave them alone, which keeps the class interface clean.
  • Maintainability: Related logic is grouped in the Server class, making it easier to debug, extend, and reuse.
  • Error Handling: Added basic handling for abrupt client disconnects to prevent server crashes.
  • Workflow Compliance: The code strictly follows the core steps you requested: bind, listen, handle connections/data/messages, clean up queues, send exit messages, and close the socket.

The question originally comes from Stack Exchange; it was asked by Erez Avior.
