""" Mediador entre musicbrainz y las urls Su objetivo es modificar los datos que entrega musicbrainz para que sean correspondientes con lo que debe entregar la api, se encarga de casos como traducción. """ from math import ceil from country_list import countries_for_language from utils import parallel_map import fetcher.musicbrainz as mb def full_country_name(country_code): """Obtiene el nombre de un pais en español dado su codigo de pais""" return dict(countries_for_language('es')).get(country_code, country_code) def translate_artist_type(artist_type): """Traduce los tipos de artista a español""" translation = { 'Person': 'Persona', 'Group': 'Grupo', 'Orchestra': 'Orquesta', 'Choir': 'Coro', 'Character': 'Personaje', 'Other': 'Otro', } return translation.get(artist_type, artist_type) def map_artist(mb_artist): """Mapea el modelo de artista entregado por musicbrainz a uno propio""" artist = { 'id': mb_artist.get('id'), 'name': mb_artist.get('name'), 'sort_name': mb_artist.get('sort_name'), 'disambiguation': mb_artist.get('disambiguation'), 'type': translate_artist_type(mb_artist.get('type')), 'country': full_country_name(mb_artist.get('country')), 'tags': sorted(mb_artist.get('tags', []), key=lambda tag: tag['count'], reverse=True), } return artist def map_artist_credit(mb_artist_credit): """Mapea el modelo de credito a artista entregado por musicbrainz a uno propio""" return { 'id': mb_artist_credit.get('artist').get('id'), 'name': mb_artist_credit.get('artist').get('name'), 'sort_name': mb_artist_credit.get('artist').get('sort_name'), 'disambiguation': mb_artist_credit.get('artist').get('disambiguation'), } def map_disc(mb_disc): """Mapea el modelo de disco entregado por musicbrainz a uno propio""" disc = { 'id': mb_disc.get('id'), 'title': mb_disc.get('title'), 'disambiguation': mb_disc.get('disambiguation'), 'first_release_date': mb_disc.get('first_release_date'), 'primary_type': mb_disc.get('primary_type'), 'cover_art': get_cover_art_disc(mb_disc.get('id')), 'artist': [map_artist_credit(c) for c in mb_disc.get('artist_credit')] } if len(mb_disc.get('secondary_types', [])) > 0: disc['secondary_type'] = mb_disc['secondary_types'][0] return disc def map_release(mb_release): """Mapea el modelo de release entregado por musicbrainz a uno propio""" return { 'id': mb_release.get('id'), 'title': mb_release.get('title'), 'disambiguation': mb_release.get('disambiguation'), 'status': mb_release.get('status'), 'country': mb_release.get('country'), 'date': mb_release.get('date'), 'cover_art': get_cover_art_release(mb_release.get('id')), 'artist': [map_artist_credit(c) for c in mb_release.get('artist_credit')] } def map_recording(mb_recording): """Mapea el modelo de recording entregado por musicbrainz a uno propio""" return { 'id': mb_recording.get('id'), 'title': mb_recording.get('title'), 'disambiguation': mb_recording.get('disambiguation'), 'length': mb_recording.get('length'), 'artist': [map_artist_credit(c) for c in mb_recording.get('artist_credit')] } def map_coverart(mb_cover): """Mapea el modelo de coverart entregado por musicbrainz a uno propio""" return { 'image': mb_cover.get('image'), '1200': mb_cover.get('thumbnails', {}).get('1200'), '500': mb_cover.get('thumbnails', {}).get('500'), '250': mb_cover.get('thumbnails', {}).get('250'), 'large': mb_cover.get('thumbnails', {}).get('large'), 'small': mb_cover.get('thumbnails', {}).get('small'), } def find_best_cover(mb_covers): """Intenta obtener la cover art mas apropiada a partir de una lista de estas""" only_aproved_front = [x for x in mb_covers.get('images') if x.get('approved', False) and x.get('front', False) and not x.get('back', False)] if len(only_aproved_front) > 0: return only_aproved_front[0] only_aproved = [x for x in mb_covers.get('images') if x.get('approved', False)] if len(only_aproved) > 0: return only_aproved[0] return mb_covers.get('images')[0] def paginate(count, limit, page): """Crea un modelo de paginado a partir de la cantidad de elementos, el limite de elementos y la pagina actual""" return { 'total': count, 'current_page': page, 'last_page': ceil(count / limit), 'per_page': limit, } def get_artist(mbid): """Obtiene un artista desde musicbrainz incluyendo sus tags""" mb_artist = mb.get_artist_by_mbid(mbid, includes=['tags']) if 'error' in mb_artist: return mb_artist return map_artist(mb_artist) def get_disc(mbid): """Obtiene un disco desde musicbrainz incluyendo sus artistas""" mb_disc = mb.get_release_group_by_mbid(mbid, includes=['artists']) if 'error' in mb_disc: return mb_disc return map_disc(mb_disc) def get_discs_of_artist(mbid, limit, page): """Obtiene los discos de un artista desde musicbrainz incluyendo los creditos al artista""" mb_discs = mb.browse_release_groups(params={'artist': mbid}, includes=['artist-credits'], limit=limit, offset=limit * (page - 1)) if 'error' in mb_discs: return mb_discs return { 'paginate': paginate(mb_discs['release_group_count'], limit, page), 'discs': parallel_map(mb_discs['release_groups'], map_disc) } def get_artist_of_disc(mbid, limit, page): """Obtiene el artista de un disco""" mb_artists = mb.browse_artists(params={'disc': mbid}, limit=limit, offset=limit * (page - 1)) if 'error' in mb_artists: return mb_artists return { 'paginate': paginate(mb_artists.get('artist_count', 0), limit, page), 'artists': parallel_map(mb_artists['artists'], map_artist) } def get_release(mbid): """Obtiene una release desde musicbrainz incluyendo sus artistas""" mb_release = mb.get_release_by_mbid(mbid, includes=['artists']) if 'error' in mb_release: return mb_release return map_release(mb_release) def get_releases_of_disc(mbid, limit, page): """Obtiene las releases de un disco desde musicbrainz incluyendo los creditos a su artistas""" mb_releases = mb.browse_releases(params={'release-group': mbid}, includes=['artist-credits'], limit=limit, offset=limit * (page - 1)) if 'error' in mb_releases: return mb_releases return { 'paginate': paginate(mb_releases.get('release_count', 0), limit, page), 'releases': parallel_map(mb_releases.get('releases'), map_release) } def get_artist_of_release(mbid, limit, page): """Obtiene el artista de una release""" mb_artists = mb.browse_artists(params={'release': mbid}, limit=limit, offset=limit * (page - 1)) if 'error' in mb_artists: return mb_artists return { 'paginate': paginate(mb_artists.get('artist_count', 0), limit, page), 'artists': parallel_map(mb_artists.get('artists'), map_artist) } def get_recording(mbid): """Obtiene una grabacion incluyendo a su artista""" mb_recording = mb.get_recording_by_mbid(mbid, includes=['artists']) if 'error' in mb_recording: return mb_recording return map_recording(mb_recording) def get_recordings_of_release(mbid, limit, page): """Obtiene las grabaciones de una release incluyendo los creditos a su artista""" mb_recordings = mb.browse_recordings(params={'release': mbid}, includes=['artist-credits'], limit=limit, offset=limit * (page - 1)) if 'error' in mb_recordings: return mb_recordings return { 'paginate': paginate(mb_recordings.get('recording_count', 0), limit, page), 'recordings': parallel_map(mb_recordings['recordings'], map_recording) } def get_release_of_recording(mbid, limit, page): """Obtiene la release de una grabacion incluyendo los creditos a su artista""" mb_releases = mb.browse_releases(params={'recording': mbid}, includes=['artists-credits'], limit=limit, offset=limit * (page - 1)) if 'error' in mb_releases: return mb_releases return { 'paginate': paginate(mb_releases.get('release_count', 0), limit, page), 'releases': parallel_map(mb_releases.get('releases'), map_release) } def get_artist_of_recording(mbid, limit, page): """Obtiene el artista de una grabacion""" mb_artists = mb.browse_artists(params={'recording': mbid}, limit=limit, offset=limit * (page - 1)) if 'error' in mb_artists: return mb_artists return { 'paginate': paginate(mb_artists.get('artist_count', 0), limit, page), 'artists': parallel_map(mb_artists['artists'], map_artist) } def get_cover_art_disc(mbid): """Obtiene el cover art de un disco""" mb_covers = mb.get_release_group_cover_art(mbid) if 'error' in mb_covers: return None if len(mb_covers.get('images', [])) == 1: return map_coverart(mb_covers.get('images')[0]) return map_coverart(find_best_cover(mb_covers)) def get_cover_art_release(mbid): """Obtiene el cover art de una release""" mb_covers = mb.get_release_cover_art(mbid) if 'error' in mb_covers: return None if len(mb_covers.get('images', [])) == 1: return map_coverart(mb_covers.get('images')[0]) return map_coverart(find_best_cover(mb_covers)) def get_cover_art_recording(mbid): """Obtiene el cover art de una grabacion""" release = get_release_of_recording(mbid, limit=1, page=1) if 'error' in release: return None return get_cover_art_release(release['releases'][0]['id']) def search_artist(query, limit, page): """Busca un artista dada una query""" mb_artists = mb.search_artist(query=query, limit=limit, offset=limit * (page - 1)) if 'error' in mb_artists: return mb_artists return { 'paginate': paginate(mb_artists['count'], limit, page), 'artists': parallel_map(mb_artists['artists'], map_artist) } def search_disc(query, limit, page): """Busca un disco dada una query""" mb_discs = mb.search_release_group(query=query, includes=['artist'], limit=limit, offset=limit * (page - 1)) return { 'paginate': paginate(mb_discs['count'], limit, page), 'discs': parallel_map(mb_discs['release_groups'], map_disc) } def search_release(query, limit, page): """Busca una release dada una query""" mb_releases = mb.search_release(query=query, includes=['artist'], limit=limit, offset=limit * (page - 1)) return { 'paginate': paginate(mb_releases['count'], limit, page), 'releases': parallel_map(mb_releases['releases'], map_release) } def search_recording(query, limit, page): """Busca una grabacion dada una query""" mb_recording = mb.search_recording(query=query, includes=['artist'], limit=limit, offset=limit * (page - 1)) return { 'paginate': paginate(mb_recording['count'], limit, page), 'recordings': parallel_map(mb_recording['recordings'], map_recording) }