Page MenuHomePhabricator

D14440.diff
No OneTemporary

D14440.diff

diff --git a/electrum/electrumabc/interface.py b/electrum/electrumabc/interface.py
--- a/electrum/electrumabc/interface.py
+++ b/electrum/electrumabc/interface.py
@@ -82,21 +82,6 @@
def diagnostic_name(self):
return self.host
- def check_host_name(self, peercert, name) -> bool:
- """Wrapper for ssl.match_hostname that never throws. Returns True if the
- certificate matches, False otherwise. Supports whatever wildcard certs
- and other bells and whistles supported by ssl.match_hostname."""
- # Check that the peer has supplied a certificate.
- # None/{} is not acceptable.
- if not peercert:
- return False
- try:
- ssl.match_hostname(peercert, name)
- return True
- except ssl.CertificateError as e:
- self.print_error("SSL certificate hostname mismatch:", str(e))
- return False
-
def get_simple_socket(self):
try:
addr_info = socket.getaddrinfo(
@@ -128,18 +113,33 @@
self.print_error("failed to connect", str(e))
@staticmethod
- def get_ssl_context(cert_reqs, ca_certs):
+ def get_ssl_context(cert_reqs, ca_certs, check_hostname: bool = False):
context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_certs
)
- context.check_hostname = False
+ context.check_hostname = check_hostname
context.verify_mode = cert_reqs
context.minimum_version = ssl.TLSVersion.TLSv1_2
return context
- def _get_socket_and_verify_ca_cert(
+ def _get_socket_and_verify_ca_cert(self) -> Optional[ssl.SSLSocket]:
+ """Attempts to connect to the remote host, assuming it is using a CA
+ signed certificate. Returns an SSLSocket if the cert is valid, or None if
+ no connection could be made. Raises ssl.SSLError if verification fails."""
+ s = self.get_simple_socket()
+ if s is None:
+ return None
+
+ context = self.get_ssl_context(
+ cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path, check_hostname=True
+ )
+ return context.wrap_socket(
+ s, do_handshake_on_connect=True, server_hostname=self.host
+ )
+
+ def _get_socket_and_verify_ca_cert_checked(
self, *, suppress_errors
) -> Tuple[Optional[ssl.SSLSocket], bool]:
"""Attempts to connect to the remote host, assuming it is using a CA
@@ -147,31 +147,24 @@
SSLSocket, False) is returned. Otherwise (None, bool) is returned on
error. If the second item in the tuple is True, then the entire
operation should be aborted due to low-level error."""
- s = self.get_simple_socket()
- if s is not None:
- try:
- context = self.get_ssl_context(
- cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path
- )
- s = context.wrap_socket(s, do_handshake_on_connect=True)
- # validate cert
- if s and self.check_host_name(s.getpeercert(), self.host):
- self.print_error("SSL certificate signed by CA")
- # it's good, return the wrapped socket
- return s, False
- # bad cert or other shenanigans, return None but inform caller
- # to try alternate "pinned self-signed cert" code path
- return None, False
- except ssl.SSLError as e:
- if not suppress_errors:
- # Only show error if no pinned self-signed cert exists
- self.print_error("SSL error:", e)
- return None, False # inform caller to continue trying alternate
- except Exception as e:
- self.print_error(
- "Unexpected exception in _get_socket_and_verify_ca_cert:", repr(e)
- )
- return None, True # inform caller to abort the operation
+ try:
+ s = self._get_socket_and_verify_ca_cert()
+ if s is not None:
+ self.print_error("SSL certificate signed by CA")
+ # it's good, return the wrapped socket
+ return s, False
+ except ssl.SSLError as e:
+ if not suppress_errors:
+ # Only show error if no pinned self-signed cert exists
+ self.print_error("SSL error:", e)
+ return None, False # inform caller to continue trying alternate
+ except Exception as e:
+ self.print_error(
+ "Unexpected exception in _get_socket_and_verify_ca_cert_checked:",
+ repr(e),
+ )
+ # inform caller to abort the operation
+ return None, True
def get_socket(self):
if self.use_ssl:
@@ -184,7 +177,7 @@
sanitize_filename(self.host, replacement_text="_"),
)
has_pinned_self_signed = os.path.exists(cert_path)
- s, give_up = self._get_socket_and_verify_ca_cert(
+ s, give_up = self._get_socket_and_verify_ca_cert_checked(
suppress_errors=has_pinned_self_signed
)
if s:
@@ -204,7 +197,7 @@
pass
return s
elif give_up:
- # low-level error in _get_socket_and_verify_ca_cert, give up
+ # low-level error in _get_socket_and_verify_ca_cert_checked, give up
return
# if we get here, certificate is not CA signed, so try the alternate
# "pinned self-signed" method.
@@ -563,14 +556,6 @@
print_msg(m)
-# Used by tests
-def _match_hostname(name, val):
- if val == name:
- return True
-
- return val.startswith("*.") and name.endswith(val[1:])
-
-
def test_certificates():
from .simple_config import SimpleConfig
diff --git a/electrum/electrumabc/tests/test_interface.py b/electrum/electrumabc/tests/test_interface.py
--- a/electrum/electrumabc/tests/test_interface.py
+++ b/electrum/electrumabc/tests/test_interface.py
@@ -1,56 +1,54 @@
+import ssl
import unittest
from .. import interface
class TestInterface(unittest.TestCase):
- def test_match_host_name(self):
- self.assertTrue(interface._match_hostname("asd.fgh.com", "asd.fgh.com"))
- self.assertFalse(interface._match_hostname("asd.fgh.com", "asd.zxc.com"))
- self.assertTrue(interface._match_hostname("asd.fgh.com", "*.fgh.com"))
- self.assertFalse(interface._match_hostname("asd.fgh.com", "*fgh.com"))
- self.assertFalse(interface._match_hostname("asd.fgh.com", "*.zxc.com"))
-
- def test_check_host_name(self):
- i = interface.TcpConnection(server=":1:", queue=None, config_path=None)
-
- self.assertFalse(i.check_host_name(None, None))
- self.assertFalse(i.check_host_name(peercert={"subjectAltName": []}, name=""))
- self.assertTrue(
- i.check_host_name(
- peercert={"subjectAltName": (("DNS", "foo.bar.com"),)},
- name="foo.bar.com",
- )
- )
- self.assertTrue(
- i.check_host_name(
- peercert={"subjectAltName": (("DNS", "*.bar.com"),)}, name="foo.bar.com"
- )
- )
- self.assertFalse(
- i.check_host_name(
- peercert={"subjectAltName": (("DNS", "*.bar.com"),)},
- name="sub.foo.bar.com",
- )
- )
- self.assertTrue(
- i.check_host_name(
- peercert={"subject": ((("commonName", "foo.bar.com"),),)},
- name="foo.bar.com",
- )
- )
- self.assertTrue(
- i.check_host_name(
- peercert={"subject": ((("commonName", "*.bar.com"),),)},
- name="foo.bar.com",
- )
- )
- self.assertFalse(
- i.check_host_name(
- peercert={"subject": ((("commonName", "*.bar.com"),),)},
- name="sub.foo.bar.com",
- )
- )
+ """Test CA certificate verification using https://badssl.com"""
+
+ def has_ca_signed_valid_cert(self, server: str) -> bool:
+ retries = 0
+ while retries < 5:
+ try:
+ i = interface.TcpConnection(server=server, queue=None, config_path=None)
+ s = i._get_socket_and_verify_ca_cert()
+ if s is not None:
+ s.close()
+ else:
+ self.skipTest("This test requires an internet connection.")
+ return bool(s)
+ except TimeoutError:
+ retries += 1
+
+ def test_verify_good_ca_cert(self):
+ # These are also wildcard certificates
+ self.assertTrue(self.has_ca_signed_valid_cert("badssl.com:443:s"))
+ self.assertTrue(self.has_ca_signed_valid_cert("sha256.badssl.com:443:s"))
+
+ def test_verify_bad_ca_cert(self):
+ # See https://github.com/openssl/openssl/blob/70c2912f635aac8ab28629a2b5ea0c09740d2bda/include/openssl/x509_vfy.h#L99
+ # for a list of verify error codes
+
+ with self.assertRaises(ssl.SSLCertVerificationError) as cm:
+ self.has_ca_signed_valid_cert("expired.badssl.com:443:s")
+ # X509_V_ERR_CERT_HAS_EXPIRED
+ self.assertEqual(cm.exception.verify_code, 10)
+
+ with self.assertRaises(ssl.SSLCertVerificationError) as cm:
+ self.has_ca_signed_valid_cert("wrong.host.badssl.com:443:s")
+ # X509_V_ERR_HOSTNAME_MISMATCH
+ self.assertEqual(cm.exception.verify_code, 62)
+
+ with self.assertRaises(ssl.SSLCertVerificationError) as cm:
+ self.has_ca_signed_valid_cert("self-signed.badssl.com:443:s")
+ # X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
+ self.assertEqual(cm.exception.verify_code, 18)
+
+ with self.assertRaises(ssl.SSLCertVerificationError) as cm:
+ self.has_ca_signed_valid_cert("untrusted-root.badssl.com:443:s")
+ # X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN
+ self.assertEqual(cm.exception.verify_code, 19)
if __name__ == "__main__":

File Metadata

Mime Type
text/plain
Expires
Tue, May 20, 22:08 (18 h, 11 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5866023
Default Alt Text
D14440.diff (9 KB)

Event Timeline