335 lines
10 KiB
Python
335 lines
10 KiB
Python
from __future__ import with_statement
|
|
import os
|
|
import tempfile
|
|
import itertools
|
|
from ConfigParser import SafeConfigParser
|
|
import pkg_resources
|
|
import scapi
|
|
import scapi.authentication
|
|
import logging
|
|
import webbrowser
|
|
|
|
# Logger used by the tests in this module; it is switched to DEBUG.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Handle on the "scapi" logger. Its level is only changed when the
# commented-out line below is re-enabled.
_logger = logging.getLogger("scapi")
#_logger.setLevel(logging.DEBUG)

# Switches: interactive tests return early unless enabled; USE_OAUTH selects
# the authenticator that setup() builds.
RUN_INTERACTIVE_TESTS = False
USE_OAUTH = True

# Default credentials and host. load_config() overwrites all of these with
# the values found in a config file.
TOKEN = "FjNE9aRTg8kpxuOjzwsX8Q"
SECRET = "NP5PGoyKcQv64E0aZgV4CRNzHfPwR4QghrWoqEgEE"
CONSUMER = "EEi2URUfM97pAAxHTogDpQ"
CONSUMER_SECRET = "NFYd8T3i4jVKGZ9TMy9LHaBQB3Sh8V5sxBiMeMZBow"
API_HOST = "api.soundcloud.dev:3000"
USER = ""
PASSWORD = ""

# File name load_config() looks for when no explicit name is given.
CONFIG_NAME = "soundcloud.cfg"

# Filled in by setup(): the API connector and the scope wrapping it.
CONNECTOR = None
ROOT = None
|
|
def setup():
    """
    Build the API connector and root scope shared by the tests.

    Depending on USE_OAUTH either an OAuth authenticator (consumer and token
    pairs) or a basic authenticator (user/password plus consumer pair) is
    created. The resulting connector is stored in CONNECTOR and a scope
    wrapping it in ROOT.
    """
    global CONNECTOR, ROOT
    # load_config()
    #scapi.ApiConnector(host='192.168.2.101:3000', user='tiga', password='test')
    #scapi.ApiConnector(host='sandbox-api.soundcloud.com:3030', user='tiga', password='test')
    scapi.USE_PROXY = False
    scapi.PROXY = 'http://127.0.0.1:10000/'

    auth = scapi.authentication
    if not USE_OAUTH:
        credentials = auth.BasicAuthenticator(USER, PASSWORD,
                                              CONSUMER, CONSUMER_SECRET)
    else:
        credentials = auth.OAuthAuthenticator(CONSUMER, CONSUMER_SECRET,
                                              TOKEN, SECRET)

    logger.debug("API_HOST: %s", API_HOST)
    connector = scapi.ApiConnector(host=API_HOST, authenticator=credentials)
    CONNECTOR = connector
    ROOT = scapi.Scope(connector)
|
|
|
|
def load_config(config_name=None):
    """
    Load credentials from the nearest config file into the module globals.

    Starting at the current working directory and walking up one parent at a
    time, the first existing file called *config_name* (CONFIG_NAME when
    None) is parsed. Its ``[global]`` section is copied into TOKEN, SECRET,
    CONSUMER, CONSUMER_SECRET, API_HOST, USER and PASSWORD. If no such file
    is found before the top-most directory, the globals are left unchanged.

    The loaded values are logged at DEBUG level, but the token secret, the
    consumer secret and the password are redacted so that credentials do not
    end up in log output.

    Missing sections or options are reported by the parser's own exceptions
    (not caught here).
    """
    global TOKEN, SECRET, CONSUMER_SECRET, CONSUMER, API_HOST, USER, PASSWORD
    if config_name is None:
        config_name = CONFIG_NAME

    def redact(value):
        # Only reveal whether a secret is present, never its content.
        if value:
            return "<redacted>"
        return "<empty>"

    parser = SafeConfigParser()
    current = os.getcwd()
    while current:
        name = os.path.join(current, config_name)
        if os.path.exists(name):
            parser.read([name])
            TOKEN = parser.get('global', 'accesstoken')
            SECRET = parser.get('global', 'accesstoken_secret')
            CONSUMER = parser.get('global', 'consumer')
            CONSUMER_SECRET = parser.get('global', 'consumer_secret')
            API_HOST = parser.get('global', 'host')
            USER = parser.get('global', 'user')
            PASSWORD = parser.get('global', 'password')
            logger.debug("token: %s", TOKEN)
            logger.debug("secret: %s", redact(SECRET))
            logger.debug("consumer: %s", CONSUMER)
            logger.debug("consumer_secret: %s", redact(CONSUMER_SECRET))
            logger.debug("user: %s", USER)
            logger.debug("password: %s", redact(PASSWORD))
            logger.debug("host: %s", API_HOST)
            break
        # dirname() of the top-most directory returns itself: stop there.
        new_current = os.path.dirname(current)
        if new_current == current:
            break
        current = new_current
|
|
|
|
|
|
def test_load_config():
    """
    Check that load_config() finds a config file in a parent directory.

    A config file is written into a fresh temporary directory and
    load_config() is called from a sub-directory of it. Afterwards the
    working directory, the temporary directory and the credential globals
    are always put back the way they were, even when an assertion fails, so
    that later tests do not run with the dummy credentials.
    """
    global TOKEN, SECRET, CONSUMER_SECRET, CONSUMER, API_HOST, USER, PASSWORD
    import shutil

    saved = (TOKEN, SECRET, CONSUMER, CONSUMER_SECRET, API_HOST, USER, PASSWORD)
    oldcwd = os.getcwd()
    base = tempfile.mkdtemp()
    try:
        cdir = os.path.join(base, "foo")
        os.mkdir(cdir)
        test_config = """
[global]
host=host
consumer=consumer
consumer_secret=consumer_secret
accesstoken=accesstoken
accesstoken_secret=accesstoken_secret
user=user
password=password
"""
        with open(os.path.join(base, CONFIG_NAME), "w") as cf:
            cf.write(test_config)
        # Run from the sub-directory so the file has to be found one level up.
        os.chdir(cdir)
        load_config()
        assert TOKEN == "accesstoken" and SECRET == "accesstoken_secret" and API_HOST == 'host'
        assert CONSUMER == "consumer" and CONSUMER_SECRET == "consumer_secret"
        assert USER == "user" and PASSWORD == "password"
    finally:
        # Leave the temporary tree before deleting it.
        os.chdir(oldcwd)
        shutil.rmtree(base, ignore_errors=True)
        TOKEN, SECRET, CONSUMER, CONSUMER_SECRET, API_HOST, USER, PASSWORD = saved
|
|
|
|
|
|
def test_connect():
    """
    Smoke-test the basic read calls on the root scope: a slice of the user
    list, the current user, and that user's contacts and tracks.
    """
    root = ROOT

    some_users = list(itertools.islice(root.users(), 0, 127))
    logger.debug(some_users)
    assert isinstance(some_users, list)
    assert isinstance(some_users[0], scapi.User)

    me = root.me()
    logger.debug(me)
    assert isinstance(me, scapi.User)

    my_contacts = list(me.contacts())
    assert isinstance(my_contacts, list)
    assert isinstance(my_contacts[0], scapi.User)
    logger.debug(my_contacts)

    my_tracks = list(me.tracks())
    assert isinstance(my_tracks, list)
    assert isinstance(my_tracks[0], scapi.Track)
    logger.debug(my_tracks)
|
|
|
|
|
|
def test_access_token_acquisition():
    """
    Walk through the OAuth token exchange and use the resulting access token.

    This test returns immediately unless RUN_INTERACTIVE_TESTS is set,
    because it needs user interaction: the authorization URL is opened in a
    browser and the test waits for the user to press return.

    The final step creates a track through a connector that is authenticated
    with the freshly acquired access token, so the token itself is what gets
    verified.
    """
    if not RUN_INTERACTIVE_TESTS:
        return

    def connect(token, secret):
        # One connector per stage of the exchange, differing only in the token pair.
        authenticator = scapi.authentication.OAuthAuthenticator(CONSUMER,
                                                                CONSUMER_SECRET,
                                                                token,
                                                                secret)
        return scapi.ApiConnector(host=API_HOST, authenticator=authenticator)

    # Stage 1: no token yet -> request token, authorized by the user.
    sca = connect(None, None)
    token, secret = sca.fetch_request_token()
    authorization_url = sca.get_request_token_authorization_url(token)
    webbrowser.open(authorization_url)
    raw_input("please press return")

    # Stage 2: request token -> access token.
    sca = connect(token, secret)
    token, secret = sca.fetch_access_token()
    logger.info("Access token: '%s'", token)
    logger.info("Access token secret: '%s'", secret)

    # Stage 3: prove that the access token works.
    sca = connect(token, secret)
    track = scapi.Scope(sca).Track.new(title='bar')
    assert isinstance(track, scapi.Track)
