Implement compression #3

Merged
kodi merged 3 commits from compression into main 2024-10-06 19:51:44 +02:00
5 changed files with 93 additions and 38 deletions

View File

@ -19,6 +19,7 @@ defmodule Amethyst.ConnectionHandler do
This module is responsible for handling incoming packets and sending outgoing packets. It keeps track of what state the connection is in and which game should
receive the packets.
"""
alias Amethyst.Minecraft.Write
require Logger
@spec child_spec(:gen_tcp.socket()) :: Supervisor.child_spec()
@ -63,6 +64,10 @@ defmodule Amethyst.ConnectionHandler do
decryption_state = :crypto.crypto_init(:aes_128_cfb8, secret, secret, false)
state = state |> Map.put(:encryption_state, encryption_state) |> Map.put(:decryption_state, decryption_state)
loop(socket, connstate, version, state)
{:set_compression, threshold} ->
Logger.debug("Enabling comrpession with threshold #{threshold}")
state = Map.put(state, :compression, threshold)
loop(socket, connstate, version, state)
{:set_position, position} ->
prev_position = Map.get(state, :position)
state = Map.put(state, :position, position)
@ -83,9 +88,9 @@ defmodule Amethyst.ConnectionHandler do
if prev_cp == nil do
MapSet.new([])
else
MapSet.new(visible_chunks_from(elem(prev_cp, 0), elem(prev_cp, 1), Map.get(state, :view_distance, 16)))
MapSet.new(visible_chunks_from(elem(prev_cp, 0), elem(prev_cp, 1), 16)) # This 16 would be the server view distance!!
end
chunks = MapSet.new(visible_chunks_from(elem(cp, 0), elem(cp, 1), Map.get(state, :view_distance, 16)))
chunks = MapSet.new(visible_chunks_from(elem(cp, 0), elem(cp, 1), 16))
new_chunks = MapSet.difference(chunks, prev_chunks)
Logger.debug("Sending #{MapSet.size(new_chunks)} chunks...")
# We can process all chunks in parallel
@ -112,6 +117,9 @@ defmodule Amethyst.ConnectionHandler do
{:get_encryption, from} ->
send(from, Map.get(state, :decryption_state))
loop(socket, connstate, version, state)
{:get_compression, from} ->
send(from, Map.get(state, :compression))
loop(socket, connstate, version, state)
{:packet, id, data} ->
state = handle_packet(id, data, connstate, version, state)
loop(socket, connstate, version, state)
@ -250,14 +258,28 @@ defmodule Amethyst.ConnectionHandler do
defp send_packet(socket, connstate, packet, version, state) do
try do
data = connstate.serialize(packet, version)
length = byte_size(data) |> Amethyst.Minecraft.Write.varint()
case Map.get(state, :encryption_state) do
nil ->
:gen_tcp.send(socket, length <> data)
estate ->
encrypted = :crypto.crypto_update(estate, length <> data)
:gen_tcp.send(socket, encrypted)
Logger.debug("#{inspect(Map.get(state, :compression))}")
container = if Map.get(state, :compression) == nil do
# Packet ID is included in data
Write.varint(byte_size(data)) <> data
else
threshold = Map.get(state, :compression, 0)
data_length = byte_size(data)
if data_length >= threshold do
compressed = Write.varint(data_length) <> :zlib.compress(data)
Write.varint(byte_size(compressed)) <> compressed
else
compressed = Write.varint(0) <> data
Write.varint(byte_size(compressed)) <> compressed
end
end
Logger.debug(inspect(container))
encrypted = if Map.get(state, :encryption_state) == nil do
container
else
Map.get(state, :encryption_state) |> :crypto.crypto_update(container)
end
:gen_tcp.send(socket, encrypted)
rescue
e ->
Logger.error("Error sending packet #{inspect(packet)} in state #{connstate}: #{Exception.format(:error, e, __STACKTRACE__)}")

View File

@ -35,7 +35,7 @@ defmodule Amethyst.ConnectionReceiver do
{:ok, spawn(fn ->
Process.set_label("ConnectionReceiver for #{inspect(socket)}")
{:ok, pid} = Amethyst.ConnectionHandler.start_link(socket, Amethyst.ConnectionState.Handshake, 0)
receive(socket, pid, nil)
receive(socket, pid, nil, nil)
end)}
end
@ -44,63 +44,88 @@ defmodule Amethyst.ConnectionReceiver do
{:ok, spawn_link(fn ->
Process.set_label("ConnectionReceiver for #{inspect(socket)}")
{:ok, pid} = Amethyst.ConnectionHandler.start_link(socket, Amethyst.ConnectionState.Handshake, 0)
receive(socket, pid, nil)
receive(socket, pid, nil, nil)
end)}
end
@spec receive(:gen_tcp.socket(), pid(), nil | :crypto.crypto_state()) :: no_return()
def receive(socket, sender, cstate) do
case get_packet(socket, cstate) do
@spec receive(:gen_tcp.socket(), pid(), nil | :crypto.crypto_state(), nil | pos_integer()) :: no_return()
def receive(socket, sender, estate, cstate) do
case get_packet(socket, estate, cstate) do
:closed -> send(sender, :closed)
Process.exit(self(), :normal)
{:error, error} -> Logger.error("Error reading packet: #{error}")
{id, data} -> send(sender, {:packet, id, data})
end
if cstate == nil do
# Ask the handler if the encryption state has changed
estate = if estate == nil do
# Ask the handler if we have encryption now
send(sender, {:get_encryption, self()})
Logger.debug("Asking for news on encryption...")
receive do
nil -> receive(socket, sender, cstate)
some ->
Logger.debug("Enabling decryption!")
receive(socket, sender, some)
nil -> nil
some -> some
end
else
receive(socket, sender, cstate)
else estate end
cstate = if cstate == nil do
# Ask the handler if we have encryption now
send(sender, {:get_compression, self()})
receive do
nil -> nil
some -> some
end
else cstate end
receive(socket, sender, estate, cstate)
end
def get_packet(client, cstate) do
case get_varint(client, "", cstate) do
def get_packet(client, estate, cstate) do
case get_varint(client, "", estate) do
:closed -> :closed
{:error, error} -> {:error, error}
{[length], ""} ->
recv = :gen_tcp.recv(client, length)
case recv do
{:ok, full_packet} ->
full_packet = case cstate do
nil -> full_packet
ds -> :crypto.crypto_update(ds, full_packet)
data = :gen_tcp.recv(client, length)
case data do
{:ok, data} ->
# Perform decryption and decompression
decrypted = if estate == nil do
data
else
:crypto.crypto_update(estate, data)
end
({[id], data} = Read.start(full_packet) |> Read.varint() |> Read.stop()
{id, data})
full_packet = if cstate == nil do
# Using "without compression" container
decrypted
else
# Using "with compression" container
{[dlength], rest} = Read.start(decrypted) |> Read.varint |> Read.stop
if dlength == 0 do
# Uncompressed data
rest
else
# Compressed data
rest |> :zlib.uncompress()
end
end
{[id], data} = Read.start(full_packet) |> Read.varint |> Read.stop
{id, data}
{:error, :closed} -> :closed
{:error, error} -> {:error, error}
end
end
end
defp get_varint(client, acc, cstate) do
defp get_varint(client, acc, estate) do
case :gen_tcp.recv(client, 1) do
{:ok, byte} ->
byte = case cstate do
byte = case estate do
nil -> byte
ds -> :crypto.crypto_update(ds, byte)
end
case byte do
<<0::1, _::7>> -> Read.start(acc <> byte) |> Read.varint() |> Read.stop()
<<1::1, _::7>> -> get_varint(client, acc <> byte, cstate)
<<1::1, _::7>> -> get_varint(client, acc <> byte, estate)
end
{:error, :closed} -> :closed
{:error, error} -> {:error, error}

View File

@ -90,6 +90,13 @@ defmodule Amethyst.ConnectionState.Login do
verify_token = Amethyst.Keys.decrypt(verify_token)
if verify_token == Map.get(state, :verify_token, :never) do
send(self(), {:set_encryption, secret})
if Application.get_env(:amethyst, :compression, nil) != nil do
threshold = Application.get_env(:amethyst, :compression, 0)
send(self(), {:send_packet, %{packet_type: :set_compression, threshold: threshold}})
send(self(), {:set_compression, threshold})
end
send(self(), {:send_packet, %{
packet_type: :login_success,
uuid: Map.get(state, :uuid),

View File

@ -18,7 +18,7 @@ defmodule Example.Game do
@impl true
@spec player_position(any(), {any(), any(), any()}, any()) :: :ok
def player_position(from, {x, y, z}, _refs) do
Logger.info("Player at #{inspect(from)} moved to #{x}, #{y}, #{z}")
# Logger.info("Player at #{inspect(from)} moved to #{x}, #{y}, #{z}")
send(from, {:set_position, {x, y, z}})
:ok
end

View File

@ -4,4 +4,5 @@ config :amethyst,
port: 25599, # Bogus port for testing, avoids unexpected conflicts
encryption: true, # Whether or not to request encryption from clients.
auth: false, # Whether or not users should be authenticated with Mojang.
compression: 256, # Packets larger than this amount are sent compressed. Set to nil to disable compression.
default_game: Example.Game # Which game new players should be sent to