2025-06-21 14:32:54 +03:00

283 lines
11 KiB
Python

import nacl.utils
import socket
from nacl.public import PrivateKey, PublicKey
import nacl.bindings
import random
import uuid
from nacl.hash import blake2b
from nacl.encoding import RawEncoder
# --- Helper Classes ---
class ByteStream:
    """
    Growable byte buffer with helpers for serialising primitive values.

    Per the original author's note this mirrors the ByteStream used on the
    server. Only write helpers are implemented here; ``offset`` is
    initialised to 0 and is not touched by any method of this class.
    """

    def __init__(self, initial_bytes=b''):
        # ``buffer`` is the accumulated output; it starts as a copy of
        # ``initial_bytes``.
        self.offset = 0
        self.buffer = bytearray(initial_bytes)

    def write(self, data):
        """Append ``data`` (bytes-like or an iterable of ints) to the buffer."""
        self.buffer.extend(data)

    def write_byte(self, value):
        """Append a single byte; ``value`` must be in ``range(256)``."""
        single = bytes((value,))
        self.write(single)

    def write_int(self, value, length=4):
        """Append ``value`` as a signed big-endian integer of ``length`` bytes."""
        encoded = value.to_bytes(length, byteorder='big', signed=True)
        self.write(encoded)

    def write_string(self, value):
        """
        Append a length-prefixed UTF-8 string.

        ``None`` is written exactly like the empty string: a 4-byte length
        of 0 and no data (not a length of -1).
        """
        text = "" if value is None else value
        raw = text.encode('utf-8')
        self.write_int(len(raw))
        self.write(raw)

    def get_bytes(self):
        """Return an immutable snapshot of everything written so far."""
        return bytes(self.buffer)
def next_nonce(nonce):
    """
    Return ``nonce`` advanced by 2.

    The nonce is treated as a little-endian unsigned integer (byte 0 is the
    least significant). Any carry out of the last byte is discarded, so the
    result always has the same length as the input. Per the original
    author's note this replicates the NextNonce logic of the C# server,
    which advances the nonce by 2 for each encryption/decryption operation.
    """
    raw = bytes(nonce)
    width = len(raw)
    # Reducing modulo 2**(8*width) drops the overflow carry, matching a
    # byte-wise add whose final carry is thrown away.
    bumped = (int.from_bytes(raw, 'little') + 2) % (1 << (8 * width))
    return bumped.to_bytes(width, 'little')
# --- Main Client Logic ---
class Client:
    """
    Test client that performs the handshake and login against the server.

    Every packet is framed as a 7-byte header (2-byte message id, 3-byte
    payload length, 2-byte version, all big-endian) followed by the payload.
    """

    HEADER_SIZE = 7   # 2-byte id + 3-byte length + 2-byte version
    NONCE_SIZE = 24   # nonce length used throughout the handshake
    KEY_SIZE = 32     # length of the session key carried in LoginOk

    def __init__(self, server_ip, server_port):
        """Store the endpoint and generate a fresh client key pair."""
        self.server_ip = server_ip
        self.server_port = server_port
        self.socket = None
        # The public key from the Frida script.
        server_public_key_bytes = bytes([
            176, 34, 250, 182, 83, 219, 239, 33, 143, 194, 166, 186, 14, 16, 90, 40,
            105, 220, 235, 204, 35, 85, 91, 245, 70, 17, 164, 194, 135, 106, 27, 51
        ])
        self.server_public_key = PublicKey(server_public_key_bytes)
        # Client key pair
        self.client_private_key = PrivateKey.generate()
        self.client_public_key = self.client_private_key.public_key
        # Handshake variables, populated during the flow
        self.shared_secret_s = None
        self.session_key = None   # The second key for symmetric encryption
        self.server_nonce = None  # SNonce
        self.client_nonce = None  # RNonce

    def connect(self):
        """
        Establish the TCP connection (10 second timeout).

        Raises:
            ConnectionError: if the socket cannot be created or connected.
        """
        print("[*] Connecting to server...")
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.settimeout(10)
            self.socket.connect((self.server_ip, self.server_port))
            print("[+] Connected!")
        except socket.error as e:
            # Chain the cause so the original errno/traceback is preserved.
            raise ConnectionError(f"Failed to connect: {e}") from e

    def send_packet(self, message_id, payload, version=0):
        """Frame ``payload`` with the 7-byte header and send it in one call."""
        header = message_id.to_bytes(2, 'big') + \
            len(payload).to_bytes(3, 'big') + \
            version.to_bytes(2, 'big')
        self.socket.sendall(header + payload)
        print(f"[*] Sent packet {message_id} with version {version} and payload length {len(payload)}")

    def _recv_exact(self, count, what):
        """Read exactly ``count`` bytes, looping over short reads."""
        received = bytearray()
        while len(received) < count:
            chunk = self.socket.recv(count - len(received))
            if not chunk:
                raise ConnectionError(f"Connection closed while reading {what}")
            received += chunk
        return bytes(received)

    def read_packet(self):
        """
        Read one framed packet from the server.

        Returns:
            ``(message_id, payload)``; the header's version field is ignored.

        Raises:
            TimeoutError: if the socket times out.
            ConnectionError: if the peer closes the connection mid-packet or
                another socket error occurs.
        """
        try:
            # recv() may return fewer bytes than requested, so the header is
            # read with the same loop as the payload.
            header = self._recv_exact(self.HEADER_SIZE, "header")
            message_id = int.from_bytes(header[0:2], 'big')
            length = int.from_bytes(header[2:5], 'big')
            payload = self._recv_exact(length, "payload")
        except socket.timeout as e:
            raise TimeoutError("Timed out while reading packet") from e
        except ConnectionError:
            # Already the right type; do not wrap it a second time.
            raise
        except socket.error as e:
            raise ConnectionError(f"Socket error while reading packet: {e}") from e
        print(f"[*] Received packet {message_id} with payload length {len(payload)}")
        return message_id, payload

    def perform_handshake_and_login(self):
        """
        Run the full flow: ClientHello, LoginMessage, ServerHello, LoginOk.

        On success ``server_nonce`` and ``session_key`` are populated.
        Protocol-level failures (unexpected message id, short payload,
        failed decryption) are reported on stdout and end the flow early;
        socket failures propagate from ``send_packet``/``read_packet``.
        """
        # Per the original author's note this mirrors the handshake in
        # LSBS-V52/Classes/Crypto.py.
        # --- Step 1: ClientHello (10100) - Plaintext ---
        print("\n--- Step 1: Sending ClientHello (10100) ---")
        self.send_packet(10100, self._build_client_hello())

        # --- Step 2: LoginMessage (10101) ---
        print("\n--- Step 2: Preparing and Sending LoginMessage (10101) ---")
        self.send_packet(10101, self._build_login_message(), version=1)

        # --- Step 3: Receiving ServerHello (20100) ---
        print("\n--- Step 3: Receiving ServerHello (20100) ---")
        message_id, payload = self.read_packet()
        if message_id != 20100:
            print(f"[!!!] Expected 20100, got {message_id}")
            return
        if len(payload) < self.NONCE_SIZE:
            print(f"[!!!] ServerHello payload too short for a nonce ({len(payload)} bytes)")
            return
        # This is the SNonce, sent in plaintext.
        self.server_nonce = payload[:self.NONCE_SIZE]
        print(f"[+] Received Server Nonce (SNonce): {self.server_nonce.hex()}")

        # --- Step 4: Receiving LoginOk (20104) ---
        print("\n--- Step 4: Receiving LoginOk (20104) ---")
        message_id, payload = self.read_packet()
        if message_id != 20104:
            # A different packet arrived, so the login is NOT confirmed.
            print(f"[!!!] Expected 20104, got {message_id}")
            print("[!!!] Handshake could not be confirmed: the server did not send LoginOk.")
            return
        self._process_login_ok(payload)

    def _build_client_hello(self):
        """Return the plaintext ClientHello payload."""
        # Per the original author's note, this layout must match the
        # structure used by piranhaddos-client exactly.
        ch_payload = ByteStream()
        ch_payload.write_int(0)      # Protocol version
        ch_payload.write_int(0)      # Key version
        ch_payload.write_int(0)      # Major version
        ch_payload.write_int(0)      # Minor version
        ch_payload.write_int(0)      # Build version
        ch_payload.write_string("")  # Fingerprint hash
        return ch_payload.get_bytes()

    def _build_login_message(self):
        """Return the LoginMessage payload: client public key + encrypted data."""
        # 1. Shared secret 's', used ONLY for the handshake messages.
        self.shared_secret_s = nacl.bindings.crypto_box_beforenm(
            self.server_public_key.encode(), self.client_private_key.encode()
        )
        print(f"[+] Calculated Handshake Secret (s): {self.shared_secret_s.hex()}")
        # 2. The nonce for this message is hash(client_pk + server_pk).
        login_nonce = blake2b(
            self.client_public_key.encode() + self.server_public_key.encode(),
            digest_size=self.NONCE_SIZE,
            encoder=RawEncoder
        )
        # 3. The plaintext is our freshly generated RNonce followed by the
        #    full login data.
        self.client_nonce = nacl.utils.random(self.NONCE_SIZE)
        login_data_bs = self.get_full_login_payload()
        payload_to_encrypt = self.client_nonce + login_data_bs.get_bytes()
        # 4. Encrypt with the shared secret. PyNaCl's binding adds and strips
        #    the NaCl zero padding itself, so the return value is already
        #    "16-byte authenticator + ciphertext" and must be sent whole;
        #    slicing off 16 bytes would discard the authenticator.
        # NOTE(review): assumes the server expects standard secretbox output
        # (authenticator first) - confirm against the server implementation.
        encrypted_payload = nacl.bindings.crypto_secretbox(
            payload_to_encrypt, login_nonce, self.shared_secret_s
        )
        # 5. Final payload: our public key, then the encrypted data.
        return self.client_public_key.encode() + encrypted_payload

    def _process_login_ok(self, payload):
        """Decrypt a LoginOk payload and extract the session key."""
        # Local import: the file does not import nacl.exceptions at the top.
        from nacl.exceptions import CryptoError

        # The nonce for this is hash(RNonce + client_pk + server_pk).
        login_ok_nonce = blake2b(
            self.client_nonce + self.client_public_key.encode() + self.server_public_key.encode(),
            digest_size=self.NONCE_SIZE,
            encoder=RawEncoder
        )
        try:
            # Decrypt with the same handshake secret 's'. The binding re-adds
            # the zero padding internally, so the payload is passed as
            # received (authenticator + ciphertext) with no extra prefix.
            decrypted_payload = nacl.bindings.crypto_secretbox_open(
                payload, login_ok_nonce, self.shared_secret_s
            )
        except CryptoError as e:
            # A failed authentication is a real failure; report it as such.
            print(f"[!!!] Failed to decrypt LoginOk: {e}")
            return
        print("[+] LoginOk packet decrypted successfully!")
        # Per the original author's note the plaintext holds: SNonce (again),
        # a NEW session key, then the LoginOk data. The returned plaintext
        # carries no zero padding, so the key sits right after the nonce.
        key_start = self.NONCE_SIZE
        key_end = key_start + self.KEY_SIZE
        if len(decrypted_payload) < key_end:
            print(f"[!!!] LoginOk plaintext too short for a session key ({len(decrypted_payload)} bytes)")
            return
        self.session_key = decrypted_payload[key_start:key_end]
        print(f"[+] Extracted true Session Key: {self.session_key.hex()}")
        print("[SUCCESS] Handshake complete! Account created!")

    def get_full_login_payload(self):
        """
        Build the detailed login payload and return it as a ByteStream.

        Device identifiers (fingerprint, UDIDs, advertising id, Android id)
        are randomised on every call; all other fields are fixed values.
        The field order is significant and must not be changed.
        """
        login_data_bs = ByteStream()
        login_data_bs.write_int(0)
        login_data_bs.write_int(1)
        login_data_bs.write_string("")
        login_data_bs.write_int(47)
        login_data_bs.write_int(365)
        login_data_bs.write_int(2)
        fingerprint = random.randbytes(20).hex()
        udid = random.randbytes(20).hex()
        open_udid = random.randbytes(20).hex()
        mac_address = "DE:AD:BE:EF:FE:ED"
        advertising_id = str(uuid.uuid4())
        android_id = random.randbytes(8).hex()
        login_data_bs.write_string(fingerprint)
        login_data_bs.write_string("Android")
        login_data_bs.write_string("")
        login_data_bs.write_string("en-US")
        login_data_bs.write_int(1)
        login_data_bs.write_int(2)
        login_data_bs.write_int(2)
        login_data_bs.write_string("47.365.2")
        login_data_bs.write_int(0)
        login_data_bs.write_int(0)
        login_data_bs.write_int(0)
        login_data_bs.write_string(udid)
        login_data_bs.write_string(open_udid)
        login_data_bs.write_string(mac_address)
        login_data_bs.write_string("SM-G998B")
        login_data_bs.write_string(advertising_id)
        login_data_bs.write_string(android_id)
        login_data_bs.write_string("1565511816840599")
        login_data_bs.write_string("12")
        login_data_bs.write_int(0)
        login_data_bs.write_int(0)
        login_data_bs.write_string("en-US")
        login_data_bs.write_string("47.365.2")
        login_data_bs.write_string("Europe/Helsinki")
        login_data_bs.write_string("")
        login_data_bs.write_int(0)
        login_data_bs.write_int(1)
        login_data_bs.write_int(4)
        login_data_bs.write_int(3)
        login_data_bs.write_int(2)
        login_data_bs.write_int(1)
        login_data_bs.write_int(35)
        return login_data_bs
if __name__ == '__main__':
    # Endpoint of the server under test.
    SERVER_IP = "195.58.39.44"
    SERVER_PORT = 1337

    client = Client(SERVER_IP, SERVER_PORT)
    try:
        client.connect()
        client.perform_handshake_and_login()
    except Exception as error:
        # Top-level boundary: report the failure instead of a traceback.
        print(f"\n[!!!] An error occurred: {error}")
    finally:
        # connect() may have failed before a socket object was created, in
        # which case there is nothing to close.
        if client.socket:
            client.socket.close()
            print("\n[*] Connection closed.")