Create Rosetta Token
How can we create a pending Rosetta token that the user will need to claim within a certain period of time?
These code snippets demonstrate how the Chalice server creates a pending token in DynamoDB and then sends a claim code to the user via SES; both steps are part of the first phase of the second factor of the authentication process. Note that this code will NOT be executed unless a valid Cognito JWT has been presented at the API endpoint along with this request.
First, the server validates that the request is from a compliant client (a deviceId and userId must be passed). Next, it ensures that the identified user is licensed for Rosetta Salt (I did this to prevent unintended users from using the password manager, and thereby keep its cost under control, but you don't need to do that for yourself). Then, it creates a pending token (with its own unique secret) and persists it in DynamoDB. Finally, it sends an email with the claim code to the user via SES.
The reason that the pending token and its future related operational token have their own unique secret is to make it harder for a malicious actor to manufacture (or forge) a valid token. Not only would they need to guess an actual token identifier (which should be hard in and of itself), but they would also need to guess the corresponding secret that is associated with that token, because that secret is what is used to sign the JWT that is returned to the user (via their client) and what is used to validate the signature of an operational token when it is presented at an API endpoint.
--
app.py
--
from chalice import Chalice
from chalicelib import requestUtility
# Chalice application object; the @app.route decorators below register
# their endpoints on it
app = Chalice(app_name='rosetta-api-chalice')
#
# API endpoints - token-requests
#
@app.route('/token-requests', methods=['POST'])
def createPendingToken():
    """Handle POST /token-requests by creating a pending token.

    The request headers are handed to validateCognitoToken (defined outside
    this snippet - presumably it verifies the Cognito JWT and yields the
    user identifier; confirm against its definition). The resulting user
    identifier and the JSON request body are then passed on to
    RequestUtility.createPendingToken, whose return value becomes the
    response.
    """
    request = app.current_request
    authenticatedUserId = validateCognitoToken(request.headers)
    requestBody = request.json_body
    utility = requestUtility.RequestUtility()
    return utility.createPendingToken(authenticatedUserId, requestBody)
--
chalicelib/requestUtility.py
--
import random
import time
import uuid
from chalice import BadRequestError
from chalice import ChaliceViewError
from chalice import UnauthorizedError
from . dynamoDbUtility import DynamoDbUtility
from . emailUtility import EmailUtility
class RequestUtility(object):
    """Creates pending tokens and emails their claim codes to the user.

    A pending token is persisted through DynamoDbUtility and its claim code
    is sent through EmailUtility. The user must hold an unexpired
    administrative license for the request to succeed.
    """

    # region - Public Methods

    def createPendingToken(self, tokenUserId, tokenRequestInfo):
        """Create a pending token for a user and email them its claim code.

        Args:
            tokenUserId: user identifier the caller established for this
                request; the 'userId' in the request body must equal it.
            tokenRequestInfo: parsed request body, expected to be a dict
                carrying 'deviceId' and 'userId'.

        Returns:
            The identifier of the pending token that was persisted.

        Raises:
            BadRequestError: the body is missing/not an object, has no
                'deviceId', or has no 'userId' matching tokenUserId.
            UnauthorizedError: the user has no unexpired license.
            ChaliceViewError: the token could not be persisted or the
                claim code email could not be sent.
        """
        # guard clause - no usable request body (absent or not a JSON
        # object); treat it as empty so the guards below answer with a 400
        # rather than failing with an AttributeError
        if not hasattr(tokenRequestInfo, 'get'):
            tokenRequestInfo = {}

        # guard clause - no device identifier
        deviceId = tokenRequestInfo.get('deviceId')
        if deviceId is None:
            raise BadRequestError('No device identifier provided')

        # guard clause - no user identifier (or one that differs from the
        # identity this request was made under)
        userId = tokenRequestInfo.get('userId')
        if userId is None or userId != tokenUserId:
            raise BadRequestError('No user identifier provided')

        # construct pending token info (NOTE that expiration in tokenInfo
        # will be different from the expiration of the pending token itself)
        claimCode = self._generateClaimCode()
        currentTimestamp = int(time.time())
        desiredExpiration = currentTimestamp + (12 * 60 * 60)  # 12 hours
        # never let the token outlive the user's best license
        maxLicensedExpiration = self._validateLicensingForUser(userId)
        expirationTimestamp = int(min(desiredExpiration, maxLicensedExpiration))
        tokenInfo = {
            'claimCode': claimCode,
            'deviceId': deviceId,
            'expirationTimestamp': str(expirationTimestamp),
            'issuedTimestamp': str(currentTimestamp),
            'tokenSecret': str(uuid.uuid4()),
            'tokenType': 'user-requested'
        }

        # guard clause - unable to create pending token
        pendingTokenId = DynamoDbUtility().createPendingToken(userId, tokenInfo)
        if pendingTokenId is None:
            raise ChaliceViewError('Unable to process your token request at this time')

        # guard clause - unable to email user with claim code
        # (str() so a non-string JSON deviceId cannot break the slice)
        deviceIdPrefix = str(deviceId)[0:10]
        subject = 'We have received your token request from {0}...'.format(deviceIdPrefix)
        body = 'Please enter this code in the next few minutes to claim your token: {0}'.format(claimCode)
        emailMessageId = EmailUtility().sendUserMessage(userId, subject, body)
        if emailMessageId is None:
            raise ChaliceViewError('Unable to process your token request at this time')

        return pendingTokenId

    # region - Helper Methods

    def _generateClaimCode(self):
        """Return a random claim code: exactly seven decimal digits, as a string."""
        # The claim code is a credential, so draw it from the operating
        # system's entropy source instead of the default, predictable
        # generator. randint is inclusive on both ends, hence 9999999.
        return str(random.SystemRandom().randint(1000000, 9999999))

    def _getExpirationTimestamp(self, adminLicense):
        """Return the license's expiration timestamp as an int (0 if absent)."""
        return int(adminLicense.get('licenseInfo', {}).get('expirationTimestamp', '0'))

    def _getBestAdminLicense(self, adminLicenses):
        """Return the license that expires latest, or {} if none expires after 0.

        On a tie the earliest license in adminLicenses wins.
        """
        # best admin license (from user perspective) is one that expires latest
        maxExpirationTimestamp = 0
        bestAdminLicense = {}
        for adminLicense in adminLicenses:
            licenseExpirationTimestamp = self._getExpirationTimestamp(adminLicense)
            if licenseExpirationTimestamp > maxExpirationTimestamp:
                maxExpirationTimestamp = licenseExpirationTimestamp
                bestAdminLicense = adminLicense
        return bestAdminLicense

    def _validateLicensingForUser(self, userId):
        """Return the expiration timestamp of the user's best license.

        Raises:
            UnauthorizedError: the user has no licenses, or the best one
                has already expired.
        """
        # guard clause - no licenses for this user
        adminLicenses = DynamoDbUtility().getAdministrativeLicenses(userId)
        if not adminLicenses:
            raise UnauthorizedError('User is not licensed for this product')

        # guard clause - best license for this user expired
        currentTimestamp = time.time()
        bestAdminLicense = self._getBestAdminLicense(adminLicenses)
        maxLicensedExpiration = self._getExpirationTimestamp(bestAdminLicense)
        if currentTimestamp > maxLicensedExpiration:
            raise UnauthorizedError('User is not licensed for this product')

        return maxLicensedExpiration
--
chalicelib/dynamoDbUtility.py
--
import boto3
import time
import uuid
from boto3.dynamodb.conditions import Key
# DynamoDB handle shared by this module (NOTE: despite its name this is a
# boto3 service *resource*, not a low-level client)
ddbClient = boto3.resource('dynamodb')
# table handles used by DynamoDbUtility below
ddbAdministrativeLicenseTable = ddbClient.Table('AdministrativeLicense')
ddbPendingTokenTable = ddbClient.Table('PendingToken')
class DynamoDbUtility(object):
    """Reads administrative licenses and writes pending tokens in DynamoDB."""

    # region - Public Methods - Administrative License

    def getAdministrativeLicenses(self, userId):
        """Query the 'UserIdIndex' index for a user's administrative licenses.

        At most 10 items are requested. Returns the 'Items' of the response,
        or an empty list when the response is missing or its HTTP status
        code is not 200.
        """
        queryResult = ddbAdministrativeLicenseTable.query(
            IndexName='UserIdIndex',
            KeyConditionExpression=Key('userId').eq(userId),
            Limit=10
        )
        if not self._isSuccessful(queryResult):
            return []
        return queryResult.get('Items', [])

    # region - Public Methods - Pending Token

    def createPendingToken(self, userId, tokenInfo):
        """Persist a pending token and return its newly generated identifier.

        The stored item expires 5 minutes from now (its own
        'expirationTimestamp'). Returns None when the response is missing
        or its HTTP status code is not 200.
        """
        newTokenId = str(uuid.uuid4())
        fiveMinutesFromNow = int(time.time()) + 300
        pendingTokenItem = {
            'tokenId': newTokenId,
            'userId': userId,
            'expirationTimestamp': fiveMinutesFromNow,
            'tokenInfo': tokenInfo
        }
        putResult = ddbPendingTokenTable.put_item(Item=pendingTokenItem)
        return newTokenId if self._isSuccessful(putResult) else None

    # region - Helper Methods

    def _isSuccessful(self, response):
        """Return True when a response exists and reports HTTP status 200."""
        if response is None:
            return False
        return response.get('ResponseMetadata', {}).get('HTTPStatusCode') == 200
--
chalicelib/emailUtility.py
--
import boto3
# SES client used by EmailUtility below; created once, at module import
sesClient = boto3.client('ses')
class EmailUtility(object):
    """Sends plain-text email to users through SES."""

    # region - Public Methods

    def sendUserMessage(self, userId, subject, body):
        """Send a UTF-8 plain-text email and return its SES message id.

        The userId is used directly as the recipient address. Returns None
        when the response is missing or its HTTP status code is not 200.
        """
        recipients = {'ToAddresses': [userId]}
        message = {
            'Subject': {'Data': subject, 'Charset': 'UTF-8'},
            'Body': {'Text': {'Data': body, 'Charset': 'UTF-8'}}
        }
        sendResult = sesClient.send_email(
            Source='noreply@mail.rosettasalt.org',
            Destination=recipients,
            Message=message
        )

        # a missing response counts as a failure, same as a non-200 status
        statusCode = None
        if sendResult is not None:
            statusCode = sendResult.get('ResponseMetadata', {}).get('HTTPStatusCode')
        if statusCode != 200:
            return None
        return sendResult.get('MessageId')
--