Comenzando a utilizar SQLAlchemy

Tuve que reescribir bastante para lograrlo, pero ya funciona :3
This commit is contained in:
Daniel Cortes
2019-03-11 20:59:55 -03:00
parent db7e0a4901
commit cb89bf932d
15 changed files with 311 additions and 320 deletions

View File

@@ -1,155 +1,74 @@
import os
import random
from flask import Flask, Blueprint, flash, request, redirect, url_for, current_app, render_template, send_from_directory, g
from flask import Flask, Blueprint, flash, request, redirect, url_for, current_app, render_template, \
send_from_directory, g
from werkzeug.utils import secure_filename
from werkzeug.exceptions import abort
from files.models import db, File, Category, User
from files.auth import admin_required
from files.db import get_db
bp = Blueprint('files', __name__)
bp.add_url_rule('/uploads/<path:filename>', 'uploaded_file', build_only=True)
def get_extension(filename):
return filename.rsplit('.', 1)[1].lower()
def get_path_in_upload(filename):
def _get_extension(filename):
return filename.rsplit('.', 1)[1].lower()
def _get_path_in_upload(filename):
return os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
def save_file(file, private, category):
def _save_file(file, private, category):
if private is None:
is_private = 0
else:
is_private = 1
db = get_db()
filename = secure_filename(file.filename)
db.execute(
'INSERT INTO files (filename, private, category)'
' VALUES (?,?,?)',
(filename, is_private, category['id'],)
)
file.save(get_path_in_upload(filename))
file.save(_get_path_in_upload(filename))
f = File(filename, is_private, category.id)
db.commit()
db.session.add(f)
db.session.commit()
return filename
def _rename_file(file, new_name):
db = get_db()
new_name = secure_filename(new_name)
db.execute(
'UPDATE files'
' SET filename = ?'
' WHERE id = ?',
(new_name, file['id'],)
)
os.rename(get_path_in_upload(file['filename']), get_path_in_upload(new_name))
os.rename(_get_path_in_upload(file.filename), _get_path_in_upload(new_name))
file.filename = new_name
db.commit()
return new_name
db.session.commit()
def _delete_file(file):
db = get_db()
db.execute(
'DELETE from files'
' WHERE id = ?',
(file['id'],)
)
os.remove(get_path_in_upload(file['filename']))
db.commit()
os.remove(_get_path_in_upload(file.filename))
db.session.delete(file)
db.session.commit()
def get_files():
db = get_db()
files = db.execute(
'SELECT id, filename, private, category'
' FROM files'
' ORDER BY filename'
).fetchall()
return files
def get_public_files():
db = get_db()
files = db.execute(
'SELECT id, filename, private, category'
' FROM files'
' WHERE private = false'
' ORDER BY filename'
).fetchall()
return files
def get_files_by_category(id):
db = get_db()
files = db.execute(
'SELECT id, filename, private'
' FROM files'
' WHERE category = ?'
' ORDER BY filename',
(id,)
).fetchall()
def get_file(id):
db = get_db()
file = db.execute(
'SELECT id, filename, private, category'
' FROM files'
' WHERE id = ?'
' LIMIT 1',
(id,)
).fetchone()
return file
def get_categories():
db = get_db()
categories = db.execute(
'SELECT id, name'
' FROM categories'
' ORDER BY name'
).fetchall()
return categories
def get_category(id):
db = get_db()
category = db.execute(
'SELECT id, name'
' FROM categories'
' WHERE id = ?'
' LIMIT 1',
(id,)
).fetchone()
return category
@bp.route('/', methods=['GET', 'POST'])
@bp.route('/', methods=['GET', 'POST'])
def index():
if g.user is None:
files = get_public_files()
else:
files = get_files()
files = File.query.filter_by(private=0).all()
else:
files = File.query.all()
return render_template('files/index.html', files=files)
@bp.route('/upload', methods=['GET', 'POST'])
@admin_required
def upload_file():
db = get_db()
if request.method == 'POST':
if 'file' not in request.files:
flash('No file part')
return redirect(request.url)
else:
else:
file = request.files['file']
if 'private' not in request.form:
@@ -160,23 +79,27 @@ def upload_file():
if 'category' not in request.form:
flash('No category selected')
return redirect(request.url)
else:
category = get_category(request.form['category'])
category = Category.query.get(request.form['category'])
if category is None:
flash('The category selected won\'t exists')
return redirect(request.url)
if file.filename == '':
flash('No seleted file')
return redirect(request.url)
if file:
filename = save_file(file, private, category)
_save_file(file, private, category)
return redirect(url_for('files.index'))
return render_template('files/upload.html', categories=get_categories())
return render_template('files/upload.html', categories=Category.query.all())
@bp.route('/preview/<int:id>')
def preview_file(id):
file = get_file(id)
if (file['private'] == 1 and g.user is not None) or (file['private'] == 0):
file = File.query.get(id)
if (file.private == 1 and g.user is not None) or (file.private == 0):
return render_template('files/preview.html', file=file)
else:
return abort(404)
@@ -186,28 +109,27 @@ def preview_file(id):
@admin_required
def rename_file(id):
if request.method == 'POST':
file = get_file(id)
file = File.query.get(id)
new_name = request.form['new_name'].lower()
extension = file['filename'].rsplit('.', 1)[1].lower()
extension = file.filename.rsplit('.', 1)[1].lower()
if "." in new_name and get_extension(new_name):
new_name = new_name.rsplit('.',1)[0] + '.' + extension
if "." in new_name and _get_extension(new_name):
new_name = new_name.rsplit('.', 1)[0] + '.' + extension
else:
new_name = new_name + '.' + extension
_rename_file(file, new_name)
return redirect(url_for('files.preview_file', id=file['id']))
return redirect(url_for('files.preview_file', id=file.id))
@bp.route('/delete/<int:id>', methods=['POST'])
@admin_required
def delete_file(id):
if request.method == 'POST':
file = get_file(id)
print(file)
file = File.query.get(id)
_delete_file(file)
return redirect(url_for('index'))
else:
abort(404)