# urllib3 (fork) / test_response.py -- 1675 lines, 56.5 KB
1
from __future__ import annotations
2

3
import contextlib
4
import http.client as httplib
5
import socket
6
import ssl
7
import typing
8
import zlib
9
from base64 import b64decode
10
from http.client import IncompleteRead as httplib_IncompleteRead
11
from io import BufferedReader, BytesIO, TextIOWrapper
12
from test import onlyBrotli, onlyZstd
13
from unittest import mock
14

15
import pytest
16

17
from urllib3 import HTTPHeaderDict
18
from urllib3.exceptions import (
19
    BodyNotHttplibCompatible,
20
    DecodeError,
21
    IncompleteRead,
22
    InvalidChunkLength,
23
    InvalidHeader,
24
    ProtocolError,
25
    ResponseNotChunked,
26
    SSLError,
27
)
28
from urllib3.response import (  # type: ignore[attr-defined]
29
    BaseHTTPResponse,
30
    BytesQueueBuffer,
31
    HTTPResponse,
32
    brotli,
33
)
34
from urllib3.util.response import is_fp_closed
35
from urllib3.util.retry import RequestHistory, Retry
36

37

38
class TestBytesQueueBuffer:
    """Unit tests for ``BytesQueueBuffer`` (``put``, ``get``, ``get_all``, ``len``)."""

    def test_single_chunk(self) -> None:
        """``get`` on an empty buffer, with a negative size, and on one chunk."""
        buffer = BytesQueueBuffer()
        assert len(buffer) == 0
        with pytest.raises(RuntimeError, match="buffer is empty"):
            buffer.get(10)

        # Asking for zero bytes is allowed even when nothing is buffered.
        assert buffer.get(0) == b""

        buffer.put(b"foo")
        with pytest.raises(ValueError, match="n should be > 0"):
            buffer.get(-1)

        assert buffer.get(1) == b"f"
        assert buffer.get(2) == b"oo"
        with pytest.raises(RuntimeError, match="buffer is empty"):
            buffer.get(10)

    def test_read_too_much(self) -> None:
        """Requesting more bytes than are buffered returns what is available."""
        buffer = BytesQueueBuffer()
        buffer.put(b"foo")
        assert buffer.get(100) == b"foo"

    def test_multiple_chunks(self) -> None:
        """``get`` may span chunk boundaries and ``len`` tracks the remainder."""
        buffer = BytesQueueBuffer()
        buffer.put(b"foo")
        buffer.put(b"bar")
        buffer.put(b"baz")
        assert len(buffer) == 9

        assert buffer.get(1) == b"f"
        assert len(buffer) == 8
        assert buffer.get(4) == b"ooba"
        assert len(buffer) == 4
        assert buffer.get(4) == b"rbaz"
        assert len(buffer) == 0

    def test_get_all_empty(self) -> None:
        """``get_all`` on an empty buffer returns ``b""``."""
        q = BytesQueueBuffer()
        assert q.get_all() == b""
        assert len(q) == 0

    def test_get_all_single(self) -> None:
        """``get_all`` returns the only chunk and empties the buffer."""
        q = BytesQueueBuffer()
        q.put(b"a")
        assert q.get_all() == b"a"
        assert len(q) == 0

    def test_get_all_many(self) -> None:
        """``get_all`` concatenates every chunk in insertion order."""
        q = BytesQueueBuffer()
        q.put(b"a")
        q.put(b"b")
        q.put(b"c")
        assert q.get_all() == b"abc"
        assert len(q) == 0

    @pytest.mark.parametrize(
        "get_func",
        (lambda b: b.get(len(b)), lambda b: b.get_all()),
        ids=("get", "get_all"),
    )
    @pytest.mark.limit_memory(
        "12.5 MB", current_thread_only=True
    )  # assert that we're not doubling memory usage
    def test_memory_usage(
        self, get_func: typing.Callable[[BytesQueueBuffer], bytes]
    ) -> None:
        """Draining ten 1 MiB chunks must stay within the memory limit above."""
        # Allocate 10 1MiB chunks
        buffer = BytesQueueBuffer()
        for _ in range(10):
            # This allocates 2MiB, putting the max at around 12MiB. Not sure why.
            buffer.put(bytes(2**20))

        assert len(get_func(buffer)) == 10 * 2**20

    @pytest.mark.limit_memory("10.01 MB", current_thread_only=True)
    def test_get_all_memory_usage_single_chunk(self) -> None:
        """With a single chunk, ``get_all`` returns that very object (no copy)."""
        buffer = BytesQueueBuffer()
        chunk = bytes(10 * 2**20)  # 10 MiB
        buffer.put(chunk)
        assert buffer.get_all() is chunk
119

120

121
# A known random (i.e, not-too-compressible) payload: 512 characters drawn from
# string.printable, zlib-compressed and then base64-encoded. It was originally
# generated with the Python 2 idiom:
#    "".join(random.choice(string.printable) for i in range(512))
#    .encode("zlib").encode("base64")
# Randomness in tests == bad, and fixing a seed may not be sufficient.
ZLIB_PAYLOAD = b64decode(
    b"""\
eJwFweuaoQAAANDfineQhiKLUiaiCzvuTEmNNlJGiL5QhnGpZ99z8luQfe1AHoMioB+QSWHQu/L+
lzd7W5CipqYmeVTBjdgSATdg4l4Z2zhikbuF+EKn69Q0DTpdmNJz8S33odfJoVEexw/l2SS9nFdi
pis7KOwXzfSqarSo9uJYgbDGrs1VNnQpT9f8zAorhYCEZronZQF9DuDFfNK3Hecc+WHLnZLQptwk
nufw8S9I43sEwxsT71BiqedHo0QeIrFE01F/4atVFXuJs2yxIOak3bvtXjUKAA6OKnQJ/nNvDGKZ
Khe5TF36JbnKVjdcL1EUNpwrWVfQpFYJ/WWm2b74qNeSZeQv5/xBhRdOmKTJFYgO96PwrHBlsnLn
a3l0LwJsloWpMbzByU5WLbRE6X5INFqjQOtIwYz5BAlhkn+kVqJvWM5vBlfrwP42ifonM5yF4ciJ
auHVks62997mNGOsM7WXNG3P98dBHPo2NhbTvHleL0BI5dus2JY81MUOnK3SGWLH8HeWPa1t5KcW
S5moAj5HexY/g/F8TctpxwsvyZp38dXeLDjSQvEQIkF7XR3YXbeZgKk3V34KGCPOAeeuQDIgyVhV
nP4HF2uWHA=="""
)
137

138

139
@pytest.fixture
def sock() -> typing.Generator[socket.socket, None, None]:
    """Yield an unconnected socket and close it when the test is done.

    The ``with`` block closes the socket on normal teardown and also when
    the generator is finalized early.
    """
    with socket.socket() as s:
        yield s
144

145

146
class TestLegacyResponse:
    """Checks for the deprecated header accessors of ``HTTPResponse``."""

    def test_getheaders(self) -> None:
        """``getheaders()`` emits a DeprecationWarning and returns the headers."""
        raw_headers = {"host": "example.com"}
        response = HTTPResponse(headers=raw_headers)
        expected_warning = r"HTTPResponse.getheaders\(\) is deprecated"
        with pytest.warns(DeprecationWarning, match=expected_warning):
            assert response.getheaders() == HTTPHeaderDict(raw_headers)

    def test_getheader(self) -> None:
        """``getheader()`` emits a DeprecationWarning and returns one value."""
        raw_headers = {"host": "example.com"}
        response = HTTPResponse(headers=raw_headers)
        expected_warning = r"HTTPResponse.getheader\(\) is deprecated"
        with pytest.warns(DeprecationWarning, match=expected_warning):
            assert response.getheader("host") == "example.com"
164

165

166
class TestResponse:
167
    def test_cache_content(self) -> None:
        """A bytes body is held in ``_body`` before and after ``data`` is read."""
        response = HTTPResponse(b"foo")
        assert response._body == b"foo"
        assert response.data == b"foo"
        # Accessing ``data`` must not disturb the cached body.
        assert response._body == b"foo"
172

173
    def test_cache_content_preload_false(self) -> None:
        """Without preloading, ``_body`` is filled on the first ``data`` access."""
        response = HTTPResponse(BytesIO(b"foo"), preload_content=False)

        # Nothing has been read from the file object yet.
        assert not response._body
        assert response.data == b"foo"
        assert response._body == b"foo"
        # A second access returns the same content.
        assert response.data == b"foo"
181

182
    def test_default(self) -> None:
        """A response built with no arguments has ``data`` equal to ``None``."""
        response = HTTPResponse()
        assert response.data is None
185

186
    def test_none(self) -> None:
        """Passing ``None`` as the body yields ``data`` equal to ``None``."""
        response = HTTPResponse(None)  # type: ignore[arg-type]
        assert response.data is None
