Commit 69f47b1d authored by Kaiden Fey's avatar Kaiden Fey Committed by Katharina Fey

netmod-tcp: finishing basic server logic

parent abd1b5ea
//! Handle an Io pair channel
use async_std::sync::{channel, Receiver, Sender};
static CHANNEL_WIDTH: usize = 3;
#[derive(Debug)]
pub(crate) struct IoPair<T> {
pub(crate) rx: Receiver<T>,
pub(crate) tx: Sender<T>,
}
impl<T> Default for IoPair<T> {
fn default() -> Self {
let (tx, rx) = channel(CHANNEL_WIDTH);
Self { tx, rx }
}
}
//! A tcp overlay netmod to connect router across the internet
mod error;
mod io;
mod peer;
mod proto;
mod ptr;
mod routes;
mod server;
// mod peers;
// mod socket;
pub use error::{Error, Result};
pub(crate) use peer::{DstAddr, Peer, SourceAddr};
pub(crate) use io::IoPair;
pub(crate) use peer::{DstAddr, Peer, PeerState, SourceAddr};
pub(crate) use proto::{Packet, PacketBuilder};
pub(crate) use ptr::{AtomPtr, Ref};
pub(crate) use routes::Routes;
pub(crate) use server::Server;
pub(crate) use ptr::{AtomPtr, Ref};
use async_std::sync::{Arc, Receiver, RwLock};
use async_trait::async_trait;
......
......@@ -27,7 +27,7 @@
//! channel, which means they will return immediately, even if the
//! connection is currently down.
use crate::{AtomPtr, Packet, Ref};
use crate::{AtomPtr, IoPair, Packet, Ref};
use async_std::{
io::prelude::WriteExt,
net::TcpStream,
......@@ -51,8 +51,6 @@ mod id {
}
}
static CHANNEL_WIDTH: usize = 3;
/// Address from which packets are sent
pub(crate) type SourceAddr = SocketAddr;
......@@ -62,19 +60,6 @@ pub(crate) type DstAddr = SocketAddr;
/// A thread-safe locked sending stream
type LockedStream = Arc<RwLock<Option<TcpStream>>>;
#[derive(Debug)]
struct IoPair {
pub(self) rx: Receiver<Packet>,
pub(self) tx: Sender<Packet>,
}
impl Default for IoPair {
fn default() -> Self {
let (tx, rx) = channel(CHANNEL_WIDTH);
Self { tx, rx }
}
}
/// Encode the different states a `Peer` can be in
pub(crate) enum PeerState {
/// Only a receiving channel exists
......@@ -107,7 +92,7 @@ pub(crate) struct Peer {
#[doc(hidden)]
_run: Arc<AtomicBool>,
/// Store packets until they can be delivered
io: Arc<IoPair>,
io: Arc<IoPair<Packet>>,
}
impl Peer {
......@@ -166,7 +151,7 @@ impl Peer {
}
/// Internal utility to verify that this peer is still alive
fn alive(&self) -> bool {
pub(crate) fn alive(&self) -> bool {
self._run.load(Ordering::Relaxed)
}
......@@ -213,6 +198,7 @@ impl Peer {
// And woosh!
if let Err(e) = s.as_mut().unwrap().write_all(&buf).await {
error!("Failed to send message: {}!", e.to_string());
*s = None; // We mark ourselves as missing uplink
continue; // try again?
}
......
......@@ -44,6 +44,9 @@ impl Routes {
}
/// Add a new peer to the system with a destination address
///
/// This function is called when adding a peer via the static set
/// of peers to connect to.
pub(crate) async fn add_via_dst(self: &Arc<Self>, dst: DstAddr) -> usize {
let p = Peer::open(dst.clone(), self.port);
let id = p.id;
......@@ -73,6 +76,18 @@ impl Routes {
self.src_map.read().await.get(src).map(|id| *id)
}
/// Perform a peer lookup via it's source address and port
///
/// This function is called when receiving a HELLO packet to
/// verify whether or not this peer is theoretically known to the
/// system. If a peer says hello, and we know the destination
/// address (and we're running in STATIC mode), then we can safely
/// peer with it.
pub(crate) async fn find_via_srcport(self: &Arc<Self>, src: &SourceAddr, port: u16) -> Option<usize> {
let imply_dst = DstAddr::new(src.ip(), port);
self.dst_map.read().await.get(&imply_dst).map(|id| *id)
}
/// Upgrade an existing peer with a destination address
///
/// The existing src peer will be dropped. If a dst peer is
......
//! TCP incoming connection server
use crate::{Mode, PacketBuilder, Result, Routes};
use crate::{IoPair, Mode, Packet, PacketBuilder, Peer, PeerState, Result, Routes, SourceAddr};
use async_std::{
net::{TcpListener, TcpStream},
stream::StreamExt,
task,
};
use netmod::Frame;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tracing::{error, info, trace};
use std::time::Duration;
use tracing::{error, info, trace, warn};
/// The listening server part of the tcp driver
pub(crate) struct Server {
......@@ -19,6 +21,7 @@ pub(crate) struct Server {
routes: Arc<Routes>,
port: u16,
mode: Mode,
incoming: IoPair<Frame>,
}
impl Server {
......@@ -34,6 +37,7 @@ impl Server {
.map(|inner| {
Arc::new(Self {
alive: Default::default(),
incoming: IoPair::default(),
inner,
routes,
port,
......@@ -51,6 +55,11 @@ impl Server {
self.alive.fetch_and(false, Ordering::Relaxed);
}
/// Get the next available frame
pub(crate) async fn next(self: &Arc<Self>) -> Frame {
self.incoming.rx.recv().await.unwrap()
}
/// Spawn a handler task for incoming connections
pub(crate) fn run(self: &Arc<Self>) {
let s = Arc::clone(self);
......@@ -111,12 +120,83 @@ impl Server {
}
};
// Match on the peer-state, message payload tuple
// Match on the peer-state, message payload tuple. Each
// scenario is documented on the handler function to keep
// this match block as small and readable as possible.
// Avoid useless logging in this block too!
use Packet::*;
use PeerState::*;
match (peer.state(), f) {
(Duplex, Frame(f)) | (RxOnly, Frame(f)) => self.handle_frame(f).await,
(_, Hello { port }) => self.handle_hello(&src_addr, port).await,
(RxOnly, KeepAlive) => self.rx_keepalive(),
(Duplex, KeepAlive) => self.dup_keepalive(Arc::clone(&peer)),
_ => todo!(),
}
}
trace!("Exiting connetion work-loop; was there a connection drop?");
}
/// Handle an incoming frame message
async fn handle_frame(self: &Arc<Self>, p: Frame) {
self.incoming.tx.send(p).await;
}
/// A keepalive on an RXonly connection
///
/// It means that currently the TX connection is down. This
/// function can't really do anything about that though, so we log
/// the incident and hope that we will re-establish a connection
/// soon.
fn rx_keepalive(self: &Arc<Self>) {
warn!("Received a Keep-alive, but don't have TX link! Waiting for introducer to do it's job...");
}
/// A keepalive on a valid duplex connection
fn dup_keepalive(self: &Arc<Self>, peer: Arc<Peer>) {
trace!("Receiving keep-alive and queueing reply!");
task::spawn(async move { Self::send_keepalive(peer).await });
}
/// Handle an incoming HELLO message on Tx, or Rx only connections
///
/// A hello can come from a peer that we have said hello to before
/// (TxOnly), or a peer that has just introduced itself without us
/// knowing it before (RxOnly). If the node is running in dynamic
/// mode, check if the peer is in the set of "theoretically known
/// peers" before accepting the hello.
async fn handle_hello(self: &Arc<Self>, src_addr: &SourceAddr, port: u16) {
let maybe_id = self.routes.find_via_srcport(src_addr, port).await;
let upm = "Received HELLO from unknown peer.";
match (self.mode, maybe_id) {
(Mode::Static, None) => {
info!("{} Running STATIC: dropping packet!", upm);
return;
}
(Mode::Dynamic, None) => {
trace!("{} Running DYNAMIC: establishing reverse connection!", upm);
let id = self.routes.find_via_src(src_addr).await.unwrap();
self.routes.upgrade(id, port).await;
}
(mode, Some(id)) => {
trace!(
"[Mode: {}] Received HELLO from known peer; responding with keep-alive",
match mode {
Mode::Dynamic => "dynamic",
Mode::Static => "static",
}
);
let peer = self.routes.get_peer(id).await.unwrap();
task::spawn(async move { Self::send_keepalive(peer) });
}
}
}
/// Wait n seconds and then reply to a keep-alive
async fn send_keepalive(peer: Arc<Peer>) {
task::sleep(Duration::from_secs(10)).await;
peer.send(Packet::KeepAlive).await;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment