Files
TimeTrack/app.py

4472 lines
167 KiB
Python

from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, session, g, Response, send_file
from models import db, TimeEntry, WorkConfig, User, SystemSettings, Team, Role, Project, Company, CompanyWorkConfig, UserPreferences, WorkRegion, AccountType, ProjectCategory, Task, SubTask, TaskStatus, TaskPriority, Announcement, SystemEvent, KanbanBoard, KanbanColumn, KanbanCard
from data_formatting import (
format_duration, prepare_export_data, prepare_team_hours_export_data,
format_table_data, format_graph_data, format_team_data
)
from data_export import (
export_to_csv, export_to_excel, export_team_hours_to_csv, export_team_hours_to_excel,
export_analytics_csv, export_analytics_excel
)
from time_utils import apply_time_rounding, round_duration_to_interval, get_user_rounding_settings
import logging
from datetime import datetime, time, timedelta
import os
import csv
import io
import pandas as pd
from sqlalchemy import func
from functools import wraps
from flask_mail import Mail, Message
from dotenv import load_dotenv
from werkzeug.security import check_password_hash
# Load environment variables from .env file
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:////data/timetrack.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_key_for_timetrack')
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7) # Session lasts for 7 days
# Configure Flask-Mail
app.config['MAIL_SERVER'] = os.environ.get('MAIL_SERVER', 'smtp.example.com')
app.config['MAIL_PORT'] = int(os.environ.get('MAIL_PORT') or 587)
app.config['MAIL_USE_TLS'] = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME', 'your-email@example.com')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD', 'your-password')
app.config['MAIL_DEFAULT_SENDER'] = os.environ.get('MAIL_DEFAULT_SENDER', 'TimeTrack <noreply@timetrack.com>')
# Log mail configuration (without password)
logger.info(f"Mail server: {app.config['MAIL_SERVER']}")
logger.info(f"Mail port: {app.config['MAIL_PORT']}")
logger.info(f"Mail use TLS: {app.config['MAIL_USE_TLS']}")
logger.info(f"Mail username: {app.config['MAIL_USERNAME']}")
logger.info(f"Mail default sender: {app.config['MAIL_DEFAULT_SENDER']}")
mail = Mail(app)
# Initialize the database with the app
db.init_app(app)
# Consolidated migration using migrate_db module
def run_migrations():
"""Run all database migrations using the consolidated migrate_db module."""
# Check if we're using PostgreSQL or SQLite
database_url = app.config['SQLALCHEMY_DATABASE_URI']
print(f"DEBUG: Database URL: {database_url}")
is_postgresql = 'postgresql://' in database_url or 'postgres://' in database_url
print(f"DEBUG: Is PostgreSQL: {is_postgresql}")
if is_postgresql:
print("Using PostgreSQL - skipping SQLite migrations, ensuring tables exist...")
with app.app_context():
db.create_all()
init_system_settings()
print("PostgreSQL setup completed successfully!")
else:
print("Using SQLite - running SQLite migrations...")
try:
from migrate_db import run_all_migrations
run_all_migrations()
print("SQLite database migrations completed successfully!")
except ImportError as e:
print(f"Error importing migrate_db: {e}")
print("Falling back to basic table creation...")
with app.app_context():
db.create_all()
init_system_settings()
except Exception as e:
print(f"Error during SQLite migration: {e}")
print("Falling back to basic table creation...")
with app.app_context():
db.create_all()
init_system_settings()
def migrate_to_company_model():
"""Migrate existing data to support company model (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_to_company_model, get_db_path
db_path = get_db_path()
migrate_to_company_model(db_path)
except ImportError:
print("migrate_db module not available - skipping company model migration")
except Exception as e:
print(f"Error during company migration: {e}")
raise
def init_system_settings():
"""Initialize system settings with default values if they don't exist"""
if not SystemSettings.query.filter_by(key='registration_enabled').first():
print("Adding registration_enabled system setting...")
reg_setting = SystemSettings(
key='registration_enabled',
value='true',
description='Controls whether new user registration is allowed'
)
db.session.add(reg_setting)
db.session.commit()
if not SystemSettings.query.filter_by(key='email_verification_required').first():
print("Adding email_verification_required system setting...")
email_setting = SystemSettings(
key='email_verification_required',
value='true',
description='Controls whether email verification is required for new user accounts'
)
db.session.add(email_setting)
db.session.commit()
def migrate_data():
"""Handle data migrations and setup (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_data
migrate_data()
except ImportError:
print("migrate_db module not available - skipping data migration")
except Exception as e:
print(f"Error during data migration: {e}")
raise
def migrate_work_config_data():
"""Migrate existing WorkConfig data to new architecture (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_work_config_data, get_db_path
db_path = get_db_path()
migrate_work_config_data(db_path)
except ImportError:
print("migrate_db module not available - skipping work config data migration")
except Exception as e:
print(f"Error during work config migration: {e}")
raise
def migrate_task_system():
"""Create tables for the task management system (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_task_system, get_db_path
db_path = get_db_path()
migrate_task_system(db_path)
except ImportError:
print("migrate_db module not available - skipping task system migration")
except Exception as e:
print(f"Error during task system migration: {e}")
raise
# Call this function during app initialization
@app.before_first_request
def initialize_app():
run_migrations() # This handles all migrations including work config data
# Add this after initializing the app but before defining routes
@app.context_processor
def inject_globals():
"""Make certain variables available to all templates."""
# Get active announcements for current user
active_announcements = []
if g.user:
active_announcements = Announcement.get_active_announcements_for_user(g.user)
# Get tracking script settings
tracking_script_enabled = False
tracking_script_code = ''
try:
tracking_enabled_setting = SystemSettings.query.filter_by(key='tracking_script_enabled').first()
if tracking_enabled_setting:
tracking_script_enabled = tracking_enabled_setting.value == 'true'
tracking_code_setting = SystemSettings.query.filter_by(key='tracking_script_code').first()
if tracking_code_setting:
tracking_script_code = tracking_code_setting.value
except Exception:
pass # In case database isn't available yet
return {
'Role': Role,
'AccountType': AccountType,
'current_year': datetime.now().year,
'active_announcements': active_announcements,
'tracking_script_enabled': tracking_script_enabled,
'tracking_script_code': tracking_script_code
}
# Template filters for date/time formatting
@app.template_filter('from_json')
def from_json_filter(json_str):
"""Parse JSON string to Python object."""
if not json_str:
return []
try:
import json
return json.loads(json_str)
except (json.JSONDecodeError, TypeError):
return []
@app.template_filter('format_date')
def format_date_filter(dt):
"""Format date according to user preferences."""
if not dt or not g.user:
return dt.strftime('%Y-%m-%d') if dt else ''
from time_utils import format_date_by_preference, get_user_format_settings
date_format, _ = get_user_format_settings(g.user)
return format_date_by_preference(dt, date_format)
@app.template_filter('format_time')
def format_time_filter(dt):
"""Format time according to user preferences."""
if not dt or not g.user:
return dt.strftime('%H:%M:%S') if dt else ''
from time_utils import format_time_by_preference, get_user_format_settings
_, time_format_24h = get_user_format_settings(g.user)
return format_time_by_preference(dt, time_format_24h)
@app.template_filter('format_time_short')
def format_time_short_filter(dt):
"""Format time without seconds according to user preferences."""
if not dt or not g.user:
return dt.strftime('%H:%M') if dt else ''
from time_utils import format_time_short_by_preference, get_user_format_settings
_, time_format_24h = get_user_format_settings(g.user)
return format_time_short_by_preference(dt, time_format_24h)
@app.template_filter('format_datetime')
def format_datetime_filter(dt):
"""Format datetime according to user preferences."""
if not dt or not g.user:
return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else ''
from time_utils import format_datetime_by_preference, get_user_format_settings
date_format, time_format_24h = get_user_format_settings(g.user)
return format_datetime_by_preference(dt, date_format, time_format_24h)
@app.template_filter('format_duration')
def format_duration_filter(duration_seconds):
"""Format duration in readable format."""
if duration_seconds is None:
return '00:00:00'
from time_utils import format_duration_readable
return format_duration_readable(duration_seconds)
# Authentication decorator
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None:
return redirect(url_for('login', next=request.url))
return f(*args, **kwargs)
return decorated_function
# Admin-only decorator
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None or (g.user.role != Role.ADMIN and g.user.role != Role.SYSTEM_ADMIN):
flash('You need administrator privileges to access this page.', 'error')
return redirect(url_for('home'))
return f(*args, **kwargs)
return decorated_function
# System Admin-only decorator
def system_admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None or g.user.role != Role.SYSTEM_ADMIN:
flash('You need system administrator privileges to access this page.', 'error')
return redirect(url_for('home'))
return f(*args, **kwargs)
return decorated_function
def get_system_setting(key, default='false'):
"""Helper function to get system setting value"""
setting = SystemSettings.query.filter_by(key=key).first()
return setting.value if setting else default
def is_system_admin(user=None):
"""Helper function to check if user is system admin"""
if user is None:
user = g.user
return user and user.role == Role.SYSTEM_ADMIN
def can_access_system_settings(user=None):
"""Helper function to check if user can access system-wide settings"""
return is_system_admin(user)
def get_available_roles():
"""Get roles available for assignment, excluding SYSTEM_ADMIN unless one already exists"""
roles = list(Role)
# Only show SYSTEM_ADMIN role if at least one system admin already exists
# This prevents accidental creation of system admins
system_admin_exists = User.query.filter_by(role=Role.SYSTEM_ADMIN).count() > 0
if not system_admin_exists:
roles = [role for role in roles if role != Role.SYSTEM_ADMIN]
return roles
# Add this decorator function after your existing decorators
def role_required(min_role):
"""
Decorator to restrict access based on user role.
min_role should be a Role enum value (e.g., Role.TEAM_LEADER)
"""
def role_decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None:
return redirect(url_for('login', next=request.url))
# Admin and System Admin always have access
if g.user.role == Role.ADMIN or g.user.role == Role.SYSTEM_ADMIN:
return f(*args, **kwargs)
# Check role hierarchy
role_hierarchy = {
Role.TEAM_MEMBER: 1,
Role.TEAM_LEADER: 2,
Role.SUPERVISOR: 3,
Role.ADMIN: 4,
Role.SYSTEM_ADMIN: 5
}
if role_hierarchy.get(g.user.role, 0) < role_hierarchy.get(min_role, 0):
flash('You do not have sufficient permissions to access this page.', 'error')
return redirect(url_for('home'))
return f(*args, **kwargs)
return decorated_function
return role_decorator
def company_required(f):
"""
Decorator to ensure user has a valid company association and set company context.
"""
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None:
return redirect(url_for('login', next=request.url))
# System admins can access without company association
if g.user.role == Role.SYSTEM_ADMIN:
return f(*args, **kwargs)
if g.user.company_id is None:
flash('You must be associated with a company to access this page.', 'error')
return redirect(url_for('setup_company'))
# Set company context
g.company = Company.query.get(g.user.company_id)
if not g.company or not g.company.is_active:
flash('Your company is not active. Please contact support.', 'error')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
@app.before_request
def load_logged_in_user():
user_id = session.get('user_id')
if user_id is None:
g.user = None
g.company = None
else:
g.user = User.query.get(user_id)
if g.user:
# Set company context
if g.user.company_id:
g.company = Company.query.get(g.user.company_id)
else:
g.company = None
# Check if user is verified
if not g.user.is_verified and request.endpoint not in ['verify_email', 'static', 'logout', 'setup_company']:
# Allow unverified users to access only verification and static resources
if request.endpoint not in ['login', 'register']:
flash('Please verify your email address before accessing this page.', 'warning')
session.clear()
return redirect(url_for('login'))
else:
g.company = None
@app.route('/')
def home():
if g.user:
# Get active entry (no departure time)
active_entry = TimeEntry.query.filter_by(
user_id=g.user.id,
departure_time=None
).first()
# Get today's completed entries for history
today = datetime.now().date()
history = TimeEntry.query.filter(
TimeEntry.user_id == g.user.id,
TimeEntry.departure_time.isnot(None),
TimeEntry.arrival_time >= datetime.combine(today, time.min),
TimeEntry.arrival_time <= datetime.combine(today, time.max)
).order_by(TimeEntry.arrival_time.desc()).all()
# Get available projects for this user (company-scoped)
available_projects = []
if g.user.company_id:
all_projects = Project.query.filter_by(
company_id=g.user.company_id,
is_active=True
).all()
for project in all_projects:
if project.is_user_allowed(g.user):
available_projects.append(project)
return render_template('index.html', title='Home',
active_entry=active_entry,
history=history,
available_projects=available_projects)
else:
return render_template('about.html', title='Home')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user:
# Fix role if it's a string or None
if isinstance(user.role, str) or user.role is None:
# Map string role values to enum values
role_mapping = {
'Team Member': Role.TEAM_MEMBER,
'TEAM_MEMBER': Role.TEAM_MEMBER,
'Team Leader': Role.TEAM_LEADER,
'TEAM_LEADER': Role.TEAM_LEADER,
'Supervisor': Role.SUPERVISOR,
'SUPERVISOR': Role.SUPERVISOR,
'Administrator': Role.ADMIN,
'ADMIN': Role.ADMIN
}
if isinstance(user.role, str):
user.role = role_mapping.get(user.role, Role.TEAM_MEMBER)
else:
user.role = Role.ADMIN if user.role == Role.ADMIN else Role.TEAM_MEMBER
db.session.commit()
# Now proceed with password check
if user.check_password(password):
# Check if user is blocked
if user.is_blocked:
flash('Your account has been disabled. Please contact an administrator.', 'error')
return render_template('login.html')
# Check if 2FA is enabled
if user.two_factor_enabled:
# Store user ID for 2FA verification
session['2fa_user_id'] = user.id
return redirect(url_for('verify_2fa'))
else:
# Continue with normal login process
session['user_id'] = user.id
session['username'] = user.username
session['role'] = user.role.value
# Log successful login
SystemEvent.log_event(
'user_login',
f'User {user.username} logged in successfully',
'auth',
'info',
user_id=user.id,
company_id=user.company_id,
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
flash('Login successful!', 'success')
return redirect(url_for('home'))
# Log failed login attempt
SystemEvent.log_event(
'login_failed',
f'Failed login attempt for username: {username}',
'auth',
'warning',
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
flash('Invalid username or password', 'error')
return render_template('login.html', title='Login')
@app.route('/logout')
def logout():
# Log logout event before clearing session
if 'user_id' in session:
user = User.query.get(session['user_id'])
if user:
SystemEvent.log_event(
'user_logout',
f'User {user.username} logged out',
'auth',
'info',
user_id=user.id,
company_id=user.company_id,
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
session.clear()
flash('You have been logged out.', 'info')
return redirect(url_for('login'))
@app.route('/register', methods=['GET', 'POST'])
def register():
# Check if registration is enabled
registration_enabled = get_system_setting('registration_enabled', 'true') == 'true'
if not registration_enabled:
flash('Registration is currently disabled by the administrator.', 'error')
return redirect(url_for('login'))
# Check if companies exist, if not redirect to company setup
if Company.query.count() == 0:
flash('No companies exist yet. Please set up your company first.', 'info')
return redirect(url_for('setup_company'))
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
company_code = request.form.get('company_code', '').strip()
# Validate input
error = None
if not username:
error = 'Username is required'
elif not email:
error = 'Email is required'
elif not password:
error = 'Password is required'
elif password != confirm_password:
error = 'Passwords do not match'
elif not company_code:
error = 'Company code is required'
# Find company by code
company = None
if company_code:
company = Company.query.filter_by(slug=company_code.lower()).first()
if not company:
error = 'Invalid company code'
# Check for existing users within the company
if company and not error:
if User.query.filter_by(username=username, company_id=company.id).first():
error = 'Username already exists in this company'
elif User.query.filter_by(email=email, company_id=company.id).first():
error = 'Email already registered in this company'
if error is None and company:
try:
# Check if this is the first user account in this company
is_first_user_in_company = User.query.filter_by(company_id=company.id).count() == 0
# Check if email verification is required
email_verification_required = get_system_setting('email_verification_required', 'true') == 'true'
new_user = User(
username=username,
email=email,
company_id=company.id,
is_verified=False
)
new_user.set_password(password)
# Make first user in company an admin with full privileges
if is_first_user_in_company:
new_user.role = Role.ADMIN
new_user.is_verified = True # Auto-verify first user in company
elif not email_verification_required:
# If email verification is disabled, auto-verify new users
new_user.is_verified = True
# Generate verification token (even if not needed, for consistency)
token = new_user.generate_verification_token()
db.session.add(new_user)
db.session.commit()
if is_first_user_in_company:
# First user in company gets admin privileges and is auto-verified
logger.info(f"First user account created in company {company.name}: {username} with admin privileges")
flash(f'Welcome! You are the first user in {company.name} and have been granted administrator privileges. You can now log in.', 'success')
elif not email_verification_required:
# Email verification is disabled, user can log in immediately
logger.info(f"User account created with auto-verification in company {company.name}: {username}")
flash('Registration successful! You can now log in.', 'success')
else:
# Send verification email for regular users when verification is required
verification_url = url_for('verify_email', token=token, _external=True)
msg = Message('Verify your TimeTrack account', recipients=[email])
msg.body = f'''Hello {username},
Thank you for registering with TimeTrack. To complete your registration, please click on the link below:
{verification_url}
This link will expire in 24 hours.
If you did not register for TimeTrack, please ignore this email.
Best regards,
The TimeTrack Team
'''
mail.send(msg)
logger.info(f"Verification email sent to {email}")
flash('Registration initiated! Please check your email to verify your account.', 'success')
return redirect(url_for('login'))
except Exception as e:
db.session.rollback()
logger.error(f"Error during registration: {str(e)}")
error = f"An error occurred during registration: {str(e)}"
flash(error, 'error')
return render_template('register.html', title='Register')
@app.route('/register/freelancer', methods=['GET', 'POST'])
def register_freelancer():
"""Freelancer registration route - creates user without company token"""
# Check if registration is enabled
registration_enabled = get_system_setting('registration_enabled', 'true') == 'true'
if not registration_enabled:
flash('Registration is currently disabled by the administrator.', 'error')
return redirect(url_for('login'))
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
business_name = request.form.get('business_name', '').strip()
# Validate input
error = None
if not username:
error = 'Username is required'
elif not email:
error = 'Email is required'
elif not password:
error = 'Password is required'
elif password != confirm_password:
error = 'Passwords do not match'
# Check for existing users globally (freelancers get unique usernames/emails)
if not error:
if User.query.filter_by(username=username).first():
error = 'Username already exists'
elif User.query.filter_by(email=email).first():
error = 'Email already registered'
if error is None:
try:
# Create personal company for freelancer
company_name = business_name if business_name else f"{username}'s Workspace"
# Generate unique company slug
import re
slug = re.sub(r'[^\w\s-]', '', company_name.lower())
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
# Ensure slug uniqueness
base_slug = slug
counter = 1
while Company.query.filter_by(slug=slug).first():
slug = f"{base_slug}-{counter}"
counter += 1
# Create personal company
personal_company = Company(
name=company_name,
slug=slug,
description=f"Personal workspace for {username}",
is_personal=True,
max_users=1 # Limit to single user
)
db.session.add(personal_company)
db.session.flush() # Get company ID
# Create freelancer user
new_user = User(
username=username,
email=email,
company_id=personal_company.id,
account_type=AccountType.FREELANCER,
business_name=business_name if business_name else None,
role=Role.ADMIN, # Freelancers are admins of their personal company
is_verified=True # Auto-verify freelancers
)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
logger.info(f"Freelancer account created: {username} with personal company: {company_name}")
flash(f'Welcome {username}! Your freelancer account has been created successfully. You can now log in.', 'success')
return redirect(url_for('login'))
except Exception as e:
db.session.rollback()
logger.error(f"Error during freelancer registration: {str(e)}")
error = f"An error occurred during registration: {str(e)}"
if error:
flash(error, 'error')
return render_template('register_freelancer.html', title='Register as Freelancer')
@app.route('/setup_company', methods=['GET', 'POST'])
def setup_company():
"""Company setup route for creating new companies with admin users"""
existing_companies = Company.query.count()
# Determine access level
is_initial_setup = existing_companies == 0
is_super_admin = g.user and g.user.role == Role.ADMIN and existing_companies > 0
is_authorized = is_initial_setup or is_super_admin
# Check authorization for non-initial setups
if not is_initial_setup and not is_super_admin:
flash('You do not have permission to create new companies.', 'error')
return redirect(url_for('home') if g.user else url_for('login'))
if request.method == 'POST':
company_name = request.form.get('company_name')
company_description = request.form.get('company_description', '')
admin_username = request.form.get('admin_username')
admin_email = request.form.get('admin_email')
admin_password = request.form.get('admin_password')
confirm_password = request.form.get('confirm_password')
# Validate input
error = None
if not company_name:
error = 'Company name is required'
elif not admin_username:
error = 'Admin username is required'
elif not admin_email:
error = 'Admin email is required'
elif not admin_password:
error = 'Admin password is required'
elif admin_password != confirm_password:
error = 'Passwords do not match'
elif len(admin_password) < 6:
error = 'Password must be at least 6 characters long'
if error is None:
try:
# Generate company slug
import re
slug = re.sub(r'[^\w\s-]', '', company_name.lower())
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
# Ensure slug uniqueness
base_slug = slug
counter = 1
while Company.query.filter_by(slug=slug).first():
slug = f"{base_slug}-{counter}"
counter += 1
# Create company
company = Company(
name=company_name,
slug=slug,
description=company_description,
is_active=True
)
db.session.add(company)
db.session.flush() # Get company.id without committing
# Check if username/email already exists in this company context
existing_user_by_username = User.query.filter_by(
username=admin_username,
company_id=company.id
).first()
existing_user_by_email = User.query.filter_by(
email=admin_email,
company_id=company.id
).first()
if existing_user_by_username:
error = 'Username already exists in this company'
elif existing_user_by_email:
error = 'Email already registered in this company'
if error is None:
# Create admin user
admin_user = User(
username=admin_username,
email=admin_email,
company_id=company.id,
role=Role.ADMIN,
is_verified=True # Auto-verify company admin
)
admin_user.set_password(admin_password)
db.session.add(admin_user)
db.session.commit()
if is_initial_setup:
# Auto-login the admin user for initial setup
session['user_id'] = admin_user.id
session['username'] = admin_user.username
session['role'] = admin_user.role.value
flash(f'Company "{company_name}" created successfully! You are now logged in as the administrator.', 'success')
return redirect(url_for('home'))
else:
# For super admin creating additional companies, don't auto-login
flash(f'Company "{company_name}" created successfully! Admin user "{admin_username}" has been created with the company code "{slug}".', 'success')
return redirect(url_for('admin_company') if g.user else url_for('login'))
else:
db.session.rollback()
except Exception as e:
db.session.rollback()
logger.error(f"Error during company setup: {str(e)}")
error = f"An error occurred during setup: {str(e)}"
if error:
flash(error, 'error')
return render_template('setup_company.html',
title='Company Setup',
existing_companies=existing_companies,
is_initial_setup=is_initial_setup,
is_super_admin=is_super_admin)
@app.route('/verify_email/<token>')
def verify_email(token):
user = User.query.filter_by(verification_token=token).first()
if not user:
flash('Invalid or expired verification link.', 'error')
return redirect(url_for('login'))
if user.verify_token(token):
db.session.commit()
flash('Email verified successfully! You can now log in.', 'success')
else:
flash('Invalid or expired verification link.', 'error')
return redirect(url_for('login'))
@app.route('/dashboard')
@role_required(Role.TEAM_LEADER)
def dashboard():
# Get dashboard data based on user role
dashboard_data = {}
if g.user.role == Role.ADMIN and g.user.company_id:
# Admin sees everything within their company
dashboard_data.update({
'total_users': User.query.filter_by(company_id=g.user.company_id).count(),
'total_teams': Team.query.filter_by(company_id=g.user.company_id).count(),
'blocked_users': User.query.filter_by(company_id=g.user.company_id, is_blocked=True).count(),
'unverified_users': User.query.filter_by(company_id=g.user.company_id, is_verified=False).count(),
'recent_registrations': User.query.filter_by(company_id=g.user.company_id).order_by(User.id.desc()).limit(5).all()
})
if g.user.role in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
# Team leaders and supervisors see team-related data
if g.user.team_id or g.user.role == Role.ADMIN:
if g.user.role == Role.ADMIN and g.user.company_id:
# Admin can see all teams in their company
teams = Team.query.filter_by(company_id=g.user.company_id).all()
team_members = User.query.filter(
User.team_id.isnot(None),
User.company_id == g.user.company_id
).all()
else:
# Team leaders/supervisors see their own team
teams = [Team.query.get(g.user.team_id)] if g.user.team_id else []
team_members = User.query.filter_by(
team_id=g.user.team_id,
company_id=g.user.company_id
).all() if g.user.team_id else []
dashboard_data.update({
'teams': teams,
'team_members': team_members,
'team_member_count': len(team_members)
})
# Get recent time entries for the user's oversight
if g.user.role == Role.ADMIN:
# Admin sees all recent entries
recent_entries = TimeEntry.query.order_by(TimeEntry.arrival_time.desc()).limit(10).all()
elif g.user.team_id:
# Team leaders see their team's entries
team_user_ids = [user.id for user in User.query.filter_by(team_id=g.user.team_id).all()]
recent_entries = TimeEntry.query.filter(TimeEntry.user_id.in_(team_user_ids)).order_by(TimeEntry.arrival_time.desc()).limit(10).all()
else:
recent_entries = []
dashboard_data['recent_entries'] = recent_entries
return render_template('dashboard.html', title='Dashboard', **dashboard_data)
# Redirect old admin dashboard URL to new dashboard
@app.route('/admin/users')
@admin_required
@company_required
def admin_users():
users = User.query.filter_by(company_id=g.user.company_id).all()
return render_template('admin_users.html', title='User Management', users=users)
@app.route('/admin/users/create', methods=['GET', 'POST'])
@admin_required
@company_required
def create_user():
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
auto_verify = 'auto_verify' in request.form
# Get role and team
role_name = request.form.get('role')
team_id = request.form.get('team_id')
# Validate input
error = None
if not username:
error = 'Username is required'
elif not email:
error = 'Email is required'
elif not password:
error = 'Password is required'
elif User.query.filter_by(username=username, company_id=g.user.company_id).first():
error = 'Username already exists in your company'
elif User.query.filter_by(email=email, company_id=g.user.company_id).first():
error = 'Email already registered in your company'
if error is None:
# Convert role string to enum
try:
role = Role[role_name] if role_name else Role.TEAM_MEMBER
except KeyError:
role = Role.TEAM_MEMBER
# Create new user with role and team
new_user = User(
username=username,
email=email,
company_id=g.user.company_id,
is_verified=auto_verify,
role=role,
team_id=team_id if team_id else None
)
new_user.set_password(password)
if not auto_verify:
# Generate verification token and send email
token = new_user.generate_verification_token()
verification_url = url_for('verify_email', token=token, _external=True)
msg = Message('Verify your TimeTrack account', recipients=[email])
msg.body = f'''Hello {username},
An administrator has created an account for you on TimeTrack. To activate your account, please click on the link below:
{verification_url}
This link will expire in 24 hours.
Best regards,
The TimeTrack Team
'''
mail.send(msg)
db.session.add(new_user)
db.session.commit()
if auto_verify:
flash(f'User {username} created and automatically verified!', 'success')
else:
flash(f'User {username} created! Verification email sent.', 'success')
return redirect(url_for('admin_users'))
flash(error, 'error')
# Get all teams for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).all()
roles = get_available_roles()
return render_template('create_user.html', title='Create User', teams=teams, roles=roles)
@app.route('/admin/users/edit/<int:user_id>', methods=['GET', 'POST'])
@admin_required
@company_required
def edit_user(user_id):
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
# Get role and team
role_name = request.form.get('role')
team_id = request.form.get('team_id')
# Validate input
error = None
if not username:
error = 'Username is required'
elif not email:
error = 'Email is required'
elif username != user.username and User.query.filter_by(username=username, company_id=g.user.company_id).first():
error = 'Username already exists in your company'
elif email != user.email and User.query.filter_by(email=email, company_id=g.user.company_id).first():
error = 'Email already registered in your company'
if error is None:
user.username = username
user.email = email
# Convert role string to enum
try:
user.role = Role[role_name] if role_name else Role.TEAM_MEMBER
except KeyError:
user.role = Role.TEAM_MEMBER
user.team_id = team_id if team_id else None
if password:
user.set_password(password)
db.session.commit()
flash(f'User {username} updated successfully!', 'success')
return redirect(url_for('admin_users'))
flash(error, 'error')
# Get all teams for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).all()
roles = get_available_roles()
return render_template('edit_user.html', title='Edit User', user=user, teams=teams, roles=roles)
@app.route('/admin/users/delete/<int:user_id>', methods=['POST'])
@admin_required
@company_required
def delete_user(user_id):
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
# Prevent deleting yourself
if user.id == session.get('user_id'):
flash('You cannot delete your own account', 'error')
return redirect(url_for('admin_users'))
username = user.username
db.session.delete(user)
db.session.commit()
flash(f'User {username} deleted successfully', 'success')
return redirect(url_for('admin_users'))
@app.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
user = User.query.get(session['user_id'])
if request.method == 'POST':
email = request.form.get('email')
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# Validate input
error = None
if not email:
error = 'Email is required'
elif email != user.email and User.query.filter_by(email=email).first():
error = 'Email already registered'
# Password change validation
if new_password:
if not current_password:
error = 'Current password is required to set a new password'
elif not user.check_password(current_password):
error = 'Current password is incorrect'
elif new_password != confirm_password:
error = 'New passwords do not match'
if error is None:
user.email = email
if new_password:
user.set_password(new_password)
db.session.commit()
flash('Profile updated successfully!', 'success')
return redirect(url_for('profile'))
flash(error, 'error')
return render_template('profile.html', title='My Profile', user=user)
@app.route('/2fa/setup', methods=['GET', 'POST'])
@login_required
def setup_2fa():
if request.method == 'POST':
# Verify the TOTP code before enabling 2FA
totp_code = request.form.get('totp_code')
if not totp_code:
flash('Please enter the verification code from your authenticator app.', 'error')
return redirect(url_for('setup_2fa'))
try:
if g.user.verify_2fa_token(totp_code, allow_setup=True):
g.user.two_factor_enabled = True
db.session.commit()
flash('Two-factor authentication has been successfully enabled!', 'success')
return redirect(url_for('profile'))
else:
flash('Invalid verification code. Please make sure your device time is synchronized and try again.', 'error')
return redirect(url_for('setup_2fa'))
except Exception as e:
logger.error(f"2FA setup error: {str(e)}")
flash('An error occurred during 2FA setup. Please try again.', 'error')
return redirect(url_for('setup_2fa'))
# GET request - show setup page
if g.user.two_factor_enabled:
flash('Two-factor authentication is already enabled.', 'info')
return redirect(url_for('profile'))
# Generate secret if not exists
if not g.user.two_factor_secret:
g.user.generate_2fa_secret()
db.session.commit()
# Generate QR code
import qrcode
import io
import base64
qr_uri = g.user.get_2fa_uri()
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(qr_uri)
qr.make(fit=True)
# Create QR code image
qr_img = qr.make_image(fill_color="black", back_color="white")
img_buffer = io.BytesIO()
qr_img.save(img_buffer, format='PNG')
img_buffer.seek(0)
qr_code_b64 = base64.b64encode(img_buffer.getvalue()).decode()
return render_template('setup_2fa.html',
title='Setup Two-Factor Authentication',
secret=g.user.two_factor_secret,
qr_code=qr_code_b64)
@app.route('/2fa/disable', methods=['POST'])
@login_required
def disable_2fa():
password = request.form.get('password')
if not password or not g.user.check_password(password):
flash('Please enter your correct password to disable 2FA.', 'error')
return redirect(url_for('profile'))
g.user.two_factor_enabled = False
g.user.two_factor_secret = None
db.session.commit()
flash('Two-factor authentication has been disabled.', 'success')
return redirect(url_for('profile'))
@app.route('/2fa/verify', methods=['GET', 'POST'])
def verify_2fa():
# Check if user is in 2FA verification state
user_id = session.get('2fa_user_id')
if not user_id:
return redirect(url_for('login'))
user = User.query.get(user_id)
if not user or not user.two_factor_enabled:
session.pop('2fa_user_id', None)
return redirect(url_for('login'))
if request.method == 'POST':
totp_code = request.form.get('totp_code')
if user.verify_2fa_token(totp_code):
# Complete login process
session.pop('2fa_user_id', None)
session['user_id'] = user.id
session['username'] = user.username
session['role'] = user.role.value
# Log successful 2FA login
SystemEvent.log_event(
'user_login_2fa',
f'User {user.username} logged in successfully with 2FA',
'auth',
'info',
user_id=user.id,
company_id=user.company_id,
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
flash('Login successful!', 'success')
return redirect(url_for('home'))
else:
# Log failed 2FA attempt
SystemEvent.log_event(
'2fa_failed',
f'Failed 2FA verification for user {user.username}',
'auth',
'warning',
user_id=user.id,
company_id=user.company_id,
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
flash('Invalid verification code. Please try again.', 'error')
return render_template('verify_2fa.html', title='Two-Factor Authentication')
@app.route('/about')
def about():
return render_template('about.html', title='About')
@app.route('/contact', methods=['GET', 'POST'])
@login_required
def contact():
# redacted
return render_template('contact.html', title='Contact')
# We can keep this route as a redirect to home for backward compatibility
@app.route('/timetrack')
@login_required
def timetrack():
return redirect(url_for('home'))
@app.route('/api/arrive', methods=['POST'])
@login_required
def arrive():
# Get project and notes from request
project_id = request.json.get('project_id') if request.json else None
notes = request.json.get('notes') if request.json else None
# Validate project access if project is specified
if project_id:
project = Project.query.get(project_id)
if not project or not project.is_user_allowed(g.user):
return jsonify({'error': 'Invalid or unauthorized project'}), 403
# Create a new time entry with arrival time for the current user
new_entry = TimeEntry(
user_id=g.user.id,
arrival_time=datetime.now(),
project_id=int(project_id) if project_id else None,
notes=notes
)
db.session.add(new_entry)
db.session.commit()
# Format response with user preferences
from time_utils import format_datetime_by_preference, get_user_format_settings
date_format, time_format_24h = get_user_format_settings(g.user)
return jsonify({
'id': new_entry.id,
'arrival_time': format_datetime_by_preference(new_entry.arrival_time, date_format, time_format_24h),
'project': {
'id': new_entry.project.id,
'code': new_entry.project.code,
'name': new_entry.project.name
} if new_entry.project else None,
'notes': new_entry.notes
})
@app.route('/api/leave/<int:entry_id>', methods=['POST'])
@login_required
def leave(entry_id):
# Find the time entry for the current user
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
# Set the departure time
departure_time = datetime.now()
# Apply time rounding if enabled
rounded_arrival, rounded_departure = apply_time_rounding(entry.arrival_time, departure_time, g.user)
entry.arrival_time = rounded_arrival
entry.departure_time = rounded_departure
# If currently paused, add the final break duration
if entry.is_paused and entry.pause_start_time:
final_break_duration = int((rounded_departure - entry.pause_start_time).total_seconds())
entry.total_break_duration += final_break_duration
entry.is_paused = False
entry.pause_start_time = None
# Apply rounding to break duration if enabled
interval_minutes, round_to_nearest = get_user_rounding_settings(g.user)
if interval_minutes > 0:
entry.total_break_duration = round_duration_to_interval(
entry.total_break_duration, interval_minutes, round_to_nearest
)
# Calculate work duration considering breaks
entry.duration, effective_break = calculate_work_duration(
rounded_arrival,
rounded_departure,
entry.total_break_duration,
g.user
)
db.session.commit()
return jsonify({
'id': entry.id,
'arrival_time': entry.arrival_time.isoformat(),
'departure_time': entry.departure_time.isoformat(),
'duration': entry.duration,
'total_break_duration': entry.total_break_duration,
'effective_break_duration': effective_break
})
# Add this new route to handle pausing/resuming
@app.route('/api/toggle-pause/<int:entry_id>', methods=['POST'])
@login_required
def toggle_pause(entry_id):
# Find the time entry for the current user
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
now = datetime.now()
if entry.is_paused:
# Resuming work - calculate break duration
break_duration = int((now - entry.pause_start_time).total_seconds())
entry.total_break_duration += break_duration
entry.is_paused = False
entry.pause_start_time = None
message = "Work resumed"
else:
# Pausing work
entry.is_paused = True
entry.pause_start_time = now
message = "Work paused"
db.session.commit()
return jsonify({
'id': entry.id,
'is_paused': entry.is_paused,
'total_break_duration': entry.total_break_duration,
'message': message
})
@app.route('/config', methods=['GET', 'POST'])
@login_required
def config():
# Get user preferences or create default if none exists
preferences = UserPreferences.query.filter_by(user_id=g.user.id).first()
if not preferences:
preferences = UserPreferences(user_id=g.user.id)
db.session.add(preferences)
db.session.commit()
if request.method == 'POST':
try:
# Update only user preferences (no company policies)
preferences.time_format_24h = 'time_format_24h' in request.form
preferences.date_format = request.form.get('date_format', 'ISO')
preferences.time_rounding_minutes = int(request.form.get('time_rounding_minutes', 0))
preferences.round_to_nearest = 'round_to_nearest' in request.form
db.session.commit()
flash('Preferences updated successfully!', 'success')
return redirect(url_for('config'))
except ValueError:
flash('Please enter valid values for all fields', 'error')
# Get company work policies for display (read-only)
company_config = CompanyWorkConfig.query.filter_by(company_id=g.user.company_id).first()
# Import time utils for display options
from time_utils import get_available_rounding_options, get_available_date_formats
rounding_options = get_available_rounding_options()
date_format_options = get_available_date_formats()
return render_template('config.html', title='User Preferences',
preferences=preferences,
company_config=company_config,
rounding_options=rounding_options,
date_format_options=date_format_options)
@app.route('/api/delete/<int:entry_id>', methods=['DELETE'])
@login_required
def delete_entry(entry_id):
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
db.session.delete(entry)
db.session.commit()
return jsonify({'success': True, 'message': 'Entry deleted successfully'})
@app.route('/api/update/<int:entry_id>', methods=['PUT'])
@login_required
def update_entry(entry_id):
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
data = request.json
if not data:
return jsonify({'success': False, 'message': 'No JSON data provided'}), 400
if 'arrival_time' in data:
try:
# Accept only ISO 8601 format
arrival_time_str = data['arrival_time']
entry.arrival_time = datetime.fromisoformat(arrival_time_str.replace('Z', '+00:00'))
except (ValueError, AttributeError) as e:
return jsonify({'success': False, 'message': f'Invalid arrival time format. Expected ISO 8601: {arrival_time_str}'}), 400
if 'departure_time' in data and data['departure_time']:
try:
# Accept only ISO 8601 format
departure_time_str = data['departure_time']
entry.departure_time = datetime.fromisoformat(departure_time_str.replace('Z', '+00:00'))
# Recalculate duration if both times are present
if entry.arrival_time and entry.departure_time:
# Calculate work duration considering breaks
entry.duration, _ = calculate_work_duration(
entry.arrival_time,
entry.departure_time,
entry.total_break_duration,
g.user
)
except (ValueError, AttributeError) as e:
return jsonify({'success': False, 'message': f'Invalid departure time format. Expected ISO 8601: {departure_time_str}'}), 400
db.session.commit()
return jsonify({
'success': True,
'message': 'Entry updated successfully',
'entry': {
'id': entry.id,
'arrival_time': entry.arrival_time.isoformat(),
'departure_time': entry.departure_time.isoformat() if entry.departure_time else None,
'duration': entry.duration,
'is_paused': entry.is_paused,
'total_break_duration': entry.total_break_duration
}
})
def calculate_work_duration(arrival_time, departure_time, total_break_duration, user):
"""
Calculate work duration considering both configured and actual break times.
Args:
arrival_time: Datetime of arrival
departure_time: Datetime of departure
total_break_duration: Actual logged break duration in seconds
user: User object to get company configuration
Returns:
tuple: (work_duration_in_seconds, effective_break_duration_in_seconds)
"""
# Calculate raw duration
raw_duration = (departure_time - arrival_time).total_seconds()
# Get company work configuration for break rules
company_config = CompanyWorkConfig.query.filter_by(company_id=user.company_id).first()
if not company_config:
# Use Germany defaults if no company config exists
preset = CompanyWorkConfig.get_regional_preset(WorkRegion.GERMANY)
break_threshold_hours = preset['break_threshold_hours']
mandatory_break_minutes = preset['mandatory_break_minutes']
additional_break_threshold_hours = preset['additional_break_threshold_hours']
additional_break_minutes = preset['additional_break_minutes']
else:
break_threshold_hours = company_config.break_threshold_hours
mandatory_break_minutes = company_config.mandatory_break_minutes
additional_break_threshold_hours = company_config.additional_break_threshold_hours
additional_break_minutes = company_config.additional_break_minutes
# Calculate mandatory breaks based on work duration
work_hours = raw_duration / 3600 # Convert seconds to hours
configured_break_seconds = 0
# Apply primary break if work duration exceeds threshold
if work_hours > break_threshold_hours:
configured_break_seconds += mandatory_break_minutes * 60
# Apply additional break if work duration exceeds additional threshold
if work_hours > additional_break_threshold_hours:
configured_break_seconds += additional_break_minutes * 60
# Use the greater of configured breaks or actual logged breaks
effective_break_duration = max(configured_break_seconds, total_break_duration)
# Calculate final work duration
work_duration = int(raw_duration - effective_break_duration)
return work_duration, effective_break_duration
@app.route('/api/resume/<int:entry_id>', methods=['POST'])
@login_required
def resume_entry(entry_id):
# Find the entry to resume for the current user
entry_to_resume = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
# Check if there's already an active entry
active_entry = TimeEntry.query.filter_by(user_id=session['user_id'], departure_time=None).first()
if active_entry:
return jsonify({
'success': False,
'message': 'Cannot resume this entry. Another session is already active.'
}), 400
# Clear the departure time to make this entry active again
entry_to_resume.departure_time = None
# Reset pause state if it was paused
entry_to_resume.is_paused = False
entry_to_resume.pause_start_time = None
db.session.commit()
return jsonify({
'success': True,
'message': 'Work resumed on existing entry',
'id': entry_to_resume.id,
'arrival_time': entry_to_resume.arrival_time.isoformat(),
'total_break_duration': entry_to_resume.total_break_duration
})
@app.route('/api/manual-entry', methods=['POST'])
@login_required
def manual_entry():
try:
data = request.get_json()
# Extract data from request
project_id = data.get('project_id')
start_date = data.get('start_date')
start_time = data.get('start_time')
end_date = data.get('end_date')
end_time = data.get('end_time')
break_minutes = int(data.get('break_minutes', 0))
notes = data.get('notes', '')
# Validate required fields
if not all([start_date, start_time, end_date, end_time]):
return jsonify({'error': 'Start and end date/time are required'}), 400
# Parse datetime strings
try:
arrival_datetime = datetime.strptime(f"{start_date} {start_time}", '%Y-%m-%d %H:%M:%S')
departure_datetime = datetime.strptime(f"{end_date} {end_time}", '%Y-%m-%d %H:%M:%S')
except ValueError:
try:
# Try without seconds if parsing fails
arrival_datetime = datetime.strptime(f"{start_date} {start_time}:00", '%Y-%m-%d %H:%M:%S')
departure_datetime = datetime.strptime(f"{end_date} {end_time}:00", '%Y-%m-%d %H:%M:%S')
except ValueError:
return jsonify({'error': 'Invalid date/time format'}), 400
# Validate that end time is after start time
if departure_datetime <= arrival_datetime:
return jsonify({'error': 'End time must be after start time'}), 400
# Apply time rounding if enabled
rounded_arrival, rounded_departure = apply_time_rounding(arrival_datetime, departure_datetime, g.user)
# Validate project access if project is specified
if project_id:
project = Project.query.get(project_id)
if not project or not project.is_user_allowed(g.user):
return jsonify({'error': 'Invalid or unauthorized project'}), 403
# Check for overlapping entries for this user (using rounded times)
overlapping_entry = TimeEntry.query.filter(
TimeEntry.user_id == g.user.id,
TimeEntry.departure_time.isnot(None),
TimeEntry.arrival_time < rounded_departure,
TimeEntry.departure_time > rounded_arrival
).first()
if overlapping_entry:
return jsonify({
'error': 'This time entry overlaps with an existing entry'
}), 400
# Calculate total duration in seconds (using rounded times)
total_duration = int((rounded_departure - rounded_arrival).total_seconds())
break_duration_seconds = break_minutes * 60
# Apply rounding to break duration if enabled
interval_minutes, round_to_nearest = get_user_rounding_settings(g.user)
if interval_minutes > 0:
break_duration_seconds = round_duration_to_interval(
break_duration_seconds, interval_minutes, round_to_nearest
)
# Validate break duration doesn't exceed total duration
if break_duration_seconds >= total_duration:
return jsonify({'error': 'Break duration cannot exceed total work duration'}), 400
# Calculate work duration (total duration minus breaks)
work_duration = total_duration - break_duration_seconds
# Create the manual time entry (using rounded times)
new_entry = TimeEntry(
user_id=g.user.id,
arrival_time=rounded_arrival,
departure_time=rounded_departure,
duration=work_duration,
total_break_duration=break_duration_seconds,
project_id=int(project_id) if project_id else None,
notes=notes,
is_paused=False,
pause_start_time=None
)
db.session.add(new_entry)
db.session.commit()
return jsonify({
'success': True,
'message': 'Manual time entry added successfully',
'entry_id': new_entry.id
})
except Exception as e:
logger.error(f"Error creating manual time entry: {str(e)}")
db.session.rollback()
return jsonify({'error': 'An error occurred while creating the time entry'}), 500
@app.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
@app.errorhandler(500)
def internal_server_error(e):
return render_template('500.html'), 500
@app.route('/test')
def test():
return "App is working!"
@app.route('/admin/users/toggle-status/<int:user_id>')
@admin_required
@company_required
def toggle_user_status(user_id):
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
# Prevent blocking yourself
if user.id == session.get('user_id'):
flash('You cannot block your own account', 'error')
return redirect(url_for('admin_users'))
# Toggle the blocked status
user.is_blocked = not user.is_blocked
db.session.commit()
if user.is_blocked:
flash(f'User {user.username} has been blocked', 'success')
else:
flash(f'User {user.username} has been unblocked', 'success')
return redirect(url_for('admin_users'))
# Add this route to manage system settings
@app.route('/admin/settings', methods=['GET', 'POST'])
@admin_required
def admin_settings():
if request.method == 'POST':
# Update registration setting
registration_enabled = 'registration_enabled' in request.form
reg_setting = SystemSettings.query.filter_by(key='registration_enabled').first()
if reg_setting:
reg_setting.value = 'true' if registration_enabled else 'false'
# Update email verification setting
email_verification_required = 'email_verification_required' in request.form
email_setting = SystemSettings.query.filter_by(key='email_verification_required').first()
if email_setting:
email_setting.value = 'true' if email_verification_required else 'false'
db.session.commit()
flash('System settings updated successfully!', 'success')
# Get current settings
settings = {}
for setting in SystemSettings.query.all():
if setting.key == 'registration_enabled':
settings['registration_enabled'] = setting.value == 'true'
elif setting.key == 'email_verification_required':
settings['email_verification_required'] = setting.value == 'true'
return render_template('admin_settings.html', title='System Settings', settings=settings)
@app.route('/system-admin/dashboard')
@system_admin_required
def system_admin_dashboard():
"""System Administrator Dashboard - view all data across companies"""
# Global statistics
total_companies = Company.query.count()
total_users = User.query.count()
total_teams = Team.query.count()
total_projects = Project.query.count()
total_time_entries = TimeEntry.query.count()
# System admin count
system_admins = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
regular_admins = User.query.filter_by(role=Role.ADMIN).count()
# Recent activity (last 7 days)
from datetime import datetime, timedelta
week_ago = datetime.now() - timedelta(days=7)
recent_users = User.query.filter(User.created_at >= week_ago).count()
recent_companies = Company.query.filter(Company.created_at >= week_ago).count()
recent_time_entries = TimeEntry.query.filter(TimeEntry.arrival_time >= week_ago).count()
# Top companies by user count
top_companies = db.session.query(
Company.name,
Company.id,
db.func.count(User.id).label('user_count')
).join(User).group_by(Company.id).order_by(db.func.count(User.id).desc()).limit(5).all()
# Recent companies
recent_companies_list = Company.query.order_by(Company.created_at.desc()).limit(5).all()
# System health checks
orphaned_users = User.query.filter_by(company_id=None).count()
orphaned_time_entries = TimeEntry.query.filter_by(user_id=None).count()
blocked_users = User.query.filter_by(is_blocked=True).count()
return render_template('system_admin_dashboard.html',
title='System Administrator Dashboard',
total_companies=total_companies,
total_users=total_users,
total_teams=total_teams,
total_projects=total_projects,
total_time_entries=total_time_entries,
system_admins=system_admins,
regular_admins=regular_admins,
recent_users=recent_users,
recent_companies=recent_companies,
recent_time_entries=recent_time_entries,
top_companies=top_companies,
recent_companies_list=recent_companies_list,
orphaned_users=orphaned_users,
orphaned_time_entries=orphaned_time_entries,
blocked_users=blocked_users)
@app.route('/system-admin/users')
@system_admin_required
def system_admin_users():
"""System Admin: View all users across all companies"""
filter_type = request.args.get('filter', '')
page = request.args.get('page', 1, type=int)
per_page = 50
# Build query based on filter
query = User.query
if filter_type == 'blocked':
query = query.filter_by(is_blocked=True)
elif filter_type == 'system_admins':
query = query.filter_by(role=Role.SYSTEM_ADMIN)
elif filter_type == 'admins':
query = query.filter_by(role=Role.ADMIN)
elif filter_type == 'unverified':
query = query.filter_by(is_verified=False)
elif filter_type == 'freelancers':
query = query.filter_by(account_type=AccountType.FREELANCER)
# Add company join for display
query = query.join(Company).add_columns(Company.name.label('company_name'))
# Order by creation date (newest first)
query = query.order_by(User.created_at.desc())
# Paginate results
users = query.paginate(page=page, per_page=per_page, error_out=False)
return render_template('system_admin_users.html',
title='System Admin - All Users',
users=users,
current_filter=filter_type)
@app.route('/system-admin/users/<int:user_id>/edit', methods=['GET', 'POST'])
@system_admin_required
def system_admin_edit_user(user_id):
"""System Admin: Edit any user across companies"""
user = User.query.get_or_404(user_id)
if request.method == 'POST':
# Get form data
username = request.form.get('username')
email = request.form.get('email')
role = request.form.get('role')
is_blocked = request.form.get('is_blocked') == 'on'
is_verified = request.form.get('is_verified') == 'on'
company_id = request.form.get('company_id')
team_id = request.form.get('team_id') or None
# Validation
error = None
# Check if username is unique within the company
existing_user = User.query.filter(
User.username == username,
User.company_id == company_id,
User.id != user_id
).first()
if existing_user:
error = f'Username "{username}" is already taken in this company.'
# Check if email is unique within the company
existing_email = User.query.filter(
User.email == email,
User.company_id == company_id,
User.id != user_id
).first()
if existing_email:
error = f'Email "{email}" is already registered in this company.'
if not error:
# Update user
user.username = username
user.email = email
user.role = Role(role)
user.is_blocked = is_blocked
user.is_verified = is_verified
user.company_id = company_id
user.team_id = team_id
db.session.commit()
flash(f'User {username} updated successfully.', 'success')
return redirect(url_for('system_admin_users'))
flash(error, 'error')
# Get all companies and teams for form dropdowns
companies = Company.query.order_by(Company.name).all()
teams = Team.query.filter_by(company_id=user.company_id).order_by(Team.name).all()
roles = get_available_roles()
return render_template('system_admin_edit_user.html',
title=f'Edit User: {user.username}',
user=user,
companies=companies,
teams=teams,
roles=roles)
@app.route('/system-admin/users/<int:user_id>/delete', methods=['POST'])
@system_admin_required
def system_admin_delete_user(user_id):
"""System Admin: Delete any user (with safety checks)"""
user = User.query.get_or_404(user_id)
# Safety check: prevent deleting the last system admin
if user.role == Role.SYSTEM_ADMIN:
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
if system_admin_count <= 1:
flash('Cannot delete the last system administrator.', 'error')
return redirect(url_for('system_admin_users'))
# Safety check: prevent deleting yourself
if user.id == g.user.id:
flash('Cannot delete your own account.', 'error')
return redirect(url_for('system_admin_users'))
username = user.username
company_name = user.company.name if user.company else 'Unknown'
# Delete related data first
TimeEntry.query.filter_by(user_id=user.id).delete()
WorkConfig.query.filter_by(user_id=user.id).delete()
# Delete the user
db.session.delete(user)
db.session.commit()
flash(f'User "{username}" from company "{company_name}" has been deleted.', 'success')
return redirect(url_for('system_admin_users'))
@app.route('/system-admin/companies')
@system_admin_required
def system_admin_companies():
"""System Admin: View all companies"""
page = request.args.get('page', 1, type=int)
per_page = 20
companies = Company.query.order_by(Company.created_at.desc()).paginate(
page=page, per_page=per_page, error_out=False)
# Get user counts for each company
company_stats = {}
for company in companies.items:
user_count = User.query.filter_by(company_id=company.id).count()
admin_count = User.query.filter(
User.company_id == company.id,
User.role.in_([Role.ADMIN, Role.SYSTEM_ADMIN])
).count()
company_stats[company.id] = {
'user_count': user_count,
'admin_count': admin_count
}
return render_template('system_admin_companies.html',
title='System Admin - All Companies',
companies=companies,
company_stats=company_stats)
@app.route('/system-admin/companies/<int:company_id>')
@system_admin_required
def system_admin_company_detail(company_id):
"""System Admin: View detailed company information"""
company = Company.query.get_or_404(company_id)
# Get company statistics
users = User.query.filter_by(company_id=company.id).all()
teams = Team.query.filter_by(company_id=company.id).all()
projects = Project.query.filter_by(company_id=company.id).all()
# Recent activity
from datetime import datetime, timedelta
week_ago = datetime.now() - timedelta(days=7)
recent_time_entries = TimeEntry.query.join(User).filter(
User.company_id == company.id,
TimeEntry.arrival_time >= week_ago
).count()
# Role distribution
role_counts = {}
for role in Role:
count = User.query.filter_by(company_id=company.id, role=role).count()
if count > 0:
role_counts[role.value] = count
return render_template('system_admin_company_detail.html',
title=f'Company: {company.name}',
company=company,
users=users,
teams=teams,
projects=projects,
recent_time_entries=recent_time_entries,
role_counts=role_counts)
@app.route('/system-admin/time-entries')
@system_admin_required
def system_admin_time_entries():
"""System Admin: View time entries across all companies"""
page = request.args.get('page', 1, type=int)
company_filter = request.args.get('company', '')
per_page = 50
# Build query
query = TimeEntry.query.join(User).join(Company)
if company_filter:
query = query.filter(Company.id == company_filter)
# Add columns for display
query = query.add_columns(
User.username,
Company.name.label('company_name'),
Project.name.label('project_name')
).outerjoin(Project)
# Order by arrival time (newest first)
query = query.order_by(TimeEntry.arrival_time.desc())
# Paginate
entries = query.paginate(page=page, per_page=per_page, error_out=False)
# Get companies for filter dropdown
companies = Company.query.order_by(Company.name).all()
return render_template('system_admin_time_entries.html',
title='System Admin - Time Entries',
entries=entries,
companies=companies,
current_company=company_filter)
@app.route('/system-admin/settings', methods=['GET', 'POST'])
@system_admin_required
def system_admin_settings():
"""System Admin: Global system settings"""
if request.method == 'POST':
# Update system settings
registration_enabled = request.form.get('registration_enabled') == 'on'
email_verification = request.form.get('email_verification_required') == 'on'
tracking_script_enabled = request.form.get('tracking_script_enabled') == 'on'
tracking_script_code = request.form.get('tracking_script_code', '')
# Update or create settings
reg_setting = SystemSettings.query.filter_by(key='registration_enabled').first()
if reg_setting:
reg_setting.value = 'true' if registration_enabled else 'false'
else:
reg_setting = SystemSettings(
key='registration_enabled',
value='true' if registration_enabled else 'false',
description='Controls whether new user registration is allowed'
)
db.session.add(reg_setting)
email_setting = SystemSettings.query.filter_by(key='email_verification_required').first()
if email_setting:
email_setting.value = 'true' if email_verification else 'false'
else:
email_setting = SystemSettings(
key='email_verification_required',
value='true' if email_verification else 'false',
description='Controls whether email verification is required for new accounts'
)
db.session.add(email_setting)
tracking_enabled_setting = SystemSettings.query.filter_by(key='tracking_script_enabled').first()
if tracking_enabled_setting:
tracking_enabled_setting.value = 'true' if tracking_script_enabled else 'false'
else:
tracking_enabled_setting = SystemSettings(
key='tracking_script_enabled',
value='true' if tracking_script_enabled else 'false',
description='Controls whether custom tracking script is enabled'
)
db.session.add(tracking_enabled_setting)
tracking_code_setting = SystemSettings.query.filter_by(key='tracking_script_code').first()
if tracking_code_setting:
tracking_code_setting.value = tracking_script_code
else:
tracking_code_setting = SystemSettings(
key='tracking_script_code',
value=tracking_script_code,
description='Custom tracking script code (HTML/JavaScript)'
)
db.session.add(tracking_code_setting)
db.session.commit()
flash('System settings updated successfully.', 'success')
return redirect(url_for('system_admin_settings'))
# Get current settings
settings = {}
all_settings = SystemSettings.query.all()
for setting in all_settings:
if setting.key == 'registration_enabled':
settings['registration_enabled'] = setting.value == 'true'
elif setting.key == 'email_verification_required':
settings['email_verification_required'] = setting.value == 'true'
elif setting.key == 'tracking_script_enabled':
settings['tracking_script_enabled'] = setting.value == 'true'
elif setting.key == 'tracking_script_code':
settings['tracking_script_code'] = setting.value
# System statistics
total_companies = Company.query.count()
total_users = User.query.count()
total_system_admins = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
return render_template('system_admin_settings.html',
title='System Administrator Settings',
settings=settings,
total_companies=total_companies,
total_users=total_users,
total_system_admins=total_system_admins)
@app.route('/system-admin/health')
@system_admin_required
def system_admin_health():
"""System Admin: System health check and event log"""
# Get system health summary
health_summary = SystemEvent.get_system_health_summary()
# Get recent events (last 7 days)
recent_events = SystemEvent.get_recent_events(days=7, limit=100)
# Get events by severity for quick stats
errors = SystemEvent.get_events_by_severity('error', days=7, limit=20)
warnings = SystemEvent.get_events_by_severity('warning', days=7, limit=20)
# System metrics
from datetime import datetime, timedelta
now = datetime.now()
# Database connection test
db_healthy = True
db_error = None
try:
db.session.execute('SELECT 1')
except Exception as e:
db_healthy = False
db_error = str(e)
SystemEvent.log_event(
'database_check_failed',
f'Database health check failed: {str(e)}',
'system',
'error'
)
# Application uptime (approximate based on first event)
first_event = SystemEvent.query.order_by(SystemEvent.timestamp.asc()).first()
uptime_start = first_event.timestamp if first_event else now
uptime_duration = now - uptime_start
# Recent activity stats
today = now.date()
today_events = SystemEvent.query.filter(
func.date(SystemEvent.timestamp) == today
).count()
# Log the health check
SystemEvent.log_event(
'system_health_check',
f'System health check performed by {session.get("username", "unknown")}',
'system',
'info',
user_id=session.get('user_id'),
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
return render_template('system_admin_health.html',
title='System Health Check',
health_summary=health_summary,
recent_events=recent_events,
errors=errors,
warnings=warnings,
db_healthy=db_healthy,
db_error=db_error,
uptime_duration=uptime_duration,
today_events=today_events)
@app.route('/system-admin/announcements')
@system_admin_required
def system_admin_announcements():
"""System Admin: Manage announcements"""
page = request.args.get('page', 1, type=int)
per_page = 20
announcements = Announcement.query.order_by(Announcement.created_at.desc()).paginate(
page=page, per_page=per_page, error_out=False)
return render_template('system_admin_announcements.html',
title='System Admin - Announcements',
announcements=announcements)
@app.route('/system-admin/announcements/new', methods=['GET', 'POST'])
@system_admin_required
def system_admin_announcement_new():
"""System Admin: Create new announcement"""
if request.method == 'POST':
title = request.form.get('title')
content = request.form.get('content')
announcement_type = request.form.get('announcement_type', 'info')
is_urgent = request.form.get('is_urgent') == 'on'
is_active = request.form.get('is_active') == 'on'
# Handle date fields
start_date = request.form.get('start_date')
end_date = request.form.get('end_date')
start_datetime = None
end_datetime = None
if start_date:
try:
start_datetime = datetime.strptime(start_date, '%Y-%m-%dT%H:%M')
except ValueError:
pass
if end_date:
try:
end_datetime = datetime.strptime(end_date, '%Y-%m-%dT%H:%M')
except ValueError:
pass
# Handle targeting
target_all_users = request.form.get('target_all_users') == 'on'
target_roles = None
target_companies = None
if not target_all_users:
selected_roles = request.form.getlist('target_roles')
selected_companies = request.form.getlist('target_companies')
if selected_roles:
import json
target_roles = json.dumps(selected_roles)
if selected_companies:
import json
target_companies = json.dumps([int(c) for c in selected_companies])
announcement = Announcement(
title=title,
content=content,
announcement_type=announcement_type,
is_urgent=is_urgent,
is_active=is_active,
start_date=start_datetime,
end_date=end_datetime,
target_all_users=target_all_users,
target_roles=target_roles,
target_companies=target_companies,
created_by_id=g.user.id
)
db.session.add(announcement)
db.session.commit()
flash('Announcement created successfully.', 'success')
return redirect(url_for('system_admin_announcements'))
# Get roles and companies for targeting options
roles = [role.value for role in Role]
companies = Company.query.order_by(Company.name).all()
return render_template('system_admin_announcement_form.html',
title='Create Announcement',
announcement=None,
roles=roles,
companies=companies)
@app.route('/system-admin/announcements/<int:id>/edit', methods=['GET', 'POST'])
@system_admin_required
def system_admin_announcement_edit(id):
"""System Admin: Edit announcement"""
announcement = Announcement.query.get_or_404(id)
if request.method == 'POST':
announcement.title = request.form.get('title')
announcement.content = request.form.get('content')
announcement.announcement_type = request.form.get('announcement_type', 'info')
announcement.is_urgent = request.form.get('is_urgent') == 'on'
announcement.is_active = request.form.get('is_active') == 'on'
# Handle date fields
start_date = request.form.get('start_date')
end_date = request.form.get('end_date')
if start_date:
try:
announcement.start_date = datetime.strptime(start_date, '%Y-%m-%dT%H:%M')
except ValueError:
announcement.start_date = None
else:
announcement.start_date = None
if end_date:
try:
announcement.end_date = datetime.strptime(end_date, '%Y-%m-%dT%H:%M')
except ValueError:
announcement.end_date = None
else:
announcement.end_date = None
# Handle targeting
announcement.target_all_users = request.form.get('target_all_users') == 'on'
if not announcement.target_all_users:
selected_roles = request.form.getlist('target_roles')
selected_companies = request.form.getlist('target_companies')
if selected_roles:
import json
announcement.target_roles = json.dumps(selected_roles)
else:
announcement.target_roles = None
if selected_companies:
import json
announcement.target_companies = json.dumps([int(c) for c in selected_companies])
else:
announcement.target_companies = None
else:
announcement.target_roles = None
announcement.target_companies = None
announcement.updated_at = datetime.now()
db.session.commit()
flash('Announcement updated successfully.', 'success')
return redirect(url_for('system_admin_announcements'))
# Get roles and companies for targeting options
roles = [role.value for role in Role]
companies = Company.query.order_by(Company.name).all()
return render_template('system_admin_announcement_form.html',
title='Edit Announcement',
announcement=announcement,
roles=roles,
companies=companies)
@app.route('/system-admin/announcements/<int:id>/delete', methods=['POST'])
@system_admin_required
def system_admin_announcement_delete(id):
"""System Admin: Delete announcement"""
announcement = Announcement.query.get_or_404(id)
db.session.delete(announcement)
db.session.commit()
flash('Announcement deleted successfully.', 'success')
return redirect(url_for('system_admin_announcements'))
@app.route('/admin/work-policies', methods=['GET', 'POST'])
@admin_required
@company_required
def admin_work_policies():
# Get or create company work config
work_config = CompanyWorkConfig.query.filter_by(company_id=g.user.company_id).first()
if not work_config:
# Create default config for the company
preset = CompanyWorkConfig.get_regional_preset(WorkRegion.GERMANY)
work_config = CompanyWorkConfig(
company_id=g.user.company_id,
work_hours_per_day=preset['work_hours_per_day'],
mandatory_break_minutes=preset['mandatory_break_minutes'],
break_threshold_hours=preset['break_threshold_hours'],
additional_break_minutes=preset['additional_break_minutes'],
additional_break_threshold_hours=preset['additional_break_threshold_hours'],
region=WorkRegion.GERMANY,
region_name=preset['region_name'],
created_by_id=g.user.id
)
db.session.add(work_config)
db.session.commit()
if request.method == 'POST':
try:
# Handle regional preset selection
if request.form.get('action') == 'apply_preset':
region_code = request.form.get('region_preset')
if region_code:
region = WorkRegion(region_code)
preset = CompanyWorkConfig.get_regional_preset(region)
work_config.work_hours_per_day = preset['work_hours_per_day']
work_config.mandatory_break_minutes = preset['mandatory_break_minutes']
work_config.break_threshold_hours = preset['break_threshold_hours']
work_config.additional_break_minutes = preset['additional_break_minutes']
work_config.additional_break_threshold_hours = preset['additional_break_threshold_hours']
work_config.region = region
work_config.region_name = preset['region_name']
db.session.commit()
flash(f'Applied {preset["region_name"]} work policy preset', 'success')
return redirect(url_for('admin_work_policies'))
# Handle manual configuration update
else:
work_config.work_hours_per_day = float(request.form.get('work_hours_per_day', 8.0))
work_config.mandatory_break_minutes = int(request.form.get('mandatory_break_minutes', 30))
work_config.break_threshold_hours = float(request.form.get('break_threshold_hours', 6.0))
work_config.additional_break_minutes = int(request.form.get('additional_break_minutes', 15))
work_config.additional_break_threshold_hours = float(request.form.get('additional_break_threshold_hours', 9.0))
work_config.region = WorkRegion.CUSTOM
work_config.region_name = 'Custom Configuration'
db.session.commit()
flash('Work policies updated successfully!', 'success')
return redirect(url_for('admin_work_policies'))
except ValueError:
flash('Please enter valid numbers for all fields', 'error')
# Get available regional presets
regional_presets = []
for region in WorkRegion:
preset = CompanyWorkConfig.get_regional_preset(region)
regional_presets.append({
'code': region.value,
'name': preset['region_name'],
'description': f"{preset['work_hours_per_day']}h/day, {preset['mandatory_break_minutes']}min break after {preset['break_threshold_hours']}h"
})
return render_template('admin_work_policies.html',
title='Work Policies',
work_config=work_config,
regional_presets=regional_presets,
WorkRegion=WorkRegion)
# Company Management Routes
@app.route('/admin/company')
@admin_required
@company_required
def admin_company():
"""View and manage company settings"""
company = g.company
# Get company statistics
stats = {
'total_users': User.query.filter_by(company_id=company.id).count(),
'total_teams': Team.query.filter_by(company_id=company.id).count(),
'total_projects': Project.query.filter_by(company_id=company.id).count(),
'active_projects': Project.query.filter_by(company_id=company.id, is_active=True).count(),
}
return render_template('admin_company.html', title='Company Management', company=company, stats=stats)
@app.route('/admin/company/edit', methods=['GET', 'POST'])
@admin_required
@company_required
def edit_company():
"""Edit company details"""
company = g.company
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description', '')
max_users = request.form.get('max_users')
is_active = 'is_active' in request.form
# Validate input
error = None
if not name:
error = 'Company name is required'
elif name != company.name and Company.query.filter_by(name=name).first():
error = 'Company name already exists'
if max_users:
try:
max_users = int(max_users)
if max_users < 1:
error = 'Maximum users must be at least 1'
except ValueError:
error = 'Maximum users must be a valid number'
else:
max_users = None
if error is None:
company.name = name
company.description = description
company.max_users = max_users
company.is_active = is_active
db.session.commit()
flash('Company details updated successfully!', 'success')
return redirect(url_for('admin_company'))
else:
flash(error, 'error')
return render_template('edit_company.html', title='Edit Company', company=company)
@app.route('/admin/company/users')
@admin_required
@company_required
def company_users():
"""List all users in the company with detailed information"""
users = User.query.filter_by(company_id=g.company.id).order_by(User.created_at.desc()).all()
# Calculate user statistics
user_stats = {
'total': len(users),
'verified': len([u for u in users if u.is_verified]),
'unverified': len([u for u in users if not u.is_verified]),
'blocked': len([u for u in users if u.is_blocked]),
'active': len([u for u in users if not u.is_blocked and u.is_verified]),
'admins': len([u for u in users if u.role == Role.ADMIN]),
'supervisors': len([u for u in users if u.role == Role.SUPERVISOR]),
'team_leaders': len([u for u in users if u.role == Role.TEAM_LEADER]),
'team_members': len([u for u in users if u.role == Role.TEAM_MEMBER]),
}
return render_template('company_users.html', title='Company Users',
users=users, stats=user_stats, company=g.company)
# Add these routes for team management
@app.route('/admin/teams')
@admin_required
@company_required
def admin_teams():
teams = Team.query.filter_by(company_id=g.user.company_id).all()
return render_template('admin_teams.html', title='Team Management', teams=teams)
@app.route('/admin/teams/create', methods=['GET', 'POST'])
@admin_required
@company_required
def create_team():
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description')
# Validate input
error = None
if not name:
error = 'Team name is required'
elif Team.query.filter_by(name=name, company_id=g.user.company_id).first():
error = 'Team name already exists in your company'
if error is None:
new_team = Team(name=name, description=description, company_id=g.user.company_id)
db.session.add(new_team)
db.session.commit()
flash(f'Team "{name}" created successfully!', 'success')
return redirect(url_for('admin_teams'))
flash(error, 'error')
return render_template('create_team.html', title='Create Team')
@app.route('/admin/teams/edit/<int:team_id>', methods=['GET', 'POST'])
@admin_required
@company_required
def edit_team(team_id):
team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404()
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description')
# Validate input
error = None
if not name:
error = 'Team name is required'
elif name != team.name and Team.query.filter_by(name=name, company_id=g.user.company_id).first():
error = 'Team name already exists in your company'
if error is None:
team.name = name
team.description = description
db.session.commit()
flash(f'Team "{name}" updated successfully!', 'success')
return redirect(url_for('admin_teams'))
flash(error, 'error')
return render_template('edit_team.html', title='Edit Team', team=team)
@app.route('/admin/teams/delete/<int:team_id>', methods=['POST'])
@admin_required
@company_required
def delete_team(team_id):
team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404()
# Check if team has members
if team.users:
flash('Cannot delete team with members. Remove all members first.', 'error')
return redirect(url_for('admin_teams'))
team_name = team.name
db.session.delete(team)
db.session.commit()
flash(f'Team "{team_name}" deleted successfully!', 'success')
return redirect(url_for('admin_teams'))
@app.route('/admin/teams/<int:team_id>', methods=['GET', 'POST'])
@admin_required
@company_required
def manage_team(team_id):
team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404()
if request.method == 'POST':
action = request.form.get('action')
if action == 'update_team':
# Update team details
name = request.form.get('name')
description = request.form.get('description')
# Validate input
error = None
if not name:
error = 'Team name is required'
elif name != team.name and Team.query.filter_by(name=name, company_id=g.user.company_id).first():
error = 'Team name already exists in your company'
if error is None:
team.name = name
team.description = description
db.session.commit()
flash(f'Team "{name}" updated successfully!', 'success')
else:
flash(error, 'error')
elif action == 'add_member':
# Add user to team
user_id = request.form.get('user_id')
if user_id:
user = User.query.get(user_id)
if user:
user.team_id = team.id
db.session.commit()
flash(f'User {user.username} added to team!', 'success')
else:
flash('User not found', 'error')
else:
flash('No user selected', 'error')
elif action == 'remove_member':
# Remove user from team
user_id = request.form.get('user_id')
if user_id:
user = User.query.get(user_id)
if user and user.team_id == team.id:
user.team_id = None
db.session.commit()
flash(f'User {user.username} removed from team!', 'success')
else:
flash('User not found or not in this team', 'error')
else:
flash('No user selected', 'error')
# Get team members
team_members = User.query.filter_by(team_id=team.id).all()
# Get users not in this team for the add member form (company-scoped)
available_users = User.query.filter(
User.company_id == g.user.company_id,
(User.team_id != team.id) | (User.team_id == None)
).all()
return render_template(
'manage_team.html',
title=f'Manage Team: {team.name}',
team=team,
team_members=team_members,
available_users=available_users
)
# Project Management Routes
@app.route('/admin/projects')
@role_required(Role.SUPERVISOR) # Supervisors and Admins can manage projects
@company_required
def admin_projects():
projects = Project.query.filter_by(company_id=g.user.company_id).order_by(Project.created_at.desc()).all()
categories = ProjectCategory.query.filter_by(company_id=g.user.company_id).order_by(ProjectCategory.name).all()
return render_template('admin_projects.html', title='Project Management', projects=projects, categories=categories)
@app.route('/admin/projects/create', methods=['GET', 'POST'])
@role_required(Role.SUPERVISOR)
@company_required
def create_project():
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description')
code = request.form.get('code')
team_id = request.form.get('team_id') or None
category_id = request.form.get('category_id') or None
start_date_str = request.form.get('start_date')
end_date_str = request.form.get('end_date')
# Validate input
error = None
if not name:
error = 'Project name is required'
elif not code:
error = 'Project code is required'
elif Project.query.filter_by(code=code, company_id=g.user.company_id).first():
error = 'Project code already exists in your company'
# Parse dates
start_date = None
end_date = None
if start_date_str:
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
except ValueError:
error = 'Invalid start date format'
if end_date_str:
try:
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
except ValueError:
error = 'Invalid end date format'
if start_date and end_date and start_date > end_date:
error = 'Start date cannot be after end date'
if error is None:
project = Project(
name=name,
description=description,
code=code.upper(),
company_id=g.user.company_id,
team_id=int(team_id) if team_id else None,
category_id=int(category_id) if category_id else None,
start_date=start_date,
end_date=end_date,
created_by_id=g.user.id
)
db.session.add(project)
db.session.commit()
flash(f'Project "{name}" created successfully!', 'success')
return redirect(url_for('admin_projects'))
else:
flash(error, 'error')
# Get available teams and categories for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).order_by(Team.name).all()
categories = ProjectCategory.query.filter_by(company_id=g.user.company_id).order_by(ProjectCategory.name).all()
return render_template('create_project.html', title='Create Project', teams=teams, categories=categories)
@app.route('/admin/projects/edit/<int:project_id>', methods=['GET', 'POST'])
@role_required(Role.SUPERVISOR)
@company_required
def edit_project(project_id):
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404()
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description')
code = request.form.get('code')
team_id = request.form.get('team_id') or None
category_id = request.form.get('category_id') or None
is_active = request.form.get('is_active') == 'on'
start_date_str = request.form.get('start_date')
end_date_str = request.form.get('end_date')
# Validate input
error = None
if not name:
error = 'Project name is required'
elif not code:
error = 'Project code is required'
elif code != project.code and Project.query.filter_by(code=code, company_id=g.user.company_id).first():
error = 'Project code already exists in your company'
# Parse dates
start_date = None
end_date = None
if start_date_str:
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
except ValueError:
error = 'Invalid start date format'
if end_date_str:
try:
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
except ValueError:
error = 'Invalid end date format'
if start_date and end_date and start_date > end_date:
error = 'Start date cannot be after end date'
if error is None:
project.name = name
project.description = description
project.code = code.upper()
project.team_id = int(team_id) if team_id else None
project.category_id = int(category_id) if category_id else None
project.is_active = is_active
project.start_date = start_date
project.end_date = end_date
db.session.commit()
flash(f'Project "{name}" updated successfully!', 'success')
return redirect(url_for('admin_projects'))
else:
flash(error, 'error')
# Get available teams and categories for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).order_by(Team.name).all()
categories = ProjectCategory.query.filter_by(company_id=g.user.company_id).order_by(ProjectCategory.name).all()
return render_template('edit_project.html', title='Edit Project', project=project, teams=teams, categories=categories)
@app.route('/admin/projects/delete/<int:project_id>', methods=['POST'])
@role_required(Role.ADMIN) # Only admins can delete projects
@company_required
def delete_project(project_id):
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404()
# Check if there are time entries associated with this project
time_entries_count = TimeEntry.query.filter_by(project_id=project_id).count()
if time_entries_count > 0:
flash(f'Cannot delete project "{project.name}" - it has {time_entries_count} time entries associated with it. Deactivate the project instead.', 'error')
else:
project_name = project.name
db.session.delete(project)
db.session.commit()
flash(f'Project "{project_name}" deleted successfully!', 'success')
return redirect(url_for('admin_projects'))
@app.route('/api/team/hours_data', methods=['GET'])
@login_required
@role_required(Role.TEAM_LEADER) # Only team leaders and above can access
@company_required
def team_hours_data():
# Get the current user's team
team = Team.query.get(g.user.team_id)
if not team:
return jsonify({
'success': False,
'message': 'You are not assigned to any team.'
}), 400
# Get date range from query parameters or use current week as default
today = datetime.now().date()
start_of_week = today - timedelta(days=today.weekday())
end_of_week = start_of_week + timedelta(days=6)
start_date_str = request.args.get('start_date', start_of_week.strftime('%Y-%m-%d'))
end_date_str = request.args.get('end_date', end_of_week.strftime('%Y-%m-%d'))
include_self = request.args.get('include_self', 'false') == 'true'
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
except ValueError:
return jsonify({
'success': False,
'message': 'Invalid date format.'
}), 400
# Get all team members
team_members = User.query.filter_by(team_id=team.id).all()
# Prepare data structure for team members' hours
team_data = []
for member in team_members:
# Skip if the member is the current user (team leader) and include_self is False
if member.id == g.user.id and not include_self:
continue
# Get time entries for this member in the date range
entries = TimeEntry.query.filter(
TimeEntry.user_id == member.id,
TimeEntry.arrival_time >= datetime.combine(start_date, time.min),
TimeEntry.arrival_time <= datetime.combine(end_date, time.max)
).order_by(TimeEntry.arrival_time).all()
# Calculate daily and total hours
daily_hours = {}
total_seconds = 0
for entry in entries:
if entry.duration: # Only count completed entries
entry_date = entry.arrival_time.date()
date_str = entry_date.strftime('%Y-%m-%d')
if date_str not in daily_hours:
daily_hours[date_str] = 0
daily_hours[date_str] += entry.duration
total_seconds += entry.duration
# Convert seconds to hours for display
for date_str in daily_hours:
daily_hours[date_str] = round(daily_hours[date_str] / 3600, 2) # Convert to hours
total_hours = round(total_seconds / 3600, 2) # Convert to hours
# Format entries for JSON response
formatted_entries = []
for entry in entries:
formatted_entries.append({
'id': entry.id,
'arrival_time': entry.arrival_time.isoformat(),
'departure_time': entry.departure_time.isoformat() if entry.departure_time else None,
'duration': entry.duration,
'total_break_duration': entry.total_break_duration
})
# Add member data to team data
team_data.append({
'user': {
'id': member.id,
'username': member.username,
'email': member.email
},
'daily_hours': daily_hours,
'total_hours': total_hours,
'entries': formatted_entries
})
# Generate a list of dates in the range for the table header
date_range = []
current_date = start_date
while current_date <= end_date:
date_range.append(current_date.strftime('%Y-%m-%d'))
current_date += timedelta(days=1)
return jsonify({
'success': True,
'team': {
'id': team.id,
'name': team.name,
'description': team.description
},
'team_data': team_data,
'date_range': date_range,
'start_date': start_date.isoformat(),
'end_date': end_date.isoformat()
})
@app.route('/export')
def export():
return render_template('export.html', title='Export Data')
def get_date_range(period, start_date_str=None, end_date_str=None):
"""Get start and end date based on period or custom date range."""
today = datetime.now().date()
if period:
if period == 'today':
return today, today
elif period == 'week':
start_date = today - timedelta(days=today.weekday())
return start_date, today
elif period == 'month':
start_date = today.replace(day=1)
return start_date, today
elif period == 'all':
earliest_entry = TimeEntry.query.order_by(TimeEntry.arrival_time).first()
start_date = earliest_entry.arrival_time.date() if earliest_entry else today
return start_date, today
else:
# Custom date range
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
return start_date, end_date
except (ValueError, TypeError):
raise ValueError('Invalid date format')
@app.route('/download_export')
def download_export():
"""Handle export download requests."""
export_format = request.args.get('format', 'csv')
period = request.args.get('period')
try:
start_date, end_date = get_date_range(
period,
request.args.get('start_date'),
request.args.get('end_date')
)
except ValueError:
flash('Invalid date format. Please use YYYY-MM-DD format.')
return redirect(url_for('export'))
# Query entries within the date range
start_datetime = datetime.combine(start_date, time.min)
end_datetime = datetime.combine(end_date, time.max)
entries = TimeEntry.query.filter(
TimeEntry.arrival_time >= start_datetime,
TimeEntry.arrival_time <= end_datetime
).order_by(TimeEntry.arrival_time).all()
if not entries:
flash('No entries found for the selected date range.')
return redirect(url_for('export'))
# Prepare data and filename
data = prepare_export_data(entries)
filename = f"timetrack_export_{start_date.strftime('%Y%m%d')}_to_{end_date.strftime('%Y%m%d')}"
# Export based on format
if export_format == 'csv':
return export_to_csv(data, filename)
elif export_format == 'excel':
return export_to_excel(data, filename)
else:
flash('Invalid export format.')
return redirect(url_for('export'))
@app.route('/analytics')
@app.route('/analytics/<mode>')
@login_required
def analytics(mode='personal'):
"""Unified analytics view combining history, team hours, and graphs"""
# Validate mode parameter
if mode not in ['personal', 'team']:
mode = 'personal'
# Check team access for team mode
if mode == 'team':
if not g.user.team_id:
flash('You must be assigned to a team to view team analytics.', 'warning')
return redirect(url_for('analytics', mode='personal'))
if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
flash('You do not have permission to view team analytics.', 'error')
return redirect(url_for('analytics', mode='personal'))
# Get available projects for filtering
available_projects = []
all_projects = Project.query.filter_by(is_active=True).all()
for project in all_projects:
if project.is_user_allowed(g.user):
available_projects.append(project)
# Get team members if in team mode
team_members = []
if mode == 'team' and g.user.team_id:
team_members = User.query.filter_by(team_id=g.user.team_id).all()
# Default date range (current week)
today = datetime.now().date()
start_of_week = today - timedelta(days=today.weekday())
end_of_week = start_of_week + timedelta(days=6)
return render_template('analytics.html',
title='Time Analytics',
mode=mode,
available_projects=available_projects,
team_members=team_members,
default_start_date=start_of_week.strftime('%Y-%m-%d'),
default_end_date=end_of_week.strftime('%Y-%m-%d'))
@app.route('/api/analytics/data')
@login_required
def analytics_data():
"""API endpoint for analytics data"""
mode = request.args.get('mode', 'personal')
view_type = request.args.get('view', 'table')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
project_filter = request.args.get('project_id')
granularity = request.args.get('granularity', 'daily')
# Validate mode
if mode not in ['personal', 'team']:
return jsonify({'error': 'Invalid mode'}), 400
# Check permissions for team mode
if mode == 'team':
if not g.user.team_id:
return jsonify({'error': 'No team assigned'}), 403
if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
return jsonify({'error': 'Insufficient permissions'}), 403
try:
# Parse dates
if start_date:
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
if end_date:
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
# Get filtered data
data = get_filtered_analytics_data(g.user, mode, start_date, end_date, project_filter)
# Format data based on view type
if view_type == 'graph':
formatted_data = format_graph_data(data, granularity)
elif view_type == 'team':
formatted_data = format_team_data(data, granularity)
else:
formatted_data = format_table_data(data)
return jsonify(formatted_data)
except Exception as e:
logger.error(f"Error in analytics_data: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
def get_filtered_analytics_data(user, mode, start_date=None, end_date=None, project_filter=None):
"""Get filtered time entry data for analytics"""
# Base query
query = TimeEntry.query
# Apply user/team filter
if mode == 'personal':
query = query.filter(TimeEntry.user_id == user.id)
elif mode == 'team' and user.team_id:
team_user_ids = [u.id for u in User.query.filter_by(team_id=user.team_id).all()]
query = query.filter(TimeEntry.user_id.in_(team_user_ids))
# Apply date filters
if start_date:
query = query.filter(func.date(TimeEntry.arrival_time) >= start_date)
if end_date:
query = query.filter(func.date(TimeEntry.arrival_time) <= end_date)
# Apply project filter
if project_filter:
if project_filter == 'none':
query = query.filter(TimeEntry.project_id.is_(None))
else:
try:
project_id = int(project_filter)
query = query.filter(TimeEntry.project_id == project_id)
except ValueError:
pass
return query.order_by(TimeEntry.arrival_time.desc()).all()
@app.route('/api/companies/<int:company_id>/teams')
@system_admin_required
def api_company_teams(company_id):
"""API: Get teams for a specific company (System Admin only)"""
teams = Team.query.filter_by(company_id=company_id).order_by(Team.name).all()
return jsonify([{
'id': team.id,
'name': team.name,
'description': team.description
} for team in teams])
@app.route('/api/system-admin/stats')
@system_admin_required
def api_system_admin_stats():
"""API: Get real-time system statistics for dashboard"""
from datetime import datetime, timedelta
# Get basic counts
total_companies = Company.query.count()
total_users = User.query.count()
total_teams = Team.query.count()
total_projects = Project.query.count()
total_time_entries = TimeEntry.query.count()
# Active sessions
active_sessions = TimeEntry.query.filter_by(departure_time=None, is_paused=False).count()
paused_sessions = TimeEntry.query.filter_by(is_paused=True).count()
# Recent activity (last 24 hours)
yesterday = datetime.now() - timedelta(days=1)
recent_users = User.query.filter(User.created_at >= yesterday).count()
recent_companies = Company.query.filter(Company.created_at >= yesterday).count()
recent_time_entries = TimeEntry.query.filter(TimeEntry.arrival_time >= yesterday).count()
# System health
orphaned_users = User.query.filter_by(company_id=None).count()
orphaned_time_entries = TimeEntry.query.filter_by(user_id=None).count()
blocked_users = User.query.filter_by(is_blocked=True).count()
unverified_users = User.query.filter_by(is_verified=False).count()
return jsonify({
'totals': {
'companies': total_companies,
'users': total_users,
'teams': total_teams,
'projects': total_projects,
'time_entries': total_time_entries
},
'active': {
'sessions': active_sessions,
'paused_sessions': paused_sessions
},
'recent': {
'users': recent_users,
'companies': recent_companies,
'time_entries': recent_time_entries
},
'health': {
'orphaned_users': orphaned_users,
'orphaned_time_entries': orphaned_time_entries,
'blocked_users': blocked_users,
'unverified_users': unverified_users
}
})
@app.route('/api/system-admin/companies/<int:company_id>/users')
@system_admin_required
def api_company_users(company_id):
"""API: Get users for a specific company (System Admin only)"""
company = Company.query.get_or_404(company_id)
users = User.query.filter_by(company_id=company.id).order_by(User.username).all()
return jsonify({
'company': {
'id': company.id,
'name': company.name,
'is_personal': company.is_personal
},
'users': [{
'id': user.id,
'username': user.username,
'email': user.email,
'role': user.role.value,
'is_blocked': user.is_blocked,
'is_verified': user.is_verified,
'created_at': user.created_at.isoformat(),
'team_id': user.team_id
} for user in users]
})
@app.route('/api/system-admin/users/<int:user_id>/toggle-block', methods=['POST'])
@system_admin_required
def api_toggle_user_block(user_id):
"""API: Toggle user blocked status (System Admin only)"""
user = User.query.get_or_404(user_id)
# Safety check: prevent blocking yourself
if user.id == g.user.id:
return jsonify({'error': 'Cannot block your own account'}), 400
# Safety check: prevent blocking the last system admin
if user.role == Role.SYSTEM_ADMIN and not user.is_blocked:
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN, is_blocked=False).count()
if system_admin_count <= 1:
return jsonify({'error': 'Cannot block the last system administrator'}), 400
user.is_blocked = not user.is_blocked
db.session.commit()
return jsonify({
'id': user.id,
'username': user.username,
'is_blocked': user.is_blocked,
'message': f'User {"blocked" if user.is_blocked else "unblocked"} successfully'
})
@app.route('/api/system-admin/companies/<int:company_id>/stats')
@system_admin_required
def api_company_stats(company_id):
"""API: Get detailed statistics for a specific company"""
company = Company.query.get_or_404(company_id)
# User counts by role
role_counts = {}
for role in Role:
count = User.query.filter_by(company_id=company.id, role=role).count()
if count > 0:
role_counts[role.value] = count
# Team and project counts
team_count = Team.query.filter_by(company_id=company.id).count()
project_count = Project.query.filter_by(company_id=company.id).count()
active_projects = Project.query.filter_by(company_id=company.id, is_active=True).count()
# Time entries statistics
from datetime import datetime, timedelta
week_ago = datetime.now() - timedelta(days=7)
month_ago = datetime.now() - timedelta(days=30)
weekly_entries = TimeEntry.query.join(User).filter(
User.company_id == company.id,
TimeEntry.arrival_time >= week_ago
).count()
monthly_entries = TimeEntry.query.join(User).filter(
User.company_id == company.id,
TimeEntry.arrival_time >= month_ago
).count()
# Active sessions
active_sessions = TimeEntry.query.join(User).filter(
User.company_id == company.id,
TimeEntry.departure_time == None,
TimeEntry.is_paused == False
).count()
return jsonify({
'company': {
'id': company.id,
'name': company.name,
'is_personal': company.is_personal,
'is_active': company.is_active
},
'users': {
'total': sum(role_counts.values()),
'by_role': role_counts
},
'structure': {
'teams': team_count,
'projects': project_count,
'active_projects': active_projects
},
'activity': {
'weekly_entries': weekly_entries,
'monthly_entries': monthly_entries,
'active_sessions': active_sessions
}
})
@app.route('/api/analytics/export')
@login_required
def analytics_export():
"""Export analytics data in various formats"""
export_format = request.args.get('format', 'csv')
view_type = request.args.get('view', 'table')
mode = request.args.get('mode', 'personal')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
project_filter = request.args.get('project_id')
# Validate permissions
if mode == 'team':
if not g.user.team_id:
flash('No team assigned', 'error')
return redirect(url_for('analytics'))
if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
flash('Insufficient permissions', 'error')
return redirect(url_for('analytics'))
try:
# Parse dates
if start_date:
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
if end_date:
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
# Get data
data = get_filtered_analytics_data(g.user, mode, start_date, end_date, project_filter)
if export_format == 'csv':
return export_analytics_csv(data, view_type, mode)
elif export_format == 'excel':
return export_analytics_excel(data, view_type, mode)
else:
flash('Invalid export format', 'error')
return redirect(url_for('analytics'))
except Exception as e:
logger.error(f"Error in analytics export: {str(e)}")
flash('Error generating export', 'error')
return redirect(url_for('analytics'))
# Task Management Routes
@app.route('/admin/projects/<int:project_id>/tasks')
@role_required(Role.TEAM_MEMBER) # All authenticated users can view tasks
@company_required
def manage_project_tasks(project_id):
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404()
# Check if user has access to this project
if not project.is_user_allowed(g.user):
flash('You do not have access to this project.', 'error')
return redirect(url_for('admin_projects'))
# Get all tasks for this project
tasks = Task.query.filter_by(project_id=project_id).order_by(Task.created_at.desc()).all()
# Get team members for assignment dropdown
if project.team_id:
# If project is assigned to a specific team, only show team members
team_members = User.query.filter_by(team_id=project.team_id, company_id=g.user.company_id).all()
else:
# If project is available to all teams, show all company users
team_members = User.query.filter_by(company_id=g.user.company_id).all()
return render_template('manage_project_tasks.html',
title=f'Tasks - {project.name}',
project=project,
tasks=tasks,
team_members=team_members)
@app.route('/admin/projects/<int:project_id>/kanban')
@role_required(Role.TEAM_MEMBER)
@company_required
def project_kanban(project_id):
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404()
# Check if user has access to this project
if not project.is_user_allowed(g.user):
flash('You do not have access to this project.', 'error')
return redirect(url_for('admin_projects'))
# Get all Kanban boards for this project
boards = KanbanBoard.query.filter_by(project_id=project_id, is_active=True).order_by(KanbanBoard.created_at.desc()).all()
# Get team members for assignment dropdown
if project.team_id:
team_members = User.query.filter_by(team_id=project.team_id, company_id=g.user.company_id).all()
else:
team_members = User.query.filter_by(company_id=g.user.company_id).all()
# Get tasks for task assignment dropdown
tasks = Task.query.filter_by(project_id=project_id).order_by(Task.name).all()
return render_template('project_kanban.html',
title=f'Kanban - {project.name}',
project=project,
boards=boards,
team_members=team_members,
tasks=tasks)
@app.route('/kanban')
@role_required(Role.TEAM_MEMBER)
@company_required
def kanban_overview():
# Get all projects the user has access to
if g.user.role in [Role.ADMIN, Role.SUPERVISOR]:
# Admins and Supervisors can see all company projects
projects = Project.query.filter_by(company_id=g.user.company_id, is_active=True).order_by(Project.name).all()
elif g.user.team_id:
# Team members see team projects + unassigned projects
projects = Project.query.filter(
Project.company_id == g.user.company_id,
Project.is_active == True,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).order_by(Project.name).all()
else:
# Unassigned users see only unassigned projects
projects = Project.query.filter_by(
company_id=g.user.company_id,
team_id=None,
is_active=True
).order_by(Project.name).all()
# Get Kanban boards for each project
project_boards = {}
for project in projects:
boards = KanbanBoard.query.filter_by(
project_id=project.id,
is_active=True
).order_by(KanbanBoard.created_at.desc()).all()
if boards: # Only include projects that have Kanban boards
project_boards[project] = boards
return render_template('kanban_overview.html',
title='Kanban Overview',
project_boards=project_boards)
# Task API Routes
@app.route('/api/tasks', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@company_required
def create_task():
try:
data = request.get_json()
project_id = data.get('project_id')
# Verify project access
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first()
if not project or not project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Project not found or access denied'})
# Validate required fields
name = data.get('name')
if not name:
return jsonify({'success': False, 'message': 'Task name is required'})
# Parse dates
start_date = None
due_date = None
if data.get('start_date'):
start_date = datetime.strptime(data.get('start_date'), '%Y-%m-%d').date()
if data.get('due_date'):
due_date = datetime.strptime(data.get('due_date'), '%Y-%m-%d').date()
# Create task
task = Task(
name=name,
description=data.get('description', ''),
status=TaskStatus(data.get('status', 'Not Started')),
priority=TaskPriority(data.get('priority', 'Medium')),
estimated_hours=float(data.get('estimated_hours')) if data.get('estimated_hours') else None,
project_id=project_id,
assigned_to_id=int(data.get('assigned_to_id')) if data.get('assigned_to_id') else None,
start_date=start_date,
due_date=due_date,
created_by_id=g.user.id
)
db.session.add(task)
db.session.commit()
return jsonify({'success': True, 'message': 'Task created successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@role_required(Role.TEAM_MEMBER)
@company_required
def get_task(task_id):
try:
task = Task.query.join(Project).filter(
Task.id == task_id,
Project.company_id == g.user.company_id
).first()
if not task or not task.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Task not found or access denied'})
task_data = {
'id': task.id,
'name': task.name,
'description': task.description,
'status': task.status.value,
'priority': task.priority.value,
'estimated_hours': task.estimated_hours,
'assigned_to_id': task.assigned_to_id,
'start_date': task.start_date.isoformat() if task.start_date else None,
'due_date': task.due_date.isoformat() if task.due_date else None
}
return jsonify({'success': True, 'task': task_data})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@role_required(Role.TEAM_MEMBER)
@company_required
def update_task(task_id):
try:
task = Task.query.join(Project).filter(
Task.id == task_id,
Project.company_id == g.user.company_id
).first()
if not task or not task.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Task not found or access denied'})
data = request.get_json()
# Update task fields
if 'name' in data:
task.name = data['name']
if 'description' in data:
task.description = data['description']
if 'status' in data:
task.status = TaskStatus(data['status'])
if data['status'] == 'Completed':
task.completed_date = datetime.now().date()
else:
task.completed_date = None
if 'priority' in data:
task.priority = TaskPriority(data['priority'])
if 'estimated_hours' in data:
task.estimated_hours = float(data['estimated_hours']) if data['estimated_hours'] else None
if 'assigned_to_id' in data:
task.assigned_to_id = int(data['assigned_to_id']) if data['assigned_to_id'] else None
if 'start_date' in data:
task.start_date = datetime.strptime(data['start_date'], '%Y-%m-%d').date() if data['start_date'] else None
if 'due_date' in data:
task.due_date = datetime.strptime(data['due_date'], '%Y-%m-%d').date() if data['due_date'] else None
db.session.commit()
return jsonify({'success': True, 'message': 'Task updated successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@role_required(Role.TEAM_LEADER) # Only team leaders and above can delete tasks
@company_required
def delete_task(task_id):
try:
task = Task.query.join(Project).filter(
Task.id == task_id,
Project.company_id == g.user.company_id
).first()
if not task or not task.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Task not found or access denied'})
db.session.delete(task)
db.session.commit()
return jsonify({'success': True, 'message': 'Task deleted successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
# Subtask API Routes
@app.route('/api/subtasks', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@company_required
def create_subtask():
try:
data = request.get_json()
task_id = data.get('task_id')
# Verify task access
task = Task.query.join(Project).filter(
Task.id == task_id,
Project.company_id == g.user.company_id
).first()
if not task or not task.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Task not found or access denied'})
# Validate required fields
name = data.get('name')
if not name:
return jsonify({'success': False, 'message': 'Subtask name is required'})
# Parse dates
start_date = None
due_date = None
if data.get('start_date'):
start_date = datetime.strptime(data.get('start_date'), '%Y-%m-%d').date()
if data.get('due_date'):
due_date = datetime.strptime(data.get('due_date'), '%Y-%m-%d').date()
# Create subtask
subtask = SubTask(
name=name,
description=data.get('description', ''),
status=TaskStatus(data.get('status', 'Not Started')),
priority=TaskPriority(data.get('priority', 'Medium')),
estimated_hours=float(data.get('estimated_hours')) if data.get('estimated_hours') else None,
task_id=task_id,
assigned_to_id=int(data.get('assigned_to_id')) if data.get('assigned_to_id') else None,
start_date=start_date,
due_date=due_date,
created_by_id=g.user.id
)
db.session.add(subtask)
db.session.commit()
return jsonify({'success': True, 'message': 'Subtask created successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/subtasks/<int:subtask_id>', methods=['GET'])
@role_required(Role.TEAM_MEMBER)
@company_required
def get_subtask(subtask_id):
try:
subtask = SubTask.query.join(Task).join(Project).filter(
SubTask.id == subtask_id,
Project.company_id == g.user.company_id
).first()
if not subtask or not subtask.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Subtask not found or access denied'})
subtask_data = {
'id': subtask.id,
'name': subtask.name,
'description': subtask.description,
'status': subtask.status.value,
'priority': subtask.priority.value,
'estimated_hours': subtask.estimated_hours,
'assigned_to_id': subtask.assigned_to_id,
'start_date': subtask.start_date.isoformat() if subtask.start_date else None,
'due_date': subtask.due_date.isoformat() if subtask.due_date else None
}
return jsonify({'success': True, 'subtask': subtask_data})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/subtasks/<int:subtask_id>', methods=['PUT'])
@role_required(Role.TEAM_MEMBER)
@company_required
def update_subtask(subtask_id):
try:
subtask = SubTask.query.join(Task).join(Project).filter(
SubTask.id == subtask_id,
Project.company_id == g.user.company_id
).first()
if not subtask or not subtask.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Subtask not found or access denied'})
data = request.get_json()
# Update subtask fields
if 'name' in data:
subtask.name = data['name']
if 'description' in data:
subtask.description = data['description']
if 'status' in data:
subtask.status = TaskStatus(data['status'])
if data['status'] == 'Completed':
subtask.completed_date = datetime.now().date()
else:
subtask.completed_date = None
if 'priority' in data:
subtask.priority = TaskPriority(data['priority'])
if 'estimated_hours' in data:
subtask.estimated_hours = float(data['estimated_hours']) if data['estimated_hours'] else None
if 'assigned_to_id' in data:
subtask.assigned_to_id = int(data['assigned_to_id']) if data['assigned_to_id'] else None
if 'start_date' in data:
subtask.start_date = datetime.strptime(data['start_date'], '%Y-%m-%d').date() if data['start_date'] else None
if 'due_date' in data:
subtask.due_date = datetime.strptime(data['due_date'], '%Y-%m-%d').date() if data['due_date'] else None
db.session.commit()
return jsonify({'success': True, 'message': 'Subtask updated successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/subtasks/<int:subtask_id>', methods=['DELETE'])
@role_required(Role.TEAM_LEADER) # Only team leaders and above can delete subtasks
@company_required
def delete_subtask(subtask_id):
try:
subtask = SubTask.query.join(Task).join(Project).filter(
SubTask.id == subtask_id,
Project.company_id == g.user.company_id
).first()
if not subtask or not subtask.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Subtask not found or access denied'})
db.session.delete(subtask)
db.session.commit()
return jsonify({'success': True, 'message': 'Subtask deleted successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
# Category Management API Routes
@app.route('/api/admin/categories', methods=['POST'])
@role_required(Role.ADMIN)
@company_required
def create_category():
try:
data = request.get_json()
name = data.get('name')
description = data.get('description', '')
color = data.get('color', '#007bff')
icon = data.get('icon', '')
if not name:
return jsonify({'success': False, 'message': 'Category name is required'})
# Check if category already exists
existing = ProjectCategory.query.filter_by(
name=name,
company_id=g.user.company_id
).first()
if existing:
return jsonify({'success': False, 'message': 'Category name already exists'})
category = ProjectCategory(
name=name,
description=description,
color=color,
icon=icon,
company_id=g.user.company_id,
created_by_id=g.user.id
)
db.session.add(category)
db.session.commit()
return jsonify({'success': True, 'message': 'Category created successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/admin/categories/<int:category_id>', methods=['PUT'])
@role_required(Role.ADMIN)
@company_required
def update_category(category_id):
try:
category = ProjectCategory.query.filter_by(
id=category_id,
company_id=g.user.company_id
).first()
if not category:
return jsonify({'success': False, 'message': 'Category not found'})
data = request.get_json()
name = data.get('name')
if not name:
return jsonify({'success': False, 'message': 'Category name is required'})
# Check if name conflicts with another category
existing = ProjectCategory.query.filter(
ProjectCategory.name == name,
ProjectCategory.company_id == g.user.company_id,
ProjectCategory.id != category_id
).first()
if existing:
return jsonify({'success': False, 'message': 'Category name already exists'})
category.name = name
category.description = data.get('description', '')
category.color = data.get('color', category.color)
category.icon = data.get('icon', '')
db.session.commit()
return jsonify({'success': True, 'message': 'Category updated successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/admin/categories/<int:category_id>', methods=['DELETE'])
@role_required(Role.ADMIN)
@company_required
def delete_category(category_id):
try:
category = ProjectCategory.query.filter_by(
id=category_id,
company_id=g.user.company_id
).first()
if not category:
return jsonify({'success': False, 'message': 'Category not found'})
# Unassign projects from this category
projects = Project.query.filter_by(category_id=category_id).all()
for project in projects:
project.category_id = None
db.session.delete(category)
db.session.commit()
return jsonify({'success': True, 'message': 'Category deleted successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
# Kanban API Routes
@app.route('/api/kanban/stats')
@role_required(Role.TEAM_MEMBER)
@company_required
def get_kanban_stats():
try:
# Get all projects the user has access to
if g.user.role in [Role.ADMIN, Role.SUPERVISOR]:
projects = Project.query.filter_by(company_id=g.user.company_id, is_active=True).all()
elif g.user.team_id:
projects = Project.query.filter(
Project.company_id == g.user.company_id,
Project.is_active == True,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).all()
else:
projects = Project.query.filter_by(
company_id=g.user.company_id,
team_id=None,
is_active=True
).all()
# Count boards and cards
total_boards = 0
total_cards = 0
projects_with_boards = 0
for project in projects:
boards = KanbanBoard.query.filter_by(project_id=project.id, is_active=True).all()
if boards:
projects_with_boards += 1
total_boards += len(boards)
for board in boards:
for column in board.columns:
total_cards += len([card for card in column.cards if card.is_active])
return jsonify({
'success': True,
'stats': {
'projects_with_boards': projects_with_boards,
'total_boards': total_boards,
'total_cards': total_cards
}
})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/boards', methods=['GET'])
@role_required(Role.TEAM_MEMBER)
@company_required
def get_kanban_boards():
try:
project_id = request.args.get('project_id')
# Verify project access
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first()
if not project or not project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Project not found or access denied'})
boards = KanbanBoard.query.filter_by(project_id=project_id, is_active=True).all()
boards_data = []
for board in boards:
boards_data.append({
'id': board.id,
'name': board.name,
'description': board.description,
'is_default': board.is_default,
'created_at': board.created_at.isoformat(),
'column_count': len(board.columns)
})
return jsonify({'success': True, 'boards': boards_data})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/boards', methods=['POST'])
@role_required(Role.TEAM_LEADER)
@company_required
def create_kanban_board():
try:
data = request.get_json()
project_id = int(data.get('project_id'))
# Verify project access
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first()
if not project or not project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Project not found or access denied'})
name = data.get('name')
if not name:
return jsonify({'success': False, 'message': 'Board name is required'})
# Check if board name already exists in project
existing = KanbanBoard.query.filter_by(project_id=project_id, name=name).first()
if existing:
return jsonify({'success': False, 'message': 'Board name already exists in this project'})
# Create board
board = KanbanBoard(
name=name,
description=data.get('description', ''),
project_id=project_id,
is_default=data.get('is_default') in ['true', 'on', True],
created_by_id=g.user.id
)
db.session.add(board)
db.session.flush() # Get board ID for columns
# Create default columns
default_columns = [
{'name': 'To Do', 'position': 1, 'color': '#6c757d'},
{'name': 'In Progress', 'position': 2, 'color': '#007bff'},
{'name': 'Done', 'position': 3, 'color': '#28a745'}
]
for col_data in default_columns:
column = KanbanColumn(
name=col_data['name'],
position=col_data['position'],
color=col_data['color'],
board_id=board.id
)
db.session.add(column)
db.session.commit()
return jsonify({'success': True, 'message': 'Board created successfully', 'board_id': board.id})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/boards/<int:board_id>', methods=['GET'])
@role_required(Role.TEAM_MEMBER)
@company_required
def get_kanban_board(board_id):
try:
board = KanbanBoard.query.join(Project).filter(
KanbanBoard.id == board_id,
Project.company_id == g.user.company_id
).first()
if not board or not board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Board not found or access denied'})
columns_data = []
for column in board.columns:
if not column.is_active:
continue
cards_data = []
for card in column.cards:
if not card.is_active:
continue
cards_data.append({
'id': card.id,
'title': card.title,
'description': card.description,
'position': card.position,
'color': card.color,
'assigned_to': {
'id': card.assigned_to.id,
'username': card.assigned_to.username
} if card.assigned_to else None,
'task_id': card.task_id,
'task_name': card.task.name if card.task else None,
'due_date': card.due_date.isoformat() if card.due_date else None,
'created_at': card.created_at.isoformat()
})
columns_data.append({
'id': column.id,
'name': column.name,
'description': column.description,
'position': column.position,
'color': column.color,
'wip_limit': column.wip_limit,
'card_count': column.card_count,
'is_over_wip_limit': column.is_over_wip_limit,
'cards': cards_data
})
board_data = {
'id': board.id,
'name': board.name,
'description': board.description,
'project': {
'id': board.project.id,
'name': board.project.name,
'code': board.project.code
},
'columns': columns_data
}
return jsonify({'success': True, 'board': board_data})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/cards', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@company_required
def create_kanban_card():
try:
data = request.get_json()
column_id = data.get('column_id')
# Verify column access
column = KanbanColumn.query.join(KanbanBoard).join(Project).filter(
KanbanColumn.id == column_id,
Project.company_id == g.user.company_id
).first()
if not column or not column.board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Column not found or access denied'})
title = data.get('title')
if not title:
return jsonify({'success': False, 'message': 'Card title is required'})
# Calculate position (add to end of column)
max_position = db.session.query(func.max(KanbanCard.position)).filter_by(
column_id=column_id, is_active=True
).scalar() or 0
# Parse due date
due_date = None
if data.get('due_date'):
due_date = datetime.strptime(data.get('due_date'), '%Y-%m-%d').date()
# Create card
card = KanbanCard(
title=title,
description=data.get('description', ''),
position=max_position + 1,
color=data.get('color'),
column_id=column_id,
task_id=int(data.get('task_id')) if data.get('task_id') else None,
assigned_to_id=int(data.get('assigned_to_id')) if data.get('assigned_to_id') else None,
due_date=due_date,
created_by_id=g.user.id
)
db.session.add(card)
db.session.commit()
return jsonify({'success': True, 'message': 'Card created successfully', 'card_id': card.id})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/cards/<int:card_id>/move', methods=['PUT'])
@role_required(Role.TEAM_MEMBER)
@company_required
def move_kanban_card(card_id):
try:
data = request.get_json()
new_column_id = data.get('column_id')
new_position = data.get('position')
# Verify card access
card = KanbanCard.query.join(KanbanColumn).join(KanbanBoard).join(Project).filter(
KanbanCard.id == card_id,
Project.company_id == g.user.company_id
).first()
if not card or not card.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Card not found or access denied'})
# Verify new column access
new_column = KanbanColumn.query.join(KanbanBoard).join(Project).filter(
KanbanColumn.id == new_column_id,
Project.company_id == g.user.company_id
).first()
if not new_column or not new_column.board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Target column not found or access denied'})
old_column_id = card.column_id
old_position = card.position
# Update positions in old column (if moving to different column)
if old_column_id != new_column_id:
# Shift cards down in old column
cards_to_shift = KanbanCard.query.filter(
KanbanCard.column_id == old_column_id,
KanbanCard.position > old_position,
KanbanCard.is_active == True
).all()
for c in cards_to_shift:
c.position -= 1
# Update positions in new column
cards_to_shift = KanbanCard.query.filter(
KanbanCard.column_id == new_column_id,
KanbanCard.position >= new_position,
KanbanCard.is_active == True,
KanbanCard.id != card_id
).all()
for c in cards_to_shift:
c.position += 1
# Update card
card.column_id = new_column_id
card.position = new_position
card.updated_at = datetime.now()
db.session.commit()
return jsonify({'success': True, 'message': 'Card moved successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/cards/<int:card_id>', methods=['PUT'])
@role_required(Role.TEAM_MEMBER)
@company_required
def update_kanban_card(card_id):
try:
card = KanbanCard.query.join(KanbanColumn).join(KanbanBoard).join(Project).filter(
KanbanCard.id == card_id,
Project.company_id == g.user.company_id
).first()
if not card or not card.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Card not found or access denied'})
data = request.get_json()
# Update card fields
if 'title' in data:
card.title = data['title']
if 'description' in data:
card.description = data['description']
if 'color' in data:
card.color = data['color']
if 'assigned_to_id' in data:
card.assigned_to_id = int(data['assigned_to_id']) if data['assigned_to_id'] else None
if 'task_id' in data:
card.task_id = int(data['task_id']) if data['task_id'] else None
if 'due_date' in data:
card.due_date = datetime.strptime(data['due_date'], '%Y-%m-%d').date() if data['due_date'] else None
card.updated_at = datetime.now()
db.session.commit()
return jsonify({'success': True, 'message': 'Card updated successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/cards/<int:card_id>', methods=['DELETE'])
@role_required(Role.TEAM_MEMBER)
@company_required
def delete_kanban_card(card_id):
try:
card = KanbanCard.query.join(KanbanColumn).join(KanbanBoard).join(Project).filter(
KanbanCard.id == card_id,
Project.company_id == g.user.company_id
).first()
if not card or not card.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Card not found or access denied'})
column_id = card.column_id
position = card.position
# Soft delete
card.is_active = False
card.updated_at = datetime.now()
# Shift remaining cards up
cards_to_shift = KanbanCard.query.filter(
KanbanCard.column_id == column_id,
KanbanCard.position > position,
KanbanCard.is_active == True
).all()
for c in cards_to_shift:
c.position -= 1
db.session.commit()
return jsonify({'success': True, 'message': 'Card deleted successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
# Kanban Column Management API
@app.route('/api/kanban/columns', methods=['POST'])
@role_required(Role.TEAM_LEADER)
@company_required
def create_kanban_column():
try:
data = request.get_json()
board_id = int(data.get('board_id'))
# Verify board access
board = KanbanBoard.query.join(Project).filter(
KanbanBoard.id == board_id,
Project.company_id == g.user.company_id
).first()
if not board or not board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Board not found or access denied'})
name = data.get('name')
if not name:
return jsonify({'success': False, 'message': 'Column name is required'})
# Check if column name already exists in board
existing = KanbanColumn.query.filter_by(board_id=board_id, name=name).first()
if existing:
return jsonify({'success': False, 'message': 'Column name already exists in this board'})
# Calculate position (add to end)
max_position = db.session.query(func.max(KanbanColumn.position)).filter_by(
board_id=board_id, is_active=True
).scalar() or 0
# Create column
column = KanbanColumn(
name=name,
description=data.get('description', ''),
position=max_position + 1,
color=data.get('color', '#6c757d'),
wip_limit=int(data.get('wip_limit')) if data.get('wip_limit') else None,
board_id=board_id
)
db.session.add(column)
db.session.commit()
return jsonify({'success': True, 'message': 'Column created successfully', 'column_id': column.id})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/columns/<int:column_id>', methods=['PUT'])
@role_required(Role.TEAM_LEADER)
@company_required
def update_kanban_column(column_id):
try:
column = KanbanColumn.query.join(KanbanBoard).join(Project).filter(
KanbanColumn.id == column_id,
Project.company_id == g.user.company_id
).first()
if not column or not column.board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Column not found or access denied'})
data = request.get_json()
# Check for name conflicts (excluding current column)
if 'name' in data:
existing = KanbanColumn.query.filter(
KanbanColumn.board_id == column.board_id,
KanbanColumn.name == data['name'],
KanbanColumn.id != column_id
).first()
if existing:
return jsonify({'success': False, 'message': 'Column name already exists in this board'})
# Update column fields
if 'name' in data:
column.name = data['name']
if 'description' in data:
column.description = data['description']
if 'color' in data:
column.color = data['color']
if 'wip_limit' in data:
column.wip_limit = int(data['wip_limit']) if data['wip_limit'] else None
column.updated_at = datetime.now()
db.session.commit()
return jsonify({'success': True, 'message': 'Column updated successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/columns/<int:column_id>/move', methods=['PUT'])
@role_required(Role.TEAM_LEADER)
@company_required
def move_kanban_column(column_id):
try:
data = request.get_json()
new_position = int(data.get('position'))
# Verify column access
column = KanbanColumn.query.join(KanbanBoard).join(Project).filter(
KanbanColumn.id == column_id,
Project.company_id == g.user.company_id
).first()
if not column or not column.board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Column not found or access denied'})
old_position = column.position
board_id = column.board_id
if old_position == new_position:
return jsonify({'success': True, 'message': 'Column position unchanged'})
# Update positions of other columns
if old_position < new_position:
# Moving right: shift columns left
columns_to_shift = KanbanColumn.query.filter(
KanbanColumn.board_id == board_id,
KanbanColumn.position > old_position,
KanbanColumn.position <= new_position,
KanbanColumn.is_active == True,
KanbanColumn.id != column_id
).all()
for c in columns_to_shift:
c.position -= 1
else:
# Moving left: shift columns right
columns_to_shift = KanbanColumn.query.filter(
KanbanColumn.board_id == board_id,
KanbanColumn.position >= new_position,
KanbanColumn.position < old_position,
KanbanColumn.is_active == True,
KanbanColumn.id != column_id
).all()
for c in columns_to_shift:
c.position += 1
# Update the moved column
column.position = new_position
column.updated_at = datetime.now()
db.session.commit()
return jsonify({'success': True, 'message': 'Column moved successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/columns/<int:column_id>', methods=['DELETE'])
@role_required(Role.TEAM_LEADER)
@company_required
def delete_kanban_column(column_id):
try:
column = KanbanColumn.query.join(KanbanBoard).join(Project).filter(
KanbanColumn.id == column_id,
Project.company_id == g.user.company_id
).first()
if not column or not column.board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Column not found or access denied'})
# Check if column has active cards
active_cards = KanbanCard.query.filter_by(column_id=column_id, is_active=True).count()
if active_cards > 0:
return jsonify({'success': False, 'message': f'Cannot delete column with {active_cards} active cards. Move or delete cards first.'})
board_id = column.board_id
position = column.position
# Soft delete the column
column.is_active = False
column.updated_at = datetime.now()
# Shift remaining columns left
columns_to_shift = KanbanColumn.query.filter(
KanbanColumn.board_id == board_id,
KanbanColumn.position > position,
KanbanColumn.is_active == True
).all()
for c in columns_to_shift:
c.position -= 1
db.session.commit()
return jsonify({'success': True, 'message': 'Column deleted successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
if __name__ == '__main__':
port = int(os.environ.get('PORT', 5000))
app.run(debug=False, host='0.0.0.0', port=port)