From 988f4b57bb9409e5009832ee860d8971e3eeab7b Mon Sep 17 00:00:00 2001 From: Jerry Date: Tue, 19 Nov 2024 11:46:14 -0500 Subject: [PATCH] Fix bandit github action --- .github/workflows/bandit.yml | 17 ++- app/app.py | 29 +++-- app/auth/__init__.py | 10 +- app/auth/auth_utils.py | 47 ++++---- app/auth/routes.py | 100 ++++++++-------- app/models.py | 111 +++++++++-------- app/scout/TBA.py | 12 +- app/scout/__init__.py | 4 +- app/scout/routes.py | 224 ++++++++++++++++------------------- app/scout/scouting_utils.py | 123 +++++++++---------- 10 files changed, 334 insertions(+), 343 deletions(-) diff --git a/.github/workflows/bandit.yml b/.github/workflows/bandit.yml index e784406..e55fd3f 100644 --- a/.github/workflows/bandit.yml +++ b/.github/workflows/bandit.yml @@ -6,24 +6,23 @@ jobs: bandit: runs-on: ubuntu-latest steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Set up Python 3.9 - uses: actions/setup-python@v5 - with: - python-version: 3.9 + uses: actions/setup-python@v5 + with: + python-version: 3.9 + - name: Install Bandit shell: bash run: pip install bandit[sarif] - - name: Checkout repository - uses: actions/checkout@v4 - - name: Scan shell: bash - run: bandit -c bandit.yml -r -f sarif -o resulat.sarif . + run: bandit -c bandit.yml -r -f sarif -o results.sarif . - name: Upload SARIF file uses: github/codeql-action/upload-sarif@v3 with: sarif_file: results.sarif - - \ No newline at end of file diff --git a/app/app.py b/app/app.py index 0ff010b..f4be39d 100644 --- a/app/app.py +++ b/app/app.py @@ -13,29 +13,28 @@ def create_app(): - app = Flask(__name__, static_folder='static', template_folder='templates') + app = Flask(__name__, static_folder="static", template_folder="templates") # Load config load_dotenv() app.config.update( - SECRET_KEY=os.getenv('SECRET_KEY', 'team334'), + SECRET_KEY=os.getenv("SECRET_KEY", "team334"), SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, - MONGO_URI=os.getenv( - 'MONGO_URI', 'mongodb://localhost:27017/scouting_app') + MONGO_URI=os.getenv("MONGO_URI", "mongodb://localhost:27017/scouting_app"), ) mongo.init_app(app) with app.app_context(): - if 'team_data' not in mongo.db.list_collection_names(): - mongo.db.create_collection('team_data') - if 'users' not in mongo.db.list_collection_names(): - mongo.db.create_collection('users') + if "team_data" not in mongo.db.list_collection_names(): + mongo.db.create_collection("team_data") + if "users" not in mongo.db.list_collection_names(): + mongo.db.create_collection("users") login_manager.init_app(app) - login_manager.login_view = 'auth.login' - login_manager.login_message_category = 'error' + login_manager.login_view = "auth.login" + login_manager.login_message_category = "error" try: user_manager = UserManager(app.config["MONGO_URI"]) @@ -57,16 +56,16 @@ def load_user(user_id): from auth.routes import auth_bp from scout.routes import scouting_bp - app.register_blueprint(auth_bp, url_prefix='/auth') - app.register_blueprint(scouting_bp, url_prefix='/') + app.register_blueprint(auth_bp, url_prefix="/auth") + app.register_blueprint(scouting_bp, url_prefix="/") - @app.route('/') + @app.route("/") def index(): - return render_template('index.html') + return render_template("index.html") return app -if __name__ == '__main__': +if __name__ == "__main__": app = create_app() app.run() diff --git a/app/auth/__init__.py b/app/auth/__init__.py index 5e37a57..e6b7574 100644 --- a/app/auth/__init__.py +++ b/app/auth/__init__.py @@ -3,9 +3,9 @@ all = [ - 'check_password_strength', - 'require_admin', - 'UserManager', - 'init_auth_routes', - 'auth_bp', + "check_password_strength", + "require_admin", + "UserManager", + "init_auth_routes", + "auth_bp", ] diff --git a/app/auth/auth_utils.py b/app/auth/auth_utils.py index b1c21f2..86e3843 100644 --- a/app/auth/auth_utils.py +++ b/app/auth/auth_utils.py @@ -23,13 +23,15 @@ async def wrapper(*args, **kwargs): last_error = e if attempt < retries - 1: # don't sleep on last attempt logger.warning( - f"Attempt {attempt + 1} failed: {str(e)}. Retrying...") + f"Attempt {attempt + 1} failed: {str(e)}. Retrying..." + ) time.sleep(delay) else: - logger.error( - f"All {retries} attempts failed: {str(e)}") + logger.error(f"All {retries} attempts failed: {str(e)}") raise last_error + return wrapper + return decorator @@ -54,16 +56,15 @@ def connect(self): """Establish connection to MongoDB with basic error handling""" try: if self.client is None: - self.client = MongoClient( - self.mongo_uri, serverSelectionTimeoutMS=5000) + self.client = MongoClient(self.mongo_uri, serverSelectionTimeoutMS=5000) # Test the connection self.client.server_info() self.db = self.client.get_default_database() logger.info("Successfully connected to MongoDB") # Ensure users collection exists - if 'users' not in self.db.list_collection_names(): - self.db.create_collection('users') + if "users" not in self.db.list_collection_names(): + self.db.create_collection("users") logger.info("Created users collection") except Exception as e: logger.error(f"Failed to connect to MongoDB: {str(e)}") @@ -78,21 +79,20 @@ def ensure_connected(self): # Test if connection is still alive self.client.server_info() except Exception: - logger.warning( - "Lost connection to MongoDB, attempting to reconnect...") + logger.warning("Lost connection to MongoDB, attempting to reconnect...") self.connect() @with_mongodb_retry(retries=3, delay=2) - async def create_user(self, email, username, password, team_number, role='user'): + async def create_user(self, email, username, password, team_number, role="user"): """Create a new user with retry mechanism""" self.ensure_connected() try: # Check for existing email - if self.db.users.find_one({'email': email}): + if self.db.users.find_one({"email": email}): return False, "Email already registered" # Check for existing username - if self.db.users.find_one({'username': username}): + if self.db.users.find_one({"username": username}): return False, "Username already taken" # Check password strength @@ -102,13 +102,13 @@ async def create_user(self, email, username, password, team_number, role='user') # Create user document user_data = { - 'email': email, - 'username': username, - 'team_number': int(team_number), - 'password_hash': generate_password_hash(password), - 'role': role, - 'created_at': datetime.now(timezone.utc), - 'last_login': None, + "email": email, + "username": username, + "team_number": int(team_number), + "password_hash": generate_password_hash(password), + "role": role, + "created_at": datetime.now(timezone.utc), + "last_login": None, } result = self.db.users.insert_one(user_data) @@ -125,14 +125,14 @@ async def authenticate_user(self, login, password): self.ensure_connected() try: if user_data := self.db.users.find_one( - {'$or': [{'email': login}, {'username': login}]} + {"$or": [{"email": login}, {"username": login}]} ): user = User.create_from_db(user_data) if user and user.check_password(password): # Update last login self.db.users.update_one( - {'_id': user._id}, - {'$set': {'last_login': datetime.now(timezone.utc)}}, + {"_id": user._id}, + {"$set": {"last_login": datetime.now(timezone.utc)}}, ) logger.info(f"Successful login: {login}") return True, user @@ -147,7 +147,8 @@ def get_user_by_id(self, user_id): self.ensure_connected() try: from bson.objectid import ObjectId - user_data = self.db.users.find_one({'_id': ObjectId(user_id)}) + + user_data = self.db.users.find_one({"_id": ObjectId(user_id)}) return User.create_from_db(user_data) if user_data else None except Exception as e: logger.error(f"Error loading user: {str(e)}") diff --git a/app/auth/routes.py b/app/auth/routes.py index a0fac13..c6919f1 100644 --- a/app/auth/routes.py +++ b/app/auth/routes.py @@ -1,4 +1,12 @@ -from flask import Blueprint, current_app, render_template, redirect, url_for, request, flash +from flask import ( + Blueprint, + current_app, + render_template, + redirect, + url_for, + request, + flash, +) from flask_login import login_required, login_user, current_user, logout_user from auth.auth_utils import UserManager import asyncio @@ -18,10 +26,11 @@ def async_route(f): @wraps(f) def wrapper(*args, **kwargs): return run_async(f(*args, **kwargs)) + return wrapper -auth_bp = Blueprint('auth', __name__) +auth_bp = Blueprint("auth", __name__) user_manager = None @@ -31,87 +40,82 @@ def on_blueprint_init(state): user_manager = UserManager(state.app.config["MONGO_URI"]) -@auth_bp.route('/login', methods=['GET', 'POST']) +@auth_bp.route("/login", methods=["GET", "POST"]) @async_route async def login(): if current_user.is_authenticated: - return redirect(url_for('index')) + return redirect(url_for("index")) form_data = {} - if request.method == 'POST': - login = request.form.get('login', '').strip() - password = request.form.get('password', '').strip() - remember = bool(request.form.get('remember', False)) + if request.method == "POST": + login = request.form.get("login", "").strip() + password = request.form.get("password", "").strip() + remember = bool(request.form.get("remember", False)) - form_data = { - 'login': login, - 'remember': remember - } + form_data = {"login": login, "remember": remember} if not login or not password: - flash('Please provide both login and password', 'error') - return render_template('auth/login.html', form_data=form_data) + flash("Please provide both login and password", "error") + return render_template("auth/login.html", form_data=form_data) try: success, user = await user_manager.authenticate_user(login, password) if success and user: login_user(user, remember=remember) - next_page = request.args.get('next') - if not next_page or not next_page.startswith('/'): - next_page = url_for('index') - flash('Successfully logged in', 'success') + next_page = request.args.get("next") + if not next_page or not next_page.startswith("/"): + next_page = url_for("index") + flash("Successfully logged in", "success") return redirect(next_page) else: - flash('Invalid login credentials', 'error') + flash("Invalid login credentials", "error") except Exception as e: - flash(f'An error occurred during login: {str(e)}', 'error') + flash(f"An error occurred during login: {str(e)}", "error") - return render_template('auth/login.html', form_data=form_data) + return render_template("auth/login.html", form_data=form_data) -@auth_bp.route('/register', methods=['GET', 'POST']) +@auth_bp.route("/register", methods=["GET", "POST"]) @async_route async def register(): if current_user.is_authenticated: - return redirect(url_for('index')) + return redirect(url_for("index")) form_data = {} - if request.method == 'POST': - email = request.form.get('email', '').strip().lower() - username = request.form.get('username', '').strip() - password = request.form.get('password', '').strip() - confirm_password = request.form.get('confirm_password', '').strip() - team_number = request.form.get('teamNumber', 0) - - form_data = { - 'email': email, - 'username': username, - 'team_number': team_number - } + if request.method == "POST": + email = request.form.get("email", "").strip().lower() + username = request.form.get("username", "").strip() + password = request.form.get("password", "").strip() + confirm_password = request.form.get("confirm_password", "").strip() + team_number = request.form.get("teamNumber", 0) + + form_data = {"email": email, "username": username, "team_number": team_number} if not all([email, username, password, confirm_password]): - flash('All fields are required', 'error') - return render_template('auth/register.html', form_data=form_data) + flash("All fields are required", "error") + return render_template("auth/register.html", form_data=form_data) if password != confirm_password: - flash('Passwords do not match', 'error') - return render_template('auth/register.html', form_data=form_data) + flash("Passwords do not match", "error") + return render_template("auth/register.html", form_data=form_data) try: - success, message = await user_manager.create_user(email, username, password, team_number) + success, message = await user_manager.create_user( + email, username, password, team_number + ) if success: - flash('Registration successful! Please login.', 'success') - return redirect(url_for('auth.login')) - flash(message, 'error') + flash("Registration successful! Please login.", "success") + return redirect(url_for("auth.login")) + flash(message, "error") except Exception as e: - flash(f'An error occurred during registration: {str(e)}', 'error') + flash(f"An error occurred during registration: {str(e)}", "error") - return render_template('auth/register.html', form_data=form_data) + return render_template("auth/register.html", form_data=form_data) -@auth_bp.route('/logout') +@auth_bp.route("/logout") @login_required def logout(): logout_user() - flash('Successfully logged out', 'success') - return redirect(url_for('auth.login')) + flash("Successfully logged out", "success") + return redirect(url_for("auth.login")) diff --git a/app/models.py b/app/models.py index 2f16794..69283e7 100644 --- a/app/models.py +++ b/app/models.py @@ -5,21 +5,20 @@ class User(UserMixin): def __init__(self, user_data): - self._id = user_data.get('_id') - self.email = user_data.get('email') - self.username = user_data.get('username') - self.teamNumber = user_data.get('teamNumber') - self.password_hash = user_data.get('password_hash') - self.role = user_data.get('role', 'user') - self.last_login = user_data.get('last_login') - self.created_at = user_data.get('created_at') + self._id = user_data.get("_id") + self.email = user_data.get("email") + self.username = user_data.get("username") + self.teamNumber = user_data.get("teamNumber") + self.password_hash = user_data.get("password_hash") + self.role = user_data.get("role", "user") + self.last_login = user_data.get("last_login") + self.created_at = user_data.get("created_at") def get_id(self): try: return str(self._id) except AttributeError as e: - raise NotImplementedError( - 'No `_id` attribute - override `get_id`') from e + raise NotImplementedError("No `_id` attribute - override `get_id`") from e def is_authenticated(self): return True @@ -39,44 +38,44 @@ def create_from_db(user_data): if not user_data: return None # Ensure _id is ObjectId - if '_id' in user_data and not isinstance(user_data['_id'], ObjectId): - user_data['_id'] = ObjectId(user_data['_id']) + if "_id" in user_data and not isinstance(user_data["_id"], ObjectId): + user_data["_id"] = ObjectId(user_data["_id"]) return User(user_data) def to_dict(self): return { - '_id': self._id, - 'email': self.email, - 'username': self.username, - 'teamNumber': self.teamNumber, - 'password_hash': self.password_hash, - 'role': self.role, - 'last_login': self.last_login, - 'created_at': self.created_at + "_id": self._id, + "email": self.email, + "username": self.username, + "teamNumber": self.teamNumber, + "password_hash": self.password_hash, + "role": self.role, + "last_login": self.last_login, + "created_at": self.created_at, } class TeamData: def __init__(self, data): - self._id = data.get('_id') - self.team_number = data.get('team_number') - self.event_code = data.get('event_code') - self.match_number = data.get('match_number') - self.auto_points = data.get('auto_points') - self.teleop_points = data.get('teleop_points') - self.endgame_points = data.get('endgame_points') - self.total_points = data.get('total_points') - self.notes = data.get('notes', '') - self.scouter_id = data.get('scouter_id') - self.created_at = data.get('created_at') + self._id = data.get("_id") + self.team_number = data.get("team_number") + self.event_code = data.get("event_code") + self.match_number = data.get("match_number") + self.auto_points = data.get("auto_points") + self.teleop_points = data.get("teleop_points") + self.endgame_points = data.get("endgame_points") + self.total_points = data.get("total_points") + self.notes = data.get("notes", "") + self.scouter_id = data.get("scouter_id") + self.created_at = data.get("created_at") # Handle the nested scouter data - scouter_data = data.get('scouter', {}) + scouter_data = data.get("scouter", {}) self.scouter = { - 'username': scouter_data.get('username', 'Unknown'), - 'team_number': scouter_data.get('team_number'), - 'email': scouter_data.get('email'), - 'role': scouter_data.get('role', 'user') + "username": scouter_data.get("username", "Unknown"), + "team_number": scouter_data.get("team_number"), + "email": scouter_data.get("email"), + "role": scouter_data.get("role", "user"), } @property @@ -89,41 +88,41 @@ def create_from_db(data): return None # Ensure _id is ObjectId - if '_id' in data and not isinstance(data['_id'], ObjectId): - data['_id'] = ObjectId(data['_id']) + if "_id" in data and not isinstance(data["_id"], ObjectId): + data["_id"] = ObjectId(data["_id"]) # Ensure scouter_id is ObjectId - if 'scouter_id' in data and not isinstance(data['scouter_id'], ObjectId): - data['scouter_id'] = ObjectId(data['scouter_id']) + if "scouter_id" in data and not isinstance(data["scouter_id"], ObjectId): + data["scouter_id"] = ObjectId(data["scouter_id"]) return TeamData(data) def to_dict(self): return { - 'id': self.id, - 'team_number': self.team_number, - 'event_code': self.event_code, - 'match_number': self.match_number, - 'auto_points': self.auto_points, - 'teleop_points': self.teleop_points, - 'endgame_points': self.endgame_points, - 'total_points': self.total_points, - 'notes': self.notes, - 'scouter_id': str(self.scouter_id), - 'created_at': self.created_at, - 'scouter': self.scouter + "id": self.id, + "team_number": self.team_number, + "event_code": self.event_code, + "match_number": self.match_number, + "auto_points": self.auto_points, + "teleop_points": self.teleop_points, + "endgame_points": self.endgame_points, + "total_points": self.total_points, + "notes": self.notes, + "scouter_id": str(self.scouter_id), + "created_at": self.created_at, + "scouter": self.scouter, } @property def scouter_name(self): """Returns formatted scouter name with team number if available""" - username = self.scouter.get('username', 'Unknown') - team_number = self.scouter.get('team_number') + username = self.scouter.get("username", "Unknown") + team_number = self.scouter.get("team_number") return f"{username} ({team_number})" @property def formatted_date(self): """Returns formatted creation date""" if self.created_at: - return self.created_at.strftime('%Y-%m-%d %H:%M:%S') - return 'N/A' + return self.created_at.strftime("%Y-%m-%d %H:%M:%S") + return "N/A" diff --git a/app/scout/TBA.py b/app/scout/TBA.py index 2b75bf6..ab77fb6 100644 --- a/app/scout/TBA.py +++ b/app/scout/TBA.py @@ -10,7 +10,10 @@ class TBAInterface: """Async interface for The Blue Alliance API""" - def __init__(self, auth_key: str = "uTHeEfPigDp9huQCpLNkWK7FBQIb01Qrzvt4MAjh9z2WQDkrsvNE77ch6bOPvPb6"): + def __init__( + self, + auth_key: str = "uTHeEfPigDp9huQCpLNkWK7FBQIb01Qrzvt4MAjh9z2WQDkrsvNE77ch6bOPvPb6", + ): self.auth_key = auth_key self.base_url = "https://www.thebluealliance.com/api/v3" self.headers = {"X-TBA-Auth-Key": self.auth_key} @@ -82,7 +85,9 @@ async def get_schedule(self, event_code: str) -> Optional[List[Dict]]: except aiohttp.ClientError as e: raise e - async def get_event_data(self, event_code: str) -> tuple[Optional[List[Dict]], Optional[List[Dict]]]: + async def get_event_data( + self, event_code: str + ) -> tuple[Optional[List[Dict]], Optional[List[Dict]]]: """ Get both teams and schedule data for an event concurrently @@ -93,8 +98,7 @@ async def get_event_data(self, event_code: str) -> tuple[Optional[List[Dict]], O tuple[Optional[List[Dict]], Optional[List[Dict]]]: Tuple of (teams, schedule) data """ async with self: - teams_task = asyncio.create_task( - self.get_teams_at_event(event_code)) + teams_task = asyncio.create_task(self.get_teams_at_event(event_code)) schedule_task = asyncio.create_task(self.get_schedule(event_code)) teams, schedule = await asyncio.gather(teams_task, schedule_task) diff --git a/app/scout/__init__.py b/app/scout/__init__.py index cb49c19..0fa35eb 100644 --- a/app/scout/__init__.py +++ b/app/scout/__init__.py @@ -2,6 +2,6 @@ from .TBA import * __all__ = [ - 'scouting_bp', - 'TBA', + "scouting_bp", + "TBA", ] diff --git a/app/scout/routes.py b/app/scout/routes.py index 449f7a0..f8642af 100644 --- a/app/scout/routes.py +++ b/app/scout/routes.py @@ -7,7 +7,7 @@ from scout.scouting_utils import ScoutingManager from .TBA import TBAInterface -scouting_bp = Blueprint('scouting', __name__) +scouting_bp = Blueprint("scouting", __name__) scouting_manager = None @@ -22,95 +22,99 @@ def async_route(f): @wraps(f) def wrapper(*args, **kwargs): return asyncio.run(f(*args, **kwargs)) + return wrapper -@scouting_bp.route('/scouting/add', methods=['GET', 'POST']) +@scouting_bp.route("/scouting/add", methods=["GET", "POST"]) @login_required def add_scouting_data(): - if request.method == 'POST': + if request.method == "POST": try: success, message = scouting_manager.add_scouting_data( - request.form, - current_user.get_id() + request.form, current_user.get_id() ) if success: - flash('Data added successfully', 'success') - return redirect(url_for('scouting.list_scouting_data')) + flash("Data added successfully", "success") + return redirect(url_for("scouting.list_scouting_data")) else: - flash(f'Error adding data: {message}', 'error') - return redirect(url_for('scouting.add_scouting_data')) + flash(f"Error adding data: {message}", "error") + return redirect(url_for("scouting.add_scouting_data")) except Exception as e: - flash(f'Error: {str(e)}', 'error') - return redirect(url_for('scouting.add_scouting_data')) + flash(f"Error: {str(e)}", "error") + return redirect(url_for("scouting.add_scouting_data")) - return render_template('scouting/add.html') + return render_template("scouting/add.html") -@scouting_bp.route('/scouting/list') -@scouting_bp.route('/scouting') +@scouting_bp.route("/scouting/list") +@scouting_bp.route("/scouting") @login_required def list_scouting_data(): try: team_data = scouting_manager.get_all_scouting_data() - return render_template('scouting/list.html', team_data=team_data) + return render_template("scouting/list.html", team_data=team_data) except Exception as e: - flash(f'Error fetching data: {str(e)}', 'error') - return render_template('scouting/list.html', team_data=[]) + flash(f"Error fetching data: {str(e)}", "error") + return render_template("scouting/list.html", team_data=[]) -@scouting_bp.route('/scouting/edit/', methods=['GET', 'POST']) +@scouting_bp.route("/scouting/edit/", methods=["GET", "POST"]) @login_required def edit_scouting_data(id): try: team_data = scouting_manager.get_team_data(id, current_user.get_id()) if not team_data: - flash('Team data not found or you do not have permission to edit it', 'error') - return redirect(url_for('scouting.list_scouting_data')) + flash( + "Team data not found or you do not have permission to edit it", "error" + ) + return redirect(url_for("scouting.list_scouting_data")) if not team_data.is_owner: - flash('You do not have permission to edit this entry', 'error') - return redirect(url_for('scouting.list_scouting_data')) - - if request.method == 'POST': - if scouting_manager.update_team_data(id, request.form, current_user.get_id()): - flash('Data updated successfully', 'success') - return redirect(url_for('scouting.list_scouting_data')) + flash("You do not have permission to edit this entry", "error") + return redirect(url_for("scouting.list_scouting_data")) + + if request.method == "POST": + if scouting_manager.update_team_data( + id, request.form, current_user.get_id() + ): + flash("Data updated successfully", "success") + return redirect(url_for("scouting.list_scouting_data")) else: - flash('Error updating data', 'error') + flash("Error updating data", "error") - return render_template('scouting/edit.html', team_data=team_data) + return render_template("scouting/edit.html", team_data=team_data) except Exception as e: - flash(f'Error: {str(e)}', 'error') - return redirect(url_for('scouting.list_scouting_data')) + flash(f"Error: {str(e)}", "error") + return redirect(url_for("scouting.list_scouting_data")) -@scouting_bp.route('/scouting/delete/') +@scouting_bp.route("/scouting/delete/") @login_required def delete_scouting_data(id): try: if scouting_manager.delete_team_data(id, current_user.get_id()): - flash('Record deleted successfully', 'success') + flash("Record deleted successfully", "success") else: - flash('Error deleting record or permission denied', 'error') + flash("Error deleting record or permission denied", "error") except Exception as e: - flash(f'Error: {str(e)}', 'error') - return redirect(url_for('scouting.list_scouting_data')) + flash(f"Error: {str(e)}", "error") + return redirect(url_for("scouting.list_scouting_data")) -@scouting_bp.route('/compare') +@scouting_bp.route("/compare") @login_required def compare_page(): - return render_template('compare.html') + return render_template("compare.html") -@scouting_bp.route('/api/compare') +@scouting_bp.route("/api/compare") @login_required @async_route async def compare_teams(): - team1 = request.args.get('team1', '').strip() - team2 = request.args.get('team2', '').strip() + team1 = request.args.get("team1", "").strip() + team2 = request.args.get("team2", "").strip() if not team1 or not team2: return jsonify({"error": "Both team numbers are required"}), 400 @@ -132,56 +136,37 @@ async def compare_teams(): # Fetch all scouting data for this team from MongoDB pipeline = [ + {"$match": {"team_number": int(team_num)}}, { - '$match': {'team_number': int(team_num)} - }, - { - '$lookup': { - 'from': 'users', - 'localField': 'scouter_id', - 'foreignField': '_id', - 'as': 'scouter' + "$lookup": { + "from": "users", + "localField": "scouter_id", + "foreignField": "_id", + "as": "scouter", } }, - { - '$unwind': '$scouter' - } + {"$unwind": "$scouter"}, ] - team_scouting_data = list( - scouting_manager.db.team_data.aggregate(pipeline)) + team_scouting_data = list(scouting_manager.db.team_data.aggregate(pipeline)) # Calculate statistics - auto_points = [entry['auto_points'] - for entry in team_scouting_data] - teleop_points = [entry['teleop_points'] - for entry in team_scouting_data] - endgame_points = [entry['endgame_points'] - for entry in team_scouting_data] - total_points = [entry['total_points'] - for entry in team_scouting_data] + auto_points = [entry["auto_points"] for entry in team_scouting_data] + teleop_points = [entry["teleop_points"] for entry in team_scouting_data] + endgame_points = [entry["endgame_points"] for entry in team_scouting_data] + total_points = [entry["total_points"] for entry in team_scouting_data] stats = { "matches_played": len(team_scouting_data), - "avg_auto": ( - sum(auto_points) / len(auto_points) - if auto_points - else 0 - ), + "avg_auto": (sum(auto_points) / len(auto_points) if auto_points else 0), "avg_teleop": ( - sum(teleop_points) / len(teleop_points) - if teleop_points - else 0 + sum(teleop_points) / len(teleop_points) if teleop_points else 0 ), "avg_endgame": ( - sum(endgame_points) / len(endgame_points) - if endgame_points - else 0 + sum(endgame_points) / len(endgame_points) if endgame_points else 0 ), "avg_total": ( - sum(total_points) / len(total_points) - if total_points - else 0 + sum(total_points) / len(total_points) if total_points else 0 ), "max_total": max(total_points, default=0), "min_total": min(total_points, default=0), @@ -189,14 +174,14 @@ async def compare_teams(): scouting_entries = [ { - "event_code": entry['event_code'], - "match_number": entry['match_number'], - "auto_points": entry['auto_points'], - "teleop_points": entry['teleop_points'], - "endgame_points": entry['endgame_points'], - "total_points": entry['total_points'], - "notes": entry['notes'], - "scouter": entry['scouter']['username'], + "event_code": entry["event_code"], + "match_number": entry["match_number"], + "auto_points": entry["auto_points"], + "teleop_points": entry["teleop_points"], + "endgame_points": entry["endgame_points"], + "total_points": entry["total_points"], + "notes": entry["notes"], + "scouter": entry["scouter"]["username"], } for entry in team_scouting_data ] @@ -209,7 +194,7 @@ async def compare_teams(): "state_prov": team.get("state_prov"), "country": team.get("country"), "stats": stats, - "scouting_data": scouting_entries + "scouting_data": scouting_entries, } return jsonify(teams_data) @@ -219,17 +204,17 @@ async def compare_teams(): return jsonify({"error": "Failed to fetch team data"}), 500 -@scouting_bp.route('/search') +@scouting_bp.route("/search") @login_required def search_page(): - return render_template('search.html') + return render_template("search.html") -@scouting_bp.route('/api/search') +@scouting_bp.route("/api/search") @login_required @async_route async def search_teams(): - query = request.args.get('q', '').strip() + query = request.args.get("q", "").strip() if not query: return jsonify([]) @@ -246,49 +231,48 @@ async def search_teams(): # Fetch all scouting data for this team pipeline = [ + {"$match": {"team_number": int(query)}}, { - '$match': {'team_number': int(query)} - }, - { - '$lookup': { - 'from': 'users', - 'localField': 'scouter_id', - 'foreignField': '_id', - 'as': 'scouter' + "$lookup": { + "from": "users", + "localField": "scouter_id", + "foreignField": "_id", + "as": "scouter", } }, - { - '$unwind': '$scouter' - } + {"$unwind": "$scouter"}, ] - team_scouting_data = list( - scouting_manager.db.team_data.aggregate(pipeline)) + team_scouting_data = list(scouting_manager.db.team_data.aggregate(pipeline)) scouting_entries = [ { - "id": str(entry['_id']), - "event_code": entry['event_code'], - "match_number": entry['match_number'], - "auto_points": entry['auto_points'], - "teleop_points": entry['teleop_points'], - "endgame_points": entry['endgame_points'], - "total_points": entry['total_points'], - "notes": entry['notes'], - "scouter": entry['scouter']['username'], + "id": str(entry["_id"]), + "event_code": entry["event_code"], + "match_number": entry["match_number"], + "auto_points": entry["auto_points"], + "teleop_points": entry["teleop_points"], + "endgame_points": entry["endgame_points"], + "total_points": entry["total_points"], + "notes": entry["notes"], + "scouter": entry["scouter"]["username"], } for entry in team_scouting_data ] - return jsonify([{ - "team_number": team["team_number"], - "nickname": team["nickname"], - "school_name": team.get("school_name"), - "city": team.get("city"), - "state_prov": team.get("state_prov"), - "country": team.get("country"), - "scouting_data": scouting_entries - }]) + return jsonify( + [ + { + "team_number": team["team_number"], + "nickname": team["nickname"], + "school_name": team.get("school_name"), + "city": team.get("city"), + "state_prov": team.get("state_prov"), + "country": team.get("country"), + "scouting_data": scouting_entries, + } + ] + ) except Exception as e: print(f"Error searching teams: {e}") diff --git a/app/scout/scouting_utils.py b/app/scout/scouting_utils.py index 82a3013..44afe88 100644 --- a/app/scout/scouting_utils.py +++ b/app/scout/scouting_utils.py @@ -22,13 +22,15 @@ def wrapper(*args, **kwargs): last_error = e if attempt < retries - 1: logger.warning( - f"Attempt {attempt + 1} failed: {str(e)}. Retrying...") + f"Attempt {attempt + 1} failed: {str(e)}. Retrying..." + ) time.sleep(delay) else: - logger.error( - f"All {retries} attempts failed: {str(e)}") + logger.error(f"All {retries} attempts failed: {str(e)}") raise last_error + return wrapper + return decorator @@ -43,19 +45,18 @@ def connect(self): """Establish connection to MongoDB with basic error handling""" try: if self.client is None: - self.client = MongoClient( - self.mongo_uri, serverSelectionTimeoutMS=5000) + self.client = MongoClient(self.mongo_uri, serverSelectionTimeoutMS=5000) # Test the connection self.client.server_info() self.db = self.client.get_default_database() logger.info("Successfully connected to MongoDB") # Ensure team_data collection exists - if 'team_data' not in self.db.list_collection_names(): - self.db.create_collection('team_data') + if "team_data" not in self.db.list_collection_names(): + self.db.create_collection("team_data") # Create indexes - self.db.team_data.create_index([('team_number', 1)]) - self.db.team_data.create_index([('scouter_id', 1)]) + self.db.team_data.create_index([("team_number", 1)]) + self.db.team_data.create_index([("scouter_id", 1)]) logger.info("Created team_data collection and indexes") except Exception as e: logger.error(f"Failed to connect to MongoDB: {str(e)}") @@ -70,8 +71,7 @@ def ensure_connected(self): # Test if connection is still alive self.client.server_info() except Exception: - logger.warning( - "Lost connection to MongoDB, attempting to reconnect...") + logger.warning("Lost connection to MongoDB, attempting to reconnect...") self.connect() @with_mongodb_retry(retries=3, delay=2) @@ -80,23 +80,27 @@ def add_scouting_data(self, data, scouter_id): self.ensure_connected() try: team_data = { - 'team_number': int(data['team_number']), - 'event_code': data['event_code'], - 'match_number': int(data['match_number']), - 'auto_points': int(data['auto_points']), - 'teleop_points': int(data['teleop_points']), - 'endgame_points': int(data['endgame_points']), - 'total_points': (int(data['auto_points']) + - int(data['teleop_points']) + - int(data['endgame_points'])), - 'notes': data['notes'], - 'scouter_id': ObjectId(scouter_id), - 'created_at': datetime.now(timezone.utc), + "team_number": int(data["team_number"]), + "event_code": data["event_code"], + "match_number": int(data["match_number"]), + "auto_points": int(data["auto_points"]), + "teleop_points": int(data["teleop_points"]), + "endgame_points": int(data["endgame_points"]), + "total_points": ( + int(data["auto_points"]) + + int(data["teleop_points"]) + + int(data["endgame_points"]) + ), + "notes": data["notes"], + "scouter_id": ObjectId(scouter_id), + "created_at": datetime.now(timezone.utc), } result = self.db.team_data.insert_one(team_data) - logger.info(f"Added new scouting data for team { - data['team_number']}") + logger.info( + f"Added new scouting data for team { + data['team_number']}" + ) return True, "Data added successfully" except Exception as e: logger.error(f"Error adding scouting data: {str(e)}") @@ -109,19 +113,14 @@ def get_all_scouting_data(self): try: pipeline = [ { - '$lookup': { - 'from': 'users', - 'localField': 'scouter_id', - 'foreignField': '_id', - 'as': 'scouter' + "$lookup": { + "from": "users", + "localField": "scouter_id", + "foreignField": "_id", + "as": "scouter", } }, - { - '$unwind': { - 'path': '$scouter', - 'preserveNullAndEmptyArrays': True - } - } + {"$unwind": {"path": "$scouter", "preserveNullAndEmptyArrays": True}}, ] team_data = list(self.db.team_data.aggregate(pipeline)) @@ -135,17 +134,18 @@ def get_team_data(self, team_id, scouter_id=None): """Get specific team data with optional scouter verification""" self.ensure_connected() try: - query = {'_id': ObjectId(team_id)} + query = {"_id": ObjectId(team_id)} if scouter_id: # If scouter_id provided, verify ownership - query['scouter_id'] = ObjectId(scouter_id) + query["scouter_id"] = ObjectId(scouter_id) data = self.db.team_data.find_one(query) if not data: return None # Add an is_owner field to the response - data['is_owner'] = str(data['scouter_id']) == str( - scouter_id) if scouter_id else False + data["is_owner"] = ( + str(data["scouter_id"]) == str(scouter_id) if scouter_id else False + ) return TeamData.create_from_db(data) except Exception as e: logger.error(f"Error fetching team data: {str(e)}") @@ -157,32 +157,34 @@ def update_team_data(self, team_id, data, scouter_id): self.ensure_connected() try: # First verify ownership - existing_data = self.db.team_data.find_one({ - '_id': ObjectId(team_id), - 'scouter_id': ObjectId(scouter_id) - }) + existing_data = self.db.team_data.find_one( + {"_id": ObjectId(team_id), "scouter_id": ObjectId(scouter_id)} + ) if not existing_data: logger.warning( - f"Update attempted by non-owner scouter_id: {scouter_id}") + f"Update attempted by non-owner scouter_id: {scouter_id}" + ) return False updated_data = { - 'team_number': int(data['team_number']), - 'event_code': data['event_code'], - 'match_number': int(data['match_number']), - 'auto_points': int(data['auto_points']), - 'teleop_points': int(data['teleop_points']), - 'endgame_points': int(data['endgame_points']), - 'total_points': (int(data['auto_points']) + - int(data['teleop_points']) + - int(data['endgame_points'])), - 'notes': data['notes'] + "team_number": int(data["team_number"]), + "event_code": data["event_code"], + "match_number": int(data["match_number"]), + "auto_points": int(data["auto_points"]), + "teleop_points": int(data["teleop_points"]), + "endgame_points": int(data["endgame_points"]), + "total_points": ( + int(data["auto_points"]) + + int(data["teleop_points"]) + + int(data["endgame_points"]) + ), + "notes": data["notes"], } result = self.db.team_data.update_one( - {'_id': ObjectId(team_id), 'scouter_id': ObjectId(scouter_id)}, - {'$set': updated_data} + {"_id": ObjectId(team_id), "scouter_id": ObjectId(scouter_id)}, + {"$set": updated_data}, ) return result.modified_count > 0 except Exception as e: @@ -194,10 +196,9 @@ def delete_team_data(self, team_id, scouter_id): """Delete team data if scouter has permission""" self.ensure_connected() try: - result = self.db.team_data.delete_one({ - '_id': ObjectId(team_id), - 'scouter_id': ObjectId(scouter_id) - }) + result = self.db.team_data.delete_one( + {"_id": ObjectId(team_id), "scouter_id": ObjectId(scouter_id)} + ) return result.deleted_count > 0 except Exception as e: logger.error(f"Error deleting team data: {str(e)}")