werkzeug

Форк
0
/
test_test.py 
927 строк · 27.8 Кб
1
import json
2
import sys
3
from functools import partial
4
from io import BytesIO
5

6
import pytest
7

8
from werkzeug.datastructures import Authorization
9
from werkzeug.datastructures import FileStorage
10
from werkzeug.datastructures import Headers
11
from werkzeug.datastructures import MultiDict
12
from werkzeug.formparser import parse_form_data
13
from werkzeug.test import Client
14
from werkzeug.test import ClientRedirectError
15
from werkzeug.test import create_environ
16
from werkzeug.test import EnvironBuilder
17
from werkzeug.test import run_wsgi_app
18
from werkzeug.test import stream_encode_multipart
19
from werkzeug.test import TestResponse
20
from werkzeug.utils import redirect
21
from werkzeug.wrappers import Request
22
from werkzeug.wrappers import Response
23

24

25
def cookie_app(environ, start_response):
    """WSGI app that echoes the request's ``Cookie`` header as plain text
    (``"No Cookie"`` when the header is absent) and always sets the cookie
    ``test=test`` on the response.
    """
    body = environ.get("HTTP_COOKIE", "No Cookie")
    resp = Response(body, mimetype="text/plain")
    resp.set_cookie("test", "test")
    return resp(environ, start_response)
32

33

34
def redirect_loop_app(environ, start_response):
    """WSGI app that answers every request with a redirect to the same
    fixed URL.
    """
    looping_redirect = redirect("http://localhost/some/redirect/")
    return looping_redirect(environ, start_response)
37

38

39
def redirect_with_get_app(environ, start_response):
    """WSGI app that redirects to ``/some/redirect/`` and reports the URL
    once the request arrives there.

    Only three URLs are accepted; any other URL raises ``AssertionError``
    so an unexpected request made by the test client is noticed.
    """
    req = Request(environ)
    expected_urls = (
        "http://localhost/",
        "http://localhost/first/request",
        "http://localhost/some/redirect/",
    )
    if req.url not in expected_urls:
        # The message names this function so a failure points at the right app.
        message = f'redirect_with_get_app() did not expect URL "{req.url}"'
        raise AssertionError(message)
    if "/some/redirect" not in req.url:
        response = redirect("http://localhost/some/redirect/")
    else:
        response = Response(f"current url: {req.url}")
    return response(environ, start_response)
52

53

54
def external_redirect_demo_app(environ, start_response):
    """WSGI app that always redirects to ``http://example.com/``."""
    external = redirect("http://example.com/")
    return external(environ, start_response)
57

58

59
def external_subdomain_redirect_demo_app(environ, start_response):
    """WSGI app that redirects to ``http://test.example.com/login`` unless
    the request's ``Host`` header already contains ``test.example.com``.
    """
    already_on_subdomain = "test.example.com" in environ["HTTP_HOST"]
    if not already_on_subdomain:
        reply = redirect("http://test.example.com/login")
    else:
        reply = Response("redirected successfully to subdomain")
    return reply(environ, start_response)
65

66

67
def multi_value_post_app(environ, start_response):
    """WSGI app that asserts the posted form carries ``field`` twice, as
    ``val1`` then ``val2``, and replies ``ok``.
    """
    form = Request(environ).form
    # Plain item access yields the first value only.
    assert form["field"] == "val1", form["field"]
    # ``getlist`` exposes every submitted value in order.
    assert form.getlist("field") == ["val1", "val2"], form.getlist("field")
    return Response("ok")(environ, start_response)
73

74

75
def test_cookie_forging():
    """A cookie set directly on the client is sent with the next request."""
    client = Client(cookie_app)
    client.set_cookie("foo", "bar")
    assert client.open().text == "foo=bar"
80

81

82
def test_set_cookie_app():
    """The response from ``cookie_app`` carries a ``Set-Cookie`` header."""
    headers = Client(cookie_app).open().headers
    assert "Set-Cookie" in headers
86

87

88
def test_cookiejar_stores_cookie():
    """After a request, the cookie set by the app is retrievable from the
    client.
    """
    client = Client(cookie_app)
    client.open()
    stored = client.get_cookie("test")
    assert stored is not None
92

93

94
def test_no_initial_cookie():
    """The first request from a fresh client sends no cookie."""
    first_response = Client(cookie_app).open()
    assert first_response.text == "No Cookie"
98

99

100
def test_resent_cookie():
    """A cookie received on one request is sent back on the next one."""
    client = Client(cookie_app)
    client.open()  # first request stores the cookie
    second_response = client.open()
    assert second_response.text == "test=test"
105

106

107
def test_disable_cookies():
    """With ``use_cookies=False`` the client does not send back cookies."""
    client = Client(cookie_app, use_cookies=False)
    client.open()
    second_response = client.open()
    assert second_response.text == "No Cookie"
112

113

114
def test_cookie_for_different_path():
    """A cookie received while requesting ``/path1`` is also sent when
    requesting ``/path2``.
    """
    client = Client(cookie_app)
    client.open("/path1")
    other_path_response = client.open("/path2")
    assert other_path_response.text == "test=test"
119

120

121
def test_cookie_default_path() -> None:
    """When a cookie is set with ``path=None``, the client stores it under
    the request path up to, but not including, its last slash.
    """

    @Request.application
    def app(request: Request) -> Response:
        response = Response()
        response.set_cookie("k", "v", path=None)
        return response

    client = Client(app)

    # "/nested/leaf" -> stored under "/nested", not under the default lookup.
    client.get("/nested/leaf")
    assert client.get_cookie("k") is None
    assert client.get_cookie("k", path="/nested") is not None

    # "/nested/dir/" -> stored under "/nested/dir".
    client.get("/nested/dir/")
    assert client.get_cookie("k", path="/nested/dir") is not None
138

139

140
def test_environ_builder_basics():
    """The builder's content type follows what was added (nothing, form
    data, files), and the built request exposes the same form and file.
    """
    builder = EnvironBuilder()
    assert builder.content_type is None
    builder.method = "POST"
    assert builder.content_type is None

    # Form data alone switches to URL-encoded.
    builder.form["test"] = "normal value"
    assert builder.content_type == "application/x-www-form-urlencoded"

    # Adding a file (plus another form value) switches to multipart.
    builder.files.add_file("test", BytesIO(b"test contents"), "test.txt")
    assert builder.files["test"].content_type == "text/plain"
    builder.form["test_int"] = 1
    assert builder.content_type == "multipart/form-data"

    request = builder.get_request()
    builder.close()

    assert request.url == "http://localhost/"
    assert request.method == "POST"
    assert request.form["test"] == "normal value"
    uploaded = request.files["test"]
    assert uploaded.content_type == "text/plain"
    assert uploaded.filename == "test.txt"
    assert uploaded.read() == b"test contents"
    request.close()
162

163

164
def test_environ_builder_data():
    """``data`` given as str or bytes becomes the input stream; given as a
    dict it fills ``form`` or ``files`` depending on the value types.
    """

    def assert_file_count(builder, expected):
        uploads = builder.files.getlist("foo")
        assert len(uploads) == expected
        assert all(isinstance(item, FileStorage) for item in uploads)

    for raw in ("foo", b"foo"):
        builder = EnvironBuilder(data=raw)
        assert builder.input_stream.getvalue() == b"foo"

    builder = EnvironBuilder(data={"foo": "bar"})
    assert builder.form["foo"] == "bar"
    builder = EnvironBuilder(data={"foo": ["bar1", "bar2"]})
    assert builder.form.getlist("foo") == ["bar1", "bar2"]

    # File objects, bare or wrapped in a tuple, single or in a list.
    assert_file_count(EnvironBuilder(data={"foo": BytesIO()}), 1)
    assert_file_count(EnvironBuilder(data={"foo": [BytesIO(), BytesIO()]}), 2)
    assert_file_count(EnvironBuilder(data={"foo": (BytesIO(),)}), 1)
    assert_file_count(EnvironBuilder(data={"foo": [(BytesIO(),), (BytesIO(),)]}), 2)
190

191

192
def test_environ_builder_json():
    """``json=`` sends a JSON body with the JSON content type, and cannot
    be combined with ``data=``.
    """

    @Request.application
    def app(request):
        assert request.content_type == "application/json"
        payload = json.loads(request.get_data(as_text=True))
        return Response(payload["foo"])

    client = Client(app)
    assert client.post("/", json={"foo": "bar"}).text == "bar"

    with pytest.raises(TypeError):
        client.post("/", json={"foo": "bar"}, data={"baz": "qux"})
204

205

206
def test_environ_builder_headers():
    """``environ_base``, ``environ_overrides`` and ``headers`` all end up in
    the environ, and an explicit header replaces the base value.
    """
    builder = EnvironBuilder(
        environ_base={"HTTP_USER_AGENT": "Foo/0.1"},
        environ_overrides={"wsgi.version": (1, 1)},
    )
    builder.headers["X-Beat-My-Horse"] = "very well sir"
    environ = builder.get_environ()
    assert environ["HTTP_USER_AGENT"] == "Foo/0.1"
    assert environ["HTTP_X_BEAT_MY_HORSE"] == "very well sir"
    assert environ["wsgi.version"] == (1, 1)

    builder.headers["User-Agent"] = "Bar/1.0"
    environ = builder.get_environ()
    assert environ["HTTP_USER_AGENT"] == "Bar/1.0"
220

221

222
def test_environ_builder_headers_content_type():
    """A ``Content-Type`` header maps to ``CONTENT_TYPE`` (never
    ``HTTP_CONTENT_TYPE``), and the ``content_type`` argument wins over it.
    """
    # Header only.
    environ = EnvironBuilder(headers={"Content-Type": "text/plain"}).get_environ()
    assert environ["CONTENT_TYPE"] == "text/plain"
    assert "HTTP_CONTENT_TYPE" not in environ

    # Explicit argument and header together.
    builder = EnvironBuilder(content_type="text/html", headers={"Content-Type": "text/plain"})
    environ = builder.get_environ()
    assert environ["CONTENT_TYPE"] == "text/html"
    assert "HTTP_CONTENT_TYPE" not in environ

    # Neither given.
    environ = EnvironBuilder().get_environ()
    assert "CONTENT_TYPE" not in environ
    assert "HTTP_CONTENT_TYPE" not in environ
235

236

237
def test_envrion_builder_multiple_headers():
    """Repeated headers are joined with ``", "`` into one environ value."""
    headers = Headers()
    for value in ("bar", "baz"):
        headers.add("FOO", value)
    environ = EnvironBuilder(headers=headers).get_environ()
    assert environ["HTTP_FOO"] == "bar, baz"
244

245

246
def test_environ_builder_paths():
    """``base_url``, ``host``, ``script_root`` and ``url_scheme`` stay in
    sync with each other and with the generated environ.
    """
    builder = EnvironBuilder(path="/foo", base_url="http://example.com/")
    assert builder.base_url == "http://example.com/"
    assert builder.path == "/foo"
    assert builder.script_root == ""
    assert builder.host == "example.com"

    # A path component in the base URL becomes the script root.
    builder = EnvironBuilder(path="/foo", base_url="http://example.com/bar")
    assert builder.base_url == "http://example.com/bar/"
    assert builder.path == "/foo"
    assert builder.script_root == "/bar"
    assert builder.host == "example.com"

    builder.host = "localhost"
    assert builder.base_url == "http://localhost/bar/"
    builder.base_url = "http://localhost:8080/"
    assert builder.host == "localhost:8080"
    assert builder.server_name == "localhost"
    assert builder.server_port == 8080

    builder.host = "foo.invalid"
    builder.url_scheme = "https"
    builder.script_root = "/test"
    environ = builder.get_environ()
    assert environ["SERVER_NAME"] == "foo.invalid"
    assert environ["SERVER_PORT"] == "443"
    assert environ["SCRIPT_NAME"] == "/test"
    assert environ["PATH_INFO"] == "/foo"
    assert environ["HTTP_HOST"] == "foo.invalid"
    assert environ["wsgi.url_scheme"] == "https"
    assert builder.base_url == "https://foo.invalid/test/"
277

278

279
def test_environ_builder_content_type():
    """The HTTP method alone never sets a content type; form data and files
    do.
    """
    builder = EnvironBuilder()
    assert builder.content_type is None
    for method in ("POST", "PUT", "PATCH", "DELETE", "GET"):
        builder.method = method
        assert builder.content_type is None

    builder.form["foo"] = "bar"
    assert builder.content_type == "application/x-www-form-urlencoded"
    builder.files.add_file("data", BytesIO(b"foo"), "test.txt")
    assert builder.content_type == "multipart/form-data"

    request = builder.get_request()
    builder.close()
    assert request.form["foo"] == "bar"
    assert request.files["data"].read() == b"foo"
    request.close()
301

302

303
def test_basic_auth():
    """A ``(username, password)`` tuple passed as ``auth`` is readable from
    the built request's ``authorization``.
    """
    auth = EnvironBuilder(auth=("username", "password")).get_request().authorization
    assert auth.username == "username"
    assert auth.password == "password"
308

309

310
def test_auth_object():
    """An ``Authorization`` object passed as ``auth`` is rendered into the
    ``Authorization`` header with its scheme.
    """
    digest = Authorization("digest", {"username": "u", "password": "p"})
    request = EnvironBuilder(auth=digest).get_request()
    header_value = request.headers["Authorization"]
    assert header_value.startswith("Digest ")
316

317

318
def test_environ_builder_stream_switch():
    """``stream_encode_multipart`` returns a ``BytesIO`` only when
    ``use_tempfile`` is false, and the encoded form parses back unchanged
    either way.
    """
    fields = MultiDict({"foo": "bar", "blub": "blah", "hu": "hum"})

    for use_tempfile in (False, True):
        stream, length, boundary = stream_encode_multipart(
            fields, use_tempfile, threshold=150
        )
        assert isinstance(stream, BytesIO) != use_tempfile

        environ = {
            "wsgi.input": stream,
            "CONTENT_LENGTH": str(length),
            "CONTENT_TYPE": f'multipart/form-data; boundary="{boundary}"',
        }
        _, parsed_form, _ = parse_form_data(environ)
        assert parsed_form == fields
        stream.close()
335

336

337
def test_environ_builder_unicode_file_mix():
    """A multipart body mixing a non-ASCII text field with a file holding
    non-ASCII bytes round-trips through encode and parse, both with and
    without a temp file.
    """
    # Encode the character to bytes. A raw bytes literal (rb"\N{...}") does
    # not interpret the escape and would upload plain ASCII instead.
    snowman_bytes = "\N{SNOWMAN}".encode()

    for use_tempfile in False, True:
        f = FileStorage(BytesIO(snowman_bytes), "snowman.txt")
        d = MultiDict(dict(f=f, s="\N{SNOWMAN}"))
        stream, length, boundary = stream_encode_multipart(
            d, use_tempfile, threshold=150
        )
        assert isinstance(stream, BytesIO) != use_tempfile

        _, form, files = parse_form_data(
            {
                "wsgi.input": stream,
                "CONTENT_LENGTH": str(length),
                "CONTENT_TYPE": f'multipart/form-data; boundary="{boundary}"',
            }
        )
        assert form["s"] == "\N{SNOWMAN}"
        assert files["f"].name == "f"
        assert files["f"].filename == "snowman.txt"
        assert files["f"].read() == snowman_bytes
        stream.close()
        files["f"].close()
359

360

361
def test_environ_builder_empty_file():
    """An empty file and an empty text field survive a multipart encode and
    parse round trip.
    """
    empty_upload = FileStorage(BytesIO(rb""), "empty.txt")
    fields = MultiDict({"f": empty_upload, "s": ""})
    stream, length, boundary = stream_encode_multipart(fields)

    environ = {
        "wsgi.input": stream,
        "CONTENT_LENGTH": str(length),
        "CONTENT_TYPE": f'multipart/form-data; boundary="{boundary}"',
    }
    _, form, files = parse_form_data(environ)

    assert form["s"] == ""
    assert files["f"].read() == rb""
    stream.close()
    files["f"].close()
376

377

378
def test_create_environ():
    """``create_environ`` splits path and query string and fills in the
    standard WSGI keys for the given base URL.
    """
    environ = create_environ("/foo?bar=baz", "http://example.org/")
    expected = {
        "wsgi.multiprocess": False,
        "wsgi.version": (1, 0),
        "wsgi.run_once": False,
        "wsgi.errors": sys.stderr,
        "wsgi.multithread": False,
        "wsgi.url_scheme": "http",
        "SCRIPT_NAME": "",
        "SERVER_NAME": "example.org",
        "REQUEST_METHOD": "GET",
        "HTTP_HOST": "example.org",
        "PATH_INFO": "/foo",
        "SERVER_PORT": "80",
        "SERVER_PROTOCOL": "HTTP/1.1",
        "QUERY_STRING": "bar=baz",
    }
    for name in expected:
        assert environ[name] == expected[name]

    assert environ["wsgi.input"].read(0) == b""

    root_environ = create_environ("/foo", "http://example.com/")
    assert root_environ["SCRIPT_NAME"] == ""
400

401

402
def test_create_environ_query_string_error():
    """A query string in the path together with ``query_string=`` raises
    ``ValueError``.
    """
    conflicting_query = {"a": "b"}
    with pytest.raises(ValueError):
        create_environ("/foo?bar=baz", query_string=conflicting_query)
405

406

407
def test_builder_from_environ():
    """A builder created from an environ reproduces an equal environ."""
    original = create_environ(
        "/ㄱ",
        base_url="https://example.com/base",
        query_string={"name": "Werkzeug"},
        data={"foo": "ㄴ"},
        headers={"X-Foo": "ㄷ"},
    )
    rebuilt_from = EnvironBuilder.from_environ(original)

    try:
        rebuilt = rebuilt_from.get_environ()
    finally:
        # Close the builder even if building the environ fails.
        rebuilt_from.close()

    assert rebuilt == original
423

424

425
def test_file_closing():
    """File-like objects handed to ``create_environ`` or to a builder are
    closed: once by ``create_environ`` itself, once by ``builder.close()``.
    """
    closed_inputs = []

    class SpecialInput:
        def read(self, size):
            return b""

        def close(self):
            closed_inputs.append(self)

    create_environ(data={"foo": SpecialInput()})
    assert len(closed_inputs) == 1

    builder = EnvironBuilder()
    builder.files.add_file("blah", SpecialInput())
    builder.close()
    assert len(closed_inputs) == 2
441

442

443
def test_follow_redirect():
    """With ``follow_redirects=True`` the client ends up at the redirect
    target, whichever way the first request is issued.
    """
    final_body = "current url: http://localhost/some/redirect/"

    # Request described by a prebuilt environ.
    environ = create_environ("/", base_url="http://localhost")
    client = Client(redirect_with_get_app)
    response = client.open(environ_overrides=environ, follow_redirects=True)
    assert response.status == "200 OK"
    assert response.text == final_body

    # Plain GET on the root URL.
    client = Client(redirect_with_get_app)
    response = client.get("/", follow_redirects=True)
    assert response.status_code == 200
    assert response.text == final_body

    # Start from a URL other than "/" to make sure the redirected URL is
    # still correct.
    client = Client(redirect_with_get_app)
    response = client.get("/first/request", follow_redirects=True)
    assert response.status_code == 200
    assert response.text == final_body
461

462

463
def test_follow_local_redirect():
    """A redirect whose ``Location`` is left as a bare path (no
    autocorrection) is still followed.
    """

    class LocalResponse(Response):
        autocorrect_location_header = False

    def local_redirect_app(environ, start_response):
        request = Request(environ)
        if "/from/location" not in request.url:
            reply = Response(f"current path: {request.path}")
        else:
            reply = redirect("/to/location", Response=LocalResponse)
        return reply(environ, start_response)

    client = Client(local_redirect_app)
    response = client.get("/from/location", follow_redirects=True)
    assert response.status_code == 200
    assert response.text == "current path: /to/location"
479

480

481
@pytest.mark.parametrize(
    ("code", "keep"), ((302, False), (301, False), (307, True), (308, True))
)
def test_follow_redirect_body(code, keep):
    """Following a redirect keeps the method and body for 307/308 and
    switches to a body-less GET for 301/302; headers are kept either way.

    :param code: redirect status code returned by the app.
    :param keep: whether the method and form data are expected to be kept.
    """

    @Request.application
    def app(request):
        if request.url == "http://localhost/some/redirect/":
            # Parenthesized on purpose: without the parentheses the
            # conditional expression binds looser than ``==`` and the
            # non-keep case would only assert the truthy string "GET".
            expected_method = "POST" if keep else "GET"
            assert request.method == expected_method
            assert request.headers["X-Foo"] == "bar"

            if keep:
                assert request.form["foo"] == "bar"
            else:
                assert not request.form

            return Response(f"current url: {request.url}")

        return redirect("http://localhost/some/redirect/", code=code)

    c = Client(app)
    response = c.post(
        "/", follow_redirects=True, data={"foo": "bar"}, headers={"X-Foo": "bar"}
    )
    assert response.status_code == 200
    assert response.text == "current url: http://localhost/some/redirect/"
506

507

508
def test_follow_external_redirect():
    """Following a redirect to a different host raises ``RuntimeError``."""
    environ = create_environ("/", base_url="http://localhost")
    client = Client(external_redirect_demo_app)
    with pytest.raises(RuntimeError):
        client.get(environ_overrides=environ, follow_redirects=True)
514

515

516
def test_follow_external_redirect_on_same_subdomain():
    """Redirects to a subdomain are followed only when the client was
    created with ``allow_subdomain_redirects=True`` and the request host is
    the parent domain.
    """
    client = Client(external_subdomain_redirect_demo_app, allow_subdomain_redirects=True)
    environ = create_environ("/", base_url="http://example.com")
    client.get(environ_overrides=environ, follow_redirects=True)

    # check that this does not work for real external domains
    environ = create_environ("/", base_url="http://localhost")
    with pytest.raises(RuntimeError):
        client.get(environ_overrides=environ, follow_redirects=True)

    # check that subdomain redirects fail if no `allow_subdomain_redirects` is applied
    client = Client(external_subdomain_redirect_demo_app)
    with pytest.raises(RuntimeError):
        client.get(environ_overrides=environ, follow_redirects=True)
532

533

534
def test_follow_redirect_loop():
    """Following an app that redirects forever raises
    ``ClientRedirectError``.
    """
    client = Client(redirect_loop_app)
    with pytest.raises(ClientRedirectError):
        client.get("/", follow_redirects=True)
538

539

540
def test_follow_redirect_non_root_base_url():
    """A relative redirect is followed correctly when the base URL has a
    non-root path.
    """

    @Request.application
    def app(request):
        if request.path != "/redirect":
            return Response(request.path)

        return redirect("done")

    client = Client(app)
    response = client.get(
        "/redirect", base_url="http://localhost/other", follow_redirects=True
    )
    assert response.text == "/done"
553

554

555
def test_follow_redirect_exhaust_intermediate():
    """While following redirects the client exhausts each intermediate
    response, so middleware cleanup has run before the next request starts.
    """

    class Middleware:
        def __init__(self, app):
            self.app = app
            self.active = 0

        def __call__(self, environ, start_response):
            # Test client must exhaust response stream, otherwise the
            # cleanup code that decrements this won't have run by the
            # time the next request is started.
            assert not self.active
            self.active += 1
            try:
                yield from self.app(environ, start_response)
            finally:
                self.active -= 1

    app = Middleware(redirect_with_get_app)
    # Hand the client the same instance that is inspected below; a second
    # Middleware instance would make the final assertion vacuous.
    client = Client(app)
    response = client.get("/", follow_redirects=True, buffered=False)
    assert response.text == "current url: http://localhost/some/redirect/"
    assert not app.active
577

578

579
def test_redirects_are_tracked():
    """Each followed redirect is recorded in ``response.history``, and each
    recorded response carries the history that preceded it.
    """

    @Request.application
    def app(request):
        next_hop = {"/first": "/second", "/second": "/third"}.get(request.path)
        if next_hop is not None:
            return redirect(next_hop)

        return Response("done")

    client = Client(app)
    response = client.get("/first", follow_redirects=True)
    assert response.text == "done"
    assert len(response.history) == 2

    latest = response.history[-1]
    earliest = response.history[-2]

    assert latest.request.path == "/second"
    assert latest.status_code == 302
    assert latest.location == "/third"
    assert len(latest.history) == 1
    assert latest.history[-1] is earliest

    assert earliest.request.path == "/first"
    assert earliest.status_code == 302
    assert earliest.location == "/second"
    assert len(earliest.history) == 0
605

606

607
def test_cookie_across_redirect():
    """Cookies set or deleted on a redirect response take effect for the
    redirected request and for later requests.
    """

    @Request.application
    def app(request):
        path = request.path

        if path == "/":
            return Response(request.cookies.get("auth", "out"))

        if path == "/in":
            login = redirect("/")
            login.set_cookie("auth", "in")
            return login

        if path == "/out":
            logout = redirect("/")
            logout.delete_cookie("auth")
            return logout

    client = Client(app)
    assert client.get("/").text == "out"
    assert client.get("/in", follow_redirects=True).text == "in"
    assert client.get("/").text == "in"
    assert client.get("/out", follow_redirects=True).text == "out"
    assert client.get("/").text == "out"
629

630

631
def test_path_info_script_name_unquoting():
    """Percent-encoded characters in the path and in the base URL arrive
    unquoted in ``PATH_INFO`` and ``SCRIPT_NAME``.
    """

    def test_app(environ, start_response):
        start_response("200 OK", [("Content-Type", "text/plain")])
        path_info = environ["PATH_INFO"]
        script_name = environ["SCRIPT_NAME"]
        return [f"{path_info}\n{script_name}"]

    response = Client(test_app).get("/foo%40bar")
    assert response.text == "/foo@bar\n"

    response = Client(test_app).get("/foo%40bar", "http://localhost/bar%40baz")
    assert response.text == "/foo@bar\n/bar@baz"
642

643

644
def test_multi_value_submit():
    """A field with several values can be posted from a plain dict of
    lists or from a ``MultiDict``.
    """
    values = ["val1", "val2"]
    payloads = ({"field": values}, MultiDict({"field": values}))

    for payload in payloads:
        client = Client(multi_value_post_app)
        response = client.post("/", data=payload)
        assert response.status_code == 200
653

654

655
def test_iri_support():
    """Non-ASCII path and host are converted to their percent-encoded and
    punycode forms.
    """
    builder = EnvironBuilder("/föö-bar", base_url="http://☃.net/")
    assert builder.path == "/f%C3%B6%C3%B6-bar"
    assert builder.base_url == "http://xn--n3h.net/"
659

660

661
@pytest.mark.parametrize("buffered", (True, False))
@pytest.mark.parametrize("iterable", (True, False))
def test_run_wsgi_apps(buffered, iterable):
    """``run_wsgi_app`` returns the same status, headers and body for apps
    that return a list, yield, call ``start_response`` late, or rely on
    ``close()`` being called on their return value.
    """
    leaked_data = []
    html_headers = [("Content-Type", "text/html")]

    def simple_app(environ, start_response):
        start_response("200 OK", html_headers)
        return ["Hello World!"]

    def yielding_app(environ, start_response):
        start_response("200 OK", html_headers)
        yield "Hello "
        yield "World!"

    def late_start_response(environ, start_response):
        yield "Hello "
        yield "World"
        start_response("200 OK", html_headers)
        yield "!"

    def depends_on_close(environ, start_response):
        leaked_data.append("harhar")
        start_response("200 OK", html_headers)

        class Rv:
            def __iter__(self):
                yield "Hello "
                yield "World"
                yield "!"

            def close(self):
                # Undo the append above; fails if close() is called twice.
                assert leaked_data.pop() == "harhar"

        return Rv()

    candidates = (simple_app, yielding_app, late_start_response, depends_on_close)

    for candidate in candidates:
        wsgi_app = iterable_middleware(candidate) if iterable else candidate
        app_iter, status, headers = run_wsgi_app(wsgi_app, {}, buffered=buffered)
        assert status == "200 OK"
        assert list(headers) == [("Content-Type", "text/html")]
        assert "".join(app_iter) == "Hello World!"

        if hasattr(app_iter, "close"):
            app_iter.close()
        assert not leaked_data
707

708

709
@pytest.mark.parametrize("buffered", (True, False))
@pytest.mark.parametrize("iterable", (True, False))
def test_lazy_start_response_empty_response_app(buffered, iterable):
    """``run_wsgi_app`` handles an app that calls ``start_response`` only
    when first iterated and then yields nothing.
    """

    class LazyEmptyApp:
        def __init__(self, environ, start_response):
            self.start_response = start_response

        def __iter__(self):
            return self

        def __next__(self):
            self.start_response("200 OK", [("Content-Type", "text/html")])
            raise StopIteration

    wsgi_app = iterable_middleware(LazyEmptyApp) if iterable else LazyEmptyApp
    app_iter, status, headers = run_wsgi_app(wsgi_app, {}, buffered=buffered)
    assert status == "200 OK"
    assert list(headers) == [("Content-Type", "text/html")]
    assert "".join(app_iter) == ""
729

730

731
def test_run_wsgi_app_closing_iterator():
    """``close()`` on the app's return value is called once per run: via
    the returned iterator when unbuffered, by ``run_wsgi_app`` itself when
    buffered.
    """
    close_calls = []

    class CloseIter:
        def __init__(self):
            self.iterated = False

        def __iter__(self):
            return self

        def __next__(self):
            if not self.iterated:
                self.iterated = True
                return "bar"
            raise StopIteration()

        def close(self):
            close_calls.append(None)

    def bar(environ, start_response):
        start_response("200 OK", [("Content-Type", "text/plain")])
        return CloseIter()

    # Unbuffered: iterate by hand, then close explicitly.
    app_iter, status, headers = run_wsgi_app(bar, {})
    assert status == "200 OK"
    assert list(headers) == [("Content-Type", "text/plain")]
    assert next(app_iter) == "bar"
    pytest.raises(StopIteration, partial(next, app_iter))
    app_iter.close()

    # Buffered: the body comes back as a list.
    buffered_body = run_wsgi_app(bar, {}, True)[0]
    assert buffered_body == ["bar"]

    assert len(close_calls) == 2
764

765

766
def iterable_middleware(app):
    """Wrap a WSGI app so its return value is a plain iterable object.

    The wrapper object forwards iteration to the app's return value and has
    a ``close`` method only if that return value has one.
    """

    def inner(environ, start_response):
        result = app(environ, start_response)

        class Iterable:
            def __iter__(self):
                return iter(result)

        if hasattr(result, "close"):

            def _close(self):
                result.close()

            # Attach close() only when the wrapped value supports it.
            Iterable.close = _close

        return Iterable()

    return inner
784

785

786
def test_multiple_cookies():
    """Several cookies set by one response are all sent on the next
    request.
    """

    @Request.application
    def test_app(request):
        received = sorted(request.cookies.items())
        reply = Response(repr(received))
        reply.set_cookie("test1", "foo")
        reply.set_cookie("test2", "bar")
        return reply

    client = Client(test_app)
    assert client.get("/").text == "[]"
    expected = [("test1", "foo"), ("test2", "bar")]
    assert client.get("/").text == repr(expected)
799

800

801
def test_correct_open_invocation_on_redirect():
    """Each ``get`` goes through an overridden ``open`` exactly once, and
    the ``environ_overrides`` it injects reach the app.
    """

    class MyClient(Client):
        counter = 0

        def open(self, *args, **kwargs):
            self.counter += 1
            overrides = kwargs.setdefault("environ_overrides", {})
            overrides["werkzeug._foo"] = self.counter
            return super().open(*args, **kwargs)

    @Request.application
    def test_app(request):
        call_number = request.environ["werkzeug._foo"]
        return Response(str(call_number))

    client = MyClient(test_app, response_wrapper=Response)
    for expected in ("1", "2", "3"):
        assert client.get("/").text == expected
819

820

821
def test_correct_encoding():
    """A non-ASCII path passed to ``Request.from_values`` is readable
    unchanged from ``request.path``.
    """
    request = Request.from_values("/\N{SNOWMAN}", "http://example.com/foo")
    assert request.script_root == "/foo"
    assert request.path == "/\N{SNOWMAN}"
825

826

827
def test_full_url_requests_with_args():
    """Query arguments are parsed whether the request target is a path or
    a full URL.
    """
    base_url = "http://example.com/"

    @Request.application
    def test_app(request):
        return Response(request.args["x"])

    client = Client(test_app)
    assert client.get("/?x=42", base_url).text == "42"
    assert client.get("http://www.example.com/?x=23", base_url).text == "23"
839

840

841
def test_delete_requests_with_form():
    """Form data sent with a DELETE request reaches the app."""

    @Request.application
    def test_app(request):
        value = request.form.get("x", None)
        return Response(value)

    response = Client(test_app).delete("/", data={"x": 42})
    assert response.text == "42"
849

850

851
def test_post_with_file_descriptor(tmpdir):
    """An open file, in text or binary mode, can be posted as ``data``."""
    client = Client(Response())
    source = tmpdir.join("some-file.txt")
    source.write("foo")

    for mode in ("r", "rb"):
        with open(source.strpath, mode=mode) as handle:
            response = client.post("/", data=handle)
        assert response.status_code == 200
861

862

863
def test_content_type():
    """The ``mimetype`` argument becomes the request content type, with a
    charset appended for the text type but not for the binary one.
    """

    @Request.application
    def test_app(request):
        return Response(request.content_type)

    client = Client(test_app)

    css = client.get("/", data=b"testing", mimetype="text/css")
    assert css.text == "text/css; charset=utf-8"

    binary = client.get("/", data=b"testing", mimetype="application/octet-stream")
    assert binary.text == "application/octet-stream"
875

876

877
def test_raw_request_uri():
    """``REQUEST_URI`` keeps the raw requested URI while ``request.path``
    is unquoted.
    """

    @Request.application
    def app(request):
        lines = (request.path, request.environ["REQUEST_URI"])
        return Response("\n".join(lines))

    client = Client(app)

    assert client.get("/hello%2fworld").text == "/hello/world\n/hello%2fworld"
    assert client.get("/?a=b").text == "/\n/?a=b"

    # escaped ? in path, and empty query string
    assert client.get("/%3f?").text == "/?\n/%3f?"
894

895

896
def no_response_headers_app(environ, start_response):
    """A WSGI application which returns a response whose headers were
    cleared before it is sent.
    """
    bare = Response("Response")
    bare.headers.clear()
    return bare(environ, start_response)
901

902

903
def test_no_content_type_header_addition():
    """When the app clears its headers, the client sees only
    ``Content-Length``; no ``Content-Type`` is added.
    """
    response = Client(no_response_headers_app).open()
    expected = Headers([("Content-Length", "8")])
    assert response.headers == expected
907

908

909
def test_client_response_wrapper():
    """A plain ``Response`` subclass used as the wrapper gets subclassed by
    the client; one that already derives from ``TestResponse`` is used
    as is.
    """

    class CustomResponse(Response):
        pass

    class CustomTestResponse(TestResponse, Response):
        pass

    plain_result = Client(Response(), CustomResponse).open()
    plain_type = type(plain_result)

    assert isinstance(plain_result, CustomResponse)
    assert plain_type is not CustomResponse  # Got subclassed
    assert issubclass(plain_type, CustomResponse)

    test_result = Client(Response(), CustomTestResponse).open()

    assert isinstance(test_result, CustomTestResponse)
    assert type(test_result) is CustomTestResponse  # Did not get subclassed
928

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.