Continuando con el almacenamiento de datos en cache

Va mejorando la cosa. Tengo que hacer tests sí o sí, también porque no estoy
nada seguro de que todo funcione como espero, ya que toda llamada a la
API debe corresponder a una llamada a caché, y hay que descubrir si es que está
todo en caché como se espera.

Voy a terminar del modo en que lo estoy haciendo y tal vez cambie esto a un
módulo de caché, el cual se encargará de obtener datos desde caché o de
llamar a MusicBrainz para suplir los datos que la caché no puede responder.
This commit is contained in:
Daniel Cortes
2020-06-10 00:02:45 -04:00
parent d62253051b
commit 65c77c679f
3 changed files with 255 additions and 135 deletions

View File

@@ -1,11 +1,34 @@
"""Jobs a ejecutar en el fondo""" """Jobs a ejecutar con django-rq
NO SE DONDE DOCUMENTAR ESTO
La estructura que se utilizara para las keys es la siguiente
artist:{mbid} => full json
artist:{mbid}:release_groups => set of release_group_mbid
artist:{mbid}:release_groups:count => cantidad de las release groups
release_group:{mbid} => full json
release_group:{mbid}:artist => artist_mbid
release_group:{mbid}:releases => set of release_mbid
release_group:{mbid}:cover_art => json with cover arts
release:{mbid} => full json
release:{mbid}:release_group => release_group_mbid
release:{mbid:}:media => set of media_json
release:{mbid}:{media}:recordings => set of recording_mbid
release:{mbid}:cover_art => json with cover arts
recording:{mbid} => full json
recording:{mbid}:release => release_mbid
"""
import logging import logging
import json import json
import django_rq import django_rq
from fetcher import musicbrainz as mb from fetcher import musicbrainz as mb
from utils import get_redis_connection from utils import get_redis_connection, parse_date
_log = logging.getLogger('fetcher_jobs') _log = logging.getLogger('fetcher_jobs')
_log.addHandler(logging.NullHandler()) _log.addHandler(logging.NullHandler())
@@ -14,112 +37,124 @@ _log.addHandler(logging.NullHandler())
@django_rq.job('high') @django_rq.job('high')
def load_release_cover_art(release): def load_release_cover_art(release):
"""Carga en cache el cover art de una release""" """Carga en cache el cover art de una release"""
mbid = release
if isinstance(release, dict):
mbid = release.get('id')
with get_redis_connection() as redis: with get_redis_connection() as redis:
mbid = release if f'release:{mbid}:cover_art' not in redis:
if isinstance(release, dict): cover_art = mb.get_release_cover_art(mbid)
mbid = release.get('id') redis.set(f'release:{mbid}:cover_art', json.dumps(cover_art))
_log.info('Cover art de release %s almacenado en cache', mbid)
if f'release_cover_art_{mbid}' in redis:
_log.info('El cover art ya estaba guardado')
return
cover_art = mb.get_release_cover_art(mbid)
redis.set(f'release_cover_art_{mbid}', json.dumps(cover_art))
_log.info('Cover art de release %s almacenado en cache', mbid)
@django_rq.job('high') @django_rq.job('high')
def load_release_group_cover_art(release_group): def load_release_group_cover_art(release_group):
"""Carga en cache el cover art de un release group""" """Carga en cache el cover art de un release group"""
mbid = release_group
if isinstance(release_group, dict):
mbid = release_group.get('id')
with get_redis_connection() as redis: with get_redis_connection() as redis:
mbid = release_group if f'release_group:{mbid}:cover_art' not in redis:
if isinstance(release_group, dict): cover_art = mb.get_release_group_cover_art(mbid)
mbid = release_group.get('id') redis.set(f'release_group:{mbid}:cover_art', json.dumps(cover_art))
_log.info('Cover art de release group %s almacenado en cache', mbid)
if f'release_group_cover_art_{mbid}' in redis:
_log.info('El cover art ya estaba guardado')
return
cover_art = mb.get_release_group_cover_art(mbid)
redis.set(f'release_group_cover_art_{mbid}', json.dumps(cover_art))
_log.info('Cover art de release group %s almacenado en cache', mbid)
@django_rq.job
def load_entities_of_recording(recording):
"""Carga en cache una grabacion y sus entidades relacionadas"""
with get_redis_connection() as redis:
mbid = recording
if isinstance(recording, dict):
mbid = recording.get('id')
if f'recording_{mbid}' in redis:
_log.info('La recording ya se había procesado anteriormente')
return
if isinstance(recording, str):
recording = mb.get_recording_by_mbid(mbid)
redis.set(f'recording_{mbid}', json.dumps(recording))
_log.info('Recording %s fue almacenada correctamente', mbid)
@django_rq.job @django_rq.job
def load_entities_of_release(release): def load_entities_of_release(release):
"""Carga en cache una release y sus entidades relacionadas""" """Carga en cache una release y sus entidades relacionadas"""
mbid = release
if isinstance(release, dict):
mbid = release.get('id')
with get_redis_connection() as redis: with get_redis_connection() as redis:
mbid = release # Cargar release solo si no esta almacenado
if isinstance(release, dict): if f'release:{mbid}' not in redis:
mbid = release.get('id') if isinstance(release, str):
release = mb.get_release_by_mbid(mbid, ['recordings', 'artists'])
redis.set(f'release:{mbid}', json.dumps(release))
_log.info('Release %s fue almacenada en cache', mbid)
else:
release = json.loads(redis.get(f'release:{mbid}'))
if f'release_{mbid}' in redis: # Envía a cargar sus cover art
_log.info('La release ya se había procesado anteriormente') load_release_cover_art.delay(release.get('id'))
return
if isinstance(release, str): # Almacenar el artista de la release
release = mb.get_release_by_mbid(mbid) if len(release.get('artist_credit')) > 0:
artist = release.get('artist_credit')[0].get('artist').get('id')
redis.set(f'release:{mbid}:artist', artist)
redis.set(f'release_{mbid}', json.dumps(release)) # Obtener sus medias
_log.info('Release %s fue almacenada en cache', mbid) # Una release puede tener mas de un disco o dvds incluidos, entre otras cosas
# Dentro de estas existen tracks, las cuales no son accesibles de ninguna otra forma, y
load_release_cover_art.delay(release) # dentro de una tag esta definido el recording, es un endredo quizás innecesario asi que mas
# o menos me lo voy a saltar, lo único que interesa de la track es su orden dentro de su
offset = 0 # media
while True: medias = release.get('media', [])
recordings = mb.browse_recordings({'release': mbid}, limit=100, offset=offset) for raw_media in medias:
for recording in recordings.get('recordings'): media = {
load_entities_of_recording(recording) 'format': raw_media.get('format'),
'position': raw_media.get('position'),
offset += 100 'track_count': raw_media.get('track_count')
if offset > recordings.get('recording_count', 0): }
break redis.zadd(f'release:{mbid}:media', {json.dumps(media): media['position']})
for track in raw_media.get('tracks', []):
recording_key = f'release:{mbid}:{media.get("position")}:recordings'
recording = track.get('recording')
redis.zadd(recording_key, {recording.get('id'): track.get("position")})
redis.set(f'recording:{mbid}', json.dumps(recording))
redis.set(f'recording:{mbid}:release', mbid)
@django_rq.job @django_rq.job
def load_entities_of_release_group(release_group): def load_entities_of_release_group(release_group):
"""Carga en cache un release group y sus entidades relacionadas""" """Carga en cache un release group y sus entidades relacionadas"""
mbid = release_group
if isinstance(release_group, dict):
mbid = release_group.get('id')
with get_redis_connection() as redis: with get_redis_connection() as redis:
mbid = release_group # Cargar release_group solo si no esta almacenado
if isinstance(release_group, dict): if f'release_group:{mbid}' not in redis:
mbid = release_group.get('id') if isinstance(release_group, str):
release_group = mb.get_release_group_by_mbid(mbid, includes=['artists'])
if f'release_group_{mbid}' in redis: redis.set(f'release_group:{mbid}', json.dumps(release_group))
_log.info('La release group ya se habia procesado anteriormente') _log.info('Release Group %s almacenado en cache', mbid)
return
if isinstance(release_group, str):
release_group = mb.get_release_group_by_mbid(mbid)
redis.set(f'release_group_{mbid}', json.dumps(release_group))
_log.info('Release Group %s almacenado en cache', mbid)
# Envía a cargar sus cover art
load_release_group_cover_art.delay(release_group) load_release_group_cover_art.delay(release_group)
# Carga sus releases
offset = 0 offset = 0
while True: while True:
releases = mb.browse_releases({'release-group': mbid}, limit=100, offset=offset) # Cargo todas las releases posibles la primera vez, si no lo hiciera y necesitara pedir
for release in releases.get('releases'): # mas datos, usuaria una request en vano
load_entities_of_release(release) releases = mb.browse_releases({'release-group': mbid},
includes=['artist-credits'],
limit=100, offset=offset)
count = releases.get('release_count')
if f'release_group:{mbid}:releases' in redis:
# Si es que la cantidad de releases almacenadas es la misma que la que dice
# musicbrainz que existen, entonces no tengo porque continuar cargando
if redis.zcard(f'release_group:{mbid}:releases') == count:
break
# Almaceno el count que dice musicbrainz para asegurar que estén todos los
# elementos cargados
redis.set(f'release_group:{mbid}:releases:count', count)
for rel in releases.get('releases'):
rel_mbid = rel.get('id')
rel_release = parse_date(rel.get('release_events')[0].get('date')).timestamp()
# Se almacena el id del release en un set ordenado por fecha de lanzamiento
redis.zadd(f'release_group:{mbid}:releases', {rel_mbid: rel_release})
redis.set(f'release:{rel_mbid}:release_group', mbid)
load_entities_of_release.delay(rel)
offset += 100 offset += 100
if offset > releases.get('release_count', 0): if offset > releases.get('release_count', 0):
@@ -129,32 +164,46 @@ def load_entities_of_release_group(release_group):
@django_rq.job @django_rq.job
def load_artist_on_cache(artist): def load_artist_on_cache(artist):
"""Carga en cache a un artista y todas sus entidades""" """Carga en cache a un artista y todas sus entidades"""
mbid = artist
if isinstance(artist, dict):
mbid = artist.get('id')
with get_redis_connection() as redis: with get_redis_connection() as redis:
mbid = artist # Cargar artista solo si no estaba almacenado
if isinstance(artist, dict): if f'artist:{mbid}' not in redis:
mbid = artist.get('id') if isinstance(artist, str):
artist = mb.get_artist_by_mbid(mbid, includes=['tags'])
if f'artist_{mbid}' in redis: redis.set(f'artist:{mbid}', json.dumps(artist))
_log.info('El artista ya se había procesado anteriormente') _log.info('Artista %s almacenado en cache', mbid)
return
if isinstance(artist, str):
artist = mb.get_artist_by_mbid(mbid, includes=['tags'])
redis.set(f'artist_{mbid}', json.dumps(artist))
_log.info('Artista %s almacenado en cache', mbid)
# Cargar sus release_groups
offset = 0 offset = 0
while True: while True:
release_groups = mb.browse_release_groups({'artist': mbid}, limit=100, offset=offset) # Cargo todas las releases posibles la primera vez, si no lo hiciera y necesitara pedir
if f'artist_{mbid}:release_group_count' not in redis: # mas datos, usuaria una request en vano
redis.set(f'artist_{mbid}:release_group_count', release_groups = mb.browse_release_groups({'artist': mbid},
release_groups.get('release_group_count')) includes=['artist-credits'],
for release_group in release_groups.get('release_groups'): limit=100, offset=offset)
release_group_id = release_group.get('id') count = release_groups.get('release_group_count')
redis.rpush(f'artist_{mbid}:release_groups', release_group_id) if f'artist:{mbid}:release_groups' in redis:
redis.set(f'release_group_{release_group_id}:artist', mbid) # Si es que la cantidad de release_groups almacenadas es la misma que la que dice
load_entities_of_release_group.delay(release_group) # musicbrainz que existen, entonces no tengo porque continuar cargando
if redis.zcard(f'artist:{mbid}:release_groups') == count:
break
# Almaceno el count que dice musicbrainz para asegurar que estén todos los
# elementos cargado
redis.set(f'artist:{mbid}:release_groups:count', count)
for rel in release_groups.get('release_groups'):
rel_mbid = rel.get('id')
rel_date = parse_date(rel.get('first_release_date')).timestamp()
# Se almacena el id del release group en un set ordenado por su fecha de lanzamiento
redis.zadd(f'artist:{mbid}:release_groups', {rel_mbid: rel_date})
redis.set(f'release_group:{rel_mbid}:artist', mbid)
load_entities_of_release_group.delay(rel)
offset += 100 offset += 100
if offset > release_groups.get('release_group_count', 0): if offset > release_groups.get('release_group_count', 0):

View File

@@ -154,7 +154,7 @@ def get_artist(mbid):
"""Obtiene un artista desde musicbrainz incluyendo sus tags""" """Obtiene un artista desde musicbrainz incluyendo sus tags"""
with get_redis_connection() as redis: with get_redis_connection() as redis:
mb_artist = redis.get(f'artist_{mbid}') mb_artist = redis.get(f'artist:{mbid}')
if mb_artist is None: if mb_artist is None:
mb_artist = mb.get_artist_by_mbid(mbid, includes=['tags']) mb_artist = mb.get_artist_by_mbid(mbid, includes=['tags'])
else: else:
@@ -172,9 +172,9 @@ def get_disc(mbid):
"""Obtiene un disco desde musicbrainz""" """Obtiene un disco desde musicbrainz"""
with get_redis_connection() as redis: with get_redis_connection() as redis:
mb_disc = redis.get(f'release_group_{mbid}') mb_disc = redis.get(f'release_group:{mbid}')
if mb_disc is None: if mb_disc is None:
mb_disc = mb.get_release_group_by_mbid(mbid) mb_disc = mb.get_release_group_by_mbid(mbid, ['artists'])
else: else:
mb_disc = json.loads(mb_disc) mb_disc = json.loads(mb_disc)
@@ -182,9 +182,7 @@ def get_disc(mbid):
return mb_disc return mb_disc
jobs.load_entities_of_release_group.delay(mbid) jobs.load_entities_of_release_group.delay(mbid)
disc = map_disc(mb_disc) return map_disc(mb_disc)
return disc
def get_discs_of_artist(mbid, limit, page): def get_discs_of_artist(mbid, limit, page):
@@ -195,19 +193,23 @@ def get_discs_of_artist(mbid, limit, page):
mb_discs = [] mb_discs = []
total = 0 total = 0
# Si es que tengo un set de release_groups en redis me fijo si es que sus counts coinciden
# Si es que coinciden significa que se cargaron todos los discos, pero si no, quizás aun no
# terminan de guardarse, por lo que salto el código de obtención y voy directo a musicbrainz
with get_redis_connection() as redis: with get_redis_connection() as redis:
key_count = f'artist_{mbid}:release_group_count' key_releases = f'artist:{mbid}:release_groups'
if key_count in redis: if key_releases in redis:
key = f'artist_{mbid}:release_groups' if int(redis.get(f'{key_releases}:count')) == redis.zcard(key_releases):
mb_disc_ids = redis.lrange(key, offset, limit) mb_discs = [get_disc(mbid) for mbid in redis.zrange(key_releases, offset, limit)]
if redis.exists(*[f'release_group_{mbid}' for mbid in mb_disc_ids]) == len(mb_disc_ids): total = redis.zcard(key_releases)
mb_discs = [get_disc(mbid) for mbid in mb_disc_ids]
total = redis.llen(key)
if len(mb_discs) == 0: if len(mb_discs) == 0:
# Si es que no había ningún disco, enviar a cargar al artista, quizás nunca se a guardado
# en cache antes
jobs.load_artist_on_cache.delay(mbid) jobs.load_artist_on_cache.delay(mbid)
mb_discs_browse = mb.browse_release_groups(params={'artist': mbid}, mb_discs_browse = mb.browse_release_groups(params={'artist': mbid},
includes=['artist-credits'],
limit=limit, offset=offset) limit=limit, offset=offset)
if 'error' in mb_discs_browse: if 'error' in mb_discs_browse:
return mb_discs_browse return mb_discs_browse
@@ -227,12 +229,14 @@ def get_artist_of_disc(mbid):
mb_artist = None mb_artist = None
with get_redis_connection() as redis: with get_redis_connection() as redis:
key_id = f'release_group_{mbid}:artist' key_id = f'release_group:{mbid}:artist'
if key_id in redis: if key_id in redis:
mb_artist = get_artist(redis.get(key_id)) mb_artist = get_artist(redis.get(key_id))
if mb_artist is None: if mb_artist is None:
mb_artist_browse = mb.browse_artists(params={'release-group': mbid}, limit=0, offset=0) mb_artist_browse = mb.browse_artists(params={'release-group': mbid},
includes=['tags'],
limit=1, offset=0)
if 'error' in mb_artist_browse: if 'error' in mb_artist_browse:
return mb_artist_browse return mb_artist_browse
@@ -252,45 +256,82 @@ def get_artist_of_disc(mbid):
def get_release(mbid): def get_release(mbid):
"""Obtiene una release desde musicbrainz incluyendo sus artistas""" """Obtiene una release desde musicbrainz incluyendo sus artistas"""
with get_redis_connection() as redis: with get_redis_connection() as redis:
mb_release = redis.get(f'release_{mbid}') mb_release = redis.get(f'release:{mbid}')
if mb_release is None: if mb_release is None:
mb_release = mb.get_release_by_mbid(mbid, includes=['artists']) mb_release = mb.get_release_by_mbid(mbid, includes=['artists'])
else: else:
mb_release = json.loads(mb_release) mb_release = json.loads(mb_release)
if 'error' in mb_release: if 'error' in mb_release:
return mb_release return mb_release
jobs.load_entities_of_release.delay(mbid) jobs.load_entities_of_release.delay(mbid)
release = map_release(mb_release)
return release return map_release(mb_release)
def get_releases_of_disc(mbid, limit, page): def get_releases_of_disc(mbid, limit, page):
"""Obtiene las releases de un disco desde musicbrainz incluyendo los creditos a su artistas""" """Obtiene las releases de un disco desde musicbrainz"""
mb_releases = mb.browse_releases(params={'release-group': mbid}, includes=['artist-credits'],
limit=limit, offset=limit * (page - 1))
if 'error' in mb_releases: mb_releases = []
return mb_releases offset = limit * (page - 1)
total = 0
with get_redis_connection() as redis:
key_releases = f'release_group:{mbid}:releases'
if key_releases in redis:
if int(redis.get(f'{key_releases}:count')) == redis.zcard(key_releases):
mb_releases = [get_release(mbid) for mbid in redis.zrange(key_releases,
offset,
limit)]
total = redis.zcard(key_releases)
if len(mb_releases) == 0:
# Si es que no se encontraron releases antes es probable que nunca se cargo en cache el
# release group
jobs.load_entities_of_release_group.delay(mbid)
mb_releases = mb.browse_releases(params={'release-group': mbid},
includes=['artist-credits'],
limit=limit, offset=limit * (page - 1))
if 'error' in mb_releases:
return mb_releases
total = mb_releases.get('release_count')
mb_releases = mb_releases.get('releases')
return { return {
'paginate': paginate(mb_releases.get('release_count', 0), limit, page), 'paginate': paginate(total, limit, page),
'releases': [map_release(release) for release in mb_releases.get('releases')] 'releases': [map_release(release) for release in mb_releases]
} }
def get_artist_of_release(mbid, limit, page): def get_artist_of_release(mbid, limit, page):
"""Obtiene el artista de una release""" """Obtiene el artista de una release"""
mb_artists = mb.browse_artists(params={'release': mbid}, limit=limit, offset=limit * (page - 1)) mb_artist = None
if 'error' in mb_artists: with get_redis_connection() as redis:
return mb_artists key = f'release:{mbid}:artist'
if key in redis:
mb_artist = get_artist(redis.get(key))
if mb_artist is None:
mb_artist_browse = mb.browse_artists(params={'release': mbid},
includes=['tags'],
limit=limit,
offset=limit * (page - 1))
if 'error' in mb_artist_browse:
return mb_artist_browse
mb_artist = mb_artist_browse.get('artists')[0]
jobs.load_artist_on_cache.delay(mb_artist)
return { return {
'paginate': paginate(mb_artists.get('artist_count', 0), limit, page), 'artist': map_artist(mb_artist)
'artists': [map_artist(artist) for artist in mb_artists.get('artists')]
} }
@@ -309,7 +350,6 @@ def get_recording(mbid):
if 'error' in mb_recording: if 'error' in mb_recording:
return mb_recording return mb_recording
jobs.load_entities_of_recording.delay(mb_recording)
recording = map_recording(mb_recording) recording = map_recording(mb_recording)
return recording return recording

View File

@@ -2,14 +2,16 @@
import json import json
import logging import logging
import functools import functools
import redis import datetime
from django.http import JsonResponse from django.http import JsonResponse
from django.conf import settings from django.conf import settings
import redis
from pygments import highlight from pygments import highlight
from pygments.lexers import JsonLexer # pylint: disable=no-name-in-module from pygments.lexers import JsonLexer # pylint: disable=no-name-in-module
from pygments.formatters import TerminalTrueColorFormatter # pylint: disable=no-name-in-module from pygments.formatters import TerminalTrueColorFormatter # pylint: disable=no-name-in-module
from utils.ratelimit import ratelimit from utils.ratelimit import ratelimit
_log = logging.getLogger('utils') _log = logging.getLogger('utils')
@@ -94,3 +96,32 @@ def get_redis_connection():
port = settings.DATA_CACHE.get('port') port = settings.DATA_CACHE.get('port')
db = settings.DATA_CACHE.get('db') db = settings.DATA_CACHE.get('db')
return redis.Redis(host=host, port=port, db=db, decode_responses=True) return redis.Redis(host=host, port=port, db=db, decode_responses=True)
def parse_date(date, default=datetime.datetime(1, 1, 1)):
"""Intenta parsear una fecha la cual le pueden faltar partes
Esta pensado para parsear fechas entregadas por musicbrainz, el cual puede entrar fechas con
año, con año y mes, o, con año, mes y dia
Si es que no puede lograrlo retornara el default definido
"""
try:
parsed = datetime.datetime.strptime(date, "%Y")
return parsed
except ValueError:
pass
try:
parsed = datetime.datetime.strptime(date, "%Y-%m")
return parsed
except ValueError:
pass
try:
parsed = datetime.datetime.strptime(date, "%Y-%m-%d")
return parsed
except ValueError:
pass
return default