#!/usr/bin/env -S python3 -ttuI
# -*- python -*-
#-----------------------------------------------------------------------------#
"""Command-line client for Starling Bank's API.

Access tokens are read from ``tokens/<name>.txt`` and the request-signing key
from ``keys/starling-api-key.key``; both paths are relative to the current
working directory.  Run with ``--help`` for the list of recognised actions.
"""

import argparse
import base64
import datetime
import hashlib
import io
import json
import re
import sys
import types
import urllib
import urllib.parse

from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
from cryptography.hazmat.primitives.hashes import SHA512
from cryptography.hazmat.primitives.serialization import load_pem_private_key
import requests

base_url = 'https://api.starlingbank.com/api/v2'
uid_re = re.compile(r'[\da-f]{8}-[\da-f]{4}-4[\da-f]{3}-[\da-f]{4}-[\da-f]{12}')

#-----------------------------------------------------------------------------#

def read_answer(prompt):
    """Write *prompt* to stdout and return one line read from stdin.

    The trailing line terminator is stripped.  Exits with an error when stdin
    is at end-of-file, so that prompt loops cannot spin forever on closed
    input.
    """
    sys.stdout.write(prompt)
    sys.stdout.flush()
    line = sys.stdin.readline()
    if line == '':
        # readline() returns '' only at EOF; an empty answer is '\n'.
        sys.exit('ERROR: unexpected end of input')
    return line.rstrip('\r\n')


class Question:
    """A single interactive prompt with a validated answer.

    ``type`` selects the validation rule and decides how ``values`` is read:

    * ``'string'`` - ``values`` is a ``(min_length, max_length)`` pair
    * ``'digits'`` - ``values`` is the exact number of digits required
    * ``'enum'``   - ``values`` is the collection of permitted answers
    * ``'uid'``    - ``values`` is unused; the answer must match ``uid_re``
    """

    def __init__(self, name, description, type, values=None):
        self.name = name                # key used for the answer in Form.complete()
        self.description = description  # text shown to the user
        self.type = type
        self.values = values
        self.answer = None              # set by ask()

    def ask(self):
        """Prompt repeatedly until the user gives a valid answer."""
        prompt = f'{self.description} ({self.type_description()}): '
        while True:
            self.answer = read_answer(prompt)
            if self.valid_answer():
                break
            print(f'Invalid answer {self.answer!r}, please try again')

    def type_description(self):
        """Return the hint shown in the prompt describing acceptable input.

        Raises ValueError for an unknown question type.
        """
        if self.type == 'string':
            return f'{self.values[0]} - {self.values[1]} characters'
        if self.type == 'digits':
            return f'exactly {self.values} digits'
        if self.type == 'enum':
            return ', '.join(self.values)
        if self.type == 'uid':
            return 'UID'
        raise ValueError(f'Unknown value type "{self.type}" for "{self.description}"')

    def valid_answer(self):
        """Return True if the current answer satisfies the question's type.

        Raises ValueError for an unknown question type.
        """
        if self.type == 'string':
            return self.values[0] <= len(self.answer) <= self.values[1]
        if self.type == 'digits':
            # [0-9] rather than \d: \d also matches non-ASCII digits.
            return (len(self.answer) == self.values
                    and re.fullmatch(r'[0-9]+', self.answer) is not None)
        if self.type == 'enum':
            return self.answer in self.values
        if self.type == 'uid':
            return uid_re.fullmatch(self.answer) is not None
        raise ValueError(f'Unknown value type "{self.type}" for "{self.description}"')


class Form:
    """An ordered group of questions about one topic."""

    def __init__(self, topic, *questions):
        self.topic = topic
        self.questions = questions

    def complete(self):
        """Ask every question, show a summary and ask for confirmation.

        Returns a dict mapping each question's name to its answer.  Exits
        with status 1 if the user answers 'n' at the confirmation prompt; any
        answer other than 'y' or 'n' repeats the prompt.
        """
        print(f'Please enter the details for the {self.topic}:')
        print('')
        for question in self.questions:
            question.ask()
        print('')
        print(f'Please check the details for the {self.topic}:')
        print('')
        for question in self.questions:
            print(f'{question.description}: {question.answer}')
        print('')
        while True:
            answer = read_answer('Are these details correct (y/n)? ')
            if answer == 'y':
                break
            if answer == 'n':
                sys.exit(1)
        return {question.name: question.answer for question in self.questions}


class StarlingKey:
    """A private key used to sign requests, plus its key UID.

    The key file is ``keys/<name>.key``: a first line of the form
    ``Key Uid: <uid>`` followed by an unencrypted PEM private key.
    """

    algorithm = 'ecdsa-sha512'
    uid_line_re = re.compile(r'^Key Uid:\s*(' + uid_re.pattern + r')\s*$')

    def __init__(self, name):
        path = f'keys/{name}.key'
        with io.open(path, 'rb') as fh:
            line = fh.readline().decode('utf-8').rstrip()
            match = self.uid_line_re.fullmatch(line)
            if match is None:
                sys.exit(f'ERROR: missing/malformed "Key Uid:" line in "{path}"')
            self.uid = match.group(1)
            # The rest of the file is the PEM data; None means "no password".
            self.key = load_pem_private_key(fh.read(), None)

    def sign(self, content):
        """Return the base64 ECDSA/SHA-512 signature of the string *content*."""
        signature = self.key.sign(content.encode('utf-8'), ECDSA(SHA512()))
        return base64.b64encode(signature).decode('utf-8')


class StarlingClient:
    """Wrapper around the Starling API calls used by this script."""

    new_payee_form = Form(
        'new payee',
        Question('payee_type', 'Payee type', 'enum', ('BUSINESS', 'INDIVIDUAL',)),
        Question('payee_name', 'Payee\'s name', 'string', (1, 255,)),
        Question('sort_code', 'Sort code', 'digits', 6),
        Question('account_number', 'Account number', 'digits', 8),
        Question('account_description', 'Account description', 'string', (1, 255,)),
    )

    new_account_form = Form(
        'new account',
        Question('payee_uid', 'Payee UID', 'uid'),
        Question('sort_code', 'Sort code', 'digits', 6),
        Question('account_number', 'Account number', 'digits', 8),
        Question('account_description', 'Account description', 'string', (1, 255,)),
    )

    amount_re = re.compile(r'^(\d+)\.(\d\d)$')

    def __init__(self):
        self.tokens = self.read_tokens('main', 'payments')
        self.api_key = StarlingKey('starling-api-key')

    def read_tokens(self, *names):
        """Return a dict of name -> token read from ``tokens/<name>.txt``."""
        tokens = {}
        for name in names:
            with io.open(f'tokens/{name}.txt') as fh:
                tokens[name] = fh.read().rstrip()
        return tokens

    def bearer_headers(self, token):
        """Return the Authorization header dict for the named token."""
        return {
            'Authorization': f'Bearer {self.tokens[token]}',
        }

    def get(self, path, token='main'):
        """GET ``base_url/path`` and return the parsed response."""
        url = f'{base_url}/{path}'
        response = requests.get(url, headers=self.bearer_headers(token))
        return self.parse_response(response)

    def put(self, path, data, token='main'):
        """PUT *data* as JSON to ``base_url/path`` and return the parsed response."""
        url = f'{base_url}/{path}'
        response = requests.put(url, headers=self.bearer_headers(token), json=data)
        return self.parse_response(response)

    def signed_put(self, path, data, token='main'):
        """PUT *data* as JSON with a signature over target, Date and Digest.

        The body is serialised here (not by requests) because the Digest
        header must be the SHA-512 of exactly the bytes that are sent.
        """
        url = f'{base_url}/{path}'
        body = (json.dumps(data) + '\n').encode('utf-8')
        # Aware UTC time; isoformat() already appends the '+00:00' offset.
        date = datetime.datetime.now(datetime.timezone.utc).isoformat()
        digest = base64.b64encode(hashlib.sha512(body).digest()).decode('utf-8')
        signature = self.make_signature('put', url, date, digest)
        headers = {
            'Authorization': f'Bearer {self.tokens[token]}; Signature {signature}',
            'Content-Type': 'application/json',
            'Date': date,
            'Digest': digest,
        }
        response = requests.put(url, headers=headers, data=body)
        return self.parse_response(response)

    def delete(self, path, token='main'):
        """DELETE ``base_url/path`` and return the parsed response."""
        url = f'{base_url}/{path}'
        response = requests.delete(url, headers=self.bearer_headers(token))
        return self.parse_response(response)

    def parse_response(self, response):
        """Decode a JSON response into nested ``types.SimpleNamespace`` objects.

        Prints diagnostics to stderr and exits with status 1 for any non-2xx
        status.  Returns None when a successful response has an empty body.
        """
        if response.status_code < 200 or response.status_code > 299:
            request = response.request
            print(f'ERROR: got HTTP status code {response.status_code}', file=sys.stderr)
            print(f'Request: {request.method} {request.url}', file=sys.stderr)
            print('Response:', file=sys.stderr)
            print(response.text, file=sys.stderr)
            sys.exit(1)
        if not response.text.strip():
            # Nothing to decode; json.loads('') would raise JSONDecodeError.
            return None
        return json.loads(response.text, object_hook=lambda obj: types.SimpleNamespace(**obj))

    def make_signature(self, method, url, date, digest):
        """Return the parameters that follow "Signature " in the Authorization header.

        The signed text is the request target (method and URL path) followed
        by the Date and Digest header values, one per line.
        """
        path = urllib.parse.urlparse(url).path
        headers = '(request-target) Date Digest'
        content = f'(request-target): {method} {path}\nDate: {date}\nDigest: {digest}'
        signature = self.api_key.sign(content)
        return f'keyid="{self.api_key.uid}",algorithm="{StarlingKey.algorithm}",headers="{headers}",signature="{signature}"'

    ### Low-level API wrappers ###

    def account_holder(self):
        """GET account-holder."""
        return self.get('account-holder')

    def account_holder_individual(self):
        """GET account-holder/individual."""
        return self.get('account-holder/individual')

    def accounts(self):
        """GET accounts."""
        return self.get('accounts')

    def balance(self, account_uid):
        """GET the balance of the given account."""
        return self.get(f'accounts/{account_uid}/balance')

    def payees(self, data=None):
        """List payees, or create one from *data* when it is given."""
        if data is None:
            return self.get('payees')
        return self.put('payees', data)

    def payees_account(self, payee_uid, data):
        """PUT a new account, described by *data*, onto an existing payee."""
        return self.put(f'payees/{payee_uid}/account', data)

    def payment(self, account_uid, category_uid, data):
        """Send a signed local payment request using the 'payments' token."""
        return self.signed_put(f'payments/local/account/{account_uid}/category/{category_uid}', data, token='payments')

    def spaces(self, account_uid):
        """GET the spaces of the given account."""
        return self.get(f'account/{account_uid}/spaces')

    ### Mid-level methods to munge the data from the low-level calls ###

    def get_primary_account(self):
        """Return the first account of type PRIMARY, or exit if there is none."""
        for account in self.accounts().accounts:
            if account.accountType == 'PRIMARY':
                return account
        sys.exit('ERROR: No PRIMARY account found')

    def format_amount(self, amount):
        """Format an object with ``currency`` and ``minorUnits`` as e.g. '£12.34'.

        Negative amounts are rendered with the sign after the symbol
        ('£-1.50').  Exits for currencies other than GBP and EUR.
        """
        if amount.currency == 'GBP':
            symbol = '£'
        elif amount.currency == 'EUR':
            symbol = '€'
        else:
            sys.exit(f'ERROR: Unsupported currency {amount.currency}')
        # Split the magnitude with integer arithmetic: float division loses
        # precision, and % on a negative value yields the wrong minor part.
        sign = '-' if amount.minorUnits < 0 else ''
        major_units, minor_units = divmod(abs(amount.minorUnits), 100)
        return f'{symbol}{sign}{major_units}.{minor_units:02d}'

    ### High-level methods which correspond to actions ###

    def default_action(self):
        """Print the account holder's details and a summary of each account."""
        holder = self.account_holder()
        holder_type = holder.accountHolderType
        if holder_type == 'INDIVIDUAL':
            details = self.account_holder_individual()
        else:
            sys.exit(f'ERROR: Unsupported account holder type {holder_type}')
        print('Account holder:')
        print(f' Name: {details.firstName} {details.lastName}')
        print(f' Email: <{details.email}>')
        print(f' Phone: {details.phone}')
        print(f' UID: {holder.accountHolderUid}')
        accounts = self.accounts().accounts
        count = len(accounts)
        if count == 0:
            # I don't think this should happen!
            print('This holder has no accounts.')
            return
        if count == 1:
            print('This holder has one account:')
        else:
            print(f'This holder has {count} accounts:')
        for account in accounts:
            balance = self.balance(account.accountUid).effectiveBalance
            print(f' {account.name}:')
            print(f' Balance: {self.format_amount(balance)}')
            print(f' Account type: {account.accountType}')
            print(f' Account UID: {account.accountUid}')
            print(f' Default category: {account.defaultCategory}')

    def list_payees(self):
        """Print every payee together with its accounts."""
        payees = self.payees().payees
        count = len(payees)
        if count == 0:
            print('There are no payees for this account holder.')
            return
        if count == 1:
            print('There is one payee for this account holder:')
        else:
            print(f'There are {count} payees for this account holder:')
        for payee in payees:
            sys.stdout.write(f' {payee.payeeUid}: {payee.payeeName} ({payee.payeeType}) - ')
            count = len(payee.accounts)
            if count == 0:
                print('no accounts.')
                continue
            if count == 1:
                print('one account:')
            else:
                print(f'{count} accounts:')
            for account in payee.accounts:
                sort_code = f'{account.bankIdentifier[0:2]}-{account.bankIdentifier[2:4]}-{account.bankIdentifier[4:6]}'
                account_number = account.accountIdentifier
                details = f'{account.payeeAccountUid}: {sort_code} {account_number} {account.description}'
                if account.defaultAccount:
                    details += ' (default)'
                print(f' {details}')

    def payee_add(self):
        """Interactively create a payee with one default GB sort-code account."""
        details = self.new_payee_form.complete()
        data = {
            'payeeName': details['payee_name'],
            'payeeType': details['payee_type'],
            'accounts': [
                {
                    'description': details['account_description'],
                    'defaultAccount': True,
                    'countryCode': 'GB',
                    'accountIdentifier': details['account_number'],
                    'bankIdentifier': details['sort_code'],
                    'bankIdentifierType': 'SORT_CODE',
                },
            ],
        }
        response = self.payees(data)
        if response.success:
            print(f'Successfully created payee with UID {response.payeeUid}')
        else:
            print(f'Failed to create payee: {response}')
            sys.exit(1)

    def payee_del(self, payee_uid):
        """Delete the payee with the given UID and print the API response."""
        response = self.delete(f'payees/{payee_uid}')
        print(response)

    def account_add(self):
        """Interactively add a non-default GB sort-code account to a payee."""
        details = self.new_account_form.complete()
        data = {
            'description': details['account_description'],
            'defaultAccount': False,
            'countryCode': 'GB',
            'accountIdentifier': details['account_number'],
            'bankIdentifier': details['sort_code'],
            'bankIdentifierType': 'SORT_CODE',
        }
        response = self.payees_account(details['payee_uid'], data)
        if response.success:
            print(f'Successfully added account with UID {response.payeeAccountUid}')
        else:
            print(f'Failed to create account: {response}')
            sys.exit(1)

    def pay(self, payee_uid, amount, ref):
        """Create a GBP payment from the primary account.

        *payee_uid* is sent as ``destinationPayeeAccountUid``, *amount* must
        be a decimal string with exactly two decimal places (e.g. '12.34')
        and *ref* is sent as the payment reference.
        """
        match = self.amount_re.fullmatch(amount)
        if match is None:
            sys.exit(f'ERROR: bad amount "{amount}" - must be number with two decimal places')
        minor_units = (int(match.group(1)) * 100) + int(match.group(2))
        account = self.get_primary_account()
        data = {
            # Client-chosen identifier: the current epoch time as a string.
            'externalIdentifier': str(datetime.datetime.now(datetime.timezone.utc).timestamp()),
            'destinationPayeeAccountUid': payee_uid,
            'reference': ref,
            'amount': {
                'currency': 'GBP',
                'minorUnits': minor_units,
            }
        }
        response = self.payment(account.accountUid, account.defaultCategory, data)
        print(f'Successfully created payment order with UID {response.paymentOrderUid}')

    def list_spaces(self):
        """Print the savings goals and spending spaces of the primary account."""
        account = self.get_primary_account()
        result = self.spaces(account.accountUid)
        self.print_savings_goals(result.savingsGoals)
        self.print_spending_spaces(result.spendingSpaces)

    def print_savings_goals(self, goals):
        """Print a numbered list of savings goals."""
        count = len(goals)
        print(f'Account has {count} savings goal(s):')
        for index, goal in enumerate(goals, 1):
            print(f'{index}. Savings Goal UID = {goal.savingsGoalUid}')
            print(f' Name = {goal.name}')
            # 'target' and 'savedPercent' are only printed when present.
            if hasattr(goal, 'target'):
                print(f' Target = {self.format_amount(goal.target)}')
            print(f' Total Saved = {self.format_amount(goal.totalSaved)}')
            if hasattr(goal, 'savedPercent'):
                print(f' Saved Percent = {goal.savedPercent}')
            print(f' State = {goal.state}')

    def print_spending_spaces(self, spaces):
        """Print a numbered list of spending spaces."""
        count = len(spaces)
        print(f'Account has {count} spending space(s):')
        for index, space in enumerate(spaces, 1):
            print(f'{index}. Spending Space UID = {space.spaceUid}')
            print(f' Name = {space.name}')
            print(f' Balance = {self.format_amount(space.balance)}')
            print(f' Card Association UID = {space.cardAssociationUid}')
            print(f' Type = {space.spendingSpaceType}')
            print(f' State = {space.state}')

#-----------------------------------------------------------------------------#

def build_parser():
    """Return the command-line argument parser."""
    return argparse.ArgumentParser(
        usage='%(prog)s [options] [action [arg ...]]',
        description='Interacts with Starling Bank\'s API to manage bank accounts.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''recognised actions and args:
  (no action)                       - show account holder and details
  payees                            - list the customer's payees
  payee add                         - adds a new payee
  payee del PAYEE_UID               - deletes an existing payee
  account add                       - adds a new account to a payee
  pay PAYEE_ACCOUNT_UID AMOUNT REF  - pay an existing payee
  spaces                            - list savings goals and spending spaces
''')


def require_arg_count(parser, label, action_args, expected):
    """Exit via parser.error() unless exactly *expected* arguments remain."""
    count = len(action_args)
    if count < expected:
        parser.error(f'Too few arguments for "{label}" action')
    if count > expected:
        parser.error(f'Too many arguments for "{label}" action')


def main():
    """Parse the command line and run the requested action."""
    parser = build_parser()
    parser.add_argument('action', help='which action to perform', nargs='*')
    args = parser.parse_args()

    client = StarlingClient()
    action_args = list(args.action)
    if not action_args:
        client.default_action()
        return

    action = action_args.pop(0)
    if action == 'payees':
        require_arg_count(parser, action, action_args, 0)
        client.list_payees()
    elif action == 'payee':
        if not action_args:
            parser.error(f'Too few arguments for "{action}" action')
        subaction = action_args.pop(0)
        label = f'{action} {subaction}'
        if subaction == 'add':
            require_arg_count(parser, label, action_args, 0)
            client.payee_add()
        elif subaction == 'del':
            require_arg_count(parser, label, action_args, 1)
            client.payee_del(*action_args)
        else:
            parser.error(f'Unknown "{label}" action')
    elif action == 'account':
        if not action_args:
            parser.error(f'Too few arguments for "{action}" action')
        subaction = action_args.pop(0)
        label = f'{action} {subaction}'
        if subaction == 'add':
            require_arg_count(parser, label, action_args, 0)
            client.account_add()
        elif subaction == 'del':
            require_arg_count(parser, label, action_args, 1)
            # StarlingClient defines no account_del in this file; report that
            # cleanly instead of crashing with an AttributeError.
            handler = getattr(client, 'account_del', None)
            if handler is None:
                parser.error(f'The "{label}" action is not implemented')
            handler(*action_args)
        else:
            parser.error(f'Unknown "{label}" action')
    elif action == 'pay':
        require_arg_count(parser, action, action_args, 3)
        client.pay(*action_args)
    elif action == 'spaces':
        require_arg_count(parser, action, action_args, 0)
        client.list_spaces()
    else:
        parser.error(f'Unknown action "{action}"')


if __name__ == '__main__':
    main()

#-----------------------------------------------------------------------------#