diff --git a/.gitignore b/.gitignore index fbe491f..e0c022a 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,16 @@ dmypy.json #VSCode settings.json + +#IntelliJ +.idea/ + +#VSCode +settings.json + +#IntelliJ +.idea/ + +#VSCode +settings.json +.vscode/ \ No newline at end of file diff --git a/src/embit/bip47.py b/src/embit/bip47.py new file mode 100644 index 0000000..df69e63 --- /dev/null +++ b/src/embit/bip47.py @@ -0,0 +1,253 @@ +import hashlib +import hmac +import sys + +from binascii import hexlify, unhexlify +from io import BytesIO +from typing import Tuple + +from . import base58, ec, script +from .base import EmbitError +from .bip32 import HDKey +from .networks import NETWORKS +from .script import OPCODES +from .transaction import Transaction +if sys.implementation.name == "micropython": + import secp256k1 +else: + from .util import secp256k1 + + +""" + BIP-47: https://github.com/bitcoin/bips/blob/master/bip-0047.mediawiki +""" + +class BIP47Exception(Exception): + pass + + +def get_payment_code(root: HDKey, coin: int = 0, account: int = 0) -> str: + """ + Generates the recipient's BIP-47 shareable payment code (version 1) + for the input root private key. + """ + bip47_child = root.derive("m/47'/{}'/{}'".format(coin, account)) + + buf = BytesIO() + buf.write(b'\x01') # bip47 version + buf.write(b'\x00') # Bitmessage; always zero + buf.write(bip47_child.get_public_key().serialize()) + buf.write(bip47_child.chain_code) + buf.write(b'\00' * 13) # bytes reserved for future expansion + + return base58.encode_check(b'\x47' + buf.getvalue()) + + +def get_derived_payment_code_node(payment_code: str, derivation_index: int) -> HDKey: + """Returns the nth derived child for the payment_code""" + raw_payment_code = base58.decode_check(payment_code) + + # 81-byte payment code format: + # 0x47 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) + pubkey = ec.PublicKey.from_string(hexlify(raw_payment_code[3:36])) + chain_code = raw_payment_code[36:68] + root = HDKey(key=pubkey, chain_code=chain_code) + return root.derive([derivation_index]) + + +def get_notification_address(payment_code: str, script_type: str = "p2pkh", network: str = NETWORKS["main"]) -> str: + """Returns the BIP-47 notification address associated with the given payment_code""" + # Get the 0th public key derived from the payment_code + pubkey = get_derived_payment_code_node(payment_code, derivation_index=0).get_public_key() + + # TODO: Should we limit to just p2pkh? + if script_type == "p2pkh": + return script.p2pkh(pubkey).address(network) + elif script_type == "p2wpkh": + return script.p2wpkh(pubkey).address(network) + else: + raise EmbitError("Unsupported script_type: " + script_type) + + +def get_payment_address(payer_root: HDKey, recipient_payment_code: str, index: int, coin: int = 0, account: int = 0, network: dict = NETWORKS["main"], script_type: str = "p2wpkh") -> str: + """Called by the payer, generates the nth payment address between the payer and recipient""" + # Alice selects the 0th private key derived from her payment code ("a") + payer_key = payer_root.derive("m/47'/{}'/{}'/0".format(coin, account)) + a = payer_key.secret + + # Alice selects the next unused public key derived from Bob's payment code, starting from zero ("B", where B = bG) + recipient_payment_code_node = get_derived_payment_code_node(recipient_payment_code, derivation_index=index) + B = recipient_payment_code_node.get_public_key() + + # Alice calculates a secret point (S = aB) + S = B._xonly() + secp256k1.ec_pubkey_tweak_mul(S, a) + + # Alice calculates a scalar shared secret using the x value of S (s = SHA256(Sx)) + shared_secret = hashlib.sha256(secp256k1.ec_pubkey_serialize(S)[1:33]).digest() + + # If the value of s is not in the secp256k1 group, Alice MUST increment the index used to derive Bob's public key and try again. + if not secp256k1.ec_seckey_verify(shared_secret): + # TODO: Is this a sufficient test??? + raise BIP47Exception("Shared secret was not valid for index {}. Try again with the next index value.".format(index)) + + # Alice uses the scalar shared secret to calculate the ephemeral public key used to generate the P2PKH address for this transaction (B' = B + sG) + shared_pubkey = secp256k1.ec_pubkey_create(shared_secret) + pub = secp256k1.ec_pubkey_combine(B._point, shared_pubkey) + shared_node = HDKey(key=ec.PublicKey.parse(secp256k1.ec_pubkey_serialize(pub)), chain_code=recipient_payment_code_node.chain_code) + + if script_type == "p2pkh": + return script.p2pkh(shared_node).address(network=network) + elif script_type == "p2wpkh": + return script.p2wpkh(shared_node).address(network=network) + elif script_type == "p2sh-p2wpkh": + return script.p2sh(script.p2wpkh(shared_node)).address(network=network) + else: + raise EmbitError("Unsupported script_type: " + script_type) + + +def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: int, coin: int = 0, account: int = 0, network: dict = NETWORKS["main"], script_type: str = "p2wpkh") -> Tuple[str, ec.PrivateKey]: + """Called by the recipient, generates the nth receive address between the payer and recipient. + + Returns the payment address and its associated private key.""" + + # Using the 0th public key derived from Alice's payment code... + payer_payment_code_node = get_derived_payment_code_node(payer_payment_code, derivation_index=0) + B = payer_payment_code_node.get_public_key() + + # ...Bob calculates the nth shared secret with Alice + recipient_key = recipient_root.derive("m/47'/{}'/{}'/{}".format(coin, account, index)) + a = recipient_key.secret + + # Bob calculates a secret point (S = aB) + S = B._xonly() + secp256k1.ec_pubkey_tweak_mul(S, a) + + # Bob calculates a scalar shared secret using the x value of S (s = SHA256(Sx)) + shared_secret = hashlib.sha256(secp256k1.ec_pubkey_serialize(S)[1:33]).digest() + + # If the value of s is not in the secp256k1 group, increment the index and try again. + if not secp256k1.ec_seckey_verify(shared_secret): + # TODO: Is this a sufficient test??? + raise BIP47Exception("Shared secret was not valid for index {}. Try again with the next index value.".format(index)) + + # Bob uses the scalar shared secret to calculate the ephemeral public key used to generate the P2PKH address for this transaction (B' = B + sG) + shared_pubkey = secp256k1.ec_pubkey_create(shared_secret) + pub = secp256k1.ec_pubkey_combine(recipient_key.get_public_key()._point, shared_pubkey) + shared_node = HDKey(key=ec.PublicKey.parse(secp256k1.ec_pubkey_serialize(pub)), chain_code=payer_payment_code_node.chain_code) + + if script_type == "p2pkh": + receive_address = script.p2pkh(shared_node).address(network=network) + elif script_type == "p2wpkh": + receive_address = script.p2wpkh(shared_node).address(network=network) + elif script_type == "p2sh-p2wpkh": + receive_address = script.p2sh(script.p2wpkh(shared_node)).address(network=network) + else: + raise EmbitError("Unsupported script_type: " + script_type) + + # Bob calculates the private key for each ephemeral address as: b' = b + s + prv_key = secp256k1.ec_privkey_add(recipient_key.secret, shared_secret) + spending_key = ec.PrivateKey(secret=prv_key) + + return (receive_address, spending_key) + + +def blinding_function(private_key: bytes, secret_point: HDKey, utxo_outpoint: str, payload: bytes) -> bytes: + """Reversible blind/unblind function: blinds plaintext payloads and unblinds blinded payloads""" + S = secret_point._xonly() + secp256k1.ec_pubkey_tweak_mul(S, private_key) + + # Calculate a 64 byte blinding factor (s = HMAC-SHA512(x, o)) + # "x" is the x value of the secret point + # "o" is the outpoint being spent by the designated input + x = secp256k1.ec_pubkey_serialize(S)[1:33] + o = utxo_outpoint + s = unhexlify(hmac.new(unhexlify(o), x, hashlib.sha512).hexdigest()) + + # Replace the x (pubkey) value with x' (x' = x XOR (first 32 bytes of s)) + # Replace the chain code with c' (c' = c XOR (last 32 bytes of s)) + # payment code: 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) + x_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(payload[3:35], s[:32])]) + c_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(payload[35:67], s[-32:])]) + return payload[0:3] + x_prime + c_prime + payload[-13:] + + +def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str) -> str: + """Called by the payer, returns the blinded payload for the payer's notification tx + that is sent to the recipient while spending the input_utxo. The blinded payload + should be inserted as OP_RETURN data.""" + # TODO: method signature was made to easily match the BIP-47 test vector data, but + # isn't necessarily what might be ideal for real-world usage. + + # Alice selects the private key ("a") corresponding to the designated pubkey + a = input_utxo_private_key.secret + + # Alice selects the public key associated with Bob's notification address (B, where B = bG) + B = get_derived_payment_code_node(recipient_payment_code, derivation_index=0).get_public_key() + + # Alice serializes her payment code in binary form + payment_code = base58.decode_check(payer_payment_code)[1:] # omit the 0x47 leading byte + + # Blind the payment code + raw_blinded_payload = blinding_function(a, B, utxo_outpoint=input_utxo_outpoint[:72], payload=payment_code) + return hexlify(raw_blinded_payload).decode() + + +def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey, coin: int = 0, account: int = 0, network: dict = NETWORKS["main"]) -> str: + """If the tx is a BIP-47 notification tx for the recipient, return the new payer's + embedded payment_code, else None.""" + # Notification txs have one output sent to the recipient's notification addr + # and another containing the payer's payment code in an OP_RETURN payload. + if len(tx.vout) < 2: + return False + + recipient_payment_code = get_payment_code(recipient_root, coin, account) + + matches_notification_addr = False + payload = None + for vout in tx.vout: + # Notification txs include a dust payment to the recipient's notification address + if vout.script_pubkey.script_type() is not None and vout.script_pubkey.address(network=network) == get_notification_address(recipient_payment_code, network=network): + matches_notification_addr = True + continue + + # Payer's blinded payment code will be in an OP_RETURN w/exactly 80 bytes of data + # data = OP_RETURN OP_PUSHDATA1 (len of payload) + data = vout.script_pubkey.data + if data is not None and len(data) == 83 and data[0] == OPCODES.OP_RETURN and data[1] == OPCODES.OP_PUSHDATA1 and data[2] == 80: + payload = data[3:] + + if payload[0] != 1: + # Only version 1 currently supported + payload = None + continue + + if not matches_notification_addr or payload is None: + return None + + # Bob selects the designated pubkey ("A") + # (the first tx input that exposes a pubkey in scriptsig or witness) + for vin in tx.vin: + if not vin.is_segwit: + # data = (1byte len of sig) (1byte len of pubkey) + sig_len = vin.script_sig.data[0] + A = ec.PublicKey.from_string(hexlify(vin.script_sig.data[sig_len + 2:])) + break + + else: + # Witness should have [sig, pubkey] + A = ec.PublicKey.from_string(hexlify(vin.witness.items[1])) + break + + if not A or len(A.serialize()) != 33: + return None + + # Bob selects the private key associated with his notification address (0th child) + recipient_notification_node = recipient_root.derive("m/47'/{}'/{}'/0".format(coin, account)) + b = recipient_notification_node.secret + + utxo_outpoint = vin.to_string()[:72] # TODO: Is there a better way to get the outpoint? + + # Unblind the payload using the reversible `blinding_function`. + raw_unblinded_payload = blinding_function(b, A, utxo_outpoint=utxo_outpoint, payload=payload) + return base58.encode_check(b'\x47' + raw_unblinded_payload) diff --git a/src/embit/script.py b/src/embit/script.py index 5cea7f9..b6c327a 100644 --- a/src/embit/script.py +++ b/src/embit/script.py @@ -7,6 +7,10 @@ SIGHASH_ALL = 1 +class OPCODES: + OP_RETURN = 106 + OP_PUSHDATA1 = 76 + class Script(EmbitBase): def __init__(self, data=b""): diff --git a/tests/tests/__init__.py b/tests/tests/__init__.py index 2593ae5..a933885 100644 --- a/tests/tests/__init__.py +++ b/tests/tests/__init__.py @@ -5,6 +5,7 @@ from .test_bip32 import * from .test_psbt import * from .test_bip39 import * +from .test_bip47 import * from .test_slip39 import * from .test_descriptor import * from .test_psbtview import * diff --git a/tests/tests/test_bip47.py b/tests/tests/test_bip47.py new file mode 100644 index 0000000..bdf34ab --- /dev/null +++ b/tests/tests/test_bip47.py @@ -0,0 +1,305 @@ +from unittest import TestCase + +from embit import bip32, bip39, bip47, ec, compact, finalizer +from embit.networks import NETWORKS +from embit.psbt import PSBT, OutputScope +from embit.script import OPCODES, Script, p2wpkh +from embit.transaction import Transaction +from binascii import unhexlify + + +""" + Test vectors from: https://gist.github.com/SamouraiDev/6aad669604c5930864bd +""" +ALICE_MNEMONIC = "response seminar brave tip suit recall often sound stick owner lottery motion" +ALICE_PAYMENT_CODE = "PM8TJTLJbPRGxSbc8EJi42Wrr6QbNSaSSVJ5Y3E4pbCYiTHUskHg13935Ubb7q8tx9GVbh2UuRnBc3WSyJHhUrw8KhprKnn9eDznYGieTzFcwQRya4GA" +ALICE_PAYMENT_CODE_REGTEST = "PM8TJcUtZbTqYoGWcNAnaYDkAzA1cLq6gQV4aPJ3N5jydgmTHUr5UFK74CU58mdL6V8pVo3JJ8JsJFJzriZSGMj27ujJ3jxwFUQwi49ox3Cfai4SG5rk" +ALICE_NOTIFICATION_ADDR = "1JDdmqFLhpzcUwPeinhJbUPw4Co3aWLyzW" +ALICE_NOTIFICATION_ADDR_REGTEST = "mod1FsW4dsVRod4ZVRR3D3ovY97SxSjJwk" +ALICE_NOTIFICATION_INPUT_PRIVATE_KEY = "Kx983SRhAZpAhj7Aac1wUXMJ6XZeyJKqCxJJ49dxEbYCT4a1ozRD" +ALICE_NOTIFICATION_INPUT_OUTPOINT = "86f411ab1c8e70ae8a0795ab7a6757aea6e4d5ae1826fc7b8f00c597d500609c01000000" +ALICE_NOTIFICATION_BLINDED_PAYLOAD = "010002063e4eb95e62791b06c50e1a3a942e1ecaaa9afbbeb324d16ae6821e091611fa96c0cf048f607fe51a0327f5e2528979311c78cb2de0d682c61e1180fc3d543b00000000000000000000000000" + +""" + Mainnet p2pkh from BIP-47 test vectors, remaining addrs generated from: + https://bitcoiner.guide/seed +""" +ALICE_PAYS_BOB_ADDRS = { + "main": { + "p2wpkh": [ + "bc1qyyytpxv60e6hwh5jqkj2dcenckdsw6ekn2htfq", + "bc1qzn8a8drxv6ln7rztjsw660gzf3hnrfwupzmsfh", + "bc1q5v84r4dq2vkdku8h7ewfkj6c00eh20gmf0amr5", + "bc1q06ld55yrxrqdfym235h0jvdddvwc72ktsamh7c", + "bc1qe8uxekd8s59szxgnnfd2nxrn3ncnkmxlku83l9", + "bc1qemm4xmwr0fxwysry5mur0r5q5kakkw79fpezx0", + "bc1q3fl6rfkg4f600tlfrtkn6jv6kndg9tfu3hr009", + "bc1q89zc0ptgrcgsrzkfe4fjrlwcwfvny908976vxh", + "bc1qfteug4efvdlhyek9p9mrgwk0kqsq74y8jm5qw7", + "bc1q4ugsxkh69aknjvskm8k2susv9c6dq0pp3476y0" + ], + "p2pkh": [ + "141fi7TY3h936vRUKh1qfUZr8rSBuYbVBK", + "12u3Uued2fuko2nY4SoSFGCoGLCBUGPkk6", + "1FsBVhT5dQutGwaPePTYMe5qvYqqjxyftc", + "1CZAmrbKL6fJ7wUxb99aETwXhcGeG3CpeA", + "1KQvRShk6NqPfpr4Ehd53XUhpemBXtJPTL", + "1KsLV2F47JAe6f8RtwzfqhjVa8mZEnTM7t", + "1DdK9TknVwvBrJe7urqFmaxEtGF2TMWxzD", + "16DpovNuhQJH7JUSZQFLBQgQYS4QB9Wy8e", + "17qK2RPGZMDcci2BLQ6Ry2PDGJErrNojT5", + "1GxfdfP286uE24qLZ9YRP3EWk2urqXgC4s", + ], + "p2sh-p2wpkh": [ + "3QnEFKkpXFYSipn4uqcMNAKWhZq6PD4Gmz", + "38mr84Lrer3j1pTEZpTJ1pQTQJweMcc4YC", + "37Q2nDn2MGPLR2eCSRRnx3EZUv1bgNJpH3", + "38KnaMF7yiGnuUxDuM5AYoU7biYaGEfaRg", + "38A9WgnPYfNwDbovo12sSGF4E8Kq67qHvc", + "3A41gu3kgtqPpiWQwp5fY5VVS5WNgT11nN", + "33prMnukiGDj4vdwD7r3WV7fDuWxWAFEh5", + "38qRxEnED8hMVqQMywJydEmK595gBXi6yQ", + "3QH8LrqkkTnLNcaq5dsBzcj5LCoo5U8pEz", + "3ALkcRwUk1QhkZhcG7t9ooAAu7o12MGQr7", + ], + }, + "test": { + "p2wpkh": [ + "tb1qvlcks6jystdc984whpcqwm0ftuwvk888w3phmk", + "tb1qrzn3xca8ll4v6j65956ywslwzn7mu8d2t00xqa", + "tb1qwynwpawd5t3twd7yepk8v8wz4cewtel5z88tn5", + "tb1q3a6ltk6pycyy4ds5lt67whrglude5l85l0ru43", + "tb1qx630tnvjdx98r6cukv905ltn6ndtyr9zmvdp4l", + "tb1quqe2w3jz334gyadtns0gjzn535dsy8jlrkmdgt", + "tb1q4353mvaglaflcxk65u3t579pn4lgle5vldvpty", + "tb1qj5j0xrujhh0fns4q353q8uf970d6dp5xnacq6a", + "tb1qkqrwshzah97q5dmfz7pr7vfk4saulqsyvnyrth", + "tb1qwvsls5tydc7pw0f0r93ypexcyfundyve5z5t25", + ], + "p2pkh": [ + "mpzZ68EWiTQ3kb8dLoJ8YYd5yH8YaTHYvR", + "mhmJfHR5YJosP3CWm9fYrv1Sm5CJrTZ9Yd", + "mqqFE5EZGwSxAdfGR9NM4BskZbdyjN1h1g", + "mtbWEWuYrhaxZPposbVPBtgsZoEn1ZiczJ", + "mkVr15pHUVEnwi9L9cc61K8A5zcMbv2cox", + "n1xQLV627ei3exa7jXedAFEw2DcdgQSDoS", + "mwEaSdxyqPvTpvytzWkSqcPQBiX8Qmh9hB", + "mu7ZEvtjwqu8kwqkjFqZqi1KEVS3nvfVEu", + "mwZhVmYNhZLfsM7YnLCCABnatS27CtyYyD", + "mr1ihPdNQYqyAxWcV6MEZgwgzW1bmfUyXH", + ], + "p2sh-p2wpkh": [ + "2MxeHXEAqnX45Lc5pW5WJepo1PbaBiGVa4i", + "2N65jANRymkKniZXtdmud2ycLdbC9yPheU3", + "2NE4CDQbkbzJ6HAuYgStwHAhK2WhPQu4yqq", + "2N8f9kvkGoJ1oZdn63pUpewgzmwX9S1sY7K", + "2NB69mKr5v8b4AsgZZrTU5yMEu56yVCRgQQ", + "2NCmmAswkz6nL7Td7KUx9paMLNC1dHyDHhs", + "2Mwp4ufTEoqSgWuGvhp528EwQHT7StY5pDc", + "2NGM11Heusc71BpztycAgmG1yBgXy3D5WKe", + "2MwBYsrDuv4B64otzC5EampPJRmQyZaJQKo", + "2N12k1GuMrywSFGe1wBDk1v2j8eiw7SP77P", + ], + }, + "regtest": { + "p2wpkh": [ + "bcrt1qvlcks6jystdc984whpcqwm0ftuwvk888vcc6vl", + "bcrt1qrzn3xca8ll4v6j65956ywslwzn7mu8d2fxkth5", + "bcrt1qwynwpawd5t3twd7yepk8v8wz4cewtel5qw7xya", + "bcrt1q3a6ltk6pycyy4ds5lt67whrglude5l85ax63zc", + "bcrt1qx630tnvjdx98r6cukv905ltn6ndtyr9ze95vzk", + "bcrt1quqe2w3jz334gyadtns0gjzn535dsy8jlplzqlz", + "bcrt1q4353mvaglaflcxk65u3t579pn4lgle5vay4vud", + "bcrt1qj5j0xrujhh0fns4q353q8uf970d6dp5x35pdd5", + "bcrt1qkqrwshzah97q5dmfz7pr7vfk4saulqsyw6awu7", + "bcrt1qwvsls5tydc7pw0f0r93ypexcyfundyvektdxaa", + ], + }, +} + +BOB_MNEMONIC = "reward upper indicate eight swift arch injury crystal super wrestle already dentist" +BOB_PAYMENT_CODE = "PM8TJS2JxQ5ztXUpBBRnpTbcUXbUHy2T1abfrb3KkAAtMEGNbey4oumH7Hc578WgQJhPjBxteQ5GHHToTYHE3A1w6p7tU6KSoFmWBVbFGjKPisZDbP97" +BOB_PAYMENT_CODE_REGTEST = "PM8TJMJnBXShCFdcGRaGiCrhcCXczikNSyXJeAES6ciFMBv9jNY3ZwEc8fSV8DLmNRqnP9RPP1NPDxUf6vBoUnohPt5bwFFpTvosRw7gV2W4Tr34MULo" +BOB_NOTIFICATION_ADDR = "1ChvUUvht2hUQufHBXF8NgLhW8SwE2ecGV" +BOB_NOTIFICATION_ADDR_REGTEST = "mrVYeCNDyrzYwUuZNMWTFL76wdQ3mfXYHL" + +ALICE_NOTIFICATION_TX_FOR_BOB = "010000000186f411ab1c8e70ae8a0795ab7a6757aea6e4d5ae1826fc7b8f00c597d500609c010000006b483045022100ac8c6dbc482c79e86c18928a8b364923c774bfdbd852059f6b3778f2319b59a7022029d7cc5724e2f41ab1fcfc0ba5a0d4f57ca76f72f19530ba97c860c70a6bf0a801210272d83d8a1fa323feab1c085157a0791b46eba34afb8bfbfaeb3a3fcc3f2c9ad8ffffffff0210270000000000001976a9148066a8e7ee82e5c5b9b7dc1765038340dc5420a988ac1027000000000000536a4c50010002063e4eb95e62791b06c50e1a3a942e1ecaaa9afbbeb324d16ae6821e091611fa96c0cf048f607fe51a0327f5e2528979311c78cb2de0d682c61e1180fc3d543b0000000000000000000000000000000000" + + +class Bip47Test(TestCase): + + def test_get_payment_code(self): + """Alice & Bob's payment codes should match the test vectors in BIP-47""" + # Generate Alice's payment code + seed_bytes = bip39.mnemonic_to_seed(ALICE_MNEMONIC) + root = bip32.HDKey.from_seed(seed_bytes) + payment_code = bip47.get_payment_code(root) + self.assertEqual(payment_code, ALICE_PAYMENT_CODE) + + # Generate Bob's payment code + seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) + root = bip32.HDKey.from_seed(seed_bytes) + payment_code = bip47.get_payment_code(root) + self.assertEqual(payment_code, BOB_PAYMENT_CODE) + + + def test_get_payment_code_regtest(self): + """Regtest payment codes are different from mainnet and should be properly generated""" + seed_bytes = bip39.mnemonic_to_seed(ALICE_MNEMONIC) + root = bip32.HDKey.from_seed(seed_bytes) + + # coin=1 for test/regtest, per BIP-44 + payment_code = bip47.get_payment_code(root, coin=1) + self.assertEqual(payment_code, ALICE_PAYMENT_CODE_REGTEST) + + seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) + root = bip32.HDKey.from_seed(seed_bytes) + payment_code = bip47.get_payment_code(root, coin=1) + self.assertEqual(payment_code, BOB_PAYMENT_CODE_REGTEST) + + + def test_get_notification_address(self): + """Alice & Bob's derived notification addresses should match the test vectors in BIP-47""" + self.assertEqual(bip47.get_notification_address(ALICE_PAYMENT_CODE), ALICE_NOTIFICATION_ADDR) + self.assertEqual(bip47.get_notification_address(BOB_PAYMENT_CODE), BOB_NOTIFICATION_ADDR) + + + def test_get_notification_address_regtest(self): + """Regtest notification addresses are different from mainnet and should be properly generated""" + self.assertEqual(bip47.get_notification_address(ALICE_PAYMENT_CODE_REGTEST, network=NETWORKS["regtest"]), ALICE_NOTIFICATION_ADDR_REGTEST) + self.assertEqual(bip47.get_notification_address(BOB_PAYMENT_CODE_REGTEST, network=NETWORKS["regtest"]), BOB_NOTIFICATION_ADDR_REGTEST) + + + def test_get_payment_address(self): + """ Alice's payment addresses to Bob's payment code should match the test vector + addresses in BIP-47 and additional ones generated by Seed Tool. """ + seed_bytes = bip39.mnemonic_to_seed(ALICE_MNEMONIC) + payer_root = bip32.HDKey.from_seed(seed_bytes) + + # Test against all the networks and script types for Alice pays Bob + for network, addrs_dict in ALICE_PAYS_BOB_ADDRS.items(): + if network == "main": + coin = 0 + recipient_payment_code = BOB_PAYMENT_CODE + else: + coin = 1 # for test/regtest, per BIP-44 + recipient_payment_code = BOB_PAYMENT_CODE_REGTEST + for script_type, addrs in addrs_dict.items(): + for i, addr in enumerate(addrs): + payment_addr = bip47.get_payment_address( + payer_root=payer_root, + recipient_payment_code=recipient_payment_code, + coin=coin, + index=i, + network=NETWORKS[network], + script_type=script_type, + ) + self.assertEqual(addr, payment_addr) + + + def test_get_receive_address(self): + """ Bob (the recipient) should be able to use Alice's payment code to generate the + same addresses that Alice (the payer) generated. """ + # Test against all the networks and script types for A pays B + seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) + recipient_root = bip32.HDKey.from_seed(seed_bytes) + + for network, addr_dict in ALICE_PAYS_BOB_ADDRS.items(): + if network == "main": + coin = 0 + payer_payment_code = ALICE_PAYMENT_CODE + else: + coin = 1 # for test/regtest, per BIP-44 + payer_payment_code = ALICE_PAYMENT_CODE_REGTEST + for script_type, addrs in addr_dict.items(): + for i, addr in enumerate(addrs): + payment_addr, spending_key = bip47.get_receive_address( + recipient_root=recipient_root, + payer_payment_code=payer_payment_code, + coin=coin, + index=i, + network=NETWORKS[network], + script_type=script_type, + ) + self.assertEqual(addr, payment_addr) + + + def test_get_blinded_payment_code(self): + """Alice should be able to blind her payment code for Bob to unblind""" + input_utxo_private_key = ec.PrivateKey.from_string(ALICE_NOTIFICATION_INPUT_PRIVATE_KEY) + blinded_payload = bip47.get_blinded_payment_code( + payer_payment_code=ALICE_PAYMENT_CODE, + input_utxo_private_key=input_utxo_private_key, + input_utxo_outpoint=ALICE_NOTIFICATION_INPUT_OUTPOINT, + recipient_payment_code=BOB_PAYMENT_CODE + ) + + self.assertEqual(blinded_payload, ALICE_NOTIFICATION_BLINDED_PAYLOAD) + + + def test_get_payment_code_from_notification_tx(self): + """Bob should be able to decode Alice's payment code from her notification tx""" + tx = Transaction.from_string(ALICE_NOTIFICATION_TX_FOR_BOB) + seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) + recipient_root = bip32.HDKey.from_seed(seed_bytes) + + payer_payment_code = bip47.get_payment_code_from_notification_tx(tx, recipient_root) + self.assertEqual(payer_payment_code, ALICE_PAYMENT_CODE) + + # Any other root should fail + seed_bytes = bip39.mnemonic_to_seed("abandon " * 11 + "about") + other_root = bip32.HDKey.from_seed(seed_bytes) + self.assertTrue(bip47.get_payment_code_from_notification_tx(tx, other_root) is None) + + + def test_end_to_end_notification_tx(self): + """Alice should be able to construct a psbt that uses the utxo being spent to + blind her payment code and include it as an OP_RETURN output that Bob can + successfully unblind. + + Note: the above test used the already-constructed tx from the BIP-47 test + vectors; this test demonstrates Alice's full psbt-to-tx construction + process. + """ + # Regtest initial psbt that spends one of Alice's utxos to Bob's notification + # address. + ALICE_NOTIFICATION_BASE_PSBT = "cHNidP8BAHQCAAAAAbSCxQCModbVqGCXS5f5q5BbyrIPipQGLzRkUWlvMk41AAAAAAD9////AiICAAAAAAAAGXapFHhlOS7V9B6XfLmRNMuc4ixew5VliKyQFKgEAAAAABYAFMCPedocVwqxQv7WE3dtTEHFcgdeFwQAAAABAGkCAAAAAXVRckaobGUWT22l1cJgRaArprnVqrpKuIxmjoTenYzgAAAAABcWABQAOKoxxLYTkMskJhqVMhXE2L3mW/3///8BQxeoBAAAAAAWABR/yGzYyF9cb44byWc2v+DhQgtggBYEAAABAR9DF6gEAAAAABYAFH/IbNjIX1xvjhvJZza/4OFCC2CAIgYDdwO70Vy+5ob7m/irfdzOCIzKa2NpA+qZc8M4vUb/X/8YREM4d1QAAIABAACAAAAAgAAAAAAAAAAAAAAiAgPOQHgFgYW4Q0pSBPYgwA2DwbTSJsnO+ylBnsDILKIRpxhEQzh3VAAAgAEAAIAAAACAAQAAAAAAAAAA" + psbt = PSBT.from_base64(ALICE_NOTIFICATION_BASE_PSBT) + + # Verify that this psbt is spending to Bob's notification addr + self.assertEqual(psbt.outputs[0].script_pubkey.address(NETWORKS["regtest"]), BOB_NOTIFICATION_ADDR_REGTEST) + + # Extract the utxo that Alice will be spending + outpoint = psbt.inputs[0].vin.to_string() + + # Derive Alice's private key for this utxo + seed_bytes = bip39.mnemonic_to_seed(ALICE_MNEMONIC) + alice_root = bip32.HDKey.from_seed(seed_bytes) + prvkey = alice_root.derive("m/84'/1'/0'/0/0") # This utxo is Alice's first receive addr on regtest + + # Sanity check the prvkey + self.assertEqual(p2wpkh(prvkey.get_public_key()).address(NETWORKS["regtest"]), psbt.inputs[0].script_pubkey.address(NETWORKS["regtest"])) + + # Use the utxo to blind Alice's payment code + blinded_payload = bip47.get_blinded_payment_code(ALICE_PAYMENT_CODE_REGTEST, prvkey, outpoint, BOB_PAYMENT_CODE_REGTEST) + + # Build the additional psbt output with the blinded payload in an OP_RETURN + # data = OP_RETURN OP_PUSHDATA1 (len of payload) + raw_payload_data = unhexlify(blinded_payload) + data = compact.to_bytes(OPCODES.OP_RETURN) + compact.to_bytes(OPCODES.OP_PUSHDATA1) + compact.to_bytes(len(raw_payload_data)) + raw_payload_data + script = Script(data) + output = OutputScope() + output.script_pubkey = script + output.value = 0 + psbt.outputs.append(output) + + # Alice signs and finalizes her psbt into a tx + psbt.sign_with(alice_root) + tx = finalizer.finalize_psbt(psbt) + + # Can Bob decode the payload in the tx? + seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) + bob_root = bip32.HDKey.from_seed(seed_bytes) + unblinded_payment_code = bip47.get_payment_code_from_notification_tx(tx, bob_root, coin=1, network=NETWORKS["regtest"]) + + self.assertEqual(unblinded_payment_code, ALICE_PAYMENT_CODE_REGTEST)