189

190
    def test_preload(self) -> None:
        """With ``preload_content=True`` the stream is consumed at construction."""
        stream = BytesIO(b"foo")

        response = HTTPResponse(stream, preload_content=True)

        # The file position shows the whole body has already been read.
        assert stream.tell() == len(b"foo")
        assert response.data == b"foo"
197

198
    def test_no_preload(self) -> None:
        """With ``preload_content=False`` the stream is read only on ``data``."""
        stream = BytesIO(b"foo")

        response = HTTPResponse(stream, preload_content=False)

        # Untouched until ``data`` is accessed.
        assert stream.tell() == 0
        assert response.data == b"foo"
        assert stream.tell() == len(b"foo")
206

207
    def test_decode_bad_data(self) -> None:
        """Ten NUL bytes labelled as deflate raise ``DecodeError``."""
        garbage = BytesIO(b"\x00" * 10)
        with pytest.raises(DecodeError):
            HTTPResponse(garbage, headers={"content-encoding": "deflate"})
211

212
    def test_reference_read(self) -> None:
        """Sized ``read`` calls return successive slices, then empty bytes."""
        response = HTTPResponse(BytesIO(b"foo"), preload_content=False)

        for amt, expected in ((0, b""), (1, b"f"), (2, b"oo")):
            assert response.read(amt) == expected
        # Once exhausted, unbounded reads keep returning nothing.
        for _ in range(2):
            assert response.read() == b""
221

222
    @pytest.mark.parametrize("read_args", ((), (None,), (-1,)))
    def test_reference_read_until_eof(self, read_args: tuple[typing.Any, ...]) -> None:
        """``read()``, ``read(None)`` and ``read(-1)`` each return the whole body."""
        response = HTTPResponse(BytesIO(b"foo"), preload_content=False)
        assert response.read(*read_args) == b"foo"
227

228
    def test_reference_read1(self) -> None:
        """Sized ``read1`` calls return slices; unsized ones return the rest."""
        response = HTTPResponse(BytesIO(b"foobar"), preload_content=False)

        for amt, expected in ((0, b""), (1, b"f"), (2, b"oo")):
            assert response.read1(amt) == expected
        assert response.read1() == b"bar"
        assert response.read1() == b""
237

238
    @pytest.mark.parametrize("read1_args", ((), (None,), (-1,)))
    def test_reference_read1_without_limit(
        self, read1_args: tuple[typing.Any, ...]
    ) -> None:
        """``read1()``, ``read1(None)`` and ``read1(-1)`` each return the body."""
        response = HTTPResponse(BytesIO(b"foo"), preload_content=False)
        assert response.read1(*read1_args) == b"foo"
245

246
    def test_reference_read1_nodecode(self) -> None:
        """``read1`` slices the body the same way with ``decode_content=False``."""
        response = HTTPResponse(
            BytesIO(b"foobar"), preload_content=False, decode_content=False
        )

        for amt, expected in ((0, b""), (1, b"f"), (2, b"oo")):
            assert response.read1(amt) == expected
        assert response.read1() == b"bar"
        assert response.read1() == b""
255

256
    def test_decoding_read1(self) -> None:
        """``read1`` returns decoded bytes for a deflate-encoded body."""
        compressed = zlib.compress(b"foobar")
        response = HTTPResponse(
            BytesIO(compressed),
            headers={"content-encoding": "deflate"},
            preload_content=False,
        )

        for amt, expected in ((1, b"f"), (2, b"oo")):
            assert response.read1(amt) == expected
        assert response.read1() == b"bar"
        assert response.read1() == b""
268

269
    def test_decode_deflate(self) -> None:
        """A zlib-compressed body marked ``deflate`` is decoded on preload."""
        compressed = zlib.compress(b"foo")
        response = HTTPResponse(
            BytesIO(compressed), headers={"content-encoding": "deflate"}
        )

        assert response.data == b"foo"
276

277
    def test_decode_deflate_case_insensitve(self) -> None:
        """The content-encoding value is matched regardless of letter case."""
        compressed = zlib.compress(b"foo")
        response = HTTPResponse(
            BytesIO(compressed), headers={"content-encoding": "DeFlAtE"}
        )

        assert response.data == b"foo"
284

285
    def test_chunked_decoding_deflate(self) -> None:
        """Sized reads of a zlib-wrapped deflate body return decoded slices."""
        compressed = zlib.compress(b"foo")
        response = HTTPResponse(
            BytesIO(compressed),
            headers={"content-encoding": "deflate"},
            preload_content=False,
        )

        for amt, expected in ((1, b"f"), (2, b"oo")):
            assert response.read(amt) == expected
        for _ in range(2):
            assert response.read() == b""
297

298
    def test_chunked_decoding_deflate2(self) -> None:
        """Sized reads also work for a raw deflate stream (negative wbits)."""
        compressor = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
        compressed = compressor.compress(b"foo") + compressor.flush()

        response = HTTPResponse(
            BytesIO(compressed),
            headers={"content-encoding": "deflate"},
            preload_content=False,
        )

        for amt, expected in ((1, b"f"), (2, b"oo")):
            assert response.read(amt) == expected
        for _ in range(2):
            assert response.read() == b""
312

313
    @pytest.mark.parametrize("content_encoding", ["gzip", "x-gzip"])
    def test_chunked_decoding_gzip(self, content_encoding: str) -> None:
        """Sized reads decode a gzip body under both ``gzip`` and ``x-gzip``."""
        compressor = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        compressed = compressor.compress(b"foo") + compressor.flush()

        response = HTTPResponse(
            BytesIO(compressed),
            headers={"content-encoding": content_encoding},
            preload_content=False,
        )

        for amt, expected in ((1, b"f"), (2, b"oo")):
            assert response.read(amt) == expected
        for _ in range(2):
            assert response.read() == b""
328

329
    def test_decode_gzip_multi_member(self) -> None:
        """Three concatenated gzip members decode to the concatenated payloads."""
        compressor = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        member = compressor.compress(b"foo") + compressor.flush()

        response = HTTPResponse(
            BytesIO(member * 3), headers={"content-encoding": "gzip"}
        )

        assert response.data == b"foofoofoo"
339

340
    def test_decode_gzip_error(self) -> None:
        """Plain bytes labelled as gzip raise ``DecodeError`` on preload."""
        not_gzip = BytesIO(b"foo")
        with pytest.raises(DecodeError):
            HTTPResponse(not_gzip, headers={"content-encoding": "gzip"})
344

345
    def test_decode_gzip_swallow_garbage(self) -> None:
        """Trailing garbage after valid gzip members is dropped in 1-byte reads."""
        # When data comes from multiple calls to read(), data after
        # the first zlib error (here triggered by garbage) should be
        # ignored.
        compressor = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        member = compressor.compress(b"foo") + compressor.flush()
        payload = member * 3 + b"foo"

        response = HTTPResponse(
            BytesIO(payload),
            headers={"content-encoding": "gzip"},
            preload_content=False,
        )
        pieces = []
        for _ in range(100):
            pieces.append(response.read(1))
            if response.closed:
                break

        assert b"".join(pieces) == b"foofoofoo"
365

366
    def test_chunked_decoding_gzip_swallow_garbage(self) -> None:
        """Trailing garbage after valid gzip members is dropped on preload."""
        compressor = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        member = compressor.compress(b"foo") + compressor.flush()
        payload = member * 3 + b"foo"

        response = HTTPResponse(
            BytesIO(payload), headers={"content-encoding": "gzip"}
        )

        assert response.data == b"foofoofoo"
376

377
    @onlyBrotli()
    def test_decode_brotli(self) -> None:
        """A brotli-compressed body marked ``br`` is decoded on preload."""
        compressed = brotli.compress(b"foo")

        response = HTTPResponse(
            BytesIO(compressed), headers={"content-encoding": "br"}
        )
        assert response.data == b"foo"
384

385
    @onlyBrotli()
    def test_chunked_decoding_brotli(self) -> None:
        """Reading a brotli body one byte at a time yields the full payload."""
        compressed = brotli.compress(b"foobarbaz")

        response = HTTPResponse(
            BytesIO(compressed),
            headers={"content-encoding": "br"},
            preload_content=False,
        )

        pieces = []
        for _ in range(100):
            pieces.append(response.read(1))
            if response.closed:
                break
        assert b"".join(pieces) == b"foobarbaz"
398

399
    @onlyBrotli()
    def test_decode_brotli_error(self) -> None:
        """Plain bytes labelled as ``br`` raise ``DecodeError`` on preload."""
        not_brotli = BytesIO(b"foo")
        with pytest.raises(DecodeError):
            HTTPResponse(not_brotli, headers={"content-encoding": "br"})
404

405
    @onlyZstd()
    def test_decode_zstd(self) -> None:
        """A zstd-compressed body marked ``zstd`` is decoded on preload."""
        import zstandard as zstd

        compressed = zstd.compress(b"foo")

        response = HTTPResponse(
            BytesIO(compressed), headers={"content-encoding": "zstd"}
        )
        assert response.data == b"foo"
414

415
    @onlyZstd()
    def test_decode_multiframe_zstd(self) -> None:
        """Two zstd frames separated by a skippable frame decode to ``foobar``."""
        import zstandard as zstd

        # skippable frame (must be ignored)
        skippable_frame = bytes.fromhex(
            "50 2A 4D 18"  # Magic_Number (little-endian)
            "07 00 00 00"  # Frame_Size (little-endian)
            "00 00 00 00 00 00 00"  # User_Data
        )
        # Zstandard frame + skippable frame + Zstandard frame
        payload = zstd.compress(b"foo") + skippable_frame + zstd.compress(b"bar")

        response = HTTPResponse(
            BytesIO(payload), headers={"content-encoding": "zstd"}
        )
        assert response.data == b"foobar"
435

436
    @onlyZstd()
    def test_chunked_decoding_zstd(self) -> None:
        """Reading a zstd body one byte at a time yields the full payload."""
        import zstandard as zstd

        compressed = zstd.compress(b"foobarbaz")

        response = HTTPResponse(
            BytesIO(compressed),
            headers={"content-encoding": "zstd"},
            preload_content=False,
        )

        pieces = []
        for _ in range(100):
            pieces.append(response.read(1))
            if response.closed:
                break
        assert b"".join(pieces) == b"foobarbaz"
454

455
    # Inputs shared by the parametrized zstd tests: a short payload and a
    # longer, repetitive one.
    decode_param_set = [b"foo", b"x" * 100]
459

460
    @onlyZstd()
    @pytest.mark.parametrize("data", decode_param_set)
    def test_decode_zstd_error(self, data: bytes) -> None:
        """Uncompressed bytes labelled as ``zstd`` raise ``DecodeError``."""
        not_zstd = BytesIO(data)

        with pytest.raises(DecodeError):
            HTTPResponse(not_zstd, headers={"content-encoding": "zstd"})
467

468
    @onlyZstd()
    @pytest.mark.parametrize("data", decode_param_set)
    def test_decode_zstd_incomplete_preload_content(self, data: bytes) -> None:
        """A zstd body missing its last byte raises ``DecodeError`` on preload."""
        import zstandard as zstd

        compressed = zstd.compress(data)
        truncated = BytesIO(compressed[:-1])

        with pytest.raises(DecodeError):
            HTTPResponse(truncated, headers={"content-encoding": "zstd"})
478

479
    @onlyZstd()
    @pytest.mark.parametrize("data", decode_param_set)
    def test_decode_zstd_incomplete_read(self, data: bytes) -> None:
        """A truncated zstd body raises ``DecodeError`` from ``read``."""
        import zstandard as zstd

        compressed = zstd.compress(data)
        # shorten the data to trigger DecodeError
        truncated = BytesIO(compressed[:-1])

        # create response object without(!) reading/decoding the content
        response = HTTPResponse(
            truncated, headers={"content-encoding": "zstd"}, preload_content=False
        )

        # read/decode, expecting DecodeError
        with pytest.raises(DecodeError):
            response.read(decode_content=True)
495

496
    @onlyZstd()
    @pytest.mark.parametrize("data", decode_param_set)
    def test_decode_zstd_incomplete_read1(self, data: bytes) -> None:
        """A truncated zstd body raises ``DecodeError`` while looping ``read1``."""
        import zstandard as zstd

        data = zstd.compress(data)
        truncated = BytesIO(data[:-1])

        response = HTTPResponse(
            truncated, headers={"content-encoding": "zstd"}, preload_content=False
        )

        # read/decode via read1(!), expecting DecodeError
        with pytest.raises(DecodeError):
            decoded_so_far = 0
            # loop, as read1() may return just partial data
            while decoded_so_far < len(data):
                decoded_so_far += len(response.read1(decode_content=True))
515

516
    @onlyZstd()
    @pytest.mark.parametrize("data", decode_param_set)
    def test_decode_zstd_read1(self, data: bytes) -> None:
        """Repeated ``read1`` calls on a zstd body reassemble the original data."""
        import zstandard as zstd

        compressed = BytesIO(zstd.compress(data))

        response = HTTPResponse(
            compressed, headers={"content-encoding": "zstd"}, preload_content=False
        )

        decoded = b""
        # loop, as read1() may return just partial data
        while len(decoded) < len(data):
            decoded += response.read1(decode_content=True)
        assert decoded == data
536

537
    def test_multi_decoding_deflate_deflate(self) -> None:
        """A doubly deflated body is decoded for ``deflate, deflate``."""
        twice_compressed = zlib.compress(zlib.compress(b"foo"))

        response = HTTPResponse(
            BytesIO(twice_compressed),
            headers={"content-encoding": "deflate, deflate"},
        )

        assert response.data == b"foo"
544

545
    def test_multi_decoding_deflate_gzip(self) -> None:
        """A deflate-then-gzip body is decoded for ``deflate, gzip``."""
        gzipper = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        payload = gzipper.compress(zlib.compress(b"foo")) + gzipper.flush()

        response = HTTPResponse(
            BytesIO(payload), headers={"content-encoding": "deflate, gzip"}
        )

        assert response.data == b"foo"
554

555
    def test_multi_decoding_gzip_gzip(self) -> None:
        """A doubly gzipped body is decoded for ``gzip, gzip``."""
        inner = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        once = inner.compress(b"foo") + inner.flush()

        # Second pass: gzip the already gzipped bytes.
        outer = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        twice = outer.compress(once) + outer.flush()

        response = HTTPResponse(
            BytesIO(twice), headers={"content-encoding": "gzip, gzip"}
        )

        assert response.data == b"foo"
568

569
    def test_read_multi_decoding_deflate_deflate(self) -> None:
        """Sized reads of a doubly deflated body return exact decoded slices."""
        unit = b"foobarbaz"
        twice_compressed = zlib.compress(zlib.compress(unit * 42))

        response = HTTPResponse(
            BytesIO(twice_compressed),
            headers={"content-encoding": "deflate, deflate"},
            preload_content=False,
        )

        # The requested amounts add up to 9 * 42 bytes, i.e. the whole message.
        expectations = (
            (3, b"foo"),
            (3, b"bar"),
            (3, b"baz"),
            (9, b"foobarbaz"),
            (9 * 3, b"foobarbaz" * 3),
            (9 * 37, b"foobarbaz" * 37),
        )
        for amt, expected in expectations:
            assert response.read(amt) == expected
        assert response.read() == b""
585

586
    def test_body_blob(self) -> None:
        """A response built from bytes exposes them as ``data`` and is closed."""
        response = HTTPResponse(b"foo")
        assert response.data == b"foo"
        assert response.closed
590

591
    @pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning")
    def test_base_io(self) -> None:
        """``BaseHTTPResponse`` is not readable/writable; ``read``/``close`` are abstract."""
        base = BaseHTTPResponse(
            status=200,
            version=11,
            version_string="HTTP/1.1",
            reason=None,
            decode_content=False,
            request_url=None,
        )

        assert not base.closed
        assert not base.readable()
        assert not base.writable()

        # Both operations are expected to raise on the base class.
        with pytest.raises(NotImplementedError):
            base.read()
        with pytest.raises(NotImplementedError):
            base.close()
610

611
    def test_io(self, sock: socket.socket) -> None:
        """Check the IO-style surface: ``closed``, ``readable``, ``writable``, ``fileno``.

        Covers a ``BytesIO``-backed response, one wrapping an
        ``httplib.HTTPResponse``, and one built from in-memory data only.
        """
        fp = BytesIO(b"foo")
        resp = HTTPResponse(fp, preload_content=False)

        assert not resp.closed
        assert resp.readable()
        assert not resp.writable()
        with pytest.raises(IOError):
            resp.fileno()

        resp.close()
        assert resp.closed

        # Try closing with an `httplib.HTTPResponse`, because it has an
        # `isclosed` method.
        # The context manager closes `hlr` on exit and, unlike try/finally,
        # cannot reference an unbound name if the constructor itself fails.
        with httplib.HTTPResponse(sock) as hlr:
            resp2 = HTTPResponse(hlr, preload_content=False)
            assert not resp2.closed
            resp2.close()
            assert resp2.closed

        # also try when only data is present.
        resp3 = HTTPResponse("foodata")
        with pytest.raises(IOError):
            resp3.fileno()

        resp3._fp = 2
        # A corner case where _fp is present but doesn't have `closed`,
        # `isclosed`, or `fileno`.  Unlikely, but possible.
        assert resp3.closed
        with pytest.raises(IOError):
            resp3.fileno()
646

647
    def test_io_closed_consistently_by_read(self, sock: socket.socket) -> None:
        """After ``read()`` drains the body, every "closed" indicator agrees."""
        # The context manager closes `hlr` on exit and, unlike try/finally,
        # cannot reference an unbound name if the constructor itself fails.
        with httplib.HTTPResponse(sock) as hlr:
            hlr.fp = BytesIO(b"foo")  # type: ignore[assignment]
            hlr.chunked = 0  # type: ignore[assignment]
            hlr.length = 3
            with HTTPResponse(hlr, preload_content=False) as resp:
                assert not resp.closed
                assert resp._fp is not None
                assert not resp._fp.isclosed()
                assert not is_fp_closed(resp._fp)
                assert not resp.isclosed()
                resp.read()
                assert resp.closed
                assert resp._fp.isclosed()
                assert is_fp_closed(resp._fp)
                assert resp.isclosed()
666

667
    @pytest.mark.parametrize("read_amt", (None, 3))
    @pytest.mark.parametrize("length_known", (True, False))
    def test_io_closed_consistently_by_read1(
        self, sock: socket.socket, length_known: bool, read_amt: int | None
    ) -> None:
        """After ``read1`` drains the body, every "closed" indicator agrees."""
        with httplib.HTTPResponse(sock) as raw:
            raw.fp = BytesIO(b"foo")  # type: ignore[assignment]
            raw.chunked = 0  # type: ignore[assignment]
            raw.length = 3 if length_known else None
            with HTTPResponse(raw, preload_content=False) as response:
                if length_known:
                    response.length_remaining = 3
                # Before any read, nothing reports closed.
                assert not response.closed
                assert response._fp is not None
                assert not response._fp.isclosed()
                assert not is_fp_closed(response._fp)
                assert not response.isclosed()
                response.read1(read_amt)
                # If content length is unknown, IO is not closed until
                # the next read returning zero bytes.
                if not length_known:
                    assert not response.closed
                    assert response._fp is not None
                    assert not response._fp.isclosed()
                    assert not is_fp_closed(response._fp)
                    assert not response.isclosed()
                    response.read1(read_amt)
                # Now everything reports closed.
                assert response.closed
                assert response._fp.isclosed()
                assert is_fp_closed(response._fp)
                assert response.isclosed()
698

699
    @pytest.mark.parametrize("length_known", (True, False))
    def test_io_not_closed_until_all_data_is_read(
        self, sock: socket.socket, length_known: bool
    ) -> None:
        """Byte-by-byte ``read`` keeps the response open until the body is drained."""
        with httplib.HTTPResponse(sock) as raw:
            raw.fp = BytesIO(b"foo")  # type: ignore[assignment]
            raw.chunked = 0  # type: ignore[assignment]
            body_length = 3
            raw.length = body_length if length_known else None
            with HTTPResponse(raw, preload_content=False) as response:
                if length_known:
                    response.length_remaining = body_length
                for _ in range(body_length):
                    # Still open before each single-byte read.
                    assert not response.closed
                    assert response._fp is not None
                    assert not response._fp.isclosed()
                    assert not is_fp_closed(response._fp)
                    assert not response.isclosed()
                    data = response.read(1)
                    assert len(data) == 1
                # If content length is unknown, IO is not closed until
                # the next read returning zero bytes.
                if not length_known:
                    assert not response.closed
                    assert response._fp is not None
                    assert not response._fp.isclosed()
                    assert not is_fp_closed(response._fp)
                    assert not response.isclosed()
                    data = response.read(1)
                    assert len(data) == 0
                assert response.closed
                assert response._fp.isclosed()  # type: ignore[union-attr]
                assert is_fp_closed(response._fp)
                assert response.isclosed()
734

735
    @pytest.mark.parametrize("length_known", (True, False))
    def test_io_not_closed_after_requesting_0_bytes(
        self, sock: socket.socket, length_known: bool
    ) -> None:
        """``read(0)`` returns ``b""`` and leaves the response open."""
        with httplib.HTTPResponse(sock) as raw:
            raw.fp = BytesIO(b"foo")  # type: ignore[assignment]
            raw.chunked = 0  # type: ignore[assignment]
            body_length = 3
            raw.length = body_length if length_known else None
            with HTTPResponse(raw, preload_content=False) as response:
                if length_known:
                    response.length_remaining = body_length
                # Open before the zero-byte read ...
                assert not response.closed
                assert response._fp is not None
                assert not response._fp.isclosed()
                assert not is_fp_closed(response._fp)
                assert not response.isclosed()
                data = response.read(0)
                assert data == b""
                # ... and still open afterwards.
                assert not response.closed
                assert response._fp is not None
                assert not response._fp.isclosed()
                assert not is_fp_closed(response._fp)
                assert not response.isclosed()
759

760
    def test_io_bufferedreader(self) -> None:
        """Wrap responses in ``BufferedReader``: read, close, iterate, small buffer."""
        response = HTTPResponse(BytesIO(b"foo"), preload_content=False)
        reader = BufferedReader(response)  # type: ignore[arg-type]

        assert reader.read() == b"foo"

        reader.close()
        assert response.closed

        # HTTPResponse.read() by default closes the response
        # https://github.com/urllib3/urllib3/issues/1305
        response = HTTPResponse(BytesIO(b"hello\nworld"), preload_content=False)
        with pytest.raises(ValueError, match="readline of closed file"):
            list(BufferedReader(response))  # type: ignore[arg-type]

        payload = b"fooandahalf"
        stream = BytesIO(payload)
        response = HTTPResponse(stream, preload_content=False)
        reader = BufferedReader(response, 5)  # type: ignore[arg-type]

        reader.read(1)  # sets up the buffer, reading 5
        assert len(stream.read()) == (len(payload) - 5)

        # This is necessary to make sure the "no bytes left" part of `readinto`
        # gets tested.
        while not reader.closed:
            reader.read(5)
789

790
    def test_io_not_autoclose_bufferedreader(self) -> None:
        """With ``auto_close=False`` a ``BufferedReader`` can iterate to the end."""
        response = HTTPResponse(
            BytesIO(b"hello\nworld"), preload_content=False, auto_close=False
        )
        buffered = BufferedReader(response)  # type: ignore[arg-type]
        assert list(buffered) == [b"hello\n", b"world"]

        # Exhausted but still open: iteration stops instead of raising ValueError.
        assert not buffered.closed
        assert not response.closed
        with pytest.raises(StopIteration):
            next(buffered)

        # Closing the reader closes the wrapped response too.
        buffered.close()
        assert buffered.closed
        assert response.closed
        with pytest.raises(ValueError, match="readline of closed file"):
            next(buffered)
806

807
    def test_io_textiowrapper(self) -> None:
        """Wrap responses in ``TextIOWrapper``: decode, close, iterate."""
        response = HTTPResponse(
            BytesIO(b"\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"), preload_content=False
        )
        text = TextIOWrapper(response, encoding="utf8")  # type: ignore[arg-type]

        assert text.read() == "äöüß"

        text.close()
        assert response.closed

        # HTTPResponse.read() by default closes the response
        # https://github.com/urllib3/urllib3/issues/1305
        two_lines = BytesIO(
            b"\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f\n\xce\xb1\xce\xb2\xce\xb3\xce\xb4"
        )
        response = HTTPResponse(two_lines, preload_content=False)
        with pytest.raises(ValueError, match="I/O operation on closed file.?"):
            list(TextIOWrapper(response))  # type: ignore[arg-type]
825

826
    def test_io_not_autoclose_textiowrapper(self) -> None:
        """With ``auto_close=False`` a ``TextIOWrapper`` can iterate to the end."""
        two_lines = BytesIO(
            b"\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f\n\xce\xb1\xce\xb2\xce\xb3\xce\xb4"
        )
        response = HTTPResponse(two_lines, preload_content=False, auto_close=False)
        text = TextIOWrapper(response, encoding="utf8")  # type: ignore[arg-type]
        assert list(text) == ["äöüß\n", "αβγδ"]

        # Exhausted but still open: iteration stops instead of raising ValueError.
        assert not text.closed
        assert not response.closed
        with pytest.raises(StopIteration):
            next(text)

        # Closing the wrapper closes the wrapped response too.
        text.close()
        assert text.closed
        assert response.closed
        with pytest.raises(ValueError, match="I/O operation on closed file.?"):
            next(text)
844

845
    def test_read_with_illegal_mix_decode_toggle(self) -> None:
        """``read(decode_content=False)`` after a decoding ``read`` raises."""
        compressed = zlib.compress(b"foo")

        resp = HTTPResponse(
            BytesIO(compressed),
            headers={"content-encoding": "deflate"},
            preload_content=False,
        )

        assert resp.read(1) == b"f"

        expected_error = (
            r"Calling read\(decode_content=False\) is not supported after "
            r"read\(decode_content=True\) was called"
        )
        # First with an explicit amount, then without one.
        for read_args in ((1,), ()):
            with pytest.raises(RuntimeError, match=expected_error):
                resp.read(*read_args, decode_content=False)
873

874
    def test_read1_with_illegal_mix_decode_toggle(self) -> None:
        """``read1(decode_content=False)`` is rejected after a decoding ``read1``."""
        expected_error = (
            r"Calling read1\(decode_content=False\) is not supported after "
            r"read1\(decode_content=True\) was called"
        )
        resp = HTTPResponse(
            BytesIO(zlib.compress(b"foo")),
            headers={"content-encoding": "deflate"},
            preload_content=False,
        )

        # This read1 returns decoded bytes, so raw reads are off-limits afterwards.
        assert resp.read1(1) == b"f"

        # Sized raw read.
        with pytest.raises(RuntimeError, match=expected_error):
            resp.read1(1, decode_content=False)

        # Unsized raw read.
        with pytest.raises(RuntimeError, match=expected_error):
            resp.read1(decode_content=False)
902

903
    def test_read_with_mix_decode_toggle(self) -> None:
        """A raw read followed by a decoding read is an allowed combination."""
        resp = HTTPResponse(
            BytesIO(zlib.compress(b"foo")),
            headers={"content-encoding": "deflate"},
            preload_content=False,
        )

        raw_prefix = resp.read(2, decode_content=False)
        assert raw_prefix is not None

        decoded = resp.read(1, decode_content=True)
        assert decoded == b"f"
913

914
    def test_streaming(self) -> None:
        """``stream(2)`` splits a three-byte body into a 2-byte and a 1-byte chunk."""
        resp = HTTPResponse(BytesIO(b"foo"), preload_content=False)
        chunks = resp.stream(2, decode_content=False)

        first = next(chunks)
        assert first == b"fo"
        second = next(chunks)
        assert second == b"o"

        # Nothing is left after the body has been consumed.
        with pytest.raises(StopIteration):
            next(chunks)
923

924
    def test_streaming_tell(self) -> None:
        """``tell()`` tracks the number of bytes handed out by ``stream``."""
        resp = HTTPResponse(BytesIO(b"foo"), preload_content=False)
        chunks = resp.stream(2, decode_content=False)

        consumed = 0
        # Expected running totals after the 2-byte and the final 1-byte chunk.
        for expected_total in (2, 3):
            consumed += len(next(chunks))
            assert consumed == expected_total
            assert resp.tell() == consumed

        with pytest.raises(StopIteration):
            next(chunks)
941

942
    def test_gzipped_streaming(self) -> None:
        """Streaming a gzip-encoded body yields decoded chunks of the given size."""
        # 16 + MAX_WBITS makes zlib emit a gzip container.
        gzipper = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        compressed = gzipper.compress(b"foo") + gzipper.flush()

        resp = HTTPResponse(
            BytesIO(compressed),
            headers={"content-encoding": "gzip"},
            preload_content=False,
        )
        chunks = resp.stream(2)

        first = next(chunks)
        assert first == b"fo"
        second = next(chunks)
        assert second == b"o"
        with pytest.raises(StopIteration):
            next(chunks)
957

958
    def test_gzipped_streaming_tell(self) -> None:
        """After streaming a gzip body, ``tell()`` equals the compressed length."""
        plain = b"foo"
        gzipper = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        compressed = gzipper.compress(plain) + gzipper.flush()

        resp = HTTPResponse(
            BytesIO(compressed),
            headers={"content-encoding": "gzip"},
            preload_content=False,
        )
        chunks = resp.stream()

        # The first item already carries the whole decoded payload.
        first = next(chunks)
        assert first == plain

        # The position is measured in compressed bytes, not decoded ones.
        assert resp.tell() == len(compressed)

        with pytest.raises(StopIteration):
            next(chunks)
978

979
    def test_deflate_streaming_tell_intermediate_point(self) -> None:
        """``tell()`` is correct part-way through streaming compressed content."""
        NUMBER_OF_READS = 10
        PART_SIZE = 64

        class MockCompressedDataReading(BytesIO):
            """
            BytesIO stand-in that hands out ``payload`` one pre-cut slice per
            ``read`` call, ignoring the amount requested.
            """

            def __init__(self, payload: bytes, payload_part_size: int) -> None:
                # One slice more than NUMBER_OF_READS, so that the remainder
                # left by the floor division below is not lost.
                self.payloads = [
                    payload[idx * payload_part_size : (idx + 1) * payload_part_size]
                    for idx in range(NUMBER_OF_READS + 1)
                ]

                assert b"".join(self.payloads) == payload

            def read(self, _: int) -> bytes:  # type: ignore[override]
                # The requested amount is unused.
                return self.payloads.pop(0) if self.payloads else b""

            def read1(self, amt: int) -> bytes:  # type: ignore[override]
                return self.read(amt)

        uncompressed_data = zlib.decompress(ZLIB_PAYLOAD)

        payload_part_size = len(ZLIB_PAYLOAD) // NUMBER_OF_READS
        resp = HTTPResponse(
            MockCompressedDataReading(ZLIB_PAYLOAD, payload_part_size),
            headers={"content-encoding": "deflate"},
            preload_content=False,
        )
        stream = resp.stream(PART_SIZE)

        # Record the stream position right after each decoded part arrives.
        parts_positions = [(part, resp.tell()) for part in stream]
        end_of_stream = resp.tell()

        with pytest.raises(StopIteration):
            next(stream)

        parts, positions = zip(*parts_positions)

        # The reassembled parts must equal the uncompressed data.
        assert b"".join(parts) == uncompressed_data

        # Check that the positions in the stream are correct.
        # It is difficult to determine programmatically what the positions
        # returned by `tell` will be because the `HTTPResponse.read` method may
        # call socket `read` a couple of times if it doesn't have enough data
        # in the buffer or not call socket `read` at all if it has enough. All
        # this depends on the message, how it was compressed, what is
        # `PART_SIZE` and `payload_part_size`.
        # So for simplicity the expected values are hardcoded.
        expected = (92, 184, 230, 276, 322, 368, 414, 460)
        assert positions == expected

        # The stream must end exactly at the end of the compressed payload.
        assert end_of_stream == len(ZLIB_PAYLOAD)

        # Every part is PART_SIZE long, except possibly a shorter last one.
        whole_parts, expected_last_part_size = divmod(len(uncompressed_data), PART_SIZE)
        expected_lengths = [PART_SIZE] * whole_parts
        if expected_last_part_size != 0:
            expected_lengths = expected_lengths + [expected_last_part_size]
        assert [len(part) for part in parts] == expected_lengths
1051

1052
    def test_deflate_streaming(self) -> None:
        """Streaming a zlib-wrapped deflate body yields decoded chunks."""
        resp = HTTPResponse(
            BytesIO(zlib.compress(b"foo")),
            headers={"content-encoding": "deflate"},
            preload_content=False,
        )
        chunks = resp.stream(2)

        first = next(chunks)
        assert first == b"fo"
        second = next(chunks)
        assert second == b"o"
        with pytest.raises(StopIteration):
            next(chunks)
1065

1066
    def test_deflate2_streaming(self) -> None:
        """Streaming a raw (headerless) deflate body yields decoded chunks."""
        # Negative wbits makes zlib emit a raw deflate stream.
        deflater = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
        compressed = deflater.compress(b"foo") + deflater.flush()

        resp = HTTPResponse(
            BytesIO(compressed),
            headers={"content-encoding": "deflate"},
            preload_content=False,
        )
        chunks = resp.stream(2)

        first = next(chunks)
        assert first == b"fo"
        second = next(chunks)
        assert second == b"o"
        with pytest.raises(StopIteration):
            next(chunks)
1081

1082
    def test_empty_stream(self) -> None:
        """Streaming an empty body stops immediately without yielding anything."""
        resp = HTTPResponse(BytesIO(b""), preload_content=False)
        chunks = resp.stream(2, decode_content=False)

        with pytest.raises(StopIteration):
            next(chunks)
1089

1090
    @pytest.mark.parametrize(
        "preload_content, amt, read_meth",
        [
            (True, None, "read"),
            (False, None, "read"),
            (False, 10 * 2**20, "read"),
            (False, None, "read1"),
            (False, 10 * 2**20, "read1"),
        ],
    )
    @pytest.mark.limit_memory("25 MB", current_thread_only=True)
    def test_buffer_memory_usage_decode_one_chunk(
        self, preload_content: bool, amt: int, read_meth: str
    ) -> None:
        """Decoding a 10 MiB deflate body in one go stays within the memory limit."""
        expected_size = 10 * 2**20  # 10 MiB
        resp = HTTPResponse(
            BytesIO(zlib.compress(bytes(expected_size))),
            preload_content=preload_content,
            headers={"content-encoding": "deflate"},
        )
        # A preloaded body is exposed via ``.data``; otherwise it is read
        # with the parametrized method.
        if preload_content:
            data = resp.data
        else:
            data = getattr(resp, read_meth)(amt)
        assert len(data) == expected_size
1113

1114
    @pytest.mark.parametrize(
        "preload_content, amt, read_meth",
        [
            (True, None, "read"),
            (False, None, "read"),
            (False, 10 * 2**20, "read"),
            (False, None, "read1"),
            (False, 10 * 2**20, "read1"),
        ],
    )
    @pytest.mark.limit_memory("10.5 MB", current_thread_only=True)
    def test_buffer_memory_usage_no_decoding(
        self, preload_content: bool, amt: int, read_meth: str
    ) -> None:
        """Reading a 10 MiB body without decoding stays within the memory limit."""
        expected_size = 10 * 2**20  # 10 MiB
        resp = HTTPResponse(
            BytesIO(bytes(expected_size)),
            preload_content=preload_content,
            decode_content=False,
        )
        # A preloaded body is exposed via ``.data``; otherwise it is read
        # with the parametrized method.
        if preload_content:
            data = resp.data
        else:
            data = getattr(resp, read_meth)(amt)
        assert len(data) == expected_size
1133

1134
    def test_length_no_header(self) -> None:
        """Without a content-length header the remaining length is unknown."""
        resp = HTTPResponse(BytesIO(b"12345"), preload_content=False)
        remaining = resp.length_remaining
        assert remaining is None
1138

1139
    def test_length_w_valid_header(self) -> None:
        """A valid content-length header seeds ``length_remaining``."""
        resp = HTTPResponse(
            BytesIO(b"12345"),
            headers={"content-length": "5"},
            preload_content=False,
        )
        remaining = resp.length_remaining
        assert remaining == 5
1145

1146
    def test_length_w_bad_header(self) -> None:
        """Non-numeric and negative content-length values are ignored."""
        fp = BytesIO(b"12345")
        garbage = {}
        # The same dict and file object are reused for both bad values.
        for bad_value in ("foo", "-10"):
            garbage["content-length"] = bad_value
            resp = HTTPResponse(fp, headers=garbage, preload_content=False)
            assert resp.length_remaining is None
1156

1157
    def test_length_when_chunked(self) -> None:
        """Content-length is disregarded when the response is also chunked."""
        # Sending both headers is expressly forbidden in RFC 7230 sec 3.3.2.
        # We fall back to chunked in this case and try to handle the
        # response ignoring content length.
        conflicting_headers = {"content-length": "5", "transfer-encoding": "chunked"}
        resp = HTTPResponse(
            BytesIO(b"12345"), headers=conflicting_headers, preload_content=False
        )
        assert resp.length_remaining is None
1166

1167
    def test_length_with_multiple_content_lengths(self) -> None:
        """Repeated identical lengths are accepted; conflicting ones are rejected."""
        fp = BytesIO(b"abcde")
        agreeing = {"content-length": "5, 5, 5"}
        conflicting = {"content-length": "5, 42"}

        resp = HTTPResponse(fp, headers=agreeing, preload_content=False)
        assert resp.length_remaining == 5

        # Disagreeing values make construction itself fail.
        with pytest.raises(InvalidHeader):
            HTTPResponse(fp, headers=conflicting, preload_content=False)
1177

1178
    def test_length_after_read(self) -> None:
        """``length_remaining`` is updated as the body is consumed."""
        length_header = {"content-length": "5"}

        # No declared length: still unknown after a full read.
        unsized = HTTPResponse(BytesIO(b"12345"), preload_content=False)
        unsized.read()
        assert unsized.length_remaining is None

        # Declared length: drops to zero after a full read.
        full = HTTPResponse(
            BytesIO(b"12345"), headers=length_header, preload_content=False
        )
        full.read()
        assert full.length_remaining == 0

        # Declared length: a partial read leaves the difference.
        partial = HTTPResponse(
            BytesIO(b"12345"), headers=length_header, preload_content=False
        )
        chunks = partial.stream(2)
        next(chunks)
        assert partial.length_remaining == 3
1199

1200
    def test_mock_httpresponse_stream(self) -> None:
        """Streaming works over a minimal object exposing read/read1/close."""

        class MockHTTPRequest:
            """
            Mocks out an HTTP request that does enough to make it through
            urllib3's read() and close() calls, and that drops its underlying
            file object once that object is exhausted.
            """

            def __init__(self) -> None:
                self.fp: BytesIO | None = None

            def read(self, amt: int) -> bytes:
                source = self.fp
                assert source is not None
                chunk = source.read(amt)
                if not chunk:
                    # Exhausted: forget the file object.
                    self.fp = None
                return chunk

            def read1(self, amt: int) -> bytes:
                # Always reads a single byte, whatever amount was requested.
                return self.read(1)

            def close(self) -> None:
                self.fp = None

        fp = MockHTTPRequest()
        fp.fp = BytesIO(b"foo")
        resp = HTTPResponse(fp, preload_content=False)  # type: ignore[arg-type]
        chunks = resp.stream(2)

        first = next(chunks)
        assert first == b"fo"
        second = next(chunks)
        assert second == b"o"
        with pytest.raises(StopIteration):
            next(chunks)
1232

1233
    def test_mock_transfer_encoding_chunked(self) -> None:
        """Streaming a chunked response yields exactly the chunks that were sent."""
        stream = [b"fo", b"o", b"bar"]
        fp = MockChunkedEncodingResponse(stream)
        r = httplib.HTTPResponse(MockSock)  # type: ignore[arg-type]
        r.fp = fp  # type: ignore[assignment]
        resp = HTTPResponse(
            r, preload_content=False, headers={"transfer-encoding": "chunked"}
        )

        # Compare whole lists: an element-wise loop over the streamed chunks
        # would pass vacuously if the response yielded too few (or no) chunks.
        assert stream == list(resp.stream())
1244

1245
    def test_mock_gzipped_transfer_encoding_chunked_decoded(self) -> None:
        """Show that we can decode the gzipped and chunked body."""
        # Gzip the body, then cut it into two-byte pieces to serve as chunks.
        gzipper = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        compressed = gzipper.compress(b"foobar") + gzipper.flush()
        pieces = [compressed[pos : pos + 2] for pos in range(0, len(compressed), 2)]

        fp = MockChunkedEncodingResponse(pieces)
        r = httplib.HTTPResponse(MockSock)  # type: ignore[arg-type]
        r.fp = fp  # type: ignore[assignment]
        resp = HTTPResponse(
            r,
            preload_content=False,
            headers={"transfer-encoding": "chunked", "content-encoding": "gzip"},
        )

        decoded = b"".join(resp.stream(decode_content=True))

        assert decoded == b"foobar"
1267

1268
    def test_mock_transfer_encoding_chunked_custom_read(self) -> None:
        """``read_chunked(2)`` re-slices each chunk into pieces of at most 2 bytes."""
        sent_chunks = [b"foooo", b"bbbbaaaaar"]
        r = httplib.HTTPResponse(MockSock)  # type: ignore[arg-type]
        r.fp = MockChunkedEncodingResponse(sent_chunks)  # type: ignore[assignment]
        r.chunked = True
        r.chunk_left = None
        resp = HTTPResponse(
            r, preload_content=False, headers={"transfer-encoding": "chunked"}
        )

        received = list(resp.read_chunked(2))

        # Pieces never span a chunk boundary, hence the lone b"o".
        assert received == [b"fo", b"oo", b"o", b"bb", b"bb", b"aa", b"aa", b"ar"]
1281

1282
    @pytest.mark.parametrize("read_chunked_args", ((), (None,), (-1,)))
    def test_mock_transfer_encoding_chunked_unlmtd_read(
        self, read_chunked_args: tuple[typing.Any, ...]
    ) -> None:
        """With no amount, ``None`` or ``-1``, ``read_chunked`` yields whole chunks."""
        sent_chunks = [b"foooo", b"bbbbaaaaar"]
        r = httplib.HTTPResponse(MockSock)  # type: ignore[arg-type]
        r.fp = MockChunkedEncodingResponse(sent_chunks)  # type: ignore[assignment]
        r.chunked = True
        r.chunk_left = None
        resp = HTTPResponse(
            r, preload_content=False, headers={"transfer-encoding": "chunked"}
        )

        received = list(resp.read_chunked(*read_chunked_args))
        assert received == sent_chunks
1296

1297
    def test_read_not_chunked_response_as_chunks(self) -> None:
        """``read_chunked`` on a response without chunked encoding raises."""
        resp = HTTPResponse(BytesIO(b"foo"), preload_content=False)
        chunk_iter = resp.read_chunked()
        # The error surfaces on first iteration, not when the generator is made.
        with pytest.raises(ResponseNotChunked):
            next(chunk_iter)
1303

1304
    def test_read_chunked_not_supported(self) -> None:
        """``read_chunked`` over a plain ``BytesIO`` body raises ``BodyNotHttplibCompatible``."""
        resp = HTTPResponse(
            BytesIO(b"foo"),
            preload_content=False,
            headers={"transfer-encoding": "chunked"},
        )
        chunk_iter = resp.read_chunked()
        # The error surfaces on first iteration, not when the generator is made.
        with pytest.raises(BodyNotHttplibCompatible):
            next(chunk_iter)
1312

1313
    def test_buggy_incomplete_read(self) -> None:
        """An empty body with a declared length raises a wrapped ``IncompleteRead``."""
        # Simulate buggy versions of Python (<2.7.4)
        # See http://bugs.python.org/issue16298
        declared_length = 1337
        resp = HTTPResponse(
            BytesIO(b""),
            headers={"content-length": str(declared_length)},
            preload_content=False,
            enforce_content_length=True,
        )
        with pytest.raises(ProtocolError) as ctx:
            resp.read(3)

        # The underlying error is the second argument of the ProtocolError.
        wrapped = ctx.value.args[1]
        assert isinstance(wrapped, IncompleteRead)
        assert wrapped.partial == 0
        assert wrapped.expected == declared_length
1331

1332
    def test_incomplete_chunk(self) -> None:
        """A chunk shorter than its declared size raises a wrapped ``IncompleteRead``."""
        r = httplib.HTTPResponse(MockSock)  # type: ignore[arg-type]
        r.fp = MockChunkedIncompleteRead(  # type: ignore[assignment]
            [b"foooo", b"bbbbaaaaar"]
        )
        r.chunked = True
        r.chunk_left = None
        resp = HTTPResponse(
            r, preload_content=False, headers={"transfer-encoding": "chunked"}
        )
        with pytest.raises(ProtocolError) as ctx:
            next(resp.read_chunked())

        # The underlying httplib error is the second ProtocolError argument.
        wrapped = ctx.value.args[1]
        assert isinstance(wrapped, httplib_IncompleteRead)
1347

1348
    def test_invalid_chunk_length(self) -> None:
        """A non-hex chunk-length line raises a wrapped ``InvalidChunkLength``."""
        fp = MockChunkedInvalidChunkLength([b"foooo", b"bbbbaaaaar"])
        r = httplib.HTTPResponse(MockSock)  # type: ignore[arg-type]
        r.fp = fp  # type: ignore[assignment]
        r.chunked = True
        r.chunk_left = None
        resp = HTTPResponse(
            r, preload_content=False, headers={"transfer-encoding": "chunked"}
        )
        with pytest.raises(ProtocolError) as ctx:
            next(resp.read_chunked())

        expected_text = (
            "(\"Connection broken: InvalidChunkLength(got length b'ZZZ\\\\r\\\\n', 0 bytes read)\", "
            "InvalidChunkLength(got length b'ZZZ\\r\\n', 0 bytes read))"
        )
        assert str(ctx.value) == expected_text

        # The underlying error carries the offending length line as bytes.
        wrapped = ctx.value.args[1]
        assert isinstance(wrapped, InvalidChunkLength)
        assert wrapped.length == fp.BAD_LENGTH_LINE.encode()
1369

1370
    def test_truncated_before_chunk(self) -> None:
        """A body that ends before any chunk arrives raises ``ProtocolError``."""
        r = httplib.HTTPResponse(MockSock)  # type: ignore[arg-type]
        r.fp = MockChunkedNoChunks(  # type: ignore[assignment]
            [b"foooo", b"bbbbaaaaar"]
        )
        r.chunked = True
        r.chunk_left = None
        resp = HTTPResponse(
            r, preload_content=False, headers={"transfer-encoding": "chunked"}
        )
        with pytest.raises(ProtocolError) as ctx:
            next(resp.read_chunked())

        message = str(ctx.value)
        assert message == "Response ended prematurely"
1384

1385
    def test_chunked_response_without_crlf_on_end(self) -> None:
        """A terminating chunk without trailing CRLF is still handled."""
        sent_chunks = [b"foo", b"bar", b"baz"]
        r = httplib.HTTPResponse(MockSock)  # type: ignore[arg-type]
        r.fp = MockChunkedEncodingWithoutCRLFOnEnd(  # type: ignore[assignment]
            sent_chunks
        )
        r.chunked = True
        r.chunk_left = None
        resp = HTTPResponse(
            r, preload_content=False, headers={"transfer-encoding": "chunked"}
        )
        received = list(resp.stream())
        assert received == sent_chunks
1396

1397
    def test_chunked_response_with_extensions(self) -> None:
        """Chunk extensions after the length are ignored when streaming."""
        sent_chunks = [b"foo", b"bar"]
        r = httplib.HTTPResponse(MockSock)  # type: ignore[arg-type]
        r.fp = MockChunkedEncodingWithExtensions(  # type: ignore[assignment]
            sent_chunks
        )
        r.chunked = True
        r.chunk_left = None
        resp = HTTPResponse(
            r, preload_content=False, headers={"transfer-encoding": "chunked"}
        )
        received = list(resp.stream())
        assert received == sent_chunks
1408

1409
    def test_chunked_head_response(self) -> None:
        """Streaming a chunked HEAD response releases the connection exactly once."""
        original = httplib.HTTPResponse(MockSock, method="HEAD")  # type: ignore[arg-type]
        original.chunked = True
        original.chunk_left = None
        resp = HTTPResponse(
            "",
            preload_content=False,
            headers={"transfer-encoding": "chunked"},
            original_response=original,
        )
        assert resp.chunked is True

        # Force the chunked code path and spy on the connection release.
        setattr(resp, "supports_chunked_reads", lambda: True)
        setattr(resp, "release_conn", mock.Mock())

        # Drain the stream; the yielded items themselves are not inspected.
        for _ in resp.stream():
            pass
        resp.release_conn.assert_called_once_with()  # type: ignore[attr-defined]
1426

1427
    def test_get_case_insensitive_headers(self) -> None:
        """Header lookup ignores the case of the header name."""
        resp = HTTPResponse(headers={"host": "example.com"})
        for spelling in ("host", "Host"):
            assert resp.headers.get(spelling) == "example.com"
1432

1433
    def test_retries(self) -> None:
        """``retries`` is ``None`` unless a ``Retry`` object is passed in."""
        fp = BytesIO(b"")

        without_retry = HTTPResponse(fp)
        assert without_retry.retries is None

        retry = Retry()
        with_retry = HTTPResponse(fp, retries=retry)
        assert with_retry.retries == retry
1440

1441
    def test_geturl(self) -> None:
        """``geturl()`` returns the URL the response was created with."""
        request_url = "https://example.com"
        resp = HTTPResponse(BytesIO(b""), request_url=request_url)
        reported = resp.geturl()
        assert reported == request_url
1446

1447
    def test_url(self) -> None:
        """``url`` starts as the request URL and can be reassigned."""
        request_url = "https://example.com"
        replacement_url = "https://anotherurl.com"

        resp = HTTPResponse(BytesIO(b""), request_url=request_url)
        assert resp.url == request_url

        resp.url = replacement_url
        assert resp.url == replacement_url
1454

1455
    def test_geturl_retries(self) -> None:
        """``geturl()`` reports the last redirect location from the retry history."""
        fp = BytesIO(b"")
        # NOTE(review): this first response is overwritten below before it is
        # ever inspected; it is kept to leave the test's behavior unchanged.
        resp = HTTPResponse(fp, request_url="http://example.com")

        first_hop = RequestHistory(
            method="GET",
            url="http://example.com",
            error=None,
            status=301,
            redirect_location="https://example.com/",
        )
        second_hop = RequestHistory(
            method="GET",
            url="https://example.com/",
            error=None,
            status=301,
            redirect_location="https://www.example.com",
        )
        retry = Retry(history=(first_hop, second_hop))

        resp = HTTPResponse(fp, retries=retry)
        assert resp.geturl() == "https://www.example.com"
1477

1478
    @pytest.mark.parametrize(
        ["payload", "expected_stream"],
        [
            (b"", []),
            (b"\n", [b"\n"]),
            (b"\n\n\n", [b"\n", b"\n", b"\n"]),
            (b"abc\ndef", [b"abc\n", b"def"]),
            (b"Hello\nworld\n\n\n!", [b"Hello\n", b"world\n", b"\n", b"\n", b"!"]),
        ],
    )
    def test__iter__(self, payload: bytes, expected_stream: list[bytes]) -> None:
        """Iterating a response yields its body split after each newline."""
        resp = HTTPResponse(BytesIO(payload), preload_content=False)
        actual_stream = [line for line in resp]

        assert actual_stream == expected_stream
1494

1495
    def test__iter__decode_content(self) -> None:
        """Iterating a gzipped, chunked response yields the decoded body."""
        # Gzip the body, then cut it into two-byte pieces to serve as chunks.
        gzipper = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
        compressed = gzipper.compress(b"foo\nbar") + gzipper.flush()
        pieces = [compressed[pos : pos + 2] for pos in range(0, len(compressed), 2)]

        fp = MockChunkedEncodingResponse(pieces)
        r = httplib.HTTPResponse(MockSock)  # type: ignore[arg-type]
        r.fp = fp  # type: ignore[assignment]
        resp = HTTPResponse(
            r,
            preload_content=False,
            headers={"transfer-encoding": "chunked", "content-encoding": "gzip"},
        )

        decoded = b"".join(resp)

        assert decoded == b"foo\nbar"
1515

1516
    def test_non_timeout_ssl_error_on_read(self) -> None:
        """An ``ssl.SSLError`` raised while reading surfaces as urllib3's ``SSLError``."""
        mac_error = ssl.SSLError(
            "SSL routines", "ssl3_get_record", "decryption failed or bad record mac"
        )

        @contextlib.contextmanager
        def make_bad_mac_fp() -> typing.Generator[BytesIO, None, None]:
            """Yield a BytesIO whose ``read`` raises the mac/decryption error."""
            broken_fp = BytesIO(b"")
            with mock.patch.object(broken_fp, "read", side_effect=mac_error):
                yield broken_fp

        # Failure while the body is read during construction.
        with make_bad_mac_fp() as fp:
            with pytest.raises(SSLError) as e:
                HTTPResponse(fp)
            assert e.value.args[0] == mac_error

        # Failure during an explicit read.
        with make_bad_mac_fp() as fp:
            resp = HTTPResponse(fp, preload_content=False)
            with pytest.raises(SSLError) as e:
                resp.read()
            assert e.value.args[0] == mac_error
1539

1540
    def test_unexpected_body(self) -> None:
        """A 204 response that carries a body raises ``ProtocolError``.

        Three header variants are checked: a content-length matching the
        body, a content-length of zero, and no content-length at all. Each
        ``pytest.raises`` block binds its own ``excinfo`` so that the message
        assertion inspects the exception raised by that block rather than a
        stale one from an earlier block.
        """
        with pytest.raises(ProtocolError) as excinfo:
            fp = BytesIO(b"12345")
            headers = {"content-length": "5"}
            resp = HTTPResponse(fp, status=204, headers=headers)
            resp.read(16)
        assert "Response may not contain content" in str(excinfo.value)

        with pytest.raises(ProtocolError) as excinfo:
            fp = BytesIO(b"12345")
            headers = {"content-length": "0"}
            resp = HTTPResponse(fp, status=204, headers=headers)
            resp.read(16)
        assert "Response may not contain content" in str(excinfo.value)

        with pytest.raises(ProtocolError) as excinfo:
            fp = BytesIO(b"12345")
            resp = HTTPResponse(fp, status=204)
            resp.read(16)
        assert "Response may not contain content" in str(excinfo.value)
1560

1561

1562
class MockChunkedEncodingResponse:
    """
    Minimal file-like object that serves a list of byte chunks encoded as a
    chunked transfer-encoded body.

    Each item of ``content`` is encoded lazily (hex length line, payload,
    CRLF) when the previous encoded chunk has been fully consumed. After the
    last item, one empty chunk is encoded as the terminator, and from then on
    only ``b""`` is returned.
    """

    def __init__(self, content: list[bytes]) -> None:
        """
        content: collection of bytes, each item is a chunk in response
        """
        self.content = content
        self.index = 0  # This class iterates over self.content.
        self.closed = False
        self.cur_chunk = b""  # Unconsumed remainder of the current encoded chunk.
        self.chunks_exhausted = False

    def _encode_chunk(self, chunk: bytes) -> bytes:
        """Return ``chunk`` framed as ``<hex length>CRLF<chunk>CRLF``."""
        # In the general case, we can't decode the chunk to unicode
        length = f"{len(chunk):X}\r\n"
        return length.encode() + chunk + b"\r\n"

    def _pop_new_chunk(self) -> bytes:
        """Return the next encoded chunk, or ``b""`` once the terminator was served."""
        if self.chunks_exhausted:
            return b""
        try:
            chunk = self.content[self.index]
        except IndexError:
            # Past the last item: encode an empty chunk as the terminator.
            chunk = b""
            self.chunks_exhausted = True
        else:
            self.index += 1
        chunk = self._encode_chunk(chunk)
        # Tolerate an overriding _encode_chunk that returns text.
        if not isinstance(chunk, bytes):
            chunk = chunk.encode()
        assert isinstance(chunk, bytes)
        return chunk

    def pop_current_chunk(self, amt: int = -1, till_crlf: bool = False) -> bytes:
        """
        Consume and return part of the current encoded chunk.

        amt: maximum number of bytes to return; ``-1`` (or lower) returns
            everything left in the current encoded chunk.
        till_crlf: when true, return up to and including the first CRLF.

        Raises ValueError when both a positive ``amt`` and ``till_crlf`` are
        given.
        """
        if amt > 0 and till_crlf:
            raise ValueError("Can't specify amt and till_crlf.")
        if not self.cur_chunk:
            self.cur_chunk = self._pop_new_chunk()
        if till_crlf:
            try:
                i = self.cur_chunk.index(b"\r\n")
            except ValueError:
                # No CRLF in current chunk -- probably caused by encoder.
                self.cur_chunk = b""
                return b""
            else:
                chunk_part = self.cur_chunk[: i + 2]
                self.cur_chunk = self.cur_chunk[i + 2 :]
                return chunk_part
        elif amt <= -1:
            chunk_part = self.cur_chunk
            self.cur_chunk = b""
            return chunk_part
        else:
            # Slicing never raises, even when amt exceeds what is buffered,
            # so no exception handling is needed here.
            chunk_part = self.cur_chunk[:amt]
            self.cur_chunk = self.cur_chunk[amt:]
            return chunk_part

    def readline(self) -> bytes:
        """Return data up to and including the next CRLF."""
        return self.pop_current_chunk(till_crlf=True)

    def read(self, amt: int = -1) -> bytes:
        """Return up to ``amt`` bytes of the current encoded chunk."""
        return self.pop_current_chunk(amt)

    def read1(self, amt: int = -1) -> bytes:
        """Same as ``read``."""
        return self.pop_current_chunk(amt)

    def flush(self) -> None:
        # Python 3 wants this method.
        pass

    def close(self) -> None:
        """Mark the object as closed."""
        self.closed = True
1639

1640

1641
class MockChunkedIncompleteRead(MockChunkedEncodingResponse):
    """Variant that always declares a chunk length of hex 9999."""

    def _encode_chunk(self, chunk: bytes) -> bytes:
        payload = chunk.decode()
        framed = "9999\r\n" + payload + "\r\n"
        return framed.encode()
1644

1645

1646
class MockChunkedInvalidChunkLength(MockChunkedEncodingResponse):
    """Variant that sends a non-hexadecimal chunk-length line."""

    BAD_LENGTH_LINE = "ZZZ\r\n"

    def _encode_chunk(self, chunk: bytes) -> bytes:
        payload = chunk.decode()
        framed = self.BAD_LENGTH_LINE + payload + "\r\n"
        return framed.encode()
1651

1652

1653
class MockChunkedEncodingWithoutCRLFOnEnd(MockChunkedEncodingResponse):
    """Variant that omits the trailing CRLF after an empty chunk."""

    def _encode_chunk(self, chunk: bytes) -> bytes:
        # Only non-empty chunks are followed by CRLF.
        trailer = "\r\n" if len(chunk) > 0 else ""
        framed = f"{len(chunk):X}\r\n{chunk.decode()}{trailer}"
        return framed.encode()
1660

1661

1662
class MockChunkedEncodingWithExtensions(MockChunkedEncodingResponse):
    """Variant that appends a ``;asd=qwe`` extension to each length line."""

    def _encode_chunk(self, chunk: bytes) -> bytes:
        length_line = f"{len(chunk):X};asd=qwe\r\n"
        framed = length_line + chunk.decode() + "\r\n"
        return framed.encode()
1665

1666

1667
class MockChunkedNoChunks(MockChunkedEncodingResponse):
    """Variant that encodes every chunk as empty bytes, so no data is sent."""

    def _encode_chunk(self, chunk: bytes) -> bytes:
        nothing = b""
        return nothing
1670

1671

1672
class MockSock:
    """Socket stand-in whose ``makefile`` accepts any arguments and returns None."""

    @classmethod
    def makefile(cls, *args: typing.Any, **kwargs: typing.Any) -> None:
        return None
1676

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.