Claim Rosetta Token

How can we claim a pending Rosetta token for a user and convert it into an operational token?

These code snippets demonstrate how the chalice server claims a pending token for a user and converts it into an operational token that the user and their client can use, as part of the second phase of the second factor of the authentication process. Note that this code will NOT be executed unless a valid Cognito JWT has been presented at the API endpoint along with this request.

First, the server validates that the request is from a compliant client (a deviceId and claimValue must be passed). Next, it ensures that the identified pending token exists, that the request appears to be from the same user and device that created it, that it hasn't expired, and that the provided claimValue matches the value that was sent in the email to the user. Assuming everything looks fine to that point, it then creates an operational token that inherits some of the pending token information and then deletes the pending token. Finally, it creates a bearer token (the Rosetta JWT) that represents that operational token and makes it available to the user via their client.

Note that operational token is allocated its own unique secret. The reason for this unique secret is to make it harder for a malicious actor to manufacturer (or forge) a valid token. Not only would they need to guess an actual token identifier (which should be hard in and of itself), but they would need to guess the corresponding secret that is associated with that token, because that secret is what is used to sign the JWT that is returned to the user (via their client) and what is used to validate the signature of an operational token when it is presented at an API endpoint.

--

app.py

--

from chalice import Chalice

from chalicelib import requestUtility

app = Chalice(app_name='rosetta-api-chalice')

#
# API endpoints - token-requests
#

@app.route('/token-requests/{tokenId}', methods=['PATCH'])
def claimPendingToken(tokenId):
    userId = validateCognitoToken(app.current_request.headers)
    tokenClaimInfo = app.current_request.json_body
    bearerToken = requestUtility.RequestUtility().claimPendingToken(tokenId, userId, tokenClaimInfo)
    return bearerToken

--

chalicelib/requestUtility.py

--

import time
import uuid

from chalice import BadRequestError
from chalice import UnauthorizedError

from . dynamoDbUtility import DynamoDbUtility
from . tokenUtility import TokenUtility

class RequestUtility(object):

    # region - Public Methods

    def claimPendingToken(self, tokenId, tokenUserId, tokenRequestInfo):
        # guard clause - no device identifier
        providedDeviceId = tokenRequestInfo.get('deviceId')
        if providedDeviceId is None:
            raise BadRequestError('No device identifier provided')

        # guard clause - no claim value
        providedClaimValue = tokenRequestInfo.get('claimValue')
        if providedClaimValue is None:
            raise BadRequestError('No claim value provided')

        # guard clause - no matching token (treat like phishing)
        pendingToken = DynamoDbUtility().getPendingToken(tokenId)
        if pendingToken is None:
            raise BadRequestError('Invalid token identifier provided')

        # guard clause - claiming user different than requesting user (treat like phishing)
        userId = pendingToken.get('userId')
        if userId != tokenUserId:
            raise BadRequestError('Invalid token identifier provided')
        
        # guard clause - already expired (treat like phishing)
        currentTimestamp = time.time()
        originalExpirationTimestamp = pendingToken.get('expirationTimestamp', 0)
        if currentTimestamp > originalExpirationTimestamp:
            raise BadRequestError('Invalid token identifier provided')

        # guard clause - device doesn't match (treat like phishing)
        originalDeviceId = pendingToken.get('tokenInfo', {}).get('deviceId')
        if originalDeviceId != providedDeviceId:
            raise BadRequestError('Invalid token identifier provided')

        # guard clause - claim doesn't match (treat like phishing)
        originalClaimValue = pendingToken.get('tokenInfo', {}).get('claimCode')
        if originalClaimValue != providedClaimValue:
            raise BadRequestError('Invalid token identifier provided')

        # guard clause - cannot create operational token (treat like internal error)
        tokenInfo = pendingToken.get('tokenInfo', {})
        operationalTokenId = DynamoDbUtility().createOperationalToken(userId, tokenInfo)
        if operationalTokenId is None:
            raise UnauthorizedError('Unable to claim token - cannot create operational token')

        # guard clause - cannot delete pending token (treat like internal error)
        pendingTokenDeleted = DynamoDbUtility().deletePendingToken(tokenId)
        if not pendingTokenDeleted:
            raise UnauthorizedError('Unable to claim token - cannot delete pending token')

        # create and return bearer token for the operational token
        tokenId = operationalTokenId
        tokenExpiration = pendingToken.get('tokenInfo', {}).get('expirationTimestamp', '0')
        tokenSecret = pendingToken.get('tokenInfo', {}).get('tokenSecret', str(uuid.uuid4()))
        return TokenUtility().createBearerToken(tokenId, str(tokenExpiration), tokenSecret)

--

chalicelib/dynamoDbUtility.py

--

import boto3
import uuid

from boto3.dynamodb.conditions import Key

ddbClient = boto3.resource('dynamodb')
ddbOperationalTokenTable = ddbClient.Table('OperationalToken')
ddbPendingTokenTable = ddbClient.Table('PendingToken')

class DynamoDbUtility(object):

    # region - Public Methods - Operational Token

    def createOperationalToken(self, userId, tokenInfo):
        tokenId = str(uuid.uuid4())
        expirationTimestamp = int(float(tokenInfo.get('expirationTimestamp', '0')))
        exchangeWindow = 7 * 24 * 3600 # keep token around for 7 days so that it can be exchanged
        ddbResponse = ddbOperationalTokenTable.put_item(
           Item={
                'tokenId': tokenId,
                'userId': userId,
                'expirationTimestamp': expirationTimestamp + exchangeWindow,
                'tokenInfo': tokenInfo
            }
        )
        if ddbResponse is None:
            return None
        elif ddbResponse.get('ResponseMetadata', {}).get('HTTPStatusCode') != 200:
            return None

        return tokenId

    # region - Public Methods - Pending Token

    def deletePendingToken(self, tokenId):
        ddbResponse = ddbPendingTokenTable.delete_item(
           Key={
                'tokenId': tokenId
            }
        )
        if ddbResponse is None:
            return False
        elif ddbResponse.get('ResponseMetadata', {}).get('HTTPStatusCode') != 200:
            return False

        return True

    def getPendingToken(self, tokenId):
        tokenResponse = ddbPendingTokenTable.get_item(
            Key={ 'tokenId': tokenId }
        )
        if tokenResponse is None or tokenResponse.get('Item') is None:
            return None
        else:
            return tokenResponse.get('Item')

--

chalicelib/tokenUtility.py

--

import base64
import json
import hashlib
import hmac

class TokenUtility(object):

    # region - Public Methods
    
    def createBearerToken(self, tokenId, tokenExpiration, tokenSecret):
        headerValue = self._createHeaderValue()
        payloadValue = self._createPayloadValue(tokenId, tokenExpiration)
        signatureValue = self._createSignatureValue(tokenSecret, headerValue, payloadValue)
        return '{0}.{1}.{2}'.format(headerValue, payloadValue, signatureValue)
        
    # region - Helper Methods
    
    def _createHeaderValue(self):
        headerJson = json.dumps({
          "alg": "HS256",
          "typ": "JWT"
        })
        headerJsonAscii = headerJson.encode('ascii')
        headerJsonB64 = base64.b64encode(headerJsonAscii)
        return headerJsonB64.decode('ascii')
        
    def _createPayloadValue(self, tokenId, tokenExpiration):
        payloadJson = json.dumps({
          "identifier": tokenId,
          "expiration": tokenExpiration
        })
        payloadJsonAscii = payloadJson.encode('ascii')
        payloadJsonB64 = base64.b64encode(payloadJsonAscii)
        return payloadJsonB64.decode('ascii')
        
    def _createSignatureValue(self, tokenSecret, headerValue, payloadValue):
        tokenSecretAscii = tokenSecret.encode('ascii')
        messageValue = "{0}.{1}".format(headerValue, payloadValue)
        messageValueAscii = messageValue.encode('ascii')
        messageHmac = hmac.new(tokenSecretAscii, messageValueAscii, hashlib.sha256)
        return messageHmac.hexdigest()