use std::{
collections::{BTreeSet, HashMap, HashSet},
hash::Hash,
io::Write,
mem,
sync::Arc,
};
use anyhow::Result;
use rand::{rngs::OsRng, Rng};
use tokio::sync::OwnedMutexGuard;
use super::{
session::{Builder, Session, SessionInfo},
types::{Message, SessionID},
};
use crate::common::{
crypto::signature,
namespace::Namespace,
sgx::{EnclaveIdentity, QuotePolicy},
time::insecure_posix_time,
};
pub type SharedSession<PeerID> = Arc<tokio::sync::Mutex<MultiplexedSession<PeerID>>>;
pub type SessionByTimeKey<PeerID> = (i64, PeerID, SessionID);
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("max concurrent sessions reached")]
MaxConcurrentSessions,
}
pub struct MultiplexedSession<PeerID> {
peer_id: PeerID,
session_id: SessionID,
inner: Session,
}
impl<PeerID> MultiplexedSession<PeerID> {
pub fn get_peer_id(&self) -> &PeerID {
&self.peer_id
}
pub fn set_peer_id(&mut self, peer_id: PeerID) {
self.peer_id = peer_id;
}
pub fn get_session_id(&self) -> &SessionID {
&self.session_id
}
pub fn info(&self) -> Option<Arc<SessionInfo>> {
self.inner.session_info()
}
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
pub async fn process_data<W: Write>(
&mut self,
data: &[u8],
writer: W,
) -> Result<Option<Message>> {
self.inner.process_data(data, writer).await
}
pub fn write_message<W: Write>(&mut self, msg: Message, mut writer: W) -> Result<()> {
self.inner.write_message(msg, &mut writer)
}
pub fn get_remote_node(&self) -> Result<signature::PublicKey> {
self.inner.get_remote_node()
}
pub fn set_remote_node(&mut self, node: signature::PublicKey) -> Result<()> {
self.inner.set_remote_node(node)
}
pub fn is_connected(&self) -> bool {
self.inner.is_connected()
}
pub fn is_unauthenticated(&self) -> bool {
self.inner.is_unauthenticated()
}
pub fn close(&mut self) {
self.inner.close()
}
}
pub struct SessionMeta<PeerID: Clone + Ord + Hash> {
peer_id: PeerID,
session_id: SessionID,
last_access_time: i64,
inner: SharedSession<PeerID>,
}
impl<PeerID> SessionMeta<PeerID>
where
PeerID: Clone + Ord + Hash,
{
fn by_time_key(&self) -> SessionByTimeKey<PeerID> {
(self.last_access_time, self.peer_id.clone(), self.session_id)
}
}
pub struct Sessions<PeerID: Clone + Ord + Hash> {
builder: Builder,
max_sessions: usize,
max_sessions_per_peer: usize,
stale_session_timeout: i64,
by_peer: HashMap<PeerID, HashMap<SessionID, SessionMeta<PeerID>>>,
by_idle_time: BTreeSet<SessionByTimeKey<PeerID>>,
}
impl<PeerID> Sessions<PeerID>
where
PeerID: Clone + Ord + Hash,
{
pub fn new(
builder: Builder,
max_sessions: usize,
max_sessions_per_peer: usize,
stale_session_timeout: i64,
) -> Self {
Self {
builder,
max_sessions,
max_sessions_per_peer,
stale_session_timeout,
by_peer: HashMap::new(),
by_idle_time: BTreeSet::new(),
}
}
pub fn set_builder(&mut self, builder: Builder) {
self.builder = builder;
}
pub fn update_enclaves(
&mut self,
enclaves: Option<HashSet<EnclaveIdentity>>,
) -> Vec<SharedSession<PeerID>> {
if self.builder.get_remote_enclaves() == &enclaves {
return vec![];
}
self.builder = mem::take(&mut self.builder).remote_enclaves(enclaves);
self.drain()
}
pub fn update_quote_policy(&mut self, policy: QuotePolicy) -> Vec<SharedSession<PeerID>> {
let policy = Some(Arc::new(policy));
if self.builder.get_quote_policy() == &policy {
return vec![];
}
self.builder = mem::take(&mut self.builder).quote_policy(policy);
self.drain()
}
pub fn update_runtime_id(&mut self, id: Option<Namespace>) -> Vec<SharedSession<PeerID>> {
if self.builder.get_remote_runtime_id() == &id {
return vec![];
}
self.builder = mem::take(&mut self.builder).remote_runtime_id(id);
self.drain()
}
pub fn create_responder(
&mut self,
peer_id: PeerID,
session_id: SessionID,
) -> MultiplexedSession<PeerID> {
if self.builder.get_quote_policy().is_none() {
let policy = self
.builder
.get_local_identity()
.as_ref()
.and_then(|id| id.quote_policy());
self.builder = mem::take(&mut self.builder).quote_policy(policy);
}
MultiplexedSession {
peer_id: peer_id.clone(),
session_id,
inner: self.builder.clone().build_responder(),
}
}
pub fn create_initiator(&self, peer_id: PeerID) -> MultiplexedSession<PeerID> {
let session_id = SessionID::random();
MultiplexedSession {
peer_id: peer_id.clone(),
session_id,
inner: self.builder.clone().build_initiator(),
}
}
pub fn get(
&mut self,
peer_id: &PeerID,
session_id: &SessionID,
) -> Option<SharedSession<PeerID>> {
let sessions = match self.by_peer.get_mut(peer_id) {
Some(sessions) => sessions,
None => return None,
};
let session = match sessions.get_mut(session_id) {
Some(session) => session,
None => return None,
};
Self::update_access_time(session, &mut self.by_idle_time);
Some(session.inner.clone())
}
pub fn find(&mut self, peer_ids: &[PeerID]) -> Option<SharedSession<PeerID>> {
match peer_ids.is_empty() {
true => self.find_any(),
false => self.find_one(peer_ids),
}
}
pub fn find_any(&mut self) -> Option<SharedSession<PeerID>> {
if self.by_idle_time.is_empty() {
return None;
}
for (_, peer_id, session_id) in self.by_idle_time.iter() {
let session = self
.by_peer
.get_mut(peer_id)
.unwrap()
.get_mut(session_id)
.unwrap();
if session.inner.clone().try_lock_owned().is_ok() {
Self::update_access_time(session, &mut self.by_idle_time);
return Some(session.inner.clone());
}
}
let n = OsRng.gen_range(0..self.by_idle_time.len());
let (_, peer_id, session_id) = self.by_idle_time.iter().nth(n).unwrap();
let session = self
.by_peer
.get_mut(peer_id)
.unwrap()
.get_mut(session_id)
.unwrap();
Self::update_access_time(session, &mut self.by_idle_time);
Some(session.inner.clone())
}
pub fn find_one(&mut self, peer_ids: &[PeerID]) -> Option<SharedSession<PeerID>> {
let mut all_sessions = vec![];
for peer_id in peer_ids.iter() {
let sessions = match self.by_peer.get_mut(peer_id) {
Some(sessions) => sessions,
None => return None,
};
let session = sessions
.values_mut()
.filter(|s| s.inner.clone().try_lock_owned().is_ok())
.min_by_key(|s| s.last_access_time);
if let Some(session) = session {
Self::update_access_time(session, &mut self.by_idle_time);
return Some(session.inner.clone());
}
for session in sessions.values() {
all_sessions.push((session.peer_id.clone(), session.session_id));
}
}
if all_sessions.is_empty() {
return None;
}
let n = OsRng.gen_range(0..all_sessions.len());
let (peer_id, session_id) = all_sessions.get(n).unwrap();
let session = self
.by_peer
.get_mut(peer_id)
.unwrap()
.get_mut(session_id)
.unwrap();
Self::update_access_time(session, &mut self.by_idle_time);
Some(session.inner.clone())
}
pub fn remove_for(
&mut self,
peer_id: &PeerID,
now: i64,
) -> Result<Option<OwnedMutexGuard<MultiplexedSession<PeerID>>>, Error> {
if let Some(session) = self.remove_from(peer_id)? {
return Ok(Some(session));
}
self.remove_one(now)
}
pub fn remove_from(
&mut self,
peer_id: &PeerID,
) -> Result<Option<OwnedMutexGuard<MultiplexedSession<PeerID>>>, Error> {
let sessions = match self.by_peer.get_mut(peer_id) {
Some(sessions) => sessions,
None => return Ok(None),
};
if sessions.len() < self.max_sessions_per_peer
&& self.by_idle_time.len() < self.max_sessions
{
return Ok(None);
}
let remove_session = sessions
.iter()
.min_by_key(|(_, s)| {
if let Ok(_inner) = s.inner.try_lock() {
s.last_access_time
} else {
i64::MAX }
})
.map(|(_, s)| s.inner.clone())
.ok_or(Error::MaxConcurrentSessions)?;
let session = match remove_session.try_lock_owned() {
Ok(inner) => inner,
Err(_) => return Err(Error::MaxConcurrentSessions), };
self.remove(&session);
Ok(Some(session))
}
pub fn remove_one(
&mut self,
now: i64,
) -> Result<Option<OwnedMutexGuard<MultiplexedSession<PeerID>>>, Error> {
if self.by_idle_time.len() < self.max_sessions {
return Ok(None);
}
let mut remove_session: Option<OwnedMutexGuard<MultiplexedSession<PeerID>>> = None;
for (last_process_frame_time, peer_id, session_id) in self.by_idle_time.iter() {
if now.saturating_sub(*last_process_frame_time) < self.stale_session_timeout {
return Err(Error::MaxConcurrentSessions);
}
if let Some(sessions) = self.by_peer.get(peer_id) {
if let Some(session) = sessions.get(session_id) {
if let Ok(session) = session.inner.clone().try_lock_owned() {
remove_session = Some(session);
break;
}
}
}
}
let session = match remove_session {
Some(session) => session,
None => return Err(Error::MaxConcurrentSessions), };
self.remove(&session);
Ok(Some(session))
}
pub fn add(
&mut self,
session: MultiplexedSession<PeerID>,
now: i64,
) -> Result<SharedSession<PeerID>, Error> {
if self.by_idle_time.len() >= self.max_sessions {
return Err(Error::MaxConcurrentSessions);
}
let sessions = self.by_peer.entry(session.peer_id.clone()).or_default();
if sessions.len() >= self.max_sessions_per_peer {
return Err(Error::MaxConcurrentSessions);
}
let peer_id = session.peer_id.clone();
let session_id = session.session_id;
let session = SessionMeta {
inner: Arc::new(tokio::sync::Mutex::new(session)),
peer_id,
session_id,
last_access_time: now,
};
let inner = session.inner.clone();
self.by_idle_time.insert(session.by_time_key());
sessions.insert(session.session_id, session);
Ok(inner)
}
pub fn remove(&mut self, session: &OwnedMutexGuard<MultiplexedSession<PeerID>>) {
let sessions = self.by_peer.get_mut(&session.peer_id).unwrap();
let session_meta = sessions.get(&session.session_id).unwrap();
let key = session_meta.by_time_key();
sessions.remove(&session.session_id);
self.by_idle_time.remove(&key);
if sessions.is_empty() {
self.by_peer.remove(&session.peer_id);
}
}
pub fn drain(&mut self) -> Vec<SharedSession<PeerID>> {
self.by_idle_time.clear();
let mut all_sessions = vec![];
for (_, mut sessions) in self.by_peer.drain() {
for (_, session) in sessions.drain() {
all_sessions.push(session.inner);
}
}
all_sessions
}
fn update_access_time(
session: &mut SessionMeta<PeerID>,
by_idle_time: &mut BTreeSet<SessionByTimeKey<PeerID>>,
) {
by_idle_time.remove(&session.by_time_key());
session.last_access_time = insecure_posix_time();
by_idle_time.insert(session.by_time_key());
}
#[cfg(test)]
fn session_count(&self) -> usize {
self.by_idle_time.len()
}
#[cfg(test)]
fn peer_count(&self) -> usize {
self.by_peer.len()
}
}
#[cfg(test)]
mod test {
use crate::enclave_rpc::{session::Builder, types::SessionID};
use super::{Error, Sessions};
fn ids() -> (Vec<Vec<u8>>, Vec<SessionID>) {
let peer_ids: Vec<Vec<u8>> = (1..8).map(|x| vec![x]).collect();
let session_ids: Vec<SessionID> = (1..8).map(|_| SessionID::random()).collect();
(peer_ids, session_ids)
}
#[test]
fn test_add() {
let (peer_ids, session_ids) = ids();
let mut sessions = Sessions::new(Builder::default(), 4, 2, 60);
let test_vector = vec![
(&peer_ids[0], &session_ids[0], 1, 1, true),
(&peer_ids[0], &session_ids[1], 2, 1, true), (&peer_ids[0], &session_ids[2], 2, 1, false), (&peer_ids[1], &session_ids[0], 3, 2, true), (&peer_ids[2], &session_ids[2], 4, 3, true), (&peer_ids[3], &session_ids[3], 4, 3, false), ];
let now = 0;
for (peer_id, session_id, num_sessions, num_peers, created) in test_vector {
let session = sessions.create_responder(peer_id.clone(), session_id.clone());
let res = sessions.add(session, now);
match created {
true => {
assert!(res.is_ok(), "session should be created");
let s = res.unwrap();
let s_owned = s.try_lock().unwrap();
assert_eq!(&s_owned.peer_id, peer_id);
assert_eq!(&s_owned.session_id, session_id);
}
false => {
assert!(res.is_err(), "session should not be created");
assert!(matches!(res, Err(Error::MaxConcurrentSessions)));
}
};
assert_eq!(sessions.session_count(), num_sessions);
assert_eq!(sessions.peer_count(), num_peers);
}
}
#[test]
fn test_get() {
let (peer_ids, session_ids) = ids();
let mut sessions = Sessions::new(Builder::default(), 8, 2, 60);
let test_vector = vec![
(&peer_ids[0], &session_ids[0], true),
(&peer_ids[0], &session_ids[1], false), (&peer_ids[1], &session_ids[0], false), (&peer_ids[1], &session_ids[1], false), ];
let now = 0;
for (peer_id, session_id, create) in test_vector {
if create {
let session = sessions.create_responder(peer_id.clone(), session_id.clone());
let _ = sessions.add(session, now);
}
let maybe_s = sessions.get(peer_id, session_id);
match create {
true => {
assert!(maybe_s.is_some(), "session should exist");
let s = maybe_s.unwrap();
let s_owned = s.try_lock_owned().unwrap();
assert_eq!(&s_owned.peer_id, peer_id);
assert_eq!(&s_owned.session_id, session_id);
}
false => assert!(maybe_s.is_none(), "session should not exist"),
}
}
}
#[test]
fn test_find_any() {
let (peer_ids, session_ids) = ids();
let mut sessions = Sessions::new(Builder::default(), 8, 2, 60);
let test_vector = vec![
(&peer_ids[0], &session_ids[0]),
(&peer_ids[0], &session_ids[1]),
(&peer_ids[1], &session_ids[2]),
];
let maybe_s = sessions.find_any();
assert!(maybe_s.is_none(), "session should not be found");
let mut now = 0;
for (peer_id, session_id) in test_vector {
let session = sessions.create_responder(peer_id.clone(), session_id.clone());
let _ = sessions.add(session, now);
now += 1
}
let maybe_s = sessions.find_any();
assert!(maybe_s.is_some(), "session should be found");
let s = maybe_s.unwrap();
let s1_owned = s.try_lock_owned().unwrap(); assert_eq!(&s1_owned.peer_id, &peer_ids[0]);
assert_eq!(&s1_owned.session_id, &session_ids[0]);
let maybe_s = sessions.find_any();
assert!(maybe_s.is_some(), "session should be found");
let s = maybe_s.unwrap();
let s2_owned = s.try_lock_owned().unwrap(); assert_eq!(&s2_owned.peer_id, &peer_ids[0]);
assert_eq!(&s2_owned.session_id, &session_ids[1]); let maybe_s = sessions.find_any();
assert!(maybe_s.is_some(), "session should be found");
let s = maybe_s.unwrap();
let s3_owned = s.try_lock_owned().unwrap(); assert_eq!(&s3_owned.peer_id, &peer_ids[1]);
assert_eq!(&s3_owned.session_id, &session_ids[2]); let maybe_s = sessions.find_any();
assert!(maybe_s.is_some(), "session should be found");
let s = maybe_s.unwrap();
let res = s.try_lock_owned(); assert!(res.is_err(), "session should be in use");
drop(s2_owned);
let maybe_s = sessions.find_any();
assert!(maybe_s.is_some(), "session should be found");
let s = maybe_s.unwrap();
let s_owned = s.try_lock_owned().unwrap(); assert_eq!(&s_owned.peer_id, &peer_ids[0]);
assert_eq!(&s_owned.session_id, &session_ids[1]);
}
#[test]
fn test_find_one() {
let (peer_ids, session_ids) = ids();
let mut sessions = Sessions::new(Builder::default(), 8, 2, 60);
let test_vector = vec![
(&peer_ids[2], &session_ids[0]), (&peer_ids[0], &session_ids[0]),
(&peer_ids[3], &session_ids[1]), (&peer_ids[0], &session_ids[1]),
(&peer_ids[3], &session_ids[2]), (&peer_ids[1], &session_ids[2]),
(&peer_ids[2], &session_ids[2]), ];
let maybe_s = sessions.find_one(&peer_ids[0..2]);
assert!(maybe_s.is_none(), "session should not be found");
let mut now = 0;
for (peer_id, session_id) in test_vector {
let session = sessions.create_responder(peer_id.clone(), session_id.clone());
let _ = sessions.add(session, now);
now += 1
}
let maybe_s = sessions.find_one(&peer_ids[4..]);
assert!(maybe_s.is_none(), "session should not be found");
let maybe_s = sessions.find_one(&peer_ids[0..2]);
assert!(maybe_s.is_some(), "session should be found");
let s = maybe_s.unwrap();
let s1_owned = s.try_lock_owned().unwrap(); assert_eq!(&s1_owned.peer_id, &peer_ids[0]);
assert_eq!(&s1_owned.session_id, &session_ids[0]);
let maybe_s = sessions.find_one(&peer_ids[0..2]);
assert!(maybe_s.is_some(), "session should be found");
let s = maybe_s.unwrap();
let s2_owned = s.try_lock_owned().unwrap(); assert_eq!(&s2_owned.peer_id, &peer_ids[0]);
assert_eq!(&s2_owned.session_id, &session_ids[1]); let maybe_s = sessions.find_one(&peer_ids[0..2]);
assert!(maybe_s.is_some(), "session should be found");
let s = maybe_s.unwrap();
let s3_owned = s.try_lock_owned().unwrap(); assert_eq!(&s3_owned.peer_id, &peer_ids[1]);
assert_eq!(&s3_owned.session_id, &session_ids[2]); let maybe_s = sessions.find_one(&peer_ids[0..2]);
assert!(maybe_s.is_some(), "session should be found");
let s = maybe_s.unwrap();
let res = s.try_lock_owned(); assert!(res.is_err(), "session should be in use");
drop(s2_owned);
let maybe_s = sessions.find_one(&peer_ids[0..2]);
assert!(maybe_s.is_some(), "session should be found");
let s = maybe_s.unwrap();
let s_owned = s.try_lock_owned().unwrap(); assert_eq!(&s_owned.peer_id, &peer_ids[0]);
assert_eq!(&s_owned.session_id, &session_ids[1]);
}
#[test]
fn test_remove_from() {
let (peer_ids, session_ids) = ids();
let mut sessions = Sessions::new(Builder::default(), 4, 2, 60);
let test_vector = vec![
(&peer_ids[0], &session_ids[0]),
(&peer_ids[1], &session_ids[1]),
(&peer_ids[2], &session_ids[2]),
(&peer_ids[2], &session_ids[3]), ];
let mut now = 0;
for (peer_id, session_id) in test_vector.clone() {
let session = sessions.create_responder(peer_id.clone(), session_id.clone());
let _ = sessions.add(session, now);
now += 1;
}
let res = sessions.remove_from(&peer_ids[3]);
assert!(res.is_ok(), "remove_from should succeed");
let maybe_s_owned = res.unwrap();
assert!(maybe_s_owned.is_none(), "no sessions should be removed");
assert_eq!(sessions.session_count(), 4);
assert_eq!(sessions.peer_count(), 3);
let res = sessions.remove_from(&peer_ids[0]);
assert!(res.is_ok(), "remove_from should succeed");
let maybe_s_owned = res.unwrap();
assert!(maybe_s_owned.is_some(), "one session should be removed");
let s_owned = maybe_s_owned.unwrap();
assert_eq!(&s_owned.peer_id, &peer_ids[0]);
assert_eq!(&s_owned.session_id, &session_ids[0]);
assert_eq!(sessions.session_count(), 3);
assert_eq!(sessions.peer_count(), 2);
for peer_id in vec![&peer_ids[0], &peer_ids[1]] {
let res = sessions.remove_from(peer_id);
assert!(res.is_ok(), "remove_from should succeed");
let maybe_s_owned = res.unwrap();
assert!(maybe_s_owned.is_none(), "no sessions should be removed");
assert_eq!(sessions.session_count(), 3);
assert_eq!(sessions.peer_count(), 2);
}
let res = sessions.remove_from(&peer_ids[2]);
assert!(res.is_ok(), "remove_from should succeed");
let maybe_s_owned = res.unwrap();
assert!(maybe_s_owned.is_some(), "one session should be removed");
let s_owned = maybe_s_owned.unwrap();
assert_eq!(&s_owned.peer_id, &peer_ids[2]);
assert_eq!(&s_owned.session_id, &session_ids[2]);
assert_eq!(sessions.session_count(), 2);
assert_eq!(sessions.peer_count(), 2);
}
#[test]
fn test_remove_one() {
let (peer_ids, session_ids) = ids();
let mut sessions = Sessions::new(Builder::default(), 4, 2, 60);
let test_vector = vec![
(&peer_ids[0], &session_ids[0]),
(&peer_ids[1], &session_ids[1]),
(&peer_ids[2], &session_ids[2]),
(&peer_ids[2], &session_ids[3]), ];
let mut now = 0;
for (peer_id, session_id) in test_vector.clone() {
let session = sessions.create_responder(peer_id.clone(), session_id.clone());
let _ = sessions.add(session, now);
now += 1;
}
now += 60 - 4 - 1;
let res = sessions.remove_one(now);
assert!(res.is_err(), "remove_one should fail");
assert!(matches!(res, Err(Error::MaxConcurrentSessions)));
assert_eq!(sessions.session_count(), 4);
assert_eq!(sessions.peer_count(), 3);
now += 1;
let res = sessions.remove_one(now);
assert!(res.is_ok(), "remove_one should succeed");
let maybe_s_owned = res.unwrap();
assert!(maybe_s_owned.is_some(), "one session should be removed");
let s_owned = maybe_s_owned.unwrap();
assert_eq!(&s_owned.peer_id, &peer_ids[0]);
assert_eq!(&s_owned.session_id, &session_ids[0]);
assert_eq!(sessions.session_count(), 3);
assert_eq!(sessions.peer_count(), 2);
now += 100;
let res = sessions.remove_one(now);
assert!(res.is_ok(), "remove_one should succeed");
let maybe_s_owned = res.unwrap();
assert!(maybe_s_owned.is_none(), "no sessions should be removed");
assert_eq!(sessions.session_count(), 3);
assert_eq!(sessions.peer_count(), 2);
}
#[test]
fn test_remove() {
let (peer_ids, session_ids) = ids();
let mut sessions = Sessions::new(Builder::default(), 8, 2, 60);
let test_vector = vec![
(&peer_ids[0], &session_ids[0], 3, 2),
(&peer_ids[1], &session_ids[1], 2, 1),
(&peer_ids[2], &session_ids[2], 1, 1),
(&peer_ids[2], &session_ids[3], 0, 0),
];
let now = 0;
for (peer_id, session_id, _, _) in test_vector.clone() {
let session = sessions.create_responder(peer_id.clone(), session_id.clone());
let _ = sessions.add(session, now);
}
for (peer_id, session_id, num_sessions, num_peers) in test_vector {
let maybe_s = sessions.get(peer_id, session_id);
assert!(maybe_s.is_some(), "session should exist");
let s = maybe_s.unwrap();
let s_owned = s.try_lock_owned().unwrap();
sessions.remove(&s_owned);
assert_eq!(sessions.session_count(), num_sessions);
assert_eq!(sessions.peer_count(), num_peers);
}
}
#[test]
fn test_clear() {
let (peer_ids, session_ids) = ids();
let mut sessions = Sessions::new(Builder::default(), 8, 2, 60);
let test_vector = vec![
(&peer_ids[0], &session_ids[0]),
(&peer_ids[1], &session_ids[1]),
(&peer_ids[2], &session_ids[2]),
(&peer_ids[2], &session_ids[3]),
];
let now = 0;
for (peer_id, session_id) in test_vector.clone() {
let session = sessions.create_responder(peer_id.clone(), session_id.clone());
let _ = sessions.add(session, now);
}
let removed_sessions = sessions.drain();
assert_eq!(removed_sessions.len(), 4);
assert_eq!(sessions.session_count(), 0);
assert_eq!(sessions.peer_count(), 0);
}
}