Add Sprint Management feature.

This commit is contained in:
2025-07-04 20:03:30 +02:00
committed by Jens Luedicke
parent 49cc0c94d2
commit 9f4190a29b
8 changed files with 2667 additions and 25 deletions

592
app.py
View File

@@ -1,8 +1,8 @@
from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, session, g, Response, send_file
from models import db, TimeEntry, WorkConfig, User, SystemSettings, Team, Role, Project, Company, CompanyWorkConfig, UserPreferences, WorkRegion, AccountType, ProjectCategory, Task, SubTask, TaskStatus, TaskPriority, Announcement, SystemEvent, KanbanBoard, KanbanColumn, KanbanCard, WidgetType, UserDashboard, DashboardWidget, WidgetTemplate
from models import db, TimeEntry, WorkConfig, User, SystemSettings, Team, Role, Project, Company, CompanyWorkConfig, UserPreferences, WorkRegion, AccountType, ProjectCategory, Task, SubTask, TaskStatus, TaskPriority, Sprint, SprintStatus, Announcement, SystemEvent, KanbanBoard, KanbanColumn, KanbanCard, WidgetType, UserDashboard, DashboardWidget, WidgetTemplate
from data_formatting import (
format_duration, prepare_export_data, prepare_team_hours_export_data,
format_table_data, format_graph_data, format_team_data
format_table_data, format_graph_data, format_team_data, format_burndown_data
)
from data_export import (
export_to_csv, export_to_excel, export_team_hours_to_csv, export_team_hours_to_excel,
@@ -3331,6 +3331,13 @@ def analytics_data():
# Format data based on view type
if view_type == 'graph':
formatted_data = format_graph_data(data, granularity)
# For burndown chart, we need task data instead of time entries
chart_type = request.args.get('chart_type', 'timeSeries')
if chart_type == 'burndown':
# Get tasks for burndown chart
tasks = get_filtered_tasks_for_burndown(g.user, mode, start_date, end_date, project_filter)
burndown_data = format_burndown_data(tasks, start_date, end_date)
formatted_data.update(burndown_data)
elif view_type == 'team':
formatted_data = format_team_data(data, granularity)
else:
@@ -3374,6 +3381,48 @@ def get_filtered_analytics_data(user, mode, start_date=None, end_date=None, proj
return query.order_by(TimeEntry.arrival_time.desc()).all()
def get_filtered_tasks_for_burndown(user, mode, start_date=None, end_date=None, project_filter=None):
"""Get filtered tasks for burndown chart"""
# Base query - get tasks from user's company
query = Task.query.join(Project).filter(Project.company_id == user.company_id)
# Apply user/team filter
if mode == 'personal':
# For personal mode, get tasks assigned to the user or created by them
query = query.filter(
(Task.assigned_to_id == user.id) |
(Task.created_by_id == user.id)
)
elif mode == 'team' and user.team_id:
# For team mode, get tasks from projects assigned to the team
query = query.filter(Project.team_id == user.team_id)
# Apply project filter
if project_filter:
if project_filter == 'none':
# No project filter for tasks - they must belong to a project
return []
else:
try:
project_id = int(project_filter)
query = query.filter(Task.project_id == project_id)
except ValueError:
pass
# Apply date filters - use task creation date and completion date
if start_date:
query = query.filter(
(Task.created_at >= datetime.combine(start_date, time.min)) |
(Task.completed_date >= start_date)
)
if end_date:
query = query.filter(
Task.created_at <= datetime.combine(end_date, time.max)
)
return query.order_by(Task.created_at.desc()).all()
@app.route('/api/companies/<int:company_id>/teams')
@system_admin_required
def api_company_teams(company_id):
@@ -3770,6 +3819,104 @@ def kanban_overview():
projects=projects,
create_board=create_board)
# Unified Task Management Route
@app.route('/tasks')
@role_required(Role.TEAM_MEMBER)
@company_required
def unified_task_management():
"""Unified task management interface combining tasks and kanban"""
# Get all projects the user has access to (for filtering and task creation)
if g.user.role in [Role.ADMIN, Role.SUPERVISOR]:
# Admins and Supervisors can see all company projects
available_projects = Project.query.filter_by(
company_id=g.user.company_id,
is_active=True
).order_by(Project.name).all()
elif g.user.team_id:
# Team members see team projects + unassigned projects
available_projects = Project.query.filter(
Project.company_id == g.user.company_id,
Project.is_active == True,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).order_by(Project.name).all()
# Filter by actual access permissions
available_projects = [p for p in available_projects if p.is_user_allowed(g.user)]
else:
# Unassigned users see only unassigned projects
available_projects = Project.query.filter_by(
company_id=g.user.company_id,
team_id=None,
is_active=True
).order_by(Project.name).all()
available_projects = [p for p in available_projects if p.is_user_allowed(g.user)]
# Get team members for task assignment (company-scoped)
if g.user.role in [Role.ADMIN, Role.SUPERVISOR]:
# Admins can assign to anyone in the company
team_members = User.query.filter_by(
company_id=g.user.company_id,
is_blocked=False
).order_by(User.username).all()
elif g.user.team_id:
# Team members can assign to team members + supervisors/admins
team_members = User.query.filter(
User.company_id == g.user.company_id,
User.is_blocked == False,
db.or_(
User.team_id == g.user.team_id,
User.role.in_([Role.ADMIN, Role.SUPERVISOR])
)
).order_by(User.username).all()
else:
# Unassigned users can assign to supervisors/admins only
team_members = User.query.filter(
User.company_id == g.user.company_id,
User.is_blocked == False,
User.role.in_([Role.ADMIN, Role.SUPERVISOR])
).order_by(User.username).all()
return render_template('unified_task_management.html',
title='Task Management',
available_projects=available_projects,
team_members=team_members)
# Sprint Management Route
@app.route('/sprints')
@role_required(Role.TEAM_MEMBER)
@company_required
def sprint_management():
"""Sprint management interface"""
# Get all projects the user has access to (for sprint assignment)
if g.user.role in [Role.ADMIN, Role.SUPERVISOR]:
# Admins and Supervisors can see all company projects
available_projects = Project.query.filter_by(
company_id=g.user.company_id,
is_active=True
).order_by(Project.name).all()
elif g.user.team_id:
# Team members see team projects + unassigned projects
available_projects = Project.query.filter(
Project.company_id == g.user.company_id,
Project.is_active == True,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).order_by(Project.name).all()
# Filter by actual access permissions
available_projects = [p for p in available_projects if p.is_user_allowed(g.user)]
else:
# Unassigned users see only unassigned projects
available_projects = Project.query.filter_by(
company_id=g.user.company_id,
team_id=None,
is_active=True
).order_by(Project.name).all()
available_projects = [p for p in available_projects if p.is_user_allowed(g.user)]
return render_template('sprint_management.html',
title='Sprint Management',
available_projects=available_projects)
# Task API Routes
@app.route('/api/tasks', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@@ -3801,8 +3948,8 @@ def create_task():
task = Task(
name=name,
description=data.get('description', ''),
status=TaskStatus(data.get('status', 'Not Started')),
priority=TaskPriority(data.get('priority', 'Medium')),
status=TaskStatus[data.get('status', 'NOT_STARTED')],
priority=TaskPriority[data.get('priority', 'MEDIUM')],
estimated_hours=float(data.get('estimated_hours')) if data.get('estimated_hours') else None,
project_id=project_id,
assigned_to_id=int(data.get('assigned_to_id')) if data.get('assigned_to_id') else None,
@@ -3837,8 +3984,8 @@ def get_task(task_id):
'id': task.id,
'name': task.name,
'description': task.description,
'status': task.status.value,
'priority': task.priority.value,
'status': task.status.name,
'priority': task.priority.name,
'estimated_hours': task.estimated_hours,
'assigned_to_id': task.assigned_to_id,
'start_date': task.start_date.isoformat() if task.start_date else None,
@@ -3871,13 +4018,13 @@ def update_task(task_id):
if 'description' in data:
task.description = data['description']
if 'status' in data:
task.status = TaskStatus(data['status'])
if data['status'] == 'Completed':
task.status = TaskStatus[data['status']]
if data['status'] == 'COMPLETED':
task.completed_date = datetime.now().date()
else:
task.completed_date = None
if 'priority' in data:
task.priority = TaskPriority(data['priority'])
task.priority = TaskPriority[data['priority']]
if 'estimated_hours' in data:
task.estimated_hours = float(data['estimated_hours']) if data['estimated_hours'] else None
if 'assigned_to_id' in data:
@@ -3917,6 +4064,419 @@ def delete_task(task_id):
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
# Unified Task Management APIs
@app.route('/api/tasks/unified')
@role_required(Role.TEAM_MEMBER)
@company_required
def get_unified_tasks():
"""Get all tasks for unified kanban view"""
try:
# Base query for tasks in user's company
query = Task.query.join(Project).filter(Project.company_id == g.user.company_id)
# Apply access restrictions based on user role and team
if g.user.role not in [Role.ADMIN, Role.SUPERVISOR]:
# Regular users can only see tasks from projects they have access to
accessible_project_ids = []
projects = Project.query.filter_by(company_id=g.user.company_id).all()
for project in projects:
if project.is_user_allowed(g.user):
accessible_project_ids.append(project.id)
if accessible_project_ids:
query = query.filter(Task.project_id.in_(accessible_project_ids))
else:
# No accessible projects, return empty list
return jsonify({'success': True, 'tasks': []})
tasks = query.order_by(Task.created_at.desc()).all()
task_list = []
for task in tasks:
# Determine if this is a team task
is_team_task = (
g.user.team_id and
task.project and
task.project.team_id == g.user.team_id
)
task_data = {
'id': task.id,
'name': task.name,
'description': task.description,
'status': task.status.name,
'priority': task.priority.name,
'estimated_hours': task.estimated_hours,
'project_id': task.project_id,
'project_name': task.project.name if task.project else None,
'project_code': task.project.code if task.project else None,
'assigned_to_id': task.assigned_to_id,
'assigned_to_name': task.assigned_to.username if task.assigned_to else None,
'created_by_id': task.created_by_id,
'created_by_name': task.created_by.username if task.created_by else None,
'start_date': task.start_date.isoformat() if task.start_date else None,
'due_date': task.due_date.isoformat() if task.due_date else None,
'completed_date': task.completed_date.isoformat() if task.completed_date else None,
'created_at': task.created_at.isoformat(),
'is_team_task': is_team_task,
'subtask_count': len(task.subtasks) if task.subtasks else 0,
'sprint_id': task.sprint_id,
'sprint_name': task.sprint.name if task.sprint else None,
'is_current_sprint': task.sprint.is_current if task.sprint else False
}
task_list.append(task_data)
return jsonify({'success': True, 'tasks': task_list})
except Exception as e:
logger.error(f"Error in get_unified_tasks: {str(e)}")
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/tasks/<int:task_id>/status', methods=['PUT'])
@role_required(Role.TEAM_MEMBER)
@company_required
def update_task_status(task_id):
"""Update task status (for kanban card movement)"""
try:
task = Task.query.join(Project).filter(
Task.id == task_id,
Project.company_id == g.user.company_id
).first()
if not task or not task.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Task not found or access denied'})
data = request.get_json()
new_status = data.get('status')
if not new_status:
return jsonify({'success': False, 'message': 'Status is required'})
# Validate status value - convert from enum name to enum object
try:
task_status = TaskStatus[new_status]
except KeyError:
return jsonify({'success': False, 'message': 'Invalid status value'})
# Update task status
old_status = task.status
task.status = task_status
# Set completion date if status is COMPLETED
if task_status == TaskStatus.COMPLETED:
task.completed_date = datetime.now().date()
elif old_status == TaskStatus.COMPLETED:
# Clear completion date if moving away from completed
task.completed_date = None
db.session.commit()
return jsonify({
'success': True,
'message': 'Task status updated successfully',
'old_status': old_status.name,
'new_status': task_status.name
})
except Exception as e:
db.session.rollback()
logger.error(f"Error updating task status: {str(e)}")
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/admin/migrate-kanban-cards', methods=['POST'])
@role_required(Role.ADMIN)
@company_required
def migrate_kanban_cards():
"""Admin endpoint to migrate orphaned KanbanCards to Tasks"""
try:
from models import KanbanCard, Task, TaskStatus, TaskPriority, KanbanColumn
def map_column_to_task_status(column_name):
"""Map Kanban column name to Task status"""
column_name = column_name.upper().strip()
if any(keyword in column_name for keyword in ['TODO', 'TO DO', 'BACKLOG', 'PLANNED', 'NOT STARTED']):
return TaskStatus.NOT_STARTED
elif any(keyword in column_name for keyword in ['DOING', 'IN PROGRESS', 'ACTIVE', 'WORKING']):
return TaskStatus.IN_PROGRESS
elif any(keyword in column_name for keyword in ['HOLD', 'BLOCKED', 'WAITING', 'PAUSED']):
return TaskStatus.ON_HOLD
elif any(keyword in column_name for keyword in ['DONE', 'COMPLETE', 'FINISHED', 'CLOSED']):
return TaskStatus.COMPLETED
else:
return TaskStatus.NOT_STARTED
def get_task_priority_from_card(card):
"""Determine task priority from card properties"""
text_to_check = f"{card.title} {card.description or ''}".upper()
if any(keyword in text_to_check for keyword in ['URGENT', 'CRITICAL', 'ASAP', '!!!', 'HIGH PRIORITY']):
return TaskPriority.URGENT
elif any(keyword in text_to_check for keyword in ['HIGH', 'IMPORTANT', '!!']):
return TaskPriority.HIGH
elif any(keyword in text_to_check for keyword in ['LOW', 'MINOR', 'NICE TO HAVE']):
return TaskPriority.LOW
else:
return TaskPriority.MEDIUM
# Find orphaned KanbanCards in user's company
orphaned_cards = db.session.query(KanbanCard).join(KanbanColumn).join(KanbanBoard).filter(
KanbanBoard.company_id == g.user.company_id,
KanbanCard.task_id.is_(None)
).all()
if not orphaned_cards:
return jsonify({'success': True, 'message': 'No orphaned KanbanCards found', 'converted_count': 0})
converted_count = 0
for card in orphaned_cards:
column = KanbanColumn.query.get(card.column_id)
if not column:
continue
task_status = map_column_to_task_status(column.name)
task_priority = get_task_priority_from_card(card)
task = Task(
name=card.title,
description=card.description or '',
status=task_status,
priority=task_priority,
project_id=card.project_id,
assigned_to_id=card.assigned_to_id,
due_date=card.due_date,
completed_date=card.completed_date if task_status == TaskStatus.COMPLETED else None,
created_by_id=card.created_by_id,
created_at=card.created_at,
)
db.session.add(task)
db.session.flush()
card.task_id = task.id
converted_count += 1
db.session.commit()
return jsonify({
'success': True,
'message': f'Successfully converted {converted_count} KanbanCards to Tasks',
'converted_count': converted_count
})
except Exception as e:
db.session.rollback()
logger.error(f"Error in migrate_kanban_cards: {str(e)}")
return jsonify({'success': False, 'message': str(e)})
# Sprint Management APIs
@app.route('/api/sprints')
@role_required(Role.TEAM_MEMBER)
@company_required
def get_sprints():
"""Get all sprints for the user's company"""
try:
# Base query for sprints in user's company
query = Sprint.query.filter(Sprint.company_id == g.user.company_id)
# Apply access restrictions based on user role and team
if g.user.role not in [Role.ADMIN, Role.SUPERVISOR]:
# Regular users can only see sprints they have access to
accessible_sprint_ids = []
sprints = query.all()
for sprint in sprints:
if sprint.can_user_access(g.user):
accessible_sprint_ids.append(sprint.id)
if accessible_sprint_ids:
query = query.filter(Sprint.id.in_(accessible_sprint_ids))
else:
# No accessible sprints, return empty list
return jsonify({'success': True, 'sprints': []})
sprints = query.order_by(Sprint.created_at.desc()).all()
sprint_list = []
for sprint in sprints:
task_summary = sprint.get_task_summary()
sprint_data = {
'id': sprint.id,
'name': sprint.name,
'description': sprint.description,
'status': sprint.status.name,
'company_id': sprint.company_id,
'project_id': sprint.project_id,
'project_name': sprint.project.name if sprint.project else None,
'project_code': sprint.project.code if sprint.project else None,
'start_date': sprint.start_date.isoformat(),
'end_date': sprint.end_date.isoformat(),
'goal': sprint.goal,
'capacity_hours': sprint.capacity_hours,
'created_by_id': sprint.created_by_id,
'created_by_name': sprint.created_by.username if sprint.created_by else None,
'created_at': sprint.created_at.isoformat(),
'is_current': sprint.is_current,
'duration_days': sprint.duration_days,
'days_remaining': sprint.days_remaining,
'progress_percentage': sprint.progress_percentage,
'task_summary': task_summary
}
sprint_list.append(sprint_data)
return jsonify({'success': True, 'sprints': sprint_list})
except Exception as e:
logger.error(f"Error in get_sprints: {str(e)}")
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/sprints', methods=['POST'])
@role_required(Role.TEAM_LEADER) # Team leaders and above can create sprints
@company_required
def create_sprint():
"""Create a new sprint"""
try:
data = request.get_json()
# Validate required fields
name = data.get('name')
start_date = data.get('start_date')
end_date = data.get('end_date')
if not name:
return jsonify({'success': False, 'message': 'Sprint name is required'})
if not start_date:
return jsonify({'success': False, 'message': 'Start date is required'})
if not end_date:
return jsonify({'success': False, 'message': 'End date is required'})
# Parse dates
try:
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
except ValueError:
return jsonify({'success': False, 'message': 'Invalid date format'})
if start_date >= end_date:
return jsonify({'success': False, 'message': 'End date must be after start date'})
# Verify project access if project is specified
project_id = data.get('project_id')
if project_id:
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first()
if not project or not project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Project not found or access denied'})
# Create sprint
sprint = Sprint(
name=name,
description=data.get('description', ''),
status=SprintStatus[data.get('status', 'PLANNING')],
company_id=g.user.company_id,
project_id=int(project_id) if project_id else None,
start_date=start_date,
end_date=end_date,
goal=data.get('goal'),
capacity_hours=int(data.get('capacity_hours')) if data.get('capacity_hours') else None,
created_by_id=g.user.id
)
db.session.add(sprint)
db.session.commit()
return jsonify({'success': True, 'message': 'Sprint created successfully'})
except Exception as e:
db.session.rollback()
logger.error(f"Error creating sprint: {str(e)}")
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/sprints/<int:sprint_id>', methods=['PUT'])
@role_required(Role.TEAM_LEADER)
@company_required
def update_sprint(sprint_id):
"""Update an existing sprint"""
try:
sprint = Sprint.query.filter_by(id=sprint_id, company_id=g.user.company_id).first()
if not sprint or not sprint.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Sprint not found or access denied'})
data = request.get_json()
# Update sprint fields
if 'name' in data:
sprint.name = data['name']
if 'description' in data:
sprint.description = data['description']
if 'status' in data:
sprint.status = SprintStatus[data['status']]
if 'goal' in data:
sprint.goal = data['goal']
if 'capacity_hours' in data:
sprint.capacity_hours = int(data['capacity_hours']) if data['capacity_hours'] else None
if 'project_id' in data:
project_id = data['project_id']
if project_id:
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first()
if not project or not project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Project not found or access denied'})
sprint.project_id = int(project_id)
else:
sprint.project_id = None
# Update dates if provided
if 'start_date' in data:
try:
sprint.start_date = datetime.strptime(data['start_date'], '%Y-%m-%d').date()
except ValueError:
return jsonify({'success': False, 'message': 'Invalid start date format'})
if 'end_date' in data:
try:
sprint.end_date = datetime.strptime(data['end_date'], '%Y-%m-%d').date()
except ValueError:
return jsonify({'success': False, 'message': 'Invalid end date format'})
# Validate date order
if sprint.start_date >= sprint.end_date:
return jsonify({'success': False, 'message': 'End date must be after start date'})
db.session.commit()
return jsonify({'success': True, 'message': 'Sprint updated successfully'})
except Exception as e:
db.session.rollback()
logger.error(f"Error updating sprint: {str(e)}")
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/sprints/<int:sprint_id>', methods=['DELETE'])
@role_required(Role.TEAM_LEADER)
@company_required
def delete_sprint(sprint_id):
"""Delete a sprint and remove it from all associated tasks"""
try:
sprint = Sprint.query.filter_by(id=sprint_id, company_id=g.user.company_id).first()
if not sprint or not sprint.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Sprint not found or access denied'})
# Remove sprint assignment from all tasks
Task.query.filter_by(sprint_id=sprint_id).update({'sprint_id': None})
# Delete the sprint
db.session.delete(sprint)
db.session.commit()
return jsonify({'success': True, 'message': 'Sprint deleted successfully'})
except Exception as e:
db.session.rollback()
logger.error(f"Error deleting sprint: {str(e)}")
return jsonify({'success': False, 'message': str(e)})
# Subtask API Routes
@app.route('/api/subtasks', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@@ -3952,8 +4512,8 @@ def create_subtask():
subtask = SubTask(
name=name,
description=data.get('description', ''),
status=TaskStatus(data.get('status', 'Not Started')),
priority=TaskPriority(data.get('priority', 'Medium')),
status=TaskStatus[data.get('status', 'NOT_STARTED')],
priority=TaskPriority[data.get('priority', 'MEDIUM')],
estimated_hours=float(data.get('estimated_hours')) if data.get('estimated_hours') else None,
task_id=task_id,
assigned_to_id=int(data.get('assigned_to_id')) if data.get('assigned_to_id') else None,
@@ -3988,8 +4548,8 @@ def get_subtask(subtask_id):
'id': subtask.id,
'name': subtask.name,
'description': subtask.description,
'status': subtask.status.value,
'priority': subtask.priority.value,
'status': subtask.status.name,
'priority': subtask.priority.name,
'estimated_hours': subtask.estimated_hours,
'assigned_to_id': subtask.assigned_to_id,
'start_date': subtask.start_date.isoformat() if subtask.start_date else None,
@@ -4022,13 +4582,13 @@ def update_subtask(subtask_id):
if 'description' in data:
subtask.description = data['description']
if 'status' in data:
subtask.status = TaskStatus(data['status'])
if data['status'] == 'Completed':
subtask.status = TaskStatus[data['status']]
if data['status'] == 'COMPLETED':
subtask.completed_date = datetime.now().date()
else:
subtask.completed_date = None
if 'priority' in data:
subtask.priority = TaskPriority(data['priority'])
subtask.priority = TaskPriority[data['priority']]
if 'estimated_hours' in data:
subtask.estimated_hours = float(data['estimated_hours']) if data['estimated_hours'] else None
if 'assigned_to_id' in data: