A last.fm clone written in Elixir
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

267 lines
6.3 KiB

defmodule Dagon.Listens.Workers.ListenbrainzWorker do
use GenServer
require Logger
import Ecto.Query
alias Dagon.Repo
alias Dagon.Listens.Artists.Artist
alias Dagon.Listens.Albums.Album
alias Dagon.Listens.Tracks.Track
alias Dagon.Listens.Listens.Listen
# Default delay between two periodic fetches, in milliseconds (30 seconds).
# It seeds `:fetch_interval` in the initial server state.
@fetch_interval 30 * 1_000
# Base URL of the ListenBrainz user API; do_fetch/1 appends the user name and
# the "/listens" path with its query string.
@base_url "https://api.listenbrainz.org/1/user"
# Value sent as the `count` query parameter of every fetch request.
@listen_count 100
# Placeholder stored as `msid` when an artist or album comes without a
# MessyBrainz id.
@msid_missing "MISSING"
@doc """
Starts the worker and registers it under the module name.

`args` is handed to `init/1` unchanged. Returns whatever
`GenServer.start_link/3` returns.
"""
def start_link(args) do
  Logger.info("Starting Listenbrainz Worker..")

  server_opts = [name: __MODULE__]
  GenServer.start_link(__MODULE__, args, server_opts)
end
@doc """
Triggers a fetch immediately and waits until it has finished.

The fetch performs an HTTP request and a series of database writes inside the
server process, which can take longer than the default `GenServer.call/3`
timeout. Pass a larger `timeout` (milliseconds or `:infinity`) when a long
fetch is expected. The default keeps the previous behaviour of 5 seconds.

Returns `:ok`. Exits if the server does not answer within `timeout`.
"""
def fetch(timeout \\ 5_000) do
  GenServer.call(__MODULE__, :fetch, timeout)
end
@doc """
Asynchronously changes the delay between periodic fetches.

`interval_in_seconds` must be a positive integer. The server multiplies it by
1000 and later passes it to `Process.send_after/3`, which only accepts
non-negative integers. A float or negative value would therefore crash the
worker at the next scheduling, and `0` would make it fetch in a tight loop.
Invalid values are rejected here, in the caller, with a `FunctionClauseError`.

Returns `:ok`.
"""
def update_fetch_interval(interval_in_seconds)
    when is_integer(interval_in_seconds) and interval_in_seconds > 0 do
  GenServer.cast(__MODULE__, {:update_fetch_interval, interval_in_seconds})
end
@doc """
Returns the current server state map (rate limit, fetch interval, last
timestamp and time of the last update).
"""
def get_state, do: GenServer.call(__MODULE__, :get_state)
# GenServer callback. Builds the initial state and schedules the first fetch
# 10 seconds after start-up. The start argument is ignored.
def init(_state) do
  initial_state = %{
    # -1 marks the rate limit as "not known yet"; do_fetch/1 overwrites it.
    rate_limit: %{total: -1, remaining: -1},
    fetch_interval: @fetch_interval,
    # 0 makes last_listen_timestamp/1 fall back to the database.
    last_timestamp: 0,
    updated_at: DateTime.utc_now()
  }

  schedule_fetch(initial_state, 10_000)

  {:ok, initial_state}
end
# GenServer callback for fetch/0: runs a fetch synchronously and replies :ok.
# NOTE(review): do_fetch/1 also schedules the next periodic fetch, so every
# manual fetch adds one more pending :fetch timer next to the existing one.
def handle_call(:fetch, _from, state) do
  {:reply, :ok, do_fetch(state)}
end
# GenServer callback for out-of-band messages.
#
# :fetch is the timer message produced by schedule_fetch/1,2 and runs one
# fetch cycle.
def handle_info(:fetch, state) do
  {:noreply, do_fetch(state)}
end

# A closed TLS socket is reported to the owning process as a plain message.
# It is logged and otherwise ignored.
def handle_info({:ssl_closed, _}, state) do
  Logger.error("Temporary TLS error")
  {:noreply, state}
end

# Catch-all: once a module defines its own handle_info/2 clauses, any message
# that matches none of them raises FunctionClauseError and takes the worker
# down. Stray messages (late replies, other socket notifications) are logged
# and dropped instead.
def handle_info(message, state) do
  Logger.warn("Ignoring unexpected message: #{inspect(message)}")
  {:noreply, state}
end
# GenServer callback for update_fetch_interval/1. Stores the new interval
# converted to milliseconds. It is picked up the next time schedule_fetch/1
# runs; an already pending timer is not touched.
def handle_cast({:update_fetch_interval, interval_in_seconds}, state) do
  Logger.info("Updating fetch_interval to #{interval_in_seconds}s")

  interval_in_ms = interval_in_seconds * 1000
  {:noreply, %{state | fetch_interval: interval_in_ms}}
end
# GenServer callback for get_state/0: replies with the unchanged state.
# NOTE(review): this clause is separated from the :fetch clause of
# handle_call/3 by other functions; the compiler warns about clauses that are
# not grouped together.
def handle_call(:get_state, _from, state), do: {:reply, state, state}
@doc """
Schedules the next `:fetch` message after the interval stored in
`state.fetch_interval` (milliseconds). Returns the timer reference.
"""
def schedule_fetch(state) do
  schedule_fetch(state, state.fetch_interval)
end
@doc """
Schedules a `:fetch` message to the calling process after `wait_time`
milliseconds, ignoring the interval stored in the state. Returns the timer
reference.
"""
def schedule_fetch(_state, wait_time) do
  Logger.info("Scheduling Listenbrainz Worker..")

  worker = self()
  Process.send_after(worker, :fetch, wait_time)
end
@doc """
Runs one fetch cycle and returns the updated state.

Requests the listens newer than the last known timestamp, stores them via
`handle_fetch_response/2`, records the rate limit reported by the response
headers, schedules the next fetch and refreshes `:updated_at`.

Transport errors and non-200 responses are logged and leave
`:last_timestamp` untouched, so the same window is requested again on the
next cycle. The next fetch is scheduled in every case.
"""
def do_fetch(state) do
  last_ts = last_listen_timestamp(state)
  # NOTE(review): the user name is hard-coded.
  user = "inhji"
  url = "#{@base_url}/#{user}/listens?min_ts=#{last_ts}&count=#{@listen_count}"

  Logger.info("Fetching new Listens for Timestamp #{last_ts}:")
  Logger.info(url)

  request_headers = [{"User-Agent", Dagon.user_agent()}]
  request_options = [hackney: [pool: :listenbrainz_pool]]

  # HTTPoison.get/3 returns tagged tuples. The bang variant raises on
  # transport errors, which would crash the worker and made the former
  # fallback branch unreachable.
  new_state =
    case HTTPoison.get(url, request_headers, request_options) do
      {:ok, %HTTPoison.Response{status_code: 200, body: body, headers: headers}} ->
        newest_timestamp = handle_fetch_response(state, body)
        rate_limit = Dagon.Listens.RateLimit.calculate(headers, "LB")

        Map.merge(state, %{rate_limit: rate_limit, last_timestamp: newest_timestamp})

      {:ok, %HTTPoison.Response{status_code: status_code, headers: headers}} ->
        # The body of an error response is not a listens payload, so it is
        # not parsed. The rate limit headers are still recorded.
        Logger.error("Listenbrainz responded with unexpected status #{status_code}")
        Map.put(state, :rate_limit, Dagon.Listens.RateLimit.calculate(headers, "LB"))

      {:error, %HTTPoison.Error{reason: reason}} ->
        Logger.error("Fetching Listens failed: #{inspect(reason)}")
        state
    end

  schedule_fetch(new_state)
  Map.replace!(new_state, :updated_at, DateTime.utc_now())
end
@doc """
Stores the listens contained in a response body and returns the timestamp to
use as the next `min_ts`.

`body` is the raw JSON body of a ListenBrainz listens response. Every listen
is turned into a changeset by `prepare_listens/1`; listens that could not be
prepared (`nil`) are skipped and insert failures are logged.

Returns the largest `listened_at` of the batch, or `state.last_timestamp`
when the batch is empty. Raises if `body` is not valid JSON.
"""
def handle_fetch_response(state, body) do
  # NOTE(review): `keys: :atoms` creates atoms from remote JSON keys. It is
  # kept because prepare_listen/1 reads the listens with atom keys.
  listens =
    body
    |> Jason.decode!(keys: :atoms)
    |> get_in([:payload, :listens])
    # A response without payload/listens counts as an empty batch.
    |> List.wrap()

  listens
  |> prepare_listens()
  |> Enum.reject(&is_nil/1)
  |> Enum.each(fn changeset ->
    case Repo.insert(changeset, log: false) do
      {:ok, _listen} ->
        :ok

      {:error, failed_changeset} ->
        Logger.error("Could not insert Listen: #{inspect(failed_changeset.errors)}")
    end
  end)

  case listens do
    [] ->
      state.last_timestamp

    _ ->
      # ListenBrainz returns listens newest first, so the last element is the
      # oldest one. Taking the maximum is correct for either ordering.
      listens
      |> Enum.map(& &1.listened_at)
      |> Enum.max()
  end
end
@doc """
Returns the unix timestamp after which new listens should be requested.

Uses `state.last_timestamp` when it is greater than 0. Otherwise it looks up
the most recent stored listen and returns its `listened_at` as a unix
timestamp, or `1` when no listen is stored.
"""
def last_listen_timestamp(state) do
  case state.last_timestamp do
    cached when cached > 0 ->
      cached

    _ ->
      newest_listen =
        Listen
        |> order_by([l], desc: l.listened_at)
        |> limit(1)
        |> Repo.one(log: false)

      if is_nil(newest_listen) do
        1
      else
        DateTime.to_unix(newest_listen.listened_at)
      end
  end
end
@doc """
Applies `prepare_listen/1` to every listen and returns the results in the
same order. Entries are changesets, or `nil` for listens that were skipped.
"""
def prepare_listens(listens) do
  for listen <- listens, do: prepare_listen(listen)
end
@doc """
Builds a `Listen` changeset for one decoded listen.

Makes sure the artist, album and track of the listen exist (creating them
when needed) and returns a changeset referencing them. Returns `nil` when one
of those steps reports `{:warn, reason}` or `{:error, reason}`; the reason is
logged.

`listen` is a map with atom keys holding `:listened_at` (unix seconds) and
`:track_metadata`.
"""
def prepare_listen(listen) do
  meta = listen.track_metadata
  # additional_info, release_name and the msids are optional in the payload.
  # Read them leniently so the nil handling of the maybe_create_* functions
  # can run instead of a KeyError being raised here.
  info = Map.get(meta, :additional_info) || %{}
  track_name = Map.get(meta, :track_name)

  with {:ok, artist} <- maybe_create_artist(meta[:artist_name], info[:artist_msid]),
       {:ok, album} <- maybe_create_album(meta[:release_name], info[:release_msid], artist),
       {:ok, track} <- maybe_create_track(track_name, artist, album) do
    Logger.info("[Listen] #{track_name}")

    Listen.changeset(%Listen{}, %{
      track: track_name,
      album_id: album.id,
      artist_id: artist.id,
      track_id: track.id,
      listened_at: DateTime.from_unix!(listen.listened_at)
    })
  else
    # Reasons are atoms or arbitrary terms. Logger expects chardata, so they
    # are rendered with inspect/1.
    {:error, reason} ->
      Logger.error(inspect(reason))
      nil

    {:warn, reason} ->
      Logger.warn(inspect(reason))
      nil
  end
end
@doc """
Finds the artist for a listen, creating it when it does not exist yet.

Returns `{:ok, artist}`, or `{:warn, :artist_name_nil}` when `name` is `nil`.

Artists are looked up by `messybrainz_id`. When no id is given, the artist is
stored with the `"MISSING"` placeholder and identified by its name among the
placeholder records. Raises if the insert fails.
"""
def maybe_create_artist(nil, _messybrainz_id) do
  Logger.warn("Artist name was nil, skipping.")
  {:warn, :artist_name_nil}
end

def maybe_create_artist(name, messybrainz_id) do
  # Repo.get_by/3 raises on nil filter values, so a missing id must not be
  # passed through. Looking up by the placeholder alone would return the same
  # artist for every id-less listen, hence the additional name filter.
  lookup =
    case messybrainz_id do
      nil -> [name: name, msid: @msid_missing]
      msid -> [msid: msid]
    end

  artist =
    case Repo.get_by(Artist, lookup, log: false) do
      nil ->
        Logger.info("[Artist] #{name}")

        %Artist{}
        |> Artist.changeset(%{
          name: name,
          msid: messybrainz_id || @msid_missing
        })
        |> Repo.insert!(log: false)

      existing ->
        existing
    end

  {:ok, artist}
end
@doc """
Finds the album for a listen, creating it when it does not exist yet.

Returns `{:ok, album}`, or `{:warn, :album_name_nil}` when `name` is `nil`.

Albums are looked up by `messybrainz_id`. When no id is given, the album is
stored with the `"MISSING"` placeholder and identified by name and artist
among the placeholder records. Raises if the insert fails.
"""
def maybe_create_album(nil, _messybrainz_id, _artist) do
  Logger.warn("Album name was nil, skipping.")
  {:warn, :album_name_nil}
end

def maybe_create_album(name, messybrainz_id, artist) do
  # Repo.get_by/3 raises on nil filter values, so a missing id must not be
  # passed through. The artist is part of the fallback key because different
  # artists may use the same album name.
  # NOTE(review): assumes Album exposes an `artist_id` field, as the
  # changeset attributes below suggest; confirm against the schema.
  lookup =
    case messybrainz_id do
      nil -> [name: name, artist_id: artist.id, msid: @msid_missing]
      msid -> [msid: msid]
    end

  album =
    case Repo.get_by(Album, lookup, log: false) do
      nil ->
        Logger.info("[Album] #{name}")

        %Album{}
        |> Album.changeset(%{
          name: name,
          msid: messybrainz_id || @msid_missing,
          artist_id: artist.id
        })
        |> Repo.insert!(log: false)

      existing ->
        existing
    end

  {:ok, album}
end
@doc """
Finds the track with the given name, artist and album, creating it when it
does not exist yet.

Returns `{:ok, track}`, or `{:warn, :track_name_nil}` when `name` is `nil`
(mirroring `maybe_create_artist/2` and `maybe_create_album/3`). An existing
track is returned with `:album` and `:artist` preloaded; a newly inserted one
is returned as inserted. Raises if the insert fails.
"""
def maybe_create_track(nil, _artist, _album) do
  # Without this clause a nil name reaches Repo.get_by/3, which raises on nil
  # filter values.
  Logger.warn("Track name was nil, skipping.")
  {:warn, :track_name_nil}
end

def maybe_create_track(name, artist, album) do
  existing =
    Track
    |> Repo.get_by([name: name, artist_id: artist.id, album_id: album.id], log: false)
    |> Repo.preload([:album, :artist], log: false)

  track =
    case existing do
      nil ->
        Logger.info("[Track] #{name}")

        %Track{}
        |> Track.changeset(%{
          name: name,
          artist_id: artist.id,
          album_id: album.id
        })
        |> Repo.insert!(log: false)

      found ->
        found
    end

  {:ok, track}
end
end