Ton

Форк
0
/
http-connection.cpp 
267 строк · 7.7 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2019-2020 Telegram Systems LLP
18
*/
19
#include "http-connection.h"
20

21
namespace ton {
22

23
namespace http {
24

25
void HttpConnection::loop() {
26
  if (in_loop_) {
27
    return;
28
  }
29
  in_loop_ = true;
30
  auto status = [&] {
31
    while (true) {
32
      LOG(DEBUG) << "loop(): in=" << buffered_fd_.left_unread() << " out=" << buffered_fd_.left_unwritten();
33
      bool is_eof = td::can_close(buffered_fd_);
34
      bool read_eof = false;
35

36
      bool written = false;
37
      bool read = false;
38
      if (is_eof || buffered_fd_.left_unread() <= fd_low_watermark()) {
39
        allow_read_ = true;
40
      }
41
      if (allow_read_ && buffered_fd_.left_unread() < fd_high_watermark()) {
42
        TRY_RESULT(r, buffered_fd_.flush_read(fd_high_watermark() - buffered_fd_.left_unread()));
43
        if (r == 0 && is_eof) {
44
          read_eof = true;
45
        }
46
      }
47
      if (buffered_fd_.left_unread() >= fd_high_watermark()) {
48
        allow_read_ = false;
49
      }
50
      {
51
        auto &input = buffered_fd_.input_buffer();
52
        auto s = input.size();
53
        TRY_STATUS(receive(input));
54
        read = input.size() < s;
55
      }
56
      if (buffered_fd_.left_unread() == 0 && read_eof) {
57
        TRY_STATUS(receive_eof());
58
      }
59
      TRY_STATUS(buffered_fd_.flush_write());
60
      if (writing_payload_ && buffered_fd_.left_unwritten() < fd_high_watermark()) {
61
        written = continue_payload_write();
62
      }
63
      if (close_after_write_ && !writing_payload_ && !buffered_fd_.left_unwritten()) {
64
        LOG(INFO) << "close after write";
65
        stop();
66
        break;
67
      }
68
      if (close_after_read_ && !reading_payload_ && !buffered_fd_.left_unread()) {
69
        LOG(INFO) << "close after read";
70
        stop();
71
        break;
72
      }
73
      if (!written && !read) {
74
        break;
75
      }
76
    }
77
    return td::Status::OK();
78
  }();
79
  in_loop_ = false;
80
  if (status.is_error()) {
81
    LOG(ERROR) << "loop() failed: " << status;
82
    stop();
83
  } else {
84
    send_ready();
85
  }
86
}
87

88
void HttpConnection::send_error(std::unique_ptr<HttpResponse> response) {
89
  CHECK(!writing_payload_);
90
  auto payload = response->create_empty_payload().move_as_ok();
91
  CHECK(payload->parse_completed());
92
  send_response(std::move(response), std::move(payload));
93
}
94

95
void HttpConnection::send_request(std::unique_ptr<HttpRequest> request, std::shared_ptr<HttpPayload> payload) {
96
  CHECK(!writing_payload_);
97
  request->store_http(buffered_fd_.output_buffer());
98

99
  write_payload(std::move(payload));
100
}
101

102
void HttpConnection::send_response(std::unique_ptr<HttpResponse> response, std::shared_ptr<HttpPayload> payload) {
103
  CHECK(!writing_payload_);
104
  response->store_http(buffered_fd_.output_buffer());
105

106
  write_payload(std::move(payload));
107
}
108

109
void HttpConnection::write_payload(std::shared_ptr<HttpPayload> payload) {
110
  CHECK(!writing_payload_);
111

112
  writing_payload_ = std::move(payload);
113

114
  if (writing_payload_->parse_completed()) {
115
    continue_payload_write();
116
    return;
117
  }
118

119
  class Cb : public HttpPayload::Callback {
120
   public:
121
    Cb(td::actor::ActorId<HttpConnection> conn, size_t watermark) : conn_(conn), watermark_(watermark) {
122
    }
123
    void run(size_t ready_bytes) override {
124
      if (!reached_ && ready_bytes >= watermark_) {
125
        td::actor::send_closure(conn_, &HttpConnection::loop);
126
        reached_ = true;
127
      } else if (reached_ && ready_bytes < watermark_) {
128
        reached_ = false;
129
      }
130
    }
131
    void completed() override {
132
      td::actor::send_closure(conn_, &HttpConnection::loop);
133
    }
134

135
   private:
136
    td::actor::ActorId<HttpConnection> conn_;
137
    size_t watermark_;
138
    bool reached_ = false;
139
  };
140

141
  writing_payload_->add_callback(std::make_unique<Cb>(
142
      actor_id(this), writing_payload_->payload_type() == HttpPayload::PayloadType::pt_tunnel ? 1 : chunk_size()));
143
  continue_payload_write();
144
}
145

146
bool HttpConnection::continue_payload_write() {
147
  if (!writing_payload_) {
148
    return false;
149
  }
150
  if (writing_payload_->is_error()) {
151
    stop();
152
    return false;
153
  }
154

155
  auto t = writing_payload_->payload_type();
156
  if (t == HttpPayload::PayloadType::pt_eof) {
157
    t = HttpPayload::PayloadType::pt_chunked;
158
  }
159

160
  bool wrote = false;
161
  while (!writing_payload_->written()) {
162
    if (buffered_fd_.left_unwritten() > fd_high_watermark()) {
163
      return wrote;
164
    }
165
    bool is_tunnel = writing_payload_->payload_type() == HttpPayload::PayloadType::pt_tunnel;
166
    if (!is_tunnel && !writing_payload_->parse_completed() && writing_payload_->ready_bytes() < chunk_size()) {
167
      return wrote;
168
    }
169
    if (is_tunnel && writing_payload_->ready_bytes() == 0) {
170
      return wrote;
171
    }
172
    wrote |= writing_payload_->store_http(buffered_fd_.output_buffer(), chunk_size(), t);
173
  }
174
  if (writing_payload_->parse_completed() && writing_payload_->written()) {
175
    payload_written();
176
  }
177
  return wrote;
178
}
179

180
/**
 * Starts reading the payload that belongs to `response`.
 *
 * If the response is not keep-alive, the connection is marked to be closed
 * once reading is finished. A failure to create the payload object is
 * returned to the caller as an error status instead of aborting the process.
 *
 * @param response parsed response header; must not be null.
 * @return OK, or the error from payload creation / payload parsing.
 */
td::Status HttpConnection::read_payload(HttpResponse *response) {
  CHECK(!reading_payload_);

  if (!response->keep_alive()) {
    close_after_read_ = true;
  }

  // Propagate a creation failure rather than crashing in move_as_ok().
  TRY_RESULT(payload, response->create_empty_payload());
  return read_payload(std::move(payload));
}
189

190
/**
 * Starts reading the payload that belongs to `request`.
 *
 * A failure to create the payload object is returned to the caller as an
 * error status instead of aborting the process.
 *
 * @param request parsed request header; must not be null.
 * @return OK, or the error from payload creation / payload parsing.
 */
td::Status HttpConnection::read_payload(HttpRequest *request) {
  CHECK(!reading_payload_);

  // Propagate a creation failure rather than crashing in move_as_ok().
  TRY_RESULT(payload, request->create_empty_payload());
  return read_payload(std::move(payload));
}
195

196
/**
 * Installs `payload` as the payload currently being read and starts parsing
 * it from the input buffer.
 *
 * A payload that is already complete is reported via payload_read() right
 * away. Otherwise a callback is attached that re-schedules loop() when the
 * amount of ready bytes falls below HttpRequest::low_watermark() and when the
 * payload completes.
 *
 * @return OK, or the error returned by continue_payload_read().
 */
td::Status HttpConnection::read_payload(std::shared_ptr<HttpPayload> payload) {
  CHECK(!reading_payload_);

  reading_payload_ = std::move(payload);

  if (reading_payload_->parse_completed()) {
    payload_read();
    return td::Status::OK();
  }

  // Wakes the connection up when the amount of ready data crosses the
  // watermark from above, and once more on completion.
  class ReadWakeup : public HttpPayload::Callback {
   public:
    explicit ReadWakeup(td::actor::ActorId<HttpConnection> conn) : conn_(conn) {
    }
    void run(size_t ready_bytes) override {
      const bool below = ready_bytes < watermark_;
      const bool was_reached = reached_;
      reached_ = below;
      if (below && !was_reached) {
        td::actor::send_closure(conn_, &HttpConnection::loop);
      }
    }
    void completed() override {
      td::actor::send_closure(conn_, &HttpConnection::loop);
    }

   private:
    size_t watermark_ = HttpRequest::low_watermark();
    bool reached_ = false;

    td::actor::ActorId<HttpConnection> conn_;
  };

  reading_payload_->add_callback(std::make_unique<ReadWakeup>(actor_id(this)));
  return continue_payload_read(buffered_fd_.input_buffer());
}
233

234
/**
 * Feeds `input` to the payload currently being read until the payload is
 * complete, the payload holds more than fd_high_watermark() ready bytes, or a
 * parse step consumes nothing from `input`.
 *
 * On a parse error the payload is flagged via set_error() and the error is
 * returned. When parsing completes, payload_read() is called. Does nothing if
 * no payload is being read.
 */
td::Status HttpConnection::continue_payload_read(td::ChainBufferReader &input) {
  if (!reading_payload_) {
    return td::Status::OK();
  }

  while (!reading_payload_->parse_completed()) {
    // Back-pressure: let the consumer drain the payload first.
    if (reading_payload_->ready_bytes() > fd_high_watermark()) {
      return td::Status::OK();
    }

    const auto size_before = input.size();
    auto parsed = reading_payload_->parse(input);
    if (parsed.is_error()) {
      reading_payload_->set_error();
      return parsed.move_as_error();
    }

    // Nothing was consumed: more input is needed before parsing can go on.
    if (input.size() == size_before) {
      return td::Status::OK();
    }
  }

  if (reading_payload_->parse_completed()) {
    payload_read();
  }
  return td::Status::OK();
}
258

259
/**
 * Continues parsing the payload currently being read from `input`.
 *
 * Requires that a payload is being read (enforced with CHECK).
 *
 * @return the status of continue_payload_read(); a payload parse error is
 *         reported to the caller rather than silently discarded.
 */
td::Status HttpConnection::receive_payload(td::ChainBufferReader &input) {
  CHECK(reading_payload_);
  return continue_payload_read(input);
}
264

265
}  // namespace http
266

267
}  // namespace ton
268

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.