Files
TimeTrack/routes/users.py

531 lines
20 KiB
Python

"""
User management routes
"""
from flask import Blueprint, render_template, request, redirect, url_for, flash, g, session, abort
from models import db, User, Role, Team, TimeEntry, WorkConfig, UserPreferences, Project, Task, SubTask, ProjectCategory, UserDashboard, Comment, Company
from routes.auth import admin_required, company_required, login_required, system_admin_required
from flask_mail import Message
from flask import current_app
from utils.validation import FormValidator
from utils.repository import UserRepository
import logging
logger = logging.getLogger(__name__)
users_bp = Blueprint('users', __name__, url_prefix='/admin/users')
def get_available_roles():
"""Get roles available for user creation/editing based on current user's role"""
current_user_role = g.user.role
if current_user_role == Role.SYSTEM_ADMIN:
# System admin can assign any role
return [Role.TEAM_MEMBER, Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN, Role.SYSTEM_ADMIN]
elif current_user_role == Role.ADMIN:
# Admin can assign any role except system admin
return [Role.TEAM_MEMBER, Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]
elif current_user_role == Role.SUPERVISOR:
# Supervisor can only assign team member and team leader roles
return [Role.TEAM_MEMBER, Role.TEAM_LEADER]
else:
# Others cannot assign roles
return []
@users_bp.route('')
@admin_required
@company_required
def admin_users():
# Redirect to the new unified organization management page
return redirect(url_for('organization.admin_organization'))
@users_bp.route('/create', methods=['GET', 'POST'])
@admin_required
@company_required
def create_user():
if request.method == 'POST':
validator = FormValidator()
user_repo = UserRepository()
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
auto_verify = 'auto_verify' in request.form
# Get role and team
role_name = request.form.get('role')
team_id = request.form.get('team_id')
# Validate required fields
validator.validate_required(username, 'Username')
validator.validate_required(email, 'Email')
validator.validate_required(password, 'Password')
# Validate uniqueness
if validator.is_valid():
validator.validate_unique(User, 'username', username, company_id=g.user.company_id)
validator.validate_unique(User, 'email', email, company_id=g.user.company_id)
if validator.is_valid():
# Convert role string to enum
try:
role = Role[role_name] if role_name else Role.TEAM_MEMBER
except KeyError:
role = Role.TEAM_MEMBER
# Create new user with role and team
new_user = user_repo.create(
username=username,
email=email,
company_id=g.user.company_id,
is_verified=auto_verify,
role=role,
team_id=team_id if team_id else None
)
new_user.set_password(password)
if not auto_verify:
# Generate verification token and send email
token = new_user.generate_verification_token()
verification_url = url_for('verify_email', token=token, _external=True)
try:
from flask_mail import Mail
mail = Mail(current_app)
# Get branding for email
from models import BrandingSettings
branding = BrandingSettings.get_settings()
msg = Message(
f'Welcome to {branding.app_name} - Verify Your Email',
sender=(branding.app_name, current_app.config['MAIL_USERNAME']),
recipients=[email]
)
msg.body = f'''Welcome to {branding.app_name}!
Your administrator has created an account for you. Please verify your email address to activate your account.
Username: {username}
Company: {g.company.name}
Click the link below to verify your email:
{verification_url}
This link will expire in 24 hours.
If you did not expect this email, please ignore it.
Best regards,
The {branding.app_name} Team
'''
mail.send(msg)
logger.info(f"Verification email sent to {email}")
except Exception as e:
logger.error(f"Failed to send verification email: {str(e)}")
flash('User created but verification email could not be sent. Please contact the user directly.', 'warning')
user_repo.save()
flash(f'User {username} created successfully!', 'success')
return redirect(url_for('users.admin_users'))
validator.flash_errors()
# Get all teams for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).all()
roles = get_available_roles()
return render_template('create_user.html', title='Create User', teams=teams, roles=roles)
@users_bp.route('/edit/<int:user_id>', methods=['GET', 'POST'])
@admin_required
@company_required
def edit_user(user_id):
user_repo = UserRepository()
user = user_repo.get_by_id_and_company(user_id, g.user.company_id)
if not user:
abort(404)
if request.method == 'POST':
validator = FormValidator()
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
# Get role and team
role_name = request.form.get('role')
team_id = request.form.get('team_id')
# Validate required fields
validator.validate_required(username, 'Username')
validator.validate_required(email, 'Email')
# Validate uniqueness (exclude current user)
if validator.is_valid():
if username != user.username:
validator.validate_unique(User, 'username', username, company_id=g.user.company_id)
if email != user.email:
validator.validate_unique(User, 'email', email, company_id=g.user.company_id)
if validator.is_valid():
# Convert role string to enum
try:
role = Role[role_name] if role_name else Role.TEAM_MEMBER
except KeyError:
role = Role.TEAM_MEMBER
user_repo.update(user,
username=username,
email=email,
role=role,
team_id=team_id if team_id else None
)
if password:
user.set_password(password)
user_repo.save()
flash(f'User {username} updated successfully!', 'success')
return redirect(url_for('users.admin_users'))
validator.flash_errors()
# Get all teams for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).all()
roles = get_available_roles()
return render_template('edit_user.html', title='Edit User', user=user, teams=teams, roles=roles)
@users_bp.route('/delete/<int:user_id>', methods=['POST'])
@admin_required
@company_required
def delete_user(user_id):
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
# Prevent deleting yourself
if user.id == session.get('user_id'):
flash('You cannot delete your own account', 'error')
return redirect(url_for('users.admin_users'))
username = user.username
try:
# Check if user owns any critical resources
owns_projects = Project.query.filter_by(created_by_id=user_id).count() > 0
owns_tasks = Task.query.filter_by(created_by_id=user_id).count() > 0
owns_subtasks = SubTask.query.filter_by(created_by_id=user_id).count() > 0
needs_ownership_transfer = owns_projects or owns_tasks or owns_subtasks
if needs_ownership_transfer:
# Find an alternative admin/supervisor to transfer ownership to
alternative_admin = User.query.filter(
User.company_id == g.user.company_id,
User.role.in_([Role.ADMIN, Role.SUPERVISOR]),
User.id != user_id
).first()
if alternative_admin:
# Transfer ownership of projects to alternative admin
if owns_projects:
Project.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
# Transfer ownership of tasks to alternative admin
if owns_tasks:
Task.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
# Transfer ownership of subtasks to alternative admin
if owns_subtasks:
SubTask.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
else:
# No alternative admin found but user owns resources
flash('Cannot delete this user. They own resources but no other administrator or supervisor found to transfer ownership to.', 'error')
return redirect(url_for('users.admin_users'))
# Delete user-specific records that can be safely removed
TimeEntry.query.filter_by(user_id=user_id).delete()
WorkConfig.query.filter_by(user_id=user_id).delete()
UserPreferences.query.filter_by(user_id=user_id).delete()
# Delete user dashboards (cascades to widgets)
UserDashboard.query.filter_by(user_id=user_id).delete()
# Clear task and subtask assignments
Task.query.filter_by(assigned_to_id=user_id).update({'assigned_to_id': None})
SubTask.query.filter_by(assigned_to_id=user_id).update({'assigned_to_id': None})
# Now safe to delete the user
db.session.delete(user)
db.session.commit()
if needs_ownership_transfer and alternative_admin:
flash(f'User {username} deleted successfully. Projects and tasks transferred to {alternative_admin.username}', 'success')
else:
flash(f'User {username} deleted successfully.', 'success')
except Exception as e:
db.session.rollback()
logger.error(f"Error deleting user {user_id}: {str(e)}")
flash(f'Error deleting user: {str(e)}', 'error')
return redirect(url_for('users.admin_users'))
@users_bp.route('/toggle-status/<int:user_id>', methods=['POST'])
@admin_required
@company_required
def toggle_user_status(user_id):
"""Toggle user active/blocked status"""
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
# Prevent blocking yourself
if user.id == g.user.id:
flash('You cannot block your own account', 'error')
return redirect(url_for('users.admin_users'))
# Toggle the blocked status
user.is_blocked = not user.is_blocked
db.session.commit()
status = 'blocked' if user.is_blocked else 'unblocked'
flash(f'User {user.username} has been {status}', 'success')
return redirect(url_for('users.admin_users'))
# System Admin User Routes
@users_bp.route('/system-admin')
@system_admin_required
def system_admin_users():
"""System admin view of all users across all companies"""
# Get filter parameters
company_id = request.args.get('company_id', type=int)
search_query = request.args.get('search', '')
filter_type = request.args.get('filter', '')
page = request.args.get('page', 1, type=int)
per_page = 50
# Build query that returns tuples of (User, company_name)
query = db.session.query(User, Company.name).join(Company)
# Apply company filter
if company_id:
query = query.filter(User.company_id == company_id)
# Apply search filter
if search_query:
query = query.filter(
db.or_(
User.username.ilike(f'%{search_query}%'),
User.email.ilike(f'%{search_query}%')
)
)
# Apply type filter
if filter_type == 'system_admins':
query = query.filter(User.role == Role.SYSTEM_ADMIN)
elif filter_type == 'admins':
query = query.filter(User.role == Role.ADMIN)
elif filter_type == 'blocked':
query = query.filter(User.is_blocked == True)
elif filter_type == 'unverified':
query = query.filter(User.is_verified == False)
elif filter_type == 'freelancers':
query = query.filter(Company.is_personal == True)
# Order by company name and username
query = query.order_by(Company.name, User.username)
# Paginate the results
try:
users = query.paginate(page=page, per_page=per_page, error_out=False)
# Debug log
if users.items:
logger.info(f"First item type: {type(users.items[0])}")
logger.info(f"First item: {users.items[0]}")
except Exception as e:
logger.error(f"Error paginating users: {str(e)}")
# Fallback to empty pagination
from flask_sqlalchemy import Pagination
users = Pagination(query=None, page=1, per_page=per_page, total=0, items=[])
# Get all companies for filter dropdown
companies = Company.query.order_by(Company.name).all()
# Calculate statistics
all_users = User.query.all()
stats = {
'total_users': len(all_users),
'verified_users': len([u for u in all_users if u.is_verified]),
'blocked_users': len([u for u in all_users if u.is_blocked]),
'system_admins': len([u for u in all_users if u.role == Role.SYSTEM_ADMIN]),
'company_admins': len([u for u in all_users if u.role == Role.ADMIN]),
}
return render_template('system_admin_users.html',
title='System User Management',
users=users,
companies=companies,
stats=stats,
selected_company_id=company_id,
search_query=search_query,
current_filter=filter_type)
@users_bp.route('/system-admin/<int:user_id>/edit', methods=['GET', 'POST'])
@system_admin_required
def system_admin_edit_user(user_id):
"""System admin edit any user"""
user = User.query.get_or_404(user_id)
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
role_name = request.form.get('role')
company_id = request.form.get('company_id', type=int)
team_id = request.form.get('team_id', type=int)
is_verified = 'is_verified' in request.form
is_blocked = 'is_blocked' in request.form
# Validate input
validator = FormValidator()
validator.validate_required(username, 'Username')
validator.validate_required(email, 'Email')
# Validate uniqueness (exclude current user)
if validator.is_valid():
if username != user.username:
validator.validate_unique(User, 'username', username, company_id=user.company_id)
if email != user.email:
validator.validate_unique(User, 'email', email, company_id=user.company_id)
# Prevent removing the last system admin
if validator.is_valid() and user.role == Role.SYSTEM_ADMIN and role_name != 'SYSTEM_ADMIN':
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN, is_blocked=False).count()
if system_admin_count <= 1:
validator.errors.add('Cannot remove the last system administrator')
if validator.is_valid():
user.username = username
user.email = email
user.is_verified = is_verified
user.is_blocked = is_blocked
# Update company and team
if company_id:
user.company_id = company_id
if team_id:
user.team_id = team_id
else:
user.team_id = None # Clear team if not selected
# Update role
try:
user.role = Role[role_name] if role_name else Role.TEAM_MEMBER
except KeyError:
user.role = Role.TEAM_MEMBER
if password:
user.set_password(password)
db.session.commit()
flash(f'User {username} updated successfully!', 'success')
return redirect(url_for('users.system_admin_users'))
validator.flash_errors()
# Get all companies and teams for the form
companies = Company.query.order_by(Company.name).all()
teams = Team.query.filter_by(company_id=user.company_id).order_by(Team.name).all()
return render_template('system_admin_edit_user.html',
title='Edit User (System Admin)',
user=user,
companies=companies,
teams=teams,
roles=list(Role),
Role=Role)
@users_bp.route('/system-admin/<int:user_id>/delete', methods=['POST'])
@system_admin_required
def system_admin_delete_user(user_id):
"""System admin delete any user"""
user = User.query.get_or_404(user_id)
# Prevent deleting yourself
if user.id == g.user.id:
flash('You cannot delete your own account', 'error')
return redirect(url_for('users.system_admin_users'))
# Prevent deleting the last system admin
if user.role == Role.SYSTEM_ADMIN:
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
if system_admin_count <= 1:
flash('Cannot delete the last system administrator', 'error')
return redirect(url_for('users.system_admin_users'))
username = user.username
company_name = user.company.name
try:
# Check if this is the last admin/supervisor in the company
admin_count = User.query.filter(
User.company_id == user.company_id,
User.role.in_([Role.ADMIN, Role.SUPERVISOR]),
User.id != user_id
).count()
if admin_count == 0:
# This is the last admin - need to handle company data
flash(f'User {username} is the last administrator in {company_name}. Company data will need to be handled.', 'warning')
# For now, just prevent deletion
return redirect(url_for('users.system_admin_users'))
# Otherwise proceed with normal deletion
# Delete user-specific records
TimeEntry.query.filter_by(user_id=user_id).delete()
WorkConfig.query.filter_by(user_id=user_id).delete()
UserPreferences.query.filter_by(user_id=user_id).delete()
UserDashboard.query.filter_by(user_id=user_id).delete()
# Clear assignments
Task.query.filter_by(assigned_to_id=user_id).update({'assigned_to_id': None})
SubTask.query.filter_by(assigned_to_id=user_id).update({'assigned_to_id': None})
# Transfer ownership of created items
alternative_admin = User.query.filter(
User.company_id == user.company_id,
User.role.in_([Role.ADMIN, Role.SUPERVISOR]),
User.id != user_id
).first()
if alternative_admin:
Project.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
Task.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
SubTask.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
ProjectCategory.query.filter_by(created_by_id=user_id).update({'created_by_id': alternative_admin.id})
# Delete the user
db.session.delete(user)
db.session.commit()
flash(f'User {username} from {company_name} deleted successfully!', 'success')
except Exception as e:
db.session.rollback()
logger.error(f"Error deleting user {user_id}: {str(e)}")
flash(f'Error deleting user: {str(e)}', 'error')
return redirect(url_for('users.system_admin_users'))