Commit 114b7104 authored by Katharina Fey's avatar Katharina Fey

qrpc-sdk: updating docs, fixing socket::send_msg abstraction

parent 76d13164
......@@ -1462,6 +1462,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
name = "ping"
version = "0.1.0"
dependencies = [
"async-std",
"qrpc-sdk",
]
......@@ -1651,6 +1652,7 @@ dependencies = [
"netmod-tcp",
"netmod-udp",
"pnet",
"qrpc-sdk",
"ratman",
"ratman-configure",
"tracing",
......
......@@ -12,6 +12,7 @@ netmod-tcp = { path = "../../netmods/netmod-tcp" }
netmod-udp = { path = "../../netmods/netmod-udp" }
ratman = { path = "../../ratman" }
ratman-configure = { path = "../../ratman/configure" }
qrpc-sdk = { path = "../../rpc-layer/qrpc-sdk" }
async-std = { version = "=1.5", features = ["attributes"] }
clap = { version = "2.0", features = ["wrap_help", "color"] }
......
//! A set of type builders for the basic qrpc-sdk
//!
//! It's recommended to write similar abstraction layers in your own
//! crate, so to make it easy for other third-party developers to use
......@@ -7,6 +7,7 @@
//! In your service you will likely not need to consume this API. It
//! is included for debugging purposes.
use crate::Service;
use identity::Identity;
/// Generate an registry message for this service
......
//! This library provides the basic capabilities of interacting with a
//! qrpc-broker, and other qaul services. These docs outline API
//! usage and concrete types. For an overview of concepts, consult
//! the [contributors manual][manual]
//! A toolkit for writing clients on the qrpc message bus. This bus
//! is the backbone of the [qaul.net](https://qaul.net) service
//! ecosystem. With it you can create applications (services) that
//! interact with `libqaul`, and other services on the same message
//! broker.
//!
//! These crate docs describe the API and basic usage. For an
//! overview of the core concepts of this ecosystem, consult the
//! [contributors manual][manual]
//!
//! [manual]: https://docs.qaul.net/contributors/technical/rpc-layer
//!
//! ## Using this sdk
//!
//! In order to interact with a running qrpc-broker instance your
//! service needs to register itself and it's capabilities. This
//! mechanism is handled by this sdk.
//! In order to interact with a running [`qrpc-broker`] instance your
//! service needs to register itself and it's capabilities.
//!
//! First your service will need a place to save some state, composing
//! First your service needs a place to save some state, composing
//! different parts of this sdk together to create an app. You create
//! a [`Service`] and [`RpcSocket`] and connect to the rpc-broker
//! socket. First you will have to call `register(...)` on the
//! `Service`, before any messages can be relayed to you.
//!
//! [`qrpc-broker`]: ../qrpc_broker/index.html
//! [`Service`]: ./struct.Service.html
//! [`RpcSocket`]: ./struct.RpcSocket.html
//!
//! ```
//! # fn foo() -> Result<(), Box<std::error::Error>> {
//! use qrpc_sdk::{Service, RpcSocket, default_socket_path};
//!
//! let serv = Service::new("com.example.myapp", 1, "A simple app");
//! let sockt = RpcSocket::new(default_socket_path())?;
//! serv.register(sock)?;
//! println!("Service registered! Hash ID: {}", serv.hash_id().unwrap());
//! # }
//! ```
//!
//! Include the client-lib of the component you want to connect to,
//! and call `establish_connection()`, privded by
//! [`ServiceConnector`]. This will establish a connection with the
......@@ -59,7 +78,7 @@ pub mod types {
/// directly. Instead use the main API of the crate which invoces
/// these types internally
pub mod rpc {
pub use crate::carrier_capnp::{register, unregister, upgrade, carrier};
pub use crate::carrier_capnp::{carrier, register, unregister, upgrade};
}
pub mod builders;
......
......@@ -13,17 +13,6 @@ fn _socket(s: &Service) -> &Arc<RpcSocket> {
/// Use this struct to handle RPC connections to the network, and to
/// update any data you want your service to broadcast to other
/// participants on the QRPC system.
///
/// ```
/// let my_serv = Service::new(
/// "de.spacekookie.myapp",
/// 1,
/// "An app that does things!");
///
/// // Nothing happened yet. Connect to QRPC and register yourself
/// ```
///
///
pub struct Service {
name: String,
version: u16,
......@@ -48,13 +37,22 @@ impl Service {
}
/// Register this service with the RPC broker/ libqaul
pub async fn register(&mut self, socket: RpcSocket) -> RpcResult<Identity> {
self.socket = Some(Arc::new(socket));
pub async fn register(&mut self, socket: Arc<RpcSocket>) -> RpcResult<Identity> {
self.socket = Some(socket);
let (target, reg_msg) = builders::register(&self);
use crate::rpc::register;
_socket(self)
.send_msg(target, reg_msg, async { todo!() })
.send_msg(target, reg_msg, |reader| {
let r: register::Reader = reader.get_root().unwrap();
todo!()
})
.await
}
/// Get the `hash_id` field of this service, if it's set
pub fn hash_id(&self) -> Option<Identity> {
self.hash_id
}
}
/// An external service that can be connected to
......
......@@ -3,8 +3,10 @@
use crate::{
builders,
errors::{RpcError, RpcResult},
io::MsgReader,
};
use async_std::{future, sync::Arc, task};
use capnp::traits::FromPointerReader;
use socket2::{Domain, SockAddr, Socket, Type};
use std::{
future::Future,
......@@ -96,26 +98,25 @@ impl RpcSocket {
/// need to provide a future to be run.
///
/// [`io`]: ./io/index.html
pub async fn send_msg<F, T, S>(
self: &Arc<Self>,
pub async fn send_msg<'s, F: 'static, T, S, M: 's>(
self: &'s Arc<Self>,
target: S,
msg: Vec<u8>,
handle: F,
) -> RpcResult<T>
where
F: Future<Output = RpcResult<T>> + Send + 'static,
F: Fn(MsgReader<'s, M>) -> RpcResult<T> + Send,
S: Into<String>,
M: FromPointerReader<'s>,
{
let msg = builders::_internal::to(target.into(), msg);
let _self = Arc::clone(self);
self.with_timeout(async move {
let (_, buf) = builders::_internal::from(&_self.inner);
todo!()
MsgReader::new(buf).map(|ok| handle(ok))
})
.await;
todo!()
.await?
.map_err(|_| RpcError::Other("Serialisation failure!".into()))?
}
/// Check if the socket is still running
......
......@@ -7,10 +7,10 @@ edition = "2018"
[dependencies]
identity = { path = "../../ratman/identity", version = "0.4", package = "ratman-identity" }
qrpc-sdk = { path = "../qrpc-sdk", version = "0.1" }
async-std = "=1.5"
byteorder = "1.0"
capnp = "0.13"
qrpc-sdk = { path = "../qrpc-sdk" }
socket2 = { version = "0.3", features = ["unix"] }
tracing = "0.1"
\ No newline at end of file
//! An extensible rpc message broker for the libqaul ecosystem.
mod socket;
use qrpc_sdk::{default_socket_path, RpcSocket};
use std::{path::PathBuf, sync::Arc};
/// Hold the main broker state
pub struct Broker {}
pub struct Broker {
sock: Arc<RpcSocket>,
}
impl Broker {
pub fn new<P: Into<PathBuf>>(path: Option<P>) -> Self {
let path = path.map(|p| p.into()).unwrap_or(default_socket_path());
let sock = RpcSocket::new(path).unwrap();
Self { sock }
}
}
// #[test]
// fn make_it_just_work_please() {
......
//! Wrapper module to read and write to and from an RPC socket
//!
use crate::UtilReader;
use async_std::{sync::Arc, task};
use byteorder::{BigEndian, ByteOrder};
use socket2::{Domain, SockAddr, Socket, Type};
use std::{
io::Result,
path::Path,
sync::atomic::{AtomicBool, Ordering},
};
use qrpc_sdk::types::rpc_broker::service;
pub(crate) struct RpcSocket {
inner: Socket,
run: AtomicBool,
}
impl RpcSocket {
pub(crate) fn new<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
let run = AtomicBool::from(true);
let mut inner = Socket::new(
Domain::unix(),
Type::seqpacket(), // this _may_ not be supported on MacOS
None,
)?;
inner.connect(&SockAddr::unix(path)?)?;
inner.listen(128)?;
Ok(Arc::new(Self { inner, run }).spawn())
}
/// Spawn a worker to read messages from the socket
fn spawn(self: Arc<Self>) -> Arc<Self> {
let s = Arc::clone(&self);
task::spawn(async move {
while s.run.load(Ordering::Relaxed) {
while let Ok((sock, addr)) = s.inner.accept() {
let arc = Arc::clone(&s);
task::spawn(arc.handle_socket(sock, addr));
}
}
});
self
}
/// Handle one socket connection
async fn handle_socket(self: Arc<Self>, sock: Socket, addr: SockAddr) {
while self.run.load(Ordering::Relaxed) {
let len = match read_length(&self.inner) {
Some(len) => len,
None => continue,
};
let mut buf = vec![0; len];
if self.inner.recv(buf.as_mut_slice()).is_err() && continue {}
let msg = UtilReader::new(buf).unwrap();
let root: service::Reader = msg.get_root().unwrap();
}
}
}
/// Read 8 bytes to get the message length
fn read_length(sock: &Socket) -> Option<usize> {
let mut len_buf = [0; 8];
sock.recv(&mut len_buf).ok()?;
Some(BigEndian::read_u64(&len_buf) as usize)
}
......@@ -6,4 +6,5 @@ authors = ["Katharina Fey <kookie@spacekookie.de>"]
edition = "2018"
[dependencies]
qrpc-sdk = { path = "../../rpc-layer/qrpc-sdk", version = "0.1" }
\ No newline at end of file
qrpc-sdk = { path = "../../rpc-layer/qrpc-sdk", version = "0.1" }
async-std = { version = "=1.5", features = ["attributes"] }
\ No newline at end of file
......@@ -6,21 +6,19 @@
//! considered documentation. If you find anything that is unclear to
//! you, or could be commented better, please send us a patch (or MR).
use qrpc_sdk::{RpcSocket, Service, default_socket_path};
use qrpc_sdk::{default_socket_path, RpcSocket, Service};
struct Ping {
inner: Service,
}
fn main() {
let ping = Ping {
inner: Service::new(
"net.qaul.ping",
1,
"A simple service that says hello to everybody on the network.",
),
};
let sock = RpcSocket::new(default_socket_path());
#[async_std::main]
async fn main() {
let mut serv = Service::new(
"net.qaul.ping",
1,
"A simple service that says hello to everybody on the network.",
);
let sock = RpcSocket::new(default_socket_path()).unwrap();
serv.register(sock).await.unwrap();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment