Commit a7fdb88a authored by Katharina Fey

netmod-tcp: fix issue where dropped connections wouldn't resume

The solution to this problem is to send explicit hello-responses (ACKs)
from the receiving stream end.  When sending a HELLO, a task is spawned
that reads from the sending stream to wait for an ACK.  If this response
times out (TODO: make these values configurable), the task officially
drops the sending connection, meaning that the next send re-initialises
the stream.

With some initial testing this has shown _most_ connection drops to
recover gracefully, but not all.  Further testing is required.
parent 58794f53
......@@ -27,8 +27,9 @@
//! channel, which means they will return immediately, even if the
//! connection is currently down.
use crate::{AtomPtr, IoPair, Packet};
use crate::{AtomPtr, IoPair, Packet, PacketBuilder};
use async_std::{
future::timeout,
io::prelude::WriteExt,
net::TcpStream,
sync::{Arc, RwLock},
......@@ -119,6 +120,7 @@ impl Peer {
/// While this function returns immediately, it spawns an async
/// worker that will try to establish a connection to the peer,
/// exiting until `stop()` is called on this peer
#[tracing::instrument(level = "trace")]
pub(crate) fn open(dst: DstAddr, port: u16) -> Arc<Self> {
let p = Arc::new(Self {
id: id::next(),
......@@ -172,24 +174,23 @@ impl Peer {
}
};
// Serialise the payload and pre-pend the length
let mut vec = serialize(p).unwrap();
let mut buf = vec![0; 8];
BigEndian::write_u64(&mut buf, vec.len() as u64);
buf.append(&mut vec);
// And woosh!
let buf = p.serialize();
if let Err(e) = stream.write_all(&buf).await {
error!("Failed to send message: {}!", e.to_string());
// We mark ourselves as missing uplink
std::mem::swap(&mut *s, &mut None);
return None;
}
match p {
Packet::Hello { .. } => trace!("Sending HELLO to {}", addr),
Packet::Hello { .. } => {
trace!("Sending HELLO to {}", addr);
let _self = Arc::clone(self);
task::spawn(_self.wait_for_ack());
}
_ => {}
}
......@@ -199,18 +200,59 @@ impl Peer {
}
}
async fn wait_for_ack(self: Arc<Self>) {
let t = timeout(Duration::from_secs(10), async {
loop {
let mut s = self.sender.write().await;
if s.is_none() {
break;
}
if timeout(Duration::from_millis(1), async {
let mut pb = PacketBuilder::new((*s).as_mut().unwrap());
match pb.parse().await {
Ok(_) => match pb.build() {
Some(Packet::Ack) => trace!("Received an ACK."),
_ => error!("Invalid data (only ACKs)!"),
},
_ => {
let mut s = self.sender.write().await;
std::mem::swap(&mut *s, &mut None);
error!("Failed to read ACK from sender stream");
}
}
})
.await
.is_ok()
{
break;
}
drop(s);
task::sleep(Duration::from_millis(50)).await;
}
});
// If the top-level timeout is ever hit...
match t.await {
Ok(_) => {}
Err(_) => {
// Remove the stream because it's probably dead
let mut s = self.sender.write().await;
std::mem::swap(&mut *s, &mut None);
}
}
}
/// This function will try sending a packet, initialising the
/// output stream if it doesn't yet exist
async fn send_or_introduce(self: &Arc<Self>, p: Packet, port: u16) {
loop {
if { self.sender.write().await.is_some() } {
if { self.sender.read().await.is_some() } {
// Send the packet and re-run the loop if we failed to send
match self.send_packet(&p).await {
Some(_) => break,
None => {
dbg!();
continue;
}
None => continue, // send_packet sets sender = None if failed
}
} else {
trace!("Sender is None, opening a connection first...");
......@@ -228,8 +270,10 @@ impl Peer {
/// There's currently no way to get diagnostics from failed sends
/// back to ratman. **FIXME**: implement this!
pub(crate) fn run_io_sender(self: Arc<Self>, port: u16) {
trace!("Running IO sender");
task::spawn(async move {
while let Some(p) = self.io.rx.recv().await {
trace!("Queued packet {:?}", p);
self.send_or_introduce(p, port).await;
if !self.alive() {
......@@ -241,6 +285,8 @@ impl Peer {
});
}
/// Currently a no-op stub: the body is empty.
///
/// NOTE(review): presumably meant to register a reverse-HELLO handler
/// for this peer -- confirm the intent, or remove if unused.
fn register_revhello(self: &Arc<Self>) {}
/// Loop on a connection until it could be established!
async fn introduce_blocking(self: Arc<Self>, port: u16) {
let id = self.id.clone();
......
......@@ -4,7 +4,7 @@ use async_std::{
io::{self, prelude::ReadExt},
net::TcpStream,
};
use bincode::deserialize;
use bincode::{deserialize, serialize};
use byteorder::{BigEndian, ByteOrder};
use netmod::Frame;
use serde::{Deserialize, Serialize};
......@@ -21,10 +21,23 @@ pub(crate) enum Packet {
/// connection, the hello message contains the port which is
/// swapped into the source address to connect to.
Hello { port: u16 },
/// Response to a Hello on the sending stream
Ack,
/// An actual data packet
Frame(Frame),
}
impl Packet {
    /// Encodes this packet for the wire: an 8-byte big-endian length
    /// header followed by the bincode-serialised payload
    pub(crate) fn serialize(&self) -> Vec<u8> {
        let payload = serialize(self).unwrap();
        let header = (payload.len() as u64).to_be_bytes();

        let mut framed = Vec::with_capacity(header.len() + payload.len());
        framed.extend_from_slice(&header);
        framed.extend_from_slice(&payload);
        framed
    }
}
/// A utility to read packets from an incoming TCP stream
pub(crate) struct PacketBuilder<'s> {
stream: &'s mut TcpStream,
......@@ -42,7 +55,7 @@ impl<'s> PacketBuilder<'s> {
let mut len_buf = [0; 8];
self.stream.read_exact(&mut len_buf).await?;
let len = BigEndian::read_u64(&len_buf);
let mut data_buf = vec![0; len as usize];
self.stream.read_exact(&mut data_buf).await?;
self.data = Some(data_buf);
......
......@@ -17,7 +17,7 @@ use std::collections::BTreeMap;
use tracing::trace;
/// Routing table for local IP scope
#[derive(Clone, Default)]
#[derive(Clone, Debug, Default)]
pub(crate) struct Routes {
/// Store which port this instance is listening to
port: u16,
......@@ -165,7 +165,7 @@ impl Routes {
/// This indicates some bad state and we panic. This _should_
/// never happen, but might when calling this function in the
/// wrong position in the accept loop.
pub(crate) async fn upgrade(self: &Arc<Self>, id: usize, port: u16) {
pub(crate) async fn upgrade(self: &Arc<Self>, id: usize, port: u16) -> usize {
let mut peers = self.peers.write().await;
let mut src_map = self.src_map.write().await;
let mut dst_map = self.dst_map.write().await;
......
......@@ -2,8 +2,10 @@
use crate::{IoPair, Mode, Packet, PacketBuilder, Peer, PeerState, Result, Routes, SourceAddr};
use async_std::{
io::prelude::*,
net::{TcpListener, TcpStream},
stream::StreamExt,
sync::Mutex,
task,
};
use netmod::{Frame, Target};
......@@ -14,6 +16,8 @@ use std::sync::{
use std::time::Duration;
use tracing::{error, info, trace, warn};
/// A `TcpStream` wrapped in `Arc<Mutex<..>>` so that the same
/// connection can be cloned into more than one handler and locked
/// for each individual read or write
type LockedStream = Arc<Mutex<TcpStream>>;
/// The listening server part of the tcp driver
pub(crate) struct Server {
alive: Arc<AtomicBool>,
......@@ -87,7 +91,7 @@ impl Server {
trace!("Accepting new connection...");
let s = Arc::clone(&s);
task::spawn(async move { s.accept_connection(stream).await });
task::spawn(async move { s.accept_connection(Arc::new(Mutex::new(stream))).await });
}
info!("Terminating tcp accept loop!");
......@@ -95,8 +99,8 @@ impl Server {
}
/// loop over a stream of incoming data
async fn accept_connection(self: Arc<Self>, mut stream: TcpStream) {
let src_addr = match stream.peer_addr() {
async fn accept_connection(self: Arc<Self>, stream: LockedStream) {
let src_addr = match stream.lock().await.peer_addr() {
Ok(a) => a,
Err(_) => {
error!("Missing peer addr in stream; exiting!");
......@@ -119,17 +123,21 @@ impl Server {
});
let peer = self.routes.get_peer(pid).await.unwrap();
let mut fb = PacketBuilder::new(&mut stream);
if let Err(_) = fb.parse().await {
error!("Failed to read from incoming packet stream; dropping connection!");
break;
}
let f = {
let mut stream = stream.lock().await;
let f = match fb.build() {
Some(f) => f,
None => {
error!("Malformed frame; skipping!");
continue;
let mut fb = PacketBuilder::new(&mut stream);
if let Err(_) = fb.parse().await {
error!("Failed to read from incoming packet stream; dropping connection!");
break;
}
match fb.build() {
Some(f) => f,
None => {
error!("Malformed frame; skipping!");
continue;
}
}
};
......@@ -141,7 +149,10 @@ impl Server {
use PeerState::*;
match (peer.state(), f) {
(_, Frame(f)) => self.handle_frame(peer.id, f).await,
(state, Hello { port }) => self.handle_hello(peer.id, state, &src_addr, port).await,
(state, Hello { port }) => {
self.handle_hello(peer.id, state, &src_addr, port, Arc::clone(&stream))
.await
}
(state, packet) => panic!(format!("state={:?}, packet={:?}", state, packet)),
}
}
......@@ -168,6 +179,7 @@ impl Server {
state: PeerState,
src: &SourceAddr,
port: u16,
stream: LockedStream,
) {
let maybe_id = self.routes.find_via_srcport(src, port).await;
let upm = "Received HELLO from unknown peer.";
......@@ -185,23 +197,27 @@ impl Server {
trace!("{} Running DYNAMIC: establishing reverse connection!", upm);
let id = self.routes.upgrade(rx_peer, port).await;
trace!("Sending a hello...");
self.send_hello(id);
self.send_hello(id, stream).await;
}
// Reverse connection of a peer we have known before
(RxOnly, _, Some(id)) => {
let id = self.routes.upgrade(rx_peer, port).await;
trace!("Sending a hello...");
self.send_hello(id);
self.send_hello(id, stream).await;
}
(TxOnly, _, Some(id)) => {
self.routes.add_src(id, *src).await;
self.send_hello(id);
self.send_hello(id, stream).await;
}
(link, mode, id) => panic!("{:?} {:?} {:?}", link, mode, id),
}
}
fn send_hello(self: &Arc<Self>, id: usize) {
async fn send_hello(self: &Arc<Self>, id: usize, stream: LockedStream) {
let mut stream = stream.lock().await;
let buf = Packet::Ack.serialize();
(*stream).write_all(&buf).await.unwrap();
let s = Arc::clone(self);
task::spawn(async move {
let peer = s.routes.get_peer(id).await.unwrap();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment