...
 
Commits (8)
......@@ -880,6 +880,8 @@ version = "0.1.0"
dependencies = [
"async-std",
"libqaul",
"libqaul-rpc",
"serde_json",
"tide",
]
......@@ -941,12 +943,13 @@ dependencies = [
]
[[package]]
name = "linux-ws-test"
name = "linux-http-test"
version = "0.1.0"
dependencies = [
"async-std",
"libqaul",
"libqaul-ws",
"libqaul-http",
"libqaul-rpc",
"qaul-chat",
"ratman",
]
......@@ -1345,7 +1348,9 @@ dependencies = [
name = "ratman"
version = "0.1.0"
dependencies = [
"access-notifier",
"async-std",
"bincode",
"clockctrl",
"conjoiner-engine",
"futures 0.3.4",
......
......@@ -39,5 +39,5 @@ members = [
"librobot",
# test binaries
"clients/linux-ws-test"
"clients/linux-http-test"
]
//! A utility wrapper type for notifying async tasks about mutation of data they're interested in.
use std::task::Waker;
//! A utility wrapper type for notifying async tasks about mutation of data they're interested in.
use std::ops::{Deref, DerefMut};
use std::task::Waker;
/// A wrapper which wakes tasks on mutable accesses to the wrapped value.
///
///
/// This can be used to transparently notify an asyncronous task that it
/// should, for example, check for more work in a queue or try again to
/// acquire a lock.
......@@ -29,7 +29,7 @@ impl<T> DerefMut for AccessNotifier<T> {
impl<T> AccessNotifier<T> {
/// Check whether or not this `AccessNotifier` has a registered `Waker`.
///
///
/// This function is implemented as an associated function rather than a
/// method to avoid conflicts with methods on the wrapped type.
/// Call it as `AccessNotifier::has_waker()`.
......@@ -46,12 +46,19 @@ impl<T> AccessNotifier<T> {
ptr.waker.as_ref().map(|w| w.clone())
}
/// Call wake on the waker, if it's a waker, yehaa!
pub fn wake_if_waker(ptr: &mut AccessNotifier<T>) {
if let Some(ref w) = ptr.waker {
w.clone().wake();
}
}
/// Register a `Waker` to be woken upon mutable accesses to the wrapped value.
///
/// This function is implemented as an associated function rather than a
/// method to avoid conflicts with methods on the wrapped type.
/// Call it as `AccessNotifier::register_waker()`.
///
///
/// # Panics
/// Panics if there is an already registered `Waker`.
/// Use `AccessNotifier::has_waker` to check the state before using this.
......@@ -83,7 +90,7 @@ impl<T> AccessNotifier<T> {
}
/// Notifies any registered `Waker` immediately.
///
///
/// This function is implemented as an associated function rather than a
/// method to avoid conflicts with methods on the wrapped type.
/// Call it as `AccessNotifier::notify()`.
......
[package]
name = "linux-ws-test"
name = "linux-http-test"
description = "A linux server binary that bootstraps a test server for the web UI to test with"
version = "0.1.0"
authors = ["Katharina Fey <kookie@spacekookie.de>"]
......@@ -8,7 +8,8 @@ edition = "2018"
[dependencies]
ratman = { path = "../../ratman" }
libqaul = { path = "../../libqaul" }
libqaul-ws = { path = "../../libqaul/ws", features = ["chat"] }
libqaul-rpc = { path = "../../libqaul/rpc", features = ["chat", "json"] }
libqaul-http = { path = "../../libqaul/http" }
qaul-chat = { path = "../../libqaul/service/chat" }
async-std = "1.0"
use async_std::sync::Arc;
use libqaul::Qaul;
use libqaul_ws::WsServer;
use qaul_chat::Chat;
use ratman::Router;
use {libqaul::Qaul, libqaul_http::HttpServer, libqaul_rpc::Responder, qaul_chat::Chat};
fn main() {
// Init a basic libqaul stack with no interfaces
......@@ -11,6 +9,5 @@ fn main() {
let chat = Chat::new(Arc::clone(&qaul)).unwrap();
// Start the websocket server
let ws = WsServer::new("127.0.0.1:9900", qaul, chat);
ws.block();
HttpServer::block("127.0.0.1:9900", Responder { qaul, chat });
}
......@@ -8,5 +8,7 @@ license = "AGPL-3.0"
[dependencies]
libqaul = { path = ".." }
libqaul-rpc = { path = "../rpc" }
serde_json = "1.0"
async-std = "1.0"
tide = "0.6"
\ No newline at end of file
//! libqaul http server API
#![allow(unused)]
use async_std::sync::Arc;
use libqaul::Qaul;
use tide::{self, Request, Response, Server};
use libqaul_rpc::{
json::{RequestEnv, ResponseEnv},
Envelope, EnvelopeType, Responder,
};
use async_std::{sync::Arc, task};
use serde_json;
use tide::{self, Request, Response};
/// State structure for the libqaul http server
pub struct HttpServer {
qaul: Arc<Qaul>,
app: Server<()>,
}
pub struct HttpServer;
impl HttpServer {
pub fn new(qaul: Arc<Qaul>) -> Self {
let mut app = tide::new();
app.at("/")
.get(|mut r: Request<()>| async move { Response::new(200) });
pub fn block(addr: &str, rpc: Responder) {
let mut app = tide::with_state(Arc::new(rpc));
app.at("/api").post(|mut r: Request<Arc<Responder>>| {
async move {
let json: String = r.body_json().await.unwrap();
let req_env: RequestEnv =
serde_json::from_str(json.as_str()).expect("Malformed json envelope");
let Envelope { id, data } = req_env.clone().into();
let req = match data {
EnvelopeType::Request(req) => req,
_ => unreachable!(), // Obviously possibly but fuck you
};
// Call into libqaul via the rpc utilities
let responder: Arc<_> = Arc::clone(r.state());
let resp = responder.respond(req).await;
let env = Envelope {
id,
data: EnvelopeType::Response(resp),
};
// Build the reply envelope
let resp_env: ResponseEnv = (env, req_env).into();
Response::new(200).body_json(&resp_env).unwrap()
}
});
Self { qaul, app }
task::block_on(async move { app.listen(addr).await }).unwrap();
}
}
//! Json message generator
use crate::{RequestEnv, ResponseEnv};
use libqaul_rpc::{Envelope, EnvelopeType};
use crate::{
json::{RequestEnv, ResponseEnv},
Envelope, EnvelopeType,
};
use serde_json::{self, Map, Value as JsonValue};
impl From<(Envelope, RequestEnv)> for ResponseEnv {
......
//! Json enveloping code
//! Json enveloping module
//!
//! This code is mostly used in `libqaul-ws` and `libqaul-http`, and
//! wraps the transactions of libqaul-rpc in json envelopes that are
//! nicer to work with for web tools.
mod parser;
mod generator;
use libqaul::{users::UserAuth, Identity};
use serde::{Deserialize, Serialize};
......@@ -9,7 +16,7 @@ pub(crate) type JsonMap = BTreeMap<String, JsonValue>;
/// A struct wrapper for UserAuth
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct JsonAuth {
pub struct JsonAuth {
id: Identity,
token: String,
}
......@@ -22,7 +29,7 @@ impl From<JsonAuth> for UserAuth {
/// A json specific request envelope
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct RequestEnv {
pub struct RequestEnv {
/// The request ID
pub id: String,
/// Auth data for the request
......@@ -45,7 +52,7 @@ pub(crate) struct RequestEnv {
/// A json specific repsonse envelope
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct ResponseEnv {
pub struct ResponseEnv {
/// Response ID, same as request ID
pub id: String,
/// Mirrored auth token
......
use crate::{JsonAuth, JsonMap, RequestEnv};
use crate::{
json::{JsonAuth, JsonMap, RequestEnv},
Envelope, EnvelopeType, Request,
};
use libqaul::users::UserAuth;
use libqaul_rpc::{Envelope, EnvelopeType, Request};
use serde::de::DeserializeOwned;
#[derive(Debug)]
......@@ -66,12 +68,13 @@ impl From<RequestEnv> for Envelope {
data: match (kind.as_str(), method.as_str()) {
// chat service message functions
#[cfg(features = "chat")]
("chat-message", "poll") => req(Request::ChatMsgNext(de_json(data, auth))),
#[cfg(features = "chat")]
("chat-message", "subscribe") => req(Request::ChatMsgSub(de_json(data, auth))),
("chat-message", "next") => req(Request::ChatMsgNext(de_json(data, auth))),
// #[cfg(features = "chat")]
// ("chat-message", "subscribe") => req(Request::ChatMsgSub(de_json(data, auth))),
#[cfg(features = "chat")]
("chat-message", "send") => req(Request::ChatMsgSend(de_json(data, auth))),
// ("chat-message", "query") => req(Request::ChatMsgQuery(de_json(data, auth))),
#[cfg(features = "chat")]
("chat-message", "query") => req(Request::ChatMsgQuery(de_json(data, auth))),
// chat service room management
#[cfg(features = "chat")]
......
......@@ -24,3 +24,5 @@ pub use api::{
};
pub use api::{chat, chat::ChatExt, chat::ChatRpc};
pub mod json;
//! libqaul websocket RPC
//!
//! The native interface for libqaul in async Rust. But a few other
//! RPC interfaces are exposed via the libqaul-rpc collection. One of
//! them is the websocket interface, which is primarily used by the
//! qaul.net webui.
//!
//! The structures are encoded in JSON, as described by the
//! libqaul-rpc structures. Every request has an envelope, which
//! contains in ID and some data. the data can either be a request or
//! a response, with appropriate data or error values inside.
mod env;
pub(crate) use env::{JsonAuth, JsonMap, RequestEnv, ResponseEnv};
mod generator;
mod parser;
mod server;
pub use server::WsServer;
//! Websocket accept server
use libqaul::Qaul;
use libqaul_rpc::{
json::{RequestEnv, ResponseEnv},
Envelope, EnvelopeType, Responder,
};
use qaul_chat::Chat;
use async_std::{
net::{TcpListener, TcpStream},
sync::Arc,
task,
};
use async_tungstenite::tungstenite::Message;
use futures::prelude::*;
use serde_json;
use std::sync::atomic::{AtomicBool, Ordering};
/// Websocket server structure
pub struct WsServer {
running: AtomicBool,
addr: String,
rpc: Responder,
}
impl WsServer {
/// Create a websocket server with a libqaul instance and services
pub fn new<S: Into<String>>(addr: S, qaul: Arc<Qaul>, chat: Arc<Chat>) -> Arc<Self> {
Arc::new(Self {
running: AtomicBool::from(true),
addr: addr.into(),
rpc: Responder { qaul, chat },
})
}
/// Accept connections in a detached task
pub fn run(self: Arc<Self>) {
task::spawn(async move {
while self.running.load(Ordering::Relaxed) {
println!("Binding '{}'", &self.addr);
let socket = TcpListener::bind(&self.addr)
.await
.expect(&format!("Failed to bind; '{}'", &self.addr));
while let Ok((stream, _)) = socket.accept().await {
task::spawn(Arc::clone(&self).handle(stream));
}
}
});
}
/// Same as `run` but blocks the current thread
pub fn block(self: Arc<Self>) {
task::block_on(async move {
while self.running.load(Ordering::Relaxed) {
println!("Binding '{}'", &self.addr);
let socket = TcpListener::bind(&self.addr)
.await
.expect(&format!("Failed to bind; '{}'", &self.addr));
while let Ok((stream, _)) = socket.accept().await {
task::spawn(Arc::clone(&self).handle(stream));
}
}
})
}
/// Handle an incoming websocket stream
async fn handle(self: Arc<Self>, stream: TcpStream) {
let ws_stream = async_tungstenite::accept_async(stream)
.await
.expect("Failed ws handshake");
let (mut tx, mut rx) = ws_stream.split();
// Read messages from this stream
while let Some(Ok(Message::Text(msg))) = rx.next().await {
let req_env: RequestEnv = serde_json::from_str(&msg).expect("Malformed json envelope");
let Envelope { id, data } = req_env.clone().into();
let req = match data {
EnvelopeType::Request(req) => req,
_ => unreachable!(), // Obviously possibly but fuck you
};
// Call into libqaul via the rpc utilities
let resp = self.rpc.respond(req).await;
let env = Envelope {
id,
data: EnvelopeType::Response(resp),
};
// Build the reply envelope
let resp_env: ResponseEnv = (env, req_env).into();
let json = serde_json::to_string(&resp_env).unwrap();
// Send the reply
tx.send(Message::Text(json))
.await
.expect("Failed to send reply!");
// Break on server shutdown
// The if is here because of a possible rustc bug and does nothing
if !self.running.load(Ordering::Relaxed) && break {};
}
}
/// Signal the runner to shut down
pub fn stop(&self) {
self.running.swap(false, Ordering::Relaxed);
}
}
//! Websocket accept server
use crate::env::{RequestEnv, ResponseEnv};
use async_std::{
net::{TcpListener, TcpStream},
sync::Arc,
task,
};
use async_tungstenite::tungstenite::Message;
use futures::prelude::*;
use libqaul::Qaul;
use libqaul_rpc::{Envelope, EnvelopeType, Responder};
use qaul_chat::Chat;
use serde_json;
use std::sync::atomic::{AtomicBool, Ordering};
/// Websocket server structure
pub struct WsServer {
running: AtomicBool,
addr: String,
rpc: Responder,
}
impl WsServer {
/// Create a websocket server with a libqaul instance and services
pub fn new<S: Into<String>>(addr: S, qaul: Arc<Qaul>, chat: Arc<Chat>) -> Arc<Self> {
Arc::new(Self {
running: AtomicBool::from(true),
addr: addr.into(),
rpc: Responder { qaul, chat },
})
}
/// Accept connections in a detached task
pub fn run(self: Arc<Self>) {
task::spawn(async move {
while self.running.load(Ordering::Relaxed) {
println!("Binding '{}'", &self.addr);
let socket = TcpListener::bind(&self.addr)
.await
.expect(&format!("Failed to bind; '{}'", &self.addr));
while let Ok((stream, _)) = socket.accept().await {
task::spawn(Arc::clone(&self).handle(stream));
}
}
});
}
/// Same as `run` but blocks the current thread
pub fn block(self: Arc<Self>) {
task::block_on(async move {
while self.running.load(Ordering::Relaxed) {
println!("Binding '{}'", &self.addr);
let socket = TcpListener::bind(&self.addr)
.await
.expect(&format!("Failed to bind; '{}'", &self.addr));
while let Ok((stream, _)) = socket.accept().await {
task::spawn(Arc::clone(&self).handle(stream));
}
}
})
}
/// Handle an incoming websocket stream
async fn handle(self: Arc<Self>, stream: TcpStream) {
let ws_stream = async_tungstenite::accept_async(stream)
.await
.expect("Failed ws handshake");
let (mut tx, mut rx) = ws_stream.split();
// Read messages from this stream
while let Some(Ok(Message::Text(msg))) = rx.next().await {
let req_env: RequestEnv = serde_json::from_str(&msg).expect("Malformed json envelope");
let Envelope { id, data } = req_env.clone().into();
let req = match data {
EnvelopeType::Request(req) => req,
_ => unreachable!(), // Obviously possibly but fuck you
};
// Call into libqaul via the rpc utilities
let resp = self.rpc.respond(req).await;
let env = Envelope {
id,
data: EnvelopeType::Response(resp),
};
// Build the reply envelope
let resp_env: ResponseEnv = (env, req_env).into();
let json = serde_json::to_string(&resp_env).unwrap();
// Send the reply
tx.send(Message::Text(json))
.await
.expect("Failed to send reply!");
// Break on server shutdown
// The if is here because of a possible rustc bug and does nothing
if !self.running.load(Ordering::Relaxed) && break {};
}
// while let Ok(num) = rx.read_to_string(&mut buf).await {
// }
}
/// Signal the runner to shut down
pub fn stop(&self) {
self.running.swap(false, Ordering::Relaxed);
}
}
......@@ -10,6 +10,7 @@ edition = "2018"
crate-type = ["cdylib"]
[dependencies]
jni = { version = "0.14", default-features = false }
libqaul = { path = "../libqaul" }
ratman = { path = "../ratman" }
......
......@@ -6,3 +6,11 @@
A hacky binding to libqaul, for Android.
You need:
rustup target add aarch64-linux-android armv7-linux-androideabi i686-linux-android
Then run `build_env_setup.sh` and copy the contents of `cargo-config.toml` to your ~/.cargo/config
To build use `build.sh` while in this directory
#!/usr/bin/env bash
JNI_LIBS=../clients/android/app/src/main/jniLibs
if [ ! -d $JNI_LIBS ]; then
echo "JNI_LIBS directory $JNI_LIBS does not exist, exiting."
exit 1
fi
NDK_ROOT=~/.NDK
echo $NDK_ROOT
echo "$NDK_ROOT/whatev"
export AR="$NDK_ROOT/arm64/bin/aarch64-linux-android-ar"
export CC="$NDK_ROOT/arm64/bin/aarch64-linux-android-clang"
cargo build --target aarch64-linux-android --release
export AR="$NDK_ROOT/arm/bin/arm-linux-androideabi-ar"
export CC="$NDK_ROOT/arm/bin/arm-linux-androideabi-clang"
cargo build --target armv7-linux-androideabi --release
export AR="$NDK_ROOT/x86/bin/i686-linux-android-ar"
export CC="$NDK_ROOT/x86/bin/i686-linux-android-clang"
cargo build --target i686-linux-android --release
rm -rf $JNI_LIBS
mkdir $JNI_LIBS
mkdir $JNI_LIBS/arm64-v8a
mkdir $JNI_LIBS/armeabi-v7a
mkdir $JNI_LIBS/x86
cp target/aarch64-linux-android/release/librobot.so $JNI_LIBS/arm64-v8a/librobot.so
cp target/armv7-linux-androideabi/release/librobot.so $JNI_LIBS/armeabi-v7a/librobot.so
cp target/i686-linux-android/release/librobot.so $JNI_LIBS/x86/librobot.so
\ No newline at end of file
#!/usr/bin/env bash
mkdir ~/.NDK
$ANDROID_HOME/../android-sdk/ndk/21.0.6113669/build/tools/make-standalone-toolchain.sh --platform=30 --arch=arm64 --install-dir=~/.NDK/arm64;
$ANDROID_HOME/../android-sdk/ndk/21.0.6113669/build/tools/make-standalone-toolchain.sh --platform=30 --arch=arm --install-dir=~/.NDK/arm;
$ANDROID_HOME/../android-sdk/ndk/21.0.6113669/build/tools/make-standalone-toolchain.sh --platform=30 --arch=x86 --install-dir=~/.NDK/x86;
[target.aarch64-linux-android]
ar = ".NDK/arm64/bin/aarch64-linux-android-ar"
linker = ".NDK/arm64/bin/aarch64-linux-android-clang"
[target.armv7-linux-androideabi]
ar = ".NDK/arm/bin/arm-linux-androideabi-ar"
linker = ".NDK/arm/bin/arm-linux-androideabi-clang"
[target.i686-linux-android]
ar = ".NDK/x86/bin/i686-linux-android-ar"
linker = ".NDK/x86/bin/i686-linux-android-clang"
\ No newline at end of file
#![cfg(target_os="android")]
#![allow(non_snake_case)]
use std::ffi::{CString, CStr};
use jni::JNIEnv;
use jni::objects::{JObject, JString};
use jni::sys::{jstring};
#[no_mangle]
pub unsafe extern fn Java_com_example_android_MainActivity_hello(env: JNIEnv, _: JObject, j_recipient: JString) -> jstring {
let recipient = CString::from(
CStr::from_ptr(
env.get_string(j_recipient).unwrap().as_ptr()
)
);
let output = env.new_string("Hello ".to_owned() + recipient.to_str().unwrap()).unwrap();
output.into_inner()
}
\ No newline at end of file
......@@ -19,6 +19,8 @@ twox-hash = "1.5"
identity = { version = "0.3", path = "identity", package = "ratman-identity", features = ["digest", "random"] }
netmod = { version = "0.3", path = "netmod", package = "ratman-netmod"}
clockctrl = { version = "0.1", path = "../clockctrl" }
access-notifier = { path = "../access-notifier" }
[dev-dependencies]
netmod-mem = { path = "../netmod-mem" }
\ No newline at end of file
netmod-mem = { path = "../netmod-mem" }
bincode = "1.2"
\ No newline at end of file
......@@ -63,11 +63,15 @@ impl Collector {
/// Queue the work, and spawn a worker if required
pub(crate) async fn queue_and_spawn(&self, seq: SeqId, f: Frame) {
println!("Queuing work");
self.state.queue(seq, f).await;
let mut map = self.workers.lock().await;
if !map.contains_key(&seq) {
map.insert(seq, Arc::new(Worker::new(seq, Arc::clone(&self.state))));
drop(map);
// This function tries to re-lock!
self.spawn_worker(seq).await;
}
}
......@@ -103,6 +107,8 @@ impl Collector {
};
task::spawn(async move {
println!("Spawning worker");
// This loop breaks when the worker is done
while let Some(()) = worker.poll().await {}
......@@ -189,89 +195,3 @@ fn queue_many() {
assert!(c.completed().await.id == seqid);
});
}
#[cfg(test)]
fn queue_test(num: usize) -> Arc<Collector> {
use netmod::{Recipient, SeqBuilder};
let (sender, recipient, seqid) = (Identity::random(), Identity::random(), Identity::random());
let seq = SeqBuilder::new(sender, Recipient::User(recipient), seqid)
.add(vec![1, 3, 1, 2])
.build();
task::block_on(async move {
let col = Collector::new();
let mut vec = vec![];
for f in seq.into_iter().cycle().take(num) {
let seqid = Identity::random();
col.queue(seqid, f).await;
vec.push(seqid);
}
assert!(col.num_queued().await == num);
for seqid in vec {
col.spawn_worker(seqid).await;
}
// Return the collector to be re-used
col
})
}
#[test]
fn queue_1000() {
queue_test(1000);
}
#[test]
fn queue_10000() {
queue_test(10000);
}
// This test is a bit bleh because each message sequence is only 1
// frame long. There should be better test at generating frame
// sequences, but we can always add that later.
#[cfg(test)]
fn queue_and_collect_test(num: usize) {
use std::time::Duration;
let col = queue_test(num);
task::block_on(async move {
while col.num_completed().await != num {
println!("Completed: {}", col.num_completed().await);
task::sleep(Duration::from_millis(100)).await;
}
let mut vec = vec![];
for _ in 0..num {
vec.push(col.completed().await);
}
assert!(vec.len() == num);
});
}
#[test]
fn queue_and_collect_1000() {
queue_and_collect_test(1000);
}
#[test]
fn queue_and_collect_10000() {
queue_and_collect_test(10000);
}
#[test]
fn queue_and_collect_100000() {
queue_and_collect_test(100000);
}
#[test]
#[ignore]
fn queue_and_collect_1000000() {
queue_and_collect_test(1000000);
}
use super::Locked;
use crate::Message;
use async_std::{sync::Arc, task};
use netmod::{Frame, SeqId};
use std::{
collections::{BTreeMap, VecDeque},
time::Duration,
use access_notifier::AccessNotifier as Notifier;
use async_std::{
future::{self, Future},
pin::Pin,
sync::Arc,
task::{self, Poll},
};
use netmod::{Frame, SeqId};
use std::collections::{BTreeMap, VecDeque};
/// Local frame collector state holder
#[derive(Default)]
pub(super) struct State {
incoming: Locked<BTreeMap<SeqId, VecDeque<Frame>>>,
done: Locked<VecDeque<Message>>,
incoming: Notifier<Locked<Notifier<BTreeMap<SeqId, Notifier<VecDeque<Frame>>>>>>,
done: Locked<Notifier<VecDeque<Message>>>,
}
impl State {
......@@ -24,47 +27,55 @@ impl State {
/// Poll for completed messages from teh outside world
pub(super) async fn completed(&self) -> Message {
let done = Arc::clone(&self.done);
loop {
let mut vec = done.lock().await;
match vec.pop_front() {
Some(msg) => return msg,
None => {}
future::poll_fn(|ctx| {
let lock = &mut done.lock();
match unsafe { Pin::new_unchecked(lock).poll(ctx) } {
Poll::Ready(ref mut not) => match not.pop_front() {
Some(f) => Poll::Ready(f),
None => {
Notifier::register_waker(not, ctx.waker());
Poll::Pending
}
},
_ => Poll::Pending,
}
drop(vec);
task::sleep(Duration::from_millis(20)).await;
}
})
.await
}
/// Poll for new work on a particular frame sequence
pub(super) async fn get(&self, seq: &SeqId) -> Frame {
let incoming = Arc::clone(&self.incoming);
loop {
let mut map = incoming.lock().await;
match map.get_mut(seq) {
Some(ref mut vec) => match vec.pop_front() {
Some(msg) => return msg,
None => {}
future::poll_fn(|ctx| {
let lock = &mut incoming.lock();
match unsafe { Pin::new_unchecked(lock).poll(ctx) } {
Poll::Ready(ref mut map) => match map.get_mut(seq) {
Some(ref mut vec) if vec.len() > 0 => Poll::Ready(vec.pop_front().unwrap()),
Some(ref mut vec) => {
Notifier::register_waker(vec, ctx.waker());
Poll::Pending
}
None => unimplemented!(), // No work queue _should_ never happen
},
_ => {}
_ => Poll::Pending,
}
drop(map);
task::sleep(Duration::from_millis(20)).await;
}
})
.await
}
/// Yield a finished message to the state
pub(super) async fn finish(&self, msg: Message) {
self.done.lock().await.push_back(msg);
let mut done = self.done.lock().await;
done.push_back(msg);
Notifier::wake_if_waker(&mut *done);
}
/// Queue a new frame to the state
pub(super) async fn queue(&self, seq: SeqId, frame: Frame) {
self.incoming
.lock()
.await
.entry(seq)
.or_default()
.push_back(frame);
let mut map = self.incoming.lock().await;
let vec = map.entry(seq).or_default();
vec.push_back(frame);
Notifier::wake_if_waker(vec);
}
/// Get the current number of queued frames for diagnostic and testing
......
......@@ -27,9 +27,11 @@ impl Worker {
/// Poll for new frames to assemble from the frame pool
pub(crate) async fn poll(&self) -> Option<()> {
println!("Polling for new work to be done");
let frame = self.parent.get(&self.seq).await;
let mut buf = self.buf.lock().await;
println!("Joining frames");
if let Some(msg) = join_frames(&mut buf, frame) {
self.parent.finish(msg).await;
None
......
use async_std::{sync::{Arc, channel}, task};
use async_std::{
sync::{channel, Arc},
task,
};
use netmod::Recipient;
use crate::{
......@@ -49,7 +52,7 @@ impl Switch {
pub(crate) async fn add(&self, id: usize) {
self.ctrl.0.send(id).await;
}
/// Dispatches a long-running task to run the switching logic
pub(crate) fn run(self: Arc<Self>) {
task::spawn(async move {
......
......@@ -12,7 +12,7 @@ pub type MsgId = Identity;
/// payload is. The payload can be empty, which can be used to create
/// a ping, or using the 16 byte MsgId as payload. In these cases,
/// the sigature can also be empty.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Message {
/// A random message ID
pub id: MsgId,
......
......@@ -148,6 +148,7 @@ pub(crate) type IoPair<T> = (Sender<T>, Receiver<T>);
pub use crate::{
data::{Message, MsgId},
error::{Error, Result},
netmod::Recipient,
};
pub use identity::{Identity, ID_LEN};
pub use netmod;
......
......@@ -9,3 +9,5 @@ description.
- [announce](./announce.rs) a test with three static nodes, sending
announcements.
- [very_simple_chat](./very_simple_chat.rs) an example of how to send
messages with payloads via Ratman
//! A simple chat app built on the Ratman router
//!
//! It doesn't actually implement chat logic, as that would be silly
//! (maybe another test could?), but shows how you can create messages
//! by taking some data structure, serialising it, and then addressing
//! the message to somewhere.
//!
//! As you can see the message isn't modified by the routing layer.
//! Still, you should use some mechanism to seal and sign your
//! payload. The "Identity" used for Sender and Recipient is 32
//! bytes: the right length for a curve25519 key!
use async_std::task;
use bincode;
use netmod_mem::MemMod;
use ratman::{Identity, Message, MsgId, Recipient, Result, Router};
use serde::{Deserialize, Serialize};
/// A message from someone
#[derive(Clone, Debug, Serialize, Deserialize)]
struct ChatMessage {
nick: String,
text: String,
}
impl ChatMessage {
fn to_msg(&self, sender: Identity, recp: Identity) -> Message {
let payload = bincode::serialize(self).unwrap();
let recipient = Recipient::User(recp);
Message {
id: MsgId::random(),
recipient,
sender,
payload,
sign: vec![],
}
}
}
async fn build_network() -> Result<()> {
// Build two channels in memory
let mm1 = MemMod::new();
let mm2_1 = MemMod::new();
let mm2_3 = MemMod::new();
let mm3 = MemMod::new();
mm1.link(&mm2_1);
mm2_3.link(&mm3);
// Initialise three empty routers
let r1 = Router::new();
let r2 = Router::new();
let r3 = Router::new();
// Attach endpoints so the topology is r1 - r2 - r3
r1.add_endpoint(mm1).await;
r2.add_endpoint(mm2_1).await;
r2.add_endpoint(mm2_3).await;
r3.add_endpoint(mm3).await;
// Create two users and add them to the routers
let u1 = dbg!(Identity::random());
r1.add_user(u1).await?;
let u3 = dbg!(Identity::random());
r3.add_user(u3).await?;
// And mark them "online"
r1.online(u1).await?;
r3.online(u3).await?;
// The routers will now start announcing their new users on the
// micro-network. You can now poll for new user discoveries.
assert_eq!(r1.discover().await, u3);
// We need some serialisation format. Let's use bincode
let hello = ChatMessage {
nick: "alice".into(),
text: "Hey bob, how are you?".into(),
};
// Create a message from Alice (u1) to Bob (u3)
let msg = hello.to_msg(u1, u3);
r1.send(msg.clone()).await?;
assert_eq!(r3.next().await, msg);
Ok(())
}
#[test]
fn very_simple_chat() {
task::block_on(build_network()).unwrap();
}