4 Commits

Author SHA1 Message Date
84f7009ad2 Add more proper cleanup for the server
Some checks failed
Build library & run tests / build (unix) (push) Failing after 34s
Build library & run tests / build (tcp) (push) Successful in 35s
Build library & run tests / docs (push) Successful in 37s
2024-06-25 11:39:07 +02:00
267b741ac4 implement std::error::Error for the error type
All checks were successful
Build library & run tests / build (unix) (push) Successful in 33s
Build library & run tests / build (tcp) (push) Successful in 34s
Build library & run tests / docs (push) Successful in 35s
2024-06-25 00:03:29 +02:00
bffb41e8a1 implement Drop for client struct
All checks were successful
Build library & run tests / build (unix) (push) Successful in 34s
Build library & run tests / build (tcp) (push) Successful in 35s
Build library & run tests / docs (push) Successful in 37s
2024-06-24 23:44:44 +02:00
b5870e62fe Implement Display for the error enum
All checks were successful
Build library & run tests / build (unix) (push) Successful in 33s
Build library & run tests / build (tcp) (push) Successful in 35s
Build library & run tests / docs (push) Successful in 36s
2024-06-24 23:01:58 +02:00
3 changed files with 58 additions and 20 deletions

View File

@@ -67,9 +67,9 @@ Your handler can now be used by the server. You can easily bind your server to a
use shared::ExampleServer; use shared::ExampleServer;
let handler = ExampleHandler { state: 0 }; let handler = ExampleHandler { state: 0 };
let server_task = tokio::spawn(ExampleServer::bind(handler, "127.0.0.1:1234")); let server_task = ExampleServer::bind(handler, "127.0.0.1:1234").await;
// Or, if you're using the 'unix' feature... // Or, if you're using the 'unix' feature...
let server_task = tokio::spawn(ExampleServer::bind(handler, "/tmp/sock")); let server_task = ExampleServer::bind(handler, "/tmp/sock").await;
``` ```

View File

