werkzeug

Форк
0
139 строк · 4.5 Кб
1
"""A simple URL shortener using Werkzeug and redis."""
2
import os
3
from urllib.parse import urlsplit
4

5
import redis
6
from jinja2 import Environment
7
from jinja2 import FileSystemLoader
8
from werkzeug.exceptions import HTTPException
9
from werkzeug.exceptions import NotFound
10
from werkzeug.middleware.shared_data import SharedDataMiddleware
11
from werkzeug.routing import Map
12
from werkzeug.routing import Rule
13
from werkzeug.utils import redirect
14
from werkzeug.wrappers import Request
15
from werkzeug.wrappers import Response
16

17

18
def base36_encode(number):
19
    assert number >= 0, "positive integer required"
20
    if number == 0:
21
        return "0"
22
    base36 = []
23
    while number != 0:
24
        number, i = divmod(number, 36)
25
        base36.append("0123456789abcdefghijklmnopqrstuvwxyz"[i])
26
    return "".join(reversed(base36))
27

28

29
def is_valid_url(url):
30
    parts = urlsplit(url)
31
    return parts.scheme in ("http", "https")
32

33

34
def get_hostname(url):
35
    return urlsplit(url).netloc
36

37

38
class Shortly:
39
    def __init__(self, config):
40
        self.redis = redis.Redis(
41
            config["redis_host"], config["redis_port"], decode_responses=True
42
        )
43
        template_path = os.path.join(os.path.dirname(__file__), "templates")
44
        self.jinja_env = Environment(
45
            loader=FileSystemLoader(template_path), autoescape=True
46
        )
47
        self.jinja_env.filters["hostname"] = get_hostname
48

49
        self.url_map = Map(
50
            [
51
                Rule("/", endpoint="new_url"),
52
                Rule("/<short_id>", endpoint="follow_short_link"),
53
                Rule("/<short_id>+", endpoint="short_link_details"),
54
            ]
55
        )
56

57
    def on_new_url(self, request):
58
        error = None
59
        url = ""
60
        if request.method == "POST":
61
            url = request.form["url"]
62
            if not is_valid_url(url):
63
                error = "Please enter a valid URL"
64
            else:
65
                short_id = self.insert_url(url)
66
                return redirect(f"/{short_id}+")
67
        return self.render_template("new_url.html", error=error, url=url)
68

69
    def on_follow_short_link(self, request, short_id):
70
        link_target = self.redis.get(f"url-target:{short_id}")
71
        if link_target is None:
72
            raise NotFound()
73
        self.redis.incr(f"click-count:{short_id}")
74
        return redirect(link_target)
75

76
    def on_short_link_details(self, request, short_id):
77
        link_target = self.redis.get(f"url-target:{short_id}")
78
        if link_target is None:
79
            raise NotFound()
80
        click_count = int(self.redis.get(f"click-count:{short_id}") or 0)
81
        return self.render_template(
82
            "short_link_details.html",
83
            link_target=link_target,
84
            short_id=short_id,
85
            click_count=click_count,
86
        )
87

88
    def error_404(self):
89
        response = self.render_template("404.html")
90
        response.status_code = 404
91
        return response
92

93
    def insert_url(self, url):
94
        short_id = self.redis.get(f"reverse-url:{url}")
95
        if short_id is not None:
96
            return short_id
97
        url_num = self.redis.incr("last-url-id")
98
        short_id = base36_encode(url_num)
99
        self.redis.set(f"url-target:{short_id}", url)
100
        self.redis.set(f"reverse-url:{url}", short_id)
101
        return short_id
102

103
    def render_template(self, template_name, **context):
104
        t = self.jinja_env.get_template(template_name)
105
        return Response(t.render(context), mimetype="text/html")
106

107
    def dispatch_request(self, request):
108
        adapter = self.url_map.bind_to_environ(request.environ)
109
        try:
110
            endpoint, values = adapter.match()
111
            return getattr(self, f"on_{endpoint}")(request, **values)
112
        except NotFound:
113
            return self.error_404()
114
        except HTTPException as e:
115
            return e
116

117
    def wsgi_app(self, environ, start_response):
118
        request = Request(environ)
119
        response = self.dispatch_request(request)
120
        return response(environ, start_response)
121

122
    def __call__(self, environ, start_response):
123
        return self.wsgi_app(environ, start_response)
124

125

126
def create_app(redis_host="localhost", redis_port=6379, with_static=True):
127
    app = Shortly({"redis_host": redis_host, "redis_port": redis_port})
128
    if with_static:
129
        app.wsgi_app = SharedDataMiddleware(
130
            app.wsgi_app, {"/static": os.path.join(os.path.dirname(__file__), "static")}
131
        )
132
    return app
133

134

135
if __name__ == "__main__":
136
    from werkzeug.serving import run_simple
137

138
    app = create_app()
139
    run_simple("127.0.0.1", 5000, app, use_debugger=True, use_reloader=True)
140

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.