88 lines
2.7 KiB
Python
88 lines
2.7 KiB
Python
"""Modulo de funciones utilitarias"""
|
|
import json
|
|
import logging
|
|
import functools
|
|
|
|
from multiprocessing.dummy import Pool
|
|
|
|
from django.http import JsonResponse
|
|
|
|
from pygments import highlight
|
|
from pygments.lexers import JsonLexer # pylint: disable=no-name-in-module
|
|
from pygments.formatters import TerminalTrueColorFormatter # pylint: disable=no-name-in-module
|
|
|
|
|
|
_log = logging.getLogger('utils')
|
|
_log.addHandler(logging.NullHandler())
|
|
|
|
|
|
def pretty_print_json(json_input):
|
|
"""Formats and prints json to stdout with colors using pygments"""
|
|
formatted_json = json.dumps(json_input, indent=2)
|
|
print(highlight(formatted_json, JsonLexer(), TerminalTrueColorFormatter()))
|
|
|
|
|
|
def message_response(status, error_message):
|
|
"""Sends an error response with the status code of the error and an explanation"""
|
|
json_message = {
|
|
'status_code': status,
|
|
'message': error_message
|
|
}
|
|
|
|
return JsonResponse(json_message, status=status)
|
|
|
|
|
|
def require_json(function):
|
|
"""Decorator to make a view only accept a json body"""
|
|
@functools.wraps(function)
|
|
def decorator(*args, **kwargs):
|
|
try:
|
|
received_json = json.loads(args[0].body)
|
|
return function(*args, **kwargs, received_json=received_json)
|
|
except json.JSONDecodeError:
|
|
_log.warning('Function %s got a non json request body', function.__name__)
|
|
return message_response(400, 'Se envío json no valido')
|
|
|
|
return decorator
|
|
|
|
|
|
def expected_keys(keys, dictionary):
|
|
"""Verifica que un diccionario contiene todas las keys de una lista"""
|
|
for key in keys:
|
|
if key not in dictionary:
|
|
return f'No se encuentra {key}'
|
|
return None
|
|
|
|
|
|
def replace_key(dictionary, old, new):
|
|
"""Remplaza una key en un diccionario
|
|
|
|
En esencia crea una nueva key con los contenidos de la antigua y elimina la antigua
|
|
|
|
Esto lo hace inplace por lo que el diccionario es modificado
|
|
|
|
TODO que cree un nuevo diccionario, es mas bonito el codigo asi
|
|
"""
|
|
dictionary[new] = dictionary[old]
|
|
del dictionary[old]
|
|
|
|
|
|
def sanitize_keys(dictionary):
|
|
"""Remplaza las keys de un diccionario que contienen '-' ya que el diccionario pretende
|
|
ser json y acceder a json con esos caracteres en javascript es incomodo
|
|
|
|
El diccionario es modificado inplace pero de todas formas se retorna
|
|
"""
|
|
for key in list(dictionary.keys()):
|
|
if '-' in key and key in dictionary:
|
|
new_key = key.replace('-', '_')
|
|
replace_key(dictionary, key, new_key)
|
|
return dictionary
|
|
|
|
|
|
def parallel_map(items, function):
|
|
"""Ejecuta una función sobre cada elemento de una lista de items en paralelo utilizando
|
|
una Pool del modulo multiprocessing.dummy """
|
|
with Pool() as pool:
|
|
return pool.map(function, items)
|