Commit d71dfce3 authored by Kaiden Fey's avatar Kaiden Fey Committed by Katharina Fey

qrpc-sdk: refactoring errors, implementing basic carrier messages

parent 2dfe05da
......@@ -1458,6 +1458,13 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ping"
version = "0.1.0"
dependencies = [
"qrpc-sdk",
]
[[package]]
name = "pkg-config"
version = "0.3.17"
......
......@@ -43,6 +43,9 @@ members = [
"netmods/netmod-udp",
"netmods/netmod-wd",
# Set of services in qaul.net
"services/ping",
# android build support
# "utils/android-support",
......
......@@ -36,7 +36,6 @@ use async_std::{
task,
};
use bincode::serialize;
use byteorder::{BigEndian, ByteOrder};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{net::SocketAddr, time::Duration};
use tracing::{error, trace};
......
......@@ -28,3 +28,8 @@ struct Upgrade {
resp @2 :Bool;
}
struct Carrier {
target @0 :Text;
data @1 :Data;
}
\ No newline at end of file
//! A set of type builders for the basic qrpc-sdk
//!
//! It's recommended to write similar abstraction layers in your own
//! crate, so to make it easy for other third-party developers to use
//! your service's APIs and types as easily as possible.
//!
//! In your service you will likely not need to consume this API. It
//! is included for debugging purposes.
use crate::{io::MsgReader, rpc::*, Service};
use byteorder::{BigEndian, ByteOrder};
use capnp::{message::Builder as Bld, serialize_packed};
use identity::Identity;
use socket2::Socket;
/// Generate an registry message for this service
pub fn register(service: &Service) -> (String, Vec<u8>) {
todo!()
}
/// Generate an unregistry message for this service
pub fn unregister(hash_id: Identity) -> (String, Vec<u8>) {
todo!()
}
pub fn upgrade(s: &Service, hash_id: Identity) -> (String, Vec<u8>) {
todo!()
}
/// This function is only included for debugging reasons. There's
/// basically no reason to call this function directly.
#[doc(hidden)]
pub fn _internal_to(target: String, data: Vec<u8>) -> Vec<u8> {
let mut msg = Bld::new_default();
let mut carrier = msg.init_root::<carrier::Builder>();
carrier.set_target(&target);
carrier.set_data(&data);
let mut buffer = vec![];
serialize_packed::write_message(&mut buffer, &msg).unwrap();
let len = buffer.len();
let mut message = vec![8];
BigEndian::write_u64(&mut message, len as u64);
message.append(&mut buffer);
message
}
pub fn _internal_from(socket: &Socket) -> (String, Vec<u8>) {
let mut len = vec![0; 8];
loop {
let (l, a) = socket.peek_from(&mut len).unwrap();
if l == 8 {
break;
}
}
let (_, _) = socket.recv_from(&mut len).unwrap();
let len = BigEndian::read_u64(&len);
let mut buffer = vec![0; len as usize];
socket.recv_from(&mut buffer).unwrap();
let msg = MsgReader::new(buffer).unwrap();
let carrier: carrier::Reader = msg.get_root().unwrap();
let target = carrier.get_target().unwrap();
let data = carrier.get_data().unwrap();
(target.to_string(), data.to_vec())
}
//! Service related error handling
//! RPC related error handling
pub type Result<T> = std::result::Result<T, ServiceError>;
pub type RpcResult<T> = Result<T, RpcError>;
/// A set of errors that occur when connecting to services
#[derive(Debug)]
pub enum ServiceError {
pub enum RpcError {
/// No such service was found by the broker
NoSuchService,
/// The service didn't reply within the timeout time
/// The selected recipient didn't reply within the timeout
///
/// This may indicate that the requested service has crashed, is
/// dealing with backpressure, or the broker is quietly dropping
/// requests.
ServiceBusy,
Timeout,
/// Tried connecting to a service that's already connected
AlreadyConnected,
/// Failed to perform action that requires a connection
......
//! Error handling
mod service;
pub use service::{Result as ServiceResult, ServiceError};
......@@ -10,6 +10,28 @@
//! In order to interact with a running qrpc-broker instance your
//! service needs to register itself and it's capabilities. This
//! mechanism is handled by this sdk.
//!
//! First your service will need a place to save some state, composing
//! different parts of this sdk together to create an app. You create
//! a [`Service`] and [`RpcSocket`] and connect to the rpc-broker
//! socket. First you will have to call `register(...)` on the
//! `Service`, before any messages can be relayed to you.
//!
//! Include the client-lib of the component you want to connect to,
//! and call `establish_connection()`, privded by
//! [`ServiceConnector`]. This will establish a connection with the
//! service to verify it's capability set. Your service will also
//! have to implement this mechanism to be usable by other services on
//! the RPC bus.
//!
//! After that you can call functions on the public API type of the
//! component. You can get a copy of it via your service handle:
//! `service.component("net.qaul.libqaul")`.
//!
//! If you want to see a minimal example of the smallest functional
//! service, see the [`ping`] crate.
//!
//! [`ping`]: https://git.open-communication.net/qaul/qaul.net/-/tree/develop/services%2Fping/
pub mod io;
......@@ -37,11 +59,13 @@ pub mod types {
/// directly. Instead use the main API of the crate which invoces
/// these types internally
pub mod rpc {
pub use crate::carrier_capnp::{register, unregister, upgrade};
pub use crate::carrier_capnp::{register, unregister, upgrade, carrier};
}
pub mod builders;
pub mod errors;
mod service;
mod socket;
pub use service::Service;
pub use socket::{default_socket_path, RpcSocket};
use crate::errors::ServiceResult as Result;
use crate::errors::RpcResult;
use crate::{builders, RpcSocket};
use identity::Identity;
use std::sync::Arc;
/// Access the socket stored in a service
fn _socket(s: &Service) -> &Arc<RpcSocket> {
s.socket.as_ref().unwrap()
}
/// A service representation on the qrpc system
///
/// Use this struct to handle RPC connections to the network, and to
......@@ -15,12 +21,15 @@ use std::sync::Arc;
/// "An app that does things!");
///
/// // Nothing happened yet. Connect to QRPC and register yourself
///
/// ```
///
///
pub struct Service {
name: String,
version: u16,
description: String,
hash_id: Option<Identity>,
socket: Option<Arc<RpcSocket>>,
}
impl Service {
......@@ -34,12 +43,17 @@ impl Service {
version,
description: description.into(),
hash_id: None,
socket: None,
}
}
/// Register this service with the RPC broker/ libqaul
pub async fn register(&mut self) -> Option<()> {
None
pub async fn register(&mut self, socket: RpcSocket) -> RpcResult<Identity> {
self.socket = Some(Arc::new(socket));
let (target, reg_msg) = builders::register(&self);
_socket(self)
.send_msg(target, reg_msg, async { todo!() })
.await
}
}
......@@ -57,7 +71,7 @@ impl Service {
#[async_trait::async_trait]
pub trait ServiceConnector: Default {
/// Start a connection to the service backend
async fn establish_connection(self: Arc<Self>) -> Result<()>;
async fn establish_connection(self: Arc<Self>) -> RpcResult<()>;
/// Terminate the connection to the service backend
async fn terminate_connection(self: Arc<Self>) -> Result<()>;
async fn terminate_connection(self: Arc<Self>) -> RpcResult<()>;
}
//! An internal abstraction over the RPC socket
use crate::{
builders,
errors::{RpcError, RpcResult},
};
use async_std::{future, sync::Arc, task};
use byteorder::{BigEndian, ByteOrder};
use socket2::{Domain, SockAddr, Socket, Type};
use std::{
future::Future,
io::Result,
path::Path,
path::{Path, PathBuf},
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
/// Get the location the qrpc socket _should_ be by default
///
/// This default can be overridden, though! It's safer to make this
/// option configurable for the user, instead of only relying on the
/// default.
pub fn default_socket_path() -> PathBuf {
PathBuf::from("/run/user/1000/qrpc.socket")
}
/// A qrpc connection wrapper
///
/// This type wraps a UNIX socket connection to a remote client. By
......@@ -21,6 +33,7 @@ use std::{
/// that your service can be used by other services)
pub struct RpcSocket {
inner: Socket,
addr: SockAddr,
run: AtomicBool,
listening: AtomicBool,
timeout: Duration,
......@@ -41,15 +54,17 @@ impl RpcSocket {
/// Setup is the same as when calling `new`, except that you can
/// choose an explicit timeout, instead of the default.
pub fn with_duration<P: AsRef<Path>>(path: P, timeout: Duration) -> Result<Arc<Self>> {
let addr = SockAddr::unix(path)?;
let mut inner = Socket::new(
Domain::unix(),
Type::seqpacket(), // this _may_ not be supported on MacOS
None,
)?;
inner.connect(&SockAddr::unix(path)?)?;
inner.connect(&addr)?;
Ok(Arc::new(Self {
inner,
addr,
timeout,
run: AtomicBool::from(true),
listening: AtomicBool::from(false),
......@@ -68,6 +83,44 @@ impl RpcSocket {
task::spawn(fut);
}
/// Send a binary payload message to a specific service.
///
/// This function needs to be called by your service when mapping
/// your public API to the RPC layer. Internally all requests
/// will be proxied, and parsed by your service backend.
///
/// Use the message builder functions available in [`io`] to
/// construct a correctly packed and compressed message.
///
/// In order to react to the response sent by the other side, you
/// need to provide a future to be run.
///
/// [`io`]: ./io/index.html
pub async fn send_msg<F, T, S>(
self: &Arc<Self>,
target: S,
msg: Vec<u8>,
handle: F,
) -> RpcResult<T>
where
F: Future<Output = RpcResult<T>> + Send + 'static,
S: Into<String>,
{
let msg = builders::_internal_to(target.into(), msg);
let _self = Arc::clone(self);
self.with_timeout(async move {
// let len = match _self.inner.send_to(&msg, &_self.addr) {
// Ok(l) => l,
// Err(e) => return Err(RpcError::Other(e.to_string())),
// };
todo!()
})
.await;
todo!()
}
/// Check if the socket is still running
///
/// Use this function in your service's listening code to
......@@ -81,15 +134,13 @@ impl RpcSocket {
self.listening.load(Ordering::Relaxed)
}
pub async fn send_msg(self: &Arc<Self>) -> Option<()> {
None
}
/// Drive a future to completion with a timeout
async fn with_timeout<T, F>(&self, fut: F) -> Option<T>
async fn with_timeout<T, F>(&self, fut: F) -> RpcResult<T>
where
F: Future<Output = T> + Send + 'static,
{
future::timeout(self.timeout.clone(), fut).await.ok()
future::timeout(self.timeout.clone(), fut)
.await
.map_err(|_| RpcError::Timeout)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment