Rippled Audit

The Overlay Network

The Overlay module is responsible for managing the peer-to-peer (P2P) network between rippled instances, called 'nodes'. On startup the Overlay logic reads the relevant params from the rippled config file and uses them to establish the base list of IP addresses to connect to, referred to as the bootcache. From there, connected peers inform the local node of the locations of additional nodes on the network, which are stored locally in the livecache and connected to under certain circumstances. If fixed IPs are specified in the config file (via the ips_fixed param), these are used as the preliminary set of peers to connect to. If the public keys of those nodes are also listed under the cluster_nodes param, rippled forms a Cluster with them, in which messages skip redundant signature validation.
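
For reference, here is a minimal sketch of the relevant rippled.cfg stanzas; the addresses and the node public key below are placeholders, not recommendations:

    [ips]
    r.ripple.com 51235

    [ips_fixed]
    192.0.2.10 51235
    192.0.2.11 51235

    [cluster_nodes]
    n9ExamplePlaceholderPublicKey    cluster-peer-1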

The Overlay system is primarily implemented via the following submodules:

  • overlay - high level definitions and binding logic providing the main interface to P2P network operations
  • peerfinder - implementation details pertaining to tracking and connecting to peer nodes
  • beast - core network definitions and logic

The Overlay interface and its OverlayImpl implementation derive from Stoppable and are instantiated as a child of ServerHandler, facilitating inclusion of Overlay operations in the application's lifecycle. PeerFinder::Manager (and its implementation) also derives from Stoppable and is instantiated as a child of Overlay.
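
To illustrate the pattern, here is a rough sketch of a hypothetical Stoppable child; the constructor and lifecycle hooks follow core/Stoppable.h as we understand them, and MyComponent itself does not exist in rippled:

    // Hypothetical lifecycle participant (illustrative only).
    // Passing `parent` to the Stoppable constructor registers this object as a
    // child, so its hooks fire during the parent's prepare/start/stop phases,
    // mirroring how OverlayImpl is a child of ServerHandler and
    // PeerFinder::Manager is a child of Overlay.
    class MyComponent : public Stoppable
    {
    public:
        explicit MyComponent (Stoppable& parent)
            : Stoppable ("MyComponent", parent)
        {
        }

        void onPrepare () override  { /* read config, resolve addresses */ }
        void onStart () override    { /* start timers, begin work */ }
        void onStop () override     { stopped(); /* report that our stop has completed */ }
    };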

On startup, the OverlayImpl::onPrepare method is executed, which grabs the list specified in the [ips] section of the config, resolves domain names into IP addresses and passes them to PeerFinder::Manager for storage.

Overlay::onPrepare - src/ripple/overlay/impl/OverlayImpl.cpp

    void
    OverlayImpl::onPrepare()
    {
        PeerFinder::Config config;

        if (app_.config().PEERS_MAX != 0)
            config.maxPeers = app_.config().PEERS_MAX;

        config.outPeers = config.calcOutPeers();

        auto const port = serverHandler_.setup().overlay.port;

        config.peerPrivate = app_.config().PEER_PRIVATE;
        config.wantIncoming =
            (! config.peerPrivate) && (port != 0);
        // if it's a private peer or we are running as standalone
        // automatic connections would defeat the purpose.
        config.autoConnect =
            !app_.config().standalone() &&
            !app_.config().PEER_PRIVATE;
        config.listeningPort = port;
        config.features = "";
        config.ipLimit = setup_.ipLimit;

        // Enforce business rules
        config.applyTuning();

        m_peerFinder->setConfig (config);

In this first section we see the PeerFinder::Config options being populated from the application's config, with business rules then enforced via applyTuning.

Next, the default peer r.ripple.com:51235 is assigned if no other peers are specified. Looking at the example config shipped with rippled, we see this is the default peer there as well.

        // Populate our boot cache: if there are no entries in [ips] then we use
        // the entries in [ips_fixed]. If both are empty, we resort to a round-robin
        // pool.
        auto bootstrapIps = app_.config().IPS.empty()
            ? app_.config().IPS_FIXED
            : app_.config().IPS;
        if (bootstrapIps.empty ())
            bootstrapIps.push_back ("r.ripple.com 51235");

Finally, the specified IPs are resolved, and PeerFinder::Manager::addFallbackStrings and ::addFixedPeer are invoked to register the resulting addresses and ports:

        m_resolver.resolve (bootstrapIps,
            [this](std::string const& name,
                std::vector <beast::IP::Endpoint> const& addresses)
            {
                std::vector <std::string> ips;
                ips.reserve(addresses.size());
                for (auto const& addr : addresses)
                {
                    if (addr.port () == 0)
                    {
                        Throw<std::runtime_error> ("Port not specified for "
                            "address:" + addr.to_string ());
                    }

                    ips.push_back (to_string (addr));
                }

                std::string const base ("config: ");
                if (!ips.empty ())
                    m_peerFinder->addFallbackStrings (base + name, ips);
            });

        // Add the ips_fixed from the rippled.cfg file
        if (! app_.config().standalone() && !app_.config().IPS_FIXED.empty ())
        {
            m_resolver.resolve (app_.config().IPS_FIXED,
                [this](
                    std::string const& name,
                    std::vector <beast::IP::Endpoint> const& addresses)
                {
                    if (!addresses.empty ())
                        m_peerFinder->addFixedPeer (name, addresses);
                });
        }
    }

Inspecting this further, PeerFinder::Manager::addFallbackStrings simply dispatches to the Logic::addStaticSource method. Logic is an internal helper class which implements much of the peerfinder module's functionality.

addStaticSource in turn dispatches to Logic::fetch. fetch invokes Source::fetch on a new SourceStrings instance before calling addBootcacheAddresses to populate the bootcache with the results.

PeerFinder::addBootcacheAddresses and ::fetch methods - src/ripple/peerfinder/impl/Logic.h

    int addBootcacheAddresses (IPAddresses const& list)
    {
        int count (0);
        std::lock_guard<std::recursive_mutex> _(lock_);
        for (auto addr : list)
        {
            if (bootcache_.insertStatic (addr))
                ++count;
        }
        return count;
    }

    // Fetch bootcache addresses from the specified source.
    void
    fetch (std::shared_ptr<Source> const& source)
    {
        Source::Results results;

        {
            {
                std::lock_guard<std::recursive_mutex> _(lock_);
                if (stopping_)
                    return;
                fetchSource_ = source;
            }

            // VFALCO NOTE The fetch is synchronous,
            //             not sure if that's a good thing.
            //
            source->fetch (results, m_journal);

            {
                std::lock_guard<std::recursive_mutex> _(lock_);
                if (stopping_)
                    return;
                fetchSource_ = nullptr;
            }
        }

        if (! results.error)
        {
            int const count (addBootcacheAddresses (results.addresses));
            JLOG(m_journal.info()) << beast::leftw (18) <<
                "Logic added " << count <<
                " new " << ((count == 1) ? "address" : "addresses") <<
                " from " << source->name();
        }
        else
        {
            JLOG(m_journal.error()) << beast::leftw (18) <<
                "Logic failed " << "'" << source->name() << "' fetch, " <<
                results.error.message();
        }
    }

Peer sources are stored as beast::IP::Endpoint instances, which in turn contain beast::IP::Address instances as well as the logic needed to parse and verify IP addresses and ports from strings. See the beast net module for details on this helper logic.
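
As a quick sketch of that helper logic (assuming beast::IP::Endpoint::from_string and the free to_string overload from the beast net headers; the address is a placeholder and the include path varies by rippled version):

    #include <ripple/beast/net/IPEndpoint.h>   // location differs in older trees
    #include <iostream>

    int main()
    {
        // Parse an "address:port" string into an Endpoint.
        auto const ep = beast::IP::Endpoint::from_string ("198.51.100.1:51235");

        if (ep.port() == 0)
            std::cerr << "missing port (or parse failure)\n";
        else
            std::cout << "parsed " << to_string (ep)
                      << " on port " << ep.port() << "\n";
    }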

The next stage of operation occurs when the Overlay::onStart callback is fired during the application lifecycle, which in turn starts the internal Overlay::Timer instance.

    void
    OverlayImpl::onStart ()
    {
        auto const timer = std::make_shared<Timer>(*this);
        std::lock_guard <decltype(mutex_)> lock (mutex_);
        list_.emplace(timer.get(), timer);
        timer_ = timer;
        timer->run();
    }

The Overlay::Timer callback is responsible for invoking several high level operations required to maintain the Overlay network including:

  • sendEndpoints to notify connected peers of other endpoints known on the network
  • autoConnect to automatically establish connections to known peers so as to maintain the desired number of outbound connections
  • check to verify and manage existing peer connections
  • Scheduling the next invocation of the Timer callback

OverlayImpl::Timer::on_timer - src/ripple/overlay/impl/OverlayImpl.cpp

    void
    OverlayImpl::Timer::on_timer (error_code ec)
    {
        if (ec || overlay_.isStopping())
        {
            if (ec && ec != boost::asio::error::operation_aborted)
            {
                JLOG(overlay_.journal_.error()) << "on_timer: " << ec.message();
            }
            return;
        }

        overlay_.m_peerFinder->once_per_second();
        overlay_.sendEndpoints();
        overlay_.autoConnect();

        if ((++overlay_.timer_count_ % Tuning::checkSeconds) == 0)
            overlay_.check();

        timer_.expires_from_now (std::chrono::seconds(1));
        timer_.async_wait(overlay_.strand_.wrap(std::bind(
            &Timer::on_timer, shared_from_this(),
                std::placeholders::_1)));
    }

We will start by looking at autoConnect since, until peer connections have been established, endpoints cannot be broadcast.

We can see that OverlayImpl::autoConnect dispatches to PeerFinder::Manager::autoconnect to retrieve a list of Endpoints, then invokes connect on each.

    void
    OverlayImpl::autoConnect()
    {
        auto const result = m_peerFinder->autoconnect();
        for (auto addr : result)
            connect (addr);
    }

OverlayImpl::connect - src/ripple/overlay/impl/OverlayImpl.cpp

    void
    OverlayImpl::connect (beast::IP::Endpoint const& remote_endpoint)
    {
        assert(work_);

        auto usage = resourceManager().newOutboundEndpoint (remote_endpoint);
        if (usage.disconnect())
        {
            JLOG(journal_.info()) << "Over resource limit: " << remote_endpoint;
            return;
        }

        auto const slot = peerFinder().new_outbound_slot(remote_endpoint);
        if (slot == nullptr)
        {
            JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint;
            return;
        }

        auto const p = std::make_shared<ConnectAttempt>(app_,
            io_service_, beast::IPAddressConversion::to_asio_endpoint(remote_endpoint),
                usage, setup_.context, next_id_++, slot,
                    app_.journal("Peer"), *this);

        std::lock_guard<decltype(mutex_)> lock(mutex_);
        list_.emplace(p.get(), p);
        p->run();
    }

In connect above, an outbound Slot is instantiated along with a ConnectAttempt instance, whose run method is then invoked. Slots are the internal mechanism the PeerFinder module uses to store and track both inbound and outbound endpoints along with their properties.

The ConnectAttempt::run method actually attempts to establish the network connection via a boost::asio::ssl::stream, invoking the onConnect callback to handle the result of the operation.

After onConnect, a series of steps establishes the SSL connection to the remote node, after which the Hello message handshake is performed, marking the end of the connection-establishment phase.
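
For intuition, here is a minimal, self-contained sketch of the general boost::asio pattern ConnectAttempt follows (an async TCP connect followed by a TLS handshake); the address, context settings and flow are simplified assumptions, not rippled's actual code:

    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <iostream>
    #include <memory>

    namespace asio = boost::asio;
    namespace ssl  = boost::asio::ssl;

    int main()
    {
        asio::io_service ios;
        ssl::context ctx (ssl::context::sslv23);
        ctx.set_verify_mode (ssl::verify_none);   // peers are identified by the handshake, not certificates

        auto stream = std::make_shared<ssl::stream<asio::ip::tcp::socket>> (ios, ctx);
        asio::ip::tcp::endpoint remote (
            asio::ip::address::from_string ("198.51.100.1"), 51235);   // placeholder peer

        // Step 1: TCP connect (roughly ConnectAttempt::run -> onConnect).
        stream->next_layer().async_connect (remote,
            [stream] (boost::system::error_code ec)
            {
                if (ec)
                {
                    std::cerr << "connect failed: " << ec.message() << "\n";
                    return;
                }

                // Step 2: TLS handshake; the Hello exchange would follow on success.
                stream->async_handshake (ssl::stream_base::client,
                    [stream] (boost::system::error_code ec)
                    {
                        if (ec)
                            std::cerr << "handshake failed: " << ec.message() << "\n";
                        else
                            std::cout << "TLS established; Hello handshake would follow\n";
                    });
            });

        ios.run();
    }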

The list of endpoints passed to Overlay::connect is established via a call to PeerFinder::Manager::autoconnect, which in turn delegates to PeerFinder::Logic::autoconnect. Here resides the primary logic for selecting the candidate peers to connect to:

    std::vector <beast::IP::Endpoint>
    autoconnect()
    {
        std::vector <beast::IP::Endpoint> const none;

        std::lock_guard<std::recursive_mutex> _(lock_);

        // Count how many more outbound attempts to make
        //
        auto needed (counts_.attempts_needed ());
        if (needed == 0)
            return none;

        ConnectHandouts h (needed, m_squelches);

        // Make sure we don't connect to already-connected entries.
        for (auto const& s : slots_)
        {
            auto const result (m_squelches.insert (
                s.second->remote_endpoint().address()));
            if (! result.second)
                m_squelches.touch (result.first);
        }

Right off the bat a ConnectHandouts instance is created to accumulate the endpoints to connect to, honoring squelches: endpoints temporarily ignored because of recent connection activity.
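
As a simplified illustration of the squelching idea (a plain std::unordered_map sketch, not the aged container rippled actually uses):

    #include <chrono>
    #include <iostream>
    #include <string>
    #include <unordered_map>

    // An address is "squelched" (skipped) if it saw activity within the window.
    class Squelches
    {
        using clock = std::chrono::steady_clock;
        std::unordered_map<std::string, clock::time_point> touched_;
        std::chrono::seconds window_;

    public:
        explicit Squelches (std::chrono::seconds window) : window_ (window) {}

        bool squelched (std::string const& address) const
        {
            auto const it = touched_.find (address);
            return it != touched_.end() && clock::now() - it->second < window_;
        }

        void touch (std::string const& address)
        {
            touched_[address] = clock::now();
        }
    };

    int main()
    {
        Squelches squelches (std::chrono::seconds (30));
        squelches.touch ("198.51.100.1:51235");                            // already connected this pass
        std::cout << squelches.squelched ("198.51.100.1:51235") << "\n";   // 1: skip
        std::cout << squelches.squelched ("198.51.100.2:51235") << "\n";   // 0: eligible
    }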

After this we connect to Fixed IPs:

        // 1. Use Fixed if:
        //    Fixed active count is below fixed count AND
        //      ( There are eligible fixed addresses to try OR
        //        Any outbound attempts are in progress)
        //
        if (counts_.fixed_active() < fixed_.size ())
        {
            get_fixed (needed, h.list(), m_squelches);

            if (! h.list().empty ())
            {
                JLOG(m_journal.debug()) << beast::leftw (18) <<
                    "Logic connect " << h.list().size() << " fixed";
                return h.list();
            }

            if (counts_.attempts() > 0)
            {
                JLOG(m_journal.debug()) << beast::leftw (18) <<
                    "Logic waiting on " <<
                        counts_.attempts() << " attempts";
                return none;
            }
        }

Next the logic connects to random IPs from the livecache, that is, nodes we've been informed of by other peers on the network:

        // Only proceed if auto connect is enabled and we
        // have less than the desired number of outbound slots
        //
        if (! config_.autoConnect ||
            counts_.out_active () >= counts_.out_max ())
            return none;

        // 2. Use Livecache if:
        //    There are any entries in the cache OR
        //    Any outbound attempts are in progress
        //
        {
            livecache_.hops.shuffle();
            handout (&h, (&h)+1,
                livecache_.hops.rbegin(),
                    livecache_.hops.rend());
            if (! h.list().empty ())
            {
                JLOG(m_journal.debug()) << beast::leftw (18) <<
                    "Logic connect " << h.list().size () << " live " <<
                    ((h.list().size () > 1) ? "endpoints" : "endpoint");
                return h.list();
            }
            else if (counts_.attempts() > 0)
            {
                JLOG(m_journal.debug()) << beast::leftw (18) <<
                    "Logic waiting on " <<
                        counts_.attempts() << " attempts";
                return none;
            }
        }

Note the call to shuffle above, which randomizes the livecache IPs we select to connect to.

Finally the logic connects to bootcache entries: nodes we were told about at startup (see Overlay::onPrepare above). Note that the bootcache is also expanded with livecache entries as the local node receives endpoint broadcasts, as we will see next.

        /*  3. Bootcache refill
            If the Bootcache is empty, try to get addresses from the current
            set of Sources and add them into the Bootstrap cache.
            Pseudocode:
                If (    domainNames.count() > 0 AND (
                           unusedBootstrapIPs.count() == 0
                        OR activeNameResolutions.count() > 0) )
                    ForOneOrMore (DomainName that hasn't been resolved recently)
                        Contact DomainName and add entries to the unusedBootstrapIPs
                    return;
        */

        // 4. Use Bootcache if:
        //    There are any entries we haven't tried lately
        //
        for (auto iter (bootcache_.begin());
            ! h.full() && iter != bootcache_.end(); ++iter)
            h.try_insert (*iter);

        if (! h.list().empty ())
        {
            JLOG(m_journal.debug()) << beast::leftw (18) <<
                "Logic connect " << h.list().size () << " boot " <<
                ((h.list().size () > 1) ? "addresses" : "address");
            return h.list();
        }

        // If we get here we are stuck
        return none;
    }

Returning to the Overlay::Timer callback, sendEndpoints is invoked, notifying connected peers of the endpoints known to the local node.

    void
    OverlayImpl::sendEndpoints()
    {
        auto const result = m_peerFinder->buildEndpointsForPeers();
        for (auto const& e : result)
        {
            std::shared_ptr<PeerImp> peer;
            {
                std::lock_guard <decltype(mutex_)> lock (mutex_);
                auto const iter = m_peers.find (e.first);
                if (iter != m_peers.end())
                    peer = iter->second.lock();
            }
            if (peer)
                peer->sendEndpoints (e.second.begin(), e.second.end());
        }
    }

At this point it is straightforward to follow the delegated calls: PeerFinder::Manager::buildEndpointsForPeers and, subsequently, PeerFinder::Logic::buildEndpointsForPeers establish the per-peer lists of endpoints to share, while Peer::sendEndpoints actually sends the Endpoints message.

On the remote side, the PeerFinder::Logic::on_endpoints method is invoked upon receipt of the Endpoints message, at which point the new endpoints are added to the livecache and bootcache.

Logic::on_endpoints (abridged) - src/ripple/peerfinder/impl/Logic.h

    void on_endpoints (SlotImp::ptr const& slot, Endpoints list)
    {
        // ...

        for (auto const& ep : list)
        {
            // ...
            livecache_.insert (ep);
            bootcache_.insert (ep.address);
        }
    }

The last missing piece is the logic managing inbound connections: before a node can receive endpoints or any other messages from peers, it must listen for and accept their connections. We've already established how this is done via the Server subsystem, with one caveat. Upon the invocation of PlainHTTPPeer::do_request and SSLHTTPPeer::do_request we see a call to ServerHandler::onHandoff, which in turn invokes Overlay::onHandoff. Here resides the logic verifying that the connection is coming from a peer node and parsing / responding to the Hello message. If all goes well, the incoming connection is added to the local PeerFinder, and both nodes know of each other and can subsequently communicate, sharing ledgers, transactions, and all other network data.
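
As a simplified illustration of that handoff decision (the Handoff struct, header names and protocol string here are assumptions made for the sketch, not rippled's actual types or handshake fields):

    #include <iostream>
    #include <map>
    #include <string>

    struct Handoff
    {
        bool moved = false;   // true: the overlay has taken ownership of the connection
    };

    // Decide whether an incoming HTTP request is really a peer-protocol upgrade.
    Handoff onHandoff (std::map<std::string, std::string> const& headers)
    {
        Handoff result;
        if (headers.count ("Upgrade"))
        {
            // A peer connection: the overlay would verify the handshake
            // (the Hello message) and, on success, create a peer and a slot.
            result.moved = true;
        }
        return result;
    }

    int main()
    {
        std::map<std::string, std::string> peerRequest {{"Upgrade", "RTXP/1.2"}};
        std::map<std::string, std::string> rpcRequest  {{"Content-Type", "application/json"}};

        std::cout << onHandoff (peerRequest).moved << "\n";   // 1: handed to the overlay
        std::cout << onHandoff (rpcRequest).moved  << "\n";   // 0: handled as a normal request
    }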

And this wraps up the overlay network logic! In the next section we will go into the NodeStore and discuss how rippled stores ledger data locally.
