Begin restructuring of the connection handler
Some checks failed
Build & Test / nix-build (push) Failing after 17s
Some checks failed
Build & Test / nix-build (push) Failing after 17s
This commit is contained in:
parent
47bb453178
commit
b0dcb5bc08
@ -22,108 +22,69 @@ defmodule Amethyst.ConnectionHandler do
|
|||||||
alias Amethyst.Minecraft.Write
|
alias Amethyst.Minecraft.Write
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
|
defstruct handler: self(),
|
||||||
|
version: 0,
|
||||||
|
connection_state: Amethyst.ConnectionState.Handshake,
|
||||||
|
game: nil,
|
||||||
|
encryption_state: nil,
|
||||||
|
decryption_state: nil,
|
||||||
|
compression_threshold: nil,
|
||||||
|
authenticated: false,
|
||||||
|
position: {0, 0, 0},
|
||||||
|
player_state: %{}
|
||||||
|
|
||||||
@spec child_spec(:gen_tcp.socket()) :: Supervisor.child_spec()
|
@spec child_spec(:gen_tcp.socket()) :: Supervisor.child_spec()
|
||||||
def child_spec(socket) do
|
def child_spec(socket) do
|
||||||
%{
|
%{
|
||||||
id: __MODULE__,
|
id: __MODULE__,
|
||||||
start: {__MODULE__, :start, [socket, Amethyst.ConnectionState.Handshake, 0]}
|
start: {__MODULE__, :start, [socket]}
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
@spec start(:gen_tcp.socket(), atom(), integer()) :: no_return()
|
@spec start(:gen_tcp.socket(), atom(), integer()) :: no_return()
|
||||||
def start(socket, connstate, version) do
|
def start(socket) do
|
||||||
{:ok, spawn(fn ->
|
{:ok, spawn(fn ->
|
||||||
Process.set_label("ConnectionHandler for #{inspect(socket)}")
|
Process.set_label("ConnectionHandler for #{inspect(socket)}")
|
||||||
loop(socket, connstate, version, %{})
|
loop(socket, %__MODULE__{handler: self()})
|
||||||
end)}
|
end)}
|
||||||
end
|
end
|
||||||
|
|
||||||
@spec start_link(:gen_tcp.socket(), atom(), integer()) :: no_return()
|
@spec start_link(:gen_tcp.socket(), atom(), integer()) :: no_return()
|
||||||
def start_link(socket, connstate, version) do
|
def start_link(socket) do
|
||||||
{:ok, spawn_link(fn ->
|
{:ok, spawn_link(fn ->
|
||||||
Process.set_label("ConnectionHandler for #{inspect(socket)}")
|
Process.set_label("ConnectionHandler for #{inspect(socket)}")
|
||||||
loop(socket, connstate, version, %{})
|
loop(socket, %__MODULE__{handler: self()})
|
||||||
end)}
|
end)}
|
||||||
end
|
end
|
||||||
|
|
||||||
@spec loop(:gen_tcp.socket(), atom(), integer(), map()) :: no_return()
|
@doc """
|
||||||
defp loop(socket, connstate, version, state) do
|
Informs the `handler` that a packet was received and that it should handle it.
|
||||||
|
"""
|
||||||
|
@spec packet(handler :: pid(), id :: pos_integer(), data :: binary())
|
||||||
|
def packet(handler, id, data) do
|
||||||
|
send(handler, {:packet, id, data})
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec close(handler :: pid())
|
||||||
|
def close(handler) do
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec loop(:gen_tcp.socket(), %__MODULE__{}) :: no_return()
|
||||||
|
defp loop(socket, state) do
|
||||||
receive do
|
receive do
|
||||||
{:disconnect, reason} ->
|
:closed ->
|
||||||
disconnect(socket, reason, connstate, version, state)
|
# Socket is closed, handle our side of the disconnect
|
||||||
Process.exit(self(), :normal)
|
Process.exit(self(), :normal)
|
||||||
{:set_state, newstate} ->
|
{:packet, id, data} ->
|
||||||
Logger.debug("Switching to state #{newstate} from #{connstate}")
|
state = Enum.reduce(handle_packet(id, data, state), state, fn op, state -> op.(state) end)
|
||||||
loop(socket, newstate, version, state)
|
loop(socket, state)
|
||||||
{:set_version, newversion} ->
|
{:run, ops} ->
|
||||||
Logger.debug("Switching to version #{newversion} from #{version}")
|
state = Enum.reduce(ops, state, fn op, state -> op.(state) end)
|
||||||
loop(socket, connstate, newversion, state)
|
loop(socket, state)
|
||||||
{:set_encryption, secret} ->
|
{:get, from, op} ->
|
||||||
Logger.debug("Enabling encryption with shared secret #{inspect(secret)}")
|
send(from, op.(state))
|
||||||
encryption_state = :crypto.crypto_init(:aes_128_cfb8, secret, secret, true)
|
loop(socket, state)
|
||||||
decryption_state = :crypto.crypto_init(:aes_128_cfb8, secret, secret, false)
|
|
||||||
state = state |> Map.put(:encryption_state, encryption_state) |> Map.put(:decryption_state, decryption_state)
|
|
||||||
loop(socket, connstate, version, state)
|
|
||||||
{:set_compression, threshold} ->
|
|
||||||
Logger.debug("Enabling comrpession with threshold #{threshold}")
|
|
||||||
state = Map.put(state, :compression, threshold)
|
|
||||||
loop(socket, connstate, version, state)
|
|
||||||
{:set_position, position} ->
|
|
||||||
prev_position = Map.get(state, :position)
|
|
||||||
state = Map.put(state, :position, position)
|
|
||||||
# If there was no prev position, we consider that we
|
|
||||||
# definitely moved
|
|
||||||
prev_cp = if prev_position == nil do nil else chunk_pos(elem(prev_position, 0), elem(prev_position, 2)) end
|
|
||||||
cp = chunk_pos(elem(position, 0), elem(position, 2))
|
|
||||||
if prev_cp != cp do
|
|
||||||
Logger.debug("Client entered new chunk #{inspect(cp)}")
|
|
||||||
# We changed chunk borders, update center chunk and begin sending new chunks
|
|
||||||
send(self(), {:send_packet, %{
|
|
||||||
packet_type: :set_center_chunk,
|
|
||||||
chunk_x: elem(cp, 0),
|
|
||||||
chunk_z: elem(cp, 1)
|
|
||||||
}})
|
|
||||||
# Figure out which new chunks are visible
|
|
||||||
prev_chunks =
|
|
||||||
if prev_cp == nil do
|
|
||||||
MapSet.new([])
|
|
||||||
else
|
|
||||||
MapSet.new(visible_chunks_from(elem(prev_cp, 0), elem(prev_cp, 1), 16)) # This 16 would be the server view distance!!
|
|
||||||
end
|
|
||||||
chunks = MapSet.new(visible_chunks_from(elem(cp, 0), elem(cp, 1), 16))
|
|
||||||
new_chunks = MapSet.difference(chunks, prev_chunks)
|
|
||||||
Logger.debug("Sending #{MapSet.size(new_chunks)} chunks...")
|
|
||||||
# We can process all chunks in parallel
|
|
||||||
me = self()
|
|
||||||
ts = state |> Map.get(:game) |> Map.get(:refs) |> Map.get(:task_supervisor)
|
|
||||||
Task.Supervisor.async(ts, fn ->
|
|
||||||
Task.Supervisor.async_stream(ts,
|
|
||||||
new_chunks,
|
|
||||||
fn chunk -> process_chunk(me, chunk, state) end,
|
|
||||||
[ordered: false]
|
|
||||||
) |> Stream.run() end)
|
|
||||||
end
|
|
||||||
loop(socket, connstate, version, state)
|
|
||||||
{:send_packet, packet} ->
|
|
||||||
# Logger.debug("Sending packet #{inspect(packet)}")
|
|
||||||
send_packet(socket, connstate, packet, version, state)
|
|
||||||
loop(socket, connstate, version, state)
|
|
||||||
after 0 ->
|
|
||||||
# Received stuff from the connection receiver is lower priority
|
|
||||||
receive do
|
|
||||||
:closed ->
|
|
||||||
Logger.info("Connection #{inspect(socket)} closed.")
|
|
||||||
Process.exit(self(), :normal)
|
|
||||||
{:get_encryption, from} ->
|
|
||||||
send(from, Map.get(state, :decryption_state))
|
|
||||||
loop(socket, connstate, version, state)
|
|
||||||
{:get_compression, from} ->
|
|
||||||
send(from, Map.get(state, :compression))
|
|
||||||
loop(socket, connstate, version, state)
|
|
||||||
{:packet, id, data} ->
|
|
||||||
state = handle_packet(id, data, connstate, version, state)
|
|
||||||
loop(socket, connstate, version, state)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -251,6 +212,7 @@ defmodule Amethyst.ConnectionHandler do
|
|||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@spec handle_packet(pos_integer(), binary(), %__MODULE__{}) :: [(%__MODULE__{} -> %__MODULE__{})]
|
||||||
defp handle_packet(id, data, connstate, version, state) do
|
defp handle_packet(id, data, connstate, version, state) do
|
||||||
try do
|
try do
|
||||||
packet = connstate.deserialize(id, version, data)
|
packet = connstate.deserialize(id, version, data)
|
||||||
|
Loading…
Reference in New Issue
Block a user