-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathapp_utils.py
More file actions
133 lines (110 loc) · 4.82 KB
/
Copy pathapp_utils.py
File metadata and controls
133 lines (110 loc) · 4.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright (c) 2025 Ciberfobia
#
# Este programa es software libre: puede redistribuirlo y/o modificarlo
# bajo los términos de la Licencia Pública General de GNU (GPL), publicada
# por la Free Software Foundation, en su versión 2 o (a su elección) cualquier
# versión posterior.
#
# Este programa se distribuye con la esperanza de que sea útil, pero
# SIN NINGUNA GARANTÍA; ni siquiera la garantía implícita de
# COMERCIABILIDAD o IDONEIDAD PARA UN PROPÓSITO PARTICULAR.
#
# Para más detalles, consulte la Licencia Pública General de GNU.
# Debería haber recibido una copia de la misma junto con este programa;
# en caso contrario, visite: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
from flask import request, jsonify, current_app
from functools import wraps
import jsonschema
import os
import json
import time
from config import LOCAL_STORAGE_PATH
def validate_payload(schema):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not request.json:
return jsonify({"message": "Missing JSON in request"}), 400
try:
jsonschema.validate(instance=request.json, schema=schema)
except jsonschema.exceptions.ValidationError as validation_error:
return jsonify({"message": f"Invalid payload: {validation_error.message}"}), 400
return f(*args, **kwargs)
return decorated_function
return decorator
def log_job_status(job_id, data):
"""
Log job status to a file in the STORAGE_PATH/jobs folder
Args:
job_id (str): The unique job ID
data (dict): Data to write to the log file
"""
jobs_dir = os.path.join(LOCAL_STORAGE_PATH, 'jobs')
# Create jobs directory if it doesn't exist
if not os.path.exists(jobs_dir):
os.makedirs(jobs_dir, exist_ok=True)
# Create or update the job log file
job_file = os.path.join(jobs_dir, f"{job_id}.json")
# Write data directly to file
with open(job_file, 'w') as f:
json.dump(data, f, indent=2)
def queue_task_wrapper(bypass_queue=False):
def decorator(f):
def wrapper(*args, **kwargs):
return current_app.queue_task(bypass_queue=bypass_queue)(f)(*args, **kwargs)
return wrapper
return decorator
def discover_and_register_blueprints(app, base_dir='routes'):
"""
Dynamically discovers and registers all Flask blueprints in the routes directory.
Recursively searches all subdirectories for Python modules containing Blueprint instances.
Args:
app (Flask): The Flask application instance
base_dir (str): Base directory to start searching for blueprints (default: 'routes')
"""
import importlib
import pkgutil
import inspect
import sys
import os
from flask import Blueprint
import logging
import glob
logger = logging.getLogger(__name__)
logger.info(f"Discovering blueprints in {base_dir}")
# Add the current working directory to sys.path if it's not already there
cwd = os.getcwd()
if cwd not in sys.path:
sys.path.insert(0, cwd)
# Get the absolute path to the base directory
if not os.path.isabs(base_dir):
base_dir = os.path.join(cwd, base_dir)
registered_blueprints = set()
# Find all Python files in the routes directory, including subdirectories
python_files = glob.glob(os.path.join(base_dir, '**', '*.py'), recursive=True)
logger.info(f"Found {len(python_files)} Python files in {base_dir}")
for file_path in python_files:
try:
# Convert file path to import path
rel_path = os.path.relpath(file_path, cwd)
# Remove .py extension
module_path = os.path.splitext(rel_path)[0]
# Convert path separators to dots for import
module_path = module_path.replace(os.path.sep, '.')
# Skip __init__.py files
if module_path.endswith('__init__'):
continue
#logger.info(f"Attempting to import module: {module_path}")
# Import the module
module = importlib.import_module(module_path)
# Find all Blueprint instances in the module
for name, obj in inspect.getmembers(module):
if isinstance(obj, Blueprint) and obj not in registered_blueprints:
pid = os.getpid()
logger.info(f"PID {pid} Registering: {module_path}")
app.register_blueprint(obj)
registered_blueprints.add(obj)
except Exception as e:
logger.error(f"Error importing module {module_path}: {str(e)}")
logger.info(f"PID {pid} Registered {len(registered_blueprints)} blueprints")
return registered_blueprints