Files
starling-app/starling

439 lines
16 KiB
Python
Executable File

#!/usr/bin/env -S python3 -ttuI
# -*- python -*-
#-----------------------------------------------------------------------------#
import argparse
import base64
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
from cryptography.hazmat.primitives.hashes import SHA512
from cryptography.hazmat.primitives.serialization import load_pem_private_key
import datetime
import hashlib
import io
import json
import re
import requests
import sys
import types
import urllib
base_url = 'https://api.starlingbank.com/api/v2'
uid_re = re.compile(r'[\da-f]{8}-[\da-f]{4}-4[\da-f]{3}-[\da-f]{4}-[\da-f]{12}')
#-----------------------------------------------------------------------------#
class Question:
def __init__(self, name, description, type, values=None):
self.name = name
self.description = description
self.type = type
self.values = values
self.answer = None
def ask(self):
while True:
sys.stdout.write(f'{self.description} ({self.type_description()}): ')
self.answer = sys.stdin.readline().rstrip('\r\n')
if self.valid_answer():
break
print(f'Invalid answer {self.answer!r}, please try again')
def type_description(self):
if self.type == 'string':
return f'{self.values[0]} - {self.values[1]} characters'
if self.type == 'digits':
return f'exactly {self.values} digits'
if self.type == 'enum':
return ', '.join(self.values)
if self.type == 'uid':
return 'UID'
else:
raise Exception(f'Unknown value type "{self.type}" for "{self.description}"')
def valid_answer(self):
if self.type == 'string':
size = len(self.answer)
return size >= self.values[0] and size <= self.values[1]
if self.type == 'digits':
size = len(self.answer)
return size == self.values and re.fullmatch('\d+', self.answer)
if self.type == 'enum':
return self.answer in self.values
if self.type == 'uid':
return uid_re.fullmatch(self.answer) != None
else:
raise Exception(f'Unknown value type "{self.type}" for "{self.description}"')
class Form:
def __init__(self, topic, *questions):
self.topic = topic
self.questions = questions
def complete(self):
print(f'Please enter the details for the {self.topic}:')
print('')
for question in self.questions:
question.ask()
print('')
print(f'Please check the details for the {self.topic}:')
print('')
for question in self.questions:
print(f'{question.description}: {question.answer}')
print('')
while True:
sys.stdout.write('Are these details corrent (y/n)? ')
answer = sys.stdin.readline().rstrip('\r\n')
if answer == 'y':
break
if answer == 'n':
sys.exit(1)
return dict(map(lambda q: (q.name, q.answer), self.questions))
class StarlingKey:
algorithm = 'ecdsa-sha512'
uid_line_re = re.compile(r'^Key Uid:\s*(' + uid_re.pattern + r')\s*$')
def __init__(self, name):
path = f'keys/{name}.key'
with io.open(path, 'rb') as fh:
line = fh.readline().decode('utf-8').rstrip()
match = self.uid_line_re.fullmatch(line)
if match is None:
sys.exit(f'ERROR: missing/malformed "Key Uid:" line in "{path}"')
self.uid = match.group(1)
self.key = load_pem_private_key(fh.read(), None)
#print(f'Loaded key with UID {self.uid} from "{path}"')
def sign(self, content):
signature = self.key.sign(content.encode('utf-8'), ECDSA(SHA512()))
return base64.b64encode(signature).decode('utf-8')
class StarlingClient:
new_payee_form = Form(
'new payee',
Question('payee_type', 'Payee type', 'enum', ('BUSINESS', 'INDIVIDUAL',)),
Question('payee_name', 'Payee\'s name', 'string', (1, 255,)),
Question('sort_code', 'Sort code', 'digits', 6),
Question('account_number', 'Account number', 'digits', 8),
Question('account_description', 'Account description', 'string', (1, 255,)),
)
new_account_form = Form(
'new account',
Question('payee_uid', 'Payee UID', 'uid'),
Question('sort_code', 'Sort code', 'digits', 6),
Question('account_number', 'Account number', 'digits', 8),
Question('account_description', 'Account description', 'string', (1, 255,)),
)
amount_re = re.compile(r'^(\d+)\.(\d\d)$')
def __init__(self):
self.tokens = self.read_tokens('main', 'payments')
self.api_key = StarlingKey('starling-api-key')
def read_tokens(self, *names):
tokens = {}
for name in names:
with io.open(f'tokens/{name}.txt') as fh:
tokens[name] = fh.read().rstrip()
return tokens
def get(self, path, token='main'):
url = f'{base_url}/{path}'
headers = {
'Authorization': f'Bearer {self.tokens[token]}',
}
response = requests.get(url, headers=headers)
return self.parse_response(response)
def put(self, path, data, token='main'):
url = f'{base_url}/{path}'
headers = {
'Authorization': f'Bearer {self.tokens[token]}',
}
response = requests.put(url, headers=headers, json=data)
return self.parse_response(response)
def signed_put(self, path, data, token='main'):
url = f'{base_url}/{path}'
body = (json.dumps(data) + '\n').encode('utf-8')
date = datetime.datetime.utcnow().isoformat() + '+00:00'
digest = base64.b64encode(hashlib.sha512(body).digest()).decode('utf-8')
signature = self.make_signature('put', url, date, digest)
headers = {
'Authorization': f'Bearer {self.tokens[token]}; Signature {signature}',
'Content-Type': 'application/json',
'Date': date,
'Digest': digest,
}
response = requests.put(url, headers=headers, data=body)
return self.parse_response(response)
def delete(self, path, token='main'):
url = f'{base_url}/{path}'
headers = {
'Authorization': f'Bearer {self.tokens[token]}',
}
response = requests.delete(url, headers=headers)
return self.parse_response(response)
def parse_response(self, response):
if response.status_code < 200 or response.status_code > 299:
print(f'ERROR: got HTTP status code {response.status_code}')
print(f'Request: {response.request}')
print(f'Response:')
print(response.text)
sys.exit(1)
return json.loads(response.text, object_hook=lambda obj: types.SimpleNamespace(**obj))
def make_signature(self, method, url, date, digest):
path = urllib.parse.urlparse(url).path
headers = '(request-target) Date Digest'
content = f'(request-target): {method} {path}\nDate: {date}\nDigest: {digest}'
signature = self.api_key.sign(content)
return f'keyid="{self.api_key.uid}",algorithm="{StarlingKey.algorithm}",headers="{headers}",signature="{signature}"'
### Low-level API wrappers ###
def account_holder(self):
return self.get('account-holder')
def account_holder_individual(self):
return self.get('account-holder/individual')
def accounts(self):
return self.get('accounts')
def balance(self, account_uid):
return self.get(f'accounts/{account_uid}/balance')
def payees(self, data=None):
if data is None:
return self.get('payees')
return self.put('payees', data)
def payees_account(self, payee_uid, data):
return self.put(f'payees/{payee_uid}/account', data)
def payment(self, account_uid, category_uid, data):
return self.signed_put(f'payments/local/account/{account_uid}/category/{category_uid}', data, token='payments')
### Mid-level methods to munge the data from the low-level calls ###
def formatted_balance(self, account_uid):
balance = self.balance(account_uid).effectiveBalance
if balance.currency == 'GBP':
symbol = '£'
elif balance.currency == 'EUR':
symbol = ''
else:
sys.exit(f'ERROR: Unsupported currency {balance.currency}')
major_units = int(balance.minorUnits / 100)
minor_units = balance.minorUnits % 100
return f'{symbol}{major_units}.{minor_units:02d}'
### High-level methods which correspond to actions ###
def default_action(self):
holder = self.account_holder()
holder_type = holder.accountHolderType
if holder_type == 'INDIVIDUAL':
details = self.account_holder_individual()
else:
sys.exit(f'ERROR: Unsupported account holder type {holder_type}')
print('Account holder:')
print(f' Name: {details.firstName} {details.lastName}')
print(f' Email: <{details.email}>')
print(f' Phone: {details.phone}')
print(f' UID: {holder.accountHolderUid}')
accounts = self.accounts().accounts
count = len(accounts)
if count == 0:
# I don't think this should happen!
print('This holder has no accounts.')
return
if count == 1:
print('This holder has one account:')
else:
print(f'This holder has {count} accounts:')
for account in accounts:
balance = self.formatted_balance(account.accountUid)
print(f' {account.name}:')
print(f' Balance: {balance}')
print(f' Account type: {account.accountType}')
print(f' Account UID: {account.accountUid}')
print(f' Default category: {account.defaultCategory}')
def list_payees(self):
payees = self.payees().payees
count = len(payees)
if count == 0:
print('There are no payees for this account holder.')
return
if count == 1:
print('There is one payee for this account holder:')
else:
print(f'There are {count} payees for this account holder:')
for payee in payees:
sys.stdout.write(f' {payee.payeeUid}: {payee.payeeName} ({payee.payeeType}) - ')
count = len(payee.accounts)
if count == 0:
print('no accounts.')
continue
if count == 1:
print('one account:')
else:
print(f'{count} accounts:')
for account in payee.accounts:
sort_code = f'{account.bankIdentifier[0:2]}-{account.bankIdentifier[2:4]}-{account.bankIdentifier[4:6]}'
account_number = account.accountIdentifier
details = f'{account.payeeAccountUid}: {sort_code} {account_number} {account.description}'
if account.defaultAccount:
details += ' (default)'
print(f' {details}')
def payee_add(self):
details = self.new_payee_form.complete()
data = {
'payeeName': details['payee_name'],
'payeeType': details['payee_type'],
'accounts': [
{
'description': details['account_description'],
'defaultAccount': True,
'countryCode': 'GB',
'accountIdentifier': details['account_number'],
'bankIdentifier': details['sort_code'],
'bankIdentifierType': 'SORT_CODE',
},
],
}
response = self.payees(data)
if response.success:
print(f'Successfully created payee with UID {response.payeeUid}')
else:
print(f'Failed to create payee: {response}')
sys.exit(1)
def payee_del(self, payee_uid):
response = self.delete(f'payees/{payee_uid}')
print(response)
def account_add(self):
details = self.new_account_form.complete()
data = {
'description': details['account_description'],
'defaultAccount': False,
'countryCode': 'GB',
'accountIdentifier': details['account_number'],
'bankIdentifier': details['sort_code'],
'bankIdentifierType': 'SORT_CODE',
}
response = self.payees_account(details['payee_uid'], data)
if response.success:
print(f'Successfully added account with UID {response.payeeAccountUid}')
else:
print(f'Failed to create account: {response}')
sys.exit(1)
def pay(self, payee_uid, amount, ref):
match = self.amount_re.match(amount)
if match is None:
sys.exit(f'ERROR: bad amount "{amount}" - must be number with two decimal places')
minor_units = (int(match.group(1)) * 100) + int(match.group(2))
primary = None
for account in self.accounts().accounts:
if account.accountType == 'PRIMARY':
primary = account
break
else:
sys.exit('ERROR: No PRIMARY account found')
data = {
'externalIdentifier': str(datetime.datetime.utcnow().timestamp()),
'destinationPayeeAccountUid': payee_uid,
'reference': ref,
'amount': {
'currency': 'GBP',
'minorUnits': minor_units,
}
}
response = self.payment(primary.accountUid, primary.defaultCategory, data)
print(f'Successfully created payment order with UID {response.paymentOrderUid}')
#-----------------------------------------------------------------------------#
parser = argparse.ArgumentParser(
usage='%(prog)s [options] [action [arg ...]]',
description='Interacts with Starling Bank\'s API to manage bank accounts.',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''recognised actions and args:
<none> - show account holder and details
payees - list the customer's payees
payee add - adds a new payee
payee del <payee_uid> - deletes an existing payee
account add <payee_uid> - adds a new account to a payee
pay <account_uid> <amount> <ref> - pay an existing payee
''')
parser.add_argument('action', help='which action to perform', nargs='*')
args = parser.parse_args()
client = StarlingClient()
action_args = args.action
if len(action_args) == 0:
client.default_action()
sys.exit(0)
action = action_args.pop(0)
count = len(action_args)
if action == 'payees':
if count > 0:
parser.error(f'Too many arguments for "{action}" action')
client.list_payees()
elif action == 'payee':
if count < 1:
parser.error(f'Too few arguments for "{action}" action')
subaction = action_args.pop(0)
if subaction == 'add':
if count > 1:
parser.error(f'Too many arguments for "{action} {subaction}" action')
client.payee_add()
elif subaction == 'del':
if count < 2:
parser.error(f'Too few arguments for "{action} {subaction}" action')
if count > 2:
parser.error(f'Too many arguments for "{action} {subaction}" action')
client.payee_del(*action_args)
else:
parser.error(f'Unknown "{action} {subaction}" action')
elif action == 'account':
if count < 1:
parser.error(f'Too few arguments for "{action}" action')
subaction = action_args.pop(0)
if subaction == 'add':
if count > 1:
parser.error(f'Too many arguments for "{action} {subaction}" action')
client.account_add()
elif subaction == 'del':
if count < 2:
parser.error(f'Too few arguments for "{action} {subaction}" action')
if count > 2:
parser.error(f'Too many arguments for "{action} {subaction}" action')
client.account_del(*actuon_args)
else:
parser.error(f'Unknown "{action} {subaction}" action')
elif action == 'pay':
if count < 3:
parser.error(f'Too few arguments for "{action}" action')
if count > 3:
parser.error(f'Too many arguments for "{action}" action')
client.pay(*action_args)
else:
parser.error(f'Unknown action "{action}"')
#-----------------------------------------------------------------------------#