Validate Rosetta Token
How can we validate a Rosetta token that is passed to an API endpoint?
These code snippets demonstrate how the Chalice server validates a Rosetta JWT that is passed to an endpoint. After retrieving the JWT from the request, it extracts the token identifier from the JWT payload and then loads the corresponding operational token from DynamoDB. Once it has the operational token, it uses the unique secret associated with that token to validate the JWT's signature, to make sure that the JWT has not been forged. Finally, it performs a number of other checks to make sure that the request came from the same device that originally claimed the token, that the token hasn't expired, and that the token has a valid user email address associated with it (the address was originally supplied, and validated, during the two factors of authentication).
--
app.py
--
from chalice import Chalice
from chalice import UnauthorizedError
from chalicelib import requestUtility
# Chalice application object; validateUser passes it to the request utility, which logs through app.log.
app = Chalice(app_name='rosetta-api-chalice')
#
# Convenience functions
#
def validateUser(requestHeaders):
    """Return the userId tied to the request's bearer token.

    Delegates the token checks to RequestUtility.validateBearerToken and then
    confirms that the resulting token item carries a non-empty ``userId``.

    Raises:
        UnauthorizedError: no token item came back, or its ``userId`` is
            missing or empty. Errors raised by validateBearerToken propagate
            unchanged.
    """
    validator = requestUtility.RequestUtility()
    token = validator.validateBearerToken(app, requestHeaders)

    # A missing token item and a missing userId are rejected the same way.
    userId = None if token is None else token.get('userId')
    if userId is not None and len(userId) > 0:
        return userId

    raise UnauthorizedError('Expired or invalid token identified')
--
chalicelib/requestUtility.py
--
import time
from chalice import BadRequestError
from chalice import UnauthorizedError
from . dynamoDbUtility import DynamoDbUtility
from . tokenUtility import TokenUtility
class RequestUtility(object):
    """Validate the bearer token and device headers carried by an API request."""

    # region - Public Methods

    def validateBearerToken(self, app, requestHeaders, allowForExchangeWindow=False):
        """Validate the request's bearer token and return its stored token item.

        The checks run in this order, and the first failure aborts:
          1. an ``Authorization: Bearer <token>`` header is present,
          2. a token identifier can be read from the token payload,
          3. an operational token exists for that identifier,
          4. the token signature matches the stored per-token secret,
          5. the ``X-Device-Id`` header matches the device stored with the token,
          6. the token has not expired (skipped when allowForExchangeWindow is True),
          7. the token item carries a non-empty ``userId``.

        Args:
            app: Object exposing ``log.debug``; used only for diagnostics.
            requestHeaders: Mapping of header names to values, or None.
            allowForExchangeWindow: When True, the expiration check is skipped.

        Returns:
            The token item returned by DynamoDbUtility.getOperationalToken.

        Raises:
            BadRequestError: no bearer token, or no identifier in its payload.
            UnauthorizedError: any of checks 3-7 failed. The message is the
                same for all of them so callers cannot tell which one tripped.
        """
        # guard clause - no bearer token value
        bearerTokenValue = self._getBearerTokenValue(requestHeaders)
        if bearerTokenValue is None:
            raise BadRequestError('No bearer token provided')

        # guard clause - no tokenId in bearer token value
        tokenId = TokenUtility().getTokenId(bearerTokenValue)
        if tokenId is None:
            # NOTE(review): this writes the raw bearer token to the debug log;
            # confirm that is acceptable for the environments where debug is on.
            app.log.debug('No token found for bearer token value: {0}'.format(bearerTokenValue))
            raise BadRequestError('Invalid bearer token provided')

        # guard clause - no token in table (expired and removed or never there)
        tokenItem = DynamoDbUtility().getOperationalToken(tokenId)
        if tokenItem is None:
            app.log.debug('No token found for tokenId: {0}'.format(tokenId))
            raise UnauthorizedError('Expired or invalid token identified')

        # 'or {}' also covers an item whose tokenInfo attribute is stored as None.
        tokenInfo = tokenItem.get('tokenInfo') or {}

        # guard clause - inauthentic token (signature invalid)
        # A missing/empty secret is rejected outright: otherwise anyone could
        # produce a matching signature by signing with an empty HMAC key.
        tokenSecret = tokenInfo.get('tokenSecret', '')
        bearerTokenAuthentic = bool(tokenSecret) and TokenUtility().isBearerTokenAuthentic(
            bearerTokenValue, tokenSecret)
        if not bearerTokenAuthentic:
            app.log.debug('Inauthentic bearer token value for tokenId: {0}'.format(tokenId))
            raise UnauthorizedError('Expired or invalid token identified')

        # guard clause - invalid device
        requestDeviceId = self._getDeviceIdValue(requestHeaders)
        tokenDeviceId = tokenInfo.get('deviceId')
        if requestDeviceId is None or requestDeviceId != tokenDeviceId:
            app.log.debug('Authentic token from a different device: {0} ({1})'.format(tokenId, requestDeviceId))
            raise UnauthorizedError('Expired or invalid token identified')

        # guard clause - expired token
        if not allowForExchangeWindow:
            try:
                expirationTimestamp = float(tokenInfo.get('expirationTimestamp', '0'))
            except (TypeError, ValueError):
                # An unparseable expiration is treated as already expired
                # instead of surfacing as an unhandled server error.
                expirationTimestamp = 0.0
            # Written as "not (now <= expiry)" so a NaN expiry also fails closed.
            if not (time.time() <= expirationTimestamp):
                app.log.debug('Expired token for tokenId: {0}'.format(tokenId))
                raise UnauthorizedError('Expired or invalid token identified')

        # guard clause - invalid userId
        userId = tokenItem.get('userId')
        if userId is None or len(userId) <= 0:
            app.log.debug('Malformed token for tokenId: {0}'.format(tokenId))
            raise UnauthorizedError('Expired or invalid token identified')

        # otherwise, return the token
        return tokenItem

    # region - Helper Methods

    def _getBearerTokenValue(self, requestHeaders):
        """Return the token from an ``Authorization: Bearer <token>`` header.

        Returns None when there are no headers, no Authorization header, or
        the header does not start with ``'Bearer '``.
        """
        # guard clause - no headers
        if requestHeaders is None:
            return None

        # guard clause - no authorization header or bearer token value
        bearerPrefix = 'Bearer '
        authorizationValue = requestHeaders.get('Authorization')
        if authorizationValue is None or not authorizationValue.startswith(bearerPrefix):
            return None

        # Strip only the leading scheme; a later 'Bearer ' inside the value
        # must not be removed, or the token would be silently altered.
        return authorizationValue[len(bearerPrefix):]

    def _getDeviceIdValue(self, requestHeaders):
        """Return the ``X-Device-Id`` header value, or None if it is unavailable."""
        # guard clause - no headers
        if requestHeaders is None:
            return None
        return requestHeaders.get('X-Device-Id')
--
chalicelib/tokenUtility.py
--
import base64
import json
import hashlib
import hmac
class TokenUtility(object):
    """Parse and authenticate three-part ``header.payload.signature`` bearer tokens."""

    # region - Public Methods

    def getTokenId(self, bearerTokenValue):
        """Return the ``identifier`` field from the token's JSON payload.

        The payload segment is decoded as base64. Both the standard and the
        URL-safe alphabets are accepted, and missing ``=`` padding is restored,
        because JWT segments are normally base64url-encoded without padding.

        Args:
            bearerTokenValue: The raw token; it is converted with ``str()``.

        Returns:
            The identifier, or None when the token is None, does not have
            exactly three dot-separated parts, cannot be decoded, is not a
            JSON object, or has no ``identifier`` key.
        """
        # guard clause - invalid input
        if bearerTokenValue is None:
            return None

        # guard clause - invalid input
        bearerTokenParts = str(bearerTokenValue).split('.')
        if len(bearerTokenParts) != 3:
            return None

        payloadValue = bearerTokenParts[1]
        problemMessage = "Problem getting token identifier from payload value: {0}".format(payloadValue)
        try:
            # Restore the padding that base64url encoders strip.
            paddedPayloadValue = payloadValue + '=' * (-len(payloadValue) % 4)
            decodedPayloadValue = base64.urlsafe_b64decode(paddedPayloadValue)
            decodedPayloadJson = json.loads(decodedPayloadValue)
        except (ValueError, TypeError, RecursionError):
            # Untrusted input: any decode or parse failure means no identifier.
            print(problemMessage)
            return None

        # Valid JSON that is not an object (list, number, ...) has no identifier.
        if not isinstance(decodedPayloadJson, dict):
            print(problemMessage)
            return None

        return decodedPayloadJson.get('identifier', None)

    def isBearerTokenAuthentic(self, bearerTokenValue, tokenSecret):
        """Return True when the token's signature matches ``tokenSecret``.

        The expected signature is the hex HMAC-SHA256 of ``header.payload``
        keyed with the secret. Every failure path returns False.

        Args:
            bearerTokenValue: The raw token; it is converted with ``str()``.
            tokenSecret: The per-token secret as an ASCII string.

        Returns:
            True only for a three-part token whose third part equals the
            expected signature; False otherwise, including for a None token,
            a None or empty secret, or values that cannot be encoded.
        """
        # guard clause - invalid input
        if bearerTokenValue is None:
            return False

        # guard clause - an empty secret would let anyone compute the signature
        if not tokenSecret:
            return False

        # guard clause - invalid input
        bearerTokenParts = str(bearerTokenValue).split('.')
        if len(bearerTokenParts) != 3:
            return False

        headerValue, payloadValue, actualSignatureValue = bearerTokenParts
        try:
            expectedSignatureValue = self._createSignatureValue(tokenSecret, headerValue, payloadValue)
            # Constant-time comparison, so response timing does not reveal how
            # much of a guessed signature was correct. Compared as bytes
            # because compare_digest rejects non-ASCII str arguments.
            return hmac.compare_digest(
                actualSignatureValue.encode('utf-8'),
                expectedSignatureValue.encode('ascii'),
            )
        except (AttributeError, TypeError, ValueError):
            # Non-string secret or non-ASCII content: treat as not authentic.
            return False

    # region - Helper Methods

    def _createSignatureValue(self, tokenSecret, headerValue, payloadValue):
        """Return the hex HMAC-SHA256 of ``"<header>.<payload>"`` keyed with the secret.

        Both the secret and the message are encoded as ASCII, so non-ASCII
        input raises UnicodeEncodeError.
        """
        tokenSecretAscii = tokenSecret.encode('ascii')
        messageValue = "{0}.{1}".format(headerValue, payloadValue)
        messageValueAscii = messageValue.encode('ascii')
        messageHmac = hmac.new(tokenSecretAscii, messageValueAscii, hashlib.sha256)
        return messageHmac.hexdigest()
--
chalicelib/dynamoDbUtility.py
--
import boto3
from boto3.dynamodb.conditions import Key
# Module-level DynamoDB handles, created once when this module is imported.
# NOTE: despite its name, ddbClient is a boto3 *resource* (boto3.resource), not a low-level client.
ddbClient = boto3.resource('dynamodb')
# Handle to the 'OperationalToken' table; getOperationalToken reads it by 'tokenId'.
ddbOperationalTokenTable = ddbClient.Table('OperationalToken')
class DynamoDbUtility(object):
    """Read access to the OperationalToken DynamoDB table."""

    # region - Public Methods - Operational Token

    def getOperationalToken(self, tokenId):
        """Fetch the operational token item stored under ``tokenId``.

        Returns:
            The ``Item`` from the table's ``get_item`` response, or None when
            the response is None or holds no ``Item``.
        """
        lookupKey = {'tokenId': tokenId}
        response = ddbOperationalTokenTable.get_item(Key=lookupKey)
        if response is None:
            return None
        # .get() already yields None when the response has no 'Item'.
        return response.get('Item')
--