From fb98dd48648df43ba9be2d98b3eb2f37a9a5ed7e Mon Sep 17 00:00:00 2001 From: Kodi Craft Date: Sun, 6 Oct 2024 13:49:14 +0200 Subject: [PATCH 1/3] Attempt to implement compression --- apps/amethyst/lib/apps/connection_handler.ex | 29 ++++++- apps/amethyst/lib/apps/connection_receiver.ex | 82 +++++++++++++------ apps/amethyst/lib/states/login.ex | 7 ++ apps/example_game/lib/example/game.ex | 2 +- config/runtime.exs | 1 + 5 files changed, 90 insertions(+), 31 deletions(-) diff --git a/apps/amethyst/lib/apps/connection_handler.ex b/apps/amethyst/lib/apps/connection_handler.ex index d8232a2..f19f025 100644 --- a/apps/amethyst/lib/apps/connection_handler.ex +++ b/apps/amethyst/lib/apps/connection_handler.ex @@ -19,6 +19,7 @@ defmodule Amethyst.ConnectionHandler do This module is responsible for handling incoming packets and sending outgoing packets. It keeps track of what state the connection is in and which game should receive the packets. """ + alias Amethyst.Minecraft.Write require Logger @spec child_spec(:gen_tcp.socket()) :: Supervisor.child_spec() @@ -63,6 +64,10 @@ defmodule Amethyst.ConnectionHandler do decryption_state = :crypto.crypto_init(:aes_128_cfb8, secret, secret, false) state = state |> Map.put(:encryption_state, encryption_state) |> Map.put(:decryption_state, decryption_state) loop(socket, connstate, version, state) + {:set_compression, threshold} -> + Logger.debug("Enabling comrpession with threshold #{threshold}") + state = Map.put(state, :compression_threshold, threshold) + loop(socket, connstate, version, state) {:set_position, position} -> prev_position = Map.get(state, :position) state = Map.put(state, :position, position) @@ -83,9 +88,9 @@ defmodule Amethyst.ConnectionHandler do if prev_cp == nil do MapSet.new([]) else - MapSet.new(visible_chunks_from(elem(prev_cp, 0), elem(prev_cp, 1), Map.get(state, :view_distance, 16))) + MapSet.new(visible_chunks_from(elem(prev_cp, 0), elem(prev_cp, 1), 16)) # This 16 would be the server view distance!! end - chunks = MapSet.new(visible_chunks_from(elem(cp, 0), elem(cp, 1), Map.get(state, :view_distance, 16))) + chunks = MapSet.new(visible_chunks_from(elem(cp, 0), elem(cp, 1), 16)) new_chunks = MapSet.difference(chunks, prev_chunks) Logger.debug("Sending #{MapSet.size(new_chunks)} chunks...") # We can process all chunks in parallel @@ -112,6 +117,9 @@ defmodule Amethyst.ConnectionHandler do {:get_encryption, from} -> send(from, Map.get(state, :decryption_state)) loop(socket, connstate, version, state) + {:get_compression, from} -> + send(from, Map.get(state, :compression_threshold)) + loop(socket, connstate, version, state) {:packet, id, data} -> state = handle_packet(id, data, connstate, version, state) loop(socket, connstate, version, state) @@ -251,11 +259,24 @@ defmodule Amethyst.ConnectionHandler do try do data = connstate.serialize(packet, version) length = byte_size(data) |> Amethyst.Minecraft.Write.varint() + to_write = case Map.get(state, :compression_threshold) do + nil -> + # No compression + length <> data + threshold when byte_size(length <> data) < threshold -> + # Compression unnecessary + <<0>> <> length <> data + threshold when byte_size(length <> data) >= threshold -> + # Compression + cdata = :zlib.compress(data) + outer_length = byte_size(length <> cdata) + Write.varint(outer_length) <> length <> cdata + end case Map.get(state, :encryption_state) do nil -> - :gen_tcp.send(socket, length <> data) + :gen_tcp.send(socket, to_write) estate -> - encrypted = :crypto.crypto_update(estate, length <> data) + encrypted = :crypto.crypto_update(estate, to_write) :gen_tcp.send(socket, encrypted) end rescue diff --git a/apps/amethyst/lib/apps/connection_receiver.ex b/apps/amethyst/lib/apps/connection_receiver.ex index fd34386..d8bd08e 100644 --- a/apps/amethyst/lib/apps/connection_receiver.ex +++ b/apps/amethyst/lib/apps/connection_receiver.ex @@ -35,7 +35,7 @@ defmodule Amethyst.ConnectionReceiver do {:ok, spawn(fn -> Process.set_label("ConnectionReceiver for #{inspect(socket)}") {:ok, pid} = Amethyst.ConnectionHandler.start_link(socket, Amethyst.ConnectionState.Handshake, 0) - receive(socket, pid, nil) + receive(socket, pid, nil, nil) end)} end @@ -44,63 +44,93 @@ defmodule Amethyst.ConnectionReceiver do {:ok, spawn_link(fn -> Process.set_label("ConnectionReceiver for #{inspect(socket)}") {:ok, pid} = Amethyst.ConnectionHandler.start_link(socket, Amethyst.ConnectionState.Handshake, 0) - receive(socket, pid, nil) + receive(socket, pid, nil, nil) end)} end - @spec receive(:gen_tcp.socket(), pid(), nil | :crypto.crypto_state()) :: no_return() - def receive(socket, sender, cstate) do - case get_packet(socket, cstate) do + @spec receive(:gen_tcp.socket(), pid(), nil | :crypto.crypto_state(), nil | pos_integer()) :: no_return() + def receive(socket, sender, estate, cstate) do + case get_packet(socket, estate, cstate) do :closed -> send(sender, :closed) Process.exit(self(), :normal) {:error, error} -> Logger.error("Error reading packet: #{error}") {id, data} -> send(sender, {:packet, id, data}) end - if cstate == nil do - # Ask the handler if the encryption state has changed + + estate = if estate == nil do + # Ask the handler if we have encryption now send(sender, {:get_encryption, self()}) Logger.debug("Asking for news on encryption...") receive do - nil -> receive(socket, sender, cstate) + nil -> nil some -> - Logger.debug("Enabling decryption!") - receive(socket, sender, some) + Logger.debug("Enabled decryption") + some end - else - receive(socket, sender, cstate) - end + else nil end + cstate = if cstate == nil do + # Ask the handler if we have encryption now + send(sender, {:get_compression, self()}) + Logger.debug("Asking for news on compression...") + receive do + nil -> nil + some -> + Logger.debug("Enabled decompression") + some + end + else nil end + + receive(socket, sender, estate, cstate) end - def get_packet(client, cstate) do - case get_varint(client, "", cstate) do + def get_packet(client, estate, cstate) do + case get_varint(client, "", estate) do :closed -> :closed {:error, error} -> {:error, error} {[length], ""} -> - recv = :gen_tcp.recv(client, length) - case recv do - {:ok, full_packet} -> - full_packet = case cstate do - nil -> full_packet - ds -> :crypto.crypto_update(ds, full_packet) + data = :gen_tcp.recv(client, length) + case data do + {:ok, data} -> + # Perform decryption and decompression + decrypted = if estate == nil do + data + else + :crypto.crypto_update(estate, data) end - ({[id], data} = Read.start(full_packet) |> Read.varint() |> Read.stop() - {id, data}) + + full_packet = if cstate == nil do + # Using "without compression" container + decrypted + else + # Using "with compression" container + {[dlength], rest} = Read.start(decrypted) |> Read.varint |> Read.stop + if dlength == 0 do + # Uncompressed data + rest + else + # Compressed data + rest |> :zlib.uncompress() + end + end + + {[id], data} = Read.start(full_packet) |> Read.varint |> Read.stop + {id, data} {:error, :closed} -> :closed {:error, error} -> {:error, error} end end end - defp get_varint(client, acc, cstate) do + defp get_varint(client, acc, estate) do case :gen_tcp.recv(client, 1) do {:ok, byte} -> - byte = case cstate do + byte = case estate do nil -> byte ds -> :crypto.crypto_update(ds, byte) end case byte do <<0::1, _::7>> -> Read.start(acc <> byte) |> Read.varint() |> Read.stop() - <<1::1, _::7>> -> get_varint(client, acc <> byte, cstate) + <<1::1, _::7>> -> get_varint(client, acc <> byte, estate) end {:error, :closed} -> :closed {:error, error} -> {:error, error} diff --git a/apps/amethyst/lib/states/login.ex b/apps/amethyst/lib/states/login.ex index a6f22fe..f5e9c3e 100644 --- a/apps/amethyst/lib/states/login.ex +++ b/apps/amethyst/lib/states/login.ex @@ -90,6 +90,13 @@ defmodule Amethyst.ConnectionState.Login do verify_token = Amethyst.Keys.decrypt(verify_token) if verify_token == Map.get(state, :verify_token, :never) do send(self(), {:set_encryption, secret}) + + if Application.get_env(:amethyst, :compression, nil) != nil do + threshold = Application.get_env(:amethyst, :compression, 0) + send(self(), {:send_packet, %{packet_type: :set_compression, threshold: threshold}}) + send(self(), {:set_compression, threshold}) + end + send(self(), {:send_packet, %{ packet_type: :login_success, uuid: Map.get(state, :uuid), diff --git a/apps/example_game/lib/example/game.ex b/apps/example_game/lib/example/game.ex index 0756f9e..62dce35 100644 --- a/apps/example_game/lib/example/game.ex +++ b/apps/example_game/lib/example/game.ex @@ -18,7 +18,7 @@ defmodule Example.Game do @impl true @spec player_position(any(), {any(), any(), any()}, any()) :: :ok def player_position(from, {x, y, z}, _refs) do - Logger.info("Player at #{inspect(from)} moved to #{x}, #{y}, #{z}") + # Logger.info("Player at #{inspect(from)} moved to #{x}, #{y}, #{z}") send(from, {:set_position, {x, y, z}}) :ok end diff --git a/config/runtime.exs b/config/runtime.exs index c879757..acd79ea 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -4,4 +4,5 @@ config :amethyst, port: 25599, # Bogus port for testing, avoids unexpected conflicts encryption: true, # Whether or not to request encryption from clients. auth: false, # Whether or not users should be authenticated with Mojang. + compression: nil, # Packets larger than this amount are sent compressed. Set to nil to disable compression. default_game: Example.Game # Which game new players should be sent to From be449164613d95fadf3fbf540daedb718b295949 Mon Sep 17 00:00:00 2001 From: Kodi Craft Date: Sun, 6 Oct 2024 15:20:39 +0200 Subject: [PATCH 2/3] Implement compression --- apps/amethyst/lib/apps/connection_handler.ex | 37 +++++++++---------- apps/amethyst/lib/apps/connection_receiver.ex | 17 +++------ 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/apps/amethyst/lib/apps/connection_handler.ex b/apps/amethyst/lib/apps/connection_handler.ex index f19f025..60968d8 100644 --- a/apps/amethyst/lib/apps/connection_handler.ex +++ b/apps/amethyst/lib/apps/connection_handler.ex @@ -258,27 +258,26 @@ defmodule Amethyst.ConnectionHandler do defp send_packet(socket, connstate, packet, version, state) do try do data = connstate.serialize(packet, version) - length = byte_size(data) |> Amethyst.Minecraft.Write.varint() - to_write = case Map.get(state, :compression_threshold) do - nil -> - # No compression - length <> data - threshold when byte_size(length <> data) < threshold -> - # Compression unnecessary - <<0>> <> length <> data - threshold when byte_size(length <> data) >= threshold -> - # Compression - cdata = :zlib.compress(data) - outer_length = byte_size(length <> cdata) - Write.varint(outer_length) <> length <> cdata + container = if Map.get(state, :compression) == nil do + # Packet ID is included in data + Write.varint(byte_size(data)) <> data + else + threshold = Map.get(state, :compression, 0) + data_length = byte_size(data) + if data_length >= threshold do + compressed = Write.varint(data_length) <> :zlib.zip(data) + Write.varint(byte_size(compressed)) <> compressed + else + compressed = Write.varint(0) <> data + Write.varint(byte_size(compressed)) <> compressed + end end - case Map.get(state, :encryption_state) do - nil -> - :gen_tcp.send(socket, to_write) - estate -> - encrypted = :crypto.crypto_update(estate, to_write) - :gen_tcp.send(socket, encrypted) + encrypted = if Map.get(state, :encryption_state) == nil do + container + else + Map.get(state, :encryption_state) |> :crypto.crypto_update(container) end + :gen_tcp.send(socket, encrypted) rescue e -> Logger.error("Error sending packet #{inspect(packet)} in state #{connstate}: #{Exception.format(:error, e, __STACKTRACE__)}") diff --git a/apps/amethyst/lib/apps/connection_receiver.ex b/apps/amethyst/lib/apps/connection_receiver.ex index d8bd08e..08c591a 100644 --- a/apps/amethyst/lib/apps/connection_receiver.ex +++ b/apps/amethyst/lib/apps/connection_receiver.ex @@ -60,25 +60,20 @@ defmodule Amethyst.ConnectionReceiver do estate = if estate == nil do # Ask the handler if we have encryption now send(sender, {:get_encryption, self()}) - Logger.debug("Asking for news on encryption...") receive do nil -> nil - some -> - Logger.debug("Enabled decryption") - some + some -> some end - else nil end + else estate end + cstate = if cstate == nil do # Ask the handler if we have encryption now send(sender, {:get_compression, self()}) - Logger.debug("Asking for news on compression...") receive do nil -> nil - some -> - Logger.debug("Enabled decompression") - some + some -> some end - else nil end + else cstate end receive(socket, sender, estate, cstate) end @@ -109,7 +104,7 @@ defmodule Amethyst.ConnectionReceiver do rest else # Compressed data - rest |> :zlib.uncompress() + rest |> :zlib.unzip() end end From 8b09c78f02793578f5a260e50240b7626c970537 Mon Sep 17 00:00:00 2001 From: Kodi Craft Date: Sun, 6 Oct 2024 15:49:55 +0200 Subject: [PATCH 3/3] Actually implement compression --- apps/amethyst/lib/apps/connection_handler.ex | 8 +++++--- apps/amethyst/lib/apps/connection_receiver.ex | 2 +- config/runtime.exs | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/apps/amethyst/lib/apps/connection_handler.ex b/apps/amethyst/lib/apps/connection_handler.ex index 60968d8..569e7ce 100644 --- a/apps/amethyst/lib/apps/connection_handler.ex +++ b/apps/amethyst/lib/apps/connection_handler.ex @@ -66,7 +66,7 @@ defmodule Amethyst.ConnectionHandler do loop(socket, connstate, version, state) {:set_compression, threshold} -> Logger.debug("Enabling comrpession with threshold #{threshold}") - state = Map.put(state, :compression_threshold, threshold) + state = Map.put(state, :compression, threshold) loop(socket, connstate, version, state) {:set_position, position} -> prev_position = Map.get(state, :position) @@ -118,7 +118,7 @@ defmodule Amethyst.ConnectionHandler do send(from, Map.get(state, :decryption_state)) loop(socket, connstate, version, state) {:get_compression, from} -> - send(from, Map.get(state, :compression_threshold)) + send(from, Map.get(state, :compression)) loop(socket, connstate, version, state) {:packet, id, data} -> state = handle_packet(id, data, connstate, version, state) @@ -258,6 +258,7 @@ defmodule Amethyst.ConnectionHandler do defp send_packet(socket, connstate, packet, version, state) do try do data = connstate.serialize(packet, version) + Logger.debug("#{inspect(Map.get(state, :compression))}") container = if Map.get(state, :compression) == nil do # Packet ID is included in data Write.varint(byte_size(data)) <> data @@ -265,13 +266,14 @@ defmodule Amethyst.ConnectionHandler do threshold = Map.get(state, :compression, 0) data_length = byte_size(data) if data_length >= threshold do - compressed = Write.varint(data_length) <> :zlib.zip(data) + compressed = Write.varint(data_length) <> :zlib.compress(data) Write.varint(byte_size(compressed)) <> compressed else compressed = Write.varint(0) <> data Write.varint(byte_size(compressed)) <> compressed end end + Logger.debug(inspect(container)) encrypted = if Map.get(state, :encryption_state) == nil do container else diff --git a/apps/amethyst/lib/apps/connection_receiver.ex b/apps/amethyst/lib/apps/connection_receiver.ex index 08c591a..3b8e033 100644 --- a/apps/amethyst/lib/apps/connection_receiver.ex +++ b/apps/amethyst/lib/apps/connection_receiver.ex @@ -104,7 +104,7 @@ defmodule Amethyst.ConnectionReceiver do rest else # Compressed data - rest |> :zlib.unzip() + rest |> :zlib.uncompress() end end diff --git a/config/runtime.exs b/config/runtime.exs index acd79ea..a3c94b6 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -4,5 +4,5 @@ config :amethyst, port: 25599, # Bogus port for testing, avoids unexpected conflicts encryption: true, # Whether or not to request encryption from clients. auth: false, # Whether or not users should be authenticated with Mojang. - compression: nil, # Packets larger than this amount are sent compressed. Set to nil to disable compression. + compression: 256, # Packets larger than this amount are sent compressed. Set to nil to disable compression. default_game: Example.Game # Which game new players should be sent to