Browse Source

feat(listens): listenbrainz worker

pull/37/head
Inhji Y. 1 year ago
parent
commit
ed2624f5ec
  1. 0
      apps/listens/lib/listens/albums.ex
  2. 0
      apps/listens/lib/listens/albums/album.ex
  3. 0
      apps/listens/lib/listens/albums/uploader.ex
  4. 14
      apps/listens/lib/listens/application.ex
  5. 0
      apps/listens/lib/listens/artists/artist.ex
  6. 0
      apps/listens/lib/listens/artists/uploader.ex
  7. 22
      apps/listens/lib/listens/cache.ex
  8. 0
      apps/listens/lib/listens/listens/listen.ex
  9. 25
      apps/listens/lib/listens/rate_limit.ex
  10. 0
      apps/listens/lib/listens/tracks/track.ex
  11. 50
      apps/listens/lib/listens/workers/listenbrainz.ex
  12. 164
      apps/listens/lib/listens/workers/listenbrainz/handler.ex
  13. 2
      apps/listens/mix.exs
  14. 5
      config/config.exs
  15. 5
      mix.lock

0
apps/listens/lib/albums.ex → apps/listens/lib/listens/albums.ex

0
apps/listens/lib/albums/album.ex → apps/listens/lib/listens/albums/album.ex

0
apps/listens/lib/albums/uploader.ex → apps/listens/lib/listens/albums/uploader.ex

14
apps/listens/lib/listens/application.ex

@ -0,0 +1,14 @@
defmodule Listens.Application do
use Application
def start(_type, _args) do
import Supervisor.Spec, warn: false
import Cachex.Spec
children = [
worker(Cachex, [:listenbrainz, []], id: :listenbrainz)
]
Supervisor.start_link(children, strategy: :one_for_one, name: Listens.Supervisor)
end
end

0
apps/listens/lib/artists/artist.ex → apps/listens/lib/listens/artists/artist.ex

0
apps/listens/lib/artists/uploader.ex → apps/listens/lib/listens/artists/uploader.ex

22
apps/listens/lib/listens/cache.ex

@ -0,0 +1,22 @@
defmodule Listens.Cache do
def try_get(cache, key, default) do
case Cachex.exists?(cache, key) do
{:ok, true} ->
Cachex.get!(cache, key)
_ ->
Cachex.put(cache, key, default)
default
end
end
def try_put(cache, key, value) do
case Cachex.exists?(cache, key) do
{:ok, true} ->
Cachex.update(cache, key, value)
_ ->
Cachex.put(cache, key, value)
end
end
end

0
apps/listens/lib/listens/listen.ex → apps/listens/lib/listens/listens/listen.ex

25
apps/listens/lib/listens/rate_limit.ex

@ -0,0 +1,25 @@
defmodule Listens.RateLimit do
def calculate(headers, :listenbrainz) do
keyword_list = convert_to_keyword_list(headers)
%{
total: Keyword.get(keyword_list, :"X-RateLimit-Limit"),
remaining: Keyword.get(keyword_list, :"X-RateLimit-Remaining")
}
end
def calculate(headers, :discogs) do
keyword_list = convert_to_keyword_list(headers)
%{
total: Keyword.get(keyword_list, :"X-Discogs-Ratelimit"),
remaining: Keyword.get(keyword_list, :"X-Discogs-Ratelimit-Remaining")
}
end
defp convert_to_keyword_list(list) do
Enum.map(list, fn {name, value} ->
{String.to_atom(name), value}
end)
end
end

0
apps/listens/lib/tracks/track.ex → apps/listens/lib/listens/tracks/track.ex

50
apps/listens/lib/listens/workers/listenbrainz.ex

@ -0,0 +1,50 @@
defmodule Listens.Workers.Listenbrainz do
use Oban.Worker,
queue: :listens,
max_attempts: 5
require Logger
alias Listens.Workers.Listenbrainz.Handler
@base_url "https://api.listenbrainz.org/1/user"
@cache :listenbrainz
@default_listen_count 100
@last_listen_timestamp "last_listen_timestamp"
@fetch_listen_count "fetch_listen_count"
@rate_limit "rate_limit"
@impl Oban.Worker
def perform(%{"user" => user}, _job) do
count = Listens.Cache.try_get(@cache, @fetch_listen_count, @default_listen_count)
last_ts = Handler.last_listen_timestamp()
url = "#{@base_url}/#{user}/listens?min_ts=#{last_ts}&count=#{count}"
Logger.info("Fetching new Listens for Timestamp #{last_ts}:")
Logger.info(url)
case HTTPoison.get!(url) do
%HTTPoison.Response{body: body, headers: headers} ->
newest_timestamp =
body
|> Jason.decode!(keys: :atoms)
|> Map.get(:payload)
|> Map.get(:listens)
|> Handler.handle_fetch_response(last_ts)
rate_limit = Listens.RateLimit.calculate(headers, :listenbrainz)
Listens.Cache.try_put(@cache, @last_listen_timestamp, newest_timestamp)
Listens.Cache.try_put(@cache, @rate_limit, rate_limit)
error ->
Logger.error(inspect(error))
end
Listens.Cache.try_put(@cache, "updated_at", DateTime.utc_now())
:ok
end
end

164
apps/listens/lib/listens/workers/listenbrainz/handler.ex

@ -0,0 +1,164 @@
defmodule Listens.Workers.Listenbrainz.Handler do
import Ecto.Query
require Logger
alias Db.Repo
alias Listens.Artists.Artist
alias Listens.Albums.Album
alias Listens.Tracks.Track
alias Listens.Listens.Listen
@msid_missing "MISSING"
@cache :listenbrainz
@last_listen_timestamp "last_listen_timestamp"
def handle_fetch_response(listens, last_ts) do
listens
|> prepare_listens()
|> Enum.filter(fn l -> !is_nil(l) end)
|> Enum.each(fn changeset ->
Repo.insert(changeset, log: false)
end)
case Enum.empty?(listens) do
true ->
last_ts
false ->
listens
|> List.last()
|> Map.get(:listened_at)
end
end
def last_listen_timestamp() do
case Cachex.exists?(@cache, @last_listen_timestamp) do
{:ok, true} ->
Cachex.get!(@cache, @last_listen_timestamp)
_ ->
query =
from l in Listen,
order_by: [desc: l.listened_at],
limit: 1
ts =
case Repo.one(query, log: false) do
nil -> 1
listen -> DateTime.to_unix(listen.listened_at)
end
Cachex.put(@cache, @last_listen_timestamp, ts)
ts
end
end
def prepare_listens(listens) do
Enum.map(listens, &prepare_listen/1)
end
def prepare_listen(listen) do
meta = listen.track_metadata
info = meta.additional_info
with {:ok, artist} <- maybe_create_artist(meta.artist_name, info.artist_msid),
{:ok, album} <- maybe_create_album(meta.release_name, info.release_msid, artist),
{:ok, track} <- maybe_create_track(meta.track_name, artist, album) do
Logger.info("[Listen] #{meta.track_name}")
Listen.changeset(%Listen{}, %{
track: listen.track_metadata.track_name,
album_id: album.id,
artist_id: artist.id,
track_id: track.id,
listened_at: DateTime.from_unix!(listen.listened_at)
})
else
{:error, reason} ->
Logger.error(reason)
nil
{:warn, reason} ->
Logger.warn(reason)
nil
end
end
def maybe_create_artist(nil, _messybrainz_id) do
Logger.warn("Artist name was nil, skipping.")
{:warn, "artist_name_nil"}
end
def maybe_create_artist(name, messybrainz_id) do
artist =
case Repo.get_by(Artist, [msid: messybrainz_id], log: false) do
nil ->
Logger.info("[Artist] #{name}")
%Artist{}
|> Artist.changeset(%{
name: name,
msid: messybrainz_id || @msid_missing
})
|> Repo.insert!(log: false)
artist ->
artist
end
{:ok, artist}
end
def maybe_create_album(nil, _messybrainz_id, _artist) do
Logger.warn("Album name was nil, skipping.")
{:warn, "album_name_nil"}
end
def maybe_create_album(name, messybrainz_id, artist) do
album =
case Repo.get_by(Album, [msid: messybrainz_id], log: false) do
nil ->
Logger.info("[Album] #{name}")
%Album{}
|> Album.changeset(%{
name: name,
msid: messybrainz_id || @msid_missing,
artist_id: artist.id
})
|> Repo.insert!(log: false)
album ->
album
end
{:ok, album}
end
def maybe_create_track(name, artist, album) do
track =
Track
|> Repo.get_by([name: name, artist_id: artist.id, album_id: album.id], log: false)
|> Repo.preload([:album, :artist], log: false)
track =
case track do
nil ->
Logger.info("[Track] #{name}")
%Track{}
|> Track.changeset(%{
name: name,
artist_id: artist.id,
album_id: album.id
})
|> Repo.insert!(log: false)
track ->
track
end
{:ok, track}
end
end

2
apps/listens/mix.exs

@ -18,6 +18,7 @@ defmodule Listens.MixProject do
# Run "mix help compile.app" to learn about applications.
def application do
[
mod: {Listens.Application, []},
extra_applications: [:logger]
]
end
@ -25,6 +26,7 @@ defmodule Listens.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:cachex, "~> 3.2"},
{:waffle, "~> 1.0.1"},
{:waffle_ecto, "~> 0.0.8"},
{:ecto_sql, "~> 3.1"},

5
config/config.exs

@ -19,7 +19,10 @@ config :tomie,
config :tomie, Oban,
repo: Db.Repo,
prune: {:maxlen, 10_000},
queues: [default: 10]
queues: [default: 10, listens: 5],
crontab: [
{"* * * * *", Listens.Workers.Listenbrainz, args: %{user: "inhji"}}
]
config :tomie_web,
ecto_repos: [Db.Repo],

5
mix.lock

@ -1,5 +1,6 @@
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"cachex": {:hex, :cachex, "3.2.0", "a596476c781b0646e6cb5cd9751af2e2974c3e0d5498a8cab71807618b74fe2f", [:mix], [{:eternal, "~> 1.2", [hex: :eternal, repo: "hexpm", optional: false]}, {:jumper, "~> 1.0", [hex: :jumper, repo: "hexpm", optional: false]}, {:sleeplocks, "~> 1.1", [hex: :sleeplocks, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "aef93694067a43697ae0531727e097754a9e992a1e7946296f5969d6dd9ac986"},
"certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "805abd97539caf89ec6d4732c91e62ba9da0cda51ac462380bbd28ee697a8c42"},
"combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"},
@ -12,6 +13,7 @@
"ecto": {:hex, :ecto, "3.4.2", "6890af71025769bd27ef62b1ed1925cfe23f7f0460bcb3041da4b705215ff23e", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3959b8a83e086202a4bd86b4b5e6e71f9f1840813de14a57d502d3fc2ef7132"},
"ecto_autoslug_field": {:hex, :ecto_autoslug_field, "2.0.1", "2177c1c253f6dd3efd4b56d1cb76104d0a6ef044c6b9a7a0ad6d32665c4111e5", [:mix], [{:ecto, ">= 2.1.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:slugger, ">= 0.2.0", [hex: :slugger, repo: "hexpm", optional: false]}], "hexpm", "a3cc73211f2e75b89a03332183812ebe1ac08be2e25a1df5aa3d1422f92c45c3"},
"ecto_sql": {:hex, :ecto_sql, "3.4.2", "3d842665a81ba2137b62aa70151afe81dae44824cd09b2076a255937ab4e2dc9", [:mix], [{:db_connection, "~> 2.2", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.4.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.3.0 or ~> 0.4.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.15.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.0", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f2b064102467e1525314a464b6fea0707ff28ee132a15006727ccf51b73492ff"},
"eternal": {:hex, :eternal, "1.2.1", "d5b6b2499ba876c57be2581b5b999ee9bdf861c647401066d3eeed111d096bc4", [:mix], [], "hexpm", "b14f1dc204321429479c569cfbe8fb287541184ed040956c8862cb7a677b8406"},
"ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"},
"ex_utils": {:hex, :ex_utils, "0.1.7", "2c133e0bcdc49a858cf8dacf893308ebc05bc5fba501dc3d2935e65365ec0bf3", [:mix], [], "hexpm", "66d4fe75285948f2d1e69c2a5ddd651c398c813574f8d36a9eef11dc20356ef6"},
"excoveralls": {:hex, :excoveralls, "0.12.3", "2142be7cb978a3ae78385487edda6d1aff0e482ffc6123877bb7270a8ffbcfe0", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "568a3e616c264283f5dea5b020783ae40eef3f7ee2163f7a67cbd7b35bcadada"},
@ -26,6 +28,7 @@
"httpoison": {:hex, :httpoison, "1.6.2", "ace7c8d3a361cebccbed19c283c349b3d26991eff73a1eaaa8abae2e3c8089b6", [:mix], [{:hackney, "~> 1.15 and >= 1.15.2", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "aa2c74bd271af34239a3948779612f87df2422c2fdcfdbcec28d9c105f0773fe"},
"idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "4bdd305eb64e18b0273864920695cb18d7a2021f31a11b9c5fbcd9a253f936e2"},
"jason": {:hex, :jason, "1.2.0", "10043418c42d2493d0ee212d3fddd25d7ffe484380afad769a0a38795938e448", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "116747dbe057794c3a3e4e143b7c8390b29f634e16c78a7f59ba75bfa6852e7f"},
"jumper": {:hex, :jumper, "1.0.1", "3c00542ef1a83532b72269fab9f0f0c82bf23a35e27d278bfd9ed0865cecabff", [:mix], [], "hexpm", "318c59078ac220e966d27af3646026db9b5a5e6703cb2aa3e26bcfaba65b7433"},
"makeup": {:hex, :makeup, "1.0.1", "82f332e461dc6c79dbd82fbe2a9c10d48ed07146f0a478286e590c83c52010b5", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "49736fe5b66a08d8575bf5321d716bac5da20c8e6b97714fec2bcd6febcfa1f8"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"},
"meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm", "d34f013c156db51ad57cc556891b9720e6a1c1df5fe2e15af999c84d6cebeb1a"},
@ -54,6 +57,7 @@
"pow": {:hex, :pow, "1.0.20", "b99993811af5233681bfc521e81ca706d25a56f2be54bad6424db327ce840ab9", [:mix], [{:ecto, "~> 2.2 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.3.0 and < 1.6.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, ">= 2.0.0 and <= 3.0.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:plug, ">= 1.5.0 and < 2.0.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "4b6bd271399ccb353abbdbdc316199fe7fd7ae36bbf47059d53e366831c34fc8"},
"que": {:hex, :que, "0.10.1", "788ed0ec92ed69bdf9cfb29bf41a94ca6355b8d44959bd0669cf706e557ac891", [:mix], [{:ex_utils, "~> 0.1.6", [hex: :ex_utils, repo: "hexpm", optional: false]}, {:memento, "~> 0.3.0", [hex: :memento, repo: "hexpm", optional: false]}], "hexpm", "a737b365253e75dbd24b2d51acc1d851049e87baae08cd0c94e2bc5cd65088d5"},
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
"sleeplocks": {:hex, :sleeplocks, "1.1.1", "3d462a0639a6ef36cc75d6038b7393ae537ab394641beb59830a1b8271faeed3", [:rebar3], [], "hexpm", "84ee37aeff4d0d92b290fff986d6a95ac5eedf9b383fadfd1d88e9b84a1c02e1"},
"slugger": {:hex, :slugger, "0.3.0", "efc667ab99eee19a48913ccf3d038b1fb9f165fa4fbf093be898b8099e61b6ed", [:mix], [], "hexpm", "20d0ded0e712605d1eae6c5b4889581c3460d92623a930ddda91e0e609b5afba"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"},
"telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
@ -62,6 +66,7 @@
"timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "f354efb2400dd7a80fd9eb6c8419068c4f632da4ac47f3d8822d6e33f08bc852"},
"tzdata": {:hex, :tzdata, "1.0.3", "73470ad29dde46e350c60a66e6b360d3b99d2d18b74c4c349dbebbc27a09a3eb", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a6e1ee7003c4d04ecbd21dd3ec690d4c6662db5d3bbdd7262d53cdf5e7c746c1"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"},
"unsafe": {:hex, :unsafe, "1.0.1", "a27e1874f72ee49312e0a9ec2e0b27924214a05e3ddac90e91727bc76f8613d8", [:mix], [], "hexpm", "6c7729a2d214806450d29766abc2afaa7a2cbecf415be64f36a6691afebb50e5"},
"waffle": {:hex, :waffle, "1.0.1", "2eb6d14866b07717fc1450ee2e7a4a3583660fc62a7ee1984b8e75c642c09d2a", [:mix], [{:ex_aws, "~> 2.1", [hex: :ex_aws, repo: "hexpm", optional: true]}, {:ex_aws_s3, "~> 2.0", [hex: :ex_aws_s3, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:sweet_xml, "~> 0.6", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "36a7303fd6014dfce83c1b732219a51607a7469c4b4d6b47a47e0600dc7c01dc"},
"waffle_ecto": {:hex, :waffle_ecto, "0.0.8", "a89273d09b27cfd534c0d99265cbc8dd14e6f8d74e568a0a353f29b0c79be926", [:mix], [{:ecto, ">= 2.1.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:waffle, "~> 1.0.0", [hex: :waffle, repo: "hexpm", optional: false]}], "hexpm", "dcc6170b3f75c9c9db1b009dff80bd4157503f1638d20331c1814595515008d5"},
}
Loading…
Cancel
Save