|
|
|
|
def test_track_creation():
    """Create a track through the root scope and check the returned type."""
    created = ROOT.Track.new(title='bar')
    assert isinstance(created, scapi.Track)
|
|
|
|
def test_track_update():
    """Rename a new track and check that a re-fetched copy has the new title."""
    root = ROOT
    fresh = root.Track.new(title='bar')
    assert isinstance(fresh, scapi.Track)

    fresh.title = 'baz'

    reloaded = root.Track.get(fresh.id)
    assert track_title_is_baz(reloaded) if False else reloaded.title == "baz"
|
|
|
|
def test_scoped_track_creation():
    """Create a track through the current user's track collection."""
    me = ROOT.me()
    created = me.tracks.new(title="bar")
    assert isinstance(created, scapi.Track)
|
|
|
|
def test_upload():
    """
    Upload the bundled "knaster.mp3" as an asset and attach it to a track.

    The asset is appended to the first track of the current user, so that
    user needs at least one track. The resource stream is closed when the
    test finishes, whether it passes or fails.
    """
    package, resource = "scapi.tests.test_connect", "knaster.mp3"
    assert pkg_resources.resource_exists(package, resource)
    data = pkg_resources.resource_stream(package, resource)
    try:
        sca = ROOT
        user = sca.me()
        logger.debug(user)
        asset = sca.assets.new(filedata=data)
        assert isinstance(asset, scapi.Asset)
        logger.debug(asset)
        tracks = list(user.tracks())
        assert tracks, "Current user has no track to attach the asset to"
        track = tracks[0]
        track.assets.append(asset)
    finally:
        data.close()
|
|
|
|
def test_contact_list():
    """The current user's contacts come back as a sequence of users."""
    me = ROOT.me()
    my_contacts = list(me.contacts())
    assert isinstance(my_contacts, list)
    first_contact = my_contacts[0]
    assert isinstance(first_contact, scapi.User)
|
|
|
|
def test_permissions():
    """
    Read the permissions of the current user's first track (if there is
    one) and check that any entries are users.
    """
    me = ROOT.me()
    # islice(..., 1) yields at most one track, so the body runs zero or one time.
    for first_track in itertools.islice(me.tracks(), 1):
        granted = list(first_track.permissions())
        logger.debug(granted)
        assert isinstance(granted, list)
        assert not granted or isinstance(granted[0], scapi.User)
|
|
|
|
def test_setting_permissions():
    """
    Grant a private track to other users and read the permissions back.

    The candidates are taken from the first ten entries of the user list,
    leaving out the current user.
    """
    root = ROOT
    me = root.me()

    track = root.Track.new(title='bar', sharing="private")
    assert track.sharing == "private"

    users_to_set = []
    for candidate in itertools.islice(root.users(), 10):
        if candidate != me:
            users_to_set.append(candidate)
    assert users_to_set, "Didn't find any suitable users"

    track.permissions = users_to_set
    granted = set(track.permissions())
    assert granted == set(users_to_set)
|
|
|
|
def test_setting_comments():
    """
    Create a comment, assign it to a private track and check that the first
    comment read back carries the same body.
    """
    root = ROOT
    # The result is not used below; the call itself is kept.
    root.me()

    private_track = root.Track.new(title='bar', sharing="private")
    new_comment = root.Comment.create(body="This is the body of my comment",
                                      timestamp=10)
    private_track.comments = new_comment

    first_comment = private_track.comments().next()
    assert first_comment.body == new_comment.body
|
|
|
|
|
|
def test_setting_comments_the_way_shawn_says_its_correct():
    """
    Add a comment through the track's own comment collection and check that
    the first comment read back has the same body.
    """
    cbody = "This is the body of my comment"
    private_track = ROOT.Track.new(title='bar', sharing="private")
    private_track.comments.new(body=cbody, timestamp=10)

    stored = list(private_track.comments())
    assert stored[0].body == cbody
|
|
|
|
def test_contact_add_and_removal():
    """
    Add another user to the current user's contacts and remove it again.

    The first user in the user list that is not the current user is used.
    The test fails with a clear message when there is no such user.
    Contact membership is always decided by comparing user ids.
    """
    sca = ROOT
    me = sca.me()

    user_to_set = None
    for user in sca.users():
        if user != me:
            user_to_set = user
            break
    assert user_to_set is not None, "Didn't find any suitable users"

    def contact_ids():
        # Fetch the contact list anew on every call; it changes during the test.
        return [c.id for c in me.contacts()]

    # Start from a known state: the user must not be a contact yet.
    if user_to_set.id in contact_ids():
        me.contacts.remove(user_to_set)

    me.contacts.append(user_to_set)
    assert user_to_set.id in contact_ids()

    me.contacts.remove(user_to_set)
    assert user_to_set.id not in contact_ids()
|
|
|
|
|
|
def test_favorites():
    """
    Add a track of another user to the current user's favorites and remove
    it again.

    The track used is the first track of the first other user that has one.
    The test fails with a clear message when no such track exists, instead
    of trying to favorite ``None``.
    """
    sca = ROOT
    me = sca.me()

    favorites = list(me.favorites())
    assert favorites == [] or isinstance(favorites[0], scapi.Track)

    track = None
    for user in sca.users():
        if user == me:
            continue
        # Take this user's first track, if there is one.
        for track in user.tracks():
            break
        if track is not None:
            break
    assert track is not None, "Didn't find any track of another user"

    me.favorites.append(track)
    favorites = list(me.favorites())
    assert track in favorites

    me.favorites.remove(track)
    favorites = list(me.favorites())
    assert track not in favorites
|
|
|
|
def test_large_list():
    """
    Check that listing more than LIST_LIMIT tracks works and is not handed
    back as a plain list.

    If there are not yet more than ``scapi.ApiConnector.LIST_LIMIT`` tracks,
    just enough are created to exceed the limit by one, so the final
    assertion holds for any starting count.
    """
    sca = ROOT
    limit = scapi.ApiConnector.LIST_LIMIT

    existing = len(list(sca.tracks()))
    # xrange() of a non-positive number is empty, so nothing is created when
    # there already are more than `limit` tracks.
    for i in xrange(limit + 1 - existing):
        sca.Track.new(title='test_track_%i' % i)

    all_tracks = sca.tracks()
    assert not isinstance(all_tracks, list)
    all_tracks = list(all_tracks)
    assert len(all_tracks) > limit
|
|
|
|
|
|
def test_events():
    """The root scope's events come back as a sequence of scapi.Event."""
    fetched = list(ROOT.events())
    assert isinstance(fetched, list)
    first_event = fetched[0]
    assert isinstance(first_event, scapi.Event)
|
|
|
|
def test_me_having_stress():
    """Run setup() twenty times, calling me() after each run."""
    # Bound once up front: setup() rebinds the global ROOT, but this local
    # keeps pointing at the scope that existed when the test started.
    root = ROOT
    for _ in itertools.repeat(None, 20):
        setup()
        root.me()
|
|
|
|
def test_non_global_api():
    """
    Work through a scope built directly on CONNECTOR: fetch the current
    user, then fetch that user's favorites, which must not be empty.
    """
    scope = scapi.Scope(CONNECTOR)
    current_user = scope.me()
    assert isinstance(current_user, scapi.User)

    # now get something *from* that user
    favs = list(current_user.favorites())
    assert favs
|
|
|
|
def test_playlists():
    """
    Print the tracks of the first listed playlist, each track's user and
    that user's tracks, and finally the playlist's own user.
    """
    root = ROOT
    playlists = list(itertools.islice(root.playlists(), 0, 127))
    # Only the first playlist (if any) is looked at.
    for playlist in playlists[:1]:
        entries = playlist.tracks
        # A single track is wrapped so it can be iterated like a list.
        if not isinstance(entries, list):
            entries = [entries]
        for trackdata in entries:
            print(trackdata)
            owner = trackdata.user
            print(owner)
            print(owner.tracks())
        print(playlist.user)
|