Commit 3b0b41e3 authored by Katharina Fey's avatar Katharina Fey

libqaul/messages: adding sender type id concept

In previous iterations of the libqaul API there was the concept of a
group hard-coded into the message type.  Because this was somewhat
restrictive, and because it didn't actually matter to most services
who a message was sent to, this was removed.  Each message was only
identifiable via it's service, sender, tags or message ID.  The
message ID was assigned during send-off.

This caused a problem however.  Considering that some services would
implement their own, more specific group concepts on top of libqaul,
it might happen that the same message payload was sent off multiple
times, to different people.  But the fact that they were sent to
different people was an implementation details the service might now
care about!  This could lead to data duplication in the backing store,
as now every payload was being stored multiple times for different
message IDs.

To solve this, we are re-adding a more restrictive group concept into
libqaul:  ID types!  When sending a message, a service needs to
specify whether the message ID should be unique, or if it should be
bound to some constraint.  What this means is that for multi-recipient
dispatches, a service can set the id_type to group (via the helpful
`create_group()` function), at which point the data payload would only
be stored in the database ONCE, but sent of normally just the same.

However now we ran into some other problems.  Ratman (the
decentralised router) used the message ID of a service message to
label it's frame sequences for re-transmission hashes, and more.
Sending the same payload to many recipients, would actually result in
different payloads, because all messages going through Ratman are
encrypted: the ciphertext would differ.

The solution to this was simple: the message ID is now part of the
encrypted envelope (which also means that an attacker can't grasp
group relationships just by looking at the packet headers), and Ratman
generates a new message ID for each frame sequence.

While we have no tests for these scenarios yet, it is easy to imagine
a scenario in which this would have been a problem (i.e. any group).
In the future we will want to add some tests that fail before this
commit and don't after this commit.

The consumers of libqaul (namely libqaul-rpc, the services, and the
low-level binding test clients) have all been updated to the new API.
parent 8abb6d14
Pipeline #1042 failed with stages
in 3 minutes and 37 seconds
use futures::join;
use libqaul::{error::Result, helpers::TagSet, messages::Mode, Qaul};
use libqaul::{error::Result, helpers::TagSet, messages::{Mode, IdType}, Qaul};
use ratman::Router;
#[async_std::main]
......@@ -14,6 +14,7 @@ async fn main() -> Result<()> {
let send = msg.send(
user.clone(),
Mode::Flood,
IdType::unique(),
"de.spacekookie.myapp",
TagSet::empty(),
vec![1, 2, 3, 4],
......
......@@ -7,7 +7,7 @@ use {
futures::stream::StreamExt,
libqaul::{
helpers::TagSet,
messages::{Mode, MsgRef},
messages::{Mode, MsgRef, IdType},
Qaul,
},
linux_voice_test::event::Events,
......@@ -116,6 +116,7 @@ async fn run() {
.send(
user.clone(),
Mode::Std(dest.id),
IdType::unique(),
"HELLO",
TagSet::empty(),
Vec::new(),
......
......@@ -5,7 +5,7 @@ use async_trait::async_trait;
use libqaul::{
error::{Error, Result},
helpers::{Subscription, Tag},
messages::{Mode, MsgId, MsgQuery, MsgRef},
messages::{IdType, Mode, MsgId, MsgQuery, MsgRef},
users::UserAuth,
Qaul,
};
......@@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize};
pub struct Send {
auth: UserAuth,
mode: Mode,
id_type: IdType,
service: String,
#[serde(default)]
tags: Vec<Tag>,
......@@ -26,8 +27,17 @@ pub struct Send {
impl QaulRpc for Send {
type Response = Result<MsgId>;
async fn apply(self, qaul: &Qaul) -> Self::Response {
let Self {
auth,
mode,
id_type,
service,
tags,
payload,
} = self;
qaul.messages()
.send(self.auth, self.mode, self.service, self.tags, self.payload)
.send(auth, mode, id_type, service, tags, payload)
.await
}
}
......@@ -101,18 +101,18 @@ impl Chat {
user: UserAuth,
friends: Vec<Identity>,
name: Option<String>,
) -> Result<RoomId> {
) -> Result<Room> {
let friends = friends.into_iter().collect();
if let Some(id) = Room::check(self, user.clone(), &friends).await {
return Ok(id);
return self.get_room(user.clone(), id).await;
}
let room = Room::create(self, user.clone(), friends.clone(), name).await;
let room_id = room.id();
let payload = msg::gen_payload("", room);
msg::dispatch_to(self, user, friends, payload, room_id).await?;
Ok(room_id)
msg::dispatch_to(self, user.clone(), friends, payload, room_id).await?;
self.get_room(user, room_id).await
}
/// Send a normal chat message to a room
......@@ -121,7 +121,7 @@ impl Chat {
user: UserAuth,
room: RoomId,
content: String,
) -> Result<()> {
) -> Result<ChatMessage> {
let friends = self.rooms.get(user.clone(), room).await?.users;
let payload = msg::gen_payload(content, Room::resume(room));
msg::dispatch_to(self, user, friends, payload, room).await
......
......@@ -5,7 +5,7 @@ use async_std::sync::Arc;
use bincode::{deserialize, serialize};
use chrono::{Duration, Utc};
use libqaul::{
messages::{Message, Mode, MsgQuery},
messages::{IdType, Message, Mode, MsgQuery},
users::UserAuth,
Identity,
};
......@@ -74,6 +74,28 @@ pub(crate) fn gen_payload(content: impl Into<String>, room: RoomState) -> Vec<u8
serialize(&Meta { content, room }).unwrap()
}
/// Get a chat message via a specific Id
///
/// This function is very unstable and should only be called
/// immediately after inserting a message. On the flip-side, if this
/// function ever panics, it indicates a deeper problem in the service
/// or even libqaul code. This set of queries should never fail!
pub(crate) async fn fetch_chat_message(
serv: &Arc<Chat>,
user: UserAuth,
id: Identity,
) -> ChatMessage {
serv.qaul
.messages()
.query(user, ASC_NAME, MsgQuery::id(id))
.await
.unwrap()
.resolve()
.await
.remove(0)
.into()
}
/// Simple looping helper function that dispatches messages
pub(crate) async fn dispatch_to(
serv: &Arc<Chat>,
......@@ -81,9 +103,11 @@ pub(crate) async fn dispatch_to(
friends: BTreeSet<Identity>,
payload: Vec<u8>,
room: RoomId,
) -> Result<()> {
) -> Result<ChatMessage> {
trace!("Creating room with {:?}", friends);
let id_type = IdType::create_group();
for recp in friends {
// Skip self
if recp == user.0 {
......@@ -96,6 +120,7 @@ pub(crate) async fn dispatch_to(
.send(
user.clone(),
mode,
id_type,
ASC_NAME,
tags::room_id(room),
payload.clone(),
......@@ -103,7 +128,7 @@ pub(crate) async fn dispatch_to(
.await?;
}
Ok(())
Ok(fetch_chat_message(serv, user, id_type.consume()).await)
}
pub(crate) async fn subscribe_for(
......
......@@ -44,7 +44,7 @@ async fn rooms_for_different_people() -> Result<()> {
let mut rooms = net.b().chat.rooms(bob.clone()).await?;
assert!(rooms.len() == 1);
assert_eq!(rooms.remove(0).id, room_1);
assert_eq!(rooms.remove(0).id, room_1.id);
///// And do it again
......@@ -64,7 +64,7 @@ async fn rooms_for_different_people() -> Result<()> {
let mut rooms = net.b().chat.rooms(david.clone()).await?;
assert!(rooms.len() == 1);
assert_eq!(rooms.remove(0).id, room_2);
assert_eq!(rooms.remove(0).id, room_2.id);
Ok(())
}
......@@ -86,11 +86,11 @@ async fn send_messages_for_different_people() -> Result<()> {
zzz().await;
})
.await?;
let b_sub = net.b().chat.subscribe(bob.clone(), room_1).await?;
let b_sub = net.b().chat.subscribe(bob.clone(), room_1.id).await?;
net.a()
.chat
.send_message(alice.clone(), room_1, "Hello Bob, how are you?".into())
.send_message(alice.clone(), room_1.id, "Hello Bob, how are you?".into())
.await
.unwrap();
......@@ -98,9 +98,9 @@ async fn send_messages_for_different_people() -> Result<()> {
let mut rooms = net.b().chat.rooms(bob.clone()).await?;
assert!(rooms.len() == 1);
assert_eq!(rooms.remove(0).id, room_1);
assert_eq!(rooms.remove(0).id, room_1.id);
let msgs1 = net.b().chat.load_messages(bob.clone(), room_1).await?;
let msgs1 = net.b().chat.load_messages(bob.clone(), room_1.id).await?;
assert_eq!(msgs1[0].content, "".to_string());
assert_eq!(msgs1[1].content, "Hello Bob, how are you?".to_string());
......@@ -123,11 +123,11 @@ async fn send_messages_for_different_people() -> Result<()> {
zzz().await;
})
.await?;
let a_sub = net.a().chat.subscribe(charlie.clone(), room_2).await?;
let a_sub = net.a().chat.subscribe(charlie.clone(), room_2.id).await?;
net.b()
.chat
.send_message(david.clone(), room_2, "Hello Charlie, how are you?".into())
.send_message(david.clone(), room_2.id, "Hello Charlie, how are you?".into())
.await
.unwrap();
......@@ -135,9 +135,9 @@ async fn send_messages_for_different_people() -> Result<()> {
let mut rooms = net.a().chat.rooms(charlie.clone()).await?;
assert!(rooms.len() == 1);
assert_eq!(rooms.remove(0).id, room_2);
assert_eq!(rooms.remove(0).id, room_2.id);
let msgs2 = net.a().chat.load_messages(charlie.clone(), room_2).await?;
let msgs2 = net.a().chat.load_messages(charlie.clone(), room_2.id).await?;
assert_eq!(msgs2[0].content, "".to_string());
assert_eq!(msgs2[1].content, "Hello Charlie, how are you?".to_string());
Ok(())
......
......@@ -38,7 +38,7 @@ async fn create_room() -> Result<()> {
// Wait for user propagations
zzz().await;
let room_id = net
let room = net
.a()
.chat
.start_chat(alice.clone(), vec![bob.0], None)
......@@ -48,7 +48,7 @@ async fn create_room() -> Result<()> {
let mut rooms = net.b().chat.rooms(bob.clone()).await?;
assert!(rooms.len() == 1);
assert_eq!(rooms.remove(0).id, room_id);
assert_eq!(rooms.remove(0).id, room.id);
Ok(())
}
......@@ -65,21 +65,21 @@ async fn send_message() -> Result<()> {
// Wait for user propagations
zzz().await;
let room_id = net
let room = net
.a()
.chat
.start_chat(alice.clone(), vec![bob.0], None)
.await?;
println!("ROOM ID = {}", room_id);
println!("ROOM ID = {}", room.id);
zzz().await;
let room = net.b().chat.get_room(bob.clone(), room_id).await.unwrap();
let room = net.b().chat.get_room(bob.clone(), room.id).await.unwrap();
assert_eq!(room.users, vec![alice.0, bob.0].into_iter().collect());
net.b()
.chat
.send_message(bob.clone(), room_id, "Hello Alice, how are you?".into())
.send_message(bob.clone(), room.id, "Hello Alice, how are you?".into())
.await
.unwrap();
......@@ -88,7 +88,7 @@ async fn send_message() -> Result<()> {
let msg = net
.a()
.chat
.load_messages(alice.clone(), room_id)
.load_messages(alice.clone(), room.id)
.await
.unwrap()
.into_iter()
......@@ -111,7 +111,7 @@ async fn send_message_subscribe() -> Result<()> {
// Wait for user propagations
zzz().await;
let room_id = net
let room = net
.a()
.chat
.start_chat(alice.clone(), vec![bob.0], None)
......@@ -119,12 +119,12 @@ async fn send_message_subscribe() -> Result<()> {
zzz().await;
let room = net.b().chat.get_room(bob.clone(), room_id).await.unwrap();
let room = net.b().chat.get_room(bob.clone(), room.id).await.unwrap();
assert_eq!(room.users, vec![alice.0, bob.0].into_iter().collect());
net.b()
.chat
.send_message(bob.clone(), room_id, "Hello Alice, how are you?".into())
.send_message(bob.clone(), room.id, "Hello Alice, how are you?".into())
.await
.unwrap();
......@@ -132,7 +132,7 @@ async fn send_message_subscribe() -> Result<()> {
let sub = net
.a()
.chat
.subscribe(alice.clone(), room_id)
.subscribe(alice.clone(), room.id)
.await
.unwrap();
sub.next().await
......@@ -158,7 +158,7 @@ async fn change_room_name() -> Result<()> {
zzz().await;
let room_id = net
let room = net
.a()
.chat
.start_chat(alice.clone(), vec![bob.0], None)
......@@ -168,12 +168,12 @@ async fn change_room_name() -> Result<()> {
net.b()
.chat
.set_name(bob.clone(), room_id, room_name.clone())
.set_name(bob.clone(), room.id, room_name.clone())
.await?;
zzz().await;
let room = net.a().chat.get_room(alice, room_id).await?;
let room = net.a().chat.get_room(alice, room.id).await?;
assert_eq!(room.name, Some(room_name));
Ok(())
}
......@@ -193,7 +193,7 @@ async fn create_room_with_name() -> Result<()> {
zzz().await;
let room_id = net
let room = net
.a()
.chat
.start_chat(alice.clone(), vec![bob.0], Some(room_name.clone()))
......@@ -201,7 +201,7 @@ async fn create_room_with_name() -> Result<()> {
zzz().await;
let room = net.b().chat.get_room(bob, room_id).await?;
let room = net.b().chat.get_room(bob, room.id).await?;
assert_eq!(room.name, Some(room_name));
Ok(())
}
......@@ -3,7 +3,7 @@
use crate::{error::Result, tags, CallId, CallMessage, ASC_NAME};
use conjoiner;
use libqaul::{
messages::{Message, Mode, ID_LEN},
messages::{Message, Mode, ID_LEN, IdType},
users::UserAuth,
Identity, Qaul,
};
......@@ -20,6 +20,7 @@ impl CallMessage {
) -> Result<()> {
let messages = qaul.messages();
let payload = conjoiner::serialise(self).unwrap();
let id_type = IdType::create_group();
for dest in to {
if *dest == user.0 {
continue;
......@@ -29,6 +30,7 @@ impl CallMessage {
.send(
user.clone(),
Mode::Std(dest.clone()),
id_type,
ASC_NAME,
tags::call_id(call),
payload.clone(),
......@@ -50,7 +52,7 @@ impl CallMessage {
let messages = qaul.messages();
let payload = conjoiner::serialise(self).unwrap();
messages
.send(user, Mode::Std(to), ASC_NAME, tags::call_id(call), payload)
.send(user, Mode::Std(to), IdType::unique(), ASC_NAME, tags::call_id(call), payload)
.await?;
Ok(())
......
......@@ -9,7 +9,7 @@ use crate::{
use ratman::netmod::Recipient;
use serde::{Deserialize, Serialize};
use std::{convert::TryFrom, sync::Arc};
use std::sync::Arc;
/// A reference to an internally stored message object
pub type MsgRef = Arc<Message>;
......@@ -89,10 +89,62 @@ impl From<Mode> for Recipient {
}
}
/// Specify the id type for a message dispatch
///
/// Because libqaul doesn't implement recipient groups it's up to a
/// service to create useful categorisations for groups of users.
/// This means that a service might send the same message to different
/// users, that are then receiving technically different messages.
/// This can cause all sorts of issues for services because now the
/// database is keeping track of a message many times (for each user
/// it was sent to).
///
/// This is what this type aims to circumvent: a message id can be
/// randomised during delivery, or fixed as a group to ensure that a
/// set of messages are all assigned the same Id.
///
/// **This comes with some caveats:** when inserting into the
/// database, the message Id will already exist, and so further
/// messages will not be stored. If you are using the grouped
/// constraint on an unequal message set (meaning that payloads
/// differ), this will result in data loss!
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum IdType {
/// A unique message ID will be generated on dispatch
Unique,
/// Create a grouped message ID constraint
Grouped(MsgId),
}
impl IdType {
pub fn consume(self) -> MsgId {
match self {
Self::Unique => MsgId::random(),
Self::Grouped(id) => id,
}
}
/// Create an ID type that is constrained for a group
pub fn group(id: MsgId) -> Self {
Self::Grouped(id)
}
/// Create a new message group with a random Id
pub fn create_group() -> Self {
Self::Grouped(MsgId::random())
}
/// Create a new message ID for every message dispatched
pub fn unique() -> Self {
Self::Unique
}
}
/// A query interface for the local message store
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct MsgQuery {
pub(crate) id: Option<MsgId>,
pub(crate) sender: Option<Identity>,
pub(crate) tags: TagSet,
pub(crate) skip: usize,
......@@ -104,6 +156,14 @@ impl MsgQuery {
Self::default()
}
/// An override query, that only searches for a specific Id
///
/// Ignores all other values passed into the query
pub fn id(id: MsgId) -> Self {
let id = Some(id);
Self { id, ..Self::new() }
}
/// Query for messages by a specific sender
pub fn sender(self, sender: Identity) -> Self {
Self {
......@@ -194,6 +254,7 @@ impl<'qaul> Messages<'qaul> {
&self,
user: UserAuth,
mode: Mode,
id_type: IdType,
service: S,
tags: T,
payload: Vec<u8>,
......@@ -205,7 +266,7 @@ impl<'qaul> Messages<'qaul> {
let (sender, _) = self.q.auth.trusted(user)?;
let recipient = mode.into();
let associator = service.into();
let id = MsgId::random();
let id = id_type.consume();
let tags: TagSet = tags.into();
println!("Sending `{}` with tags {:?}", id, tags);
......@@ -220,20 +281,25 @@ impl<'qaul> Messages<'qaul> {
println!("Sending message with ID `{:?}`", id);
println!("Sending message to {:?}", recipient);
self.q
.messages
.insert_local(
sender,
Arc::new(Message {
id,
// Only insert the message into the store if the Id is unique!
if !self.q.messages.probe_id(sender, id).await {
self.q
.messages
.insert_local(
sender,
associator,
tags,
payload,
}),
mode,
)
.await;
Arc::new(Message {
id,
sender,
associator,
tags,
payload,
}),
mode,
)
.await;
assert!(self.q.messages.probe_id(sender, id).await);
}
MsgUtils::send(
&self.q.users,
......
......@@ -44,6 +44,37 @@ where
self.inner.skip(num);
}
/// Try to get a single element from the query iterator
pub async fn next(&self) -> Result<T> {
match self.inner.next().await {
Ok(Some(rec)) => Ok(rec.into()),
_ => Err(Error::NoData),
}
}
/// Take all elements from the iterator and drop invalid reads
///
/// This is a semi-destructive operation because read errors will
/// be dropped, but sometimes that is exactly what you want.
/// Because libqaul stores messages for users separately from the
/// "global" scope (i.e. flooded messages), a specific path query
/// will return two paths: one will fail, the other will not.
///
/// Instead of having to filter the query results manually, this
/// function does it internally and folds into an empty vector,
/// meaning: if both reads fail, the resolved set will be empty.
pub async fn resolve(&self) -> Vec<T> {
let mut vec = vec![];
for _ in 0..self.inner.remaining() {
if let Ok(t) = self.next().await {
vec.push(t);
}
}
vec
}
/// Take a number of items from the iterator to advance
///
/// If no more items are present, this function will return
......@@ -66,7 +97,11 @@ where
Ok(vec)
}
/// Take all elements from this result set at once
/// Take elements from the iterator until a read fails
///
/// Once a read fails that does't mean that there's no more data
/// left, just that a read was unsuccessful. If you want to
/// quietly drop errors, use `resolve()` instead!
pub async fn all(&self) -> Result<Vec<T>> {
let mut vec = vec![];
while let Ok(Some(rec)) = self.inner.next().await {
......
//! Network message types and utilities
// Public exports
pub use crate::api::messages::{Message, Mode, MsgId, MsgQuery, MsgRef, SigTrust, ID_LEN};
pub use crate::api::messages::{IdType, Message, Mode, MsgId, MsgQuery, MsgRef, SigTrust, ID_LEN};
mod store;
pub(crate) use self::store::{MsgStore, TAG_UNREAD};
......@@ -56,7 +56,10 @@ impl RatMessageProto {
let recipient = self.recipient;
RatMessage {
id,
// Ratman generates a new message ID here to keep the real
// message ID a secret and prevents header inspection to
// figure out who is talking to whom.
id: MsgId::random(),
sender,
recipient,
payload,
......@@ -86,7 +89,7 @@ impl MsgUtils {
/// Process incoming RATMAN message, verifying it's signature and payload
pub(crate) async fn process(msg: RatMessage, store: &UserStore) -> Result<Message> {
let RatMessage {
id,
id: _,
sender,
recipient,
payload,
......@@ -108,7 +111,7 @@ impl MsgUtils {
};
let Envelope {
id: _,
id,
sender: _,
associator,
payload,
......
......@@ -7,7 +7,7 @@ use crate::{
Identity,
};
use alexandria::{
query::Query,
query::{Query, QueryResult as AQResult},
utils::{Path, Tag, TagSet},
Library, Session, GLOBAL,
};
......@@ -95,6 +95,33 @@ impl MsgStore {
.unwrap();
}
/// Check if a message for a service with a particular Id exists
///
/// This is required to avoid duplicate insertions when sending
/// messages to a group of people from a service.
pub(crate) async fn probe_id(&self, user: Identity, msg_id: Identity) -> bool {
self.inner
.path_exists(user, msg_path(msg_id))
.await
.unwrap_or(false)
}
/// Query a message only via the message Id the path
pub(crate) async fn query_path(
&self,
user: Identity,
msg_id: Identity,
) -> QueryResult<Message> {
let q = Query::path(msg_path(msg_id));
// Make db query
let mut glb = self.inner.query_iter(GLOBAL, q.clone()).await.unwrap();
let usr = self.inner.query_iter(user, q).await.unwrap();
glb.merge(usr).unwrap();
QueryResult::new(glb)
}
/// Return items from alexandria via a user query
pub(crate) async fn query(
&self,
......@@ -102,6 +129,11 @@ impl MsgStore {
service: Service,
query: MsgQuery,
) -> QueryResult<Message> {
// Check if we are dealing with an Id query
if let Some(id) = query.id {
return self.query_path(user, id).await;
}
// Add the service tag to the set
let mut meta = match service {
Service::Name(s) => service_tag(s).into(),
......
......@@ -5,17 +5,19 @@ use harness::{millis, sec10, sec5, zzz};
use libqaul::{
helpers::TagSet,
messages::{Mode, MsgQuery},
messages::{IdType, Mode, MsgQuery},
users::UserAuth,
Identity, Qaul,
};
use std::{sync::Arc, time::Instant};
async fn send_simple(q: &Arc<Qaul>, auth: &UserAuth, target: Identity) -> Identity {
dbg!(q.messages()
dbg!(q
.messages()
.send(
auth.clone(),
Mode::Std(target),
IdType::unique(),
"net.qaul.testing",
TagSet::empty(),
vec![1 as u8, 3, 1, 2],
......@@ -102,3 +104,42 @@ async fn send_three() {
.await
.unwrap();
}
#[async_std::test]
async fn grouped_send_ids() {
let net = harness::init().await;
let auth_a = net.a().users().create("abc").await.unwrap();
let auth_b = net.b().users().create("abc").await.unwrap();
zzz(millis(2000)).await;
let id_type = IdType::group(Identity::random());
let id = net
.a()
.messages()
.send(
auth_a.clone(),
Mode::Std(auth_b.0),
id_type,
"net.qaul.testing",
TagSet::empty(),
vec![1 as u8, 3, 1, 2],
)
.await
.unwrap();