first commit
This commit is contained in:
commit
f2db1af089
9 changed files with 270 additions and 0 deletions
8
.env.example
Normal file
8
.env.example
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
SPOTIPY_CLIENT_ID=..?
|
||||
SPOTIPY_CLIENT_SECRET=..?
|
||||
SPOTIPY_REDIRECT_URI=http://127.0.0.1:8888/callback
|
||||
|
||||
SUBSONIC_URL=https://music.example.com
|
||||
SUBSONIC_USER=..?
|
||||
SUBSONIC_PASS=..?
|
||||
SUBSONIC_CLIENT=spotify-migrator
|
||||
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
.venv
|
||||
__pycache__
|
||||
.env
|
||||
.cache
|
||||
20
README.md
Normal file
20
README.md
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
# spotify2subsonic
|
||||
|
||||
move your spotify playlists & liked songs to subsonic
|
||||
42.5% written with devstral on my own computer
|
||||
28.75% written by me manually
|
||||
28.75% written by guessing spotify api (i didnt bother reading no docs)
|
||||
|
||||
works flawlessly btw
|
||||
|
||||
## how to use
|
||||
0. copy .env.example to .env
|
||||
1. make a spotify app in https://developer.spotify.com/dashboard
|
||||
```
|
||||
SPOTIPY_CLIENT_ID=..?
|
||||
SPOTIPY_CLIENT_SECRET=..?
|
||||
```
|
||||
1.1. insert it's client id and secret into .env
|
||||
2. insert your subsonic password & username into .env
|
||||
3. install requirements with `pip install -r requirements.txt`
|
||||
4. run python main.py
|
||||
124
main.py
Normal file
124
main.py
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
from spotify import (
|
||||
spotify_client,
|
||||
get_playlists,
|
||||
get_playlist_tracks,
|
||||
get_liked_tracks,
|
||||
)
|
||||
|
||||
from subsonic import (
|
||||
search_song,
|
||||
create_playlist,
|
||||
add_to_playlist,
|
||||
get_playlist_by_name,
|
||||
get_playlist_songs,
|
||||
)
|
||||
|
||||
from matcher import make_query
|
||||
from utils import parse_selection
|
||||
|
||||
|
||||
LIKED_NAME = "Spotify – Liked Songs"
|
||||
|
||||
|
||||
def main():
|
||||
sp = spotify_client()
|
||||
playlists = get_playlists(sp)
|
||||
|
||||
print("\nAvailable sources:\n")
|
||||
|
||||
sources = []
|
||||
|
||||
# Normal playlists
|
||||
for i, p in enumerate(playlists, 1):
|
||||
print(f"{i:2d}. 📀 {p['name']}")
|
||||
sources.append(("playlist", p))
|
||||
|
||||
# Liked songs option
|
||||
liked_index = len(sources) + 1
|
||||
print(f"{liked_index:2d}. ❤️ Liked Songs")
|
||||
sources.append(("liked", None))
|
||||
|
||||
choice = input("\nSelect (e.g. 1,3,5 | 2-6 | all): ")
|
||||
selected_indices = parse_selection(choice, len(sources))
|
||||
|
||||
if not selected_indices:
|
||||
print("Nothing selected.")
|
||||
return
|
||||
|
||||
report = {}
|
||||
|
||||
for idx in selected_indices:
|
||||
source_type, payload = sources[idx]
|
||||
|
||||
if source_type == "liked":
|
||||
name = LIKED_NAME
|
||||
print(f"\n▶ Migrating: {name}")
|
||||
tracks = get_liked_tracks(sp)
|
||||
else:
|
||||
name = payload["name"]
|
||||
print(f"\n▶ Migrating playlist: {name}")
|
||||
tracks = get_playlist_tracks(sp, payload["id"])
|
||||
|
||||
existing = get_playlist_by_name(name)
|
||||
if existing:
|
||||
pid = existing["id"]
|
||||
existing_songs = get_playlist_songs(pid)
|
||||
print(" ↺ Playlist exists on Subsonic, retrying missing tracks")
|
||||
else:
|
||||
pid = create_playlist(name)
|
||||
existing_songs = set()
|
||||
print(" + Created new Subsonic playlist")
|
||||
|
||||
added_now = []
|
||||
skipped = []
|
||||
missing = []
|
||||
|
||||
for t in tracks:
|
||||
query = make_query(t)
|
||||
sid = search_song(query)
|
||||
|
||||
if not sid:
|
||||
missing.append(query)
|
||||
print(f" ✗ {query}")
|
||||
continue
|
||||
|
||||
if sid in existing_songs:
|
||||
skipped.append(query)
|
||||
print(f" ↷ Already exists: {query}")
|
||||
continue
|
||||
|
||||
added_now.append((query, sid))
|
||||
print(f" ✔ {query}")
|
||||
|
||||
if added_now:
|
||||
add_to_playlist(pid, [sid for _, sid in added_now])
|
||||
|
||||
report[name] = {
|
||||
"added": [q for q, _ in added_now],
|
||||
"skipped": skipped,
|
||||
"missing": missing,
|
||||
}
|
||||
|
||||
print_summary(report)
|
||||
|
||||
|
||||
def print_summary(report):
|
||||
print("\n===== Migration Summary =====\n")
|
||||
|
||||
for name, data in report.items():
|
||||
print(f"📀 {name}")
|
||||
print(f" ✔ Added now: {len(data['added'])}")
|
||||
print(f" ↷ Already existed: {len(data['skipped'])}")
|
||||
print(f" ✗ Still missing: {len(data['missing'])}")
|
||||
|
||||
if data["missing"]:
|
||||
for m in data["missing"]:
|
||||
print(f" - {m}")
|
||||
print()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
4
matcher.py
Normal file
4
matcher.py
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
def make_query(track):
|
||||
artist = track["artists"][0]["name"]
|
||||
title = track["name"]
|
||||
return f"{artist} {title}"
|
||||
8
requirements.txt
Normal file
8
requirements.txt
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
certifi==2025.11.12
|
||||
charset-normalizer==3.4.4
|
||||
idna==3.11
|
||||
python-dotenv==1.2.1
|
||||
redis==7.1.0
|
||||
requests==2.32.5
|
||||
spotipy==2.25.2
|
||||
urllib3==2.6.2
|
||||
33
spotify.py
Normal file
33
spotify.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
import spotipy
|
||||
from spotipy.oauth2 import SpotifyOAuth
|
||||
|
||||
def spotify_client():
|
||||
return spotipy.Spotify(auth_manager=SpotifyOAuth(
|
||||
scope="user-library-read playlist-read-private playlist-read-collaborative"
|
||||
))
|
||||
|
||||
def get_playlists(sp):
|
||||
return sp.current_user_playlists(limit=50)["items"]
|
||||
|
||||
def get_playlist_tracks(sp, playlist_id):
|
||||
tracks = []
|
||||
results = sp.playlist_items(playlist_id, additional_types=["track"])
|
||||
while results:
|
||||
for item in results["items"]:
|
||||
if item["track"]:
|
||||
tracks.append(item["track"])
|
||||
results = sp.next(results) if results["next"] else None
|
||||
return tracks
|
||||
|
||||
|
||||
def get_liked_tracks(sp):
|
||||
tracks = []
|
||||
results = sp.current_user_saved_tracks(limit=50)
|
||||
|
||||
while results:
|
||||
for item in results["items"]:
|
||||
if item["track"]:
|
||||
tracks.append(item["track"])
|
||||
results = sp.next(results) if results["next"] else None
|
||||
|
||||
return tracks
|
||||
55
subsonic.py
Normal file
55
subsonic.py
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
import requests
|
||||
import os
|
||||
import hashlib
|
||||
import random
|
||||
import string
|
||||
|
||||
BASE = os.environ["SUBSONIC_URL"] + "/rest"
|
||||
|
||||
def _auth_params():
|
||||
salt = "".join(random.choices(string.ascii_letters + string.digits, k=6))
|
||||
token = hashlib.md5((os.environ["SUBSONIC_PASS"] + salt).encode()).hexdigest()
|
||||
return {
|
||||
"u": os.environ["SUBSONIC_USER"],
|
||||
"t": token,
|
||||
"s": salt,
|
||||
"v": "1.16.1",
|
||||
"c": os.environ["SUBSONIC_CLIENT"],
|
||||
"f": "json",
|
||||
}
|
||||
|
||||
def _req(endpoint, **params):
|
||||
p = _auth_params()
|
||||
p.update(params)
|
||||
r = requests.get(f"{BASE}/{endpoint}.view", params=p)
|
||||
r.raise_for_status()
|
||||
return r.json()["subsonic-response"]
|
||||
|
||||
def search_song(query):
|
||||
r = _req("search3", query=query, songCount=1)
|
||||
songs = r.get("searchResult3", {}).get("song", [])
|
||||
return songs[0]["id"] if songs else None
|
||||
|
||||
def create_playlist(name):
|
||||
r = _req("createPlaylist", name=name)
|
||||
return r["playlist"]["id"]
|
||||
|
||||
def add_to_playlist(pid, song_ids):
|
||||
for sid in song_ids:
|
||||
_req("updatePlaylist", playlistId=pid, songIdToAdd=sid)
|
||||
|
||||
|
||||
def get_playlists():
|
||||
r = _req("getPlaylists")
|
||||
return r["playlists"]["playlist"]
|
||||
|
||||
def get_playlist_by_name(name):
|
||||
for p in get_playlists():
|
||||
if p["name"] == name:
|
||||
return p
|
||||
return None
|
||||
|
||||
def get_playlist_songs(pid):
|
||||
r = _req("getPlaylist", id=pid)
|
||||
entries = r["playlist"].get("entry", [])
|
||||
return {e["id"] for e in entries}
|
||||
14
utils.py
Normal file
14
utils.py
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
def parse_selection(inp, max_index):
|
||||
if inp.strip().lower() == "all":
|
||||
return list(range(max_index))
|
||||
|
||||
result = set()
|
||||
for part in inp.split(","):
|
||||
part = part.strip()
|
||||
if "-" in part:
|
||||
a, b = part.split("-")
|
||||
result.update(range(int(a) - 1, int(b)))
|
||||
else:
|
||||
result.add(int(part) - 1)
|
||||
|
||||
return sorted(i for i in result if 0 <= i < max_index)
|
||||
Loading…
Add table
Add a link
Reference in a new issue