Create Rosetta Token

How can we create a pending Rosetta token that the user will need to claim within a certain period of time?

These code snippets demonstrate how the chalice server creates a pending token in DynamoDB and then sends out a claim code to the user via SES, both as part of the first phase of the second factor of the authentication process. Note that this code will NOT be executed unless a valid Cognito JWT has been presented at the API endpoint along with this request.

First, the server validates that the request is from a compliant client (a deviceId and userId must be passed). Next, it ensures that the identified user is licensed for Rosetta Salt (I did this to keep unintended users from using the password manager, and thereby to control its cost, but you don't need to do that for yourself). Then, it creates a pending token (with its own unique secret) and persists it in DynamoDB. Finally, it sends an email with the claim code to the user via SES.

The reason that the pending token and its future related operational token have their own unique secret is to make it harder for a malicious actor to manufacture (or forge) a valid token. Not only would they need to guess an actual token identifier (which should be hard in and of itself), but they would also need to guess the corresponding secret that is associated with that token, because that secret is what is used to sign the JWT that is returned to the user (via their client) and what is used to validate the signature of an operational token when it is presented at an API endpoint.

--

app.py

--

from chalice import Chalice

from chalicelib import requestUtility

# Chalice application object; the @app.route decorators below register
# their handlers on it, and handlers read the in-flight request from
# app.current_request.
app = Chalice(app_name='rosetta-api-chalice')

#
# API endpoints - token-requests
#

@app.route('/token-requests', methods=['POST'])
def createPendingToken():
    """Handle POST /token-requests by creating a pending token.

    The request headers are handed to validateCognitoToken, whose return
    value is used as the authenticated user identifier. That identifier and
    the parsed JSON body are then passed to
    RequestUtility.createPendingToken.

    Returns:
        The pending token identifier produced by RequestUtility.
    """
    # NOTE(review): validateCognitoToken is defined outside this snippet;
    # presumably it rejects requests without a valid Cognito JWT - confirm.
    request = app.current_request
    authenticatedUserId = validateCognitoToken(request.headers)
    payload = request.json_body
    return requestUtility.RequestUtility().createPendingToken(authenticatedUserId, payload)

--

chalicelib/requestUtility.py

--

import random
import time
import uuid

from chalice import BadRequestError
from chalice import ChaliceViewError
from chalice import UnauthorizedError

from . dynamoDbUtility import DynamoDbUtility
from . emailUtility import EmailUtility

class RequestUtility(object):
    """Creates pending tokens and emails their claim codes to the user.

    A pending token is persisted through DynamoDbUtility and announced to
    the user through EmailUtility. The token is only created when the user
    holds at least one administrative license that has not yet expired.
    """

    # lifetime requested for the token described by tokenInfo (12 hours);
    # the effective lifetime is capped by the user's best license
    _DESIRED_TOKEN_LIFETIME_SECONDS = 12 * 60 * 60

    # every claim code has exactly this many decimal digits
    _CLAIM_CODE_DIGITS = 7

    # region - Public Methods

    def createPendingToken(self, tokenUserId, tokenRequestInfo):
        """Create a pending token for the user and email them its claim code.

        Args:
            tokenUserId: user identifier the caller has been authenticated as.
            tokenRequestInfo: parsed request body; must be a dict holding a
                'deviceId' and a 'userId' equal to tokenUserId.

        Returns:
            The identifier of the newly persisted pending token.

        Raises:
            BadRequestError: the body is missing or not a dict, no deviceId
                was supplied, or the userId is missing or does not match
                tokenUserId.
            UnauthorizedError: the user has no unexpired administrative
                license.
            ChaliceViewError: the pending token could not be persisted or
                the claim code email could not be sent.
        """
        # guard clause - no usable request body (a missing or non-object
        # body would otherwise surface as an AttributeError / HTTP 500)
        if not isinstance(tokenRequestInfo, dict):
            raise BadRequestError('No token request provided')

        # guard clause - no device identifier
        deviceId = tokenRequestInfo.get('deviceId')
        if deviceId is None:
            raise BadRequestError('No device identifier provided')

        # guard clause - no user identifier
        userId = tokenRequestInfo.get('userId')
        if userId is None:
            raise BadRequestError('No user identifier provided')

        # guard clause - user identifier is not the authenticated user
        if userId != tokenUserId:
            raise BadRequestError('User identifier does not match the authenticated user')

        # construct pending token info (NOTE that expiration in tokenInfo
        # will be different from the expiration of the pending token itself)
        claimCode = self._generateClaimCode()
        currentTimestamp = int(time.time())
        desiredExpiration = currentTimestamp + self._DESIRED_TOKEN_LIFETIME_SECONDS
        maxLicensedExpiration = self._validateLicensingForUser(userId)
        expirationTimestamp = int(min(desiredExpiration, maxLicensedExpiration))
        tokenSecret = str(uuid.uuid4())
        tokenInfo = {
            'claimCode': claimCode,
            'deviceId': deviceId,
            'expirationTimestamp': str(expirationTimestamp),
            'issuedTimestamp': str(currentTimestamp),
            'tokenSecret': tokenSecret,
            'tokenType': 'user-requested'
        }

        # guard clause - unable to create pending token
        pendingTokenId = DynamoDbUtility().createPendingToken(userId, tokenInfo)
        if pendingTokenId is None:
            raise ChaliceViewError('Unable to process your token request at this time')

        # guard clause - unable to email user with claim code
        # (str() keeps a non-string deviceId from raising on the slice after
        # the pending token has already been persisted)
        deviceIdPrefix = str(deviceId)[0:10]
        subject = 'We have received your token request from {0}...'.format(deviceIdPrefix)
        body = 'Please enter this code in the next few minutes to claim your token: {0}'.format(claimCode)
        emailMessageId = EmailUtility().sendUserMessage(userId, subject, body)
        if emailMessageId is None:
            raise ChaliceViewError('Unable to process your token request at this time')

        return pendingTokenId

    # region - Helper Methods

    def _generateClaimCode(self):
        """Return a fresh claim code as a string of _CLAIM_CODE_DIGITS digits.

        The code is an authentication secret, so it is drawn from the
        operating system's CSPRNG (random.SystemRandom) rather than the
        default Mersenne Twister generator. randrange excludes the upper
        bound, so the code never grows to an extra digit.
        """
        lowerBound = 10 ** (self._CLAIM_CODE_DIGITS - 1)
        upperBound = 10 ** self._CLAIM_CODE_DIGITS
        return str(random.SystemRandom().randrange(lowerBound, upperBound))

    def _getLicenseExpiration(self, adminLicense):
        """Return the license's expirationTimestamp as an int (0 if absent)."""
        return int(adminLicense.get('licenseInfo', {}).get('expirationTimestamp', '0'))

    def _getBestAdminLicense(self, adminLicenses):
        """Return the license that expires latest.

        Args:
            adminLicenses: iterable of license dicts, each optionally holding
                licenseInfo.expirationTimestamp.

        Returns:
            The license with the greatest positive expiration timestamp, or
            an empty dict when no license has one. On ties the earliest
            such license wins.
        """
        # best admin license (from user perspective) is one that expires latest
        maxExpirationTimestamp = 0
        bestAdminLicense = {}

        for adminLicense in adminLicenses:
            licenseExpirationTimestamp = self._getLicenseExpiration(adminLicense)
            if licenseExpirationTimestamp > maxExpirationTimestamp:
                maxExpirationTimestamp = licenseExpirationTimestamp
                bestAdminLicense = adminLicense

        return bestAdminLicense

    def _validateLicensingForUser(self, userId):
        """Ensure the user is licensed and return the latest license expiration.

        Args:
            userId: identifier used to look up administrative licenses.

        Returns:
            The expiration timestamp (int) of the user's best license.

        Raises:
            UnauthorizedError: the user has no licenses, or the best one has
                already expired.
        """
        # guard clause - no licenses for this user
        adminLicenses = DynamoDbUtility().getAdministrativeLicenses(userId)
        if not adminLicenses:
            raise UnauthorizedError('User is not licensed for this product')

        # guard clause - best license for this user expired
        currentTimestamp = time.time()
        bestAdminLicense = self._getBestAdminLicense(adminLicenses)
        maxLicensedExpiration = self._getLicenseExpiration(bestAdminLicense)
        if currentTimestamp > maxLicensedExpiration:
            raise UnauthorizedError('User is not licensed for this product')

        return maxLicensedExpiration

--

chalicelib/dynamoDbUtility.py

--

import boto3
import time
import uuid

from boto3.dynamodb.conditions import Key

# Module-level DynamoDB handles, created once when this module is imported.
# NOTE: despite its name, ddbClient is a boto3 *resource* (boto3.resource),
# which is what provides the Table(...) accessors used below.
ddbClient = boto3.resource('dynamodb')
# table queried for a user's administrative licenses
ddbAdministrativeLicenseTable = ddbClient.Table('AdministrativeLicense')
# table that pending (not yet claimed) tokens are written to
ddbPendingTokenTable = ddbClient.Table('PendingToken')

class DynamoDbUtility(object):
    """Reads administrative licenses and writes pending tokens in DynamoDB."""

    # region - Helper Methods

    @staticmethod
    def _requestSucceeded(response):
        """Return True when a response exists and reports HTTP status 200."""
        if response is None:
            return False
        return response.get('ResponseMetadata', {}).get('HTTPStatusCode') == 200

    # region - Public Methods - Administrative License

    def getAdministrativeLicenses(self, userId):
        """Return up to 10 administrative license items for the given user.

        The lookup goes through the 'UserIdIndex' index of the
        administrative license table. An empty list is returned when the
        response is missing or does not report HTTP status 200.
        """
        response = ddbAdministrativeLicenseTable.query(
            IndexName='UserIdIndex',
            KeyConditionExpression=Key('userId').eq(userId),
            Limit=10
        )
        if not self._requestSucceeded(response):
            return []

        return response.get('Items', [])

    # region - Public Methods - Pending Token

    def createPendingToken(self, userId, tokenInfo):
        """Persist a new pending token and return its identifier.

        The stored record gets a fresh UUID as its tokenId and an
        expirationTimestamp five minutes after the time of the call.
        None is returned when the response is missing or does not report
        HTTP status 200.
        """
        newTokenId = str(uuid.uuid4())
        recordExpiration = int(time.time()) + 300  # 5 minutes from now
        pendingTokenItem = {
            'tokenId': newTokenId,
            'userId': userId,
            'expirationTimestamp': recordExpiration,
            'tokenInfo': tokenInfo
        }
        response = ddbPendingTokenTable.put_item(Item=pendingTokenItem)

        return newTokenId if self._requestSucceeded(response) else None

--

chalicelib/emailUtility.py

--

import boto3

# Module-level SES client, created once when this module is imported and
# used by EmailUtility.sendUserMessage to send email.
sesClient = boto3.client('ses')

class EmailUtility(object):
    """Sends plain-text email to users through SES."""

    # region - Public Methods

    def sendUserMessage(self, userId, subject, body):
        """Send a UTF-8 plain-text email and return its SES message id.

        The userId is used directly as the single recipient address.
        None is returned when the response is missing or does not report
        HTTP status 200.
        """
        recipients = {'ToAddresses': [userId]}
        message = {
            'Subject': {'Data': subject, 'Charset': 'UTF-8'},
            'Body': {'Text': {'Data': body, 'Charset': 'UTF-8'}}
        }
        response = sesClient.send_email(
            Source='noreply@mail.rosettasalt.org',
            Destination=recipients,
            Message=message
        )
        if response is None:
            return None

        statusCode = response.get('ResponseMetadata', {}).get('HTTPStatusCode')
        if statusCode == 200:
            return response.get('MessageId')

        return None

--