4
from datetime import datetime
5
from datetime import timedelta
6
from datetime import timezone
11
from werkzeug import Response
12
from werkzeug import wrappers
13
from werkzeug.datastructures import Accept
14
from werkzeug.datastructures import CharsetAccept
15
from werkzeug.datastructures import CombinedMultiDict
16
from werkzeug.datastructures import Headers
17
from werkzeug.datastructures import ImmutableList
18
from werkzeug.datastructures import ImmutableMultiDict
19
from werkzeug.datastructures import ImmutableOrderedMultiDict
20
from werkzeug.datastructures import LanguageAccept
21
from werkzeug.datastructures import MIMEAccept
22
from werkzeug.datastructures import MultiDict
23
from werkzeug.datastructures import WWWAuthenticate
24
from werkzeug.exceptions import BadRequest
25
from werkzeug.exceptions import RequestedRangeNotSatisfiable
26
from werkzeug.exceptions import SecurityError
27
from werkzeug.exceptions import UnsupportedMediaType
28
from werkzeug.http import COEP
29
from werkzeug.http import COOP
30
from werkzeug.http import generate_etag
31
from werkzeug.test import Client
32
from werkzeug.test import create_environ
33
from werkzeug.test import run_wsgi_app
34
from werkzeug.wsgi import LimitedStream
35
from werkzeug.wsgi import wrap_file
38
@wrappers.Request.application
39
def request_demo_app(request):
40
assert "werkzeug.request" in request.environ
44
def assert_environ(environ, method):
45
assert environ["REQUEST_METHOD"] == method
46
assert environ["PATH_INFO"] == "/"
47
assert environ["SCRIPT_NAME"] == ""
48
assert environ["SERVER_NAME"] == "localhost"
49
assert environ["wsgi.version"] == (1, 0)
50
assert environ["wsgi.url_scheme"] == "http"
53
def test_base_request():
54
client = Client(request_demo_app)
57
response = client.get("/?foo=bar&foo=hehe")
58
request = response.request
59
assert request.args == MultiDict([("foo", "bar"), ("foo", "hehe")])
60
assert request.form == MultiDict()
61
assert request.data == b""
62
assert_environ(request.environ, "GET")
65
response = client.post(
67
data="foo=blub+hehe&blah=42",
68
content_type="application/x-www-form-urlencoded",
70
request = response.request
71
assert request.args == MultiDict([("blub", "blah")])
72
assert request.form == MultiDict([("foo", "blub hehe"), ("blah", "42")])
73
assert request.data == b""
77
assert_environ(request.environ, "POST")
80
response = client.patch(
82
data="foo=blub+hehe&blah=42",
83
content_type="application/x-www-form-urlencoded",
85
request = response.request
86
assert request.args == MultiDict([("blub", "blah")])
87
assert request.form == MultiDict([("foo", "blub hehe"), ("blah", "42")])
88
assert request.data == b""
89
assert_environ(request.environ, "PATCH")
92
json = b'{"foo": "bar", "blub": "blah"}'
93
response = client.post("/?a=b", data=json, content_type="application/json")
94
request = response.request
95
assert request.data == json
96
assert request.args == MultiDict([("a", "b")])
97
assert request.form == MultiDict()
100
def test_query_string_is_bytes():
101
req = wrappers.Request.from_values("/?foo=%2f")
102
assert req.query_string == b"foo=%2f"
105
def test_request_repr():
106
req = wrappers.Request.from_values("/foobar")
107
assert "<Request 'http://localhost/foobar' [GET]>" == repr(req)
108
req = wrappers.Request.from_values("/привет")
109
assert "<Request 'http://localhost/привет' [GET]>" == repr(req)
112
def test_access_route():
113
req = wrappers.Request.from_values(
114
headers={"X-Forwarded-For": "192.168.1.2, 192.168.1.1"},
115
environ_base={"REMOTE_ADDR": "192.168.1.3"},
117
assert req.access_route == ["192.168.1.2", "192.168.1.1"]
118
assert req.remote_addr == "192.168.1.3"
120
req = wrappers.Request.from_values(environ_base={"REMOTE_ADDR": "192.168.1.3"})
121
assert list(req.access_route) == ["192.168.1.3"]
124
def test_url_request_descriptors():
125
req = wrappers.Request.from_values("/bar?foo=baz", "http://example.com/test")
126
assert req.path == "/bar"
127
assert req.full_path == "/bar?foo=baz"
128
assert req.script_root == "/test"
129
assert req.url == "http://example.com/test/bar?foo=baz"
130
assert req.base_url == "http://example.com/test/bar"
131
assert req.url_root == "http://example.com/test/"
132
assert req.host_url == "http://example.com/"
133
assert req.host == "example.com"
134
assert req.scheme == "http"
136
req = wrappers.Request.from_values("/bar?foo=baz", "https://example.com/test")
137
assert req.scheme == "https"
140
def test_url_request_descriptors_query_quoting():
141
quoted = "http%3A%2F%2Fwww.example.com%2F%3Fnext%3D%2Fbaz%23my%3Dhash"
142
unquoted = "http://www.example.com/?next%3D/baz%23my%3Dhash"
143
req = wrappers.Request.from_values(f"/bar?next={quoted}", "http://example.com/")
144
assert req.path == "/bar"
145
assert req.full_path == f"/bar?next={quoted}"
146
assert req.url == f"http://example.com/bar?next={unquoted}"
149
def test_url_request_descriptors_hosts():
150
req = wrappers.Request.from_values("/bar?foo=baz", "http://example.com/test")
151
req.trusted_hosts = ["example.com"]
152
assert req.path == "/bar"
153
assert req.full_path == "/bar?foo=baz"
154
assert req.script_root == "/test"
155
assert req.url == "http://example.com/test/bar?foo=baz"
156
assert req.base_url == "http://example.com/test/bar"
157
assert req.url_root == "http://example.com/test/"
158
assert req.host_url == "http://example.com/"
159
assert req.host == "example.com"
160
assert req.scheme == "http"
162
req = wrappers.Request.from_values("/bar?foo=baz", "https://example.com/test")
163
assert req.scheme == "https"
165
req = wrappers.Request.from_values("/bar?foo=baz", "http://example.com/test")
166
req.trusted_hosts = ["example.org"]
167
pytest.raises(SecurityError, lambda: req.url)
168
pytest.raises(SecurityError, lambda: req.base_url)
169
pytest.raises(SecurityError, lambda: req.url_root)
170
pytest.raises(SecurityError, lambda: req.host_url)
171
pytest.raises(SecurityError, lambda: req.host)
174
def test_authorization():
175
request = wrappers.Request.from_values(
176
headers={"Authorization": "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="}
178
a = request.authorization
179
assert a.type == "basic"
180
assert a.username == "Aladdin"
181
assert a.password == "open sesame"
184
def test_authorization_with_unicode():
185
request = wrappers.Request.from_values(
186
headers={"Authorization": "Basic 0YDRg9GB0YHQutC40IE60JHRg9C60LLRiw=="}
188
a = request.authorization
189
assert a.type == "basic"
190
assert a.username == "русскиЁ"
191
assert a.password == "Буквы"
194
def test_request_application():
195
@wrappers.Request.application
196
def application(request):
197
return wrappers.Response("Hello World!")
199
@wrappers.Request.application
200
def failing_application(request):
203
resp = wrappers.Response.from_app(application, create_environ())
204
assert resp.data == b"Hello World!"
205
assert resp.status_code == 200
207
resp = wrappers.Response.from_app(failing_application, create_environ())
208
assert b"Bad Request" in resp.data
209
assert resp.status_code == 400
212
def test_request_access_control():
213
request = wrappers.Request.from_values(
215
"Origin": "https://palletsprojects.com",
216
"Access-Control-Request-Headers": "X-A, X-B",
217
"Access-Control-Request-Method": "PUT",
220
assert request.origin == "https://palletsprojects.com"
221
assert request.access_control_request_headers == {"X-A", "X-B"}
222
assert request.access_control_request_method == "PUT"
225
def test_response_access_control():
226
response = wrappers.Response("Hello World")
227
assert response.access_control_allow_credentials is False
228
response.access_control_allow_credentials = True
229
response.access_control_allow_headers = ["X-A", "X-B"]
230
assert response.headers["Access-Control-Allow-Credentials"] == "true"
231
assert set(response.headers["Access-Control-Allow-Headers"].split(", ")) == {
237
def test_base_response():
238
response = wrappers.Response("öäü")
239
assert response.get_data() == "öäü".encode()
242
response = wrappers.Response("foo")
243
response.stream.write("bar")
244
assert response.get_data() == b"foobar"
247
response = wrappers.Response()
254
domain="example.org",
257
assert response.headers.to_wsgi_list() == [
258
("Content-Type", "text/plain; charset=utf-8"),
261
"foo=bar; Domain=example.org;"
262
" Expires=Thu, 01 Jan 1970 00:00:00 GMT; Max-Age=60;"
263
" Path=/blub; SameSite=Strict",
268
response = wrappers.Response()
269
response.delete_cookie("foo")
270
assert response.headers.to_wsgi_list() == [
271
("Content-Type", "text/plain; charset=utf-8"),
274
"foo=; Expires=Thu, 01 Jan 1970 00:00:00 GMT; Max-Age=0; Path=/",
283
raise StopIteration()
291
response = wrappers.Response(Iterable())
292
response.call_on_close(lambda: closed.append(True))
293
app_iter, status, headers = run_wsgi_app(response, create_environ(), buffered=True)
294
assert status == "200 OK"
295
assert "".join(app_iter) == ""
296
assert len(closed) == 2
300
response = wrappers.Response(Iterable())
303
assert len(closed) == 1
306
@pytest.mark.parametrize(
307
("status_code", "expected_status"),
310
(404, "404 NOT FOUND"),
311
(588, "588 UNKNOWN"),
312
(999, "999 UNKNOWN"),
315
def test_response_set_status_code(status_code, expected_status):
316
response = wrappers.Response()
317
response.status_code = status_code
318
assert response.status_code == status_code
319
assert response.status == expected_status
322
@pytest.mark.parametrize(
323
("status", "expected_status_code", "expected_status"),
325
("404", 404, "404 NOT FOUND"),
326
("588", 588, "588 UNKNOWN"),
327
("999", 999, "999 UNKNOWN"),
328
("200 OK", 200, "200 OK"),
329
("999 WTF", 999, "999 WTF"),
331
("200 TEA POT", 200, "200 TEA POT"),
332
(200, 200, "200 OK"),
333
(400, 400, "400 BAD REQUEST"),
336
def test_response_set_status(status, expected_status_code, expected_status):
337
response = wrappers.Response()
338
response.status = status
339
assert response.status_code == expected_status_code
340
assert response.status == expected_status
342
response = wrappers.Response(status=status)
343
assert response.status_code == expected_status_code
344
assert response.status == expected_status
347
def test_response_init_status_empty_string():
349
with pytest.raises(ValueError) as info:
350
wrappers.Response(None, "")
352
assert "Empty status argument" in str(info.value)
355
def test_type_forcing():
356
def wsgi_application(environ, start_response):
357
start_response("200 OK", [("Content-Type", "text/html")])
358
return ["Hello World!"]
360
base_response = wrappers.Response("Hello World!", content_type="text/html")
362
class SpecialResponse(wrappers.Response):
370
for orig_resp in wsgi_application, base_response:
371
response = SpecialResponse.force_type(orig_resp, fake_env)
372
assert response.__class__ is SpecialResponse
373
assert response.foo() == 42
374
assert response.get_data() == b"Hello World!"
375
assert response.content_type == "text/html"
378
pytest.raises(TypeError, SpecialResponse.force_type, wsgi_application)
382
request = wrappers.Request(
384
"HTTP_ACCEPT": "text/xml,application/xml,application/xhtml+xml,"
385
"text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5",
386
"HTTP_ACCEPT_CHARSET": "ISO-8859-1,utf-8;q=0.7,*;q=0.7",
387
"HTTP_ACCEPT_ENCODING": "gzip,deflate",
388
"HTTP_ACCEPT_LANGUAGE": "en-us,en;q=0.5",
389
"SERVER_NAME": "eggs",
393
assert request.accept_mimetypes == MIMEAccept(
396
("application/xml", 1),
397
("application/xhtml+xml", 1),
404
assert request.accept_charsets == CharsetAccept(
405
[("ISO-8859-1", 1), ("utf-8", 0.7), ("*", 0.7)]
407
assert request.accept_encodings == Accept([("gzip", 1), ("deflate", 1)])
408
assert request.accept_languages == LanguageAccept([("en-us", 1), ("en", 0.5)])
410
request = wrappers.Request(
411
{"HTTP_ACCEPT": "", "SERVER_NAME": "example.org", "SERVER_PORT": "80"}
413
assert request.accept_mimetypes == MIMEAccept()
416
def test_etag_request():
417
request = wrappers.Request(
419
"HTTP_CACHE_CONTROL": "no-store, no-cache",
420
"HTTP_IF_MATCH": 'W/"foo", bar, "baz"',
421
"HTTP_IF_NONE_MATCH": 'W/"foo", bar, "baz"',
422
"HTTP_IF_MODIFIED_SINCE": "Tue, 22 Jan 2008 11:18:44 GMT",
423
"HTTP_IF_UNMODIFIED_SINCE": "Tue, 22 Jan 2008 11:18:44 GMT",
424
"SERVER_NAME": "eggs",
428
assert request.cache_control.no_store
429
assert request.cache_control.no_cache
431
for etags in request.if_match, request.if_none_match:
433
assert etags.contains_raw('W/"foo"')
434
assert etags.contains_weak("foo")
435
assert not etags.contains("foo")
437
dt = datetime(2008, 1, 22, 11, 18, 44, tzinfo=timezone.utc)
438
assert request.if_modified_since == dt
439
assert request.if_unmodified_since == dt
442
def test_user_agent():
443
user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:94.0) Gecko/20100101 Firefox/94.0"
444
request = wrappers.Request({"HTTP_USER_AGENT": user_agent})
445
assert request.user_agent.to_header() == user_agent
446
assert str(request.user_agent) == user_agent
447
assert request.user_agent.string == user_agent
450
def test_stream_wrapping():
451
class LowercasingStream:
452
def __init__(self, stream):
453
self._stream = stream
455
def read(self, size=-1):
456
return self._stream.read(size).lower()
458
def readline(self, size=-1):
459
return self._stream.readline(size).lower()
461
data = b"foo=Hello+World"
462
req = wrappers.Request.from_values(
463
"/", method="POST", data=data, content_type="application/x-www-form-urlencoded"
465
req.stream = LowercasingStream(req.stream)
466
assert req.form["foo"] == "hello world"
469
def test_data_descriptor_triggers_parsing():
470
data = b"foo=Hello+World"
471
req = wrappers.Request.from_values(
472
"/", method="POST", data=data, content_type="application/x-www-form-urlencoded"
475
assert req.data == b""
476
assert req.form["foo"] == "Hello World"
479
def test_get_data_method_parsing_caching_behavior():
480
data = b"foo=Hello+World"
481
req = wrappers.Request.from_values(
482
"/", method="POST", data=data, content_type="application/x-www-form-urlencoded"
486
assert req.get_data() == data
487
assert req.form["foo"] == "Hello World"
488
assert req.get_data() == data
491
req = wrappers.Request.from_values(
492
"/", method="POST", data=data, content_type="application/x-www-form-urlencoded"
494
assert req.form["foo"] == "Hello World"
495
assert req.get_data() == b""
498
req = wrappers.Request.from_values(
499
"/", method="POST", data=data, content_type="application/x-www-form-urlencoded"
501
assert req.get_data(cache=False) == data
502
assert req.get_data(cache=False) == b""
503
assert req.form == {}
507
req = wrappers.Request.from_values(
508
"/", method="POST", data=data, content_type="application/x-www-form-urlencoded"
510
assert req.get_data(parse_form_data=True) == b""
511
assert req.form["foo"] == "Hello World"
514
def test_etag_response():
515
response = wrappers.Response("Hello World")
516
assert response.get_etag() == (None, None)
518
assert response.get_etag() == ("0a4d55a8d778e5022fab701977c5d840bbc486d0", False)
519
assert not response.cache_control
520
response.cache_control.must_revalidate = True
521
response.cache_control.max_age = 60
522
response.headers["Content-Length"] = len(response.get_data())
523
assert response.headers["Cache-Control"] in (
524
"must-revalidate, max-age=60",
525
"max-age=60, must-revalidate",
528
assert "date" not in response.headers
529
env = create_environ()
530
env.update({"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": response.get_etag()[0]})
531
response.make_conditional(env)
532
assert "date" in response.headers
537
resp = wrappers.Response.from_app(response, env)
538
assert resp.status_code == 304
539
assert "content-length" not in resp.headers
542
response = wrappers.Response("Hello World")
545
response.make_conditional(env)
546
assert response.date == d
549
response = wrappers.Response("Hello World")
550
response.content_length = 999
551
response.make_conditional(env)
552
assert response.content_length == 999
555
def test_etag_response_412():
556
response = wrappers.Response("Hello World")
557
assert response.get_etag() == (None, None)
559
assert response.get_etag() == ("0a4d55a8d778e5022fab701977c5d840bbc486d0", False)
560
assert not response.cache_control
561
response.cache_control.must_revalidate = True
562
response.cache_control.max_age = 60
563
response.headers["Content-Length"] = len(response.get_data())
564
assert response.headers["Cache-Control"] in (
565
"must-revalidate, max-age=60",
566
"max-age=60, must-revalidate",
569
assert "date" not in response.headers
570
env = create_environ()
572
{"REQUEST_METHOD": "GET", "HTTP_IF_MATCH": f"{response.get_etag()[0]}xyz"}
574
response.make_conditional(env)
575
assert "date" in response.headers
580
resp = wrappers.Response.from_app(response, env)
581
assert resp.status_code == 412
583
assert resp.data != b""
586
response = wrappers.Response("Hello World")
589
response.make_conditional(env)
590
assert response.date == d
593
response = wrappers.Response("Hello World")
594
response.content_length = 999
595
response.make_conditional(env)
596
assert response.content_length == 999
599
def test_range_request_basic():
600
env = create_environ()
601
response = wrappers.Response("Hello World")
602
env["HTTP_RANGE"] = "bytes=0-4"
603
response.make_conditional(env, accept_ranges=True, complete_length=11)
604
assert response.status_code == 206
605
assert response.headers["Accept-Ranges"] == "bytes"
606
assert response.headers["Content-Range"] == "bytes 0-4/11"
607
assert response.headers["Content-Length"] == "5"
608
assert response.data == b"Hello"
611
def test_range_request_out_of_bound():
612
env = create_environ()
613
response = wrappers.Response("Hello World")
614
env["HTTP_RANGE"] = "bytes=6-666"
615
response.make_conditional(env, accept_ranges=True, complete_length=11)
616
assert response.status_code == 206
617
assert response.headers["Accept-Ranges"] == "bytes"
618
assert response.headers["Content-Range"] == "bytes 6-10/11"
619
assert response.headers["Content-Length"] == "5"
620
assert response.data == b"World"
623
def test_range_request_with_file():
624
env = create_environ()
625
resources = os.path.join(os.path.dirname(__file__), "res")
626
fname = os.path.join(resources, "test.txt")
627
with open(fname, "rb") as f:
629
with open(fname, "rb") as f:
630
response = wrappers.Response(wrap_file(env, f))
631
env["HTTP_RANGE"] = "bytes=0-0"
632
response.make_conditional(
633
env, accept_ranges=True, complete_length=len(fcontent)
635
assert response.status_code == 206
636
assert response.headers["Accept-Ranges"] == "bytes"
637
assert response.headers["Content-Range"] == f"bytes 0-0/{len(fcontent)}"
638
assert response.headers["Content-Length"] == "1"
639
assert response.data == fcontent[:1]
642
def test_range_request_with_complete_file():
643
env = create_environ()
644
resources = os.path.join(os.path.dirname(__file__), "res")
645
fname = os.path.join(resources, "test.txt")
646
with open(fname, "rb") as f:
648
with open(fname, "rb") as f:
649
fsize = os.path.getsize(fname)
650
response = wrappers.Response(wrap_file(env, f))
651
env["HTTP_RANGE"] = f"bytes=0-{fsize - 1}"
652
response.make_conditional(env, accept_ranges=True, complete_length=fsize)
653
assert response.status_code == 206
654
assert response.headers["Accept-Ranges"] == "bytes"
655
assert response.headers["Content-Range"] == f"bytes 0-{fsize - 1}/{fsize}"
656
assert response.headers["Content-Length"] == str(fsize)
657
assert response.data == fcontent
660
@pytest.mark.parametrize("value", [None, 0])
661
def test_range_request_without_complete_length(value):
662
env = create_environ(headers={"Range": "bytes=0-10"})
663
response = wrappers.Response("Hello World")
664
response.make_conditional(env, accept_ranges=True, complete_length=value)
665
assert response.status_code == 200
666
assert response.data == b"Hello World"
669
def test_invalid_range_request():
670
env = create_environ()
671
response = wrappers.Response("Hello World")
672
env["HTTP_RANGE"] = "bytes=-"
673
with pytest.raises(RequestedRangeNotSatisfiable):
674
response.make_conditional(env, accept_ranges=True, complete_length=11)
677
def test_etag_response_freezing():
678
response = Response("Hello World")
680
assert response.get_etag() == (str(generate_etag(b"Hello World")), False)
683
def test_authenticate():
684
resp = wrappers.Response()
685
resp.www_authenticate.realm = "Testing"
686
assert resp.headers["WWW-Authenticate"] == "Basic realm=Testing"
687
del resp.www_authenticate
688
assert "WWW-Authenticate" not in resp.headers
691
def test_authenticate_quoted_qop():
693
resp = wrappers.Response()
694
resp.www_authenticate = WWWAuthenticate(
695
"digest", {"realm": "REALM", "nonce": "NONCE", "qop": "auth, auth-int"}
698
actual = resp.headers["WWW-Authenticate"]
699
expected = 'Digest realm="REALM", nonce="NONCE", qop="auth, auth-int"'
700
assert actual == expected
702
resp.www_authenticate.parameters["qop"] = "auth"
703
actual = resp.headers["WWW-Authenticate"]
704
expected = 'Digest realm="REALM", nonce="NONCE", qop="auth"'
705
assert actual == expected
708
def test_response_stream():
709
response = wrappers.Response()
710
response.stream.write("Hello ")
711
response.stream.write("World!")
712
assert response.response == ["Hello ", "World!"]
713
assert response.get_data() == b"Hello World!"
716
def test_common_response_descriptors():
717
response = wrappers.Response()
718
response.mimetype = "text/html"
719
assert response.mimetype == "text/html"
720
assert response.content_type == "text/html; charset=utf-8"
721
assert response.mimetype_params == {"charset": "utf-8"}
722
response.mimetype_params["x-foo"] = "yep"
723
del response.mimetype_params["charset"]
724
assert response.content_type == "text/html; x-foo=yep"
726
now = datetime.now(timezone.utc).replace(microsecond=0)
728
assert response.content_length is None
729
response.content_length = "42"
730
assert response.content_length == 42
732
for attr in "date", "expires":
733
assert getattr(response, attr) is None
734
setattr(response, attr, now)
735
assert getattr(response, attr) == now
737
assert response.age is None
738
age_td = timedelta(days=1, minutes=3, seconds=5)
739
response.age = age_td
740
assert response.age == age_td
742
assert response.age == timedelta(seconds=42)
744
assert response.retry_after is None
745
response.retry_after = now
746
assert response.retry_after == now
748
assert not response.vary
749
response.vary.add("Cookie")
750
response.vary.add("Content-Language")
751
assert "cookie" in response.vary
752
assert response.vary.to_header() == "Cookie, Content-Language"
753
response.headers["Vary"] = "Content-Encoding"
754
assert response.vary.as_set() == {"content-encoding"}
756
response.allow.update(["GET", "POST"])
757
assert response.headers["Allow"] == "GET, POST"
759
response.content_language.add("en-US")
760
response.content_language.add("fr")
761
assert response.headers["Content-Language"] == "en-US, fr"
764
def test_common_request_descriptors():
765
request = wrappers.Request.from_values(
766
content_type="text/html; charset=utf-8",
769
"Referer": "http://www.example.com/",
770
"Date": "Sat, 28 Feb 2009 19:04:35 GMT",
771
"Max-Forwards": "10",
772
"Pragma": "no-cache",
773
"Content-Encoding": "gzip",
774
"Content-MD5": "9a3bc6dbc47a70db25b84c6e5867a072",
778
assert request.content_type == "text/html; charset=utf-8"
779
assert request.mimetype == "text/html"
780
assert request.mimetype_params == {"charset": "utf-8"}
781
assert request.content_length == 23
782
assert request.referrer == "http://www.example.com/"
783
assert request.date == datetime(2009, 2, 28, 19, 4, 35, tzinfo=timezone.utc)
784
assert request.max_forwards == 10
785
assert "no-cache" in request.pragma
786
assert request.content_encoding == "gzip"
787
assert request.content_md5 == "9a3bc6dbc47a70db25b84c6e5867a072"
790
def test_request_mimetype_always_lowercase():
791
request = wrappers.Request.from_values(content_type="APPLICATION/JSON")
792
assert request.mimetype == "application/json"
795
def test_shallow_mode():
796
request = wrappers.Request(
797
{"QUERY_STRING": "foo=bar", "SERVER_NAME": "eggs", "SERVER_PORT": "80"},
800
assert request.args["foo"] == "bar"
801
pytest.raises(RuntimeError, lambda: request.stream)
802
pytest.raises(RuntimeError, lambda: request.data)
803
pytest.raises(RuntimeError, lambda: request.form)
806
def test_form_parsing_failed():
808
request = wrappers.Request.from_values(
809
input_stream=BytesIO(data),
810
content_length=len(data),
811
content_type="multipart/form-data; boundary=foo",
814
assert not request.files
815
assert not request.form
819
request = wrappers.Request.from_values(
820
input_stream=BytesIO(data),
821
content_length=len(data),
825
assert not request.form
828
def test_file_closing():
831
b'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n'
832
b"Content-Type: text/plain; charset=utf-8\r\n\r\n"
833
b"file contents, just the contents\r\n"
836
req = wrappers.Request.from_values(
837
input_stream=BytesIO(data),
838
content_length=len(data),
839
content_type="multipart/form-data; boundary=foo",
842
foo = req.files["foo"]
843
assert foo.mimetype == "text/plain"
844
assert foo.filename == "foo.txt"
846
assert foo.closed is False
848
assert foo.closed is True
851
def test_file_closing_with():
854
b'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n'
855
b"Content-Type: text/plain; charset=utf-8\r\n\r\n"
856
b"file contents, just the contents\r\n"
859
req = wrappers.Request.from_values(
860
input_stream=BytesIO(data),
861
content_length=len(data),
862
content_type="multipart/form-data; boundary=foo",
866
foo = req.files["foo"]
867
assert foo.mimetype == "text/plain"
868
assert foo.filename == "foo.txt"
870
assert foo.closed is True
873
def test_response_streamed():
874
r = wrappers.Response()
875
assert not r.is_streamed
876
r = wrappers.Response("Hello World")
877
assert not r.is_streamed
878
r = wrappers.Response(["foo", "bar"])
879
assert not r.is_streamed
885
r = wrappers.Response(gen())
889
def test_response_iter_wrapping():
890
def uppercasing(iterator):
891
for item in iterator:
898
req = wrappers.Request.from_values()
899
resp = wrappers.Response(generator())
900
del resp.headers["Content-Length"]
901
resp.response = uppercasing(resp.iter_encoded())
902
actual_resp = wrappers.Response.from_app(resp, req.environ, buffered=True)
903
assert actual_resp.get_data() == b"FOOBAR"
906
def test_response_freeze():
911
resp = wrappers.Response(generate())
913
assert resp.response == [b"foo", b"bar"]
914
assert resp.headers["content-length"] == "6"
917
def test_response_content_length_uses_encode():
918
r = wrappers.Response("你好")
919
assert r.calculate_content_length() == 6
922
def test_other_method_payload():
923
data = b"Hello World"
924
req = wrappers.Request.from_values(
925
input_stream=BytesIO(data),
926
content_length=len(data),
927
content_type="text/plain",
928
method="WHAT_THE_FUCK",
930
assert req.get_data() == data
931
assert isinstance(req.stream, LimitedStream)
934
def test_urlfication():
935
resp = wrappers.Response()
936
resp.headers["Location"] = "http://üser:pässword@☃.net/påth"
937
resp.headers["Content-Location"] = "http://☃.net/"
938
headers = resp.get_wsgi_headers(create_environ())
939
assert headers["location"] == "http://%C3%BCser:p%C3%A4ssword@xn--n3h.net/p%C3%A5th"
940
assert headers["content-location"] == "http://xn--n3h.net/"
943
def test_new_response_iterator_behavior():
944
req = wrappers.Request.from_values()
945
resp = wrappers.Response("Hello Wörld!")
947
def get_content_length(resp):
948
headers = resp.get_wsgi_headers(req.environ)
949
return headers.get("content-length", type=int)
951
def generate_items():
957
assert resp.response == ["Hello Wörld!".encode()]
958
assert resp.get_data() == "Hello Wörld!".encode()
959
assert get_content_length(resp) == 13
960
assert not resp.is_streamed
961
assert resp.is_sequence
964
resp.set_data("Wörd")
965
assert resp.response == ["Wörd".encode()]
966
assert resp.get_data() == "Wörd".encode()
967
assert get_content_length(resp) == 5
968
assert not resp.is_streamed
969
assert resp.is_sequence
972
resp.response = generate_items()
973
assert resp.is_streamed
974
assert not resp.is_sequence
975
assert resp.get_data() == "Hello Wörld!".encode()
976
assert resp.response == [b"Hello ", "Wörld!".encode()]
977
assert not resp.is_streamed
978
assert resp.is_sequence
981
resp.response = generate_items()
982
resp.implicit_sequence_conversion = False
983
assert resp.is_streamed
984
assert not resp.is_sequence
985
pytest.raises(RuntimeError, lambda: resp.get_data())
987
assert resp.get_data() == "Hello Wörld!".encode()
988
assert resp.response == [b"Hello ", "Wörld!".encode()]
989
assert not resp.is_streamed
990
assert resp.is_sequence
993
for val in True, False:
994
resp.implicit_sequence_conversion = val
995
resp.response = ("foo", "bar")
996
assert resp.is_sequence
997
resp.stream.write("baz")
998
assert resp.response == ["foo", "bar", "baz"]
1001
def test_form_data_ordering():
1002
class MyRequest(wrappers.Request):
1003
parameter_storage_class = ImmutableOrderedMultiDict
1005
req = MyRequest.from_values("/?foo=1&bar=0&foo=3")
1006
assert list(req.args) == ["foo", "bar"]
1007
assert list(req.args.items(multi=True)) == [
1012
assert isinstance(req.args, ImmutableOrderedMultiDict)
1013
assert isinstance(req.values, CombinedMultiDict)
1014
assert req.values["foo"] == "1"
1015
assert req.values.getlist("foo") == ["1", "3"]
1019
r = wrappers.Request.from_values(
1020
method="POST", query_string={"a": "1"}, data={"a": "2", "b": "2"}
1022
assert r.values["a"] == "1"
1023
assert r.values["b"] == "2"
1026
r = wrappers.Request.from_values(
1027
method="GET", query_string={"a": "1"}, data={"a": "2", "b": "2"}
1029
assert r.values["a"] == "1"
1030
assert "b" not in r.values
1033
def test_storage_classes():
1034
class MyRequest(wrappers.Request):
1035
dict_storage_class = dict
1036
list_storage_class = list
1037
parameter_storage_class = dict
1039
req = MyRequest.from_values("/?foo=baz", headers={"Cookie": "foo=bar"})
1040
assert type(req.cookies) is dict
1041
assert req.cookies == {"foo": "bar"}
1042
assert type(req.access_route) is list
1044
assert type(req.args) is dict
1045
assert type(req.values) is CombinedMultiDict
1046
assert req.values["foo"] == "baz"
1048
req = wrappers.Request.from_values(headers={"Cookie": "foo=bar;foo=baz"})
1049
assert type(req.cookies) is ImmutableMultiDict
1050
assert req.cookies.to_dict() == {"foo": "bar"}
1053
assert req.cookies.getlist("foo") == ["bar", "baz"]
1054
assert type(req.access_route) is ImmutableList
1056
MyRequest.list_storage_class = tuple
1057
req = MyRequest.from_values()
1058
assert type(req.access_route) is tuple
1061
def test_response_headers_passthrough():
1063
resp = wrappers.Response(headers=headers)
1064
assert resp.headers is headers
1067
def test_response_304_no_content_length():
1068
resp = wrappers.Response("Test", status=304)
1069
env = create_environ()
1070
assert "content-length" not in resp.get_wsgi_headers(env)
1075
req = wrappers.Request.from_values()
1076
assert req.range is None
1077
req = wrappers.Request.from_values(headers={"Range": "bytes=0-499"})
1078
assert req.range.ranges == [(0, 500)]
1080
resp = wrappers.Response()
1081
resp.content_range = req.range.make_content_range(1000)
1082
assert resp.content_range.units == "bytes"
1083
assert resp.content_range.start == 0
1084
assert resp.content_range.stop == 500
1085
assert resp.content_range.length == 1000
1086
assert resp.headers["Content-Range"] == "bytes 0-499/1000"
1088
resp.content_range.unset()
1089
assert "Content-Range" not in resp.headers
1091
resp.headers["Content-Range"] = "bytes 0-499/1000"
1092
assert resp.content_range.units == "bytes"
1093
assert resp.content_range.start == 0
1094
assert resp.content_range.stop == 500
1095
assert resp.content_range.length == 1000
1099
resp = wrappers.Response()
1100
resp.content_security_policy.default_src = "'self'"
1101
assert resp.headers["Content-Security-Policy"] == "default-src 'self'"
1102
resp.content_security_policy.script_src = "'self' palletsprojects.com"
1104
resp.headers["Content-Security-Policy"]
1105
== "default-src 'self'; script-src 'self' palletsprojects.com"
1108
resp.content_security_policy = None
1109
assert "Content-Security-Policy" not in resp.headers
1112
def test_auto_content_length():
1113
resp = wrappers.Response("Hello World!")
1114
assert resp.content_length == 12
1116
resp = wrappers.Response(["Hello World!"])
1117
assert resp.content_length is None
1118
assert resp.get_wsgi_headers({})["Content-Length"] == "12"
1121
def test_stream_content_length():
1122
resp = wrappers.Response()
1123
resp.stream.writelines(["foo", "bar", "baz"])
1124
assert resp.get_wsgi_headers({})["Content-Length"] == "9"
1126
resp = wrappers.Response()
1127
resp.make_conditional({"REQUEST_METHOD": "GET"})
1128
resp.stream.writelines(["foo", "bar", "baz"])
1129
assert resp.get_wsgi_headers({})["Content-Length"] == "9"
1131
resp = wrappers.Response("foo")
1132
resp.stream.writelines(["bar", "baz"])
1133
assert resp.get_wsgi_headers({})["Content-Length"] == "9"
1136
def test_disabled_auto_content_length():
1137
class MyResponse(wrappers.Response):
1138
automatically_set_content_length = False
1140
resp = MyResponse("Hello World!")
1141
assert resp.content_length is None
1143
resp = MyResponse(["Hello World!"])
1144
assert resp.content_length is None
1145
assert "Content-Length" not in resp.get_wsgi_headers({})
1148
resp.make_conditional({"REQUEST_METHOD": "GET"})
1149
assert resp.content_length is None
1150
assert "Content-Length" not in resp.get_wsgi_headers({})
1153
@pytest.mark.parametrize(
1154
("auto", "location", "expect"),
1156
(False, "/test", "/test"),
1157
(False, "/\\\\test.example?q", "/%5C%5Ctest.example?q"),
1158
(True, "/test", "http://localhost/test"),
1159
(True, "test", "http://localhost/a/b/test"),
1160
(True, "./test", "http://localhost/a/b/test"),
1161
(True, "../test", "http://localhost/a/test"),
1164
def test_location_header_autocorrect(monkeypatch, auto, location, expect):
1165
monkeypatch.setattr(wrappers.Response, "autocorrect_location_header", auto)
1166
env = create_environ("/a/b/c")
1167
resp = wrappers.Response("Hello World!")
1168
resp.headers["Location"] = location
1169
assert resp.get_wsgi_headers(env)["Location"] == expect
1172
def test_204_and_1XX_response_has_no_content_length():
1173
response = wrappers.Response(status=204)
1174
assert response.content_length is None
1176
headers = response.get_wsgi_headers(create_environ())
1177
assert "Content-Length" not in headers
1179
response = wrappers.Response(status=100)
1180
assert response.content_length is None
1182
headers = response.get_wsgi_headers(create_environ())
1183
assert "Content-Length" not in headers
1186
def test_malformed_204_response_has_no_content_length():
1188
response = wrappers.Response(status=204)
1189
response.set_data(b"test")
1190
assert response.content_length == 4
1192
env = create_environ()
1193
app_iter, status, headers = response.get_wsgi_response(env)
1194
assert status == "204 NO CONTENT"
1195
assert "Content-Length" not in headers
1196
assert b"".join(app_iter) == b""
1199
def test_request_method_case_sensitivity():
1200
req = wrappers.Request(
1201
{"REQUEST_METHOD": "get", "SERVER_NAME": "eggs", "SERVER_PORT": "80"}
1203
assert req.method == "GET"
1206
def test_write_length():
1207
response = wrappers.Response()
1208
length = response.stream.write(b"bar")
1212
def test_stream_zip():
1215
response = wrappers.Response()
1216
with contextlib.closing(zipfile.ZipFile(response.stream, mode="w")) as z:
1217
z.writestr("foo", b"bar")
1219
buffer = BytesIO(response.get_data())
1220
with contextlib.closing(zipfile.ZipFile(buffer, mode="r")) as z:
1221
assert z.namelist() == ["foo"]
1222
assert z.read("foo") == b"bar"
1226
def test_secure(self):
1227
response = wrappers.Response()
1228
response.set_cookie(
1234
domain="example.org",
1238
assert response.headers.to_wsgi_list() == [
1239
("Content-Type", "text/plain; charset=utf-8"),
1242
"foo=bar; Domain=example.org;"
1243
" Expires=Thu, 01 Jan 1970 00:00:00 GMT; Max-Age=60;"
1244
" Secure; Path=/blub",
1248
def test_httponly(self):
1249
response = wrappers.Response()
1250
response.set_cookie(
1256
domain="example.org",
1261
assert response.headers.to_wsgi_list() == [
1262
("Content-Type", "text/plain; charset=utf-8"),
1265
"foo=bar; Domain=example.org;"
1266
" Expires=Thu, 01 Jan 1970 00:00:00 GMT; Max-Age=60;"
1267
" HttpOnly; Path=/blub",
1271
def test_secure_and_httponly(self):
1272
response = wrappers.Response()
1273
response.set_cookie(
1279
domain="example.org",
1284
assert response.headers.to_wsgi_list() == [
1285
("Content-Type", "text/plain; charset=utf-8"),
1288
"foo=bar; Domain=example.org;"
1289
" Expires=Thu, 01 Jan 1970 00:00:00 GMT; Max-Age=60;"
1290
" Secure; HttpOnly; Path=/blub",
1294
def test_samesite(self):
1295
response = wrappers.Response()
1296
response.set_cookie(
1302
domain="example.org",
1306
assert response.headers.to_wsgi_list() == [
1307
("Content-Type", "text/plain; charset=utf-8"),
1310
"foo=bar; Domain=example.org;"
1311
" Expires=Thu, 01 Jan 1970 00:00:00 GMT; Max-Age=60;"
1312
" Path=/blub; SameSite=Strict",
1318
def test_request(self):
1320
request = wrappers.Request.from_values(json=value)
1321
assert request.json == value
1322
assert request.get_data()
1324
def test_response(self):
1326
response = wrappers.Response(
1327
response=json.dumps(value), content_type="application/json"
1329
assert response.json == value
1331
def test_bad_content_type(self):
1333
request = wrappers.Request.from_values(json=value, content_type="text/plain")
1335
with pytest.raises(UnsupportedMediaType):
1338
assert request.get_json(silent=True) is None
1339
assert request.get_json(force=True) == value
1341
def test_bad_data(self):
1342
request = wrappers.Request.from_values(
1343
data=b'{"a":}', content_type="application/json"
1345
assert request.get_json(silent=True) is None
1347
with pytest.raises(BadRequest):
1350
def test_cache_disabled(self):
1352
request = wrappers.Request.from_values(json=value)
1353
assert request.get_json(cache=False) == [1, 2, 3]
1354
assert not request.get_data()
1356
with pytest.raises(BadRequest):
1360
def test_response_coop():
1361
response = wrappers.Response("Hello World")
1362
assert response.cross_origin_opener_policy is COOP.UNSAFE_NONE
1363
response.cross_origin_opener_policy = COOP.SAME_ORIGIN
1364
assert response.headers["Cross-Origin-Opener-Policy"] == "same-origin"
1367
def test_response_coep():
1368
response = wrappers.Response("Hello World")
1369
assert response.cross_origin_embedder_policy is COEP.UNSAFE_NONE
1370
response.cross_origin_embedder_policy = COEP.REQUIRE_CORP
1371
assert response.headers["Cross-Origin-Embedder-Policy"] == "require-corp"