"""Example client that performs the encrypted handshake and login sequence.

The script connects over TCP, sends ClientHello (10100) and LoginMessage
(10101), then decrypts the ServerHello (20100) and LoginOk (20104) replies.
"""

import socket
import time

import nacl.encoding
import nacl.exceptions
import nacl.utils
from nacl.hash import blake2b
from nacl.public import PrivateKey, Box
from nacl.secret import SecretBox

# Packet header layout: 2-byte message id, 3-byte payload length, 2-byte version.
HEADER_SIZE = 7
# Size in bytes of every nonce used by the handshake.
NONCE_SIZE = 24
# Size in bytes of the symmetric session key carried in ServerHello.
SESSION_KEY_SIZE = 32


# --- Helper Classes ---

class ByteStream:
    """
    Helper class for reading and writing data types to a byte buffer.
    Simulates the ByteStream used on the server.
    """

    def __init__(self, initial_bytes=b''):
        self.buffer = bytearray(initial_bytes)
        self.offset = 0

    def write(self, data):
        """Append raw bytes to the buffer."""
        self.buffer.extend(data)

    def write_byte(self, value):
        """Append a single byte (an int in range 0-255)."""
        self.write(bytes([value]))

    def write_int(self, value, length=4):
        """Append ``value`` as a signed big-endian integer of ``length`` bytes."""
        self.write(value.to_bytes(length, 'big', signed=True))

    def write_string(self, value):
        """Append a length-prefixed UTF-8 string; ``None`` is written as length -1."""
        if value is None:
            self.write_int(-1)
            return
        encoded_str = value.encode('utf-8')
        self.write_int(len(encoded_str))
        self.write(encoded_str)

    def get_bytes(self):
        """Return an immutable copy of everything written so far."""
        return bytes(self.buffer)


# --- Crypto Implementation ---

def next_nonce(nonce):
    """
    Replicates the NextNonce logic from the C# server.
    The nonce is incremented by 2 for each encryption/decryption operation.

    The nonce is treated as a little-endian integer (index 0 is the least
    significant byte). It is modified IN PLACE, so it must be a mutable
    sequence such as a ``bytearray``; the same object is returned. A carry
    out of the last byte is discarded (the value wraps around).
    """
    carry = 2
    for index, byte in enumerate(nonce):
        carry += byte
        nonce[index] = carry & 0xff
        carry >>= 8
    return nonce


def _derive_nonce(*parts):
    """Return the first 24 bytes of the BLAKE2b hash of the concatenated parts.

    NOTE(review): this truncates a 32-byte digest. A BLAKE2b digest computed
    with digest_size=24 yields different bytes; confirm which one the server
    uses.
    """
    digest = blake2b(b''.join(parts), encoder=nacl.encoding.RawEncoder)
    return digest[:NONCE_SIZE]


# --- Main Client Logic ---

class Client:
    """TCP client that runs the handshake and holds the resulting session state."""

    def __init__(self, server_ip, server_port):
        self.server_ip = server_ip
        self.server_port = server_port
        self.socket = None

        # Server's public key is static and known.
        # It's derived from the hardcoded private key in the server source.
        server_private_key_bytes = bytes([
            158, 217, 110, 5, 87, 249, 222, 234, 204, 121, 177, 228, 59, 79, 93, 217,
            25, 33, 113, 185, 119, 171, 205, 246, 11, 185, 185, 22, 140, 152, 107, 20
        ])
        server_private_key = PrivateKey(server_private_key_bytes)
        self.server_public_key = server_private_key.public_key

        # Client generates a new, temporary key pair for each session.
        self.client_private_key = PrivateKey.generate()
        self.client_public_key = self.client_private_key.public_key

        # Will be populated after the handshake
        self.shared_secret = None
        self.session_key = None
        self.client_nonce = None  # RNonce on server
        self.server_nonce = None  # SNonce on server
        self.encryptor = None
        self.decryptor = None

    def connect(self):
        """Establishes the TCP connection."""
        print("[*] Connecting to server...")
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.server_ip, self.server_port))
        print("[+] Connected!")

    def send_packet(self, message_id, payload):
        """Constructs and sends a packet in the server's expected format."""
        header = (
            message_id.to_bytes(2, 'big')
            + len(payload).to_bytes(3, 'big')
            + (0).to_bytes(2, 'big')  # Version
        )
        self.socket.sendall(header + payload)
        print(f"[*] Sent packet {message_id} with payload length {len(payload)}")

    def _recv_exact(self, size):
        """Read up to ``size`` bytes, looping until done or the peer closes.

        ``socket.recv(n)`` may return fewer than ``n`` bytes, so a single call
        is not enough to read a whole header or payload. The result is shorter
        than ``size`` only if the connection was closed early.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def read_packet(self):
        """Reads a packet from the server.

        Returns:
            ``(message_id, payload)``, or ``(None, None)`` if the server
            closed the connection before sending any header bytes.

        Raises:
            ConnectionError: if the connection closes in the middle of a
                header or payload.
        """
        header = self._recv_exact(HEADER_SIZE)
        if not header:
            return None, None
        if len(header) < HEADER_SIZE:
            raise ConnectionError(
                f"Connection closed mid-header ({len(header)}/{HEADER_SIZE} bytes)"
            )

        message_id = int.from_bytes(header[0:2], 'big')
        length = int.from_bytes(header[2:5], 'big')

        payload = self._recv_exact(length)
        if len(payload) < length:
            raise ConnectionError(
                f"Connection closed mid-payload ({len(payload)}/{length} bytes)"
            )

        print(f"[*] Received packet {message_id} with payload length {len(payload)}")
        return message_id, payload

    @staticmethod
    def _build_login_data():
        """Serialize the plaintext login fields for a brand-new account."""
        login_data_bs = ByteStream()
        login_data_bs.write_int(0)  # Account ID High (0 for new account)
        login_data_bs.write_int(0)  # Account ID Low (0 for new account)
        login_data_bs.write_string(None)  # Pass Token
        login_data_bs.write_int(53)  # Client Major
        login_data_bs.write_int(135)  # Client Minor
        login_data_bs.write_int(0)  # Client Build
        login_data_bs.write_string("2a1d35a5749747761159b36050b8b6314f849641")  # Fingerprint SHA
        login_data_bs.write_int(0)  # Unknown
        login_data_bs.write_string("en-US")  # lang
        return login_data_bs.get_bytes()

    def perform_handshake_and_login(self):
        """
        Manages the entire connection, handshake, and account creation process.

        Returns ``None`` in every case; progress and protocol errors are
        printed. Decryption failures of ServerHello (``CryptoError``) and
        malformed ServerHello contents (``ValueError``) propagate to the caller.
        """
        client_pk_bytes = self.client_public_key.encode()
        server_pk_bytes = self.server_public_key.encode()

        # --- 1. ClientHello (10100) ---
        # This message is sent unencrypted. It signals the start of the login sequence.
        print("\n--- Step 1: Sending ClientHello (10100) ---")
        client_hello_payload = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'  # Dummy payload
        self.send_packet(10100, client_hello_payload)

        # --- 2. LoginMessage (10101) ---
        # This is where the cryptographic handshake truly begins.
        print("\n--- Step 2: Preparing and Sending LoginMessage (10101) ---")

        # The client computes the shared secret 's' *before* sending the message.
        # This will be used to decrypt the server's response.
        # PyNaCl exposes the precomputed key through Box.shared_key(); the Box
        # itself encrypts and decrypts with that same key.
        box = Box(self.client_private_key, self.server_public_key)
        self.shared_secret = box.shared_key()
        print(f"[+] Calculated Shared Secret (s): {self.shared_secret.hex()}")

        # The client must generate a nonce (called RNonce on the server)
        # and include it inside the encrypted part of the login message.
        self.client_nonce = nacl.utils.random(NONCE_SIZE)
        print(f"[+] Generated Client Nonce (RNonce): {self.client_nonce.hex()}")

        # The server expects the encrypted payload to start with a temporary session key (which it ignores)
        # and our RNonce.
        payload_to_encrypt_bs = ByteStream()
        payload_to_encrypt_bs.write(nacl.utils.random(24))  # a temporary client key, server discards it
        payload_to_encrypt_bs.write(self.client_nonce)
        payload_to_encrypt_bs.write(self._build_login_data())

        # The nonce for the Box encryption is a hash of the public keys.
        login_nonce = _derive_nonce(client_pk_bytes, server_pk_bytes)

        # Encrypt the payload containing RNonce and login data.
        # Box.encrypt() returns nonce + ciphertext; the nonce is derived from
        # the public keys on both sides, so only the ciphertext is sent.
        # NOTE(review): confirm the server does not expect a nonce prefix.
        encrypted_login_payload = box.encrypt(
            payload_to_encrypt_bs.get_bytes(), login_nonce
        ).ciphertext

        # The final packet payload is the client's raw public key + the encrypted data.
        final_login_payload = client_pk_bytes + encrypted_login_payload
        self.send_packet(10101, final_login_payload)

        # --- 3. Receive and Decrypt ServerHello (20100) ---
        print("\n--- Step 3: Receiving and Processing ServerHello (20100) ---")
        message_id, payload = self.read_packet()
        if message_id != 20100:
            print(f"[!!!] Error: Expected ServerHello (20100), got {message_id}. Aborting.")
            return

        # The server encrypts its response using a nonce derived from OUR RNonce, and the public keys.
        server_response_nonce = _derive_nonce(
            self.client_nonce, client_pk_bytes, server_pk_bytes
        )

        # We decrypt the server's response using our pre-calculated shared secret 's'.
        decrypted_payload = box.decrypt(payload, server_response_nonce)

        # The decrypted payload of ServerHello contains the server's nonce (SNonce)
        # and the final symmetric session key.
        expected_size = NONCE_SIZE + SESSION_KEY_SIZE
        if len(decrypted_payload) < expected_size:
            raise ValueError(
                f"ServerHello too short: {len(decrypted_payload)} bytes, "
                f"expected at least {expected_size}"
            )
        self.server_nonce = decrypted_payload[0:NONCE_SIZE]
        self.session_key = decrypted_payload[NONCE_SIZE:expected_size]

        print("[+] Decryption successful!")
        print(f"[+] Session Key: {self.session_key.hex()}")
        print(f"[+] Server Nonce (SNonce): {self.server_nonce.hex()}")

        # Now we can setup the symmetric encryptor/decryptor for the rest of the session.
        self.encryptor = SecretBox(self.session_key)
        self.decryptor = SecretBox(self.session_key)

        # --- 4. Receive LoginOk (20104) ---
        # This packet confirms login and provides account details. It is ENCRYPTED.
        print("\n--- Step 4: Receiving and Processing LoginOk (20104) ---")
        message_id, payload = self.read_packet()
        if message_id != 20104:
            print(f"[!!!] Error: Expected LoginOk (20104), got {message_id}. Aborting.")
            return

        # The nonce for symmetric encryption is the SNonce, which must be incremented by 2 before use.
        self.server_nonce = next_nonce(bytearray(self.server_nonce))

        try:
            decrypted_login_ok = self.decryptor.decrypt(payload, bytes(self.server_nonce))
        except nacl.exceptions.CryptoError as e:
            print(f"[!!!] FAILED to decrypt LoginOk packet: {e}")
        else:
            print("[+] LoginOk packet decrypted successfully!")
            # Here you would parse the decrypted_login_ok bytestream
            # to get your new Account ID, Token, etc.
            print("[SUCCESS] Account created and login complete!")
            print("You are now in a fully encrypted session with the server.")


def main():
    """Connect to the configured server, run the handshake, and always close the socket."""
    # IMPORTANT: Replace with the actual server IP and Port
    SERVER_IP = "127.0.0.1"
    SERVER_PORT = 9339

    client = Client(SERVER_IP, SERVER_PORT)
    try:
        client.connect()
        client.perform_handshake_and_login()
    except Exception as e:
        # Top-level boundary: report any failure instead of a raw traceback.
        print(f"\n[!!!] An error occurred: {e}")
    finally:
        if client.socket:
            client.socket.close()
            print("\n[*] Connection closed.")


if __name__ == '__main__':
    main()