2
This file is part of TON Blockchain Library.
4
TON Blockchain Library is free software: you can redistribute it and/or modify
5
it under the terms of the GNU Lesser General Public License as published by
6
the Free Software Foundation, either version 2 of the License, or
7
(at your option) any later version.
9
TON Blockchain Library is distributed in the hope that it will be useful,
10
but WITHOUT ANY WARRANTY; without even the implied warranty of
11
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
GNU Lesser General Public License for more details.
14
You should have received a copy of the GNU Lesser General Public License
15
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
17
Copyright 2019-2020 Telegram Systems LLP
19
#include "http-connection.h"
25
// Main I/O step of the connection: pulls bytes from buffered_fd_, passes them
// to receive(), flushes pending output and continues writing the outgoing payload.
// NOTE(review): several original lines of this function (closing braces, the
// declarations of `read`, `written` and `status`, and some branch bodies) are
// not visible in this view; the comments below describe only the statements shown.
void HttpConnection::loop() {
32
// Trace how many bytes are currently buffered for reading and for writing.
LOG(DEBUG) << "loop(): in=" << buffered_fd_.left_unread() << " out=" << buffered_fd_.left_unwritten();
33
// is_eof is whatever td::can_close() reports for the descriptor at this moment.
bool is_eof = td::can_close(buffered_fd_);
34
bool read_eof = false;
38
// Only refill the input buffer on EOF or when the unread amount is at or below the low watermark.
if (is_eof || buffered_fd_.left_unread() <= fd_low_watermark()) {
41
// Reading must be allowed and the buffer must still be below the high watermark.
if (allow_read_ && buffered_fd_.left_unread() < fd_high_watermark()) {
42
// Ask for at most the remaining headroom up to the high watermark.
TRY_RESULT(r, buffered_fd_.flush_read(fd_high_watermark() - buffered_fd_.left_unread()));
43
// Nothing was read and the descriptor is closable.
// NOTE(review): the branch body is not visible; presumably it sets read_eof — confirm.
if (r == 0 && is_eof) {
47
// NOTE(review): body not visible — the input buffer has reached the high watermark here.
if (buffered_fd_.left_unread() >= fd_high_watermark()) {
51
auto &input = buffered_fd_.input_buffer();
52
// Remember the buffer size so consumption by receive() can be detected.
auto s = input.size();
53
TRY_STATUS(receive(input));
54
// `read` is true when receive() consumed at least one byte.
read = input.size() < s;
56
// All buffered input has been consumed and EOF was observed: notify via receive_eof().
if (buffered_fd_.left_unread() == 0 && read_eof) {
57
TRY_STATUS(receive_eof());
59
TRY_STATUS(buffered_fd_.flush_write());
60
// Keep feeding the outgoing payload while the output buffer is below the high watermark.
if (writing_payload_ && buffered_fd_.left_unwritten() < fd_high_watermark()) {
61
written = continue_payload_write();
63
// close_after_write_ is set, no payload is being written and the output buffer is empty.
// NOTE(review): only the log statement of this branch is visible.
if (close_after_write_ && !writing_payload_ && !buffered_fd_.left_unwritten()) {
64
LOG(INFO) << "close after write";
68
// close_after_read_ is set, no payload is being read and the input buffer is empty.
// NOTE(review): only the log statement of this branch is visible.
if (close_after_read_ && !reading_payload_ && !buffered_fd_.left_unread()) {
69
LOG(INFO) << "close after read";
73
// Neither direction made progress in this pass.
// NOTE(review): body not visible; presumably this ends the iteration — confirm.
if (!written && !read) {
77
// NOTE(review): a td::Status is returned although loop() is void, so the code
// above presumably runs inside a status-returning wrapper whose opening line
// is not visible here — confirm.
return td::Status::OK();
80
// Report a failure of the step above.
if (status.is_error()) {
81
LOG(ERROR) << "loop() failed: " << status;
88
// Sends `response` together with an empty payload obtained from
// response->create_empty_payload(). Must not be called while another payload
// is being written (enforced by CHECK).
void HttpConnection::send_error(std::unique_ptr<HttpResponse> response) {
89
CHECK(!writing_payload_);
90
// The creation result is unwrapped with move_as_ok() without inspecting it for an error first.
auto payload = response->create_empty_payload().move_as_ok();
91
// The empty payload is expected to be already fully parsed.
CHECK(payload->parse_completed());
92
send_response(std::move(response), std::move(payload));
95
// Serializes `request` into the output buffer of buffered_fd_ and then starts
// writing `payload` through write_payload(). Must not be called while another
// payload is being written (enforced by CHECK).
void HttpConnection::send_request(std::unique_ptr<HttpRequest> request, std::shared_ptr<HttpPayload> payload) {
96
CHECK(!writing_payload_);
97
request->store_http(buffered_fd_.output_buffer());
99
write_payload(std::move(payload));
102
// Serializes `response` into the output buffer of buffered_fd_ and then starts
// writing `payload` through write_payload(). Must not be called while another
// payload is being written (enforced by CHECK).
void HttpConnection::send_response(std::unique_ptr<HttpResponse> response, std::shared_ptr<HttpPayload> payload) {
103
CHECK(!writing_payload_);
104
response->store_http(buffered_fd_.output_buffer());
106
write_payload(std::move(payload));
109
// Installs `payload` as the payload currently being written and starts writing it.
// If the payload is not yet fully parsed, a callback is attached that schedules
// HttpConnection::loop on this actor when more data becomes available.
// NOTE(review): some original lines (closing braces, access specifiers, the
// watermark_ member and the reached_ updates) are not visible in this view.
void HttpConnection::write_payload(std::shared_ptr<HttpPayload> payload) {
110
CHECK(!writing_payload_);
112
writing_payload_ = std::move(payload);
114
// Payload is already complete: write it directly.
// NOTE(review): the rest of this branch is not visible; presumably it returns — confirm.
if (writing_payload_->parse_completed()) {
115
continue_payload_write();
119
// Payload callback that wakes the connection actor up.
class Cb : public HttpPayload::Callback {
121
Cb(td::actor::ActorId<HttpConnection> conn, size_t watermark) : conn_(conn), watermark_(watermark) {
123
// Called with the number of bytes currently ready in the payload.
void run(size_t ready_bytes) override {
124
// Watermark reached for the first time since the flag was clear: schedule loop().
// NOTE(review): the line updating reached_ is not visible — confirm.
if (!reached_ && ready_bytes >= watermark_) {
125
td::actor::send_closure(conn_, &HttpConnection::loop);
127
// Ready bytes dropped below the watermark again.
// NOTE(review): body not visible; presumably it clears reached_ — confirm.
} else if (reached_ && ready_bytes < watermark_) {
131
// Payload finished: schedule loop() unconditionally.
void completed() override {
132
td::actor::send_closure(conn_, &HttpConnection::loop);
136
td::actor::ActorId<HttpConnection> conn_;
138
bool reached_ = false;
141
// Tunnel payloads wake the connection on any available byte (watermark 1);
// every other payload type uses chunk_size() as the watermark.
writing_payload_->add_callback(std::make_unique<Cb>(
142
actor_id(this), writing_payload_->payload_type() == HttpPayload::PayloadType::pt_tunnel ? 1 : chunk_size()));
143
continue_payload_write();
146
// Moves ready data of writing_payload_ into the output buffer of buffered_fd_,
// one store_http() call of at most chunk_size() at a time.
// NOTE(review): several original lines (the declaration of `wrote`, the bodies
// of the early-exit branches, the final return) are not visible in this view.
bool HttpConnection::continue_payload_write() {
147
// No payload is being written.
// NOTE(review): body not visible.
if (!writing_payload_) {
150
// The payload is flagged as failed.
// NOTE(review): handling is not visible.
if (writing_payload_->is_error()) {
155
// Transfer type passed to store_http(); pt_eof payloads are written as pt_chunked.
auto t = writing_payload_->payload_type();
156
if (t == HttpPayload::PayloadType::pt_eof) {
157
t = HttpPayload::PayloadType::pt_chunked;
161
while (!writing_payload_->written()) {
162
// The output buffer is above the high watermark.
// NOTE(review): body not visible; presumably writing pauses here — confirm.
if (buffered_fd_.left_unwritten() > fd_high_watermark()) {
165
bool is_tunnel = writing_payload_->payload_type() == HttpPayload::PayloadType::pt_tunnel;
166
// Non-tunnel payload that is still being parsed and has less than one chunk ready.
// NOTE(review): body not visible.
if (!is_tunnel && !writing_payload_->parse_completed() && writing_payload_->ready_bytes() < chunk_size()) {
169
// Tunnel payload with nothing ready.
// NOTE(review): body not visible.
if (is_tunnel && writing_payload_->ready_bytes() == 0) {
172
// Accumulate whether any store_http() call reported output.
wrote |= writing_payload_->store_http(buffered_fd_.output_buffer(), chunk_size(), t);
174
// Payload is fully parsed and fully written.
// NOTE(review): body not visible.
if (writing_payload_->parse_completed() && writing_payload_->written()) {
180
// Starts reading the body of `response` into an empty payload created by the
// response itself. Marks the connection to be closed after reading when the
// response is not keep-alive. Must not be called while another payload is
// being read (enforced by CHECK).
td::Status HttpConnection::read_payload(HttpResponse *response) {
181
CHECK(!reading_payload_);
183
if (!response->keep_alive()) {
184
close_after_read_ = true;
187
// The creation result is unwrapped with move_as_ok() without inspecting it for an error first.
return read_payload(response->create_empty_payload().move_as_ok());
190
// Starts reading the body of `request` into an empty payload created by the
// request itself. Must not be called while another payload is being read
// (enforced by CHECK).
td::Status HttpConnection::read_payload(HttpRequest *request) {
191
CHECK(!reading_payload_);
193
// The creation result is unwrapped with move_as_ok() without inspecting it for an error first.
return read_payload(request->create_empty_payload().move_as_ok());
196
// Installs `payload` as the payload currently being read. If it is not yet
// complete, attaches a callback that schedules HttpConnection::loop on this
// actor, then parses whatever is already in the input buffer.
// NOTE(review): some original lines (closing braces, access specifiers, the
// reached_ updates, part of the first branch) are not visible in this view.
td::Status HttpConnection::read_payload(std::shared_ptr<HttpPayload> payload) {
197
CHECK(!reading_payload_);
199
reading_payload_ = std::move(payload);
201
// Nothing to read: the payload is already complete.
if (reading_payload_->parse_completed()) {
203
return td::Status::OK();
206
// Payload callback that wakes the connection actor up.
class Cb : public HttpPayload::Callback {
208
Cb(td::actor::ActorId<HttpConnection> conn) : conn_(conn) {
210
// Called with the number of bytes currently ready in the payload.
void run(size_t ready_bytes) override {
211
// Ready bytes are below the watermark while the flag is clear: schedule loop().
// NOTE(review): the line updating reached_ is not visible — confirm.
if (!reached_ && ready_bytes < watermark_) {
213
td::actor::send_closure(conn_, &HttpConnection::loop);
214
// Ready bytes are back at or above the watermark.
// NOTE(review): body not visible; presumably it clears reached_ — confirm.
} else if (reached_ && ready_bytes >= watermark_) {
218
// Payload finished: schedule loop() unconditionally.
void completed() override {
219
td::actor::send_closure(conn_, &HttpConnection::loop);
223
// Threshold used by run(); taken from HttpRequest::low_watermark().
size_t watermark_ = HttpRequest::low_watermark();
224
bool reached_ = false;
226
td::actor::ActorId<HttpConnection> conn_;
229
reading_payload_->add_callback(std::make_unique<Cb>(actor_id(this)));
230
// Parse whatever is already buffered.
auto &input = buffered_fd_.input_buffer();
231
return continue_payload_read(input);
234
// Feeds `input` into reading_payload_ via parse() until the payload is
// complete, the payload holds more than fd_high_watermark() ready bytes, or
// parse() stops consuming input.
// NOTE(review): some original lines (closing braces, the condition guarding
// the error path, the body of the completion branch) are not visible in this view.
td::Status HttpConnection::continue_payload_read(td::ChainBufferReader &input) {
235
// No payload is being read: nothing to do.
if (!reading_payload_) {
236
return td::Status::OK();
238
while (!reading_payload_->parse_completed()) {
239
// Stop while the payload holds more ready bytes than the high watermark.
if (reading_payload_->ready_bytes() > fd_high_watermark()) {
240
return td::Status::OK();
242
// Remember the input size so lack of progress can be detected after parse().
auto s = input.size();
243
auto R = reading_payload_->parse(input);
245
// NOTE(review): the guarding condition is not visible; presumably these two
// lines run only when R holds an error — confirm.
reading_payload_->set_error();
246
return R.move_as_error();
248
// parse() consumed nothing: stop and wait for more input.
if (input.size() == s) {
249
return td::Status::OK();
252
// NOTE(review): the statements between this check and the return are not visible.
if (reading_payload_->parse_completed()) {
254
return td::Status::OK();
256
return td::Status::OK();
259
// Passes newly received `input` to the payload currently being read.
// Requires reading_payload_ to be set (enforced by CHECK).
td::Status HttpConnection::receive_payload(td::ChainBufferReader &input) {
260
CHECK(reading_payload_);
261
// NOTE(review): the td::Status returned by continue_payload_read() is
// discarded here, so this function reports OK even when that call returns an
// error — confirm this is intentional.
continue_payload_read(input);
262
return td::Status::OK();