1
0
mirror of https://github.com/aquatix/digimarks.git synced 2025-12-07 10:45:11 +01:00

Refactoring to FastAPI and SQLAlchemy

This commit is contained in:
2023-07-30 21:19:51 +02:00
parent 96e7ef16d4
commit 7e397f9d2b
6 changed files with 407 additions and 157 deletions

View File

@@ -1,28 +1,25 @@
from __future__ import print_function
import binascii
import datetime
import gzip
import hashlib
import logging
import os
import shutil
import sys
from urllib.parse import urljoin, urlparse, urlunparse
import bs4
import requests
from dateutil import tz
from feedgen.feed import FeedGenerator
#from flask import (Flask, abort, jsonify, make_response, redirect,
# render_template, request, url_for)
from typing import List
import databases
import sqlalchemy
from fastapi import FastAPI
from pydantic import BaseModel
from urlparse import urljoin, urlparse, urlunparse
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from feedgen.feed import FeedGenerator
from pydantic import BaseModel, DirectoryPath, FilePath, validator
from pydantic_settings import BaseSettings
from sqlalchemy import VARCHAR, Boolean, Column, DateTime, ForeignKey, Integer, String, Text, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
DIGIMARKS_USER_AGENT = 'digimarks/2.0.0-dev'
@@ -150,44 +147,44 @@ themes = {
}
}
try:
import settings
except ImportError:
print('Copy settings_example.py to settings.py and set the configuration to your own preferences')
sys.exit(1)
# app configuration
APP_ROOT = os.path.dirname(os.path.realpath(__file__))
MEDIA_ROOT = os.path.join(APP_ROOT, 'static')
MEDIA_URL = '/static/'
DATABASE = {
'name': os.path.join(APP_ROOT, 'bookmarks.db'),
'engine': 'peewee.SqliteDatabase',
}
DATABASE_URL = os.path.join(APP_ROOT, 'bookmarks.db')
#PHANTOM = '/usr/local/bin/phantomjs'
#SCRIPT = os.path.join(APP_ROOT, 'screenshot.js')
class Settings(BaseSettings):
"""Configuration needed for digimarks to find its database, favicons, API integrations"""
# create our flask app and a database wrapper
#app = Flask(__name__)
#app.config.from_object(__name__)
#database = SqliteDatabase(os.path.join(APP_ROOT, 'bookmarks.db'))
database_file: FilePath = './bookmarks.db'
media_dir: DirectoryPath
media_url: str = '/static/'
database = databases.Database(DATABASE_URL)
mashape_api_key: str
metadata = sqlalchemy.MetaData()
debug: bool = False
# Strip unnecessary whitespace due to jinja2 codeblocks
app.jinja_env.trim_blocks = True
app.jinja_env.lstrip_blocks = True
# set custom url for the app, for example '/bookmarks'
try:
app.config['APPLICATION_ROOT'] = settings.APPLICATION_ROOT
except AttributeError:
pass
settings = Settings()
engine = create_engine(
f'sqlite:///{settings.database_file}', connect_args={'check_same_thread': False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
app = FastAPI()
logger = logging.getLogger('digimarks')
if settings.debug:
logger.setLevel(logging.DEBUG)
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow requests from everywhere
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Cache the tags
all_tags = {}
usersettings = {}
@@ -202,9 +199,11 @@ def ifilterfalse(predicate, iterable):
def unique_everseen(iterable, key=None):
"List unique elements, preserving order. Remember all elements ever seen."
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
# unique_everseen('ABBCcAD', str.lower) --> A B C D
"""List unique elements, preserving order. Remember all elements ever seen.
unique_everseen('AAAABBBCCDAABBB') --> A B C D
unique_everseen('ABBCcAD', str.lower) --> A B C D
"""
seen = set()
seen_add = seen.add
if key is None:
@@ -218,6 +217,7 @@ def unique_everseen(iterable, key=None):
seen_add(k)
yield element
def clean_tags(tags_list):
tags_res = [x.strip() for x in tags_list]
tags_res = list(unique_everseen(tags_res))
@@ -244,17 +244,13 @@ def file_type(filename):
return "no match"
class BaseModel(Model):
class Meta:
database = database
class User(BaseModel):
class User(Base):
""" User account """
username = CharField()
key = CharField()
theme = CharField(default=DEFAULT_THEME)
created_date = DateTimeField(default=datetime.datetime.now)
username = Column(VARCHAR(255))
key = Column(VARCHAR(255))
# theme = CharField(default=DEFAULT_THEME)
theme = Column(VARCHAR(20), default=DEFAULT_THEME)
created_date = Column(DateTime, default=datetime.datetime.now)
def generate_key(self):
""" Generate userkey """
@@ -262,21 +258,21 @@ class User(BaseModel):
return self.key
class Bookmark(BaseModel):
class Bookmark(Base):
""" Bookmark instance, connected to User """
# Foreign key to User
userkey = CharField()
userkey = Column(VARCHAR(255))
title = CharField(default='')
url = CharField()
note = TextField(default='')
title = Column(VARCHAR(255), default='')
url = Column(VARCHAR(255))
note = Column(Text, default='')
#image = CharField(default='')
url_hash = CharField(default='')
tags = CharField(default='')
starred = BooleanField(default=False)
url_hash = Column(VARCHAR(255) , default='')
tags = Column(VARCHAR(255), default='')
starred = Column(Boolean, default=False)
# Website (domain) favicon
favicon = CharField(null=True)
favicon = Column(VARCHAR(255), null=True)
# Status code: 200 is OK, 404 is not found, for example (showing an error)
HTTP_CONNECTIONERROR = 0
@@ -285,17 +281,17 @@ class Bookmark(BaseModel):
HTTP_MOVEDTEMPORARILY = 304
HTTP_NOTFOUND = 404
http_status = IntegerField(default=200)
http_status = Column(Integer, default=200)
redirect_uri = None
created_date = DateTimeField(default=datetime.datetime.now)
modified_date = DateTimeField(null=True)
deleted_date = DateTimeField(null=True)
created_date = Column(DateTime, default=datetime.datetime.now)
modified_date = Column(DateTime, null=True)
deleted_date = Column(DateTime, null=True)
# Bookmark status; deleting doesn't remove from DB
VISIBLE = 0
DELETED = 1
status = IntegerField(default=VISIBLE)
status = Column(Integer, default=VISIBLE)
class Meta:
@@ -345,7 +341,7 @@ class Bookmark(BaseModel):
stream=True,
headers={'User-Agent': DIGIMARKS_USER_AGENT}
)
filename = os.path.join(MEDIA_ROOT, 'favicons/' + domain + fileextension)
filename = os.path.join(settings.media_dir, 'favicons/', domain + fileextension)
with open(filename, 'wb') as out_file:
shutil.copyfileobj(response.raw, out_file)
del response
@@ -387,7 +383,7 @@ class Bookmark(BaseModel):
fileextension = '.jpg'
if response.headers['content-type'] == 'image/x-icon':
fileextension = '.ico'
filename = os.path.join(MEDIA_ROOT, 'favicons/' + domain + fileextension)
filename = os.path.join(settings.media_dir, 'favicons/', domain + fileextension)
with open(filename, 'wb') as out_file:
shutil.copyfileobj(response.raw, out_file)
del response
@@ -406,11 +402,11 @@ class Bookmark(BaseModel):
""" Fetch favicon for the domain """
u = urlparse(self.url)
domain = u.netloc
if os.path.isfile(os.path.join(MEDIA_ROOT, 'favicons/' + domain + '.png')):
if os.path.isfile(os.path.join(settings.media_dir, 'favicons/', domain + '.png')):
# If file exists, don't re-download it
self.favicon = domain + '.png'
return
if os.path.isfile(os.path.join(MEDIA_ROOT, 'favicons/' + domain + '.ico')):
if os.path.isfile(os.path.join(settings.media_dir, 'favicons/', domain + '.ico')):
# If file exists, don't re-download it
self.favicon = domain + '.ico'
return
@@ -466,10 +462,10 @@ class Bookmark(BaseModel):
class PublicTag(BaseModel):
""" Publicly shared tag """
tagkey = CharField()
userkey = CharField()
tag = CharField()
created_date = DateTimeField(default=datetime.datetime.now)
tagkey = Column(VARCHAR(255))
userkey = Column(VARCHAR(255))
tag = Column(VARCHAR(255))
created_date = Column(DateTime, default=datetime.datetime.now)
def generate_key(self):
""" Generate hash-based key for publicly shared tag """
@@ -958,8 +954,8 @@ def publictag_json(tagkey):
abort(404)
@app.route('/pub/<tagkey>/feed')
def publictag_feed(tagkey):
@app.get('/pub/<tagkey>/feed')
async def publictag_feed(request: Request, tagkey: str):
""" rss/atom representation of the Read-only overview of the bookmarks in the userkey/tag of this PublicTag """
try:
this_tag = PublicTag.get(PublicTag.tagkey == tagkey)
@@ -973,7 +969,7 @@ def publictag_feed(tagkey):
feed.title(this_tag.tag)
feed.id(request.url)
feed.link(href=request.url, rel='self')
feed.link(href=make_external(url_for('publictag_page', tagkey=tagkey)))
feed.link(href=make_external(app.url_path_for('publictag_page', tagkey=tagkey)))
for bookmark in bookmarks:
entry = feed.add_entry()
@@ -993,20 +989,21 @@ def publictag_feed(tagkey):
entry.published(bookmark.created_date.replace(tzinfo=tz.tzlocal()))
entry.updated(updated_date.replace(tzinfo=tz.tzlocal()))
response = make_response(feed.atom_str(pretty=True))
response = Response(data=feed.atom_str(pretty=True), media_type='application/xml')
response.headers.set('Content-Type', 'application/atom+xml')
return response
except PublicTag.DoesNotExist:
abort(404)
raise HTTPException(status_code=404, detail='Tag not found')
@app.route('/<userkey>/<tag>/makepublic', methods=['GET', 'POST'])
def addpublictag(userkey, tag):
#user = get_object_or_404(User.get(User.key == userkey))
@app.get('/<userkey>/<tag>/makepublic')
@app.post('/<userkey>/<tag>/makepublic')
async def addpublictag(userkey: str, tag: str):
try:
User.get(User.key == userkey)
except User.DoesNotExist:
abort(404)
raise HTTPException(status_code=404, detail='User not found')
try:
publictag = PublicTag.get(PublicTag.userkey == userkey, PublicTag.tag == tag)
except PublicTag.DoesNotExist:
@@ -1019,10 +1016,14 @@ def addpublictag(userkey, tag):
newpublictag.save()
message = 'Public link to this tag created'
return redirect(url_for('tag_page', userkey=userkey, tag=tag, message=message))
message = 'Public link already existed'
return redirect(url_for('tag_page', userkey=userkey, tag=tag, message=message))
success = True
# return RedirectResponse(url=url_path_for('tag_page', userkey=userkey, tag=tag, message=message))
else:
message = 'Public link already existed'
success = False
# return redirect(url_for('tag_page', userkey=userkey, tag=tag, message=message))
url = app.url_path_for('tag_page', userkey=userkey, tag=tag, message=message)
return {'success': success, 'message': message, 'url': url}
@app.route('/<userkey>/<tag>/removepublic/<tagkey>', methods=['GET', 'POST'])