1
"""A simple URL shortener using Werkzeug and redis."""
3
from urllib.parse import urlsplit
6
from jinja2 import Environment
7
from jinja2 import FileSystemLoader
8
from werkzeug.exceptions import HTTPException
9
from werkzeug.exceptions import NotFound
10
from werkzeug.middleware.shared_data import SharedDataMiddleware
11
from werkzeug.routing import Map
12
from werkzeug.routing import Rule
13
from werkzeug.utils import redirect
14
from werkzeug.wrappers import Request
15
from werkzeug.wrappers import Response
18
def base36_encode(number):
    """Return the base-36 string for a non-negative integer.

    Digits are ``0``-``9`` followed by lowercase ``a``-``z``.

    :param number: the integer to encode; must be ``>= 0``.
    :return: the encoded string (``"0"`` for zero).
    :raises AssertionError: if *number* is negative.
    """
    assert number >= 0, "positive integer required"
    if number == 0:
        # The loop below emits nothing for zero, so handle it explicitly.
        return "0"
    base36 = []
    while number != 0:
        number, i = divmod(number, 36)
        base36.append("0123456789abcdefghijklmnopqrstuvwxyz"[i])
    # Digits were collected least-significant first.
    return "".join(reversed(base36))
31
def is_valid_url(url):
    """Return ``True`` if *url* uses the ``http`` or ``https`` scheme.

    :param url: the URL string to check.
    :return: ``True`` for http/https URLs, ``False`` otherwise, including
        input that ``urlsplit`` refuses to parse.
    """
    try:
        parts = urlsplit(url)
    except ValueError:
        # urlsplit raises on some malformed input (e.g. an unbalanced "["
        # in the host part); treat that as "not a valid URL".
        return False
    return parts.scheme in ("http", "https")
35
def get_hostname(url):
    """Return the network-location (``netloc``) component of *url*.

    :param url: the URL string to split.
    :return: the ``netloc`` part as reported by ``urlsplit``; an empty
        string when the URL has none.
    """
    return urlsplit(url).netloc
39
def __init__(self, config):
    """Set up the redis client, the template environment and the URL map.

    :param config: mapping providing ``"redis_host"`` and ``"redis_port"``.
    """
    # decode_responses=True asks redis-py to hand back str instead of bytes.
    self.redis = redis.Redis(
        config["redis_host"], config["redis_port"], decode_responses=True
    )
    # Templates live in a "templates" directory next to this module.
    template_path = os.path.join(os.path.dirname(__file__), "templates")
    self.jinja_env = Environment(
        loader=FileSystemLoader(template_path), autoescape=True
    )
    self.jinja_env.filters["hostname"] = get_hostname

    # Endpoint names are resolved to ``on_<endpoint>`` handler methods.
    self.url_map = Map(
        [
            Rule("/", endpoint="new_url"),
            Rule("/<short_id>", endpoint="follow_short_link"),
            Rule("/<short_id>+", endpoint="short_link_details"),
        ]
    )
57
def on_new_url(self, request):
    """Show the "new URL" form and handle its submission.

    On ``POST`` the submitted ``url`` form field is validated; a valid URL
    is stored and the client is redirected to ``/<short_id>+``. Otherwise
    the form template is rendered, with an error message when validation
    failed.

    :param request: the incoming request object.
    :return: a redirect response or the rendered ``new_url.html`` page.
    """
    # NOTE(review): initial values restored from context (both names are
    # used on the non-POST path below) -- confirm against the upstream file.
    error = None
    url = ""
    if request.method == "POST":
        url = request.form["url"]
        if not is_valid_url(url):
            error = "Please enter a valid URL"
        else:
            short_id = self.insert_url(url)
            return redirect(f"/{short_id}+")
    return self.render_template("new_url.html", error=error, url=url)
69
def on_follow_short_link(self, request, short_id):
    """Redirect to the target stored for *short_id* and count the click.

    :param request: the incoming request object (unused).
    :param short_id: the short identifier taken from the URL.
    :return: a redirect response to the stored target URL.
    :raises NotFound: if no target is stored for *short_id*.
    """
    link_target = self.redis.get(f"url-target:{short_id}")
    if link_target is None:
        raise NotFound()
    self.redis.incr(f"click-count:{short_id}")
    return redirect(link_target)
