Implement compression #3

Merged
kodi merged 3 commits from compression into main 2024-10-06 19:51:44 +02:00
5 changed files with 90 additions and 31 deletions
Showing only changes of commit fb98dd4864 - Show all commits

View File

@ -19,6 +19,7 @@ defmodule Amethyst.ConnectionHandler do
This module is responsible for handling incoming packets and sending outgoing packets. It keeps track of what state the connection is in and which game should
receive the packets.
"""
alias Amethyst.Minecraft.Write
require Logger
@spec child_spec(:gen_tcp.socket()) :: Supervisor.child_spec()
@ -63,6 +64,10 @@ defmodule Amethyst.ConnectionHandler do
decryption_state = :crypto.crypto_init(:aes_128_cfb8, secret, secret, false)
state = state |> Map.put(:encryption_state, encryption_state) |> Map.put(:decryption_state, decryption_state)
loop(socket, connstate, version, state)
{:set_compression, threshold} ->
Logger.debug("Enabling comrpession with threshold #{threshold}")
state = Map.put(state, :compression_threshold, threshold)
loop(socket, connstate, version, state)
{:set_position, position} ->
prev_position = Map.get(state, :position)
state = Map.put(state, :position, position)
@ -83,9 +88,9 @@ defmodule Amethyst.ConnectionHandler do
if prev_cp == nil do
MapSet.new([])
else
MapSet.new(visible_chunks_from(elem(prev_cp, 0), elem(prev_cp, 1), Map.get(state, :view_distance, 16)))
MapSet.new(visible_chunks_from(elem(prev_cp, 0), elem(prev_cp, 1), 16)) # This 16 would be the server view distance!!
end
chunks = MapSet.new(visible_chunks_from(elem(cp, 0), elem(cp, 1), Map.get(state, :view_distance, 16)))
chunks = MapSet.new(visible_chunks_from(elem(cp, 0), elem(cp, 1), 16))
new_chunks = MapSet.difference(chunks, prev_chunks)
Logger.debug("Sending #{MapSet.size(new_chunks)} chunks...")
# We can process all chunks in parallel
@ -112,6 +117,9 @@ defmodule Amethyst.ConnectionHandler do
{:get_encryption, from} ->
send(from, Map.get(state, :decryption_state))
loop(socket, connstate, version, state)
{:get_compression, from} ->
send(from, Map.get(state, :compression_threshold))
loop(socket, connstate, version, state)
{:packet, id, data} ->
state = handle_packet(id, data, connstate, version, state)
loop(socket, connstate, version, state)
@ -251,11 +259,24 @@ defmodule Amethyst.ConnectionHandler do
try do
data = connstate.serialize(packet, version)
length = byte_size(data) |> Amethyst.Minecraft.Write.varint()
to_write = case Map.get(state, :compression_threshold) do
nil ->
# No compression
length <> data
threshold when byte_size(length <> data) < threshold ->
# Compression unnecessary
<<0>> <> length <> data
threshold when byte_size(length <> data) >= threshold ->
# Compression
cdata = :zlib.compress(data)
outer_length = byte_size(length <> cdata)
Write.varint(outer_length) <> length <> cdata
end
case Map.get(state, :encryption_state) do
nil ->
:gen_tcp.send(socket, length <> data)
:gen_tcp.send(socket, to_write)
estate ->
encrypted = :crypto.crypto_update(estate, length <> data)
encrypted = :crypto.crypto_update(estate, to_write)
:gen_tcp.send(socket, encrypted)
end
rescue

View File

@ -35,7 +35,7 @@ defmodule Amethyst.ConnectionReceiver do
{:ok, spawn(fn ->
Process.set_label("ConnectionReceiver for #{inspect(socket)}")
{:ok, pid} = Amethyst.ConnectionHandler.start_link(socket, Amethyst.ConnectionState.Handshake, 0)
receive(socket, pid, nil)
receive(socket, pid, nil, nil)
end)}
end
@ -44,63 +44,93 @@ defmodule Amethyst.ConnectionReceiver do
{:ok, spawn_link(fn ->
Process.set_label("ConnectionReceiver for #{inspect(socket)}")
{:ok, pid} = Amethyst.ConnectionHandler.start_link(socket, Amethyst.ConnectionState.Handshake, 0)
receive(socket, pid, nil)
receive(socket, pid, nil, nil)
end)}
end
@spec receive(:gen_tcp.socket(), pid(), nil | :crypto.crypto_state()) :: no_return()
def receive(socket, sender, cstate) do
case get_packet(socket, cstate) do
@spec receive(:gen_tcp.socket(), pid(), nil | :crypto.crypto_state(), nil | pos_integer()) :: no_return()
def receive(socket, sender, estate, cstate) do
case get_packet(socket, estate, cstate) do
:closed -> send(sender, :closed)
Process.exit(self(), :normal)
{:error, error} -> Logger.error("Error reading packet: #{error}")
{id, data} -> send(sender, {:packet, id, data})
end
if cstate == nil do
# Ask the handler if the encryption state has changed
estate = if estate == nil do
# Ask the handler if we have encryption now
send(sender, {:get_encryption, self()})
Logger.debug("Asking for news on encryption...")
receive do
nil -> receive(socket, sender, cstate)
nil -> nil
some ->
Logger.debug("Enabling decryption!")
receive(socket, sender, some)
Logger.debug("Enabled decryption")
some
end
else
receive(socket, sender, cstate)
else nil end
cstate = if cstate == nil do
# Ask the handler if we have encryption now
send(sender, {:get_compression, self()})
Logger.debug("Asking for news on compression...")
receive do
nil -> nil
some ->
Logger.debug("Enabled decompression")
some
end
else nil end
receive(socket, sender, estate, cstate)
end
def get_packet(client, cstate) do
case get_varint(client, "", cstate) do
def get_packet(client, estate, cstate) do
case get_varint(client, "", estate) do
:closed -> :closed
{:error, error} -> {:error, error}
{[length], ""} ->
recv = :gen_tcp.recv(client, length)
case recv do
{:ok, full_packet} ->
full_packet = case cstate do
nil -> full_packet
ds -> :crypto.crypto_update(ds, full_packet)
data = :gen_tcp.recv(client, length)
case data do
{:ok, data} ->
# Perform decryption and decompression
decrypted = if estate == nil do
data
else
:crypto.crypto_update(estate, data)
end
({[id], data} = Read.start(full_packet) |> Read.varint() |> Read.stop()
{id, data})
full_packet = if cstate == nil do
# Using "without compression" container
decrypted
else
# Using "with compression" container
{[dlength], rest} = Read.start(decrypted) |> Read.varint |> Read.stop
if dlength == 0 do
# Uncompressed data
rest
else
# Compressed data
rest |> :zlib.uncompress()
end
end
{[id], data} = Read.start(full_packet) |> Read.varint |> Read.stop
{id, data}
{:error, :closed} -> :closed
{:error, error} -> {:error, error}
end
end
end
defp get_varint(client, acc, cstate) do
defp get_varint(client, acc, estate) do
case :gen_tcp.recv(client, 1) do
{:ok, byte} ->
byte = case cstate do
byte = case estate do
nil -> byte
ds -> :crypto.crypto_update(ds, byte)
end
case byte do
<<0::1, _::7>> -> Read.start(acc <> byte) |> Read.varint() |> Read.stop()
<<1::1, _::7>> -> get_varint(client, acc <> byte, cstate)
<<1::1, _::7>> -> get_varint(client, acc <> byte, estate)
end
{:error, :closed} -> :closed
{:error, error} -> {:error, error}

View File

@ -90,6 +90,13 @@ defmodule Amethyst.ConnectionState.Login do
verify_token = Amethyst.Keys.decrypt(verify_token)
if verify_token == Map.get(state, :verify_token, :never) do
send(self(), {:set_encryption, secret})
if Application.get_env(:amethyst, :compression, nil) != nil do
threshold = Application.get_env(:amethyst, :compression, 0)
send(self(), {:send_packet, %{packet_type: :set_compression, threshold: threshold}})
send(self(), {:set_compression, threshold})
end
send(self(), {:send_packet, %{
packet_type: :login_success,
uuid: Map.get(state, :uuid),

View File

@ -18,7 +18,7 @@ defmodule Example.Game do
@impl true
@spec player_position(any(), {any(), any(), any()}, any()) :: :ok
def player_position(from, {x, y, z}, _refs) do
Logger.info("Player at #{inspect(from)} moved to #{x}, #{y}, #{z}")
# Logger.info("Player at #{inspect(from)} moved to #{x}, #{y}, #{z}")
send(from, {:set_position, {x, y, z}})
:ok
end

View File

@ -4,4 +4,5 @@ config :amethyst,
port: 25599, # Bogus port for testing, avoids unexpected conflicts
encryption: true, # Whether or not to request encryption from clients.
auth: false, # Whether or not users should be authenticated with Mojang.
compression: nil, # Packets larger than this amount are sent compressed. Set to nil to disable compression.
default_game: Example.Game # Which game new players should be sent to