Commit f540d579 authored by Katharina Fey's avatar Katharina Fey

qrpc-sdk: implemeting server listen hook

parent 114b7104
......@@ -18,22 +18,26 @@ pub(crate) fn elog<S: Into<String>>(msg: S, code: u16) -> ! {
std::process::exit(code.into());
}
use qrpc_sdk::{RpcSocket, };
#[async_std::main]
async fn main() {
log::parse_log_level();
let app = cfg::cli();
let cfg = cfg::match_fold(app);
let _state = State::new(&cfg).await;
// !no_upnp means upnp has _not_ been disabled
if !cfg.no_upnp {
if upnp::open_port(cfg.port).is_none() {
error!("Failed to open UPNP port; your router probably doesn't support it...");
}
}
let _ = future::timeout(Duration::from_secs(10), async {
let _: () = future::poll_fn(|_| Poll::Pending).await;
}).await;
// let app = cfg::cli();
// let cfg = cfg::match_fold(app);
// let _state = State::new(&cfg).await;
// // !no_upnp means upnp has _not_ been disabled
// if !cfg.no_upnp {
// if upnp::open_port(cfg.port).is_none() {
// error!("Failed to open UPNP port; your router probably doesn't support it...");
// }
// }
// let _ = future::timeout(Duration::from_secs(10), async {
// let _: () = future::poll_fn(|_| Poll::Pending).await;
// }).await;
}
......@@ -13,6 +13,7 @@ use std::{
io::Result,
path::{Path, PathBuf},
sync::atomic::{AtomicBool, Ordering},
thread,
time::Duration,
};
......@@ -42,7 +43,18 @@ pub struct RpcSocket {
}
impl RpcSocket {
/// Create a new socket to the RPC system
fn new_socket<P: AsRef<Path>>(path: P) -> Result<(Socket, SockAddr)> {
let addr = SockAddr::unix(path)?;
let socket = Socket::new(
Domain::unix(),
Type::seqpacket(), // this _may_ not be supported on MacOS
None,
)?;
Ok((socket, addr))
}
/// Connect to an established socket to the RPC system
///
/// To listen for new connections you need to explicitly call
/// `listen(...)`, otherwise it will only act as a sending socket,
......@@ -51,17 +63,50 @@ impl RpcSocket {
Self::with_duration(path, Duration::from_secs(5))
}
/// Create a new QRPC socket. This function is meant for servers
///
/// Because creating the socket is synonymous with listening for
/// connections on it, this function wraps both `new` (sort of),
/// and `listen`, meaning that you _must_ provide a closure at
/// this point.
pub fn create<P, F>(path: P, handle_connection: F) -> Result<Arc<Self>>
where
P: AsRef<Path>,
F: Fn(Socket, SockAddr) + Send + Sync + 'static,
{
let (inner, addr) = Self::new_socket(path)?;
inner.bind(&addr)?;
inner.listen(32)?;
let arc = Arc::new(Self {
inner,
addr,
timeout: Duration::from_secs(5),
run: AtomicBool::from(true),
listening: AtomicBool::from(true),
});
// We spawn a dedicated thread because socket2 is a non-async
// library and we don't want to accidentally deadlock our
// whole executor on this code. Besides, it's kinda the
// primary hot-path on the qrpc system, so a thread might be
// warranted. TODO: look into how async-std can handle this!
let arc2 = Arc::clone(&arc);
thread::spawn(move || {
while let Ok((sock, addr)) = arc2.inner.accept() {
handle_connection(sock, addr);
}
});
Ok(arc)
}
/// Create a new socket with an explicit timeout duration
///
/// Setup is the same as when calling `new`, except that you can
/// choose an explicit timeout, instead of the default.
pub fn with_duration<P: AsRef<Path>>(path: P, timeout: Duration) -> Result<Arc<Self>> {
let addr = SockAddr::unix(path)?;
let mut inner = Socket::new(
Domain::unix(),
Type::seqpacket(), // this _may_ not be supported on MacOS
None,
)?;
let (inner, addr) = Self::new_socket(path)?;
inner.connect(&addr)?;
Ok(Arc::new(Self {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment