import json
from six.moves import urllib
import keyring
import requests
from pyutrack.errors import (
response_to_exc, AuthorizationError, LoginError,
PermissionsError, ResponseError
)
[docs]class Credentials(object):
KEYRING_COOKIE = 'pyutrack:cookies'
KEYRING_PASSWORD = 'pyutrack:password'
def __init__(self, username, password=None, cookies=None):
self._username = username
self._cookies = None
self._password = None
self.load_from_keyring()
if password:
self._password = password
if cookies:
self._cookies = cookies
[docs] @staticmethod
def from_keyring(username):
ytc = Credentials(username)
if ytc.load_from_keyring():
return ytc
@property
def cookies(self):
return self._cookies
@cookies.setter
def cookies(self, value):
self._cookies = value
@property
def username(self):
return self._username
@username.setter
def username(self, value):
self._username = value
@property
def password(self):
return self._password
@password.setter
def password(self, value):
self._password = value
[docs] def load_from_keyring(self):
if self.username:
keyring_cookie = keyring.get_password(
Credentials.KEYRING_COOKIE, self.username
)
cookies = json.loads(keyring_cookie) if keyring_cookie else None
password = keyring.get_password(
Credentials.KEYRING_PASSWORD, self.username
)
if cookies:
self.cookies = cookies
if password:
self.password = password
return cookies or password
[docs] def persist(self):
if self.cookies:
keyring.set_password(
self.KEYRING_COOKIE, self.username, json.dumps(self.cookies)
)
if self.password:
keyring.set_password(
self.KEYRING_PASSWORD, self.username, self.password
)
[docs] def reset_cookies(self):
if keyring.get_password(self.KEYRING_COOKIE, self.username):
keyring.delete_password(self.KEYRING_COOKIE, self.username)
self.cookies = None
[docs] def reset_password(self):
if keyring.get_password(self.KEYRING_PASSWORD, self.username):
keyring.delete_password(self.KEYRING_PASSWORD, self.username)
self.password = None
[docs]def fix_auth(meth):
def inner(self, *args, **kwargs):
try:
return meth(self, *args, **kwargs)
except (AuthorizationError, ) as e:
if self.credentials.cookies:
self.credentials.reset_cookies()
if self.credentials.password:
self.login()
return meth(self, *args, **kwargs)
raise e
return inner
[docs]class Connection(object):
def __init__(self, base_url=None, credentials=None):
self.__session = requests.Session()
self.__session.debug = True
self.__session.headers.update({'Accept': 'application/json'})
self.__session_args = {}
self.__credentials = credentials
self.__base_url = None
if self.credentials and self.credentials.cookies:
self.__session.cookies.update(credentials.cookies)
self.__api_url = ''
if base_url:
self.api_url = base_url
[docs] def login(self, persist_credentials=True):
try:
self.post(
'user/login', {
'login': self.credentials.username,
'password': self.credentials.password
}, False
)
self.credentials.cookies = self.__session.cookies.get_dict()
if persist_credentials:
self.credentials.persist()
return True
except (PermissionsError, ) as e:
raise LoginError(e)
@property
def credentials(self):
return self.__credentials
@credentials.setter
def credentials(self, credentials):
if credentials.cookies:
self.__session.cookies.update(credentials.cookies)
self.__credentials = credentials
@property
def api_url(self):
return self.__api_url
@api_url.setter
def api_url(self, api_url):
ref = api_url.rstrip('/')
if ref.endswith('/rest'):
self.__api_url = ref
else:
self.__api_url = '%s/rest' % ref
self.__base_url = self.__api_url.replace('/rest','')
@property
def base_url(self):
return self.__base_url
[docs] @fix_auth
def get(self, path, parse=True):
return self.__unwrap(
self.__session.get(
'%s/%s' % (self.__api_url, path), **self.__session_args
), parse
)
[docs] @fix_auth
def post(self, path, data, parse=True):
return self.__unwrap(
self.__session.post(
'%s/%s' % (self.__api_url, path), data, **self.__session_args
), parse
)
[docs] @fix_auth
def put(self, path, data, parse=True):
return self.__unwrap(
self.__session.put(
'%s/%s' % (self.__api_url, path), data, **self.__session_args
), parse
)
[docs] @fix_auth
def delete(self, path, parse=False):
return self.__unwrap(
self.__session.delete(
'%s/%s' % (self.__api_url, path), **self.__session_args
), parse
)
def __unwrap(self, response, parse):
if not response.ok:
raise response_to_exc(response)
if response.status_code == 201 and response.headers.get('location'):
return self.__session.get(response.headers['location']).json()
if not parse:
return response.content
try:
return response.json()
except (ValueError, ):
raise ResponseError('Unexpected response format from server')