Commit 99528628 authored by Kaiden Fey's avatar Kaiden Fey Committed by Katharina Fey

netmod-tcp: implement limited client-server connections

By default connections are greated bi-directionally between two
peers.  This has the advantage of being able to read and write at the
same time, but might not be suported on all networks.  As a fallback
this commit implements reverse connection logic via the same stream as
an incoming connection.

A user can specify a peer in the peer list with "limited" connection
mode (example: 10.20.30.40:1312 limited), which will indicate in the
opening handshake to the server to re-use the incoming TcpStream as
the outgoing stream to the peer.

This way clients that can't be reached via an open upnp port (because
their network doesn't support this) can still use netmod-tcp to peer.
parent 9e0aae03
......@@ -15,13 +15,14 @@ pub(crate) use peer::{DstAddr, Peer, PeerState, SourceAddr};
pub(crate) use proto::{Packet, PacketBuilder};
pub(crate) use ptr::AtomPtr;
pub(crate) use routes::Routes;
pub(crate) use server::Server;
pub(crate) use server::{LockedStream, Server};
use async_std::sync::Arc;
use async_trait::async_trait;
use netmod::{self, Endpoint as EndpointExt, Frame, Target};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use tracing::info;
use tracing::{error, info};
/// Define the runtime mode for this endpount
///
......@@ -33,6 +34,31 @@ pub enum Mode {
Dynamic,
}
/// Specify the conneciton types used by this node
///
/// By default netmod-tcp tries to establish bi-directional
/// connections, meaning that two nodes each have a dedicated
/// transmission (tx) and receiving (rx) channels. However on some
/// networks this isn't possible. While `Bidirect` is a good default,
/// it's possible to override this behaviour.
///
/// `Limited` will open connections to peers with a special flag that
/// makes it use a different reverse-channel strategy. The server
/// won't try to create full reverse channels, and instead use the
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum LinkType {
/// Default connection type
Bidirect,
/// Fallback connection type
Limited,
}
impl Default for LinkType {
fn default() -> Self {
Self::Bidirect
}
}
#[derive(Clone)]
pub struct Endpoint {
server: Arc<Server>,
......@@ -68,9 +94,28 @@ impl Endpoint {
/// connect to it. Connections might not be recipricated if the
/// peer doesn't know the local IP or is rejecting unknown
/// connections.
pub async fn add_peers<I: Into<SocketAddr>>(&self, peers: Vec<I>) -> Result<()> {
for peer in peers.into_iter() {
self.routes.add_via_dst(peer.into()).await;
pub async fn add_peers(&self, peers: Vec<String>) -> Result<()> {
for p in peers.into_iter() {
let mut parts: Vec<_> = p.split(|x| x == ' ').collect();
let _type = parts.remove(1);
let peer = match parts[0].parse().ok() {
Some(s) => s,
None => {
error!("Failed to parse peer info `{}`", parts[0]);
continue;
}
};
self.routes
.add_via_dst(
peer,
if _type == "limited" {
LinkType::Limited
} else {
LinkType::Bidirect
},
)
.await;
}
Ok(())
......
......@@ -27,7 +27,7 @@
//! channel, which means they will return immediately, even if the
//! connection is currently down.
use crate::{AtomPtr, IoPair, Packet, PacketBuilder};
use crate::{AtomPtr, IoPair, LinkType, LockedStream, Packet, PacketBuilder};
use async_std::{
future::timeout,
io::prelude::WriteExt,
......@@ -58,9 +58,6 @@ pub(crate) type SourceAddr = SocketAddr;
/// Address to which packets are sent
pub(crate) type DstAddr = SocketAddr;
/// A thread-safe locked sending stream
type LockedStream = Arc<RwLock<Option<TcpStream>>>;
/// Encode the different states a `Peer` can be in
#[derive(Debug)]
pub(crate) enum PeerState {
......@@ -89,7 +86,9 @@ pub(crate) struct Peer {
/// Peer destination address
dst: Option<DstAddr>,
/// Sending stream for this peer (if it existst)
sender: LockedStream,
sender: AtomPtr<LockedStream>,
/// The type of link this maintains
_type: LinkType,
/// Secret run condition
#[doc(hidden)]
_run: Arc<AtomicBool>,
......@@ -121,17 +120,18 @@ impl Peer {
/// worker that will try to establish a connection to the peer,
/// exiting until `stop()` is called on this peer
#[tracing::instrument(level = "trace")]
pub(crate) fn open(dst: DstAddr, port: u16) -> Arc<Self> {
pub(crate) fn open(dst: DstAddr, port: u16, _type: LinkType) -> Arc<Self> {
let p = Arc::new(Self {
id: id::next(),
dst: Some(dst),
_run: Arc::new(true.into()),
_type,
..Default::default()
});
// Start sender loop and send a hello
Arc::clone(&p).run_io_sender(port);
task::block_on(async { Arc::clone(&p).send(Packet::Hello { port }).await });
task::block_on(async { Arc::clone(&p).send(Packet::Hello { port, _type }).await });
return p;
}
......@@ -141,6 +141,10 @@ impl Peer {
self.src.swap(src.into());
}
pub(crate) async fn set_stream(&self, s: LockedStream) {
self.sender.swap(s);
}
/// Stop all tasks associated with this peer
pub(crate) fn stop(&self) {
self._run.fetch_and(false, Ordering::Relaxed);
......@@ -156,6 +160,11 @@ impl Peer {
}
}
/// Get the type for this link
pub(crate) fn link_type(&self) -> LinkType {
self._type
}
/// Internal utility to verify that this peer is still alive
pub(crate) fn alive(&self) -> bool {
self._run.load(Ordering::Relaxed)
......@@ -163,7 +172,8 @@ impl Peer {
/// Call for each packet in the output stream
async fn send_packet(self: &Arc<Self>, p: &Packet) -> Option<()> {
let mut s = self.sender.write().await;
let r = self.sender.get_ref();
let mut s = r.write().await;
match *s {
Some(ref mut stream) => {
let addr = match stream.peer_addr() {
......@@ -203,7 +213,8 @@ impl Peer {
async fn wait_for_ack(self: Arc<Self>) {
let t = timeout(Duration::from_secs(10), async {
loop {
let mut s = self.sender.write().await;
let r = self.sender.get_ref();
let mut s = r.write().await;
if s.is_none() {
break;
}
......@@ -216,7 +227,6 @@ impl Peer {
_ => error!("Invalid data (only ACKs)!"),
},
_ => {
let mut s = self.sender.write().await;
std::mem::swap(&mut *s, &mut None);
error!("Failed to read ACK from sender stream");
}
......@@ -238,7 +248,8 @@ impl Peer {
Ok(_) => {}
Err(_) => {
// Remove the stream because it's probably dead
let mut s = self.sender.write().await;
let _ref = self.sender.get_ref();
let mut s = _ref.write().await;
std::mem::swap(&mut *s, &mut None);
}
}
......@@ -248,7 +259,7 @@ impl Peer {
/// output stream if it doesn't yet exist
async fn send_or_introduce(self: &Arc<Self>, p: Packet, port: u16) {
loop {
if { self.sender.read().await.is_some() } {
if { self.sender.get_ref().read().await.is_some() } {
// Send the packet and re-run the loop if we failed to send
match self.send_packet(&p).await {
Some(_) => break,
......@@ -293,7 +304,7 @@ impl Peer {
let dst = self.dst.clone().unwrap();
let run = Arc::clone(&self._run);
let sender = Arc::clone(&self.sender);
let sender = Arc::clone(&self.sender.get_ref());
let mut ctr = 0;
while run.load(Ordering::Relaxed) {
......
//! TCP internal protocol used to share connection state
use crate::LinkType;
use async_std::{
io::{self, prelude::ReadExt},
net::TcpStream,
......@@ -20,7 +21,7 @@ pub(crate) enum Packet {
/// network we create reverse connections. When establishing a
/// connection, the hello message contains the port which is
/// swapped into the source address to connect to.
Hello { port: u16 },
Hello { port: u16, _type: LinkType },
/// Response to a Hello on the sending stream
Ack,
/// An actual data packet
......
......@@ -4,11 +4,11 @@
//! in this file, so we should pull it out into it's own crate at some
//! point. But for now...
use std::{ops::Deref, cmp::PartialEq};
use std::sync::{
atomic::{AtomicPtr, Ordering},
Arc,
};
use std::{cmp::PartialEq, ops::Deref};
/// An alias for a referenced pointer
pub(crate) struct Ref<T> {
......@@ -59,7 +59,9 @@ impl<T> AtomPtr<T> {
let arc = Arc::clone(&*b);
std::mem::forget(b);
Ref { inner: Box::new(arc) }
Ref {
inner: Box::new(arc),
}
}
/// Swap the data entry with a new value, returning the old
......@@ -71,7 +73,9 @@ impl<T> AtomPtr<T> {
let arc = Arc::clone(&*b);
std::mem::forget(b);
Ref { inner: Box::new(arc) }
Ref {
inner: Box::new(arc),
}
}
}
......
......@@ -11,10 +11,10 @@
//! this table, and introduced to. Once a peer worker has been
//! spawned, it will make sure the duplex link is never dropped.
use crate::{DstAddr, Peer, SourceAddr};
use crate::{DstAddr, LinkType, LockedStream, Peer, SourceAddr};
use async_std::sync::{Arc, RwLock};
use std::collections::BTreeMap;
use tracing::trace;
use tracing::{trace, warn};
/// Routing table for local IP scope
#[derive(Clone, Debug, Default)]
......@@ -79,8 +79,8 @@ impl Routes {
///
/// This function is called when adding a peer via the static set
/// of peers to connect to.
pub(crate) async fn add_via_dst(self: &Arc<Self>, dst: DstAddr) -> usize {
let p = Peer::open(dst.clone(), self.port);
pub(crate) async fn add_via_dst(self: &Arc<Self>, dst: DstAddr, _type: LinkType) -> usize {
let p = Peer::open(dst.clone(), self.port, _type);
let id = p.id;
self.peers.write().await.insert(id, p);
......@@ -165,7 +165,12 @@ impl Routes {
/// This indicates some bad state and we panic. This _should_
/// never happen, but might when calling this function in the
/// wrong position in the accept loop.
pub(crate) async fn upgrade(self: &Arc<Self>, id: usize, port: u16) -> usize {
pub(crate) async fn upgrade(
self: &Arc<Self>,
id: usize,
port: u16,
stream: Option<LockedStream>,
) -> usize {
let mut peers = self.peers.write().await;
let mut src_map = self.src_map.write().await;
let mut dst_map = self.dst_map.write().await;
......@@ -191,6 +196,10 @@ impl Routes {
// If a peer with the implied DST address exists, we drop the
// SRC peer, and upgrade this to a duplex connection.
Some(id) => {
if stream.is_some() {
warn!("An outgoing stream exists for a LIMITED incoming stream! ignoring...");
}
trace!("Upgrading peer {} with SRC address", id);
let peer = peers.get(&id).unwrap();
src_map.insert(src, peer.id);
......@@ -199,8 +208,11 @@ impl Routes {
}
// If no such peer exists, we create one with SRC and DST addresses
None => {
let p = Peer::open(dst, port);
let p = Peer::open(dst, port, LinkType::Bidirect);
p.set_src(src);
if let Some(s) = stream {
p.set_stream(s).await;
}
// Insert peer into lookup tables
src_map.insert(src, p.id);
......
//! TCP incoming connection server
use crate::{IoPair, Mode, Packet, PacketBuilder, Peer, PeerState, Result, Routes, SourceAddr};
use crate::{
IoPair, LinkType, Mode, Packet, PacketBuilder, Peer, PeerState, Result, Routes, SourceAddr,
};
use async_std::{
io::prelude::*,
net::{TcpListener, TcpStream},
stream::StreamExt,
sync::Mutex,
sync::{Arc, RwLock},
task,
};
use netmod::{Frame, Target};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tracing::{error, info, trace, warn};
use tracing::{debug, error, info, trace, warn};
type LockedStream = Arc<Mutex<TcpStream>>;
pub(crate) type LockedStream = Arc<RwLock<Option<TcpStream>>>;
fn locked_stream(s: TcpStream) -> LockedStream {
Arc::new(RwLock::new(Some(s)))
}
/// The listening server part of the tcp driver
pub(crate) struct Server {
......@@ -91,7 +94,7 @@ impl Server {
trace!("Accepting new connection...");
let s = Arc::clone(&s);
task::spawn(async move { s.accept_connection(Arc::new(Mutex::new(stream))).await });
task::spawn(async move { s.accept_connection(locked_stream(stream)).await });
}
info!("Terminating tcp accept loop!");
......@@ -100,7 +103,7 @@ impl Server {
/// loop over a stream of incoming data
async fn accept_connection(self: Arc<Self>, stream: LockedStream) {
let src_addr = match stream.lock().await.peer_addr() {
let src_addr = match stream.write().await.as_mut().unwrap().peer_addr() {
Ok(a) => a,
Err(_) => {
error!("Missing peer addr in stream; exiting!");
......@@ -124,9 +127,9 @@ impl Server {
let peer = self.routes.get_peer(pid).await.unwrap();
let f = {
let mut stream = stream.lock().await;
let mut stream = stream.write().await;
let mut fb = PacketBuilder::new(&mut stream);
let mut fb = PacketBuilder::new(stream.as_mut().unwrap());
if let Err(_) = fb.parse().await {
error!("Failed to read from incoming packet stream; dropping connection!");
break;
......@@ -149,8 +152,8 @@ impl Server {
use PeerState::*;
match (peer.state(), f) {
(_, Frame(f)) => self.handle_frame(peer.id, f).await,
(state, Hello { port }) => {
self.handle_hello(peer.id, state, &src_addr, port, Arc::clone(&stream))
(state, Hello { port, _type }) => {
self.handle_hello(peer.id, state, &src_addr, port, _type, Arc::clone(&stream))
.await
}
(state, packet) => panic!(format!("state={:?}, packet={:?}", state, packet)),
......@@ -179,29 +182,40 @@ impl Server {
state: PeerState,
src: &SourceAddr,
port: u16,
_type: LinkType,
stream: LockedStream,
) {
let maybe_id = self.routes.find_via_srcport(src, port).await;
let upm = "Received HELLO from unknown peer.";
let s = match _type {
// This connection needs to be established as the
// reverse channel on this stream
LinkType::Limited => {
debug!("Receiving a limited incoming connection...");
Some(Arc::clone(&stream))
}
// This is the default outgoing stream
LinkType::Bidirect => None,
};
use PeerState::*;
let _self = Arc::clone(self);
match (state, self.mode, maybe_id) {
// A peer we didn't know before, while running in static mode
(_, Mode::Static, None) => {
info!("{} Running STATIC: dropping packet!", upm);
debug!("{} Running STATIC: dropping packet!", upm);
return;
}
// A peer we didn't know before, while running in dynamic mode
(RxOnly, Mode::Dynamic, None) => {
trace!("{} Running DYNAMIC: establishing reverse connection!", upm);
let id = self.routes.upgrade(rx_peer, port).await;
let id = self.routes.upgrade(rx_peer, port, s).await;
trace!("Sending a hello...");
self.send_hello(id, stream).await;
}
// Reverse connection of a peer we have known before
(RxOnly, _, Some(id)) => {
let id = self.routes.upgrade(rx_peer, port).await;
let id = self.routes.upgrade(rx_peer, port, s).await;
trace!("Sending a hello...");
self.send_hello(id, stream).await;
}
......@@ -214,15 +228,19 @@ impl Server {
}
async fn send_hello(self: &Arc<Self>, id: usize, stream: LockedStream) {
let mut stream = stream.lock().await;
let mut stream = stream.write().await;
let buf = Packet::Ack.serialize();
(*stream).write_all(&buf).await.unwrap();
(*stream.as_mut().unwrap()).write_all(&buf).await.unwrap();
let s = Arc::clone(self);
task::spawn(async move {
if let Some(peer) = s.routes.get_peer(id).await {
task::sleep(Duration::from_secs(2)).await;
peer.send(Packet::Hello { port: s._port }).await;
peer.send(Packet::Hello {
port: s._port,
_type: peer.link_type(),
})
.await;
}
});
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment