diff --git a/plexapi/myplex.py b/plexapi/myplex.py index 5785e6175..e50c7fe28 100644 --- a/plexapi/myplex.py +++ b/plexapi/myplex.py @@ -2146,23 +2146,45 @@ def _encodeClientJWT(self): headers=headers ) - def _decodePlexJWT(self): - """ Returns the decoded and verified Plex JWT using the Plex public JWK. """ - return jwt.decode( - self.jwtToken, - key=jwt.PyJWK.from_dict(self._getPlexPublicJWK()), - algorithms=['EdDSA'], - options={ - 'require': ['aud', 'iss', 'exp', 'iat', 'thumbprint'] - }, - audience=['plex.tv', self._clientIdentifier], - issuer='plex.tv', - ) + def decodePlexJWT(self, verify_signature=True): + """ Returns the decoded Plex JWT with optional signature verification using the Plex public JWK. + + Parameters: + verify_signature (bool): Whether to verify the JWT signature and required claims. + Defaults to True. Set to False to skip signature verification and required-claim enforcement. + """ + kwargs = { + 'jwt': self.jwtToken, + 'algorithms': ['EdDSA'], + 'options': {'verify_signature': verify_signature}, + 'audience': ['plex.tv', self._clientIdentifier], + 'issuer': 'plex.tv', + } + + if not verify_signature: + return jwt.decode(**kwargs) + + kwargs['options']['require'] = ['aud', 'iss', 'exp', 'iat', 'thumbprint'] + + for plexJWK in reversed(self._getPlexPublicJWK()): + try: + return jwt.decode( + key=jwt.PyJWK.from_dict(plexJWK), + **kwargs + ) + except jwt.InvalidSignatureError: + continue + except jwt.InvalidTokenError as e: + log.warning('Invalid Plex JWT: %s', str(e)) + raise + + log.warning('Plex JWT signature could not be verified with any known Plex JWKs') + raise jwt.InvalidSignatureError @property def decodedJWT(self): - """ Returns the decoded Plex JWT. """ - return self._decodePlexJWT() + """ Returns the decoded Plex JWT with signature verification and required-claim enforcement. """ + return self.decodePlexJWT() def _registerPlexDevice(self): """ Registers the public JWK with Plex. """ @@ -2185,10 +2207,10 @@ def _exchangePlexJWT(self): return data['auth_token'] def _getPlexPublicJWK(self): - """ Gets the Plex public JWK. """ + """ Gets the Plex public JWKs. """ url = f'{self.AUTH}/keys' data = self._query(url, method=self._session.get) - return data['keys'][0] + return data['keys'] def registerDevice(self): """ Registers the device with Plex using the provided token and private/public keypair. @@ -2234,14 +2256,7 @@ def verifyJWT(self, refreshWithinDays=1): """ try: decodedJWT = self.decodedJWT - except jwt.ExpiredSignatureError: - log.warning('Existing JWT has expired') - return False - except jwt.InvalidSignatureError: - log.warning('Existing JWT has invalid signature') - return False - except jwt.InvalidTokenError as e: - log.warning(f'Existing JWT is invalid: {e}') + except jwt.InvalidTokenError: return False else: if decodedJWT['thumbprint'] != self._keyID: @@ -2429,7 +2444,7 @@ def _query(self, url, method=None, headers=None, **kwargs): codename = codes.get(response.status_code)[0] errtext = response.text.replace('\n', ' ') raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}') - if 'application/json' in response.headers.get('Content-Type', ''): + if 'application/json' in response.headers.get('Content-Type', '') and len(response.content): return response.json() return utils.parseXMLString(response.text) diff --git a/tests/test_myplex.py b/tests/test_myplex.py index 613b6efa5..ead1d4f87 100644 --- a/tests/test_myplex.py +++ b/tests/test_myplex.py @@ -1,7 +1,9 @@ # -*- coding: utf-8 -*- +import jwt + import pytest from plexapi.exceptions import BadRequest, NotFound, Unauthorized -from plexapi.myplex import MyPlexInvite +from plexapi.myplex import MyPlexAccount, MyPlexInvite, MyPlexJWTLogin from . import conftest as utils from .payloads import MYPLEX_INVITE @@ -366,3 +368,37 @@ def test_myplex_geoip(account): def test_myplex_ping(account): assert account.ping() + + +def test_myplex_jwt_login(account, tmp_path, monkeypatch): + jwtlogin = MyPlexJWTLogin( + token=account.authToken, + scopes=['username', 'email', 'friendly_name'] + ) + jwtlogin.generateKeypair(keyfiles=(tmp_path / 'private.key', tmp_path / 'public.key'), overwrite=True) + with pytest.raises(FileExistsError): + jwtlogin.generateKeypair(keyfiles=(tmp_path / 'private.key', tmp_path / 'public.key')) + jwtlogin.registerDevice() + jwtToken = jwtlogin.refreshJWT() + assert jwtlogin.decodedJWT['user']['username'] == account.username + assert MyPlexAccount(token=jwtToken) == account + + jwtlogin = MyPlexJWTLogin( + jwtToken=jwtToken, + keypair=(tmp_path / 'private.key', tmp_path / 'public.key'), + scopes=['username', 'email', 'friendly_name'] + ) + assert jwtlogin.verifyJWT() + newjwtToken = jwtlogin.refreshJWT() + assert newjwtToken != jwtToken + assert MyPlexAccount(token=newjwtToken) == account + + plexPublicJWKs = jwtlogin._getPlexPublicJWK() + invalidJWK = plexPublicJWKs[0].copy() + invalidJWK['x'] += 'invalid' + monkeypatch.setattr(MyPlexJWTLogin, "_getPlexPublicJWK", lambda self: plexPublicJWKs + [invalidJWK]) + assert jwtlogin.decodePlexJWT() + + monkeypatch.setattr(MyPlexJWTLogin, "_getPlexPublicJWK", lambda self: [invalidJWK]) + with pytest.raises(jwt.InvalidSignatureError): + jwtlogin.decodePlexJWT()