76
def on_short_link_details(self, request, short_id):
    """Render the details page (target and click count) for *short_id*.

    :param request: the incoming request object (unused).
    :param short_id: the short identifier taken from the URL.
    :return: the rendered ``short_link_details.html`` page.
    :raises NotFound: if no target is stored for *short_id*.
    """
    link_target = self.redis.get(f"url-target:{short_id}")
    if link_target is None:
        raise NotFound()
    # A link that was never followed has no counter key yet; show 0.
    click_count = int(self.redis.get(f"click-count:{short_id}") or 0)
    return self.render_template(
        "short_link_details.html",
        link_target=link_target,
        # NOTE(review): this keyword was restored from context -- confirm
        # the template expects ``short_id``.
        short_id=short_id,
        click_count=click_count,
    )
89
def error_404(self):
    """Render the ``404.html`` template as a response with status 404.

    :return: the rendered response with ``status_code`` set to 404.
    """
    response = self.render_template("404.html")
    response.status_code = 404
    return response
93
def insert_url(self, url):
    """Return the short id for *url*, creating one if none exists yet.

    An existing ``reverse-url:<url>`` entry is reused. Otherwise the
    ``last-url-id`` counter is incremented, its value is base-36 encoded,
    and both the forward (``url-target:``) and reverse (``reverse-url:``)
    mappings are written.

    :param url: the target URL to shorten.
    :return: the short id for *url*.
    """
    short_id = self.redis.get(f"reverse-url:{url}")
    if short_id is not None:
        return short_id
    url_num = self.redis.incr("last-url-id")
    short_id = base36_encode(url_num)
    self.redis.set(f"url-target:{short_id}", url)
    self.redis.set(f"reverse-url:{url}", short_id)
    return short_id
103
def render_template(self, template_name, **context):
    """Render a template from the Jinja environment into an HTML response.

    :param template_name: name of the template to load.
    :param context: keyword arguments passed to the template as its context.
    :return: a ``Response`` with mimetype ``text/html``.
    """
    t = self.jinja_env.get_template(template_name)
    return Response(t.render(context), mimetype="text/html")
107
def dispatch_request(self, request):
    """Match the request against the URL map and call its handler.

    The matched endpoint name selects the ``on_<endpoint>`` method, which is
    called with the request and the matched URL values.

    :param request: the incoming request object.
    :return: the handler's response; the rendered 404 page when
        ``NotFound`` is raised; or the exception itself for any other
        ``HTTPException``.
    """
    adapter = self.url_map.bind_to_environ(request.environ)
    try:
        endpoint, values = adapter.match()
        return getattr(self, f"on_{endpoint}")(request, **values)
    except NotFound:
        # Covers both "no rule matched" and handlers raising NotFound.
        return self.error_404()
    except HTTPException as e:
        return e
117
def wsgi_app(self, environ, start_response):
    """WSGI entry point: build a request, dispatch it, return the response.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: the result of calling the response object as a WSGI app.
    """
    request = Request(environ)
    response = self.dispatch_request(request)
    return response(environ, start_response)
122
def __call__(self, environ, start_response):
    """Delegate to ``self.wsgi_app``.

    The attribute is looked up on each call, so a wrapper assigned to
    ``wsgi_app`` on the instance is the one that gets invoked.

    :param environ: the WSGI environment.
    :param start_response: the WSGI ``start_response`` callable.
    :return: whatever ``self.wsgi_app`` returns.
    """
    return self.wsgi_app(environ, start_response)
126
def create_app(redis_host="localhost", redis_port=6379, with_static=True):
    """Create the ``Shortly`` application, optionally serving static files.

    :param redis_host: host name passed to the application config.
    :param redis_port: port passed to the application config.
    :param with_static: when true, wrap ``app.wsgi_app`` so that ``/static``
        is served from the ``static`` directory next to this module.
    :return: the configured application object.
    """
    app = Shortly({"redis_host": redis_host, "redis_port": redis_port})
    if with_static:
        app.wsgi_app = SharedDataMiddleware(
            app.wsgi_app, {"/static": os.path.join(os.path.dirname(__file__), "static")}
        )
    return app
135
if __name__ == "__main__":
    # Imported here so the serving helper is only loaded when run as a script.
    from werkzeug.serving import run_simple

    app = create_app()
    # Local development server on 127.0.0.1:5000 with the interactive
    # debugger and auto-reloader switched on.
    run_simple("127.0.0.1", 5000, app, use_debugger=True, use_reloader=True)