Files
TimeTrack/routes/users.py
Jens Luedicke 9a79778ad6 Squashed commit of the following:
commit 1eeea9f83ad9230a5c1f7a75662770eaab0df837
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 21:15:41 2025 +0200

    Disable resuming of old time entries.

commit 3e3ec2f01cb7943622b819a19179388078ae1315
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 20:59:19 2025 +0200

    Refactor db migrations.

commit 15a51a569da36c6b7c9e01ab17b6fdbdee6ad994
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 19:58:04 2025 +0200

    Apply new style for Time Tracking view.

commit 77e5278b303e060d2b03853b06277f8aa567ae68
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 18:06:04 2025 +0200

    Allow direct registrations as a Company.

commit 188a8772757cbef374243d3a5f29e4440ddecabe
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 18:04:45 2025 +0200

    Add email invitation feature.

commit d9ebaa02aa01b518960a20dccdd5a327d82f30c6
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 17:12:32 2025 +0200

    Apply common style for Company, User, Team management pages.

commit 81149caf4d8fc6317e2ab1b4f022b32fc5aa6d22
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 16:44:32 2025 +0200

    Move export functions to own module.

commit 1a26e19338e73f8849c671471dd15cc3c1b1fe82
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 15:51:15 2025 +0200

    Split up models.py.

commit 61f1ccd10f721b0ff4dc1eccf30c7a1ee13f204d
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 12:05:28 2025 +0200

    Move utility function into own modules.

commit 84b341ed35e2c5387819a8b9f9d41eca900ae79f
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 11:44:24 2025 +0200

    Refactor auth functions use.

commit 923e311e3da5b26d85845c2832b73b7b17c48adb
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 11:35:52 2025 +0200

    Refactor route nameing and fix bugs along the way.

commit f0a5c4419c340e62a2615c60b2a9de28204d2995
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 10:34:33 2025 +0200

    Fix URL endpoints in announcement template.

commit b74d74542a1c8dc350749e4788a9464d067a88b5
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 09:25:53 2025 +0200

    Move announcements to own module.

commit 9563a28021ac46c82c04fe4649b394dbf96f92c7
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 09:16:30 2025 +0200

    Combine Company view and edit templates.

commit 6687c373e681d54e4deab6b2582fed5cea9aadf6
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 08:17:42 2025 +0200

    Move Users, Company and System Administration to own modules.

commit 8b7894a2e3eb84bb059f546648b6b9536fea724e
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 07:40:57 2025 +0200

    Move Teams and Projects to own modules.

commit d11bf059d99839ecf1f5d7020b8c8c8a2454c00b
Author: Jens Luedicke <jens@luedicke.me>
Date:   Mon Jul 7 07:09:33 2025 +0200

    Move Tasks and Sprints to own modules.
2025-07-07 21:16:36 +02:00

532 lines
20 KiB
Python

"""
User management routes
"""
from flask import Blueprint, render_template, request, redirect, url_for, flash, g, session, abort
from models import db, User, Role, Team, TimeEntry, WorkConfig, UserPreferences, Project, Task, SubTask, ProjectCategory, UserDashboard, Comment, Company
from routes.auth import admin_required, company_required, login_required, system_admin_required
from flask_mail import Message
from flask import current_app
from utils.validation import FormValidator
from utils.repository import UserRepository
import logging
logger = logging.getLogger(__name__)
users_bp = Blueprint('users', __name__, url_prefix='/admin/users')
def get_available_roles():
"""Get roles available for user creation/editing based on current user's role"""
current_user_role = g.user.role
if current_user_role == Role.SYSTEM_ADMIN:
# System admin can assign any role
return [Role.TEAM_MEMBER, Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN, Role.SYSTEM_ADMIN]
elif current_user_role == Role.ADMIN:
# Admin can assign any role except system admin
return [Role.TEAM_MEMBER, Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]
elif current_user_role == Role.SUPERVISOR:
# Supervisor can only assign team member and team leader roles
return [Role.TEAM_MEMBER, Role.TEAM_LEADER]
else:
# Others cannot assign roles
return []
@users_bp.route('')
@admin_required
@company_required
def admin_users():
user_repo = UserRepository()
users = user_repo.get_by_company(g.user.company_id)
return render_template('admin_users.html', title='User Management', users=users)
@users_bp.route('/create', methods=['GET', 'POST'])
@admin_required
@company_required
def create_user():
if request.method == 'POST':
validator = FormValidator()
user_repo = UserRepository()
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
auto_verify = 'auto_verify' in request.form
# Get role and team
role_name = request.form.get('role')
team_id = request.form.get('team_id')
# Validate required fields
validator.validate_required(username, 'Username')
validator.validate_required(email, 'Email')
validator.validate_required(password, 'Password')
# Validate uniqueness
if validator.is_valid():
validator.validate_unique(User, 'username', username, company_id=g.user.company_id)
validator.validate_unique(User, 'email', email, company_id=g.user.company_id)
if validator.is_valid():
# Convert role string to enum
try:
role = Role[role_name] if role_name else Role.TEAM_MEMBER
except KeyError:
role = Role.TEAM_MEMBER
# Create new user with role and team
new_user = user_repo.create(
username=username,
email=email,
company_id=g.user.company_id,
is_verified=auto_verify,
role=role,
team_id=team_id if team_id else None
)
new_user.set_password(password)
if not auto_verify:
# Generate verification token and send email
token = new_user.generate_verification_token()
verification_url = url_for('verify_email', token=token, _external=True)
try:
from flask_mail import Mail
mail = Mail(current_app)
# Get branding for email
from models import BrandingSettings
branding = BrandingSettings.get_settings()
msg = Message(
f'Welcome to {branding.app_name} - Verify Your Email',
sender=(branding.app_name, current_app.config['MAIL_USERNAME']),
recipients=[email]
)
msg.body = f'''Welcome to {branding.app_name}!
Your administrator has created an account for you. Please verify your email address to activate your account.
Username: {username}
Company: {g.company.name}
Click the link below to verify your email:
{verification_url}
This link will expire in 24 hours.
If you did not expect this email, please ignore it.
Best regards,
The {branding.app_name} Team
'''
mail.send(msg)
logger.info(f"Verification email sent to {email}")
except Exception as e:
logger.error(f"Failed to send verification email: {str(e)}")
flash('User created but verification email could not be sent. Please contact the user directly.', 'warning')
user_repo.save()
flash(f'User {username} created successfully!', 'success')
return redirect(url_for('users.admin_users'))
validator.flash_errors()
# Get all teams for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).all()
roles = get_available_roles()
return render_template('create_user.html', title='Create User', teams=teams, roles=roles)
@users_bp.route('/edit/<int:user_id>', methods=['GET', 'POST'])
@admin_required
@company_required
def edit_user(user_id):
user_repo = UserRepository()
user = user_repo.get_by_id_and_company(user_id, g.user.company_id)
if not user:
abort(404)
if request.method == 'POST':
validator = FormValidator()
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
# Get role and team
role_name = request.form.get('role')
team_id = request.form.get('team_id')
# Validate required fields
validator.validate_required(username, 'Username')
validator.validate_required(email, 'Email')
# Validate uniqueness (exclude current user)
if validator.is_valid():
if username != user.username:
validator.validate_unique(User, 'username', username, company_id=g.user.company_id)
if email != user.email:
validator.validate_unique(User, 'email', email, company_id=g.user.company_id)
if validator.is_valid():
# Convert role string to enum
try:
role = Role[role_name] if role_name else Role.TEAM_MEMBER
except KeyError:
role = Role.TEAM_MEMBER
user_repo.update(user,
username=username,
email=email,
role=role,
team_id=team_id if team_id else None
)
if password:
user.set_password(password)
user_repo.save()
flash(f'User {username} updated successfully!', 'success')
return redirect(url_for('users.admin_users'))
validator.flash_errors()
# Get all teams for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).all()
roles = get_available_roles()
return render_template('edit_user.html', title='Edit User', user=user, teams=teams, roles=roles)
@users_bp.route('/delete/<int:user_id>', methods=['POST'])
@admin_required
@company_required
def delete_user(user_id):
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
# Prevent deleting yourself
if user.id == session.get('user_id'):
flash('You cannot delete your own account', 'error')
return redirect(url_for('users.admin_users'))
username = user.username
try:
# Check if user owns any critical resources
owns_projects = Project.query.filter_by(created_by_id=user_id).count() > 0
owns_tasks = Task.query.filter_by(created_by_id=user_id).count() > 0
owns_subtasks = SubTask.query.filter_by(created_by_id=user_id).count() > 0
needs_ownership_transfer = owns_projects or owns_tasks or owns_subtasks
if needs_ownership_transfer:
# Find an alternative admin/supervisor to transfer ownership to
alternative_admin = User.query.filter(
User.company_id == g.user.company_id,
User.role.in_([Role.ADMIN, Role.SUPERVISOR]),
User.id != user_id
).first()
if alternative_admin:
# Transfer ownership of projects to alternative admin
if owns_projects:
Project.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
# Transfer ownership of tasks to alternative admin
if owns_tasks:
Task.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
# Transfer ownership of subtasks to alternative admin
if owns_subtasks:
SubTask.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
else:
# No alternative admin found but user owns resources
flash('Cannot delete this user. They own resources but no other administrator or supervisor found to transfer ownership to.', 'error')
return redirect(url_for('users.admin_users'))
# Delete user-specific records that can be safely removed
TimeEntry.query.filter_by(user_id=user_id).delete()
WorkConfig.query.filter_by(user_id=user_id).delete()
UserPreferences.query.filter_by(user_id=user_id).delete()
# Delete user dashboards (cascades to widgets)
UserDashboard.query.filter_by(user_id=user_id).delete()
# Clear task and subtask assignments
Task.query.filter_by(assigned_to_id=user_id).update({'assigned_to_id': None})
SubTask.query.filter_by(assigned_to_id=user_id).update({'assigned_to_id': None})
# Now safe to delete the user
db.session.delete(user)
db.session.commit()
if needs_ownership_transfer and alternative_admin:
flash(f'User {username} deleted successfully. Projects and tasks transferred to {alternative_admin.username}', 'success')
else:
flash(f'User {username} deleted successfully.', 'success')
except Exception as e:
db.session.rollback()
logger.error(f"Error deleting user {user_id}: {str(e)}")
flash(f'Error deleting user: {str(e)}', 'error')
return redirect(url_for('users.admin_users'))
@users_bp.route('/toggle-status/<int:user_id>', methods=['POST'])
@admin_required
@company_required
def toggle_user_status(user_id):
"""Toggle user active/blocked status"""
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
# Prevent blocking yourself
if user.id == g.user.id:
flash('You cannot block your own account', 'error')
return redirect(url_for('users.admin_users'))
# Toggle the blocked status
user.is_blocked = not user.is_blocked
db.session.commit()
status = 'blocked' if user.is_blocked else 'unblocked'
flash(f'User {user.username} has been {status}', 'success')
return redirect(url_for('users.admin_users'))
# System Admin User Routes
@users_bp.route('/system-admin')
@system_admin_required
def system_admin_users():
"""System admin view of all users across all companies"""
# Get filter parameters
company_id = request.args.get('company_id', type=int)
search_query = request.args.get('search', '')
filter_type = request.args.get('filter', '')
page = request.args.get('page', 1, type=int)
per_page = 50
# Build query that returns tuples of (User, company_name)
query = db.session.query(User, Company.name).join(Company)
# Apply company filter
if company_id:
query = query.filter(User.company_id == company_id)
# Apply search filter
if search_query:
query = query.filter(
db.or_(
User.username.ilike(f'%{search_query}%'),
User.email.ilike(f'%{search_query}%')
)
)
# Apply type filter
if filter_type == 'system_admins':
query = query.filter(User.role == Role.SYSTEM_ADMIN)
elif filter_type == 'admins':
query = query.filter(User.role == Role.ADMIN)
elif filter_type == 'blocked':
query = query.filter(User.is_blocked == True)
elif filter_type == 'unverified':
query = query.filter(User.is_verified == False)
elif filter_type == 'freelancers':
query = query.filter(Company.is_personal == True)
# Order by company name and username
query = query.order_by(Company.name, User.username)
# Paginate the results
try:
users = query.paginate(page=page, per_page=per_page, error_out=False)
# Debug log
if users.items:
logger.info(f"First item type: {type(users.items[0])}")
logger.info(f"First item: {users.items[0]}")
except Exception as e:
logger.error(f"Error paginating users: {str(e)}")
# Fallback to empty pagination
from flask_sqlalchemy import Pagination
users = Pagination(query=None, page=1, per_page=per_page, total=0, items=[])
# Get all companies for filter dropdown
companies = Company.query.order_by(Company.name).all()
# Calculate statistics
all_users = User.query.all()
stats = {
'total_users': len(all_users),
'verified_users': len([u for u in all_users if u.is_verified]),
'blocked_users': len([u for u in all_users if u.is_blocked]),
'system_admins': len([u for u in all_users if u.role == Role.SYSTEM_ADMIN]),
'company_admins': len([u for u in all_users if u.role == Role.ADMIN]),
}
return render_template('system_admin_users.html',
title='System User Management',
users=users,
companies=companies,
stats=stats,
selected_company_id=company_id,
search_query=search_query,
current_filter=filter_type)
@users_bp.route('/system-admin/<int:user_id>/edit', methods=['GET', 'POST'])
@system_admin_required
def system_admin_edit_user(user_id):
"""System admin edit any user"""
user = User.query.get_or_404(user_id)
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
role_name = request.form.get('role')
company_id = request.form.get('company_id', type=int)
team_id = request.form.get('team_id', type=int)
is_verified = 'is_verified' in request.form
is_blocked = 'is_blocked' in request.form
# Validate input
validator = FormValidator()
validator.validate_required(username, 'Username')
validator.validate_required(email, 'Email')
# Validate uniqueness (exclude current user)
if validator.is_valid():
if username != user.username:
validator.validate_unique(User, 'username', username, company_id=user.company_id)
if email != user.email:
validator.validate_unique(User, 'email', email, company_id=user.company_id)
# Prevent removing the last system admin
if validator.is_valid() and user.role == Role.SYSTEM_ADMIN and role_name != 'SYSTEM_ADMIN':
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN, is_blocked=False).count()
if system_admin_count <= 1:
validator.errors.add('Cannot remove the last system administrator')
if validator.is_valid():
user.username = username
user.email = email
user.is_verified = is_verified
user.is_blocked = is_blocked
# Update company and team
if company_id:
user.company_id = company_id
if team_id:
user.team_id = team_id
else:
user.team_id = None # Clear team if not selected
# Update role
try:
user.role = Role[role_name] if role_name else Role.TEAM_MEMBER
except KeyError:
user.role = Role.TEAM_MEMBER
if password:
user.set_password(password)
db.session.commit()
flash(f'User {username} updated successfully!', 'success')
return redirect(url_for('users.system_admin_users'))
validator.flash_errors()
# Get all companies and teams for the form
companies = Company.query.order_by(Company.name).all()
teams = Team.query.filter_by(company_id=user.company_id).order_by(Team.name).all()
return render_template('system_admin_edit_user.html',
title='Edit User (System Admin)',
user=user,
companies=companies,
teams=teams,
roles=list(Role),
Role=Role)
@users_bp.route('/system-admin/<int:user_id>/delete', methods=['POST'])
@system_admin_required
def system_admin_delete_user(user_id):
"""System admin delete any user"""
user = User.query.get_or_404(user_id)
# Prevent deleting yourself
if user.id == g.user.id:
flash('You cannot delete your own account', 'error')
return redirect(url_for('users.system_admin_users'))
# Prevent deleting the last system admin
if user.role == Role.SYSTEM_ADMIN:
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
if system_admin_count <= 1:
flash('Cannot delete the last system administrator', 'error')
return redirect(url_for('users.system_admin_users'))
username = user.username
company_name = user.company.name
try:
# Check if this is the last admin/supervisor in the company
admin_count = User.query.filter(
User.company_id == user.company_id,
User.role.in_([Role.ADMIN, Role.SUPERVISOR]),
User.id != user_id
).count()
if admin_count == 0:
# This is the last admin - need to handle company data
flash(f'User {username} is the last administrator in {company_name}. Company data will need to be handled.', 'warning')
# For now, just prevent deletion
return redirect(url_for('users.system_admin_users'))
# Otherwise proceed with normal deletion
# Delete user-specific records
TimeEntry.query.filter_by(user_id=user_id).delete()
WorkConfig.query.filter_by(user_id=user_id).delete()
UserPreferences.query.filter_by(user_id=user_id).delete()
UserDashboard.query.filter_by(user_id=user_id).delete()
# Clear assignments
Task.query.filter_by(assigned_to_id=user_id).update({'assigned_to_id': None})
SubTask.query.filter_by(assigned_to_id=user_id).update({'assigned_to_id': None})
# Transfer ownership of created items
alternative_admin = User.query.filter(
User.company_id == user.company_id,
User.role.in_([Role.ADMIN, Role.SUPERVISOR]),
User.id != user_id
).first()
if alternative_admin:
Project.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
Task.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
SubTask.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
ProjectCategory.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
# Delete the user
db.session.delete(user)
db.session.commit()
flash(f'User {username} from {company_name} deleted successfully!', 'success')
except Exception as e:
db.session.rollback()
logger.error(f"Error deleting user {user_id}: {str(e)}")
flash(f'Error deleting user: {str(e)}', 'error')
return redirect(url_for('users.system_admin_users'))