1
from __future__ import annotations
4
import http.client as httplib
9
from base64 import b64decode
10
from http.client import IncompleteRead as httplib_IncompleteRead
11
from io import BufferedReader, BytesIO, TextIOWrapper
12
from test import onlyBrotli, onlyZstd
13
from unittest import mock
17
from urllib3 import HTTPHeaderDict
18
from urllib3.exceptions import (
19
BodyNotHttplibCompatible,
28
from urllib3.response import ( # type: ignore[attr-defined]
34
from urllib3.util.response import is_fp_closed
35
from urllib3.util.retry import RequestHistory, Retry
38
class TestBytesQueueBuffer:
39
def test_single_chunk(self) -> None:
40
buffer = BytesQueueBuffer()
41
assert len(buffer) == 0
42
with pytest.raises(RuntimeError, match="buffer is empty"):
45
assert buffer.get(0) == b""
48
with pytest.raises(ValueError, match="n should be > 0"):
51
assert buffer.get(1) == b"f"
52
assert buffer.get(2) == b"oo"
53
with pytest.raises(RuntimeError, match="buffer is empty"):
56
def test_read_too_much(self) -> None:
57
buffer = BytesQueueBuffer()
59
assert buffer.get(100) == b"foo"
61
def test_multiple_chunks(self) -> None:
62
buffer = BytesQueueBuffer()
66
assert len(buffer) == 9
68
assert buffer.get(1) == b"f"
69
assert len(buffer) == 8
70
assert buffer.get(4) == b"ooba"
71
assert len(buffer) == 4
72
assert buffer.get(4) == b"rbaz"
73
assert len(buffer) == 0
75
def test_get_all_empty(self) -> None:
76
q = BytesQueueBuffer()
77
assert q.get_all() == b""
80
def test_get_all_single(self) -> None:
81
q = BytesQueueBuffer()
83
assert q.get_all() == b"a"
86
def test_get_all_many(self) -> None:
87
q = BytesQueueBuffer()
91
assert q.get_all() == b"abc"
94
@pytest.mark.parametrize(
96
(lambda b: b.get(len(b)), lambda b: b.get_all()),
97
ids=("get", "get_all"),
99
@pytest.mark.limit_memory(
100
"12.5 MB", current_thread_only=True
101
) # assert that we're not doubling memory usagelimit_mem
102
def test_memory_usage(
103
self, get_func: typing.Callable[[BytesQueueBuffer], str]
105
# Allocate 10 1MiB chunks
106
buffer = BytesQueueBuffer()
108
# This allocates 2MiB, putting the max at around 12MiB. Not sure why.
109
buffer.put(bytes(2**20))
111
assert len(get_func(buffer)) == 10 * 2**20
113
@pytest.mark.limit_memory("10.01 MB", current_thread_only=True)
114
def test_get_all_memory_usage_single_chunk(self) -> None:
115
buffer = BytesQueueBuffer()
116
chunk = bytes(10 * 2**20) # 10 MiB
118
assert buffer.get_all() is chunk
121
# A known random (i.e, not-too-compressible) payload generated with:
122
# "".join(random.choice(string.printable) for i in range(512))
123
# .encode("zlib").encode("base64")
124
# Randomness in tests == bad, and fixing a seed may not be sufficient.
125
ZLIB_PAYLOAD = b64decode(
127
eJwFweuaoQAAANDfineQhiKLUiaiCzvuTEmNNlJGiL5QhnGpZ99z8luQfe1AHoMioB+QSWHQu/L+
128
lzd7W5CipqYmeVTBjdgSATdg4l4Z2zhikbuF+EKn69Q0DTpdmNJz8S33odfJoVEexw/l2SS9nFdi
129
pis7KOwXzfSqarSo9uJYgbDGrs1VNnQpT9f8zAorhYCEZronZQF9DuDFfNK3Hecc+WHLnZLQptwk
130
nufw8S9I43sEwxsT71BiqedHo0QeIrFE01F/4atVFXuJs2yxIOak3bvtXjUKAA6OKnQJ/nNvDGKZ
131
Khe5TF36JbnKVjdcL1EUNpwrWVfQpFYJ/WWm2b74qNeSZeQv5/xBhRdOmKTJFYgO96PwrHBlsnLn
132
a3l0LwJsloWpMbzByU5WLbRE6X5INFqjQOtIwYz5BAlhkn+kVqJvWM5vBlfrwP42ifonM5yF4ciJ
133
auHVks62997mNGOsM7WXNG3P98dBHPo2NhbTvHleL0BI5dus2JY81MUOnK3SGWLH8HeWPa1t5KcW
134
S5moAj5HexY/g/F8TctpxwsvyZp38dXeLDjSQvEQIkF7XR3YXbeZgKk3V34KGCPOAeeuQDIgyVhV
140
def sock() -> typing.Generator[socket.socket, None, None]:
146
class TestLegacyResponse:
147
def test_getheaders(self) -> None:
148
headers = {"host": "example.com"}
149
r = HTTPResponse(headers=headers)
152
match=r"HTTPResponse.getheaders\(\) is deprecated",
154
assert r.getheaders() == HTTPHeaderDict(headers)
156
def test_getheader(self) -> None:
157
headers = {"host": "example.com"}
158
r = HTTPResponse(headers=headers)
161
match=r"HTTPResponse.getheader\(\) is deprecated",
163
assert r.getheader("host") == "example.com"
167
def test_cache_content(self) -> None:
168
r = HTTPResponse(b"foo")
169
assert r._body == b"foo"
170
assert r.data == b"foo"
171
assert r._body == b"foo"
173
def test_cache_content_preload_false(self) -> None:
175
r = HTTPResponse(fp, preload_content=False)
178
assert r.data == b"foo"
179
assert r._body == b"foo"
180
assert r.data == b"foo"
182
def test_default(self) -> None:
184
assert r.data is None
186
def test_none(self) -> None:
187
r = HTTPResponse(None) # type: ignore[arg-type]
188
assert r.data is None
190
def test_preload(self) -> None:
193
r = HTTPResponse(fp, preload_content=True)
195
assert fp.tell() == len(b"foo")
196
assert r.data == b"foo"
198
def test_no_preload(self) -> None:
201
r = HTTPResponse(fp, preload_content=False)
203
assert fp.tell() == 0
204
assert r.data == b"foo"
205
assert fp.tell() == len(b"foo")
207
def test_decode_bad_data(self) -> None:
208
fp = BytesIO(b"\x00" * 10)
209
with pytest.raises(DecodeError):
210
HTTPResponse(fp, headers={"content-encoding": "deflate"})
212
def test_reference_read(self) -> None:
214
r = HTTPResponse(fp, preload_content=False)
216
assert r.read(0) == b""
217
assert r.read(1) == b"f"
218
assert r.read(2) == b"oo"
219
assert r.read() == b""
220
assert r.read() == b""
222
@pytest.mark.parametrize("read_args", ((), (None,), (-1,)))
223
def test_reference_read_until_eof(self, read_args: tuple[typing.Any, ...]) -> None:
225
r = HTTPResponse(fp, preload_content=False)
226
assert r.read(*read_args) == b"foo"
228
def test_reference_read1(self) -> None:
229
fp = BytesIO(b"foobar")
230
r = HTTPResponse(fp, preload_content=False)
232
assert r.read1(0) == b""
233
assert r.read1(1) == b"f"
234
assert r.read1(2) == b"oo"
235
assert r.read1() == b"bar"
236
assert r.read1() == b""
238
@pytest.mark.parametrize("read1_args", ((), (None,), (-1,)))
239
def test_reference_read1_without_limit(
240
self, read1_args: tuple[typing.Any, ...]
243
r = HTTPResponse(fp, preload_content=False)
244
assert r.read1(*read1_args) == b"foo"
246
def test_reference_read1_nodecode(self) -> None:
247
fp = BytesIO(b"foobar")
248
r = HTTPResponse(fp, preload_content=False, decode_content=False)
250
assert r.read1(0) == b""
251
assert r.read1(1) == b"f"
252
assert r.read1(2) == b"oo"
253
assert r.read1() == b"bar"
254
assert r.read1() == b""
256
def test_decoding_read1(self) -> None:
257
data = zlib.compress(b"foobar")
261
fp, headers={"content-encoding": "deflate"}, preload_content=False
264
assert r.read1(1) == b"f"
265
assert r.read1(2) == b"oo"
266
assert r.read1() == b"bar"
267
assert r.read1() == b""
269
def test_decode_deflate(self) -> None:
270
data = zlib.compress(b"foo")
273
r = HTTPResponse(fp, headers={"content-encoding": "deflate"})
275
assert r.data == b"foo"
277
def test_decode_deflate_case_insensitve(self) -> None:
278
data = zlib.compress(b"foo")
281
r = HTTPResponse(fp, headers={"content-encoding": "DeFlAtE"})
283
assert r.data == b"foo"
285
def test_chunked_decoding_deflate(self) -> None:
286
data = zlib.compress(b"foo")
290
fp, headers={"content-encoding": "deflate"}, preload_content=False
293
assert r.read(1) == b"f"
294
assert r.read(2) == b"oo"
295
assert r.read() == b""
296
assert r.read() == b""
298
def test_chunked_decoding_deflate2(self) -> None:
299
compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
300
data = compress.compress(b"foo")
301
data += compress.flush()
305
fp, headers={"content-encoding": "deflate"}, preload_content=False
308
assert r.read(1) == b"f"
309
assert r.read(2) == b"oo"
310
assert r.read() == b""
311
assert r.read() == b""
313
@pytest.mark.parametrize("content_encoding", ["gzip", "x-gzip"])
314
def test_chunked_decoding_gzip(self, content_encoding: str) -> None:
315
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
316
data = compress.compress(b"foo")
317
data += compress.flush()
321
fp, headers={"content-encoding": content_encoding}, preload_content=False
324
assert r.read(1) == b"f"
325
assert r.read(2) == b"oo"
326
assert r.read() == b""
327
assert r.read() == b""
329
def test_decode_gzip_multi_member(self) -> None:
330
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
331
data = compress.compress(b"foo")
332
data += compress.flush()
336
r = HTTPResponse(fp, headers={"content-encoding": "gzip"})
338
assert r.data == b"foofoofoo"
340
def test_decode_gzip_error(self) -> None:
342
with pytest.raises(DecodeError):
343
HTTPResponse(fp, headers={"content-encoding": "gzip"})
345
def test_decode_gzip_swallow_garbage(self) -> None:
346
# When data comes from multiple calls to read(), data after
347
# the first zlib error (here triggered by garbage) should be
349
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
350
data = compress.compress(b"foo")
351
data += compress.flush()
352
data = data * 3 + b"foo"
356
fp, headers={"content-encoding": "gzip"}, preload_content=False
364
assert ret == b"foofoofoo"
366
def test_chunked_decoding_gzip_swallow_garbage(self) -> None:
367
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
368
data = compress.compress(b"foo")
369
data += compress.flush()
370
data = data * 3 + b"foo"
373
r = HTTPResponse(fp, headers={"content-encoding": "gzip"})
375
assert r.data == b"foofoofoo"
378
def test_decode_brotli(self) -> None:
379
data = brotli.compress(b"foo")
382
r = HTTPResponse(fp, headers={"content-encoding": "br"})
383
assert r.data == b"foo"
386
def test_chunked_decoding_brotli(self) -> None:
387
data = brotli.compress(b"foobarbaz")
390
r = HTTPResponse(fp, headers={"content-encoding": "br"}, preload_content=False)
397
assert ret == b"foobarbaz"
400
def test_decode_brotli_error(self) -> None:
402
with pytest.raises(DecodeError):
403
HTTPResponse(fp, headers={"content-encoding": "br"})
406
def test_decode_zstd(self) -> None:
407
import zstandard as zstd
409
data = zstd.compress(b"foo")
412
r = HTTPResponse(fp, headers={"content-encoding": "zstd"})
413
assert r.data == b"foo"
416
def test_decode_multiframe_zstd(self) -> None:
417
import zstandard as zstd
421
zstd.compress(b"foo")
422
# skippable frame (must be ignored)
424
"50 2A 4D 18" # Magic_Number (little-endian)
425
"07 00 00 00" # Frame_Size (little-endian)
426
"00 00 00 00 00 00 00" # User_Data
429
+ zstd.compress(b"bar")
433
r = HTTPResponse(fp, headers={"content-encoding": "zstd"})
434
assert r.data == b"foobar"
437
def test_chunked_decoding_zstd(self) -> None:
438
import zstandard as zstd
440
data = zstd.compress(b"foobarbaz")
444
fp, headers={"content-encoding": "zstd"}, preload_content=False
453
assert ret == b"foobarbaz"
461
@pytest.mark.parametrize("data", decode_param_set)
462
def test_decode_zstd_error(self, data: bytes) -> None:
465
with pytest.raises(DecodeError):
466
HTTPResponse(fp, headers={"content-encoding": "zstd"})
469
@pytest.mark.parametrize("data", decode_param_set)
470
def test_decode_zstd_incomplete_preload_content(self, data: bytes) -> None:
471
import zstandard as zstd
473
data = zstd.compress(data)
474
fp = BytesIO(data[:-1])
476
with pytest.raises(DecodeError):
477
HTTPResponse(fp, headers={"content-encoding": "zstd"})
480
@pytest.mark.parametrize("data", decode_param_set)
481
def test_decode_zstd_incomplete_read(self, data: bytes) -> None:
482
import zstandard as zstd
484
data = zstd.compress(data)
485
fp = BytesIO(data[:-1]) # shorten the data to trigger DecodeError
487
# create response object without(!) reading/decoding the content
489
fp, headers={"content-encoding": "zstd"}, preload_content=False
492
# read/decode, expecting DecodeError
493
with pytest.raises(DecodeError):
494
r.read(decode_content=True)
497
@pytest.mark.parametrize("data", decode_param_set)
498
def test_decode_zstd_incomplete_read1(self, data: bytes) -> None:
499
import zstandard as zstd
501
data = zstd.compress(data)
502
fp = BytesIO(data[:-1])
505
fp, headers={"content-encoding": "zstd"}, preload_content=False
508
# read/decode via read1(!), expecting DecodeError
509
with pytest.raises(DecodeError):
511
# loop, as read1() may return just partial data
512
while amt_decoded < len(data):
513
part = r.read1(decode_content=True)
514
amt_decoded += len(part)
517
@pytest.mark.parametrize("data", decode_param_set)
518
def test_decode_zstd_read1(self, data: bytes) -> None:
519
import zstandard as zstd
521
encoded_data = zstd.compress(data)
522
fp = BytesIO(encoded_data)
525
fp, headers={"content-encoding": "zstd"}, preload_content=False
530
# loop, as read1() may return just partial data
531
while amt_decoded < len(data):
532
part = r.read1(decode_content=True)
533
amt_decoded += len(part)
535
assert decoded_data == data
537
def test_multi_decoding_deflate_deflate(self) -> None:
538
data = zlib.compress(zlib.compress(b"foo"))
541
r = HTTPResponse(fp, headers={"content-encoding": "deflate, deflate"})
543
assert r.data == b"foo"
545
def test_multi_decoding_deflate_gzip(self) -> None:
546
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
547
data = compress.compress(zlib.compress(b"foo"))
548
data += compress.flush()
551
r = HTTPResponse(fp, headers={"content-encoding": "deflate, gzip"})
553
assert r.data == b"foo"
555
def test_multi_decoding_gzip_gzip(self) -> None:
556
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
557
data = compress.compress(b"foo")
558
data += compress.flush()
560
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
561
data = compress.compress(data)
562
data += compress.flush()
565
r = HTTPResponse(fp, headers={"content-encoding": "gzip, gzip"})
567
assert r.data == b"foo"
569
def test_read_multi_decoding_deflate_deflate(self) -> None:
570
msg = b"foobarbaz" * 42
571
data = zlib.compress(zlib.compress(msg))
575
fp, headers={"content-encoding": "deflate, deflate"}, preload_content=False
578
assert r.read(3) == b"foo"
579
assert r.read(3) == b"bar"
580
assert r.read(3) == b"baz"
581
assert r.read(9) == b"foobarbaz"
582
assert r.read(9 * 3) == b"foobarbaz" * 3
583
assert r.read(9 * 37) == b"foobarbaz" * 37
584
assert r.read() == b""
586
def test_body_blob(self) -> None:
587
resp = HTTPResponse(b"foo")
588
assert resp.data == b"foo"
591
@pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning")
592
def test_base_io(self) -> None:
593
resp = BaseHTTPResponse(
596
version_string="HTTP/1.1",
598
decode_content=False,
602
assert not resp.closed
603
assert not resp.readable()
604
assert not resp.writable()
606
with pytest.raises(NotImplementedError):
608
with pytest.raises(NotImplementedError):
611
def test_io(self, sock: socket.socket) -> None:
613
resp = HTTPResponse(fp, preload_content=False)
615
assert not resp.closed
616
assert resp.readable()
617
assert not resp.writable()
618
with pytest.raises(IOError):
624
# Try closing with an `httplib.HTTPResponse`, because it has an
627
hlr = httplib.HTTPResponse(sock)
628
resp2 = HTTPResponse(hlr, preload_content=False)
629
assert not resp2.closed
635
# also try when only data is present.
636
resp3 = HTTPResponse("foodata")
637
with pytest.raises(IOError):
641
# A corner case where _fp is present but doesn't have `closed`,
642
# `isclosed`, or `fileno`. Unlikely, but possible.
644
with pytest.raises(IOError):
647
def test_io_closed_consistently_by_read(self, sock: socket.socket) -> None:
649
hlr = httplib.HTTPResponse(sock)
650
hlr.fp = BytesIO(b"foo") # type: ignore[assignment]
651
hlr.chunked = 0 # type: ignore[assignment]
653
with HTTPResponse(hlr, preload_content=False) as resp:
654
assert not resp.closed
655
assert resp._fp is not None
656
assert not resp._fp.isclosed()
657
assert not is_fp_closed(resp._fp)
658
assert not resp.isclosed()
661
assert resp._fp.isclosed()
662
assert is_fp_closed(resp._fp)
663
assert resp.isclosed()
667
@pytest.mark.parametrize("read_amt", (None, 3))
668
@pytest.mark.parametrize("length_known", (True, False))
669
def test_io_closed_consistently_by_read1(
670
self, sock: socket.socket, length_known: bool, read_amt: int | None
672
with httplib.HTTPResponse(sock) as hlr:
673
hlr.fp = BytesIO(b"foo") # type: ignore[assignment]
674
hlr.chunked = 0 # type: ignore[assignment]
675
hlr.length = 3 if length_known else None
676
with HTTPResponse(hlr, preload_content=False) as resp:
678
resp.length_remaining = 3
679
assert not resp.closed
680
assert resp._fp is not None
681
assert not resp._fp.isclosed()
682
assert not is_fp_closed(resp._fp)
683
assert not resp.isclosed()
685
# If content length is unknown, IO is not closed until
686
# the next read returning zero bytes.
688
assert not resp.closed
689
assert resp._fp is not None
690
assert not resp._fp.isclosed()
691
assert not is_fp_closed(resp._fp)
692
assert not resp.isclosed()
695
assert resp._fp.isclosed()
696
assert is_fp_closed(resp._fp)
697
assert resp.isclosed()
699
@pytest.mark.parametrize("length_known", (True, False))
700
def test_io_not_closed_until_all_data_is_read(
701
self, sock: socket.socket, length_known: bool
703
with httplib.HTTPResponse(sock) as hlr:
704
hlr.fp = BytesIO(b"foo") # type: ignore[assignment]
705
hlr.chunked = 0 # type: ignore[assignment]
707
hlr.length = length_remaining if length_known else None
708
with HTTPResponse(hlr, preload_content=False) as resp:
710
resp.length_remaining = length_remaining
711
while length_remaining:
712
assert not resp.closed
713
assert resp._fp is not None
714
assert not resp._fp.isclosed()
715
assert not is_fp_closed(resp._fp)
716
assert not resp.isclosed()
718
assert len(data) == 1
719
length_remaining -= 1
720
# If content length is unknown, IO is not closed until
721
# the next read returning zero bytes.
723
assert not resp.closed
724
assert resp._fp is not None
725
assert not resp._fp.isclosed()
726
assert not is_fp_closed(resp._fp)
727
assert not resp.isclosed()
729
assert len(data) == 0
731
assert resp._fp.isclosed() # type: ignore[union-attr]
732
assert is_fp_closed(resp._fp)
733
assert resp.isclosed()
735
@pytest.mark.parametrize("length_known", (True, False))
736
def test_io_not_closed_after_requesting_0_bytes(
737
self, sock: socket.socket, length_known: bool
739
with httplib.HTTPResponse(sock) as hlr:
740
hlr.fp = BytesIO(b"foo") # type: ignore[assignment]
741
hlr.chunked = 0 # type: ignore[assignment]
743
hlr.length = length_remaining if length_known else None
744
with HTTPResponse(hlr, preload_content=False) as resp:
746
resp.length_remaining = length_remaining
747
assert not resp.closed
748
assert resp._fp is not None
749
assert not resp._fp.isclosed()
750
assert not is_fp_closed(resp._fp)
751
assert not resp.isclosed()
754
assert not resp.closed
755
assert resp._fp is not None
756
assert not resp._fp.isclosed()
757
assert not is_fp_closed(resp._fp)
758
assert not resp.isclosed()
760
def test_io_bufferedreader(self) -> None:
762
resp = HTTPResponse(fp, preload_content=False)
763
br = BufferedReader(resp) # type: ignore[arg-type]
765
assert br.read() == b"foo"
770
# HTTPResponse.read() by default closes the response
771
# https://github.com/urllib3/urllib3/issues/1305
772
fp = BytesIO(b"hello\nworld")
773
resp = HTTPResponse(fp, preload_content=False)
774
with pytest.raises(ValueError, match="readline of closed file"):
775
list(BufferedReader(resp)) # type: ignore[arg-type]
779
resp = HTTPResponse(fp, preload_content=False)
780
br = BufferedReader(resp, 5) # type: ignore[arg-type]
782
br.read(1) # sets up the buffer, reading 5
783
assert len(fp.read()) == (len(b) - 5)
785
# This is necessary to make sure the "no bytes left" part of `readinto`
790
def test_io_not_autoclose_bufferedreader(self) -> None:
791
fp = BytesIO(b"hello\nworld")
792
resp = HTTPResponse(fp, preload_content=False, auto_close=False)
793
reader = BufferedReader(resp) # type: ignore[arg-type]
794
assert list(reader) == [b"hello\n", b"world"]
796
assert not reader.closed
797
assert not resp.closed
798
with pytest.raises(StopIteration):
804
with pytest.raises(ValueError, match="readline of closed file"):
807
def test_io_textiowrapper(self) -> None:
808
fp = BytesIO(b"\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f")
809
resp = HTTPResponse(fp, preload_content=False)
810
br = TextIOWrapper(resp, encoding="utf8") # type: ignore[arg-type]
812
assert br.read() == "äöüß"
817
# HTTPResponse.read() by default closes the response
818
# https://github.com/urllib3/urllib3/issues/1305
820
b"\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f\n\xce\xb1\xce\xb2\xce\xb3\xce\xb4"
822
resp = HTTPResponse(fp, preload_content=False)
823
with pytest.raises(ValueError, match="I/O operation on closed file.?"):
824
list(TextIOWrapper(resp)) # type: ignore[arg-type]
826
def test_io_not_autoclose_textiowrapper(self) -> None:
828
b"\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f\n\xce\xb1\xce\xb2\xce\xb3\xce\xb4"
830
resp = HTTPResponse(fp, preload_content=False, auto_close=False)
831
reader = TextIOWrapper(resp, encoding="utf8") # type: ignore[arg-type]
832
assert list(reader) == ["äöüß\n", "αβγδ"]
834
assert not reader.closed
835
assert not resp.closed
836
with pytest.raises(StopIteration):
842
with pytest.raises(ValueError, match="I/O operation on closed file.?"):
845
def test_read_with_illegal_mix_decode_toggle(self) -> None:
846
data = zlib.compress(b"foo")
851
fp, headers={"content-encoding": "deflate"}, preload_content=False
854
assert resp.read(1) == b"f"
859
r"Calling read\(decode_content=False\) is not supported after "
860
r"read\(decode_content=True\) was called"
863
resp.read(1, decode_content=False)
868
r"Calling read\(decode_content=False\) is not supported after "
869
r"read\(decode_content=True\) was called"
872
resp.read(decode_content=False)
874
def test_read1_with_illegal_mix_decode_toggle(self) -> None:
875
data = zlib.compress(b"foo")
880
fp, headers={"content-encoding": "deflate"}, preload_content=False
883
assert resp.read1(1) == b"f"
888
r"Calling read1\(decode_content=False\) is not supported after "
889
r"read1\(decode_content=True\) was called"
892
resp.read1(1, decode_content=False)
897
r"Calling read1\(decode_content=False\) is not supported after "
898
r"read1\(decode_content=True\) was called"
901
resp.read1(decode_content=False)
903
def test_read_with_mix_decode_toggle(self) -> None:
904
data = zlib.compress(b"foo")
909
fp, headers={"content-encoding": "deflate"}, preload_content=False
911
assert resp.read(2, decode_content=False) is not None
912
assert resp.read(1, decode_content=True) == b"f"
914
def test_streaming(self) -> None:
916
resp = HTTPResponse(fp, preload_content=False)
917
stream = resp.stream(2, decode_content=False)
919
assert next(stream) == b"fo"
920
assert next(stream) == b"o"
921
with pytest.raises(StopIteration):
924
def test_streaming_tell(self) -> None:
926
resp = HTTPResponse(fp, preload_content=False)
927
stream = resp.stream(2, decode_content=False)
931
position += len(next(stream))
933
assert position == resp.tell()
935
position += len(next(stream))
937
assert position == resp.tell()
939
with pytest.raises(StopIteration):
942
def test_gzipped_streaming(self) -> None:
943
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
944
data = compress.compress(b"foo")
945
data += compress.flush()
949
fp, headers={"content-encoding": "gzip"}, preload_content=False
951
stream = resp.stream(2)
953
assert next(stream) == b"fo"
954
assert next(stream) == b"o"
955
with pytest.raises(StopIteration):
958
def test_gzipped_streaming_tell(self) -> None:
959
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
960
uncompressed_data = b"foo"
961
data = compress.compress(uncompressed_data)
962
data += compress.flush()
966
fp, headers={"content-encoding": "gzip"}, preload_content=False
968
stream = resp.stream()
971
payload = next(stream)
972
assert payload == uncompressed_data
974
assert len(data) == resp.tell()
976
with pytest.raises(StopIteration):
979
def test_deflate_streaming_tell_intermediate_point(self) -> None:
980
# Ensure that ``tell()`` returns the correct number of bytes when
981
# part-way through streaming compressed content.
985
class MockCompressedDataReading(BytesIO):
987
A BytesIO-like reader returning ``payload`` in ``NUMBER_OF_READS``
991
def __init__(self, payload: bytes, payload_part_size: int) -> None:
993
payload[i * payload_part_size : (i + 1) * payload_part_size]
994
for i in range(NUMBER_OF_READS + 1)
997
assert b"".join(self.payloads) == payload
999
def read(self, _: int) -> bytes: # type: ignore[override]
1001
if len(self.payloads) > 0:
1002
return self.payloads.pop(0)
1005
def read1(self, amt: int) -> bytes: # type: ignore[override]
1006
return self.read(amt)
1008
uncompressed_data = zlib.decompress(ZLIB_PAYLOAD)
1010
payload_part_size = len(ZLIB_PAYLOAD) // NUMBER_OF_READS
1011
fp = MockCompressedDataReading(ZLIB_PAYLOAD, payload_part_size)
1012
resp = HTTPResponse(
1013
fp, headers={"content-encoding": "deflate"}, preload_content=False
1015
stream = resp.stream(PART_SIZE)
1017
parts_positions = [(part, resp.tell()) for part in stream]
1018
end_of_stream = resp.tell()
1020
with pytest.raises(StopIteration):
1023
parts, positions = zip(*parts_positions)
1025
# Check that the payload is equal to the uncompressed data
1026
payload = b"".join(parts)
1027
assert uncompressed_data == payload
1029
# Check that the positions in the stream are correct
1030
# It is difficult to determine programmatically what the positions
1031
# returned by `tell` will be because the `HTTPResponse.read` method may
1032
# call socket `read` a couple of times if it doesn't have enough data
1033
# in the buffer or not call socket `read` at all if it has enough. All
1034
# this depends on the message, how it was compressed, what is
1035
# `PART_SIZE` and `payload_part_size`.
1036
# So for simplicity the expected values are hardcoded.
1037
expected = (92, 184, 230, 276, 322, 368, 414, 460)
1038
assert expected == positions
1040
# Check that the end of the stream is in the correct place
1041
assert len(ZLIB_PAYLOAD) == end_of_stream
1043
# Check that all parts have expected length
1044
expected_last_part_size = len(uncompressed_data) % PART_SIZE
1045
whole_parts = len(uncompressed_data) // PART_SIZE
1046
if expected_last_part_size == 0:
1047
expected_lengths = [PART_SIZE] * whole_parts
1049
expected_lengths = [PART_SIZE] * whole_parts + [expected_last_part_size]
1050
assert expected_lengths == [len(part) for part in parts]
1052
def test_deflate_streaming(self) -> None:
1053
data = zlib.compress(b"foo")
1056
resp = HTTPResponse(
1057
fp, headers={"content-encoding": "deflate"}, preload_content=False
1059
stream = resp.stream(2)
1061
assert next(stream) == b"fo"
1062
assert next(stream) == b"o"
1063
with pytest.raises(StopIteration):
1066
def test_deflate2_streaming(self) -> None:
1067
compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
1068
data = compress.compress(b"foo")
1069
data += compress.flush()
1072
resp = HTTPResponse(
1073
fp, headers={"content-encoding": "deflate"}, preload_content=False
1075
stream = resp.stream(2)
1077
assert next(stream) == b"fo"
1078
assert next(stream) == b"o"
1079
with pytest.raises(StopIteration):
1082
def test_empty_stream(self) -> None:
1084
resp = HTTPResponse(fp, preload_content=False)
1085
stream = resp.stream(2, decode_content=False)
1087
with pytest.raises(StopIteration):
1090
@pytest.mark.parametrize(
1091
"preload_content, amt, read_meth",
1093
(True, None, "read"),
1094
(False, None, "read"),
1095
(False, 10 * 2**20, "read"),
1096
(False, None, "read1"),
1097
(False, 10 * 2**20, "read1"),
1100
@pytest.mark.limit_memory("25 MB", current_thread_only=True)
1101
def test_buffer_memory_usage_decode_one_chunk(
1102
self, preload_content: bool, amt: int, read_meth: str
1104
content_length = 10 * 2**20 # 10 MiB
1105
fp = BytesIO(zlib.compress(bytes(content_length)))
1106
resp = HTTPResponse(
1108
preload_content=preload_content,
1109
headers={"content-encoding": "deflate"},
1111
data = resp.data if preload_content else getattr(resp, read_meth)(amt)
1112
assert len(data) == content_length
1114
@pytest.mark.parametrize(
1115
"preload_content, amt, read_meth",
1117
(True, None, "read"),
1118
(False, None, "read"),
1119
(False, 10 * 2**20, "read"),
1120
(False, None, "read1"),
1121
(False, 10 * 2**20, "read1"),
1124
@pytest.mark.limit_memory("10.5 MB", current_thread_only=True)
1125
def test_buffer_memory_usage_no_decoding(
1126
self, preload_content: bool, amt: int, read_meth: str
1128
content_length = 10 * 2**20 # 10 MiB
1129
fp = BytesIO(bytes(content_length))
1130
resp = HTTPResponse(fp, preload_content=preload_content, decode_content=False)
1131
data = resp.data if preload_content else getattr(resp, read_meth)(amt)
1132
assert len(data) == content_length
1134
def test_length_no_header(self) -> None:
1135
fp = BytesIO(b"12345")
1136
resp = HTTPResponse(fp, preload_content=False)
1137
assert resp.length_remaining is None
1139
def test_length_w_valid_header(self) -> None:
1140
headers = {"content-length": "5"}
1141
fp = BytesIO(b"12345")
1143
resp = HTTPResponse(fp, headers=headers, preload_content=False)
1144
assert resp.length_remaining == 5
1146
def test_length_w_bad_header(self) -> None:
1147
garbage = {"content-length": "foo"}
1148
fp = BytesIO(b"12345")
1150
resp = HTTPResponse(fp, headers=garbage, preload_content=False)
1151
assert resp.length_remaining is None
1153
garbage["content-length"] = "-10"
1154
resp = HTTPResponse(fp, headers=garbage, preload_content=False)
1155
assert resp.length_remaining is None
1157
def test_length_when_chunked(self) -> None:
1158
# This is expressly forbidden in RFC 7230 sec 3.3.2
1159
# We fall back to chunked in this case and try to
1160
# handle response ignoring content length.
1161
headers = {"content-length": "5", "transfer-encoding": "chunked"}
1162
fp = BytesIO(b"12345")
1164
resp = HTTPResponse(fp, headers=headers, preload_content=False)
1165
assert resp.length_remaining is None
1167
def test_length_with_multiple_content_lengths(self) -> None:
1168
headers = {"content-length": "5, 5, 5"}
1169
garbage = {"content-length": "5, 42"}
1170
fp = BytesIO(b"abcde")
1172
resp = HTTPResponse(fp, headers=headers, preload_content=False)
1173
assert resp.length_remaining == 5
1175
with pytest.raises(InvalidHeader):
1176
HTTPResponse(fp, headers=garbage, preload_content=False)
1178
def test_length_after_read(self) -> None:
1179
headers = {"content-length": "5"}
1181
# Test no defined length
1182
fp = BytesIO(b"12345")
1183
resp = HTTPResponse(fp, preload_content=False)
1185
assert resp.length_remaining is None
1187
# Test our update from content-length
1188
fp = BytesIO(b"12345")
1189
resp = HTTPResponse(fp, headers=headers, preload_content=False)
1191
assert resp.length_remaining == 0
1194
fp = BytesIO(b"12345")
1195
resp = HTTPResponse(fp, headers=headers, preload_content=False)
1196
data = resp.stream(2)
1198
assert resp.length_remaining == 3
1200
def test_mock_httpresponse_stream(self) -> None:
1201
# Mock out a HTTP Request that does enough to make it through urllib3's
1202
# read() and close() calls, and also exhausts and underlying file
1204
class MockHTTPRequest:
1205
def __init__(self) -> None:
1206
self.fp: BytesIO | None = None
1208
def read(self, amt: int) -> bytes:
1209
assert self.fp is not None
1210
data = self.fp.read(amt)
1216
def read1(self, amt: int) -> bytes:
1219
def close(self) -> None:
1222
bio = BytesIO(b"foo")
1223
fp = MockHTTPRequest()
1225
resp = HTTPResponse(fp, preload_content=False) # type: ignore[arg-type]
1226
stream = resp.stream(2)
1228
assert next(stream) == b"fo"
1229
assert next(stream) == b"o"
1230
with pytest.raises(StopIteration):
1233
def test_mock_transfer_encoding_chunked(self) -> None:
1234
stream = [b"fo", b"o", b"bar"]
1235
fp = MockChunkedEncodingResponse(stream)
1236
r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
1237
r.fp = fp # type: ignore[assignment]
1238
resp = HTTPResponse(
1239
r, preload_content=False, headers={"transfer-encoding": "chunked"}
1242
for i, c in enumerate(resp.stream()):
1243
assert c == stream[i]
1245
def test_mock_gzipped_transfer_encoding_chunked_decoded(self) -> None:
1246
"""Show that we can decode the gzipped and chunked body."""
1248
def stream() -> typing.Generator[bytes, None, None]:
1249
# Set up a generator to chunk the gzipped body
1250
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
1251
data = compress.compress(b"foobar")
1252
data += compress.flush()
1253
for i in range(0, len(data), 2):
1254
yield data[i : i + 2]
1256
fp = MockChunkedEncodingResponse(list(stream()))
1257
r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
1258
r.fp = fp # type: ignore[assignment]
1259
headers = {"transfer-encoding": "chunked", "content-encoding": "gzip"}
1260
resp = HTTPResponse(r, preload_content=False, headers=headers)
1263
for c in resp.stream(decode_content=True):
1266
assert b"foobar" == data
1268
def test_mock_transfer_encoding_chunked_custom_read(self) -> None:
1269
stream = [b"foooo", b"bbbbaaaaar"]
1270
fp = MockChunkedEncodingResponse(stream)
1271
r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
1272
r.fp = fp # type: ignore[assignment]
1275
resp = HTTPResponse(
1276
r, preload_content=False, headers={"transfer-encoding": "chunked"}
1278
expected_response = [b"fo", b"oo", b"o", b"bb", b"bb", b"aa", b"aa", b"ar"]
1279
response = list(resp.read_chunked(2))
1280
assert expected_response == response
1282
@pytest.mark.parametrize("read_chunked_args", ((), (None,), (-1,)))
1283
def test_mock_transfer_encoding_chunked_unlmtd_read(
1284
self, read_chunked_args: tuple[typing.Any, ...]
1286
stream = [b"foooo", b"bbbbaaaaar"]
1287
fp = MockChunkedEncodingResponse(stream)
1288
r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
1289
r.fp = fp # type: ignore[assignment]
1292
resp = HTTPResponse(
1293
r, preload_content=False, headers={"transfer-encoding": "chunked"}
1295
assert stream == list(resp.read_chunked(*read_chunked_args))
1297
def test_read_not_chunked_response_as_chunks(self) -> None:
1298
fp = BytesIO(b"foo")
1299
resp = HTTPResponse(fp, preload_content=False)
1300
r = resp.read_chunked()
1301
with pytest.raises(ResponseNotChunked):
1304
def test_read_chunked_not_supported(self) -> None:
1305
fp = BytesIO(b"foo")
1306
resp = HTTPResponse(
1307
fp, preload_content=False, headers={"transfer-encoding": "chunked"}
1309
r = resp.read_chunked()
1310
with pytest.raises(BodyNotHttplibCompatible):
1313
def test_buggy_incomplete_read(self) -> None:
1314
# Simulate buggy versions of Python (<2.7.4)
1315
# See http://bugs.python.org/issue16298
1316
content_length = 1337
1318
resp = HTTPResponse(
1320
headers={"content-length": str(content_length)},
1321
preload_content=False,
1322
enforce_content_length=True,
1324
with pytest.raises(ProtocolError) as ctx:
1327
orig_ex = ctx.value.args[1]
1328
assert isinstance(orig_ex, IncompleteRead)
1329
assert orig_ex.partial == 0
1330
assert orig_ex.expected == content_length
1332
def test_incomplete_chunk(self) -> None:
1333
stream = [b"foooo", b"bbbbaaaaar"]
1334
fp = MockChunkedIncompleteRead(stream)
1335
r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
1336
r.fp = fp # type: ignore[assignment]
1339
resp = HTTPResponse(
1340
r, preload_content=False, headers={"transfer-encoding": "chunked"}
1342
with pytest.raises(ProtocolError) as ctx:
1343
next(resp.read_chunked())
1345
orig_ex = ctx.value.args[1]
1346
assert isinstance(orig_ex, httplib_IncompleteRead)
1348
def test_invalid_chunk_length(self) -> None:
1349
stream = [b"foooo", b"bbbbaaaaar"]
1350
fp = MockChunkedInvalidChunkLength(stream)
1351
r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
1352
r.fp = fp # type: ignore[assignment]
1355
resp = HTTPResponse(
1356
r, preload_content=False, headers={"transfer-encoding": "chunked"}
1358
with pytest.raises(ProtocolError) as ctx:
1359
next(resp.read_chunked())
1361
orig_ex = ctx.value.args[1]
1363
"(\"Connection broken: InvalidChunkLength(got length b'ZZZ\\\\r\\\\n', 0 bytes read)\", "
1364
"InvalidChunkLength(got length b'ZZZ\\r\\n', 0 bytes read))"
1366
assert str(ctx.value) == msg
1367
assert isinstance(orig_ex, InvalidChunkLength)
1368
assert orig_ex.length == fp.BAD_LENGTH_LINE.encode()
1370
def test_truncated_before_chunk(self) -> None:
1371
stream = [b"foooo", b"bbbbaaaaar"]
1372
fp = MockChunkedNoChunks(stream)
1373
r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
1374
r.fp = fp # type: ignore[assignment]
1377
resp = HTTPResponse(
1378
r, preload_content=False, headers={"transfer-encoding": "chunked"}
1380
with pytest.raises(ProtocolError) as ctx:
1381
next(resp.read_chunked())
1383
assert str(ctx.value) == "Response ended prematurely"
1385
def test_chunked_response_without_crlf_on_end(self) -> None:
1386
stream = [b"foo", b"bar", b"baz"]
1387
fp = MockChunkedEncodingWithoutCRLFOnEnd(stream)
1388
r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
1389
r.fp = fp # type: ignore[assignment]
1392
resp = HTTPResponse(
1393
r, preload_content=False, headers={"transfer-encoding": "chunked"}
1395
assert stream == list(resp.stream())
1397
def test_chunked_response_with_extensions(self) -> None:
1398
stream = [b"foo", b"bar"]
1399
fp = MockChunkedEncodingWithExtensions(stream)
1400
r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
1401
r.fp = fp # type: ignore[assignment]
1404
resp = HTTPResponse(
1405
r, preload_content=False, headers={"transfer-encoding": "chunked"}
1407
assert stream == list(resp.stream())
1409
def test_chunked_head_response(self) -> None:
1410
r = httplib.HTTPResponse(MockSock, method="HEAD") # type: ignore[arg-type]
1413
resp = HTTPResponse(
1415
preload_content=False,
1416
headers={"transfer-encoding": "chunked"},
1417
original_response=r,
1419
assert resp.chunked is True
1421
setattr(resp, "supports_chunked_reads", lambda: True)
1422
setattr(resp, "release_conn", mock.Mock())
1423
for _ in resp.stream():
1425
resp.release_conn.assert_called_once_with() # type: ignore[attr-defined]
1427
def test_get_case_insensitive_headers(self) -> None:
1428
headers = {"host": "example.com"}
1429
r = HTTPResponse(headers=headers)
1430
assert r.headers.get("host") == "example.com"
1431
assert r.headers.get("Host") == "example.com"
1433
def test_retries(self) -> None:
1435
resp = HTTPResponse(fp)
1436
assert resp.retries is None
1438
resp = HTTPResponse(fp, retries=retry)
1439
assert resp.retries == retry
1441
def test_geturl(self) -> None:
1443
request_url = "https://example.com"
1444
resp = HTTPResponse(fp, request_url=request_url)
1445
assert resp.geturl() == request_url
1447
def test_url(self) -> None:
1449
request_url = "https://example.com"
1450
resp = HTTPResponse(fp, request_url=request_url)
1451
assert resp.url == request_url
1452
resp.url = "https://anotherurl.com"
1453
assert resp.url == "https://anotherurl.com"
1455
def test_geturl_retries(self) -> None:
1457
resp = HTTPResponse(fp, request_url="http://example.com")
1458
request_histories = (
1461
url="http://example.com",
1464
redirect_location="https://example.com/",
1468
url="https://example.com/",
1471
redirect_location="https://www.example.com",
1474
retry = Retry(history=request_histories)
1475
resp = HTTPResponse(fp, retries=retry)
1476
assert resp.geturl() == "https://www.example.com"
1478
@pytest.mark.parametrize(
1479
["payload", "expected_stream"],
1483
(b"\n\n\n", [b"\n", b"\n", b"\n"]),
1484
(b"abc\ndef", [b"abc\n", b"def"]),
1485
(b"Hello\nworld\n\n\n!", [b"Hello\n", b"world\n", b"\n", b"\n", b"!"]),
1488
def test__iter__(self, payload: bytes, expected_stream: list[bytes]) -> None:
1490
for chunk in HTTPResponse(BytesIO(payload), preload_content=False):
1491
actual_stream.append(chunk)
1493
assert actual_stream == expected_stream
1495
def test__iter__decode_content(self) -> None:
1496
def stream() -> typing.Generator[bytes, None, None]:
1497
# Set up a generator to chunk the gzipped body
1498
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
1499
data = compress.compress(b"foo\nbar")
1500
data += compress.flush()
1501
for i in range(0, len(data), 2):
1502
yield data[i : i + 2]
1504
fp = MockChunkedEncodingResponse(list(stream()))
1505
r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
1506
r.fp = fp # type: ignore[assignment]
1507
headers = {"transfer-encoding": "chunked", "content-encoding": "gzip"}
1508
resp = HTTPResponse(r, preload_content=False, headers=headers)
1514
assert b"foo\nbar" == data
1516
def test_non_timeout_ssl_error_on_read(self) -> None:
1517
mac_error = ssl.SSLError(
1518
"SSL routines", "ssl3_get_record", "decryption failed or bad record mac"
1521
@contextlib.contextmanager
1522
def make_bad_mac_fp() -> typing.Generator[BytesIO, None, None]:
1524
with mock.patch.object(fp, "read") as fp_read:
1525
# mac/decryption error
1526
fp_read.side_effect = mac_error
1529
with make_bad_mac_fp() as fp:
1530
with pytest.raises(SSLError) as e:
1532
assert e.value.args[0] == mac_error
1534
with make_bad_mac_fp() as fp:
1535
resp = HTTPResponse(fp, preload_content=False)
1536
with pytest.raises(SSLError) as e:
1538
assert e.value.args[0] == mac_error
1540
def test_unexpected_body(self) -> None:
1541
with pytest.raises(ProtocolError) as excinfo:
1542
fp = BytesIO(b"12345")
1543
headers = {"content-length": "5"}
1544
resp = HTTPResponse(fp, status=204, headers=headers)
1546
assert "Response may not contain content" in str(excinfo.value)
1548
with pytest.raises(ProtocolError):
1549
fp = BytesIO(b"12345")
1550
headers = {"content-length": "0"}
1551
resp = HTTPResponse(fp, status=204, headers=headers)
1553
assert "Response may not contain content" in str(excinfo.value)
1555
with pytest.raises(ProtocolError):
1556
fp = BytesIO(b"12345")
1557
resp = HTTPResponse(fp, status=204)
1559
assert "Response may not contain content" in str(excinfo.value)
1562
class MockChunkedEncodingResponse:
1563
def __init__(self, content: list[bytes]) -> None:
1565
content: collection of str, each str is a chunk in response
1567
self.content = content
1568
self.index = 0 # This class iterates over self.content.
1570
self.cur_chunk = b""
1571
self.chunks_exhausted = False
1573
def _encode_chunk(self, chunk: bytes) -> bytes:
1574
# In the general case, we can't decode the chunk to unicode
1575
length = f"{len(chunk):X}\r\n"
1576
return length.encode() + chunk + b"\r\n"
1578
def _pop_new_chunk(self) -> bytes:
1579
if self.chunks_exhausted:
1582
chunk = self.content[self.index]
1585
self.chunks_exhausted = True
1588
chunk = self._encode_chunk(chunk)
1589
if not isinstance(chunk, bytes):
1590
chunk = chunk.encode()
1591
assert isinstance(chunk, bytes)
1594
def pop_current_chunk(self, amt: int = -1, till_crlf: bool = False) -> bytes:
1595
if amt > 0 and till_crlf:
1596
raise ValueError("Can't specify amt and till_crlf.")
1597
if len(self.cur_chunk) <= 0:
1598
self.cur_chunk = self._pop_new_chunk()
1601
i = self.cur_chunk.index(b"\r\n")
1603
# No CRLF in current chunk -- probably caused by encoder.
1604
self.cur_chunk = b""
1607
chunk_part = self.cur_chunk[: i + 2]
1608
self.cur_chunk = self.cur_chunk[i + 2 :]
1611
chunk_part = self.cur_chunk
1612
self.cur_chunk = b""
1616
chunk_part = self.cur_chunk[:amt]
1618
chunk_part = self.cur_chunk
1619
self.cur_chunk = b""
1621
self.cur_chunk = self.cur_chunk[amt:]
1624
def readline(self) -> bytes:
1625
return self.pop_current_chunk(till_crlf=True)
1627
def read(self, amt: int = -1) -> bytes:
1628
return self.pop_current_chunk(amt)
1630
def read1(self, amt: int = -1) -> bytes:
1631
return self.pop_current_chunk(amt)
1633
def flush(self) -> None:
1634
# Python 3 wants this method.
1637
def close(self) -> None:
1641
class MockChunkedIncompleteRead(MockChunkedEncodingResponse):
1642
def _encode_chunk(self, chunk: bytes) -> bytes:
1643
return f"9999\r\n{chunk.decode()}\r\n".encode()
1646
class MockChunkedInvalidChunkLength(MockChunkedEncodingResponse):
1647
BAD_LENGTH_LINE = "ZZZ\r\n"
1649
def _encode_chunk(self, chunk: bytes) -> bytes:
1650
return f"{self.BAD_LENGTH_LINE}{chunk.decode()}\r\n".encode()
1653
class MockChunkedEncodingWithoutCRLFOnEnd(MockChunkedEncodingResponse):
1654
def _encode_chunk(self, chunk: bytes) -> bytes:
1655
return "{:X}\r\n{}{}".format(
1658
"\r\n" if len(chunk) > 0 else "",
1662
class MockChunkedEncodingWithExtensions(MockChunkedEncodingResponse):
1663
def _encode_chunk(self, chunk: bytes) -> bytes:
1664
return f"{len(chunk):X};asd=qwe\r\n{chunk.decode()}\r\n".encode()
1667
class MockChunkedNoChunks(MockChunkedEncodingResponse):
1668
def _encode_chunk(self, chunk: bytes) -> bytes:
1674
def makefile(cls, *args: typing.Any, **kwargs: typing.Any) -> None: