3
from jinja2 import ChainableUndefined
4
from jinja2 import DictLoader
5
from jinja2 import Environment
6
from jinja2 import Template
7
from jinja2.async_utils import auto_aiter
8
from jinja2.exceptions import TemplateNotFound
9
from jinja2.exceptions import TemplatesNotFound
10
from jinja2.exceptions import UndefinedError
11
from jinja2.nativetypes import NativeEnvironment
14
def test_basic_async(run_async_fn):
16
"{% for item in [1, 2, 3] %}[{{ item }}]{% endfor %}", enable_async=True
20
return await t.render_async()
22
rv = run_async_fn(func)
23
assert rv == "[1][2][3]"
26
def test_await_on_calls(run_async_fn):
27
t = Template("{{ async_func() + normal_func() }}", enable_async=True)
29
async def async_func():
36
return await t.render_async(async_func=async_func, normal_func=normal_func)
38
rv = run_async_fn(func)
42
def test_await_on_calls_normal_render():
43
t = Template("{{ async_func() + normal_func() }}", enable_async=True)
45
async def async_func():
51
rv = t.render(async_func=async_func, normal_func=normal_func)
55
def test_await_and_macros(run_async_fn):
57
"{% macro foo(x) %}[{{ x }}][{{ async_func() }}]{% endmacro %}{{ foo(42) }}",
61
async def async_func():
65
return await t.render_async(async_func=async_func)
67
rv = run_async_fn(func)
68
assert rv == "[42][42]"
71
def test_async_blocks(run_async_fn):
73
"{% block foo %}<Test>{% endblock %}{{ self.foo() }}",
79
return await t.render_async()
81
rv = run_async_fn(func)
82
assert rv == "<Test><Test>"
85
def test_async_generate():
    """Check that synchronous ``generate()`` works on an async-enabled template.

    The loop body is expected to be yielded as one string per iteration.
    """
    template = Template(
        "{% for x in [1, 2, 3] %}{{ x }}{% endfor %}", enable_async=True
    )
    chunks = list(template.generate())
    expected = ["1", "2", "3"]
    assert chunks == expected
91
def test_async_iteration_in_templates():
92
t = Template("{% for x in rng %}{{ x }}{% endfor %}", enable_async=True)
94
async def async_iterator():
95
for item in [1, 2, 3]:
98
rv = list(t.generate(rng=async_iterator()))
99
assert rv == ["1", "2", "3"]
102
def test_async_iteration_in_templates_extended():
104
"{% for x in rng %}{{ loop.index0 }}/{{ x }}{% endfor %}", enable_async=True
106
stream = t.generate(rng=auto_aiter(range(1, 4)))
107
assert next(stream) == "0"
108
assert "".join(stream) == "/11/22/3"
116
module="{% macro test() %}[{{ foo }}|{{ bar }}]{% endmacro %}",
117
header="[{{ foo }}|{{ 23 }}]",
118
o_printer="({{ o }})",
123
env.globals["bar"] = 23
127
class TestAsyncImports:
128
def test_context_imports(self, test_env_async):
129
t = test_env_async.from_string('{% import "module" as m %}{{ m.test() }}')
130
assert t.render(foo=42) == "[|23]"
131
t = test_env_async.from_string(
132
'{% import "module" as m without context %}{{ m.test() }}'
134
assert t.render(foo=42) == "[|23]"
135
t = test_env_async.from_string(
136
'{% import "module" as m with context %}{{ m.test() }}'
138
assert t.render(foo=42) == "[42|23]"
139
t = test_env_async.from_string('{% from "module" import test %}{{ test() }}')
140
assert t.render(foo=42) == "[|23]"
141
t = test_env_async.from_string(
142
'{% from "module" import test without context %}{{ test() }}'
144
assert t.render(foo=42) == "[|23]"
145
t = test_env_async.from_string(
146
'{% from "module" import test with context %}{{ test() }}'
148
assert t.render(foo=42) == "[42|23]"
150
def test_trailing_comma(self, test_env_async):
    """Compile ``from ... import`` statements with trailing commas.

    Nothing is rendered or asserted; the test passes when every source
    below is accepted by ``from_string`` without raising.
    """
    sources = (
        '{% from "foo" import bar, baz with context %}',
        '{% from "foo" import bar, baz, with context %}',
        '{% from "foo" import bar, with context %}',
        '{% from "foo" import bar, with, context %}',
        '{% from "foo" import bar, with with context %}',
    )
    # Compile in the same order as listed.
    for source in sources:
        test_env_async.from_string(source)
157
def test_exports(self, test_env_async, run_async_fn):
158
coro_fn = test_env_async.from_string(
160
{% macro toplevel() %}...{% endmacro %}
161
{% macro __private() %}...{% endmacro %}
162
{% set variable = 42 %}
163
{% for item in [1] %}
164
{% macro notthere() %}{% endmacro %}
167
)._get_default_module_async
168
m = run_async_fn(coro_fn)
169
assert run_async_fn(m.toplevel) == "..."
170
assert not hasattr(m, "__missing")
171
assert m.variable == 42
172
assert not hasattr(m, "notthere")
174
def test_import_with_globals(self, test_env_async):
175
t = test_env_async.from_string(
176
'{% import "module" as m %}{{ m.test() }}', globals={"foo": 42}
178
assert t.render() == "[42|23]"
180
t = test_env_async.from_string('{% import "module" as m %}{{ m.test() }}')
181
assert t.render() == "[|23]"
183
def test_import_with_globals_override(self, test_env_async):
184
t = test_env_async.from_string(
185
'{% set foo = 41 %}{% import "module" as m %}{{ m.test() }}',
188
assert t.render() == "[42|23]"
190
def test_from_import_with_globals(self, test_env_async):
191
t = test_env_async.from_string(
192
'{% from "module" import test %}{{ test() }}',
195
assert t.render() == "[42|23]"
198
class TestAsyncIncludes:
199
def test_context_include(self, test_env_async):
    """Check how ``include`` passes the render context to "header".

    A plain include and an explicit ``with context`` both expose ``foo``
    to the included template; ``without context`` renders it empty.
    """
    cases = (
        ('{% include "header" %}', "[42|23]"),
        ('{% include "header" with context %}', "[42|23]"),
        ('{% include "header" without context %}', "[|23]"),
    )
    for source, expected in cases:
        template = test_env_async.from_string(source)
        assert template.render(foo=42) == expected
207
def test_choice_includes(self, test_env_async):
208
t = test_env_async.from_string('{% include ["missing", "header"] %}')
209
assert t.render(foo=42) == "[42|23]"
211
t = test_env_async.from_string(
212
'{% include ["missing", "missing2"] ignore missing %}'
214
assert t.render(foo=42) == ""
216
t = test_env_async.from_string('{% include ["missing", "missing2"] %}')
217
pytest.raises(TemplateNotFound, t.render)
218
with pytest.raises(TemplatesNotFound) as e:
221
assert e.value.templates == ["missing", "missing2"]
222
assert e.value.name == "missing2"
224
def test_includes(t, **ctx):
226
assert t.render(ctx) == "[42|23]"
228
t = test_env_async.from_string('{% include ["missing", "header"] %}')
230
t = test_env_async.from_string("{% include x %}")
231
test_includes(t, x=["missing", "header"])
232
t = test_env_async.from_string('{% include [x, "header"] %}')
233
test_includes(t, x="missing")
234
t = test_env_async.from_string("{% include x %}")
235
test_includes(t, x="header")
236
t = test_env_async.from_string("{% include x %}")
237
test_includes(t, x="header")
238
t = test_env_async.from_string("{% include [x] %}")
239
test_includes(t, x="header")
241
def test_include_ignoring_missing(self, test_env_async):
242
t = test_env_async.from_string('{% include "missing" %}')
243
pytest.raises(TemplateNotFound, t.render)
244
for extra in "", "with context", "without context":
245
t = test_env_async.from_string(
246
'{% include "missing" ignore missing ' + extra + " %}"
248
assert t.render() == ""
250
def test_context_include_with_overrides(self, test_env_async):
254
main="{% for item in [1, 2, 3] %}{% include 'item' %}{% endfor %}",
259
assert env.get_template("main").render() == "123"
261
def test_unoptimized_scopes(self, test_env_async):
262
t = test_env_async.from_string(
266
{% include "o_printer" %}
273
assert t.render().strip() == "(FOO)"
275
def test_unoptimized_scopes_autoescape(self):
277
loader=DictLoader({"o_printer": "({{ o }})"}),
285
{% include "o_printer" %}
292
assert t.render().strip() == "(FOO)"
295
class TestAsyncForLoop:
296
def test_simple(self, test_env_async):
    """Render a plain ``for`` loop over a list of the integers 0 through 9."""
    source = "{% for item in seq %}{{ item }}{% endfor %}"
    digits = list(range(10))
    rendered = test_env_async.from_string(source).render(seq=digits)
    assert rendered == "0123456789"
300
def test_else(self, test_env_async):
301
tmpl = test_env_async.from_string(
302
"{% for item in seq %}XXX{% else %}...{% endfor %}"
304
assert tmpl.render() == "..."
306
def test_empty_blocks(self, test_env_async):
307
tmpl = test_env_async.from_string(
308
"<{% for item in seq %}{% else %}{% endfor %}>"
310
assert tmpl.render() == "<>"
312
@pytest.mark.parametrize(
313
"transform", [lambda x: x, iter, reversed, lambda x: (i for i in x), auto_aiter]
315
def test_context_vars(self, test_env_async, transform):
316
t = test_env_async.from_string(
317
"{% for item in seq %}{{ loop.index }}|{{ loop.index0 }}"
318
"|{{ loop.revindex }}|{{ loop.revindex0 }}|{{ loop.first }}"
319
"|{{ loop.last }}|{{ loop.length }}\n{% endfor %}"
321
out = t.render(seq=transform([42, 24]))
322
assert out == "1|0|2|1|True|False|2\n2|1|1|0|False|True|2\n"
324
def test_cycling(self, test_env_async):
325
tmpl = test_env_async.from_string(
326
"""{% for item in seq %}{{
327
loop.cycle('<1>', '<2>') }}{% endfor %}{%
328
for item in seq %}{{ loop.cycle(*through) }}{% endfor %}"""
330
output = tmpl.render(seq=list(range(4)), through=("<1>", "<2>"))
331
assert output == "<1><2>" * 4
333
def test_lookaround(self, test_env_async):
334
tmpl = test_env_async.from_string(
335
"""{% for item in seq -%}
336
{{ loop.previtem|default('x') }}-{{ item }}-{{
337
loop.nextitem|default('x') }}|
340
output = tmpl.render(seq=list(range(4)))
341
assert output == "x-0-1|0-1-2|1-2-3|2-3-x|"
343
def test_changed(self, test_env_async):
344
tmpl = test_env_async.from_string(
345
"""{% for item in seq -%}
346
{{ loop.changed(item) }},
349
output = tmpl.render(seq=[None, None, 1, 2, 2, 3, 4, 4, 4])
350
assert output == "True,False,True,True,False,True,True,False,False,"
352
def test_scope(self, test_env_async):
353
tmpl = test_env_async.from_string("{% for item in seq %}{% endfor %}{{ item }}")
354
output = tmpl.render(seq=list(range(10)))
357
def test_varlen(self, test_env_async):
361
tmpl = test_env_async.from_string(
362
"{% for item in iter %}{{ item }}{% endfor %}"
364
output = tmpl.render(iter=inner())
365
assert output == "01234"
367
def test_noniter(self, test_env_async):
    """Expect ``TypeError`` when a ``for`` loop iterates over ``none``."""
    source = "{% for item in none %}...{% endfor %}"
    template = test_env_async.from_string(source)
    # Compilation must succeed; only the render call may raise.
    with pytest.raises(TypeError):
        template.render()
371
def test_recursive(self, test_env_async):
372
tmpl = test_env_async.from_string(
373
"""{% for item in seq recursive -%}
374
[{{ item.a }}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
380
dict(a=1, b=[dict(a=1), dict(a=2)]),
381
dict(a=2, b=[dict(a=1), dict(a=2)]),
382
dict(a=3, b=[dict(a="a")]),
385
== "[1<[1][2]>][2<[1][2]>][3<[a]>]"
388
def test_recursive_lookaround(self, test_env_async):
389
tmpl = test_env_async.from_string(
390
"""{% for item in seq recursive -%}
391
[{{ loop.previtem.a if loop.previtem is defined else 'x' }}.{{
392
item.a }}.{{ loop.nextitem.a if loop.nextitem is defined else 'x'
393
}}{% if item.b %}<{{ loop(item.b) }}>{% endif %}]
399
dict(a=1, b=[dict(a=1), dict(a=2)]),
400
dict(a=2, b=[dict(a=1), dict(a=2)]),
401
dict(a=3, b=[dict(a="a")]),
404
== "[x.1.2<[x.1.2][1.2.x]>][1.2.3<[x.1.2][1.2.x]>][2.3.x<[x.a.x]>]"
407
def test_recursive_depth0(self, test_env_async):
408
tmpl = test_env_async.from_string(
409
"{% for item in seq recursive %}[{{ loop.depth0 }}:{{ item.a }}"
410
"{% if item.b %}<{{ loop(item.b) }}>{% endif %}]{% endfor %}"
415
dict(a=1, b=[dict(a=1), dict(a=2)]),
416
dict(a=2, b=[dict(a=1), dict(a=2)]),
417
dict(a=3, b=[dict(a="a")]),
420
== "[0:1<[1:1][1:2]>][0:2<[1:1][1:2]>][0:3<[1:a]>]"
423
def test_recursive_depth(self, test_env_async):
424
tmpl = test_env_async.from_string(
425
"{% for item in seq recursive %}[{{ loop.depth }}:{{ item.a }}"
426
"{% if item.b %}<{{ loop(item.b) }}>{% endif %}]{% endfor %}"
431
dict(a=1, b=[dict(a=1), dict(a=2)]),
432
dict(a=2, b=[dict(a=1), dict(a=2)]),
433
dict(a=3, b=[dict(a="a")]),
436
== "[1:1<[2:1][2:2]>][1:2<[2:1][2:2]>][1:3<[2:a]>]"
439
def test_looploop(self, test_env_async):
440
tmpl = test_env_async.from_string(
441
"""{% for row in table %}
442
{%- set rowloop = loop -%}
443
{% for cell in row -%}
444
[{{ rowloop.index }}|{{ loop.index }}]
448
assert tmpl.render(table=["ab", "cd"]) == "[1|1][1|2][2|1][2|2]"
450
def test_reversed_bug(self, test_env_async):
451
tmpl = test_env_async.from_string(
452
"{% for i in items %}{{ i }}"
453
"{% if not loop.last %}"
454
",{% endif %}{% endfor %}"
456
assert tmpl.render(items=reversed([3, 2, 1])) == "1,2,3"
458
def test_loop_errors(self, test_env_async, run_async_fn):
459
tmpl = test_env_async.from_string(
460
"""{% for item in [1] if loop.index
461
== 0 %}...{% endfor %}"""
463
with pytest.raises(UndefinedError):
464
run_async_fn(tmpl.render_async)
466
tmpl = test_env_async.from_string(
467
"""{% for item in [] %}...{% else
468
%}{{ loop }}{% endfor %}"""
470
assert run_async_fn(tmpl.render_async) == ""
472
def test_loop_filter(self, test_env_async):
473
tmpl = test_env_async.from_string(
474
"{% for item in range(10) if item is even %}[{{ item }}]{% endfor %}"
476
assert tmpl.render() == "[0][2][4][6][8]"
477
tmpl = test_env_async.from_string(
479
{%- for item in range(10) if item is even %}[{{
480
loop.index }}:{{ item }}]{% endfor %}"""
482
assert tmpl.render() == "[1:0][2:2][3:4][4:6][5:8]"
484
def test_scoped_special_var(self, test_env_async):
485
t = test_env_async.from_string(
486
"{% for s in seq %}[{{ loop.first }}{% for c in s %}"
487
"|{{ loop.first }}{% endfor %}]{% endfor %}"
489
assert t.render(seq=("ab", "cd")) == "[True|True|False][False|True|False]"
491
def test_scoped_loop_var(self, test_env_async):
492
t = test_env_async.from_string(
493
"{% for x in seq %}{{ loop.first }}"
494
"{% for y in seq %}{% endfor %}{% endfor %}"
496
assert t.render(seq="ab") == "TrueFalse"
497
t = test_env_async.from_string(
498
"{% for x in seq %}{% for y in seq %}"
499
"{{ loop.first }}{% endfor %}{% endfor %}"
501
assert t.render(seq="ab") == "TrueFalseTrueFalse"
503
def test_recursive_empty_loop_iter(self, test_env_async):
504
t = test_env_async.from_string(
506
{%- for item in foo recursive -%}{%- endfor -%}
509
assert t.render(dict(foo=[])) == ""
511
def test_call_in_loop(self, test_env_async):
512
t = test_env_async.from_string(
514
{%- macro do_something() -%}
518
{%- for i in [1, 2, 3] %}
519
{%- call do_something() -%}
525
assert t.render() == "[1][2][3]"
527
def test_scoping_bug(self, test_env_async):
528
t = test_env_async.from_string(
530
{%- for item in foo %}...{{ item }}...{% endfor %}
531
{%- macro item(a) %}...{{ a }}...{% endmacro %}
535
assert t.render(foo=(1,)) == "...1......2..."
537
def test_unpacking(self, test_env_async):
538
tmpl = test_env_async.from_string(
539
"{% for a, b, c in [[1, 2, 3]] %}{{ a }}|{{ b }}|{{ c }}{% endfor %}"
541
assert tmpl.render() == "1|2|3"
543
def test_recursive_loop_filter(self, test_env_async):
544
t = test_env_async.from_string(
546
<?xml version="1.0" encoding="UTF-8"?>
547
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
548
{%- for page in [site.root] if page.url != this recursive %}
549
<url><loc>{{ page.url }}</loc></url>
550
{{- loop(page.children) }}
557
site={"root": {"url": "/", "children": [{"url": "/foo"}, {"url": "/bar"}]}},
559
lines = [x.strip() for x in sm.splitlines() if x.strip()]
561
'<?xml version="1.0" encoding="UTF-8"?>',
562
'<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
563
"<url><loc>/</loc></url>",
564
"<url><loc>/bar</loc></url>",
568
def test_nonrecursive_loop_filter(self, test_env_async):
569
t = test_env_async.from_string(
571
<?xml version="1.0" encoding="UTF-8"?>
572
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
573
{%- for page in items if page.url != this %}
574
<url><loc>{{ page.url }}</loc></url>
580
this="/foo", items=[{"url": "/"}, {"url": "/foo"}, {"url": "/bar"}]
582
lines = [x.strip() for x in sm.splitlines() if x.strip()]
584
'<?xml version="1.0" encoding="UTF-8"?>',
585
'<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
586
"<url><loc>/</loc></url>",
587
"<url><loc>/bar</loc></url>",
591
def test_bare_async(self, test_env_async):
    """Render a template that consists solely of an ``extends`` tag.

    The output is expected to equal what "header" renders for ``foo=42``.
    """
    child = test_env_async.from_string('{% extends "header" %}')
    rendered = child.render(foo=42)
    assert rendered == "[42|23]"
595
def test_awaitable_property_slicing(self, test_env_async):
    """Loop over the slice ``a.b[:1]`` and expect only the first element."""
    source = "{% for x in a.b[:1] %}{{ x }}{% endfor %}"
    context = dict(a=dict(b=[1, 2, 3]))
    rendered = test_env_async.from_string(source).render(**context)
    assert rendered == "1"
600
def test_namespace_awaitable(test_env_async, run_async_fn):
602
t = test_env_async.from_string(
603
'{% set ns = namespace(foo="Bar") %}{{ ns.foo }}'
605
actual = await t.render_async()
606
assert actual == "Bar"
611
def test_chainable_undefined_aiter(run_async_fn):
614
"{% for x in a['b']['c'] %}{{ x }}{% endfor %}",
616
undefined=ChainableUndefined,
618
rv = await t.render_async(a={})
625
def async_native_env():
    """Build a ``NativeEnvironment`` with async support switched on."""
    environment = NativeEnvironment(enable_async=True)
    return environment
629
def test_native_async(async_native_env, run_async_fn):
631
t = async_native_env.from_string("{{ x }}")
632
rv = await t.render_async(x=23)
638
def test_native_list_async(async_native_env, run_async_fn):
640
t = async_native_env.from_string("{{ x }}")
641
rv = await t.render_async(x=list(range(3)))
642
assert rv == [0, 1, 2]
647
def test_getitem_after_filter():
    """Slice the result of a filter expression in an async environment.

    ``add_each(2)`` applied to ``range(3)`` gives ``[2, 3, 4]``; the
    ``[1:]`` slice of that is rendered.
    """

    def add_each(values, amount):
        # Return a new list with ``amount`` added to every element.
        return [value + amount for value in values]

    environment = Environment(enable_async=True)
    environment.filters["add_each"] = add_each
    template = environment.from_string("{{ (a|add_each(2))[1:] }}")
    rendered = template.render(a=range(3))
    assert rendered == "[3, 4]"
655
def test_getitem_after_call():
    """Slice the result of a global function call in an async environment.

    ``add_each(a, 2)`` on ``range(3)`` gives ``[2, 3, 4]``; the ``[1:]``
    slice of that is rendered.
    """

    def add_each(values, amount):
        # Return a new list with ``amount`` added to every element.
        return [value + amount for value in values]

    environment = Environment(enable_async=True)
    environment.globals["add_each"] = add_each
    template = environment.from_string("{{ add_each(a, 2)[1:] }}")
    rendered = template.render(a=range(3))
    assert rendered == "[3, 4]"
663
def test_basic_generate_async(run_async_fn):
665
"{% for item in [1, 2, 3] %}[{{ item }}]{% endfor %}", enable_async=True
669
agen = t.generate_async()
671
return await agen.__anext__()
675
rv = run_async_fn(func)
679
def test_include_generate_async(run_async_fn, test_env_async):
680
t = test_env_async.from_string('{% include "header" %}')
683
agen = t.generate_async()
685
return await agen.__anext__()
689
rv = run_async_fn(func)
693
def test_blocks_generate_async(run_async_fn):
695
"{% block foo %}<Test>{% endblock %}{{ self.foo() }}",
701
agen = t.generate_async()
703
return await agen.__anext__()
707
rv = run_async_fn(func)
708
assert rv == "<Test>"
711
def test_async_extend(run_async_fn, test_env_async):
712
t = test_env_async.from_string('{% extends "header" %}')
715
agen = t.generate_async()
717
return await agen.__anext__()
721
rv = run_async_fn(func)