terminal-chat

Форк
0
/
stream_manager.rs 
93 строки · 2.2 Кб
1
use std::{ 
2
  io::{
3
    Write, BufReader
4
  }, 
5
  thread,
6
  sync::mpsc::{
7
    self, 
8
    Sender
9
  }
10
};
11
use anyhow::Result;
12

13
use crate::{managers::data_manager::DataManager, reader::StreamReader};
14

15
use super::manager::Manager;
16

17
pub trait StreamManager {
18
  fn process_connection(&mut self) -> Result<()>;
19
  fn process_disconnection(&mut self) -> Result<()>;
20
  fn send_data(&mut self, data: &str) -> Result<()>;
21
  fn process_signals(&mut self, sender: Sender<()>) -> Result<()>;
22
}
23

24
impl StreamManager for Manager {
25
  fn process_connection(&mut self) -> Result<()> {
26
    println!("Connection established - {}", self.connected_peer_addr);
27

28
    let auth_data = match BufReader::new(
29
      self.stream.try_clone()?
30
    ).read_signal() {
31
      Ok(v) => v,
32
      Err(_) => {
33
        self.process_disconnection()?;
34
        return Ok(())
35
      }
36
    };
37

38
    if self.auth(auth_data.clone()).is_err() {
39
      self.deny_auth()?;
40
      self.process_disconnection()?;
41
      return Ok(())
42
    }
43

44
    let (channel_sender, channel_receiver) = mpsc::channel::<()>();
45
    self.process_signals(channel_sender)?;
46
    
47
    self.process_messages_pool(channel_receiver)?;
48

49
    self.process_disconnection()?;
50
    Ok(())
51
  }
52

53
  fn process_disconnection(&mut self) -> Result<()> {
54
    if self.connected_user_username.is_some() {
55
      self.remove_user(self.connected_user_username.clone().unwrap())?;
56
    }
57
    println!("Connection closed - {}", self.connected_peer_addr);
58
    Ok(())
59
  }
60

61
  fn send_data(&mut self, data: &str) -> Result<()> {
62
    self.stream.write(data.as_bytes())?;
63
    Ok(())
64
  }
65

66
  fn process_signals(&mut self, sender: Sender<()>) -> Result<()> {
67
    let cloned_stream = self.stream.try_clone()?;
68
    let cloned_messages_pool = self.messages_pool.clone();
69

70
    thread::spawn(move || -> Result<()> {
71
      let mut reader = BufReader::new(cloned_stream.try_clone()?);
72
      loop {
73
        let data_from_socket = match reader.read_signal() {
74
          Ok(s) => s,
75
          Err(_) => {
76
            break;
77
          }
78
        };
79

80
        match Self::process_incoming_message(cloned_messages_pool.clone(), data_from_socket) {
81
          Ok(_) => (),
82
          Err(_) => println!("invalid message")
83
        };
84
      }
85

86
      sender.send(())?;
87

88
      Ok(())
89
    });
90

91
    Ok(())
92
  }
93
}

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.