Commit 8e59dd98 authored by Kaiden Fey's avatar Kaiden Fey Committed by Katharina Fey

netmod-tcp: finishing 0.2.0 refactoring

parent 69f47b1d
[package]
name = "netmod-tcp"
description = "An internet overlay netmod endpoint driver"
version = "0.1.0"
authors = ["Leonora Tindall <nora@nora.codes>", "Katharina Fey <kookie@spacekookie>"]
version = "0.2.0"
authors = ["Katharina Fey <kookie@spacekookie>", "Leonora Tindall <nora@nora.codes>"]
edition = "2018"
license = "AGPL-3.0"
......
......@@ -7,8 +7,6 @@ pub type Result<T> = std::result::Result<T, Error>;
/// A generic initialisation error
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "a set of provided peers already existed")]
DuplicatePeers { peers: PeerErrs },
#[fail(display = "the selected mode does not allow for this operation")]
InvalidMode,
#[fail(display = "failed to initialise socket: invalid address")]
......@@ -17,22 +15,6 @@ pub enum Error {
FailedToSend
}
use std::net::SocketAddr;
#[derive(Debug)]
pub struct PeerErrs(Vec<SocketAddr>);
impl PeerErrs {
pub(crate) fn new(first: SocketAddr) -> std::result::Result<(), Self> {
Err(Self(vec![first]))
}
pub(crate) fn append(mut self, new: SocketAddr) -> Self {
self.0.push(new);
self
}
}
impl From<async_std::io::Error> for Error {
fn from(e: async_std::io::Error) -> Self {
use async_std::io::ErrorKind::*;
......@@ -42,9 +24,3 @@ impl From<async_std::io::Error> for Error {
}
}
}
impl From<PeerErrs> for Error {
fn from(peers: PeerErrs) -> Self {
Self::DuplicatePeers { peers }
}
}
......@@ -13,15 +13,15 @@ pub use error::{Error, Result};
pub(crate) use io::IoPair;
pub(crate) use peer::{DstAddr, Peer, PeerState, SourceAddr};
pub(crate) use proto::{Packet, PacketBuilder};
pub(crate) use ptr::{AtomPtr, Ref};
pub(crate) use ptr::AtomPtr;
pub(crate) use routes::Routes;
pub(crate) use server::Server;
use async_std::sync::{Arc, Receiver, RwLock};
use async_std::sync::Arc;
use async_trait::async_trait;
use netmod::{self, Endpoint as EndpointExt, Frame, Target};
use std::net::SocketAddr;
use tracing::{info, trace};
use tracing::info;
/// Define the runtime mode for this endpount
///
......@@ -33,68 +33,47 @@ pub enum Mode {
Dynamic,
}
impl Mode {
pub(crate) fn dynamic(&self) -> bool {
self == &Mode::Dynamic
}
}
#[derive(Clone)]
pub struct Endpoint {
mode: Arc<RwLock<Mode>>,
// socket: Arc<Socket>,
// peers: Arc<Peers>,
inbox: Option<Receiver<(Frame, usize)>>,
server: Arc<Server>,
routes: Arc<Routes>,
}
impl Endpoint {
/// Create a new endpoint on an interface and port
#[tracing::instrument(level = "info")]
pub async fn new(addr: &str, port: u16, name: &str) -> Result<Arc<Self>> {
pub async fn new(addr: &str, port: u16, name: &str, mode: Mode) -> Result<Arc<Self>> {
info!("Initialising Tcp backend");
let mut this = Self {
mode: Arc::new(RwLock::new(Mode::Static)),
// socket: Socket::new(addr, port, name).await?,
// peers: Peers::new(),
inbox: None,
};
trace!("Starting tcp router");
this.start().await;
Ok(Arc::new(this))
}
/// Set the runtime mode
#[tracing::instrument(skip(self), level = "info")]
pub async fn mode(&self, mode: Mode) {
*self.mode.write().await = mode;
}
let routes = Routes::new(port);
let server = Server::new(Arc::clone(&routes), addr, port, mode).await?;
/// Load a set of peers, replacing the old peer list
#[tracing::instrument(skip(self, peers), level = "info")]
pub async fn load_peers<I: Into<SocketAddr>>(&self, peers: Vec<I>) -> Result<()> {
// self.peers.load(peers).await?;
// if let Some(_) = self.inbox {
// self.update_peers().await;
// }
server.run();
Ok(Arc::new(Self { server, routes }))
}
Ok(())
/// Get the current runtime mode
pub fn mode(&self) -> Mode {
self.server.mode()
}
#[tracing::instrument(skip(self), level = "info")]
async fn start(&mut self) {
// self.inbox = Some(self.socket.start(*self.mode.read().await, &self.peers));
self.update_peers().await;
pub async fn stop(&self) {
self.server.stop();
self.routes.stop_all().await;
}
/// Get all known peers and introduce this node to them
async fn update_peers(&self) {
// let known = self.peers.all_known().await;
// for peer in known {
// if let Some(dst) = peer.dst {
// self.socket.introduce(peer.id, dst).await;
// }
// }
/// Insert a set of peers into the routing table
///
/// Each peer will spawn a worker that periodically attempts to
/// connect to it. Connections might not be recipricated if the
/// peer doesn't know the local IP or is rejecting unknown
/// connections.
pub async fn add_peers<I: Into<SocketAddr>>(&self, peers: Vec<I>) -> Result<()> {
for peer in peers.into_iter() {
self.routes.add_via_dst(peer.into()).await;
}
Ok(())
}
}
......@@ -105,94 +84,26 @@ impl EndpointExt for Endpoint {
}
async fn send(&self, frame: Frame, target: Target) -> netmod::Result<()> {
// match target {
// Target::Single(t) => {
// let addr = match self.peers.peer_by_id(t as usize).await {
// Some(p) => match p.dst {
// Some(dst) => dst,
// None => return Err(netmod::Error::ConnectionLost),
// },
// None => return Err(netmod::Error::ConnectionLost),
// };
// self.socket.send(addr, frame).await.unwrap()
// }
// Target::Flood => self.socket.send_all(frame).await.unwrap(),
// }
match target {
Target::Flood => {
let dsts = self.routes.all_dst().await;
for peer in dsts {
peer.send(Packet::Frame(frame.clone())).await;
}
}
Target::Single(id) => {
let peer = match self.routes.get_peer(id as usize).await {
Some(p) => Ok(p),
None => Err(netmod::Error::ConnectionLost),
}?;
peer.send(Packet::Frame(frame)).await;
}
}
Ok(())
}
async fn next(&self) -> netmod::Result<(Frame, Target)> {
match self.inbox {
Some(ref ib) => match ib.recv().await {
Some((f, id)) => Ok((f, Target::Single(id as u16))),
None => Err(netmod::Error::ConnectionLost),
},
None => Err(netmod::Error::ConnectionLost),
}
Ok(self.server.next().await)
}
}
// #[async_std::test]
// async fn trivial() {
// use async_std::{future, task};
// use std::{
// net::{Ipv4Addr, SocketAddrV4},
// time::Duration,
// };
// let mut a = Endpoint::new("127.0.0.1", 10000, ">> A").await.unwrap();
// a.load_peers(vec![SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 11000)])
// .await
// .unwrap();
// let mut b = Endpoint::new("127.0.0.1", 11000, "> B").await.unwrap();
// b.load_peers(vec![SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 10000)])
// .await
// .unwrap();
// a.start().await;
// b.start().await;
// task::sleep(Duration::from_secs(1)).await;
// future::timeout(Duration::from_secs(5), async {
// let f = Frame::dummy();
// a.send(f.clone(), Target::Single(0)).await.unwrap();
// assert_eq!(b.next().await.unwrap().0, f);
// })
// .await
// .unwrap();
// }
// /// This test establishes a connection between two peers and then
// /// let's them bounce keep-alive's back and forth for about 1 minute
// /// to test stability.
// ///
// /// This test should usually be ignored!
// #[async_std::test]
// #[ignore]
// async fn akward_silence() {
// use async_std::task;
// use std::{
// net::{Ipv4Addr, SocketAddrV4},
// time::Duration,
// };
// println!("Starting two sockets to talk to each other now...");
// let mut a = Endpoint::new("127.0.0.1", 10000, ">> A").await.unwrap();
// let mut b = Endpoint::new("127.0.0.1", 11000, "> B").await.unwrap();
// a.load_peers(vec![SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 11000)])
// .await
// .unwrap();
// a.start().await;
// b.load_peers(vec![SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 10000)])
// .await
// .unwrap();
// b.start().await;
// task::sleep(Duration::from_secs(120)).await;
// }
......@@ -27,11 +27,11 @@
//! channel, which means they will return immediately, even if the
//! connection is currently down.
use crate::{AtomPtr, IoPair, Packet, Ref};
use crate::{AtomPtr, IoPair, Packet};
use async_std::{
io::prelude::WriteExt,
net::TcpStream,
sync::{channel, Arc, Receiver, RwLock, Sender},
sync::{Arc, RwLock},
task,
};
use bincode::serialize;
......@@ -292,11 +292,6 @@ impl Peer {
self.io.tx.send(packet).await;
}
/// Check if this peer has completed it's reverse handshake
pub(crate) fn known(&self) -> bool {
self.get_src().is_some()
}
pub(crate) fn get_src(&self) -> Option<SourceAddr> {
*self.src.get_ref().clone()
}
......
//! Peer tracking
use crate::error::PeerErrs;
use async_std::sync::{Arc, RwLock};
use std::collections::HashMap;
use std::net::SocketAddr;
use tracing::{error, trace, warn};
type SourceAddr = SocketAddr;
type DstAddr = SocketAddr;
/// A set of errors that can occur when dealing with peer states
#[derive(Debug, Clone, Copy)]
pub(crate) enum PeerInsertError {
/// A peer with matching source address already exists
SameSrcAddr,
/// A peer with matching dst address already exists
SameDstAddr,
/// The requested peer (via Id) was not found
NoSuchPeer,
}
#[derive(Clone, Debug)]
pub(crate) struct Peer {
pub id: usize,
pub src: Option<SocketAddr>,
pub dst: Option<SocketAddr>,
pub known: bool,
}
impl Peer {
/// Get the current peer's LinkState
pub(crate) fn link_state(&self) -> LinkState {
use LinkState::*;
match (self.src, self.dst) {
(None, None) => NoLink,
(Some(_), None) => DownOnly,
(None, Some(_)) => UpOnly,
(Some(_), Some(_)) => Duplex,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum LinkState {
/// No established link with this peer
NoLink,
/// This peer can send us data, but we have no return channel
DownOnly,
/// We can send this peer data, but they have no return channel
UpOnly,
/// A bi-directional link
Duplex,
}
#[derive(Debug, Default)]
pub(crate) struct Peers {
/// Lookup table by source address
src_to_id: RwLock<HashMap<SourceAddr, usize>>,
/// Lookup table by destination address
dst_to_id: RwLock<HashMap<DstAddr, usize>>,
/// Mapping from Ids to peer data
peers: RwLock<HashMap<usize, Peer>>,
/// Used to monotonically create Ids
curr: RwLock<usize>,
}
impl Peers {
/// Create a new empty peer list
pub(crate) fn new() -> Arc<Self> {
Default::default()
}
/// Get all peers currently known to this server
pub(crate) async fn all_known(self: &Arc<Self>) -> Vec<Peer> {
self.peers
.read()
.await
.iter()
.map(|(_, p)| p.clone())
.collect()
}
/// Get a peer by it's unique numerical Id
pub(crate) async fn peer_by_id(self: &Arc<Self>, id: usize) -> Option<Peer> {
self.peers.read().await.get(&id).map(|v| v.clone())
}
/// Get a peer via it's source socket address
pub(crate) async fn peer_by_src(self: &Arc<Self>, src: &SourceAddr) -> Option<Peer> {
match self.src_to_id.read().await.get(src) {
Some(id) => self.peers.read().await.get(&id).cloned(),
None => None,
}
}
/// Get a peer via it's destination socket address
pub(crate) async fn peer_by_dst(self: &Arc<Self>, dst: &DstAddr) -> Option<Peer> {
match self.dst_to_id.read().await.get(dst) {
Some(id) => self.peers.read().await.get(&id).cloned(),
None => None,
}
}
/// Check if a peer is already stored via it's src or dst addr
#[tracing::instrument(skip(self), level = "trace")]
pub(crate) async fn filter_peer(self: &Arc<Self>, peer: &Peer) -> Result<(), PeerInsertError> {
match peer.src {
Some(src) if self.peer_by_src(&src).await.is_some() => {
trace!("Peer with same source address exists");
Err(PeerInsertError::SameSrcAddr)
}
_ => Ok(()),
}?;
match peer.dst {
Some(dst) if self.peer_by_dst(&dst).await.is_some() => {
trace!("Peer with same dst address exists");
Err(PeerInsertError::SameDstAddr)
}
_ => Ok(()),
}?;
Ok(())
}
/// Add a new peer into the store
#[tracing::instrument(skip(self), level = "trace")]
pub(crate) async fn add_peer(
self: &Arc<Self>,
mut peer: Peer,
) -> Result<usize, PeerInsertError> {
self.filter_peer(&peer).await?;
let mut curr = self.curr.write().await;
*curr += 1;
peer.id = *curr;
self.peers.write().await.insert(*curr, peer.clone());
// insert to src-map if src addr is known
if let Some(src) = peer.src {
self.src_to_id.write().await.insert(src.clone(), *curr);
}
// insert to dst-map if dst addr is known
if let Some(dst) = peer.dst {
self.dst_to_id.write().await.insert(dst.clone(), *curr);
}
Ok(*curr)
}
/// Change the destination address on an existing peer connection
#[tracing::instrument(skip(self), level = "trace")]
pub(crate) async fn change_dst(
self: &Arc<Self>,
id: usize,
dst: &DstAddr,
) -> Result<Peer, PeerInsertError> {
// Get the peer by Id and change it's dst field
let mut peer = self
.peer_by_id(id)
.await
.map_or(Err(PeerInsertError::NoSuchPeer), |p| Ok(p))?;
peer.dst = Some(dst.clone());
// Get an existing peer with this destination
let ghost = self.peer_by_dst(dst).await;
// Copy the "known" status from the ghost identity
// FIXME: Why? -- spacekookie
if let Some(ghost) = ghost {
peer.known = peer.known || ghost.known;
self.del_peer(ghost.id).await;
}
// Update peer data in peer maps
self.del_peer(peer.id).await;
self.add_peer(peer.clone()).await?;
Ok(peer)
}
/// Remove a peer by Id, and do nothing if the peer doens't exist
#[tracing::instrument(skip(self), level = "trace")]
pub(crate) async fn del_peer(self: &Arc<Self>, id: usize) {
let mut peers = self.peers.write().await;
let mut src_to_id = self.src_to_id.write().await;
let mut dst_to_id = self.dst_to_id.write().await;
if let Some(peer) = peers.remove(&id) {
if let Some(src) = peer.src {
src_to_id.remove(&src);
}
if let Some(dst) = peer.dst {
dst_to_id.remove(&dst);
}
}
}
/// Load a set of peers into the peer lookup table
///
/// By default every peer will have a dst address, which is what
/// is passed into the driver as the peer-set. Based on the given
/// dst address, hello packets will be dispatched
#[tracing::instrument(skip(self, peers), level = "trace")]
pub(crate) async fn load<I: Into<SocketAddr>>(
self: &Arc<Self>,
peers: Vec<I>,
) -> Result<(), PeerErrs> {
let new_peers: Vec<_> = peers.into_iter().map(Into::into).collect();
// Lock all required data stores
let mut peers = self.peers.write().await;
let mut dst_to_id = self.dst_to_id.write().await;
let mut curr = self.curr.write().await;
new_peers.into_iter().fold(Ok(()), |prev, addr| {
// Utility closure to insert a new peer
macro_rules! insert_new_peer {
() => {
dst_to_id.insert(addr.clone(), *curr);
peers.insert(
*curr,
Peer {
id: *curr,
src: None,
dst: Some(addr),
known: true,
},
);
};
};
match prev {
Ok(_) if dst_to_id.contains_key(&addr) => PeerErrs::new(addr),
Err(e) if dst_to_id.contains_key(&addr) => Err(e.append(addr)),
Ok(()) => {
insert_new_peer!();
*curr += 1;
Ok(())
}
err @ Err(_) => {
insert_new_peer!();
*curr += 1;
err
}
}
})
}
}
// #[async_std::test]
// async fn load_peers() {
// use std::net::{Ipv4Addr, SocketAddrV4};
// let a1 = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8000);
// let a2 = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 9000);
// let peers = Peers::new();
// peers.load(vec![a1.clone(), a2.clone()]).await.unwrap();
// let id = peers.id_by_dst(&a1.into()).await.unwrap();
// assert_eq!(peers.dst_by_id(id).await, Some(a1.into()));
// }
......@@ -14,7 +14,7 @@
use crate::{DstAddr, Peer, SourceAddr};
use async_std::sync::{Arc, RwLock};
use std::collections::BTreeMap;
use tracing::{error, trace};
use tracing::trace;
/// Routing table for local IP scope
#[derive(Clone, Default)]
......@@ -38,6 +38,25 @@ impl Routes {
})
}
pub(crate) async fn stop_all(self: &Arc<Self>) {
for (_, peer) in self.peers.read().await.iter() {
peer.stop();
}
}
/// Get all peers that are currently connected via a DST link
pub(crate) async fn all_dst(self: &Arc<Self>) -> Vec<Arc<Peer>> {
self.peers
.read()
.await
.iter()
.filter_map(|(_, p)| match p.get_dst() {
Some(_) => Some(Arc::clone(&p)),
None => None,
})
.collect()
}
/// Get the underlying peer for an ID
pub(crate) async fn get_peer(self: &Arc<Self>, id: usize) -> Option<Arc<Peer>> {
self.peers.read().await.get(&id).map(|p| Arc::clone(&p))
......@@ -83,11 +102,15 @@ impl Routes {
/// system. If a peer says hello, and we know the destination
/// address (and we're running in STATIC mode), then we can safely
/// peer with it.
pub(crate) async fn find_via_srcport(self: &Arc<Self>, src: &SourceAddr, port: u16) -> Option<usize> {
pub(crate) async fn find_via_srcport(
self: &Arc<Self>,
src: &SourceAddr,
port: u16,
) -> Option<usize> {
let imply_dst = DstAddr::new(src.ip(), port);
self.dst_map.read().await.get(&imply_dst).map(|id| *id)
}
/// Upgrade an existing peer with a destination address
///
/// The existing src peer will be dropped. If a dst peer is
......
......@@ -6,7 +6,7 @@ use async_std::{
stream::StreamExt,
task,
};
use netmod::Frame;
use netmod::{Frame, Target};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
......@@ -19,9 +19,9 @@ pub(crate) struct Server {
alive: Arc<AtomicBool>,
inner: TcpListener,
routes: Arc<Routes>,
port: u16,
_port: u16,
mode: Mode,
incoming: IoPair<Frame>,
incoming: IoPair<(Frame, usize)>,
}
impl Server {
......@@ -29,10 +29,10 @@ impl Server {
pub(crate) async fn new(
routes: Arc<Routes>,
addr: &str,
port: u16,
_port: u16,
mode: Mode,
) -> Result<Arc<Self>> {
Ok(TcpListener::bind(format!("{}:{}", addr, port))
Ok(TcpListener::bind(format!("{}:{}", addr, _port))
.await
.map(|inner| {
Arc::new(Self {
......@@ -40,7 +40,7 @@ impl Server {
incoming: IoPair::default(),
inner,
routes,
port,
_port,
mode,
})
})?)
......@@ -50,14 +50,23 @@ impl Server {
self.alive.load(Ordering::Relaxed)
}
pub(crate) fn mode(&self) -> Mode {
self.mode.clone()
}
/// Shut down the listening server
pub(crate) fn stop(self: &Arc<Self>) {
self.alive.fetch_and(false, Ordering::Relaxed);
}
/// Get the next available frame
pub(crate) async fn next(self: &Arc<Self>) -> Frame {
self.incoming.rx.recv().await.unwrap()
pub(crate) async fn next(self: &Arc<Self>) -> (Frame, Target) {
self.incoming
.rx
.recv()
.await
.map(|(f, t)| (f, Target::Single(t as u16)))
.unwrap()
}
/// Spawn a handler task for incoming connections
......@@ -127,7 +136,7 @@ impl Server {
use Packet::*;
use PeerState::*;
match (peer.state(), f) {
(Duplex, Frame(f)) | (RxOnly, Frame(f)) => self.handle_frame(f).await,
(Duplex, Frame(f)) | (RxOnly, Frame(f)) => self.handle_frame(peer.id, f).await,
(_, Hello { port }) => self.handle_hello(&src_addr, port).await,
(RxOnly, KeepAlive) => self.rx_keepalive(),
(Duplex, KeepAlive) => self.dup_keepalive(Arc::clone(&peer)),
......@@ -139,8 +148,8 @@ impl Server {
}
/// Handle an incoming frame message
async fn handle_frame(self: &Arc<Self>, p: Frame) {
self.incoming.tx.send(p).await;
async fn handle_frame(self: &Arc<Self>, peer_id: usize, p: Frame) {
self.incoming.tx.send((p, peer_id)).await;
}
/// A keepalive on an RXonly connection
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment