Implement compression #3

Merged
kodi merged 3 commits from compression into main 2024-10-06 19:51:44 +02:00
5 changed files with 93 additions and 38 deletions

View File

@ -19,6 +19,7 @@ defmodule Amethyst.ConnectionHandler do
This module is responsible for handling incoming packets and sending outgoing packets. It keeps track of what state the connection is in and which game should This module is responsible for handling incoming packets and sending outgoing packets. It keeps track of what state the connection is in and which game should
receive the packets. receive the packets.
""" """
alias Amethyst.Minecraft.Write
require Logger require Logger
@spec child_spec(:gen_tcp.socket()) :: Supervisor.child_spec() @spec child_spec(:gen_tcp.socket()) :: Supervisor.child_spec()
@ -63,6 +64,10 @@ defmodule Amethyst.ConnectionHandler do
decryption_state = :crypto.crypto_init(:aes_128_cfb8, secret, secret, false) decryption_state = :crypto.crypto_init(:aes_128_cfb8, secret, secret, false)
state = state |> Map.put(:encryption_state, encryption_state) |> Map.put(:decryption_state, decryption_state) state = state |> Map.put(:encryption_state, encryption_state) |> Map.put(:decryption_state, decryption_state)
loop(socket, connstate, version, state) loop(socket, connstate, version, state)
{:set_compression, threshold} ->
Logger.debug("Enabling comrpession with threshold #{threshold}")
state = Map.put(state, :compression, threshold)
loop(socket, connstate, version, state)
{:set_position, position} -> {:set_position, position} ->
prev_position = Map.get(state, :position) prev_position = Map.get(state, :position)
state = Map.put(state, :position, position) state = Map.put(state, :position, position)
@ -83,9 +88,9 @@ defmodule Amethyst.ConnectionHandler do
if prev_cp == nil do if prev_cp == nil do
MapSet.new([]) MapSet.new([])
else else
MapSet.new(visible_chunks_from(elem(prev_cp, 0), elem(prev_cp, 1), Map.get(state, :view_distance, 16))) MapSet.new(visible_chunks_from(elem(prev_cp, 0), elem(prev_cp, 1), 16)) # This 16 would be the server view distance!!
end end
chunks = MapSet.new(visible_chunks_from(elem(cp, 0), elem(cp, 1), Map.get(state, :view_distance, 16))) chunks = MapSet.new(visible_chunks_from(elem(cp, 0), elem(cp, 1), 16))
new_chunks = MapSet.difference(chunks, prev_chunks) new_chunks = MapSet.difference(chunks, prev_chunks)
Logger.debug("Sending #{MapSet.size(new_chunks)} chunks...") Logger.debug("Sending #{MapSet.size(new_chunks)} chunks...")
# We can process all chunks in parallel # We can process all chunks in parallel
@ -112,6 +117,9 @@ defmodule Amethyst.ConnectionHandler do
{:get_encryption, from} -> {:get_encryption, from} ->
send(from, Map.get(state, :decryption_state)) send(from, Map.get(state, :decryption_state))
loop(socket, connstate, version, state) loop(socket, connstate, version, state)
{:get_compression, from} ->
send(from, Map.get(state, :compression))
loop(socket, connstate, version, state)
{:packet, id, data} -> {:packet, id, data} ->
state = handle_packet(id, data, connstate, version, state) state = handle_packet(id, data, connstate, version, state)
loop(socket, connstate, version, state) loop(socket, connstate, version, state)
@ -250,14 +258,28 @@ defmodule Amethyst.ConnectionHandler do
defp send_packet(socket, connstate, packet, version, state) do defp send_packet(socket, connstate, packet, version, state) do
try do try do
data = connstate.serialize(packet, version) data = connstate.serialize(packet, version)
length = byte_size(data) |> Amethyst.Minecraft.Write.varint() Logger.debug("#{inspect(Map.get(state, :compression))}")
case Map.get(state, :encryption_state) do container = if Map.get(state, :compression) == nil do
nil -> # Packet ID is included in data
:gen_tcp.send(socket, length <> data) Write.varint(byte_size(data)) <> data
estate -> else
encrypted = :crypto.crypto_update(estate, length <> data) threshold = Map.get(state, :compression, 0)
:gen_tcp.send(socket, encrypted) data_length = byte_size(data)
if data_length >= threshold do
compressed = Write.varint(data_length) <> :zlib.compress(data)
Write.varint(byte_size(compressed)) <> compressed
else
compressed = Write.varint(0) <> data
Write.varint(byte_size(compressed)) <> compressed
end
end end
Logger.debug(inspect(container))
encrypted = if Map.get(state, :encryption_state) == nil do
container
else
Map.get(state, :encryption_state) |> :crypto.crypto_update(container)
end
:gen_tcp.send(socket, encrypted)
rescue rescue
e -> e ->
Logger.error("Error sending packet #{inspect(packet)} in state #{connstate}: #{Exception.format(:error, e, __STACKTRACE__)}") Logger.error("Error sending packet #{inspect(packet)} in state #{connstate}: #{Exception.format(:error, e, __STACKTRACE__)}")

View File

@ -35,7 +35,7 @@ defmodule Amethyst.ConnectionReceiver do
{:ok, spawn(fn -> {:ok, spawn(fn ->
Process.set_label("ConnectionReceiver for #{inspect(socket)}") Process.set_label("ConnectionReceiver for #{inspect(socket)}")
{:ok, pid} = Amethyst.ConnectionHandler.start_link(socket, Amethyst.ConnectionState.Handshake, 0) {:ok, pid} = Amethyst.ConnectionHandler.start_link(socket, Amethyst.ConnectionState.Handshake, 0)
receive(socket, pid, nil) receive(socket, pid, nil, nil)
end)} end)}
end end
@ -44,63 +44,88 @@ defmodule Amethyst.ConnectionReceiver do
{:ok, spawn_link(fn -> {:ok, spawn_link(fn ->
Process.set_label("ConnectionReceiver for #{inspect(socket)}") Process.set_label("ConnectionReceiver for #{inspect(socket)}")
{:ok, pid} = Amethyst.ConnectionHandler.start_link(socket, Amethyst.ConnectionState.Handshake, 0) {:ok, pid} = Amethyst.ConnectionHandler.start_link(socket, Amethyst.ConnectionState.Handshake, 0)
receive(socket, pid, nil) receive(socket, pid, nil, nil)
end)} end)}
end end
@spec receive(:gen_tcp.socket(), pid(), nil | :crypto.crypto_state()) :: no_return() @spec receive(:gen_tcp.socket(), pid(), nil | :crypto.crypto_state(), nil | pos_integer()) :: no_return()
def receive(socket, sender, cstate) do def receive(socket, sender, estate, cstate) do
case get_packet(socket, cstate) do case get_packet(socket, estate, cstate) do
:closed -> send(sender, :closed) :closed -> send(sender, :closed)
Process.exit(self(), :normal) Process.exit(self(), :normal)
{:error, error} -> Logger.error("Error reading packet: #{error}") {:error, error} -> Logger.error("Error reading packet: #{error}")
{id, data} -> send(sender, {:packet, id, data}) {id, data} -> send(sender, {:packet, id, data})
end end
if cstate == nil do
# Ask the handler if the encryption state has changed estate = if estate == nil do
# Ask the handler if we have encryption now
send(sender, {:get_encryption, self()}) send(sender, {:get_encryption, self()})
Logger.debug("Asking for news on encryption...")
receive do receive do
nil -> receive(socket, sender, cstate) nil -> nil
some -> some -> some
Logger.debug("Enabling decryption!")
receive(socket, sender, some)
end end
else else estate end
receive(socket, sender, cstate)
end cstate = if cstate == nil do
# Ask the handler if we have encryption now
send(sender, {:get_compression, self()})
receive do
nil -> nil
some -> some
end
else cstate end
receive(socket, sender, estate, cstate)
end end
def get_packet(client, cstate) do def get_packet(client, estate, cstate) do
case get_varint(client, "", cstate) do case get_varint(client, "", estate) do
:closed -> :closed :closed -> :closed
{:error, error} -> {:error, error} {:error, error} -> {:error, error}
{[length], ""} -> {[length], ""} ->
recv = :gen_tcp.recv(client, length) data = :gen_tcp.recv(client, length)
case recv do case data do
{:ok, full_packet} -> {:ok, data} ->
full_packet = case cstate do # Perform decryption and decompression
nil -> full_packet decrypted = if estate == nil do
ds -> :crypto.crypto_update(ds, full_packet) data
else
:crypto.crypto_update(estate, data)
end end
({[id], data} = Read.start(full_packet) |> Read.varint() |> Read.stop()
{id, data}) full_packet = if cstate == nil do
# Using "without compression" container
decrypted
else
# Using "with compression" container
{[dlength], rest} = Read.start(decrypted) |> Read.varint |> Read.stop
if dlength == 0 do
# Uncompressed data
rest
else
# Compressed data
rest |> :zlib.uncompress()
end
end
{[id], data} = Read.start(full_packet) |> Read.varint |> Read.stop
{id, data}
{:error, :closed} -> :closed {:error, :closed} -> :closed
{:error, error} -> {:error, error} {:error, error} -> {:error, error}
end end
end end
end end
defp get_varint(client, acc, cstate) do defp get_varint(client, acc, estate) do
case :gen_tcp.recv(client, 1) do case :gen_tcp.recv(client, 1) do
{:ok, byte} -> {:ok, byte} ->
byte = case cstate do byte = case estate do
nil -> byte nil -> byte
ds -> :crypto.crypto_update(ds, byte) ds -> :crypto.crypto_update(ds, byte)
end end
case byte do case byte do
<<0::1, _::7>> -> Read.start(acc <> byte) |> Read.varint() |> Read.stop() <<0::1, _::7>> -> Read.start(acc <> byte) |> Read.varint() |> Read.stop()
<<1::1, _::7>> -> get_varint(client, acc <> byte, cstate) <<1::1, _::7>> -> get_varint(client, acc <> byte, estate)
end end
{:error, :closed} -> :closed {:error, :closed} -> :closed
{:error, error} -> {:error, error} {:error, error} -> {:error, error}

