oasis_core_runtime/enclave_rpc/
sessions.rs

1//! Session demultiplexer.
2use std::{
3    collections::{BTreeSet, HashMap, HashSet},
4    hash::Hash,
5    io::Write,
6    mem,
7    sync::Arc,
8};
9
10use anyhow::Result;
11use rand::{rngs::OsRng, Rng};
12use tokio::sync::OwnedMutexGuard;
13
14use super::{
15    session::{Builder, Session, SessionInfo},
16    types::{Message, SessionID},
17};
18use crate::common::{
19    crypto::signature,
20    namespace::Namespace,
21    sgx::{EnclaveIdentity, QuotePolicy},
22    time::insecure_posix_time,
23};
24
25/// Shared pointer to a multiplexed session.
26pub type SharedSession<PeerID> = Arc<tokio::sync::Mutex<MultiplexedSession<PeerID>>>;
27
28/// Key for use in the by-idle-time index.
29pub type SessionByTimeKey<PeerID> = (i64, PeerID, SessionID);
30
31/// Sessions error.
32#[derive(Debug, thiserror::Error)]
33pub enum Error {
34    #[error("max concurrent sessions reached")]
35    MaxConcurrentSessions,
36}
37
38/// A multiplexed session.
39pub struct MultiplexedSession<PeerID> {
40    /// Peer identifier (needed for resolution when only given the shared pointer).
41    peer_id: PeerID,
42    /// Session identifier (needed for resolution when only given the shared pointer).
43    session_id: SessionID,
44    /// The actual session.
45    inner: Session,
46}
47
48impl<PeerID> MultiplexedSession<PeerID> {
49    /// Return the session's peer ID.
50    pub fn get_peer_id(&self) -> &PeerID {
51        &self.peer_id
52    }
53
54    /// Set the session's peer ID.
55    pub fn set_peer_id(&mut self, peer_id: PeerID) {
56        self.peer_id = peer_id;
57    }
58
59    /// Return the session ID.
60    pub fn get_session_id(&self) -> &SessionID {
61        &self.session_id
62    }
63
64    /// Session information.
65    pub fn info(&self) -> Option<Arc<SessionInfo>> {
66        self.inner.session_info()
67    }
68
69    /// Whether the session is in closed state.
70    pub fn is_closed(&self) -> bool {
71        self.inner.is_closed()
72    }
73
74    /// Process incoming session data.
75    pub async fn process_data<W: Write>(
76        &mut self,
77        data: &[u8],
78        writer: W,
79    ) -> Result<Option<Message>> {
80        self.inner.process_data(data, writer).await
81    }
82
83    /// Write message to session and generate a response.
84    pub fn write_message<W: Write>(&mut self, msg: Message, mut writer: W) -> Result<()> {
85        self.inner.write_message(msg, &mut writer)
86    }
87
88    /// Return remote node identifier.
89    pub fn get_remote_node(&self) -> Result<signature::PublicKey> {
90        self.inner.get_remote_node()
91    }
92
93    /// Set the remote node identifier.
94    pub fn set_remote_node(&mut self, node: signature::PublicKey) -> Result<()> {
95        self.inner.set_remote_node(node)
96    }
97
98    /// Whether the session handshake has completed and the session
99    /// is in transport mode.
100    pub fn is_connected(&self) -> bool {
101        self.inner.is_connected()
102    }
103
104    /// Whether the session is in unauthenticated transport state. In this state the session can
105    /// only be used to transmit a close notification.
106    pub fn is_unauthenticated(&self) -> bool {
107        self.inner.is_unauthenticated()
108    }
109
110    /// Mark the session as closed.
111    ///
112    /// After the session is closed it can no longer be used to transmit
113    /// or receive messages and any such use will result in an error.
114    pub fn close(&mut self) {
115        self.inner.close()
116    }
117}
118
119/// Structure used for session accounting.
120pub struct SessionMeta<PeerID: Clone + Ord + Hash> {
121    /// Peer identifier.
122    peer_id: PeerID,
123    /// Session identifier.
124    session_id: SessionID,
125    /// Timestamp when the session was last accessed.
126    last_access_time: i64,
127    /// The shared session pointer that needs to be locked for access.
128    inner: SharedSession<PeerID>,
129}
130
131impl<PeerID> SessionMeta<PeerID>
132where
133    PeerID: Clone + Ord + Hash,
134{
135    /// Key for ordering in the by-idle-time index.
136    fn by_time_key(&self) -> SessionByTimeKey<PeerID> {
137        (self.last_access_time, self.peer_id.clone(), self.session_id)
138    }
139}
140
141/// Session indices and management operations.
142pub struct Sessions<PeerID: Clone + Ord + Hash> {
143    /// Session builder.
144    builder: Builder,
145    /// Maximum number of sessions.
146    max_sessions: usize,
147    /// Maximum number of sessions per peer.
148    max_sessions_per_peer: usize,
149    /// Stale session timeout (in seconds).
150    stale_session_timeout: i64,
151
152    /// A map of sessions for each peer.
153    by_peer: HashMap<PeerID, HashMap<SessionID, SessionMeta<PeerID>>>,
154    /// A set of all sessions, ordered by idle time.
155    by_idle_time: BTreeSet<SessionByTimeKey<PeerID>>,
156}
157
158impl<PeerID> Sessions<PeerID>
159where
160    PeerID: Clone + Ord + Hash,
161{
162    /// Create a new session management instance.
163    pub fn new(
164        builder: Builder,
165        max_sessions: usize,
166        max_sessions_per_peer: usize,
167        stale_session_timeout: i64,
168    ) -> Self {
169        Self {
170            builder,
171            max_sessions,
172            max_sessions_per_peer,
173            stale_session_timeout,
174            by_peer: HashMap::new(),
175            by_idle_time: BTreeSet::new(),
176        }
177    }
178
179    /// Set the session builder to use.
180    pub fn set_builder(&mut self, builder: Builder) {
181        self.builder = builder;
182    }
183
184    /// Update remote enclave identity verification in the session builder
185    /// and clear all sessions if the identity has changed.
186    pub fn update_enclaves(
187        &mut self,
188        enclaves: Option<HashSet<EnclaveIdentity>>,
189    ) -> Vec<SharedSession<PeerID>> {
190        if self.builder.get_remote_enclaves() == &enclaves {
191            return vec![];
192        }
193
194        self.builder = mem::take(&mut self.builder).remote_enclaves(enclaves);
195        self.drain()
196    }
197
198    /// Update quote policy used for remote quote verification in the session builder
199    /// and clear all sessions if the policy has changed.
200    pub fn update_quote_policy(&mut self, policy: QuotePolicy) -> Vec<SharedSession<PeerID>> {
201        let policy = Some(Arc::new(policy));
202        if self.builder.get_quote_policy() == &policy {
203            return vec![];
204        }
205
206        self.builder = mem::take(&mut self.builder).quote_policy(policy);
207        self.drain()
208    }
209
210    /// Update remote runtime ID for node identity verification in the session builder
211    /// and clear all sessions if the runtime ID has changed.
212    pub fn update_runtime_id(&mut self, id: Option<Namespace>) -> Vec<SharedSession<PeerID>> {
213        if self.builder.get_remote_runtime_id() == &id {
214            return vec![];
215        }
216
217        self.builder = mem::take(&mut self.builder).remote_runtime_id(id);
218        self.drain()
219    }
220
221    /// Create a new multiplexed responder session.
222    pub fn create_responder(
223        &mut self,
224        peer_id: PeerID,
225        session_id: SessionID,
226    ) -> MultiplexedSession<PeerID> {
227        // If no quote policy is set, use the local one.
228        if self.builder.get_quote_policy().is_none() {
229            let policy = self
230                .builder
231                .get_local_identity()
232                .as_ref()
233                .and_then(|id| id.quote_policy());
234
235            self.builder = mem::take(&mut self.builder).quote_policy(policy);
236        }
237
238        MultiplexedSession {
239            peer_id: peer_id.clone(),
240            session_id,
241            inner: self.builder.clone().build_responder(),
242        }
243    }
244
245    /// Create a new multiplexed initiator session.
246    pub fn create_initiator(&self, peer_id: PeerID) -> MultiplexedSession<PeerID> {
247        let session_id = SessionID::random();
248
249        MultiplexedSession {
250            peer_id: peer_id.clone(),
251            session_id,
252            inner: self.builder.clone().build_initiator(),
253        }
254    }
255
256    /// Fetch an existing session given its identifier.
257    pub fn get(
258        &mut self,
259        peer_id: &PeerID,
260        session_id: &SessionID,
261    ) -> Option<SharedSession<PeerID>> {
262        // Check if peer exists.
263        let sessions = self.by_peer.get_mut(peer_id)?;
264
265        // Check if the session exists. If so, return it.
266        let session = sessions.get_mut(session_id)?;
267
268        Self::update_access_time(session, &mut self.by_idle_time);
269
270        Some(session.inner.clone())
271    }
272
273    /// Fetch an existing session from one of the given peers. If no peers
274    /// are provided, a session from any peer will be returned.
275    pub fn find(&mut self, peer_ids: &[PeerID]) -> Option<SharedSession<PeerID>> {
276        match peer_ids.is_empty() {
277            true => self.find_any(),
278            false => self.find_one(peer_ids),
279        }
280    }
281
282    /// Fetch an existing session from any peer.
283    pub fn find_any(&mut self) -> Option<SharedSession<PeerID>> {
284        if self.by_idle_time.is_empty() {
285            return None;
286        }
287
288        // Check if there is a session that is not currently in use.
289        for (_, peer_id, session_id) in self.by_idle_time.iter() {
290            let session = self
291                .by_peer
292                .get_mut(peer_id)
293                .unwrap()
294                .get_mut(session_id)
295                .unwrap();
296
297            if session.inner.clone().try_lock_owned().is_ok() {
298                Self::update_access_time(session, &mut self.by_idle_time);
299                return Some(session.inner.clone());
300            }
301        }
302
303        // If all sessions are in use, return a random one.
304        let n = OsRng.gen_range(0..self.by_idle_time.len());
305        let (_, peer_id, session_id) = self.by_idle_time.iter().nth(n).unwrap();
306        let session = self
307            .by_peer
308            .get_mut(peer_id)
309            .unwrap()
310            .get_mut(session_id)
311            .unwrap();
312
313        Self::update_access_time(session, &mut self.by_idle_time);
314
315        Some(session.inner.clone())
316    }
317
318    /// Fetch an existing session from one of the given peers.
319    pub fn find_one(&mut self, peer_ids: &[PeerID]) -> Option<SharedSession<PeerID>> {
320        let mut all_sessions = vec![];
321
322        for peer_id in peer_ids.iter() {
323            let sessions = self.by_peer.get_mut(peer_id)?;
324
325            // Check if peer has a session that is not currently in use.
326            let session = sessions
327                .values_mut()
328                .filter(|s| s.inner.clone().try_lock_owned().is_ok())
329                .min_by_key(|s| s.last_access_time);
330
331            if let Some(session) = session {
332                Self::update_access_time(session, &mut self.by_idle_time);
333                return Some(session.inner.clone());
334            }
335
336            for session in sessions.values() {
337                all_sessions.push((session.peer_id.clone(), session.session_id));
338            }
339        }
340
341        if all_sessions.is_empty() {
342            return None;
343        }
344
345        // If all sessions are in use, return a random one.
346        let n = OsRng.gen_range(0..all_sessions.len());
347        let (peer_id, session_id) = all_sessions.get(n).unwrap();
348        let session = self
349            .by_peer
350            .get_mut(peer_id)
351            .unwrap()
352            .get_mut(session_id)
353            .unwrap();
354
355        Self::update_access_time(session, &mut self.by_idle_time);
356
357        Some(session.inner.clone())
358    }
359
360    /// Remove one session to free up a slot for the given peer.
361    pub fn remove_for(
362        &mut self,
363        peer_id: &PeerID,
364        now: i64,
365    ) -> Result<Option<OwnedMutexGuard<MultiplexedSession<PeerID>>>, Error> {
366        if let Some(session) = self.remove_from(peer_id)? {
367            return Ok(Some(session));
368        }
369        self.remove_one(now)
370    }
371
372    /// Remove one existing session from the given peer if the peer has reached
373    /// the maximum number of sessions or if the total number of sessions exceeds
374    /// the global session limit.
375    pub fn remove_from(
376        &mut self,
377        peer_id: &PeerID,
378    ) -> Result<Option<OwnedMutexGuard<MultiplexedSession<PeerID>>>, Error> {
379        // Check if peer exists.
380        let sessions = match self.by_peer.get_mut(peer_id) {
381            Some(sessions) => sessions,
382            None => return Ok(None),
383        };
384
385        // Check if the peer has max sessions or if no more sessions are available globally.
386        // If so, remove the oldest or return an error.
387        if sessions.len() < self.max_sessions_per_peer
388            && self.by_idle_time.len() < self.max_sessions
389        {
390            return Ok(None);
391        }
392
393        // Force close the oldest idle session.
394        let remove_session = sessions
395            .iter()
396            .min_by_key(|(_, s)| {
397                if let Ok(_inner) = s.inner.try_lock() {
398                    s.last_access_time
399                } else {
400                    i64::MAX // Session is currently in use.
401                }
402            })
403            .map(|(_, s)| s.inner.clone())
404            .ok_or(Error::MaxConcurrentSessions)?;
405
406        let session = match remove_session.try_lock_owned() {
407            Ok(inner) => inner,
408            Err(_) => return Err(Error::MaxConcurrentSessions), // All sessions are in use.
409        };
410
411        self.remove(&session);
412
413        Ok(Some(session))
414    }
415
416    /// Remove one stale session if the total number of sessions exceeds
417    /// the global session limit.
418    pub fn remove_one(
419        &mut self,
420        now: i64,
421    ) -> Result<Option<OwnedMutexGuard<MultiplexedSession<PeerID>>>, Error> {
422        // Check if there are too many sessions. If so, remove one or return an error.
423        if self.by_idle_time.len() < self.max_sessions {
424            return Ok(None);
425        }
426
427        // Attempt to prune stale sessions, starting with the oldest ones.
428        let mut remove_session: Option<OwnedMutexGuard<MultiplexedSession<PeerID>>> = None;
429
430        for (last_process_frame_time, peer_id, session_id) in self.by_idle_time.iter() {
431            if now.saturating_sub(*last_process_frame_time) < self.stale_session_timeout {
432                // This is the oldest session, all next ones will be more fresh.
433                return Err(Error::MaxConcurrentSessions);
434            }
435
436            // Fetch session and attempt to lock it.
437            if let Some(sessions) = self.by_peer.get(peer_id) {
438                if let Some(session) = sessions.get(session_id) {
439                    if let Ok(session) = session.inner.clone().try_lock_owned() {
440                        remove_session = Some(session);
441                        break;
442                    }
443                }
444            }
445        }
446
447        // Check if we found a session that can be removed.
448        let session = match remove_session {
449            Some(session) => session,
450            None => return Err(Error::MaxConcurrentSessions), // All stale sessions are in use.
451        };
452
453        self.remove(&session);
454
455        Ok(Some(session))
456    }
457
458    /// Add a session if there is an available spot.
459    pub fn add(
460        &mut self,
461        session: MultiplexedSession<PeerID>,
462        now: i64,
463    ) -> Result<SharedSession<PeerID>, Error> {
464        if self.by_idle_time.len() >= self.max_sessions {
465            return Err(Error::MaxConcurrentSessions);
466        }
467
468        let sessions = self.by_peer.entry(session.peer_id.clone()).or_default();
469        if sessions.len() >= self.max_sessions_per_peer {
470            return Err(Error::MaxConcurrentSessions);
471        }
472
473        let peer_id = session.peer_id.clone();
474        let session_id = session.session_id;
475
476        let session = SessionMeta {
477            inner: Arc::new(tokio::sync::Mutex::new(session)),
478            peer_id,
479            session_id,
480            last_access_time: now,
481        };
482        let inner = session.inner.clone();
483
484        self.by_idle_time.insert(session.by_time_key());
485        sessions.insert(session.session_id, session);
486
487        Ok(inner)
488    }
489
490    /// Remove a session that must be currently owned by the caller.
491    pub fn remove(&mut self, session: &OwnedMutexGuard<MultiplexedSession<PeerID>>) {
492        let sessions = self.by_peer.get_mut(&session.peer_id).unwrap();
493        let session_meta = sessions.get(&session.session_id).unwrap();
494        let key = session_meta.by_time_key();
495        sessions.remove(&session.session_id);
496        self.by_idle_time.remove(&key);
497
498        // If peer doesn't have any more sessions, remove the peer.
499        if sessions.is_empty() {
500            self.by_peer.remove(&session.peer_id);
501        }
502    }
503
504    /// Removes and returns all sessions.
505    pub fn drain(&mut self) -> Vec<SharedSession<PeerID>> {
506        self.by_idle_time.clear();
507
508        let mut all_sessions = vec![];
509        for (_, mut sessions) in self.by_peer.drain() {
510            for (_, session) in sessions.drain() {
511                all_sessions.push(session.inner);
512            }
513        }
514
515        all_sessions
516    }
517
518    fn update_access_time(
519        session: &mut SessionMeta<PeerID>,
520        by_idle_time: &mut BTreeSet<SessionByTimeKey<PeerID>>,
521    ) {
522        // Remove old idle time.
523        by_idle_time.remove(&session.by_time_key());
524
525        // Update idle time.
526        session.last_access_time = insecure_posix_time();
527        by_idle_time.insert(session.by_time_key());
528    }
529
530    /// Number of all sessions.
531    #[cfg(test)]
532    fn session_count(&self) -> usize {
533        self.by_idle_time.len()
534    }
535
536    /// Number of all peers.
537    #[cfg(test)]
538    fn peer_count(&self) -> usize {
539        self.by_peer.len()
540    }
541}
542
543#[cfg(test)]
544mod test {
545    use crate::enclave_rpc::{session::Builder, types::SessionID};
546
547    use super::{Error, Sessions};
548
549    fn ids() -> (Vec<Vec<u8>>, Vec<SessionID>) {
550        let peer_ids: Vec<Vec<u8>> = (1..8).map(|x| vec![x]).collect();
551        let session_ids: Vec<SessionID> = (1..8).map(|_| SessionID::random()).collect();
552
553        (peer_ids, session_ids)
554    }
555
556    #[test]
557    fn test_add() {
558        let (peer_ids, session_ids) = ids();
559        let mut sessions = Sessions::new(Builder::default(), 4, 2, 60);
560
561        let test_vector = vec![
562            (&peer_ids[0], &session_ids[0], 1, 1, true),
563            (&peer_ids[0], &session_ids[1], 2, 1, true), // Different session ID.
564            (&peer_ids[0], &session_ids[2], 2, 1, false), // Too many sessions per peer.
565            (&peer_ids[1], &session_ids[0], 3, 2, true), // Different peer ID.
566            (&peer_ids[2], &session_ids[2], 4, 3, true), // Different peer ID and session ID.
567            (&peer_ids[3], &session_ids[3], 4, 3, false), // Too many sessions.
568        ];
569
570        let now = 0;
571        for (peer_id, session_id, num_sessions, num_peers, created) in test_vector {
572            let session = sessions.create_responder(peer_id.clone(), session_id.clone());
573            let res = sessions.add(session, now);
574            match created {
575                true => {
576                    assert!(res.is_ok(), "session should be created");
577                    let s = res.unwrap();
578                    let s_owned = s.try_lock().unwrap();
579                    assert_eq!(&s_owned.peer_id, peer_id);
580                    assert_eq!(&s_owned.session_id, session_id);
581                }
582                false => {
583                    assert!(res.is_err(), "session should not be created");
584                    assert!(matches!(res, Err(Error::MaxConcurrentSessions)));
585                }
586            };
587            assert_eq!(sessions.session_count(), num_sessions);
588            assert_eq!(sessions.peer_count(), num_peers);
589        }
590    }
591
592    #[test]
593    fn test_get() {
594        let (peer_ids, session_ids) = ids();
595        let mut sessions = Sessions::new(Builder::default(), 8, 2, 60);
596
597        let test_vector = vec![
598            (&peer_ids[0], &session_ids[0], true),
599            (&peer_ids[0], &session_ids[1], false), // Different peer ID.
600            (&peer_ids[1], &session_ids[0], false), // Different session ID.
601            (&peer_ids[1], &session_ids[1], false), // Different peer ID and session ID.
602        ];
603
604        let now = 0;
605        for (peer_id, session_id, create) in test_vector {
606            if create {
607                let session = sessions.create_responder(peer_id.clone(), session_id.clone());
608                let _ = sessions.add(session, now);
609            }
610
611            let maybe_s = sessions.get(peer_id, session_id);
612            match create {
613                true => {
614                    assert!(maybe_s.is_some(), "session should exist");
615                    let s = maybe_s.unwrap();
616                    let s_owned = s.try_lock_owned().unwrap();
617                    assert_eq!(&s_owned.peer_id, peer_id);
618                    assert_eq!(&s_owned.session_id, session_id);
619                }
620                false => assert!(maybe_s.is_none(), "session should not exist"),
621            }
622        }
623    }
624
625    #[test]
626    fn test_find_any() {
627        let (peer_ids, session_ids) = ids();
628        let mut sessions = Sessions::new(Builder::default(), 8, 2, 60);
629
630        let test_vector = vec![
631            (&peer_ids[0], &session_ids[0]),
632            (&peer_ids[0], &session_ids[1]),
633            (&peer_ids[1], &session_ids[2]),
634        ];
635
636        // No sessions.
637        let maybe_s = sessions.find_any();
638        assert!(maybe_s.is_none(), "session should not be found");
639
640        let mut now = 0;
641        for (peer_id, session_id) in test_vector {
642            let session = sessions.create_responder(peer_id.clone(), session_id.clone());
643            let _ = sessions.add(session, now);
644            now += 1
645        }
646
647        // No sessions in use.
648        let maybe_s = sessions.find_any();
649        assert!(maybe_s.is_some(), "session should be found");
650        let s = maybe_s.unwrap();
651        let s1_owned = s.try_lock_owned().unwrap(); // Session now in use.
652        assert_eq!(&s1_owned.peer_id, &peer_ids[0]);
653        assert_eq!(&s1_owned.session_id, &session_ids[0]);
654
655        // One session in use.
656        let maybe_s = sessions.find_any();
657        assert!(maybe_s.is_some(), "session should be found");
658        let s = maybe_s.unwrap();
659        let s2_owned = s.try_lock_owned().unwrap(); // Session now in use.
660        assert_eq!(&s2_owned.peer_id, &peer_ids[0]);
661        assert_eq!(&s2_owned.session_id, &session_ids[1]); // Different session found.
662
663        // Two sessions in use.
664        let maybe_s = sessions.find_any();
665        assert!(maybe_s.is_some(), "session should be found");
666        let s = maybe_s.unwrap();
667        let s3_owned = s.try_lock_owned().unwrap(); // Session now in use.
668        assert_eq!(&s3_owned.peer_id, &peer_ids[1]);
669        assert_eq!(&s3_owned.session_id, &session_ids[2]); // Different session found.
670
671        // All sessions in use.
672        let maybe_s = sessions.find_any();
673        assert!(maybe_s.is_some(), "session should be found");
674        let s = maybe_s.unwrap();
675        let res = s.try_lock_owned(); // Session now in use.
676        assert!(res.is_err(), "session should be in use");
677
678        // Free one session.
679        drop(s2_owned);
680
681        // Two sessions in use.
682        let maybe_s = sessions.find_any();
683        assert!(maybe_s.is_some(), "session should be found");
684        let s = maybe_s.unwrap();
685        let s_owned = s.try_lock_owned().unwrap(); // Session now in use.
686        assert_eq!(&s_owned.peer_id, &peer_ids[0]);
687        assert_eq!(&s_owned.session_id, &session_ids[1]);
688    }
689
690    #[test]
691    fn test_find_one() {
692        let (peer_ids, session_ids) = ids();
693        let mut sessions = Sessions::new(Builder::default(), 8, 2, 60);
694
695        let test_vector = vec![
696            (&peer_ids[2], &session_ids[0]), // Incorrect peer.
697            (&peer_ids[0], &session_ids[0]),
698            (&peer_ids[3], &session_ids[1]), // Incorrect peer.
699            (&peer_ids[0], &session_ids[1]),
700            (&peer_ids[3], &session_ids[2]), // Incorrect peer.
701            (&peer_ids[1], &session_ids[2]),
702            (&peer_ids[2], &session_ids[2]), // Incorrect peer.
703        ];
704
705        // No sessions.
706        let maybe_s = sessions.find_one(&peer_ids[0..2]);
707        assert!(maybe_s.is_none(), "session should not be found");
708
709        let mut now = 0;
710        for (peer_id, session_id) in test_vector {
711            let session = sessions.create_responder(peer_id.clone(), session_id.clone());
712            let _ = sessions.add(session, now);
713            now += 1
714        }
715
716        // Peers without sessions.
717        let maybe_s = sessions.find_one(&peer_ids[4..]);
718        assert!(maybe_s.is_none(), "session should not be found");
719
720        // No sessions in use.
721        let maybe_s = sessions.find_one(&peer_ids[0..2]);
722        assert!(maybe_s.is_some(), "session should be found");
723        let s = maybe_s.unwrap();
724        let s1_owned = s.try_lock_owned().unwrap(); // Session now in use.
725        assert_eq!(&s1_owned.peer_id, &peer_ids[0]);
726        assert_eq!(&s1_owned.session_id, &session_ids[0]);
727
728        // One session in use.
729        let maybe_s = sessions.find_one(&peer_ids[0..2]);
730        assert!(maybe_s.is_some(), "session should be found");
731        let s = maybe_s.unwrap();
732        let s2_owned = s.try_lock_owned().unwrap(); // Session now in use.
733        assert_eq!(&s2_owned.peer_id, &peer_ids[0]);
734        assert_eq!(&s2_owned.session_id, &session_ids[1]); // Different session found.
735
736        // Two sessions in use.
737        let maybe_s = sessions.find_one(&peer_ids[0..2]);
738        assert!(maybe_s.is_some(), "session should be found");
739        let s = maybe_s.unwrap();
740        let s3_owned = s.try_lock_owned().unwrap(); // Session now in use.
741        assert_eq!(&s3_owned.peer_id, &peer_ids[1]);
742        assert_eq!(&s3_owned.session_id, &session_ids[2]); // Different session found.
743
744        // All sessions in use.
745        let maybe_s = sessions.find_one(&peer_ids[0..2]);
746        assert!(maybe_s.is_some(), "session should be found");
747        let s = maybe_s.unwrap();
748        let res = s.try_lock_owned(); // Session now in use.
749        assert!(res.is_err(), "session should be in use");
750
751        // Free one session.
752        drop(s2_owned);
753
754        // Two sessions in use.
755        let maybe_s = sessions.find_one(&peer_ids[0..2]);
756        assert!(maybe_s.is_some(), "session should be found");
757        let s = maybe_s.unwrap();
758        let s_owned = s.try_lock_owned().unwrap(); // Session now in use.
759        assert_eq!(&s_owned.peer_id, &peer_ids[0]);
760        assert_eq!(&s_owned.session_id, &session_ids[1]);
761    }
762
763    #[test]
764    fn test_remove_from() {
765        let (peer_ids, session_ids) = ids();
766        let mut sessions = Sessions::new(Builder::default(), 4, 2, 60);
767
768        let test_vector = vec![
769            (&peer_ids[0], &session_ids[0]),
770            (&peer_ids[1], &session_ids[1]),
771            (&peer_ids[2], &session_ids[2]),
772            (&peer_ids[2], &session_ids[3]), // Max sessions per peer reached.
773                                             // Max sessions reached.
774        ];
775
776        let mut now = 0;
777        for (peer_id, session_id) in test_vector.clone() {
778            let session = sessions.create_responder(peer_id.clone(), session_id.clone());
779            let _ = sessions.add(session, now);
780            now += 1;
781        }
782
783        // Removing one session from an unknown peer should have no effect,
784        // even if all global session slots are occupied.
785        let res = sessions.remove_from(&peer_ids[3]);
786        assert!(res.is_ok(), "remove_from should succeed");
787        let maybe_s_owned = res.unwrap();
788        assert!(maybe_s_owned.is_none(), "no sessions should be removed");
789        assert_eq!(sessions.session_count(), 4);
790        assert_eq!(sessions.peer_count(), 3);
791
792        // Removing one session for one of the existing peers should work
793        // as it should force evict an old session.
794        // Note that each peer has 2 available slots, but globally there are
795        // only 4 slots so if global slots are full this should trigger peer
796        // session eviction.
797        let res = sessions.remove_from(&peer_ids[0]);
798        assert!(res.is_ok(), "remove_from should succeed");
799        let maybe_s_owned = res.unwrap();
800        assert!(maybe_s_owned.is_some(), "one session should be removed");
801        let s_owned = maybe_s_owned.unwrap();
802        assert_eq!(&s_owned.peer_id, &peer_ids[0]);
803        assert_eq!(&s_owned.session_id, &session_ids[0]);
804        assert_eq!(sessions.session_count(), 3);
805        assert_eq!(sessions.peer_count(), 2);
806
807        // Removing another session should fail as one global session slot
808        // is available.
809        for peer_id in vec![&peer_ids[0], &peer_ids[1]] {
810            let res = sessions.remove_from(peer_id);
811            assert!(res.is_ok(), "remove_from should succeed");
812            let maybe_s_owned = res.unwrap();
813            assert!(maybe_s_owned.is_none(), "no sessions should be removed");
814            assert_eq!(sessions.session_count(), 3);
815            assert_eq!(sessions.peer_count(), 2);
816        }
817
818        // Removing one session from a peer with max sessions should succeed
819        // even if one global slot is available.
820        let res = sessions.remove_from(&peer_ids[2]);
821        assert!(res.is_ok(), "remove_from should succeed");
822        let maybe_s_owned = res.unwrap();
823        assert!(maybe_s_owned.is_some(), "one session should be removed");
824        let s_owned = maybe_s_owned.unwrap();
825        assert_eq!(&s_owned.peer_id, &peer_ids[2]);
826        assert_eq!(&s_owned.session_id, &session_ids[2]);
827        assert_eq!(sessions.session_count(), 2);
828        assert_eq!(sessions.peer_count(), 2);
829    }
830
831    #[test]
832    fn test_remove_one() {
833        let (peer_ids, session_ids) = ids();
834        let mut sessions = Sessions::new(Builder::default(), 4, 2, 60);
835
836        let test_vector = vec![
837            (&peer_ids[0], &session_ids[0]),
838            (&peer_ids[1], &session_ids[1]),
839            (&peer_ids[2], &session_ids[2]),
840            (&peer_ids[2], &session_ids[3]), // Max sessions reached.
841        ];
842
843        let mut now = 0;
844        for (peer_id, session_id) in test_vector.clone() {
845            let session = sessions.create_responder(peer_id.clone(), session_id.clone());
846            let _ = sessions.add(session, now);
847            now += 1;
848        }
849
850        // Forward time (stale_session_timeout - test_vector.len() - 1).
851        now += 60 - 4 - 1;
852
853        // Removing one session should fail as there are none stale sessions.
854        let res = sessions.remove_one(now);
855        assert!(res.is_err(), "remove_one should fail");
856        assert!(matches!(res, Err(Error::MaxConcurrentSessions)));
857        assert_eq!(sessions.session_count(), 4);
858        assert_eq!(sessions.peer_count(), 3);
859
860        // Forward time.
861        now += 1;
862
863        // Removing one session should succeed as no session slots
864        // are available and there is one stale session.
865        let res = sessions.remove_one(now);
866        assert!(res.is_ok(), "remove_one should succeed");
867        let maybe_s_owned = res.unwrap();
868        assert!(maybe_s_owned.is_some(), "one session should be removed");
869        let s_owned = maybe_s_owned.unwrap();
870        assert_eq!(&s_owned.peer_id, &peer_ids[0]);
871        assert_eq!(&s_owned.session_id, &session_ids[0]);
872        assert_eq!(sessions.session_count(), 3);
873        assert_eq!(sessions.peer_count(), 2);
874
875        // Forward time.
876        now += 100;
877
878        // Removing one session should fail even though there are stale sessions
879        // because there is one session slot available.
880        let res = sessions.remove_one(now);
881        assert!(res.is_ok(), "remove_one should succeed");
882        let maybe_s_owned = res.unwrap();
883        assert!(maybe_s_owned.is_none(), "no sessions should be removed");
884        assert_eq!(sessions.session_count(), 3);
885        assert_eq!(sessions.peer_count(), 2);
886    }
887
888    #[test]
889    fn test_remove() {
890        let (peer_ids, session_ids) = ids();
891        let mut sessions = Sessions::new(Builder::default(), 8, 2, 60);
892
893        let test_vector = vec![
894            (&peer_ids[0], &session_ids[0], 3, 2),
895            (&peer_ids[1], &session_ids[1], 2, 1),
896            (&peer_ids[2], &session_ids[2], 1, 1),
897            (&peer_ids[2], &session_ids[3], 0, 0),
898        ];
899
900        let now = 0;
901        for (peer_id, session_id, _, _) in test_vector.clone() {
902            let session = sessions.create_responder(peer_id.clone(), session_id.clone());
903            let _ = sessions.add(session, now);
904        }
905
906        for (peer_id, session_id, num_sessions, num_peers) in test_vector {
907            let maybe_s = sessions.get(peer_id, session_id);
908            assert!(maybe_s.is_some(), "session should exist");
909            let s = maybe_s.unwrap();
910            let s_owned = s.try_lock_owned().unwrap();
911
912            sessions.remove(&s_owned);
913            assert_eq!(sessions.session_count(), num_sessions);
914            assert_eq!(sessions.peer_count(), num_peers);
915        }
916    }
917
918    #[test]
919    fn test_clear() {
920        let (peer_ids, session_ids) = ids();
921        let mut sessions = Sessions::new(Builder::default(), 8, 2, 60);
922
923        let test_vector = vec![
924            (&peer_ids[0], &session_ids[0]),
925            (&peer_ids[1], &session_ids[1]),
926            (&peer_ids[2], &session_ids[2]),
927            (&peer_ids[2], &session_ids[3]),
928        ];
929
930        let now = 0;
931        for (peer_id, session_id) in test_vector.clone() {
932            let session = sessions.create_responder(peer_id.clone(), session_id.clone());
933            let _ = sessions.add(session, now);
934        }
935
936        let removed_sessions = sessions.drain();
937        assert_eq!(removed_sessions.len(), 4);
938        assert_eq!(sessions.session_count(), 0);
939        assert_eq!(sessions.peer_count(), 0);
940    }
941}