Files
musiclist-server/fetcher/medium.py
2020-07-13 00:21:42 -04:00

535 lines
16 KiB
Python

"""
Mediador entre musicbrainz y las urls
Su objetivo es modificar los datos que entrega musicbrainz para que sean
correspondientes con lo que debe entregar la api, se encarga de casos como
traducción.
"""
import logging
from math import ceil
from country_list import countries_for_language
from fetcher import cache
import fetcher.musicbrainz as mb
_log = logging.getLogger('fetcher_medium')
_log.addHandler(logging.NullHandler())
###
# Utility code
###
def full_country_name(country_code):
"""Obtiene el nombre de un pais en español dado su codigo de pais"""
if country_code == 'XW':
return 'Internacional'
elif country_code == 'XE':
return 'Europa'
elif country_code == 'XU':
return 'Pais Desconocido'
return dict(countries_for_language('es')).get(country_code, country_code)
def translate_artist_type(artist_type):
"""Traduce los tipos de artista a español"""
translation = {
'Person': 'Persona',
'Group': 'Grupo',
'Orchestra': 'Orquesta',
'Choir': 'Coro',
'Character': 'Personaje',
'Other': 'Otro',
}
return translation.get(artist_type, artist_type)
def find_best_cover(mb_covers):
"""Intenta obtener la cover art mas apropiada a partir de una lista de estas"""
only_aproved_front = [x for x in mb_covers.get('images') if x.get('approved', False)
and x.get('front', False) and not x.get('back', False)]
if len(only_aproved_front) > 0:
return only_aproved_front[0]
only_aproved = [x for x in mb_covers.get('images') if x.get('approved', False)]
if len(only_aproved) > 0:
return only_aproved[0]
return mb_covers.get('images')[0]
def paginate(count, limit, page):
"""Crea un modelo de paginado a partir de la cantidad de elementos, el limite de elementos y la
pagina actual"""
return {
'total': count,
'current_page': page,
'last_page': ceil(count / limit),
'per_page': limit,
}
###
# Mapear entidades
##
def map_artist(mb_artist):
"""Mapea el modelo de artista entregado por musicbrainz a uno propio"""
artist = {
'id': mb_artist.get('id'),
'name': mb_artist.get('name'),
'sort_name': mb_artist.get('sort_name'),
'disambiguation': mb_artist.get('disambiguation'),
'type': translate_artist_type(mb_artist.get('type')),
'country': full_country_name(mb_artist.get('country')),
'tags': sorted(mb_artist.get('tags', []), key=lambda tag: tag['count'], reverse=True),
}
return artist
def map_artist_credit(mb_artist_credit):
"""Mapea el modelo de credito a artista entregado por musicbrainz a uno propio"""
return {
'id': mb_artist_credit.get('artist').get('id'),
'name': mb_artist_credit.get('artist').get('name'),
'sort_name': mb_artist_credit.get('artist').get('sort_name'),
'disambiguation': mb_artist_credit.get('artist').get('disambiguation'),
}
def map_disc(mb_disc, artist=True):
"""Mapea el modelo de disco entregado por musicbrainz a uno propio"""
disc = {
'id': mb_disc.get('id'),
'title': mb_disc.get('title'),
'disambiguation': mb_disc.get('disambiguation'),
'first_release_date': mb_disc.get('first_release_date'),
'primary_type': mb_disc.get('primary_type')
}
if artist:
disc['artist'] = map_artist_credit(mb_disc.get('artist_credit')[0])
if len(mb_disc.get('secondary_types', [])) > 0:
disc['secondary_type'] = mb_disc['secondary_types'][0]
return disc
def map_release(mb_release, artist=True):
"""Mapea el modelo de release entregado por musicbrainz a uno propio"""
release = {
'id': mb_release.get('id'),
'title': mb_release.get('title'),
'disambiguation': mb_release.get('disambiguation'),
'status': mb_release.get('status'),
'country': full_country_name(mb_release.get('country')),
'date': mb_release.get('date'),
'coverart': mb_release.get('cover_art_archive', {}).get('count', 0) > 0,
}
if artist:
release['artist'] = map_artist_credit(mb_release.get('artist_credit')[0])
return release
def map_media(mb_media):
"""Transforma una instancia de media dentro de una release a una util para mis propósitos"""
media = {
'format': mb_media.get('format'),
'position': mb_media.get('position'),
'track_count': mb_media.get('track_count'),
'recordings': [map_recording(track.get('recording')) for track in mb_media.get('tracks')]
}
return media
def map_recording(mb_recording, artist=True):
"""Mapea el modelo de recording entregado por musicbrainz a uno propio"""
recording = {
'id': mb_recording.get('id'),
'title': mb_recording.get('title'),
'disambiguation': mb_recording.get('disambiguation'),
'length': mb_recording.get('length'),
}
if artist:
recording['artist'] = map_artist_credit(mb_recording.get('artist_credit')[0])
return recording
def map_coverart(mb_cover):
"""Mapea el modelo de coverart entregado por musicbrainz a uno propio"""
def https(link):
if link:
return link.replace('http://', 'https://')
return {
'image': https(mb_cover.get('image')),
'1200': https(mb_cover.get('thumbnails', {}).get('1200')),
'500': https(mb_cover.get('thumbnails', {}).get('500')),
'250': https(mb_cover.get('thumbnails', {}).get('250')),
'large': https(mb_cover.get('thumbnails', {}).get('large')),
'small': https(mb_cover.get('thumbnails', {}).get('small')),
}
##
# Obtener entidades
##
##
# Artistas
##
def get_artist(mbid):
"""Obtiene un artista desde musicbrainz incluyendo sus tags"""
_log.info('Obteniendo artista con id %s', mbid)
mb_artist = cache.get_artist(mbid)
if mb_artist:
return map_artist(mb_artist)
mb_artist = mb.get_artist_by_mbid(mbid, includes=['tags'])
if 'error' in mb_artist:
_log.debug('Error en artista %s', mb_artist)
return mb_artist
return map_artist(mb_artist)
def get_disc(mbid):
"""Obtiene un disco desde musicbrainz"""
_log.info('Obteniendo disco con id %s', mbid)
mb_disc = cache.get_disc(mbid)
if mb_disc:
return map_disc(mb_disc)
mb_disc = mb.get_release_group_by_mbid(mbid, ['artists'])
if 'error' in mb_disc:
_log.debug('El disco tiene un error %s', mb_disc)
return mb_disc
return map_disc(mb_disc)
def get_discs_of_artist(mbid, limit, page):
"""Obtiene los discos de un artista desde musicbrainz incluyendo"""
_log.info('Obteniendo los discos del artista %s', mbid)
offset = limit * (page - 1)
mb_discs, total = cache.get_discs_of_artist(mbid, limit, offset)
if mb_discs and total:
return {
'paginate': paginate(total, limit, page),
'discs': [map_disc(disc) for disc in mb_discs]
}
mb_discs_browse = mb.browse_release_groups(params={'artist': mbid},
includes=['artist-credits'],
limit=limit, offset=offset)
if 'error' in mb_discs_browse:
_log.error('Error al buscar %s', mb_discs_browse)
return mb_discs_browse
mb_discs = mb_discs_browse.get('release_groups')
total = mb_discs_browse.get('release_group_count')
return {
'paginate': paginate(total, limit, page),
'discs': [map_disc(disc) for disc in mb_discs]
}
def get_artist_of_disc(mbid):
"""Obtiene el artista de un disco"""
_log.info('Obteniendo artista del disco %s', mbid)
mb_artist = cache.get_artist_of_disc(mbid)
if mb_artist:
return map_artist(mb_artist)
mb_artist_browse = mb.browse_artists(params={'release-group': mbid},
includes=['tags'],
limit=1, offset=0)
if 'error' in mb_artist_browse:
_log.debug('Error al buscar %s', mb_artist_browse)
return mb_artist_browse
mb_artist = mb_artist_browse.get('artists')[0]
return map_artist(mb_artist)
##
# Releases
##
def get_release(mbid):
"""Obtiene una release desde musicbrainz incluyendo sus artistas"""
_log.info('Obteniendo release %s', mbid)
mb_release = cache.get_release(mbid)
if mb_release:
return map_release(mb_release)
mb_release = mb.get_release_by_mbid(mbid, includes=['artists'])
if 'error' in mb_release:
_log.error('Error al buscar', mb_release)
return mb_release
return map_release(mb_release)
def get_releases_of_disc(mbid, limit, page):
"""Obtiene las releases de un disco desde musicbrainz"""
_log.info('Obteniendo releases del disco %s', mbid)
offset = limit * (page - 1)
mb_releases, total = cache.get_releases_of_disc(mbid, limit, offset)
if mb_releases and total:
return {
'paginate': paginate(total, limit, page),
'releases': [map_release(release) for release in mb_releases]
}
mb_releases = mb.browse_releases(params={'release-group': mbid},
includes=['artist-credits'],
limit=limit, offset=limit * (page - 1))
if 'error' in mb_releases:
_log.error('Error al buscar %s', mb_releases)
return mb_releases
total = mb_releases.get('release_count')
mb_releases = mb_releases.get('releases')
return {
'paginate': paginate(total, limit, page),
'releases': [map_release(release) for release in mb_releases]
}
def get_artist_of_release(mbid):
"""Obtiene el artista de una release"""
_log.info('Obteniendo el artista de la release %s', mbid)
mb_artist = cache.get_artist_of_release(mbid)
if mb_artist:
return map_artist(mb_artist)
mb_artist_browse = mb.browse_artists(params={'release': mbid},
includes=['tags'],
limit=1, offset=0)
if 'error' in mb_artist_browse:
_log.error('Error al buscar %s', mb_artist_browse)
return mb_artist_browse
mb_artist = mb_artist_browse.get('artists')[0]
return map_artist(mb_artist)
##
# Recordings
##
def get_recording(mbid):
"""Obtiene una grabación"""
_log.info('Obteniendo la grabacion %s', mbid)
mb_recording = cache.get_recording(mbid)
if mb_recording:
return map_recording(mb_recording)
mb_recording = mb.get_recording_by_mbid(mbid, includes=['artists'])
if 'error' in mb_recording:
_log.error('Error al buscar %s', mb_recording)
return mb_recording
return map_recording(mb_recording)
def get_recordings_of_release(mbid):
"""Obtiene las grabaciones de una release
Realmente no existen grabaciones para este caso de uso, si no, media, el cual representa
los medios físicos en los que esta una grabación, asi que se entrega una lista de medias con sus
grabaciones acopladas, todo ordenado y con indexes de orden
"""
_log.info('Obteniendo recordings de la release %s', mbid)
mb_medias = cache.get_recordings_of_release(mbid)
if mb_medias:
medias = []
for media in mb_medias:
recordings = []
for recording in media.get('recordings'):
recordings.append(map_recording(recording))
media['recordings'] = recordings
medias.append(media)
return {'medias': medias}
mb_release = mb.get_release_by_mbid(mbid, ['recordings', 'artist-credits'])
if 'error' in mb_release:
_log.debug('Error al cargar recordings %s', mb_release)
return mb_release
medias = [map_media(media) for media in mb_release.get('media', [])]
return {'medias': medias}
def get_releases_of_recording(mbid, limit, page):
"""Obtiene la release de una grabacion incluyendo los creditos a su artista"""
_log.info('Obteniendo las releases de la recording %s', mbid)
offset = limit * (page - 1)
releases, total = cache.get_releases_of_recording(mbid, limit, offset)
if releases and total:
return {
'paginate': paginate(total, limit, page),
'releases': releases
}
mb_releases_browse = mb.browse_releases(params={'recording': mbid},
includes=['artist-credits', 'recordings'],
limit=limit, offset=offset)
if 'error' in mb_releases_browse:
_log.debug('Error al buscar %s', mb_releases_browse)
return mb_releases_browse
releases = [map_release(release) for release in mb_releases_browse.get('releases')]
return {
'paginate': paginate(total, limit, page),
'releases': releases
}
def get_artist_of_recording(mbid):
"""Obtiene el artista de una grabacion"""
_log.info('Obteniendo el artista de la recording %s', mbid)
mb_artist = cache.get_artist_of_recording(mbid)
if mb_artist:
return map_artist(mb_artist)
mb_artist_browse = mb.browse_artists(params={'recording': mbid},
includes=['tags'],
limit=1, offset=0)
if 'error' in mb_artist_browse:
_log.debug('Error al buscar %s', mb_artist_browse)
return mb_artist_browse
mb_artist = mb_artist_browse.get('artists')[0]
return map_artist(mb_artist)
##
# CoverArt
##
def get_cover_art_disc(mbid):
"""Obtiene el cover art de un disco"""
mb_covers = cache.get_cover_art_disc(mbid)
if not mb_covers:
mb_covers = mb.get_release_group_cover_art(mbid)
if 'error' in mb_covers:
_log.error('Error al cargar cover %s', mb_covers)
return None
return map_coverart(find_best_cover(mb_covers))
def get_cover_art_release(mbid):
"""Obtiene el cover art de una release"""
mb_covers = cache.get_cover_art_release(mbid)
if not mb_covers:
mb_covers = mb.get_release_cover_art(mbid)
if 'error' in mb_covers:
_log.error('Error al cargar cover %s', mb_covers)
return None
return map_coverart(find_best_cover(mb_covers))
##
# Busqueda
##
def search_artist(query, limit, page):
"""Busca un artista dada una query"""
mb_artists = mb.search_artist(query=query,
includes=['tags'],
limit=limit, offset=limit * (page - 1))
if 'error' in mb_artists:
return mb_artists
return {
'paginate': paginate(mb_artists['count'], limit, page),
'artists': [map_artist(artist) for artist in mb_artists['artists']]
}
def search_disc(query, limit, page):
"""Busca un disco dada una query"""
mb_discs = mb.search_release_group(query=query,
includes=['artist'],
limit=limit, offset=limit * (page - 1))
return {
'paginate': paginate(mb_discs['count'], limit, page),
'discs': [map_disc(disc) for disc in mb_discs['release_groups']]
}
def search_release(query, limit, page):
"""Busca una release dada una query"""
mb_releases = mb.search_release(query=query,
includes=['artist'],
limit=limit, offset=limit * (page - 1))
return {
'paginate': paginate(mb_releases['count'], limit, page),
'releases': [map_release(release) for release in mb_releases['releases']]
}
def search_recording(query, limit, page):
"""Busca una grabacion dada una query"""
mb_recording = mb.search_recording(query=query,
includes=['artist'],
limit=limit, offset=limit * (page - 1))
return {
'paginate': paginate(mb_recording['count'], limit, page),
'recordings': [map_recording(recording) for recording in mb_recording['recordings']]
}