Files
staticshield/staticshield.py

138 lines
5.9 KiB
Python

import json
import os
import urllib.request
from logging.config import dictConfig
from flask import Flask, redirect, request, send_from_directory, session
from flask_session import Session
dictConfig({
'version': 1,
'formatters': {'default': {
'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
}},
'handlers': {'wsgi': {
'class': 'logging.StreamHandler',
'stream': 'ext://flask.logging.wsgi_errors_stream',
'formatter': 'default'
}},
'root': {
#'level': 'INFO',
'level': 'DEBUG',
'handlers': ['wsgi']
}
})
app = Flask(__name__)
app.config.from_prefixed_env()
app.config['SESSION_PERMANENT'] = True
app.config['SESSION_TYPE'] = 'filesystem'
Session(app)
# Verify the required configuration
# SERVE_DIR: Base dir of the files we want to serve; Flask will take care not to escape this dir
# MOTHERSHIP: Mothership server and login-url, which will redirect here with a sessionstart/SEKRIT
# ERROR_PAGES_DIR: (optional) Base dir of 403.html, 404.html to show as alternative to default short error message
config_vars = ['SERVE_DIR', 'MOTHERSHIP', 'ERROR_PAGES_DIR']
for config_var in config_vars:
if config_var not in app.config:
print(f'FLASK_{config_var} env var not set')
os.sys.exit(1)
else:
app.logger.info('Config env %s with value "%s"', config_var, app.config[config_var])
def handle_error_code(http_code):
"""Handle 403/404/whatever HTTP response; either with a simple default, or with custom file.
:param int http_code: HTTP status code to handle and return
"""
path = f'{http_code}.html'
if app.config['ERROR_PAGES_DIR']:
file_path = os.path.join(app.config['ERROR_PAGES_DIR'], path)
app.logger.debug(file_path)
if os.path.isfile(file_path):
app.logger.info('Serving file %s', str(file_path))
# This takes a base directory and a path, and ensures that the path is contained in the directory, which makes it safe to accept user-provided paths.
return send_from_directory(app.config['ERROR_PAGES_DIR'], path), http_code
return 'Unable to set up session', http_code
@app.route('/')
@app.route('/<path:path>', methods=['GET', 'POST'])
def all_routes(path = ''):
"""Intercept all routes and proxy it through a session.
Loosely based on https://stackoverflow.com/a/20648053
:param str path: path of file inside the base SERVE_DIR to be served
"""
app.logger.info('Requested %s', path)
if path.startswith('sessionstart/'):
# We got redirected back from the mothership, lets see if the secret we got is really known
# The path we should have gotten back is of the format:
# /sessionstart/SEKRIT_TOKEN/<the_url_to_redirect_on_here_afterwards>
# or, if the request is denied by the mothership:
# /sessionstart/denied
secret_and_redirect = path.split('sessionstart/')[1]
secret_redirect_split = secret_and_redirect.split('/')
secret = secret_redirect_split[0]
redirect_path = '/'
if len(secret_redirect_split) > 1:
redirect_path = '/'.join(secret_redirect_split[1:])
if secret == 'denied':
# Mother says no
return handle_error_code(403)
# Ask the mothership if the secret is known to them, to prevent someone from just making up a URL
# Mothership will invalidate this secret token upon handling this request to prevent replay
try:
app.logger.info('verifying token "%s"', secret)
with urllib.request.urlopen(f'{app.config["MOTHERSHIP"]}/verify/{secret}') as response:
challenge_response = response.read()
print(challenge_response)
try:
# Expects a JSON dict with {'correct': true/false}
challenge_response_dict = json.loads(challenge_response)
if challenge_response_dict.get('correct', False):
# Start session if challenge response was successful
session['id'] = secret
app.logger.info('starting new session with secret "%s", afterwards redirecting to %s', secret, redirect_path)
return redirect(redirect_path)
app.logger.warning('new session aborted, secret "%s" was incorrect, not redirecting to %s', secret, redirect_path)
except ValueError as e:
app.logger.error('Error while decoding challenge response: %s', str(e))
return handle_error_code(403)
except urllib.error.URLError as e:
app.logger.error('lolwtf, server not found: %s', str(e.reason))
return handle_error_code(403)
except urllib.error.HTTPError as e:
app.logger.error('lolnope, server says no: %s', str(e.reason))
return handle_error_code(403)
# Fallback to disallow
return handle_error_code(403)
# Check if the users exist or not
if not session.get('id'):
# Our current URL, to which mothership will redirect back including a sessionstart
original_url = f'{request.host_url}{path}'
callback_url = f'{request.host_url}sessionstart/'
# No session yet, redirect to mothership
app.logger.info('Redirecting to mothership with %s', original_url)
return redirect(f'{app.config["MOTHERSHIP"]}/login?redirect={original_url}&callback={callback_url}')
if path == '':
path = 'index.html'
file_path = os.path.join(app.config['SERVE_DIR'], path)
if os.path.isfile(file_path):
app.logger.info('Serving file %s', str(file_path))
# This takes a base directory and a path, and ensures that the path is contained in the directory, which makes it safe to accept user-provided paths.
return send_from_directory(app.config['SERVE_DIR'], path)
else:
app.logger.error('File not found: %s', str(file_path))
return handle_error_code(404)