@@ -79,7 +79,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
//! To start the server, you simply need to use the generated server struct and //! To start the server, you simply need to use the generated server struct and
//! pass it your handler. //! pass it your handler.
//! //!
//! ```no_run //! ```rust
//! # use eagle::Protocol; //! # use eagle::Protocol;
//! # #[derive(Protocol)] //! # #[derive(Protocol)]
//! # pub enum Example { //! # pub enum Example {
@@ -102,18 +102,17 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
//! # tokio_test::block_on(async { //! # tokio_test::block_on(async {
//! let handler = Handler; //! let handler = Handler;
//! let address = "127.0.0.1:12345"; // Or, if using the 'unix' feature, "/tmp/eagle.sock" //! let address = "127.0.0.1:12345"; // Or, if using the 'unix' feature, "/tmp/eagle.sock"
//! let server_task = tokio::spawn(ExampleServer::bind(handler, address)); //! let server = ExampleServer::bind(handler, address).await;
//! server.close().await;
//! # }); //! # });
//! ``` //! ```
//! //! Once bound, the server will begin listening for incoming connections and
//! Please note the usage of `tokio::spawn`. This is because the `bind` function //! queries. **You must remember to use the `close` method to shut down the server.**
//! will not return until the server is closed. You can use the `abort` method
//! on the task to close the server.
//! //!
//! On the client side, you can simply use the generated client struct to connect //! On the client side, you can simply use the generated client struct to connect
//! to the server and begin sending queries. //! to the server and begin sending queries.
//! //!
//! ```no_run //! ```rust
//! # use eagle::Protocol; //! # use eagle::Protocol;
//! # #[derive(Protocol)] //! # #[derive(Protocol)]
//! # pub enum Example { //! # pub enum Example {
@@ -136,11 +135,11 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
//! # tokio_test::block_on(async { //! # tokio_test::block_on(async {
//! # let handler = Handler; //! # let handler = Handler;
//! let address = "127.0.0.1:12345"; // Or, if using the 'unix' feature, "/tmp/eagle.sock" //! let address = "127.0.0.1:12345"; // Or, if using the 'unix' feature, "/tmp/eagle.sock"
//! # let server_task = tokio::spawn(ExampleServer::bind(handler, address)); //! # let server = ExampleServer::bind(handler, address).await;
//! # tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; // Wait for the server to start
//! let client = ExampleClient::connect(address).await.unwrap(); //! let client = ExampleClient::connect(address).await.unwrap();
//! # // Wait for the server to start, the developer is responsible for this in production
//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
//! assert_eq!(client.add(2, 5).await.unwrap(), 7); //! assert_eq!(client.add(2, 5).await.unwrap(), 7);
//! # server.close().await;
//! # }); //! # });
//! ``` //! ```
//! //!
@@ -335,11 +334,39 @@ fn derive_protocol(input: proc_macro2::TokenStream) -> proc_macro2::TokenStream
// Create an error and result type for sending messages // Create an error and result type for sending messages
let error_enum = quote! { let error_enum = quote! {
#[derive(Debug)] #[derive(::std::fmt::Debug)]
#vis enum #error_enum_name { #vis enum #error_enum_name {
SendError(::tokio::sync::mpsc::error::SendError<(u64, #question_enum_name)>), SendError(::tokio::sync::mpsc::error::SendError<(u64, #question_enum_name)>),
Closed, Closed,
} }
impl ::std::fmt::Display for #error_enum_name {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
match self {
#error_enum_name::SendError(e) => write!(f, "Failed to send query: {}", e),
#error_enum_name::Closed => write!(f, "Connection closed"),
}
}
}
impl ::std::error::Error for #error_enum_name {
fn source(&self) -> ::std::option::Option<&(dyn ::std::error::Error + 'static)> {
match self {
#error_enum_name::SendError(e) => ::std::option::Option::Some(e),
#error_enum_name::Closed => ::std::option::Option::None,
}
}
fn description(&self) -> &str {
match self {
#error_enum_name::SendError(_) => "Failed to send query",
#error_enum_name::Closed => "Connection closed",
}
}
fn cause(&self) -> ::std::option::Option<&dyn ::std::error::Error> {
match self {
#error_enum_name::SendError(e) => ::std::option::Option::Some(e),
#error_enum_name::Closed => ::std::option::Option::None,
}
}
}
}; };
// Create enums for the types of messages the server and client will use // Create enums for the types of messages the server and client will use
@@ -435,6 +462,13 @@ fn derive_protocol(input: proc_macro2::TokenStream) -> proc_macro2::TokenStream
sc sc
} }
pub async fn close(self) {
#info("Closing server");
for task in self.tasks.lock().await.drain(..) {
task.abort();
}
}
pub async fn accept_connections<A: #stream_addr_trait>( pub async fn accept_connections<A: #stream_addr_trait>(
&self, &self,
addr: A, addr: A,
@@ -610,7 +644,7 @@ fn derive_protocol(input: proc_macro2::TokenStream) -> proc_macro2::TokenStream
// Create a struct which the client will use to communicate // Create a struct which the client will use to communicate
let client_recv_queue_wrapper = format_ident!("__{}RecvQueueWrapper", name); let client_recv_queue_wrapper = format_ident!("__{}RecvQueueWrapper", name);
let client_struct = quote! { let client_struct = quote! {
#[derive(Clone)] #[derive(::std::clone::Clone)]
struct #client_recv_queue_wrapper { struct #client_recv_queue_wrapper {
recv_queue: ::std::sync::Arc<::tokio::sync::Mutex<::tokio::sync::mpsc::Receiver<(u64, #answer_enum_name)>>>, recv_queue: ::std::sync::Arc<::tokio::sync::Mutex<::tokio::sync::mpsc::Receiver<(u64, #answer_enum_name)>>>,
} }
@@ -657,8 +691,8 @@ fn derive_protocol(input: proc_macro2::TokenStream) -> proc_macro2::TokenStream
let connection_task = ::tokio::spawn(connection.run()); let connection_task = ::tokio::spawn(connection.run());
Ok(Self::new(send_queue, recv_queue, ::std::option::Option::Some(::std::sync::Arc::new(connection_task)), ready_notify)) Ok(Self::new(send_queue, recv_queue, ::std::option::Option::Some(::std::sync::Arc::new(connection_task)), ready_notify))
} }
pub fn close(self) { pub fn close(&mut self) {
if let ::std::option::Option::Some(task) = self.connection_task { if let ::std::option::Option::Some(task) = self.connection_task.take() {
task.abort(); task.abort();
} }
} }
@@ -699,6 +733,11 @@ fn derive_protocol(input: proc_macro2::TokenStream) -> proc_macro2::TokenStream
} }
#(#client_impl)* #(#client_impl)*
} }
impl ::std::ops::Drop for #client_struct_name {
fn drop(&mut self) {
self.close();
}
}
}; };
let expanded = quote! { let expanded = quote! {

View File

@@ -73,9 +73,8 @@ async fn e2e() {
}; };
#[cfg(feature = "tcp")] #[cfg(feature = "tcp")]
let address = format!("127.0.0.1:{}", 10000 + rand::random::<u64>() % 1000); let address = format!("127.0.0.1:{}", 10000 + rand::random::<u64>() % 1000);
let server_task = tokio::spawn(TestProtocolServer::bind(TrivialServer, address.clone())); let server = TestProtocolServer::bind(TrivialServer, address.clone()).await;
// Wait for the server to start, the developer is responsible for this in production tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; // Wait for the server to start
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let client = TestProtocolClient::connect(address).await.unwrap(); let client = TestProtocolClient::connect(address).await.unwrap();
assert_eq!(client.addition(2, 5).await.unwrap(), 7); assert_eq!(client.addition(2, 5).await.unwrap(), 7);
assert_eq!( assert_eq!(
@@ -90,5 +89,5 @@ async fn e2e() {
"The number is 42" "The number is 42"
); );
client.void().await.unwrap(); client.void().await.unwrap();
server_task.abort(); server.close().await;
} }