Validate Rosetta Token

How can we validate a Rosetta token that is passed to an API endpoint?

These code snippets demonstrate how the Chalice server validates a Rosetta JWT that is passed to an endpoint. After retrieving the JWT from the request's `Authorization` header, it extracts the token identifier from the payload of the JWT and then loads the corresponding operational token from DynamoDB. Once it has the operational token, it uses the unique secret associated with it to validate the signature of the JWT, to make sure that the JWT has not been forged. Finally, it performs a number of other checks to make sure that the request came from the same device that originally claimed the token, that the token hasn't expired, and that the token has a valid user identifier (`userId`) associated with it — the user's email address, which originally came from, and was validated during, the two factors of authentication.

--

app.py

--

from chalice import Chalice
from chalice import UnauthorizedError

from chalicelib import requestUtility

# Chalice application object for this API. validateUser() below passes it to
# RequestUtility.validateBearerToken(), which writes debug output via app.log.
app = Chalice(app_name='rosetta-api-chalice')

#
# Convenience functions
#

def validateUser(requestHeaders):
    """Resolve the user id for the bearer token carried in ``requestHeaders``.

    Delegates the token checks to ``RequestUtility.validateBearerToken`` and
    then re-checks the ``userId`` field of the token item it returns.

    :param requestHeaders: header mapping of the incoming request.
    :returns: the ``userId`` stored on the validated token item.
    :raises UnauthorizedError: when no token item comes back, or when its
        ``userId`` is missing or empty. Errors raised by
        ``validateBearerToken`` itself propagate unchanged.
    """
    validator = requestUtility.RequestUtility()
    validatedToken = validator.validateBearerToken(app, requestHeaders)

    # guard clause - validation produced no token item
    if validatedToken is None:
        raise UnauthorizedError('Expired or invalid token identified')

    # happy path - the token item carries a non-empty user id
    resolvedUserId = validatedToken.get('userId')
    if resolvedUserId is not None and len(resolvedUserId) > 0:
        return resolvedUserId

    raise UnauthorizedError('Expired or invalid token identified')

--

chalicelib/requestUtility.py

--

import time

from chalice import BadRequestError
from chalice import UnauthorizedError

from . dynamoDbUtility import DynamoDbUtility
from . tokenUtility import TokenUtility

class RequestUtility(object):
    """Validate the bearer token that accompanies an API request.

    The token id is read from the bearer token, the matching operational
    token item is loaded through ``DynamoDbUtility``, and that item is then
    checked for signature, device, expiry and user id.
    """

    # scheme prefix expected at the start of the Authorization header
    _BEARER_PREFIX = 'Bearer '

    # every authorization failure reports the same message, so the response
    # does not reveal which individual check rejected the request
    _UNAUTHORIZED_MESSAGE = 'Expired or invalid token identified'

    # region - Public Methods

    def validateBearerToken(self, app, requestHeaders, allowForExchangeWindow=False):
        """Return the stored token item for the request's bearer token.

        :param app: object exposing ``app.log.debug`` (the Chalice app).
        :param requestHeaders: mapping of request headers; ``Authorization``
            and ``X-Device-Id`` are read from it.
        :param allowForExchangeWindow: when true, the expiration check is
            skipped entirely.
            NOTE(review): the name suggests a bounded grace period, but no
            upper bound is enforced here - confirm that callers limit it.
        :returns: the operational token item loaded for the token id.
        :raises BadRequestError: when no bearer token is supplied, or no
            token id can be read from it.
        :raises UnauthorizedError: when the token is unknown, has no stored
            secret, fails the signature check, comes from another device,
            is expired, or has no user id.
        """
        # guard clause - no bearer token value
        bearerTokenValue = self._getBearerTokenValue(requestHeaders)
        if bearerTokenValue is None:
            raise BadRequestError('No bearer token provided')

        # guard clause - no tokenId in bearer token value
        tokenId = TokenUtility().getTokenId(bearerTokenValue)
        if tokenId is None:
            # the raw bearer token is a credential, so it is kept out of the log
            app.log.debug('No tokenId found in bearer token value')
            raise BadRequestError('Invalid bearer token provided')

        # guard clause - no token in table (expired and removed or never there)
        tokenItem = DynamoDbUtility().getOperationalToken(tokenId)
        if tokenItem is None:
            app.log.debug('No token found for tokenId: {0}'.format(tokenId))
            raise UnauthorizedError(self._UNAUTHORIZED_MESSAGE)

        # 'or {}' also covers an item whose tokenInfo is stored as None
        tokenInfo = tokenItem.get('tokenInfo') or {}

        # guard clause - no stored secret; verifying against an empty HMAC key
        # would accept a signature that anyone can compute
        tokenSecret = tokenInfo.get('tokenSecret')
        if not tokenSecret:
            app.log.debug('No token secret stored for tokenId: {0}'.format(tokenId))
            raise UnauthorizedError(self._UNAUTHORIZED_MESSAGE)

        # guard clause - inauthentic token (signature invalid)
        if not TokenUtility().isBearerTokenAuthentic(bearerTokenValue, tokenSecret):
            app.log.debug('Inauthentic bearer token value for tokenId: {0}'.format(tokenId))
            raise UnauthorizedError(self._UNAUTHORIZED_MESSAGE)

        # guard clause - invalid device
        requestDeviceId = self._getDeviceIdValue(requestHeaders)
        tokenDeviceId = tokenInfo.get('deviceId')
        if requestDeviceId is None or requestDeviceId != tokenDeviceId:
            app.log.debug('Authentic token from a different device: {0} ({1})'.format(tokenId, requestDeviceId))
            raise UnauthorizedError(self._UNAUTHORIZED_MESSAGE)

        # guard clause - expired token (a missing timestamp counts as expired)
        expirationTimestamp = tokenInfo.get('expirationTimestamp', '0')
        if not allowForExchangeWindow and self._isExpired(expirationTimestamp):
            app.log.debug('Expired token for tokenId: {0}'.format(tokenId))
            raise UnauthorizedError(self._UNAUTHORIZED_MESSAGE)

        # guard clause - invalid userId (missing or empty)
        if not tokenItem.get('userId'):
            app.log.debug('Malformed token for tokenId: {0}'.format(tokenId))
            raise UnauthorizedError(self._UNAUTHORIZED_MESSAGE)

        # otherwise, return the token
        return tokenItem

    # region - Helper Methods

    def _getBearerTokenValue(self, requestHeaders):
        """Return the token that follows ``'Bearer '`` in ``Authorization``.

        :returns: the token text, or None when there are no headers, no
            ``Authorization`` header, or the header does not start with
            ``'Bearer '``.
        """
        # guard clause - no headers
        if requestHeaders is None:
            return None

        # guard clause - no authorization header or bearer token value
        authorizationValue = requestHeaders.get('Authorization')
        if authorizationValue is None:
            return None
        if not authorizationValue.startswith(self._BEARER_PREFIX):
            return None

        # slice off only the leading scheme; str.replace would also delete any
        # later 'Bearer ' occurrence inside the token value
        return authorizationValue[len(self._BEARER_PREFIX):]

    def _getDeviceIdValue(self, requestHeaders):
        """Return the ``X-Device-Id`` header value, or None when absent."""
        # guard clause - no headers
        if requestHeaders is None:
            return None

        return requestHeaders.get('X-Device-Id')

    def _isExpired(self, expirationTimestamp):
        """Return True when ``expirationTimestamp`` lies in the past.

        The value is converted with ``float`` and compared to ``time.time()``.
        A value that cannot be converted, or that is NaN, counts as expired
        so that a malformed stored timestamp never keeps a token alive.
        """
        try:
            expiresAt = float(expirationTimestamp)
        except (TypeError, ValueError):
            return True

        # written as a negated '<=' so that NaN (every comparison is False)
        # is reported as expired
        return not (time.time() <= expiresAt)

--

chalicelib/tokenUtility.py

--

import base64
import json
import hashlib
import hmac

class TokenUtility(object):
    """Parse and verify bearer tokens of the form ``header.payload.signature``.

    The signature is the hex HMAC-SHA256 digest of ``"header.payload"``,
    keyed with the token's secret.
    """

    # region - Public Methods

    def getTokenId(self, bearerTokenValue):
        """Return the ``identifier`` field from the token's payload.

        The payload (second dot-separated part) is base64-decoded and parsed
        as JSON. No signature check happens here; callers must still verify
        the token with ``isBearerTokenAuthentic``.

        :param bearerTokenValue: the raw bearer token.
        :returns: the ``identifier`` value, or None when the token is
            missing, does not have three parts, or its payload cannot be
            decoded.
        """
        # guard clause - invalid input
        bearerTokenParts = self._splitBearerToken(bearerTokenValue)
        if bearerTokenParts is None:
            return None

        payloadValue = bearerTokenParts[1]
        try:
            decodedPayloadValue = base64.b64decode(payloadValue)
            decodedPayloadJson = json.loads(decodedPayloadValue)
            return decodedPayloadJson.get('identifier', None)
        except Exception:
            # the payload is untrusted input: any decode/parse failure simply
            # means "no token id" rather than an error for the caller
            print("Problem getting token identifier from payload value: {0}".format(payloadValue))
            return None

    def isBearerTokenAuthentic(self, bearerTokenValue, tokenSecret):
        """Check the token's signature against ``tokenSecret``.

        :param bearerTokenValue: the raw bearer token.
        :param tokenSecret: ASCII secret used as the HMAC key.
        :returns: True only when the token has three parts, a non-empty
            secret is supplied, and the signature matches; False otherwise
            (always a bool, never None).
        """
        # guard clause - invalid input
        bearerTokenParts = self._splitBearerToken(bearerTokenValue)
        if bearerTokenParts is None:
            return False

        # guard clause - no secret; an empty HMAC key can be reproduced by
        # anyone, so a signature made with it proves nothing
        if not tokenSecret:
            return False

        headerValue, payloadValue, actualSignatureValue = bearerTokenParts
        try:
            expectedSignatureValue = self._createSignatureValue(tokenSecret, headerValue, payloadValue)
            # constant-time comparison, so response timing does not reveal how
            # much of a guessed signature was correct
            return hmac.compare_digest(actualSignatureValue, expectedSignatureValue)
        except Exception:
            # fail closed: non-ASCII input or an unusable secret is inauthentic
            return False

    # region - Helper Methods

    def _splitBearerToken(self, bearerTokenValue):
        """Return the three dot-separated token parts, or None if malformed."""
        if bearerTokenValue is None:
            return None

        bearerTokenParts = str(bearerTokenValue).split('.')
        if len(bearerTokenParts) != 3:
            return None

        return bearerTokenParts

    def _createSignatureValue(self, tokenSecret, headerValue, payloadValue):
        """Return the hex HMAC-SHA256 digest of ``"header.payload"``."""
        tokenSecretAscii = tokenSecret.encode('ascii')
        messageValue = "{0}.{1}".format(headerValue, payloadValue)
        messageValueAscii = messageValue.encode('ascii')
        messageHmac = hmac.new(tokenSecretAscii, messageValueAscii, hashlib.sha256)
        return messageHmac.hexdigest()

--

chalicelib/dynamoDbUtility.py

--

import boto3

from boto3.dynamodb.conditions import Key

# Module-level DynamoDB handles, created once when this module is imported and
# shared by every DynamoDbUtility call. Note that ddbClient is a boto3
# *resource* (boto3.resource), not a low-level client, despite its name.
ddbClient = boto3.resource('dynamodb')
ddbOperationalTokenTable = ddbClient.Table('OperationalToken')

class DynamoDbUtility(object):
    """Lookups against the module-level ``OperationalToken`` table handle."""

    # region - Public Methods - Operational Token

    def getOperationalToken(self, tokenId):
        """Load the operational token item stored under ``tokenId``.

        :param tokenId: value of the table's ``tokenId`` key.
        :returns: the stored item, or None when ``tokenId`` is None or empty
            or no item exists for it.
        """
        # guard clause - a missing or empty key value can never match an item,
        # and DynamoDB rejects it with a validation error instead of returning
        # "not found"; the id comes from an untrusted token payload
        if tokenId is None or tokenId == '':
            return None

        tokenResponse = ddbOperationalTokenTable.get_item(
            Key={ 'tokenId': tokenId }
        )
        if tokenResponse is None:
            return None

        # 'Item' is absent from the response when nothing matched the key
        return tokenResponse.get('Item')

--