3
from werkzeug.datastructures import Headers
4
from werkzeug.sansio.multipart import Data
5
from werkzeug.sansio.multipart import Epilogue
6
from werkzeug.sansio.multipart import Field
7
from werkzeug.sansio.multipart import File
8
from werkzeug.sansio.multipart import MultipartDecoder
9
from werkzeug.sansio.multipart import MultipartEncoder
10
from werkzeug.sansio.multipart import NeedData
11
from werkzeug.sansio.multipart import Preamble
14
def test_decoder_simple() -> None:
15
boundary = b"---------------------------9704338192090380615194531385$"
16
decoder = MultipartDecoder(boundary)
18
-----------------------------9704338192090380615194531385$
19
Content-Disposition: form-data; name="fname"
22
-----------------------------9704338192090380615194531385$
23
Content-Disposition: form-data; name="lname"; filename="bob"
26
-----------------------------9704338192090380615194531385$--
27
""".replace("\n", "\r\n").encode()
28
decoder.receive_data(data)
29
decoder.receive_data(None)
30
events = [decoder.next_event()]
31
while not isinstance(events[-1], Epilogue):
32
events.append(decoder.next_event())
37
headers=Headers([("Content-Disposition", 'form-data; name="fname"')]),
39
Data(data="ß∑œß∂ƒå∂".encode(), more_data=False),
44
[("Content-Disposition", 'form-data; name="lname"; filename="bob"')]
47
Data(data=b"asdasd", more_data=False),
50
encoder = MultipartEncoder(boundary)
53
result += encoder.send_event(event)
57
@pytest.mark.parametrize(
71
@pytest.mark.parametrize("data_end", [b"", b"\r\n--foo"])
72
def test_decoder_data_start_with_different_newline_positions(
73
data_start: bytes, data_end: bytes
78
b'Content-Disposition: form-data; name="test"; filename="testfile"\r\n'
79
b"Content-Type: application/octet-stream\r\n\r\n"
80
b"" + data_start + b"\r\nBCDE" + data_end
82
decoder = MultipartDecoder(boundary)
83
decoder.receive_data(data)
84
events = [decoder.next_event()]
86
while not isinstance(events[-1], Data):
87
events.append(decoder.next_event())
88
expected = data_start if data_end == b"" else data_start + b"\r\nBCDE"
97
"Content-Disposition",
98
'form-data; name="test"; filename="testfile"',
100
("Content-Type", "application/octet-stream"),
104
Data(data=expected, more_data=True),
108
def test_chunked_boundaries() -> None:
109
boundary = b"--boundary"
110
decoder = MultipartDecoder(boundary)
111
decoder.receive_data(b"--")
112
assert isinstance(decoder.next_event(), NeedData)
113
decoder.receive_data(b"--boundary\r\n")
114
assert isinstance(decoder.next_event(), Preamble)
115
decoder.receive_data(b"Content-Disposition: form-data;")
116
assert isinstance(decoder.next_event(), NeedData)
117
decoder.receive_data(b'name="fname"\r\n\r\n')
118
assert isinstance(decoder.next_event(), Field)
119
decoder.receive_data(b"longer than the boundary")
120
assert isinstance(decoder.next_event(), Data)
121
decoder.receive_data(b"also longer, but includes a linebreak\r\n--")
122
assert isinstance(decoder.next_event(), Data)
123
assert isinstance(decoder.next_event(), NeedData)
124
decoder.receive_data(b"--boundary--\r\n")
125
event = decoder.next_event()
126
assert isinstance(event, Data)
127
assert not event.more_data
128
decoder.receive_data(None)
129
assert isinstance(decoder.next_event(), Epilogue)
132
def test_empty_field() -> None:
134
decoder = MultipartDecoder(boundary)
137
Content-Disposition: form-data; name="text"
138
Content-Type: text/plain; charset="UTF-8"
142
Content-Disposition: form-data; name="empty"
143
Content-Type: text/plain; charset="UTF-8"
146
""".replace("\n", "\r\n").encode()
147
decoder.receive_data(data)
148
decoder.receive_data(None)
149
events = [decoder.next_event()]
150
while not isinstance(events[-1], Epilogue):
151
events.append(decoder.next_event())
158
("Content-Disposition", 'form-data; name="text"'),
159
("Content-Type", 'text/plain; charset="UTF-8"'),
163
Data(data=b"Some Text", more_data=False),
168
("Content-Disposition", 'form-data; name="empty"'),
169
("Content-Type", 'text/plain; charset="UTF-8"'),
173
Data(data=b"", more_data=False),
176
encoder = MultipartEncoder(boundary)
179
result += encoder.send_event(event)
180
assert data == result