4348db7 add PyJWT we use in backend git-subtree-dir: pyextra git-subtree-split: 4348db7e867dafbbcb7ec0302b5528688d1102c6pull/15/head
parent
342bb13bff
commit
7ccae06bce
19 changed files with 1289 additions and 0 deletions
@ -0,0 +1,70 @@ |
||||
Metadata-Version: 1.1 |
||||
Name: PyJWT |
||||
Version: 1.4.1 |
||||
Summary: JSON Web Token implementation in Python |
||||
Home-page: http://github.com/jpadilla/pyjwt |
||||
Author: José Padilla |
||||
Author-email: hello@jpadilla.com |
||||
License: MIT |
||||
Description: # PyJWT |
||||
|
||||
[![travis-status-image]][travis] |
||||
[![appveyor-status-image]][appveyor] |
||||
[![pypi-version-image]][pypi] |
||||
[![coveralls-status-image]][coveralls] |
||||
[![docs-status-image]][docs] |
||||
|
||||
A Python implementation of [RFC 7519][jwt-spec]. |
||||
Original implementation was written by [@progrium][progrium]. |
||||
|
||||
## Installing |
||||
|
||||
``` |
||||
$ pip install PyJWT |
||||
``` |
||||
|
||||
## Usage |
||||
|
||||
```python |
||||
>>> import jwt |
||||
>>> encoded = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256') |
||||
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg' |
||||
|
||||
>>> jwt.decode(encoded, 'secret', algorithms=['HS256']) |
||||
{'some': 'payload'} |
||||
``` |
||||
|
||||
## Tests |
||||
|
||||
You can run tests from the project root after cloning with: |
||||
|
||||
``` |
||||
$ python setup.py test |
||||
``` |
||||
|
||||
[travis-status-image]: https://secure.travis-ci.org/jpadilla/pyjwt.svg?branch=master |
||||
[travis]: http://travis-ci.org/jpadilla/pyjwt?branch=master |
||||
[appveyor-status-image]: https://ci.appveyor.com/api/projects/status/h8nt70aqtwhht39t?svg=true |
||||
[appveyor]: https://ci.appveyor.com/project/jpadilla/pyjwt |
||||
[pypi-version-image]: https://img.shields.io/pypi/v/pyjwt.svg |
||||
[pypi]: https://pypi.python.org/pypi/pyjwt |
||||
[coveralls-status-image]: https://coveralls.io/repos/jpadilla/pyjwt/badge.svg?branch=master |
||||
[coveralls]: https://coveralls.io/r/jpadilla/pyjwt?branch=master |
||||
[docs-status-image]: https://readthedocs.org/projects/pyjwt/badge/?version=latest |
||||
[docs]: http://pyjwt.readthedocs.org |
||||
[jwt-spec]: https://tools.ietf.org/html/rfc7519 |
||||
[progrium]: https://github.com/progrium |
||||
|
||||
Keywords: jwt json web token security signing |
||||
Platform: UNKNOWN |
||||
Classifier: Development Status :: 5 - Production/Stable |
||||
Classifier: Intended Audience :: Developers |
||||
Classifier: Natural Language :: English |
||||
Classifier: License :: OSI Approved :: MIT License |
||||
Classifier: Programming Language :: Python |
||||
Classifier: Programming Language :: Python :: 2.6 |
||||
Classifier: Programming Language :: Python :: 2.7 |
||||
Classifier: Programming Language :: Python :: 3.3 |
||||
Classifier: Programming Language :: Python :: 3.4 |
||||
Classifier: Programming Language :: Python :: 3.5 |
||||
Classifier: Topic :: Utilities |
@ -0,0 +1,49 @@ |
||||
AUTHORS |
||||
CHANGELOG.md |
||||
LICENSE |
||||
MANIFEST.in |
||||
README.md |
||||
setup.cfg |
||||
setup.py |
||||
tox.ini |
||||
PyJWT.egg-info/PKG-INFO |
||||
PyJWT.egg-info/SOURCES.txt |
||||
PyJWT.egg-info/dependency_links.txt |
||||
PyJWT.egg-info/entry_points.txt |
||||
PyJWT.egg-info/requires.txt |
||||
PyJWT.egg-info/top_level.txt |
||||
jwt/__init__.py |
||||
jwt/__main__.py |
||||
jwt/algorithms.py |
||||
jwt/api_jws.py |
||||
jwt/api_jwt.py |
||||
jwt/compat.py |
||||
jwt/exceptions.py |
||||
jwt/utils.py |
||||
jwt/contrib/__init__.py |
||||
jwt/contrib/algorithms/__init__.py |
||||
jwt/contrib/algorithms/py_ecdsa.py |
||||
jwt/contrib/algorithms/pycrypto.py |
||||
tests/__init__.py |
||||
tests/compat.py |
||||
tests/test_algorithms.py |
||||
tests/test_api_jws.py |
||||
tests/test_api_jwt.py |
||||
tests/test_compat.py |
||||
tests/test_exceptions.py |
||||
tests/test_jwt.py |
||||
tests/utils.py |
||||
tests/contrib/__init__.py |
||||
tests/contrib/test_algorithms.py |
||||
tests/keys/__init__.py |
||||
tests/keys/jwk_ec_key.json |
||||
tests/keys/jwk_ec_pub.json |
||||
tests/keys/jwk_hmac.json |
||||
tests/keys/jwk_rsa_key.json |
||||
tests/keys/jwk_rsa_pub.json |
||||
tests/keys/testkey2_rsa.pub.pem |
||||
tests/keys/testkey_ec |
||||
tests/keys/testkey_ec.pub |
||||
tests/keys/testkey_rsa |
||||
tests/keys/testkey_rsa.cer |
||||
tests/keys/testkey_rsa.pub |
@ -0,0 +1 @@ |
||||
|
@ -0,0 +1,3 @@ |
||||
[console_scripts] |
||||
jwt = jwt.__main__:main |
||||
|
@ -0,0 +1,31 @@ |
||||
../../../../bin/jwt |
||||
../jwt/__init__.py |
||||
../jwt/__init__.pyc |
||||
../jwt/__main__.py |
||||
../jwt/__main__.pyc |
||||
../jwt/algorithms.py |
||||
../jwt/algorithms.pyc |
||||
../jwt/api_jws.py |
||||
../jwt/api_jws.pyc |
||||
../jwt/api_jwt.py |
||||
../jwt/api_jwt.pyc |
||||
../jwt/compat.py |
||||
../jwt/compat.pyc |
||||
../jwt/contrib/__init__.py |
||||
../jwt/contrib/__init__.pyc |
||||
../jwt/contrib/algorithms/__init__.py |
||||
../jwt/contrib/algorithms/__init__.pyc |
||||
../jwt/contrib/algorithms/py_ecdsa.py |
||||
../jwt/contrib/algorithms/py_ecdsa.pyc |
||||
../jwt/contrib/algorithms/pycrypto.py |
||||
../jwt/contrib/algorithms/pycrypto.pyc |
||||
../jwt/exceptions.py |
||||
../jwt/exceptions.pyc |
||||
../jwt/utils.py |
||||
../jwt/utils.pyc |
||||
PKG-INFO |
||||
SOURCES.txt |
||||
dependency_links.txt |
||||
entry_points.txt |
||||
requires.txt |
||||
top_level.txt |
@ -0,0 +1,13 @@ |
||||
|
||||
[crypto] |
||||
cryptography |
||||
|
||||
[flake8] |
||||
flake8 |
||||
flake8-import-order |
||||
pep8-naming |
||||
|
||||
[test] |
||||
pytest==2.7.3 |
||||
pytest-cov |
||||
pytest-runner |
@ -0,0 +1 @@ |
||||
jwt |
@ -0,0 +1,29 @@ |
||||
# -*- coding: utf-8 -*- |
||||
# flake8: noqa |
||||
|
||||
""" |
||||
JSON Web Token implementation |
||||
|
||||
Minimum implementation based on this spec: |
||||
http://self-issued.info/docs/draft-jones-json-web-token-01.html |
||||
""" |
||||
|
||||
|
||||
__title__ = 'pyjwt' |
||||
__version__ = '1.4.1' |
||||
__author__ = 'José Padilla' |
||||
__license__ = 'MIT' |
||||
__copyright__ = 'Copyright 2015 José Padilla' |
||||
|
||||
|
||||
from .api_jwt import ( |
||||
encode, decode, register_algorithm, unregister_algorithm, |
||||
get_unverified_header, PyJWT |
||||
) |
||||
from .api_jws import PyJWS |
||||
from .exceptions import ( |
||||
InvalidTokenError, DecodeError, InvalidAudienceError, |
||||
ExpiredSignatureError, ImmatureSignatureError, InvalidIssuedAtError, |
||||
InvalidIssuerError, ExpiredSignature, InvalidAudience, InvalidIssuer, |
||||
MissingRequiredClaimError |
||||
) |
@ -0,0 +1,135 @@ |
||||
#!/usr/bin/env python |
||||
|
||||
from __future__ import absolute_import, print_function |
||||
|
||||
import json |
||||
import optparse |
||||
import sys |
||||
import time |
||||
|
||||
from . import DecodeError, __package__, __version__, decode, encode |
||||
|
||||
|
||||
def main(): |
||||
|
||||
usage = '''Encodes or decodes JSON Web Tokens based on input. |
||||
|
||||
%prog [options] input |
||||
|
||||
Decoding examples: |
||||
|
||||
%prog --key=secret json.web.token |
||||
%prog --no-verify json.web.token |
||||
|
||||
Encoding requires the key option and takes space separated key/value pairs |
||||
separated by equals (=) as input. Examples: |
||||
|
||||
%prog --key=secret iss=me exp=1302049071 |
||||
%prog --key=secret foo=bar exp=+10 |
||||
|
||||
The exp key is special and can take an offset to current Unix time.\ |
||||
''' |
||||
p = optparse.OptionParser( |
||||
usage=usage, |
||||
prog=__package__, |
||||
version='%s %s' % (__package__, __version__), |
||||
) |
||||
|
||||
p.add_option( |
||||
'-n', '--no-verify', |
||||
action='store_false', |
||||
dest='verify', |
||||
default=True, |
||||
help='ignore signature and claims verification on decode' |
||||
) |
||||
|
||||
p.add_option( |
||||
'--key', |
||||
dest='key', |
||||
metavar='KEY', |
||||
default=None, |
||||
help='set the secret key to sign with' |
||||
) |
||||
|
||||
p.add_option( |
||||
'--alg', |
||||
dest='algorithm', |
||||
metavar='ALG', |
||||
default='HS256', |
||||
help='set crypto algorithm to sign with. default=HS256' |
||||
) |
||||
|
||||
options, arguments = p.parse_args() |
||||
|
||||
if len(arguments) > 0 or not sys.stdin.isatty(): |
||||
if len(arguments) == 1 and (not options.verify or options.key): |
||||
# Try to decode |
||||
try: |
||||
if not sys.stdin.isatty(): |
||||
token = sys.stdin.read() |
||||
else: |
||||
token = arguments[0] |
||||
|
||||
token = token.encode('utf-8') |
||||
data = decode(token, key=options.key, verify=options.verify) |
||||
|
||||
print(json.dumps(data)) |
||||
sys.exit(0) |
||||
except DecodeError as e: |
||||
print(e) |
||||
sys.exit(1) |
||||
|
||||
# Try to encode |
||||
if options.key is None: |
||||
print('Key is required when encoding. See --help for usage.') |
||||
sys.exit(1) |
||||
|
||||
# Build payload object to encode |
||||
payload = {} |
||||
|
||||
for arg in arguments: |
||||
try: |
||||
k, v = arg.split('=', 1) |
||||
|
||||
# exp +offset special case? |
||||
if k == 'exp' and v[0] == '+' and len(v) > 1: |
||||
v = str(int(time.time()+int(v[1:]))) |
||||
|
||||
# Cast to integer? |
||||
if v.isdigit(): |
||||
v = int(v) |
||||
else: |
||||
# Cast to float? |
||||
try: |
||||
v = float(v) |
||||
except ValueError: |
||||
pass |
||||
|
||||
# Cast to true, false, or null? |
||||
constants = {'true': True, 'false': False, 'null': None} |
||||
|
||||
if v in constants: |
||||
v = constants[v] |
||||
|
||||
payload[k] = v |
||||
except ValueError: |
||||
print('Invalid encoding input at {}'.format(arg)) |
||||
sys.exit(1) |
||||
|
||||
try: |
||||
token = encode( |
||||
payload, |
||||
key=options.key, |
||||
algorithm=options.algorithm |
||||
) |
||||
|
||||
print(token) |
||||
sys.exit(0) |
||||
except Exception as e: |
||||
print(e) |
||||
sys.exit(1) |
||||
else: |
||||
p.print_help() |
||||
|
||||
if __name__ == '__main__': |
||||
main() |
@ -0,0 +1,290 @@ |
||||
import hashlib |
||||
import hmac |
||||
|
||||
from .compat import constant_time_compare, string_types, text_type |
||||
from .exceptions import InvalidKeyError |
||||
from .utils import der_to_raw_signature, raw_to_der_signature |
||||
|
||||
try: |
||||
from cryptography.hazmat.primitives import hashes |
||||
from cryptography.hazmat.primitives.serialization import ( |
||||
load_pem_private_key, load_pem_public_key, load_ssh_public_key |
||||
) |
||||
from cryptography.hazmat.primitives.asymmetric.rsa import ( |
||||
RSAPrivateKey, RSAPublicKey |
||||
) |
||||
from cryptography.hazmat.primitives.asymmetric.ec import ( |
||||
EllipticCurvePrivateKey, EllipticCurvePublicKey |
||||
) |
||||
from cryptography.hazmat.primitives.asymmetric import ec, padding |
||||
from cryptography.hazmat.backends import default_backend |
||||
from cryptography.exceptions import InvalidSignature |
||||
|
||||
has_crypto = True |
||||
except ImportError: |
||||
has_crypto = False |
||||
|
||||
|
||||
def get_default_algorithms(): |
||||
""" |
||||
Returns the algorithms that are implemented by the library. |
||||
""" |
||||
default_algorithms = { |
||||
'none': NoneAlgorithm(), |
||||
'HS256': HMACAlgorithm(HMACAlgorithm.SHA256), |
||||
'HS384': HMACAlgorithm(HMACAlgorithm.SHA384), |
||||
'HS512': HMACAlgorithm(HMACAlgorithm.SHA512) |
||||
} |
||||
|
||||
if has_crypto: |
||||
default_algorithms.update({ |
||||
'RS256': RSAAlgorithm(RSAAlgorithm.SHA256), |
||||
'RS384': RSAAlgorithm(RSAAlgorithm.SHA384), |
||||
'RS512': RSAAlgorithm(RSAAlgorithm.SHA512), |
||||
'ES256': ECAlgorithm(ECAlgorithm.SHA256), |
||||
'ES384': ECAlgorithm(ECAlgorithm.SHA384), |
||||
'ES512': ECAlgorithm(ECAlgorithm.SHA512), |
||||
'PS256': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA256), |
||||
'PS384': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA384), |
||||
'PS512': RSAPSSAlgorithm(RSAPSSAlgorithm.SHA512) |
||||
}) |
||||
|
||||
return default_algorithms |
||||
|
||||
|
||||
class Algorithm(object): |
||||
""" |
||||
The interface for an algorithm used to sign and verify tokens. |
||||
""" |
||||
def prepare_key(self, key): |
||||
""" |
||||
Performs necessary validation and conversions on the key and returns |
||||
the key value in the proper format for sign() and verify(). |
||||
""" |
||||
raise NotImplementedError |
||||
|
||||
def sign(self, msg, key): |
||||
""" |
||||
Returns a digital signature for the specified message |
||||
using the specified key value. |
||||
""" |
||||
raise NotImplementedError |
||||
|
||||
def verify(self, msg, key, sig): |
||||
""" |
||||
Verifies that the specified digital signature is valid |
||||
for the specified message and key values. |
||||
""" |
||||
raise NotImplementedError |
||||
|
||||
|
||||
class NoneAlgorithm(Algorithm): |
||||
""" |
||||
Placeholder for use when no signing or verification |
||||
operations are required. |
||||
""" |
||||
def prepare_key(self, key): |
||||
if key == '': |
||||
key = None |
||||
|
||||
if key is not None: |
||||
raise InvalidKeyError('When alg = "none", key value must be None.') |
||||
|
||||
return key |
||||
|
||||
def sign(self, msg, key): |
||||
return b'' |
||||
|
||||
def verify(self, msg, key, sig): |
||||
return False |
||||
|
||||
|
||||
class HMACAlgorithm(Algorithm): |
||||
""" |
||||
Performs signing and verification operations using HMAC |
||||
and the specified hash function. |
||||
""" |
||||
SHA256 = hashlib.sha256 |
||||
SHA384 = hashlib.sha384 |
||||
SHA512 = hashlib.sha512 |
||||
|
||||
def __init__(self, hash_alg): |
||||
self.hash_alg = hash_alg |
||||
|
||||
def prepare_key(self, key): |
||||
if not isinstance(key, string_types) and not isinstance(key, bytes): |
||||
raise TypeError('Expecting a string- or bytes-formatted key.') |
||||
|
||||
if isinstance(key, text_type): |
||||
key = key.encode('utf-8') |
||||
|
||||
invalid_strings = [ |
||||
b'-----BEGIN PUBLIC KEY-----', |
||||
b'-----BEGIN CERTIFICATE-----', |
||||
b'ssh-rsa' |
||||
] |
||||
|
||||
if any([string_value in key for string_value in invalid_strings]): |
||||
raise InvalidKeyError( |
||||
'The specified key is an asymmetric key or x509 certificate and' |
||||
' should not be used as an HMAC secret.') |
||||
|
||||
return key |
||||
|
||||
def sign(self, msg, key): |
||||
return hmac.new(key, msg, self.hash_alg).digest() |
||||
|
||||
def verify(self, msg, key, sig): |
||||
return constant_time_compare(sig, self.sign(msg, key)) |
||||
|
||||
if has_crypto: |
||||
|
||||
class RSAAlgorithm(Algorithm): |
||||
""" |
||||
Performs signing and verification operations using |
||||
RSASSA-PKCS-v1_5 and the specified hash function. |
||||
""" |
||||
SHA256 = hashes.SHA256 |
||||
SHA384 = hashes.SHA384 |
||||
SHA512 = hashes.SHA512 |
||||
|
||||
def __init__(self, hash_alg): |
||||
self.hash_alg = hash_alg |
||||
|
||||
def prepare_key(self, key): |
||||
if isinstance(key, RSAPrivateKey) or \ |
||||
isinstance(key, RSAPublicKey): |
||||
return key |
||||
|
||||
if isinstance(key, string_types): |
||||
if isinstance(key, text_type): |
||||
key = key.encode('utf-8') |
||||
|
||||
try: |
||||
if key.startswith(b'ssh-rsa'): |
||||
key = load_ssh_public_key(key, backend=default_backend()) |
||||
else: |
||||
key = load_pem_private_key(key, password=None, backend=default_backend()) |
||||
except ValueError: |
||||
key = load_pem_public_key(key, backend=default_backend()) |
||||
else: |
||||
raise TypeError('Expecting a PEM-formatted key.') |
||||
|
||||
return key |
||||
|
||||
def sign(self, msg, key): |
||||
signer = key.signer( |
||||
padding.PKCS1v15(), |
||||
self.hash_alg() |
||||
) |
||||
|
||||
signer.update(msg) |
||||
return signer.finalize() |
||||
|
||||
def verify(self, msg, key, sig): |
||||
verifier = key.verifier( |
||||
sig, |
||||
padding.PKCS1v15(), |
||||
self.hash_alg() |
||||
) |
||||
|
||||
verifier.update(msg) |
||||
|
||||
try: |
||||
verifier.verify() |
||||
return True |
||||
except InvalidSignature: |
||||
return False |
||||
|
||||
class ECAlgorithm(Algorithm): |
||||
""" |
||||
Performs signing and verification operations using |
||||
ECDSA and the specified hash function |
||||
""" |
||||
SHA256 = hashes.SHA256 |
||||
SHA384 = hashes.SHA384 |
||||
SHA512 = hashes.SHA512 |
||||
|
||||
def __init__(self, hash_alg): |
||||
self.hash_alg = hash_alg |
||||
|
||||
def prepare_key(self, key): |
||||
if isinstance(key, EllipticCurvePrivateKey) or \ |
||||
isinstance(key, EllipticCurvePublicKey): |
||||
return key |
||||
|
||||
if isinstance(key, string_types): |
||||
if isinstance(key, text_type): |
||||
key = key.encode('utf-8') |
||||
|
||||
# Attempt to load key. We don't know if it's |
||||
# a Signing Key or a Verifying Key, so we try |
||||
# the Verifying Key first. |
||||
try: |
||||
key = load_pem_public_key(key, backend=default_backend()) |
||||
except ValueError: |
||||
key = load_pem_private_key(key, password=None, backend=default_backend()) |
||||
|
||||
else: |
||||
raise TypeError('Expecting a PEM-formatted key.') |
||||
|
||||
return key |
||||
|
||||
def sign(self, msg, key): |
||||
signer = key.signer(ec.ECDSA(self.hash_alg())) |
||||
|
||||
signer.update(msg) |
||||
der_sig = signer.finalize() |
||||
|
||||
return der_to_raw_signature(der_sig, key.curve) |
||||
|
||||
def verify(self, msg, key, sig): |
||||
try: |
||||
der_sig = raw_to_der_signature(sig, key.curve) |
||||
except ValueError: |
||||
return False |
||||
|
||||
verifier = key.verifier(der_sig, ec.ECDSA(self.hash_alg())) |
||||
|
||||
verifier.update(msg) |
||||
|
||||
try: |
||||
verifier.verify() |
||||
return True |
||||
except InvalidSignature: |
||||
return False |
||||
|
||||
class RSAPSSAlgorithm(RSAAlgorithm): |
||||
""" |
||||
Performs a signature using RSASSA-PSS with MGF1 |
||||
""" |
||||
|
||||
def sign(self, msg, key): |
||||
signer = key.signer( |
||||
padding.PSS( |
||||
mgf=padding.MGF1(self.hash_alg()), |
||||
salt_length=self.hash_alg.digest_size |
||||
), |
||||
self.hash_alg() |
||||
) |
||||
|
||||
signer.update(msg) |
||||
return signer.finalize() |
||||
|
||||
def verify(self, msg, key, sig): |
||||
verifier = key.verifier( |
||||
sig, |
||||
padding.PSS( |
||||
mgf=padding.MGF1(self.hash_alg()), |
||||
salt_length=self.hash_alg.digest_size |
||||
), |
||||
self.hash_alg() |
||||
) |
||||
|
||||
verifier.update(msg) |
||||
|
||||
try: |
||||
verifier.verify() |
||||
return True |
||||
except InvalidSignature: |
||||
return False |
@ -0,0 +1,204 @@ |
||||
import binascii |
||||
import json |
||||
import warnings |
||||
|
||||
from collections import Mapping |
||||
|
||||
from .algorithms import Algorithm, get_default_algorithms # NOQA |
||||
from .compat import binary_type, string_types, text_type |
||||
from .exceptions import DecodeError, InvalidAlgorithmError, InvalidTokenError |
||||
from .utils import base64url_decode, base64url_encode, merge_dict |
||||
|
||||
|
||||
class PyJWS(object): |
||||
header_typ = 'JWT' |
||||
|
||||
def __init__(self, algorithms=None, options=None): |
||||
self._algorithms = get_default_algorithms() |
||||
self._valid_algs = (set(algorithms) if algorithms is not None |
||||
else set(self._algorithms)) |
||||
|
||||
# Remove algorithms that aren't on the whitelist |
||||
for key in list(self._algorithms.keys()): |
||||
if key not in self._valid_algs: |
||||
del self._algorithms[key] |
||||
|
||||
if not options: |
||||
options = {} |
||||
|
||||
self.options = merge_dict(self._get_default_options(), options) |
||||
|
||||
@staticmethod |
||||
def _get_default_options(): |
||||
return { |
||||
'verify_signature': True |
||||
} |
||||
|
||||
def register_algorithm(self, alg_id, alg_obj): |
||||
""" |
||||
Registers a new Algorithm for use when creating and verifying tokens. |
||||
""" |
||||
if alg_id in self._algorithms: |
||||
raise ValueError('Algorithm already has a handler.') |
||||
|
||||
if not isinstance(alg_obj, Algorithm): |
||||
raise TypeError('Object is not of type `Algorithm`') |
||||
|
||||
self._algorithms[alg_id] = alg_obj |
||||
self._valid_algs.add(alg_id) |
||||
|
||||
def unregister_algorithm(self, alg_id): |
||||
""" |
||||
Unregisters an Algorithm for use when creating and verifying tokens |
||||
Throws KeyError if algorithm is not registered. |
||||
""" |
||||
if alg_id not in self._algorithms: |
||||
raise KeyError('The specified algorithm could not be removed' |
||||
' because it is not registered.') |
||||
|
||||
del self._algorithms[alg_id] |
||||
self._valid_algs.remove(alg_id) |
||||
|
||||
def get_algorithms(self): |
||||
""" |
||||
Returns a list of supported values for the 'alg' parameter. |
||||
""" |
||||
return list(self._valid_algs) |
||||
|
||||
def encode(self, payload, key, algorithm='HS256', headers=None, |
||||
json_encoder=None): |
||||
segments = [] |
||||
|
||||
if algorithm is None: |
||||
algorithm = 'none' |
||||
|
||||
if algorithm not in self._valid_algs: |
||||
pass |
||||
|
||||
# Header |
||||
header = {'typ': self.header_typ, 'alg': algorithm} |
||||
|
||||
if headers: |
||||
self._validate_headers(headers) |
||||
header.update(headers) |
||||
|
||||
json_header = json.dumps( |
||||
header, |
||||
separators=(',', ':'), |
||||
cls=json_encoder |
||||
).encode('utf-8') |
||||
|
||||
segments.append(base64url_encode(json_header)) |
||||
segments.append(base64url_encode(payload)) |
||||
|
||||
# Segments |
||||
signing_input = b'.'.join(segments) |
||||
try: |
||||
alg_obj = self._algorithms[algorithm] |
||||
key = alg_obj.prepare_key(key) |
||||
signature = alg_obj.sign(signing_input, key) |
||||
|
||||
except KeyError: |
||||
raise NotImplementedError('Algorithm not supported') |
||||
|
||||
segments.append(base64url_encode(signature)) |
||||
|
||||
return b'.'.join(segments) |
||||
|
||||
def decode(self, jws, key='', verify=True, algorithms=None, options=None, |
||||
**kwargs): |
||||
payload, signing_input, header, signature = self._load(jws) |
||||
|
||||
if verify: |
||||
merged_options = merge_dict(self.options, options) |
||||
if merged_options.get('verify_signature'): |
||||
self._verify_signature(payload, signing_input, header, signature, |
||||
key, algorithms) |
||||
else: |
||||
warnings.warn('The verify parameter is deprecated. ' |
||||
'Please use options instead.', DeprecationWarning) |
||||
|
||||
return payload |
||||
|
||||
def get_unverified_header(self, jwt): |
||||
"""Returns back the JWT header parameters as a dict() |
||||
|
||||
Note: The signature is not verified so the header parameters |
||||
should not be fully trusted until signature verification is complete |
||||
""" |
||||
headers = self._load(jwt)[2] |
||||
self._validate_headers(headers) |
||||
|
||||
return headers |
||||
|
||||
def _load(self, jwt): |
||||
if isinstance(jwt, text_type): |
||||
jwt = jwt.encode('utf-8') |
||||
|
||||
if not issubclass(type(jwt), binary_type): |
||||
raise DecodeError("Invalid token type. Token must be a {0}".format( |
||||
binary_type)) |
||||
|
||||
try: |
||||
signing_input, crypto_segment = jwt.rsplit(b'.', 1) |
||||
header_segment, payload_segment = signing_input.split(b'.', 1) |
||||
except ValueError: |
||||
raise DecodeError('Not enough segments') |
||||
|
||||
try: |
||||
header_data = base64url_decode(header_segment) |
||||
except (TypeError, binascii.Error): |
||||
raise DecodeError('Invalid header padding') |
||||
|
||||
try: |
||||
header = json.loads(header_data.decode('utf-8')) |
||||
except ValueError as e: |
||||
raise DecodeError('Invalid header string: %s' % e) |
||||
|
||||
if not isinstance(header, Mapping): |
||||
raise DecodeError('Invalid header string: must be a json object') |
||||
|
||||
try: |
||||
payload = base64url_decode(payload_segment) |
||||
except (TypeError, binascii.Error): |
||||
raise DecodeError('Invalid payload padding') |
||||
|
||||
try: |
||||
signature = base64url_decode(crypto_segment) |
||||
except (TypeError, binascii.Error): |
||||
raise DecodeError('Invalid crypto padding') |
||||
|
||||
return (payload, signing_input, header, signature) |
||||
|
||||
def _verify_signature(self, payload, signing_input, header, signature, |
||||
key='', algorithms=None): |
||||
|
||||
alg = header.get('alg') |
||||
|
||||
if algorithms is not None and alg not in algorithms: |
||||
raise InvalidAlgorithmError('The specified alg value is not allowed') |
||||
|
||||
try: |
||||
alg_obj = self._algorithms[alg] |
||||
key = alg_obj.prepare_key(key) |
||||
|
||||
if not alg_obj.verify(signing_input, key, signature): |
||||
raise DecodeError('Signature verification failed') |
||||
|
||||
except KeyError: |
||||
raise InvalidAlgorithmError('Algorithm not supported') |
||||
|
||||
def _validate_headers(self, headers): |
||||
if 'kid' in headers: |
||||
self._validate_kid(headers['kid']) |
||||
|
||||
def _validate_kid(self, kid): |
||||
if not isinstance(kid, string_types): |
||||
raise InvalidTokenError('Key ID header parameter must be a string') |
||||
|
||||
_jws_global_obj = PyJWS() |
||||
encode = _jws_global_obj.encode |
||||
decode = _jws_global_obj.decode |
||||
register_algorithm = _jws_global_obj.register_algorithm |
||||
unregister_algorithm = _jws_global_obj.unregister_algorithm |
||||
get_unverified_header = _jws_global_obj.get_unverified_header |
@ -0,0 +1,187 @@ |
||||
import json |
||||
import warnings |
||||
|
||||
from calendar import timegm |
||||
from collections import Mapping |
||||
from datetime import datetime, timedelta |
||||
|
||||
from .api_jws import PyJWS |
||||
from .algorithms import Algorithm, get_default_algorithms # NOQA |
||||
from .compat import string_types, timedelta_total_seconds |
||||
from .exceptions import ( |
||||
DecodeError, ExpiredSignatureError, ImmatureSignatureError, |
||||
InvalidAudienceError, InvalidIssuedAtError, |
||||
InvalidIssuerError, MissingRequiredClaimError |
||||
) |
||||
from .utils import merge_dict |
||||
|
||||
|
||||
class PyJWT(PyJWS): |
||||
header_type = 'JWT' |
||||
|
||||
@staticmethod |
||||
def _get_default_options(): |
||||
return { |
||||
'verify_signature': True, |
||||
'verify_exp': True, |
||||
'verify_nbf': True, |
||||
'verify_iat': True, |
||||
'verify_aud': True, |
||||
'verify_iss': True, |
||||
'require_exp': False, |
||||
'require_iat': False, |
||||
'require_nbf': False |
||||
} |
||||
|
||||
def encode(self, payload, key, algorithm='HS256', headers=None, |
||||
json_encoder=None): |
||||
# Check that we get a mapping |
||||
if not isinstance(payload, Mapping): |
||||
raise TypeError('Expecting a mapping object, as JWT only supports ' |
||||
'JSON objects as payloads.') |
||||
|
||||
# Payload |
||||
for time_claim in ['exp', 'iat', 'nbf']: |
||||
# Convert datetime to a intDate value in known time-format claims |
||||
if isinstance(payload.get(time_claim), datetime): |
||||
payload[time_claim] = timegm(payload[time_claim].utctimetuple()) |
||||
|
||||
json_payload = json.dumps( |
||||
payload, |
||||
separators=(',', ':'), |
||||
cls=json_encoder |
||||
).encode('utf-8') |
||||
|
||||
return super(PyJWT, self).encode( |
||||
json_payload, key, algorithm, headers, json_encoder |
||||
) |
||||
|
||||
def decode(self, jwt, key='', verify=True, algorithms=None, options=None, |
||||
**kwargs): |
||||
payload, signing_input, header, signature = self._load(jwt) |
||||
|
||||
decoded = super(PyJWT, self).decode(jwt, key, verify, algorithms, |
||||
options, **kwargs) |
||||
|
||||
try: |
||||
payload = json.loads(decoded.decode('utf-8')) |
||||
except ValueError as e: |
||||
raise DecodeError('Invalid payload string: %s' % e) |
||||
if not isinstance(payload, Mapping): |
||||
raise DecodeError('Invalid payload string: must be a json object') |
||||
|
||||
if verify: |
||||
merged_options = merge_dict(self.options, options) |
||||
self._validate_claims(payload, merged_options, **kwargs) |
||||
|
||||
return payload |
||||
|
||||
def _validate_claims(self, payload, options, audience=None, issuer=None, |
||||
leeway=0, **kwargs): |
||||
|
||||
if 'verify_expiration' in kwargs: |
||||
options['verify_exp'] = kwargs.get('verify_expiration', True) |
||||
warnings.warn('The verify_expiration parameter is deprecated. ' |
||||
'Please use options instead.', DeprecationWarning) |
||||
|
||||
if isinstance(leeway, timedelta): |
||||
leeway = timedelta_total_seconds(leeway) |
||||
|
||||
if not isinstance(audience, (string_types, type(None))): |
||||
raise TypeError('audience must be a string or None') |
||||
|
||||
self._validate_required_claims(payload, options) |
||||
|
||||
now = timegm(datetime.utcnow().utctimetuple()) |
||||
|
||||
if 'iat' in payload and options.get('verify_iat'): |
||||
self._validate_iat(payload, now, leeway) |
||||
|
||||
if 'nbf' in payload and options.get('verify_nbf'): |
||||
self._validate_nbf(payload, now, leeway) |
||||
|
||||
if 'exp' in payload and options.get('verify_exp'): |
||||
self._validate_exp(payload, now, leeway) |
||||
|
||||
if options.get('verify_iss'): |
||||
self._validate_iss(payload, issuer) |
||||
|
||||
if options.get('verify_aud'): |
||||
self._validate_aud(payload, audience) |
||||
|
||||
def _validate_required_claims(self, payload, options): |
||||
if options.get('require_exp') and payload.get('exp') is None: |
||||
raise MissingRequiredClaimError('exp') |
||||
|
||||
if options.get('require_iat') and payload.get('iat') is None: |
||||
raise MissingRequiredClaimError('iat') |
||||
|
||||
if options.get('require_nbf') and payload.get('nbf') is None: |
||||
raise MissingRequiredClaimError('nbf') |
||||
|
||||
def _validate_iat(self, payload, now, leeway): |
||||
try: |
||||
iat = int(payload['iat']) |
||||
except ValueError: |
||||
raise DecodeError('Issued At claim (iat) must be an integer.') |
||||
|
||||
if iat > (now + leeway): |
||||
raise InvalidIssuedAtError('Issued At claim (iat) cannot be in' |
||||
' the future.') |
||||
|
||||
def _validate_nbf(self, payload, now, leeway): |
||||
try: |
||||
nbf = int(payload['nbf']) |
||||
except ValueError: |
||||
raise DecodeError('Not Before claim (nbf) must be an integer.') |
||||
|
||||
if nbf > (now + leeway): |
||||
raise ImmatureSignatureError('The token is not yet valid (nbf)') |
||||
|
||||
def _validate_exp(self, payload, now, leeway): |
||||
try: |
||||
exp = int(payload['exp']) |
||||
except ValueError: |
||||
raise DecodeError('Expiration Time claim (exp) must be an' |
||||
' integer.') |
||||
|
||||
if exp < (now - leeway): |
||||
raise ExpiredSignatureError('Signature has expired') |
||||
|
||||
def _validate_aud(self, payload, audience): |
||||
if audience is None and 'aud' not in payload: |
||||
return |
||||
|
||||
if audience is not None and 'aud' not in payload: |
||||
# Application specified an audience, but it could not be |
||||
# verified since the token does not contain a claim. |
||||
raise MissingRequiredClaimError('aud') |
||||
|
||||
audience_claims = payload['aud'] |
||||
|
||||
if isinstance(audience_claims, string_types): |
||||
audience_claims = [audience_claims] |
||||
if not isinstance(audience_claims, list): |
||||
raise InvalidAudienceError('Invalid claim format in token') |
||||
if any(not isinstance(c, string_types) for c in audience_claims): |
||||
raise InvalidAudienceError('Invalid claim format in token') |
||||
if audience not in audience_claims: |
||||
raise InvalidAudienceError('Invalid audience') |
||||
|
||||
def _validate_iss(self, payload, issuer): |
||||
if issuer is None: |
||||
return |
||||
|
||||
if 'iss' not in payload: |
||||
raise MissingRequiredClaimError('iss') |
||||
|
||||
if payload['iss'] != issuer: |
||||
raise InvalidIssuerError('Invalid issuer') |
||||
|
||||
|
||||
_jwt_global_obj = PyJWT() |
||||
encode = _jwt_global_obj.encode |
||||
decode = _jwt_global_obj.decode |
||||
register_algorithm = _jwt_global_obj.register_algorithm |
||||
unregister_algorithm = _jwt_global_obj.unregister_algorithm |
||||
get_unverified_header = _jwt_global_obj.get_unverified_header |
@ -0,0 +1,54 @@ |
||||
""" |
||||
The `compat` module provides support for backwards compatibility with older |
||||
versions of python, and compatibility wrappers around optional packages. |
||||
""" |
||||
# flake8: noqa |
||||
import sys |
||||
import hmac |
||||
|
||||
|
||||
PY3 = sys.version_info[0] == 3 |
||||
|
||||
|
||||
if PY3: |
||||
string_types = str, |
||||
text_type = str |
||||
binary_type = bytes |
||||
else: |
||||
string_types = basestring, |
||||
text_type = unicode |
||||
binary_type = str |
||||
|
||||
|
||||
def timedelta_total_seconds(delta): |
||||
try: |
||||
delta.total_seconds |
||||
except AttributeError: |
||||
# On Python 2.6, timedelta instances do not have |
||||
# a .total_seconds() method. |
||||
total_seconds = delta.days * 24 * 60 * 60 + delta.seconds |
||||
else: |
||||
total_seconds = delta.total_seconds() |
||||
|
||||
return total_seconds |
||||
|
||||
|
||||
try: |
||||
constant_time_compare = hmac.compare_digest |
||||
except AttributeError: |
||||
# Fallback for Python < 2.7 |
||||
def constant_time_compare(val1, val2): |
||||
""" |
||||
Returns True if the two strings are equal, False otherwise. |
||||
|
||||
The time taken is independent of the number of characters that match. |
||||
""" |
||||
if len(val1) != len(val2): |
||||
return False |
||||
|
||||
result = 0 |
||||
|
||||
for x, y in zip(val1, val2): |
||||
result |= ord(x) ^ ord(y) |
||||
|
||||
return result == 0 |
@ -0,0 +1,60 @@ |
||||
# Note: This file is named py_ecdsa.py because import behavior in Python 2 |
||||
# would cause ecdsa.py to squash the ecdsa library that it depends upon. |
||||
|
||||
import hashlib |
||||
|
||||
import ecdsa |
||||
|
||||
from jwt.algorithms import Algorithm |
||||
from jwt.compat import string_types, text_type |
||||
|
||||
|
||||
class ECAlgorithm(Algorithm): |
||||
""" |
||||
Performs signing and verification operations using |
||||
ECDSA and the specified hash function |
||||
|
||||
This class requires the ecdsa package to be installed. |
||||
|
||||
This is based off of the implementation in PyJWT 0.3.2 |
||||
""" |
||||
SHA256 = hashlib.sha256 |
||||
SHA384 = hashlib.sha384 |
||||
SHA512 = hashlib.sha512 |
||||
|
||||
def __init__(self, hash_alg): |
||||
self.hash_alg = hash_alg |
||||
|
||||
def prepare_key(self, key): |
||||
|
||||
if isinstance(key, ecdsa.SigningKey) or \ |
||||
isinstance(key, ecdsa.VerifyingKey): |
||||
return key |
||||
|
||||
if isinstance(key, string_types): |
||||
if isinstance(key, text_type): |
||||
key = key.encode('utf-8') |
||||
|
||||
# Attempt to load key. We don't know if it's |
||||
# a Signing Key or a Verifying Key, so we try |
||||
# the Verifying Key first. |
||||
try: |
||||
key = ecdsa.VerifyingKey.from_pem(key) |
||||
except ecdsa.der.UnexpectedDER: |
||||
key = ecdsa.SigningKey.from_pem(key) |
||||
|
||||
else: |
||||
raise TypeError('Expecting a PEM-formatted key.') |
||||
|
||||
return key |
||||
|
||||
def sign(self, msg, key): |
||||
return key.sign(msg, hashfunc=self.hash_alg, |
||||
sigencode=ecdsa.util.sigencode_string) |
||||
|
||||
def verify(self, msg, key, sig): |
||||
try: |
||||
return key.verify(sig, msg, hashfunc=self.hash_alg, |
||||
sigdecode=ecdsa.util.sigdecode_string) |
||||
except AssertionError: |
||||
return False |
@ -0,0 +1,47 @@ |
||||
import Crypto.Hash.SHA256 |
||||
import Crypto.Hash.SHA384 |
||||
import Crypto.Hash.SHA512 |
||||
|
||||
from Crypto.PublicKey import RSA |
||||
from Crypto.Signature import PKCS1_v1_5 |
||||
|
||||
from jwt.algorithms import Algorithm |
||||
from jwt.compat import string_types, text_type |
||||
|
||||
|
||||
class RSAAlgorithm(Algorithm): |
||||
""" |
||||
Performs signing and verification operations using |
||||
RSASSA-PKCS-v1_5 and the specified hash function. |
||||
|
||||
This class requires PyCrypto package to be installed. |
||||
|
||||
This is based off of the implementation in PyJWT 0.3.2 |
||||
""" |
||||
SHA256 = Crypto.Hash.SHA256 |
||||
SHA384 = Crypto.Hash.SHA384 |
||||
SHA512 = Crypto.Hash.SHA512 |
||||
|
||||
def __init__(self, hash_alg): |
||||
self.hash_alg = hash_alg |
||||
|
||||
def prepare_key(self, key): |
||||
|
||||
if isinstance(key, RSA._RSAobj): |
||||
return key |
||||
|
||||
if isinstance(key, string_types): |
||||
if isinstance(key, text_type): |
||||
key = key.encode('utf-8') |
||||
|
||||
key = RSA.importKey(key) |
||||
else: |
||||
raise TypeError('Expecting a PEM- or RSA-formatted key.') |
||||
|
||||
return key |
||||
|
||||
def sign(self, msg, key): |
||||
return PKCS1_v1_5.new(key).sign(self.hash_alg.new(msg)) |
||||
|
||||
def verify(self, msg, key, sig): |
||||
return PKCS1_v1_5.new(key).verify(self.hash_alg.new(msg), sig) |
@ -0,0 +1,48 @@ |
||||
class InvalidTokenError(Exception): |
||||
pass |
||||
|
||||
|
||||
class DecodeError(InvalidTokenError): |
||||
pass |
||||
|
||||
|
||||
class ExpiredSignatureError(InvalidTokenError): |
||||
pass |
||||
|
||||
|
||||
class InvalidAudienceError(InvalidTokenError): |
||||
pass |
||||
|
||||
|
||||
class InvalidIssuerError(InvalidTokenError): |
||||
pass |
||||
|
||||
|
||||
class InvalidIssuedAtError(InvalidTokenError): |
||||
pass |
||||
|
||||
|
||||
class ImmatureSignatureError(InvalidTokenError): |
||||
pass |
||||
|
||||
|
||||
class InvalidKeyError(Exception): |
||||
pass |
||||
|
||||
|
||||
class InvalidAlgorithmError(InvalidTokenError): |
||||
pass |
||||
|
||||
|
||||
class MissingRequiredClaimError(InvalidTokenError): |
||||
def __init__(self, claim): |
||||
self.claim = claim |
||||
|
||||
def __str__(self): |
||||
return 'Token is missing the "%s" claim' % self.claim |
||||
|
||||
|
||||
# Compatibility aliases (deprecated) |
||||
ExpiredSignature = ExpiredSignatureError |
||||
InvalidAudience = InvalidAudienceError |
||||
InvalidIssuer = InvalidIssuerError |
@ -0,0 +1,67 @@ |
||||
import base64 |
||||
import binascii |
||||
|
||||
try: |
||||
from cryptography.hazmat.primitives.asymmetric.utils import ( |
||||
decode_rfc6979_signature, encode_rfc6979_signature |
||||
) |
||||
except ImportError: |
||||
pass |
||||
|
||||
|
||||
def base64url_decode(input): |
||||
rem = len(input) % 4 |
||||
|
||||
if rem > 0: |
||||
input += b'=' * (4 - rem) |
||||
|
||||
return base64.urlsafe_b64decode(input) |
||||
|
||||
|
||||
def base64url_encode(input): |
||||
return base64.urlsafe_b64encode(input).replace(b'=', b'') |
||||
|
||||
|
||||
def merge_dict(original, updates): |
||||
if not updates: |
||||
return original |
||||
|
||||
try: |
||||
merged_options = original.copy() |
||||
merged_options.update(updates) |
||||
except (AttributeError, ValueError) as e: |
||||
raise TypeError('original and updates must be a dictionary: %s' % e) |
||||
|
||||
return merged_options |
||||
|
||||
|
||||
def number_to_bytes(num, num_bytes): |
||||
padded_hex = '%0*x' % (2 * num_bytes, num) |
||||
big_endian = binascii.a2b_hex(padded_hex.encode('ascii')) |
||||
return big_endian |
||||
|
||||
|
||||
def bytes_to_number(string): |
||||
return int(binascii.b2a_hex(string), 16) |
||||
|
||||
|
||||
def der_to_raw_signature(der_sig, curve): |
||||
num_bits = curve.key_size |
||||
num_bytes = (num_bits + 7) // 8 |
||||
|
||||
r, s = decode_rfc6979_signature(der_sig) |
||||
|
||||
return number_to_bytes(r, num_bytes) + number_to_bytes(s, num_bytes) |
||||
|
||||
|
||||
def raw_to_der_signature(raw_sig, curve): |
||||
num_bits = curve.key_size |
||||
num_bytes = (num_bits + 7) // 8 |
||||
|
||||
if len(raw_sig) != 2 * num_bytes: |
||||
raise ValueError('Invalid signature') |
||||
|
||||
r = bytes_to_number(raw_sig[:num_bytes]) |
||||
s = bytes_to_number(raw_sig[num_bytes:]) |
||||
|
||||
return encode_rfc6979_signature(r, s) |
Loading…
Reference in new issue