diff --git a/electrum/electrumabc/psbt.py b/electrum/electrumabc/psbt.py new file mode 100644 --- /dev/null +++ b/electrum/electrumabc/psbt.py @@ -0,0 +1,194 @@ +# Electrum ABC - lightweight eCash client +# Copyright (C) The Electrum ABC developers +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation files +# (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, +# publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Parser and writer for the Partially Signed Bitcoin Transaction format. + +The Partially Signed Bitcoin Transaction (PSBT) format consists of key-value maps. +Each map consists of a sequence of key-value records, terminated by a 0x00 byte. + + <psbt> := <magic> <global-map> <input-map>* <output-map>* + +Where <global-map>, <input-map> and <output-map> are sequences of key-value records. + + <global-map> := <keypair>* 0x00 + <input-map> := <keypair>* 0x00 + <output-map> := <keypair>* 0x00 + +Where + <keypair> := <key> <value> + +Where + <key> := <keylen> <keytype> <keydata> + <value> := <valuelen> <valuedata> + +Where: + - <keylen> is the compact size unsigned integer containing the combined length of + <keytype> and <keydata> + - <keytype> is a compact size unsigned integer representing the type. + See the Enums below for a list of defined key types. + There can be multiple entries with the same <keytype> within a specific <map>, + but the <key> must be unique. + - <valuelen> is the compact size unsigned integer containing the length of + <valuedata>. + - <keydata> and <valuedata> are blob of bytes whose meaning depends on <keytype>. + See BIP 0174. + +Additional documentation: + - psbt.h in the Bitcoin ABC node. + - https://github.com/bitcoin/bips/blob/master/bip-0174.mediawiki + +This currently supports PSBT version 0. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import IntEnum +from io import BytesIO +from typing import List, Optional, Tuple + +from .serialize import ( + DeserializationError, + UnexpectedEndOfStream, + compact_size, + read_compact_size, +) + +PSBT_MAGIC_BYTES = b"psbt\xff" + +# The PSBT separator happens to also be a 0-length key. This is used by the parser +# to find the end of a section. +PSBT_SEPARATOR = b"\x00" + + +class PSBTGlobalType(IntEnum): + UNSIGNED_TX = 0x0 + XPUB = 0x1 + VERSION = 0xFB + + +class PSBTInputType(IntEnum): + UTXO = 0x0 + PARTIAL_SIG = 0x2 + SIGHASH_TYPE = 0x3 + REDEEM_SCRIPT = 0x4 + BIP32_DERIVATION = 0x6 + FINAL_SCRIPTSIG = 0x7 + + +class PSBTOutputType(IntEnum): + REDEEM_SCRIPT = 0x0 + BIP32_DERIVATION = 0x2 + + +@dataclass +class PSBTKeypair: + keytype: int + keydata: bytes + valuedata: bytes + + @classmethod + def deserialize(cls, stream: BytesIO) -> Optional[PSBTKeypair]: + """Deserialize a keypair from a buffer. Return None if the end of a section or + the end of the stream is reached. + """ + key_size = read_compact_size(stream) + if key_size is None or key_size == 0: + # We reached a PSBT separator or the end of the stream. + return None + + full_key = stream.read(key_size) + key_type, key = cls.get_keytype_and_key_from_fullkey(full_key) + + val_size = read_compact_size(stream) + if val_size is None: + raise UnexpectedEndOfStream() + val = stream.read(val_size) + + return cls(key_type, key, val) + + @classmethod + def get_keytype_and_key_from_fullkey(cls, full_key: bytes) -> Tuple[int, bytes]: + with BytesIO(full_key) as key_stream: + key_type = read_compact_size(key_stream) + if key_type is None: + raise UnexpectedEndOfStream() + key = key_stream.read() + return key_type, key + + @classmethod + def get_fullkey_from_keytype_and_key(cls, key_type: int, key: bytes) -> bytes: + key_type_bytes = compact_size(key_type) + return key_type_bytes + key + + def serialize(self) -> bytes: + full_key = self.get_fullkey_from_keytype_and_key(self.keytype, self.keydata) + return ( + compact_size(len(full_key)) + + full_key + + compact_size(len(self.valuedata)) + + self.valuedata + ) + + +@dataclass +class PSBTSection: + keypairs: List[PSBTKeypair] + + @classmethod + def deserialize(cls, stream: BytesIO) -> Optional[PSBTSection]: + """Return None in case the section is empty (EOF reached or other error)""" + keypairs = [] + while (kp := PSBTKeypair.deserialize(stream)) is not None: + keypairs.append(kp) + + if not keypairs: + return None + + return cls(keypairs) + + def serialize(self) -> bytes: + return b"".join(kp.serialize() for kp in self.keypairs) + PSBT_SEPARATOR + + +@dataclass +class PSBT: + sections: List[PSBTSection] + + @classmethod + def deserialize(cls, stream: BytesIO) -> PSBT: + magic = stream.read(5) + if magic != PSBT_MAGIC_BYTES: + raise DeserializationError( + "The data does not start with the PSBT magic bytes" + ) + sections = [] + while (kp := PSBTSection.deserialize(stream)) is not None: + sections.append(kp) + + return cls(sections) + + def serialize(self) -> bytes: + return PSBT_MAGIC_BYTES + b"".join( + section.serialize() for section in self.sections + ) diff --git a/electrum/electrumabc/tests/test_psbt.py b/electrum/electrumabc/tests/test_psbt.py new file mode 100644 --- /dev/null +++ b/electrum/electrumabc/tests/test_psbt.py @@ -0,0 +1,165 @@ +import unittest +from io import BytesIO + +from .. import psbt + +# test data from psbt_wallet_tests.cpp +psbt_data = bytes.fromhex( + "70736274ff0100a0020000000258e87a21b56daf0c23be8e7070456c336f7cbaa5c875" + "7924f545887bb2abdd750000000000ffffffff6b04ec37326fbac8e468a73bf952c887" + "7f84f96c3f9deadeab246455e34fe0cd0100000000ffffffff0270aaf0080000000019" + "76a914d85c2b71d0060b09c9886aeb815e50991dda124d88ac00e1f505000000001976" + "a91400aea9a2e5f0f876a588df5546e8742d1d87008f88ac000000000001002080f0fa" + "020000000017a9140fb9463421696b82c833af241c78c17ddbde493487010447522102" + "9583bf39ae0a609747ad199addd634fa6108559d6c5cd39b4c2183f1ab96e07f2102da" + "b61ff49a14db6a7d02b0cd1fbb78fc4b18312b5b4e54dae4dba2fbfef536d752ae2206" + "029583bf39ae0a609747ad199addd634fa6108559d6c5cd39b4c2183f1ab96e07f10d9" + "0c6a4f000000800000008000000080220602dab61ff49a14db6a7d02b0cd1fbb78fc4b" + "18312b5b4e54dae4dba2fbfef536d710d90c6a4f000000800000008001000080000100" + "2000c2eb0b0000000017a914f6539307e3a48d1e0136d061f5d1fe19e1a24089870104" + "47522103089dc10c7ac6db54f91329af617333db388cead0c231f723379d1b99030b02" + "dc21023add904f3d6dcf59ddb906b0dee23529b7ffb9ed50e5e86151926860221f0e73" + "52ae2206023add904f3d6dcf59ddb906b0dee23529b7ffb9ed50e5e86151926860221f" + "0e7310d90c6a4f000000800000008003000080220603089dc10c7ac6db54f91329af61" + "7333db388cead0c231f723379d1b99030b02dc10d90c6a4f0000008000000080020000" + "8000220203a9a4c37f5996d3aa25dbac6b570af0650394492942460b354753ed9eeca5" + "877110d90c6a4f000000800000008004000080002202027f6399757d2eff55a136ad02" + "c684b1838b6556e5f1b6b34282a94b6b5005109610d90c6a4f00000080000000800500" + "008000" +) + + +class TestPSBT(unittest.TestCase): + + def test_deserialize(self): + psbt0 = psbt.PSBT.deserialize(BytesIO(psbt_data)) + + self.assertEqual(len(psbt0.sections), 5) + + # Global map + section0 = psbt0.sections[0] + self.assertEqual(len(section0.keypairs), 1) + self.assertEqual(section0.keypairs[0].keytype, psbt.PSBTGlobalType.UNSIGNED_TX) + self.assertEqual(section0.keypairs[0].keydata, b"") + # a raw transaction + self.assertEqual( + section0.keypairs[0].valuedata.hex(), + "020000000258e87a21b56daf0c23be8e7070456c336f7cbaa5c8757924f545887bb2abdd750000000000ffffffff6b04ec37326fbac8e468a73bf952c8877f84f96c3f9deadeab246455e34fe0cd0100000000ffffffff0270aaf008000000001976a914d85c2b71d0060b09c9886aeb815e50991dda124d88ac00e1f505000000001976a91400aea9a2e5f0f876a588df5546e8742d1d87008f88ac00000000", + ) + + # Input 1 + section1 = psbt0.sections[1] + self.assertEqual(len(section1.keypairs), 4) + self.assertEqual(section1.keypairs[0].keytype, psbt.PSBTInputType.UTXO) + self.assertEqual(section1.keypairs[0].keydata, b"") + # a transaction output (CTxOut) + self.assertEqual( + section1.keypairs[0].valuedata.hex(), + "80f0fa020000000017a9140fb9463421696b82c833af241c78c17ddbde493487", + ) + + self.assertEqual(section1.keypairs[1].keytype, psbt.PSBTInputType.REDEEM_SCRIPT) + self.assertEqual(section1.keypairs[1].keydata, b"") + self.assertEqual( + section1.keypairs[1].valuedata.hex(), + "5221029583bf39ae0a609747ad199addd634fa6108559d6c5cd39b4c2183f1ab96e07f2102dab61ff49a14db6a7d02b0cd1fbb78fc4b18312b5b4e54dae4dba2fbfef536d752ae", + ) + + self.assertEqual( + section1.keypairs[2].keytype, psbt.PSBTInputType.BIP32_DERIVATION + ) + # a pubkey + self.assertEqual( + section1.keypairs[2].keydata.hex(), + "029583bf39ae0a609747ad199addd634fa6108559d6c5cd39b4c2183f1ab96e07f", + ) + # master key fingerprint (32 bytes) + derivation path (3 x 32 bytes) + self.assertEqual( + section1.keypairs[2].valuedata.hex(), "d90c6a4f000000800000008000000080" + ) + + self.assertEqual( + section1.keypairs[3].keytype, psbt.PSBTInputType.BIP32_DERIVATION + ) + self.assertEqual( + section1.keypairs[3].keydata.hex(), + "02dab61ff49a14db6a7d02b0cd1fbb78fc4b18312b5b4e54dae4dba2fbfef536d7", + ) + self.assertEqual( + section1.keypairs[3].valuedata.hex(), "d90c6a4f000000800000008001000080" + ) + + # Input 2 + section2 = psbt0.sections[2] + self.assertEqual(len(section2.keypairs), 4) + self.assertEqual(section2.keypairs[0].keytype, psbt.PSBTInputType.UTXO) + self.assertEqual(section2.keypairs[0].keydata, b"") + self.assertEqual( + section2.keypairs[0].valuedata.hex(), + "00c2eb0b0000000017a914f6539307e3a48d1e0136d061f5d1fe19e1a2408987", + ) + + self.assertEqual(section2.keypairs[1].keytype, psbt.PSBTInputType.REDEEM_SCRIPT) + self.assertEqual(section2.keypairs[1].keydata, b"") + self.assertEqual( + section2.keypairs[1].valuedata.hex(), + "522103089dc10c7ac6db54f91329af617333db388cead0c231f723379d1b99030b02dc21023add904f3d6dcf59ddb906b0dee23529b7ffb9ed50e5e86151926860221f0e7352ae", + ) + + self.assertEqual( + section2.keypairs[2].keytype, psbt.PSBTInputType.BIP32_DERIVATION + ) + self.assertEqual( + section2.keypairs[2].keydata.hex(), + "023add904f3d6dcf59ddb906b0dee23529b7ffb9ed50e5e86151926860221f0e73", + ) + self.assertEqual( + section2.keypairs[2].valuedata.hex(), "d90c6a4f000000800000008003000080" + ) + + self.assertEqual( + section2.keypairs[3].keytype, psbt.PSBTInputType.BIP32_DERIVATION + ) + self.assertEqual( + section2.keypairs[3].keydata.hex(), + "03089dc10c7ac6db54f91329af617333db388cead0c231f723379d1b99030b02dc", + ) + self.assertEqual( + section2.keypairs[3].valuedata.hex(), "d90c6a4f000000800000008002000080" + ) + + # Output 1 + section3 = psbt0.sections[3] + self.assertEqual(len(section3.keypairs), 1) + self.assertEqual( + section3.keypairs[0].keytype, psbt.PSBTOutputType.BIP32_DERIVATION + ) + self.assertEqual( + section3.keypairs[0].keydata.hex(), + "03a9a4c37f5996d3aa25dbac6b570af0650394492942460b354753ed9eeca58771", + ) + self.assertEqual( + section3.keypairs[0].valuedata.hex(), "d90c6a4f000000800000008004000080" + ) + + # Output 2 + section4 = psbt0.sections[4] + self.assertEqual(len(section4.keypairs), 1) + self.assertEqual( + section4.keypairs[0].keytype, psbt.PSBTOutputType.BIP32_DERIVATION + ) + self.assertEqual( + section4.keypairs[0].keydata.hex(), + "027f6399757d2eff55a136ad02c684b1838b6556e5f1b6b34282a94b6b50051096", + ) + self.assertEqual( + section4.keypairs[0].valuedata.hex(), "d90c6a4f000000800000008005000080" + ) + + def test_round_trip(self): + psbt0 = psbt.PSBT.deserialize(BytesIO(psbt_data)) + self.assertEqual(psbt0.serialize(), psbt_data) + + +if __name__ == "__main__": + unittest.main()