Skip to content

Commit 2f4c770

Browse files
committed
Merge pull request jpadilla#131 from michaeldavis-wf/options-dict
Added `options=` argument to decode()
2 parents a2601ad + 6e6046c commit 2f4c770

File tree

5 files changed

+141
-13
lines changed

5 files changed

+141
-13
lines changed

AUTHORS

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,5 @@ Patches and Suggestions
2121
- Mark Adams <mark@markadams.me>
2222

2323
- Wouter Bolsterlee <uws@xs4all.nl>
24+
25+
- Michael Davis <mike.philip.davis@gmail.com> <mike.davis@workiva.com>

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
88
-------------------------------------------------------------------------
99
### Changed
1010
- Added this CHANGELOG.md file
11+
- Added flexible and complete verification options. #131
1112

1213
### Fixed
1314
- Placeholder

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,34 @@ except jwt.InvalidTokenError:
6262
pass # do something sensible here, e.g. return HTTP 403 status code
6363
```
6464

65+
You may also override exception checking via an `options` dictionary. The default
66+
options are as follows:
67+
68+
```python
69+
options = {
70+
'verify_signature': True,
71+
'verify_exp': True,
72+
'verify_nbf': True,
73+
'verify_iat': True,
74+
'verify_aud`: True
75+
}
76+
```
77+
78+
You can skip individual checks by passing an `options` dictionary with certain keys set to `False`.
79+
For example, if you want to verify the signature of a JWT that has already expired.
80+
81+
```python
82+
options = {
83+
'verify_exp': True,
84+
}
85+
86+
jwt.decode('someJWTstring', 'secret', options=options)
87+
```
88+
89+
**NOTE**: *Changing the default behavior is done at your own risk, and almost certainly will make your
90+
application less secure. Doing so should only be done with a very clear understanding of what you
91+
are doing.*
92+
6593
## Tests
6694

6795
You can run tests from the project root after cloning with:

jwt/api.py

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717

1818
class PyJWT(object):
19-
def __init__(self, algorithms=None):
19+
def __init__(self, algorithms=None, options=None):
2020
self._algorithms = get_default_algorithms()
2121
self._valid_algs = set(algorithms) if algorithms is not None else set(self._algorithms)
2222

@@ -25,6 +25,19 @@ def __init__(self, algorithms=None):
2525
if key not in self._valid_algs:
2626
del self._algorithms[key]
2727

28+
if not options:
29+
options = {}
30+
31+
self.default_options = {
32+
'verify_signature': True,
33+
'verify_exp': True,
34+
'verify_nbf': True,
35+
'verify_iat': True,
36+
'verify_aud': True,
37+
}
38+
39+
self.options = self._merge_options(self.default_options, options)
40+
2841
def register_algorithm(self, alg_id, alg_obj):
2942
"""
3043
Registers a new Algorithm for use when creating and verifying tokens.
@@ -110,14 +123,16 @@ def encode(self, payload, key, algorithm='HS256', headers=None, json_encoder=Non
110123

111124
return b'.'.join(segments)
112125

113-
def decode(self, jwt, key='', verify=True, algorithms=None, **kwargs):
126+
def decode(self, jwt, key='', verify=True, algorithms=None, options=None, **kwargs):
114127
payload, signing_input, header, signature = self._load(jwt)
115128

116129
if verify:
117-
self._verify_signature(payload, signing_input, header, signature,
118-
key, algorithms)
130+
merged_options = self._merge_options(override_options=options)
131+
if merged_options.get('verify_signature'):
132+
self._verify_signature(payload, signing_input, header, signature,
133+
key, algorithms)
119134

120-
self._validate_claims(payload, **kwargs)
135+
self._validate_claims(payload, options=merged_options, **kwargs)
121136

122137
return payload
123138

@@ -177,8 +192,8 @@ def _verify_signature(self, payload, signing_input, header, signature,
177192
except KeyError:
178193
raise InvalidAlgorithmError('Algorithm not supported')
179194

180-
def _validate_claims(self, payload, verify_expiration=True, leeway=0,
181-
audience=None, issuer=None):
195+
def _validate_claims(self, payload, audience=None, issuer=None, leeway=0,
196+
options=None, **kwargs):
182197
if isinstance(leeway, timedelta):
183198
leeway = timedelta_total_seconds(leeway)
184199

@@ -187,7 +202,7 @@ def _validate_claims(self, payload, verify_expiration=True, leeway=0,
187202

188203
now = timegm(datetime.utcnow().utctimetuple())
189204

190-
if 'iat' in payload:
205+
if 'iat' in payload and options.get('verify_iat'):
191206
try:
192207
iat = int(payload['iat'])
193208
except ValueError:
@@ -196,7 +211,7 @@ def _validate_claims(self, payload, verify_expiration=True, leeway=0,
196211
if iat > (now + leeway):
197212
raise InvalidIssuedAtError('Issued At claim (iat) cannot be in the future.')
198213

199-
if 'nbf' in payload and verify_expiration:
214+
if 'nbf' in payload and options.get('verify_nbf'):
200215
try:
201216
nbf = int(payload['nbf'])
202217
except ValueError:
@@ -205,7 +220,7 @@ def _validate_claims(self, payload, verify_expiration=True, leeway=0,
205220
if nbf > (now + leeway):
206221
raise ImmatureSignatureError('The token is not yet valid (nbf)')
207222

208-
if 'exp' in payload and verify_expiration:
223+
if 'exp' in payload and options.get('verify_exp'):
209224
try:
210225
exp = int(payload['exp'])
211226
except ValueError:
@@ -214,7 +229,7 @@ def _validate_claims(self, payload, verify_expiration=True, leeway=0,
214229
if exp < (now - leeway):
215230
raise ExpiredSignatureError('Signature has expired')
216231

217-
if 'aud' in payload:
232+
if 'aud' in payload and options.get('verify_aud'):
218233
audience_claims = payload['aud']
219234
if isinstance(audience_claims, string_types):
220235
audience_claims = [audience_claims]
@@ -233,6 +248,21 @@ def _validate_claims(self, payload, verify_expiration=True, leeway=0,
233248
if payload.get('iss') != issuer:
234249
raise InvalidIssuerError('Invalid issuer')
235250

251+
def _merge_options(self, default_options=None, override_options=None):
252+
if not default_options:
253+
default_options = {}
254+
255+
if not override_options:
256+
override_options = {}
257+
258+
try:
259+
merged_options = self.default_options.copy()
260+
merged_options.update(override_options)
261+
except (AttributeError, ValueError) as e:
262+
raise TypeError('options must be a dictionary: %s' % e)
263+
264+
return merged_options
265+
236266

237267
_jwt_global_obj = PyJWT()
238268
encode = _jwt_global_obj.encode

tests/test_api.py

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,27 @@ def test_algorithms_parameter_removes_alg_from_algorithms_list(self):
7171
self.assertNotIn('none', self.jwt.get_algorithms())
7272
self.assertIn('HS256', self.jwt.get_algorithms())
7373

74+
def test_default_options(self):
75+
self.assertEqual(self.jwt.default_options, self.jwt.options)
76+
77+
def test_override_options(self):
78+
self.jwt = PyJWT(options={'verify_exp': False, 'verify_nbf': False})
79+
expected_options = self.jwt.default_options
80+
expected_options['verify_exp'] = False
81+
expected_options['verify_nbf'] = False
82+
self.assertEqual(expected_options, self.jwt.options)
83+
84+
def test_non_default_options_persist(self):
85+
self.jwt = PyJWT(options={'verify_iat': False, 'foobar': False})
86+
expected_options = self.jwt.default_options
87+
expected_options['verify_iat'] = False
88+
expected_options['foobar'] = False
89+
self.assertEqual(expected_options, self.jwt.options)
90+
91+
def test_options_must_be_dict(self):
92+
self.assertRaises(TypeError, PyJWT, options=object())
93+
self.assertRaises(TypeError, PyJWT, options=('something'))
94+
7495
def test_encode_decode(self):
7596
secret = 'secret'
7697
jwt_message = self.jwt.encode(self.payload, secret)
@@ -467,14 +488,14 @@ def test_decode_skip_expiration_verification(self):
467488
secret = 'secret'
468489
jwt_message = self.jwt.encode(self.payload, secret)
469490

470-
self.jwt.decode(jwt_message, secret, verify_expiration=False)
491+
self.jwt.decode(jwt_message, secret, options={'verify_exp': False})
471492

472493
def test_decode_skip_notbefore_verification(self):
473494
self.payload['nbf'] = time.time() + 10
474495
secret = 'secret'
475496
jwt_message = self.jwt.encode(self.payload, secret)
476497

477-
self.jwt.decode(jwt_message, secret, verify_expiration=False)
498+
self.jwt.decode(jwt_message, secret, options={'verify_nbf': False})
478499

479500
def test_decode_with_expiration_with_leeway(self):
480501
self.payload['exp'] = utc_timestamp() - 2
@@ -765,6 +786,52 @@ def test_raise_exception_token_without_issuer(self):
765786
with self.assertRaises(InvalidIssuerError):
766787
self.jwt.decode(token, 'secret', issuer=issuer)
767788

789+
def test_skip_check_audience(self):
790+
payload = {
791+
'some': 'payload',
792+
'aud': 'urn:me',
793+
}
794+
token = self.jwt.encode(payload, 'secret')
795+
self.jwt.decode(token, 'secret', options={'verify_aud': False})
796+
797+
def test_skip_check_exp(self):
798+
payload = {
799+
'some': 'payload',
800+
'exp': datetime.utcnow() - timedelta(days=1)
801+
}
802+
token = self.jwt.encode(payload, 'secret')
803+
self.jwt.decode(token, 'secret', options={'verify_exp': False})
804+
805+
def test_skip_check_signature(self):
806+
token = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
807+
".eyJzb21lIjoicGF5bG9hZCJ9"
808+
".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA")
809+
self.jwt.decode(token, 'secret', options={'verify_signature': False})
810+
811+
def test_skip_check_iat(self):
812+
payload = {
813+
'some': 'payload',
814+
'iat': datetime.utcnow() + timedelta(days=1)
815+
}
816+
token = self.jwt.encode(payload, 'secret')
817+
self.jwt.decode(token, 'secret', options={'verify_iat': False})
818+
819+
def test_skip_check_nbf(self):
820+
payload = {
821+
'some': 'payload',
822+
'nbf': datetime.utcnow() + timedelta(days=1)
823+
}
824+
token = self.jwt.encode(payload, 'secret')
825+
self.jwt.decode(token, 'secret', options={'verify_nbf': False})
826+
827+
def test_decode_options_must_be_dict(self):
828+
payload = {
829+
'some': 'payload',
830+
}
831+
token = self.jwt.encode(payload, 'secret')
832+
self.assertRaises(TypeError, self.jwt.decode, token, 'secret', options=object())
833+
self.assertRaises(TypeError, self.jwt.decode, token, 'secret', options='something')
834+
768835
def test_custom_json_encoder(self):
769836

770837
class CustomJSONEncoder(json.JSONEncoder):

0 commit comments

Comments
 (0)