Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F14864748
D14440.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
9 KB
Subscribers
None
D14440.diff
View Options
diff --git a/electrum/electrumabc/interface.py b/electrum/electrumabc/interface.py
--- a/electrum/electrumabc/interface.py
+++ b/electrum/electrumabc/interface.py
@@ -82,21 +82,6 @@
def diagnostic_name(self):
return self.host
- def check_host_name(self, peercert, name) -> bool:
- """Wrapper for ssl.match_hostname that never throws. Returns True if the
- certificate matches, False otherwise. Supports whatever wildcard certs
- and other bells and whistles supported by ssl.match_hostname."""
- # Check that the peer has supplied a certificate.
- # None/{} is not acceptable.
- if not peercert:
- return False
- try:
- ssl.match_hostname(peercert, name)
- return True
- except ssl.CertificateError as e:
- self.print_error("SSL certificate hostname mismatch:", str(e))
- return False
-
def get_simple_socket(self):
try:
addr_info = socket.getaddrinfo(
@@ -128,18 +113,33 @@
self.print_error("failed to connect", str(e))
@staticmethod
- def get_ssl_context(cert_reqs, ca_certs):
+ def get_ssl_context(cert_reqs, ca_certs, check_hostname: bool = False):
context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_certs
)
- context.check_hostname = False
+ context.check_hostname = check_hostname
context.verify_mode = cert_reqs
context.minimum_version = ssl.TLSVersion.TLSv1_2
return context
- def _get_socket_and_verify_ca_cert(
+ def _get_socket_and_verify_ca_cert(self) -> Optional[ssl.SSLSocket]:
+ """Attempts to connect to the remote host, assuming it is using a CA
+ signed certificate. If the cert is valid then an SSLSocket is returned.
+ None is returned if the plain socket fails; SSL errors are raised."""
+ s = self.get_simple_socket()
+ if s is None:
+ return None
+
+ context = self.get_ssl_context(
+ cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path, check_hostname=True
+ )
+ return context.wrap_socket(
+ s, do_handshake_on_connect=True, server_hostname=self.host
+ )
+
+ def _get_socket_and_verify_ca_cert_checked(
self, *, suppress_errors
) -> Tuple[Optional[ssl.SSLSocket], bool]:
"""Attempts to connect to the remote host, assuming it is using a CA
@@ -147,31 +147,24 @@
SSLSocket, False) is returned. Otherwise (None, bool) is returned on
error. If the second item in the tuple is True, then the entire
operation should be aborted due to low-level error."""
- s = self.get_simple_socket()
- if s is not None:
- try:
- context = self.get_ssl_context(
- cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path
- )
- s = context.wrap_socket(s, do_handshake_on_connect=True)
- # validate cert
- if s and self.check_host_name(s.getpeercert(), self.host):
- self.print_error("SSL certificate signed by CA")
- # it's good, return the wrapped socket
- return s, False
- # bad cert or other shenanigans, return None but inform caller
- # to try alternate "pinned self-signed cert" code path
- return None, False
- except ssl.SSLError as e:
- if not suppress_errors:
- # Only show error if no pinned self-signed cert exists
- self.print_error("SSL error:", e)
- return None, False # inform caller to continue trying alternate
- except Exception as e:
- self.print_error(
- "Unexpected exception in _get_socket_and_verify_ca_cert:", repr(e)
- )
- return None, True # inform caller to abort the operation
+ try:
+ s = self._get_socket_and_verify_ca_cert()
+ if s is not None:
+ self.print_error("SSL certificate signed by CA")
+ # it's good, return the wrapped socket
+ return s, False
+ except ssl.SSLError as e:
+ if not suppress_errors:
+ # Only show error if no pinned self-signed cert exists
+ self.print_error("SSL error:", e)
+ return None, False # inform caller to continue trying alternate
+ except Exception as e:
+ self.print_error(
+ "Unexpected exception in _get_socket_and_verify_ca_cert_checked:",
+ repr(e),
+ )
+ # inform caller to abort the operation
+ return None, True
def get_socket(self):
if self.use_ssl:
@@ -184,7 +177,7 @@
sanitize_filename(self.host, replacement_text="_"),
)
has_pinned_self_signed = os.path.exists(cert_path)
- s, give_up = self._get_socket_and_verify_ca_cert(
+ s, give_up = self._get_socket_and_verify_ca_cert_checked(
suppress_errors=has_pinned_self_signed
)
if s:
@@ -204,7 +197,7 @@
pass
return s
elif give_up:
- # low-level error in _get_socket_and_verify_ca_cert, give up
+ # low-level error in _get_socket_and_verify_ca_cert_checked, give up
return
# if we get here, certificate is not CA signed, so try the alternate
# "pinned self-signed" method.
@@ -563,14 +556,6 @@
print_msg(m)
-# Used by tests
-def _match_hostname(name, val):
- if val == name:
- return True
-
- return val.startswith("*.") and name.endswith(val[1:])
-
-
def test_certificates():
from .simple_config import SimpleConfig
diff --git a/electrum/electrumabc/tests/test_interface.py b/electrum/electrumabc/tests/test_interface.py
--- a/electrum/electrumabc/tests/test_interface.py
+++ b/electrum/electrumabc/tests/test_interface.py
@@ -1,56 +1,54 @@
+import ssl
import unittest
from .. import interface
class TestInterface(unittest.TestCase):
- def test_match_host_name(self):
- self.assertTrue(interface._match_hostname("asd.fgh.com", "asd.fgh.com"))
- self.assertFalse(interface._match_hostname("asd.fgh.com", "asd.zxc.com"))
- self.assertTrue(interface._match_hostname("asd.fgh.com", "*.fgh.com"))
- self.assertFalse(interface._match_hostname("asd.fgh.com", "*fgh.com"))
- self.assertFalse(interface._match_hostname("asd.fgh.com", "*.zxc.com"))
-
- def test_check_host_name(self):
- i = interface.TcpConnection(server=":1:", queue=None, config_path=None)
-
- self.assertFalse(i.check_host_name(None, None))
- self.assertFalse(i.check_host_name(peercert={"subjectAltName": []}, name=""))
- self.assertTrue(
- i.check_host_name(
- peercert={"subjectAltName": (("DNS", "foo.bar.com"),)},
- name="foo.bar.com",
- )
- )
- self.assertTrue(
- i.check_host_name(
- peercert={"subjectAltName": (("DNS", "*.bar.com"),)}, name="foo.bar.com"
- )
- )
- self.assertFalse(
- i.check_host_name(
- peercert={"subjectAltName": (("DNS", "*.bar.com"),)},
- name="sub.foo.bar.com",
- )
- )
- self.assertTrue(
- i.check_host_name(
- peercert={"subject": ((("commonName", "foo.bar.com"),),)},
- name="foo.bar.com",
- )
- )
- self.assertTrue(
- i.check_host_name(
- peercert={"subject": ((("commonName", "*.bar.com"),),)},
- name="foo.bar.com",
- )
- )
- self.assertFalse(
- i.check_host_name(
- peercert={"subject": ((("commonName", "*.bar.com"),),)},
- name="sub.foo.bar.com",
- )
- )
+ """Test CA certificate verification using https://badssl.com"""
+
+ def has_ca_signed_valid_cert(self, server: str) -> bool:
+ retries = 0
+ while retries < 5:
+ try:
+ i = interface.TcpConnection(server=server, queue=None, config_path=None)
+ s = i._get_socket_and_verify_ca_cert()
+ if s is not None:
+ s.close()
+ else:
+ self.skipTest("This test requires an internet connection.")
+ return bool(s)
+ except TimeoutError:
+ retries += 1
+
+ def test_verify_good_ca_cert(self):
+ # These hosts also use a wildcard certificate
+ self.assertTrue(self.has_ca_signed_valid_cert("badssl.com:443:s"))
+ self.assertTrue(self.has_ca_signed_valid_cert("sha256.badssl.com:443:s"))
+
+ def test_verify_bad_ca_cert(self):
+ # See https://github.com/openssl/openssl/blob/70c2912f635aac8ab28629a2b5ea0c09740d2bda/include/openssl/x509_vfy.h#L99
+ # for a list of verify error codes
+
+ with self.assertRaises(ssl.SSLCertVerificationError) as cm:
+ self.has_ca_signed_valid_cert("expired.badssl.com:443:s")
+ # X509_V_ERR_CERT_HAS_EXPIRED
+ self.assertEqual(cm.exception.verify_code, 10)
+
+ with self.assertRaises(ssl.SSLCertVerificationError) as cm:
+ self.has_ca_signed_valid_cert("wrong.host.badssl.com:443:s")
+ # X509_V_ERR_HOSTNAME_MISMATCH
+ self.assertEqual(cm.exception.verify_code, 62)
+
+ with self.assertRaises(ssl.SSLCertVerificationError) as cm:
+ self.has_ca_signed_valid_cert("self-signed.badssl.com:443:s")
+ # X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT
+ self.assertEqual(cm.exception.verify_code, 18)
+
+ with self.assertRaises(ssl.SSLCertVerificationError) as cm:
+ self.has_ca_signed_valid_cert("untrusted-root.badssl.com:443:s")
+ # X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN
+ self.assertEqual(cm.exception.verify_code, 19)
if __name__ == "__main__":
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, May 20, 22:08 (20 h, 47 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5866023
Default Alt Text
D14440.diff (9 KB)
Attached To
D14440: [electrum] Interface: Use check_hostname instead of match_hostname
Event Timeline
Log In to Comment