View File

@ -90,6 +90,13 @@ defmodule Amethyst.ConnectionState.Login do
verify_token = Amethyst.Keys.decrypt(verify_token) verify_token = Amethyst.Keys.decrypt(verify_token)
if verify_token == Map.get(state, :verify_token, :never) do if verify_token == Map.get(state, :verify_token, :never) do
send(self(), {:set_encryption, secret}) send(self(), {:set_encryption, secret})
if Application.get_env(:amethyst, :compression, nil) != nil do
threshold = Application.get_env(:amethyst, :compression, 0)
send(self(), {:send_packet, %{packet_type: :set_compression, threshold: threshold}})
send(self(), {:set_compression, threshold})
end
send(self(), {:send_packet, %{ send(self(), {:send_packet, %{
packet_type: :login_success, packet_type: :login_success,
uuid: Map.get(state, :uuid), uuid: Map.get(state, :uuid),

View File

@ -18,7 +18,7 @@ defmodule Example.Game do
@impl true @impl true
@spec player_position(any(), {any(), any(), any()}, any()) :: :ok @spec player_position(any(), {any(), any(), any()}, any()) :: :ok
def player_position(from, {x, y, z}, _refs) do def player_position(from, {x, y, z}, _refs) do
Logger.info("Player at #{inspect(from)} moved to #{x}, #{y}, #{z}") # Logger.info("Player at #{inspect(from)} moved to #{x}, #{y}, #{z}")
send(from, {:set_position, {x, y, z}}) send(from, {:set_position, {x, y, z}})
:ok :ok
end end

View File

@ -4,4 +4,5 @@ config :amethyst,
port: 25599, # Bogus port for testing, avoids unexpected conflicts port: 25599, # Bogus port for testing, avoids unexpected conflicts
encryption: true, # Whether or not to request encryption from clients. encryption: true, # Whether or not to request encryption from clients.
auth: false, # Whether or not users should be authenticated with Mojang. auth: false, # Whether or not users should be authenticated with Mojang.
compression: 256, # Packets larger than this amount are sent compressed. Set to nil to disable compression.
default_game: Example.Game # Which game new players should be sent to default_game: Example.Game # Which game new players should be sent to