diff --git a/contrib/message-capture/message-capture-docs.md b/contrib/message-capture/message-capture-docs.md
new file mode 100644
--- /dev/null
+++ b/contrib/message-capture/message-capture-docs.md
@@ -0,0 +1,25 @@
+# Per-Peer Message Capture
+
+## Purpose
+
+This feature allows for message capture on a per-peer basis.  It answers the simple question: "Can I see what messages my node is sending and receiving?"
+
+## Usage and Functionality
+
+* Run `bitcoind` with the `-capturemessages` option.
+* Look in the `message_capture` folder in your datadir.
+  * Typically this will be `~/.bitcoin/message_capture`.
+  * See that there are many folders inside, one for each peer names with its IP address and port.
+  * Inside each peer's folder there are two `.dat` files: one is for received messages (`msgs_recv.dat`) and the other is for sent messages (`msgs_sent.dat`).
+* Run `contrib/message-capture/message-capture-parser.py` with the proper arguments.
+  * See the `-h` option for help.
+  * To see all messages, both sent and received, for all peers use:
+    ```
+    ./contrib/message-capture/message-capture-parser.py -o out.json \
+    ~/.bitcoin/message_capture/**/*.dat
+    ```
+  * Note:  The messages in the given `.dat` files will be interleaved in chronological order.  So, giving both received and sent `.dat` files (as above with `*.dat`) will result in all messages being interleaved in chronological order.
+  * If an output file is not provided (i.e. the `-o` option is not used), then the output prints to `stdout`.
+* View the resulting output.
+  * The output file is `JSON` formatted.
+  * Suggestion: use `jq` to view the output, with `jq . out.json`
diff --git a/contrib/message-capture/message-capture-parser.py b/contrib/message-capture/message-capture-parser.py
new file mode 100755
--- /dev/null
+++ b/contrib/message-capture/message-capture-parser.py
@@ -0,0 +1,233 @@
+#!/usr/bin/env python3
+# Copyright (c) 2020 The Bitcoin Core developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+"""Parse message capture binary files.
+To be used in conjunction with -capturemessages.
+"""
+
+import argparse
+import os
+import shutil
+import sys
+from io import BytesIO
+import json
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+sys.path.append(
+    os.path.join(
+        os.path.dirname(__file__),
+        '../../test/functional'))
+
+from test_framework.messages import ser_uint256     # noqa: E402
+from test_framework.p2p import MESSAGEMAP           # noqa: E402
+
+TIME_SIZE = 8
+LENGTH_SIZE = 4
+MSGTYPE_SIZE = 12
+
+# The test framework classes stores hashes as large ints in many cases.
+# These are variables of type uint256 in core.
+# There isn't a way to distinguish between a large int and a large int that is actually a blob of bytes.
+# As such, they are itemized here.
+# Any variables with these names that are of type int are actually uint256 variables.
+# (These can be easily found by looking for calls to deser_uint256, deser_uint256_vector, and uint256_from_str in messages.py)
+HASH_INTS = [
+    "blockhash",
+    "block_hash",
+    "hash",
+    "hashMerkleRoot",
+    "hashPrevBlock",
+    "hashstop",
+    "limited_proofid",
+    "prev_header",
+    "sha256",
+    "stop_hash",
+]
+
+HASH_INT_VECTORS = [
+    "hashes",
+    "headers",
+    "vHave",
+    "vHash",
+]
+
+
+class ProgressBar:
+    def __init__(self, total: float):
+        self.total = total
+        self.running = 0
+
+    def set_progress(self, progress: float):
+        cols = shutil.get_terminal_size()[0]
+        if cols <= 12:
+            return
+        max_blocks = cols - 9
+        num_blocks = int(max_blocks * progress)
+        print('\r[ {}{} ] {:3.0f}%'
+              .format('#' * num_blocks,
+                      ' ' * (max_blocks - num_blocks),
+                      progress * 100),
+              end='')
+
+    def update(self, more: float):
+        self.running += more
+        self.set_progress(self.running / self.total)
+
+
+def to_jsonable(obj: Any) -> Any:
+    if hasattr(obj, "__dict__"):
+        return obj.__dict__
+    elif hasattr(obj, "__slots__"):
+        ret: Dict[str, Any] = {}
+        for slot in obj.__slots__:
+            val = getattr(obj, slot, None)
+            if slot in HASH_INTS and isinstance(val, int):
+                ret[slot] = ser_uint256(val).hex()
+            elif slot in HASH_INT_VECTORS and isinstance(val[0], int):
+                ret[slot] = [ser_uint256(a).hex() for a in val]
+            else:
+                ret[slot] = to_jsonable(val)
+        return ret
+    elif isinstance(obj, list):
+        return [to_jsonable(a) for a in obj]
+    elif isinstance(obj, bytes):
+        return obj.hex()
+    else:
+        return obj
+
+
+def process_file(path: str, messages: List[Any], recv: bool,
+                 progress_bar: Optional[ProgressBar]) -> None:
+    with open(path, 'rb') as f_in:
+        if progress_bar:
+            bytes_read = 0
+
+        while True:
+            if progress_bar:
+                # Update progress bar
+                diff = f_in.tell() - bytes_read - 1
+                progress_bar.update(diff)
+                bytes_read = f_in.tell() - 1
+
+            # Read the Header
+            tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
+            if not tmp_header_raw:
+                break
+            tmp_header = BytesIO(tmp_header_raw)
+            time = int.from_bytes(tmp_header.read(TIME_SIZE), "little")
+            msgtype: bytes = tmp_header.read(MSGTYPE_SIZE).split(b'\x00', 1)[0]
+            length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little")
+
+            # Start converting the message to a dictionary
+            msg_dict = {}
+            msg_dict["direction"] = "recv" if recv else "sent"
+            msg_dict["time"] = time
+            # "size" is less readable here, but more readable in the output
+            msg_dict["size"] = length
+
+            msg_ser = BytesIO(f_in.read(length))
+
+            # Determine message type
+            if msgtype not in MESSAGEMAP:
+                # Unrecognized message type
+                try:
+                    msgtype_tmp = msgtype.decode()
+                    if not msgtype_tmp.isprintable():
+                        raise UnicodeDecodeError
+                    msg_dict["msgtype"] = msgtype_tmp
+                except UnicodeDecodeError:
+                    msg_dict["msgtype"] = "UNREADABLE"
+                msg_dict["body"] = msg_ser.read().hex()
+                msg_dict["error"] = "Unrecognized message type."
+                messages.append(msg_dict)
+                print(
+                    f"WARNING - Unrecognized message type {msgtype} in {path}",
+                    file=sys.stderr)
+                continue
+
+            # Deserialize the message
+            msg = MESSAGEMAP[msgtype]()
+            msg_dict["msgtype"] = msgtype.decode()
+
+            try:
+                msg.deserialize(msg_ser)
+            except KeyboardInterrupt:
+                raise
+            except Exception:
+                # Unable to deserialize message body
+                msg_ser.seek(0, os.SEEK_SET)
+                msg_dict["body"] = msg_ser.read().hex()
+                msg_dict["error"] = "Unable to deserialize message."
+                messages.append(msg_dict)
+                print(
+                    f"WARNING - Unable to deserialize message in {path}",
+                    file=sys.stderr)
+                continue
+
+            # Convert body of message into a jsonable object
+            if length:
+                msg_dict["body"] = to_jsonable(msg)
+            messages.append(msg_dict)
+
+        if progress_bar:
+            # Update the progress bar to the end of the current file
+            # in case we exited the loop early
+            # Go to end of file
+            f_in.seek(0, os.SEEK_END)
+            diff = f_in.tell() - bytes_read - 1
+            progress_bar.update(diff)
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        epilog="EXAMPLE \n\t{0} -o out.json <data-dir>/message_capture/**/*.dat".format(
+            sys.argv[0]),
+        formatter_class=argparse.RawTextHelpFormatter)
+    parser.add_argument(
+        "capturepaths",
+        nargs='+',
+        help="binary message capture files to parse.")
+    parser.add_argument(
+        "-o", "--output",
+        help="output file.  If unset print to stdout")
+    parser.add_argument(
+        "-n", "--no-progress-bar",
+        action='store_true',
+        help="disable the progress bar.  Automatically set if the output is not a terminal")
+    args = parser.parse_args()
+    capturepaths = [Path.cwd() / Path(capturepath)
+                    for capturepath in args.capturepaths]
+    output = Path.cwd() / Path(args.output) if args.output else False
+    use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty()
+
+    messages: List[Any] = []
+    if use_progress_bar:
+        total_size = sum(capture.stat().st_size for capture in capturepaths)
+        progress_bar = ProgressBar(total_size)
+    else:
+        progress_bar = None
+
+    for capture in capturepaths:
+        process_file(
+            str(capture),
+            messages,
+            "recv" in capture.stem,
+            progress_bar)
+
+    messages.sort(key=lambda msg: msg['time'])
+
+    if use_progress_bar:
+        progress_bar.set_progress(1)
+    jsonrep = json.dumps(messages)
+    if output:
+        with open(str(output), 'w+', encoding="utf8") as f_out:
+            f_out.write(jsonrep)
+    else:
+        print(jsonrep)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/init.cpp b/src/init.cpp
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -1041,7 +1041,9 @@
     argsman.AddArg("-addrmantest", "Allows to test address relay on localhost",
                    ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY,
                    OptionsCategory::DEBUG_TEST);
-
+    argsman.AddArg("-capturemessages", "Capture all P2P messages to disk",
+                   ArgsManager::ALLOW_BOOL | ArgsManager::DEBUG_ONLY,
+                   OptionsCategory::DEBUG_TEST);
     argsman.AddArg("-debug=<category>",
                    strprintf("Output debugging information (default: %u, "
                              "supplying <category> is optional)",
@@ -1866,19 +1868,19 @@
     // Trim requested connection counts, to fit into system limitations
     // <int> in std::min<int>(...) to work around FreeBSD compilation issue
     // described in #2695
-    nFD = RaiseFileDescriptorLimit(nMaxConnections + nBind +
-                                   MIN_CORE_FILEDESCRIPTORS +
-                                   MAX_ADDNODE_CONNECTIONS);
+    nFD = RaiseFileDescriptorLimit(
+        nMaxConnections + nBind + MIN_CORE_FILEDESCRIPTORS +
+        MAX_ADDNODE_CONNECTIONS + NUM_FDS_MESSAGE_CAPTURE);
 #ifdef USE_POLL
     int fd_max = nFD;
 #else
     int fd_max = FD_SETSIZE;
 #endif
-    nMaxConnections =
-        std::max(std::min<int>(nMaxConnections, fd_max - nBind -
-                                                    MIN_CORE_FILEDESCRIPTORS -
-                                                    MAX_ADDNODE_CONNECTIONS),
-                 0);
+    nMaxConnections = std::max(
+        std::min<int>(nMaxConnections,
+                      fd_max - nBind - MIN_CORE_FILEDESCRIPTORS -
+                          MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE),
+        0);
     if (nFD < MIN_CORE_FILEDESCRIPTORS) {
         return InitError(_("Not enough file descriptors available."));
     }
diff --git a/src/net.h b/src/net.h
--- a/src/net.h
+++ b/src/net.h
@@ -25,6 +25,7 @@
 #include <pubkey.h>
 #include <radix.h>
 #include <random.h>
+#include <span.h>
 #include <streams.h>
 #include <sync.h>
 #include <threadinterrupt.h>
@@ -94,6 +95,8 @@
 static const bool DEFAULT_BLOCKSONLY = false;
 /** -peertimeout default */
 static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60;
+/** Number of file descriptors required for message capture **/
+static const int NUM_FDS_MESSAGE_CAPTURE = 1;
 
 static const bool DEFAULT_FORCEDNSSEED = false;
 static const bool DEFAULT_DNSSEED = true;
@@ -1438,6 +1441,10 @@
 std::string getSubVersionEB(uint64_t MaxBlockSize);
 std::string userAgent(const Config &config);
 
+/** Dump binary message to file, with timestamp */
+void CaptureMessage(const CAddress &addr, const std::string &msg_type,
+                    const Span<const uint8_t> &data, bool is_incoming);
+
 struct NodeEvictionCandidate {
     NodeId id;
     std::chrono::seconds m_connected;
diff --git a/src/net.cpp b/src/net.cpp
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -3503,6 +3503,9 @@
     size_t nMessageSize = msg.data.size();
     LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n",
              SanitizeString(msg.m_type), nMessageSize, pnode->GetId());
+    if (gArgs.GetBoolArg("-capturemessages", false)) {
+        CaptureMessage(pnode->addr, msg.m_type, msg.data, /*incoming=*/false);
+    }
 
     // make sure we use the appropriate network transport format
     std::vector<uint8_t> serializedHeader;
@@ -3629,3 +3632,32 @@
     // Size compliance is checked at startup, it is safe to not check it again
     return FormatUserAgent(client_name, client_version, uacomments);
 }
+
+void CaptureMessage(const CAddress &addr, const std::string &msg_type,
+                    const Span<const uint8_t> &data, bool is_incoming) {
+    // Note: This function captures the message at the time of processing,
+    // not at socket receive/send time.
+    // This ensures that the messages are always in order from an application
+    // layer (processing) perspective.
+    auto now = GetTime<std::chrono::microseconds>();
+
+    // Windows folder names can not include a colon
+    std::string clean_addr = addr.ToString();
+    std::replace(clean_addr.begin(), clean_addr.end(), ':', '_');
+
+    fs::path base_path = gArgs.GetDataDirNet() / "message_capture" / clean_addr;
+    fs::create_directories(base_path);
+
+    fs::path path =
+        base_path / (is_incoming ? "msgs_recv.dat" : "msgs_sent.dat");
+    CAutoFile f(fsbridge::fopen(path, "ab"), SER_DISK, CLIENT_VERSION);
+
+    ser_writedata64(f, now.count());
+    f.write(msg_type.data(), msg_type.length());
+    for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) {
+        f << '\0';
+    }
+    uint32_t size = data.size();
+    ser_writedata32(f, size);
+    f.write((const char *)data.data(), data.size());
+}
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -5866,6 +5866,11 @@
     }
     CNetMessage &msg(msgs.front());
 
+    if (gArgs.GetBoolArg("-capturemessages", false)) {
+        CaptureMessage(pfrom->addr, msg.m_command, MakeUCharSpan(msg.m_recv),
+                       /*incoming=*/true);
+    }
+
     msg.SetVersion(pfrom->GetCommonVersion());
 
     // Check network magic
diff --git a/test/functional/p2p_message_capture.py b/test/functional/p2p_message_capture.py
new file mode 100755
--- /dev/null
+++ b/test/functional/p2p_message_capture.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+# Copyright (c) 2020 The Bitcoin Core developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+"""Test per-peer message capture capability.
+
+Additionally, the output of contrib/message-capture/message-capture-parser.py
+should be verified manually.
+"""
+
+import glob
+import os
+from io import BytesIO
+
+from test_framework.p2p import MESSAGEMAP, P2PDataStore
+from test_framework.test_framework import BitcoinTestFramework
+from test_framework.util import assert_equal
+
+TIME_SIZE = 8
+LENGTH_SIZE = 4
+MSGTYPE_SIZE = 12
+
+
+def mini_parser(dat_file):
+    """Parse a data file created by CaptureMessage.
+
+    From the data file we'll only check the structure.
+
+    We won't care about things like:
+    - Deserializing the payload of the message
+        - This is managed by the deserialize methods in test_framework.messages
+    - The order of the messages
+        - There's no reason why we can't, say, change the order of the messages
+          in the handshake
+    - Message Type
+        - We can add new message types
+
+    We're ignoring these because they're simply too brittle to test here.
+    """
+    with open(dat_file, 'rb') as f_in:
+        # This should have at least one message in it
+        assert(os.fstat(f_in.fileno()).st_size >=
+               TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
+        while True:
+            tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
+            if not tmp_header_raw:
+                break
+            tmp_header = BytesIO(tmp_header_raw)
+            int.from_bytes(tmp_header.read(TIME_SIZE), "little")
+            raw_msgtype = tmp_header.read(MSGTYPE_SIZE)
+            msgtype = raw_msgtype.split(b'\x00', 1)[0]
+            remainder = raw_msgtype.split(b'\x00', 1)[1]
+            assert(len(msgtype) > 0)
+            assert(msgtype in MESSAGEMAP)
+            assert(len(remainder) == 0 or not remainder.decode().isprintable())
+            length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little")
+            data = f_in.read(length)
+            assert_equal(len(data), length)
+
+
+class MessageCaptureTest(BitcoinTestFramework):
+    def set_test_params(self):
+        self.num_nodes = 1
+        self.extra_args = [["-capturemessages"]]
+        self.setup_clean_chain = True
+
+    def run_test(self):
+        capturedir = os.path.join(
+            self.nodes[0].datadir,
+            "regtest/message_capture")
+        # Connect a node so that the handshake occurs
+        self.nodes[0].add_p2p_connection(P2PDataStore())
+        self.nodes[0].disconnect_p2ps()
+        recv_file = glob.glob(os.path.join(capturedir, "*/msgs_recv.dat"))[0]
+        mini_parser(recv_file)
+        sent_file = glob.glob(os.path.join(capturedir, "*/msgs_sent.dat"))[0]
+        mini_parser(sent_file)
+
+
+if __name__ == '__main__':
+    MessageCaptureTest().main()
diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py
--- a/test/functional/test_framework/messages.py
+++ b/test/functional/test_framework/messages.py
@@ -831,6 +831,8 @@
 
 
 class AvalancheStake:
+    __slots__ = ("utxo", "amount", "height", "pubkey", "is_coinbase")
+
     def __init__(self, utxo=None, amount=0, height=0,
                  pubkey=b"", is_coinbase=False):
         self.utxo: COutPoint = utxo or COutPoint()
@@ -868,6 +870,8 @@
 
 
 class AvalancheSignedStake:
+    __slots__ = ("stake", "sig")
+
     def __init__(self, stake=None, sig=b""):
         self.stake: AvalancheStake = stake or AvalancheStake()
         self.sig: bytes = sig
@@ -1460,7 +1464,7 @@
 # for cases where a user needs tighter control over what is sent over the wire
 # note that the user must supply the name of the msgtype, and the data
 class msg_generic:
-    __slots__ = ("msgtype", "data")
+    __slots__ = ("data")
 
     def __init__(self, msgtype, data=None):
         self.msgtype = msgtype