Rippled Audit

Consensus & Ledger Management

Now that we've discussed how the rippled server handles client and peer connections, we can dive into how the server maintains its internal representation of the ledger and applies modifications to it. In this section we provide a high-level overview of the ledger representation and of the consensus process, through which the network negotiates and agrees upon transactions. In the next section we will explore the process by which client and peer transactions are accepted on established connections.

The ledger and consensus subsystems are implemented in the following modules:

  • src/ripple/consensus - Generic consensus workflow logic and data structures.
  • src/ripple/ledger - Utility methods to access and modify ledger data, including methods to present and manipulate Views on ledgers.
  • src/ripple/app/consensus - High-level consensus implementation and operations; defines the callbacks and glue logic that flesh out the consensus cycle and allow it to be used from the general application.
  • src/ripple/app/ledger - High-level methods and structures to define and manipulate ledger state.

Major components in the ledger and consensus subsystems can be seen below:

We're not going to go into all of these right now, but of particular importance is LedgerMaster, which provides much of the high-level access to the underlying stored ledgers. It is a Stoppable child of Application and provides ledger access to various other subsystems.

Ledger Master (abridged) - src/ripple/app/ledger/LedgerMaster.h

  class LedgerMaster
      : public Stoppable
      , public AbstractFetchPackContainer
  {
  public:
      explicit
      LedgerMaster(Application& app, Stopwatch& stopwatch,
          Stoppable& parent,
              beast::insight::Collector::ptr const& collector,
                  beast::Journal journal);

      virtual ~LedgerMaster () = default;

      LedgerIndex getCurrentLedgerIndex ();
      LedgerIndex getValidLedgerIndex ();

      bool isCompatible (
          ReadView const&,
          beast::Journal::Stream,
          char const* reason);

      std::recursive_mutex& peekMutex ();

      // The current ledger is the ledger we believe new transactions should go in
      std::shared_ptr<ReadView const>
      getCurrentLedger();

      // The finalized ledger is the last closed/accepted ledger
      std::shared_ptr<Ledger const>
      getClosedLedger()
      {
          return mClosedLedger.get();
      }

      // The validated ledger is the last fully validated ledger
      std::shared_ptr<Ledger const>
      getValidatedLedger ()
      {
          return mValidLedger.get();
      }

      // The Rules are in the last fully validated ledger if there is one.
      Rules getValidatedRules();

      // This is the last ledger we published to clients and can lag the validated
      // ledger
      std::shared_ptr<ReadView const>
      getPublishedLedger();

      std::chrono::seconds getPublishedLedgerAge ();
      std::chrono::seconds getValidatedLedgerAge ();
      bool isCaughtUp(std::string& reason);

      std::uint32_t getEarliestFetch ();

      bool storeLedger (std::shared_ptr<Ledger const> ledger);

      void setFullLedger (
          std::shared_ptr<Ledger const> const& ledger,
              bool isSynchronous, bool isCurrent);

      void switchLCL (std::shared_ptr<Ledger const> const& lastClosed);

      void failedSave(std::uint32_t seq, uint256 const& hash);

      std::string getCompleteLedgers ();

      /** Apply held transactions to the open ledger
          This is normally called as we close the ledger.
          The open ledger remains open to handle new transactions
          until a new open ledger is built.
      */
      void applyHeldTransactions ();

      /** Get all the transactions held for a particular account.
          This is normally called when a transaction for that
          account is successfully applied to the open
          ledger so those transactions can be resubmitted without
          waiting for ledger close.
      */
      std::vector<std::shared_ptr<STTx const>>
      pruneHeldTransactions(AccountID const& account,
          std::uint32_t const seq);

      /** Get a ledger's hash by sequence number using the cache
      */
      uint256 getHashBySeq (std::uint32_t index);

      /** Walk to a ledger's hash using the skip list */
      boost::optional<LedgerHash> walkHashBySeq (std::uint32_t index);

      /** Walk the chain of ledger hashes to determine the hash of the
          ledger with the specified index. The referenceLedger is used as
          the base of the chain and should be fully validated and must not
          precede the target index. This function may throw if nodes
          from the reference ledger or any prior ledger are not present
          in the node store.
      */
      boost::optional<LedgerHash> walkHashBySeq (
          std::uint32_t index,
          std::shared_ptr<ReadView const> const& referenceLedger);

      std::shared_ptr<Ledger const>
      getLedgerBySeq (std::uint32_t index);

      std::shared_ptr<Ledger const>
      getLedgerByHash (uint256 const& hash);

      void setLedgerRangePresent (
          std::uint32_t minV, std::uint32_t maxV);

      boost::optional<LedgerHash> getLedgerHash(
          std::uint32_t desiredSeq,
          std::shared_ptr<ReadView const> const& knownGoodLedger);

      boost::optional <NetClock::time_point> getCloseTimeBySeq (
          LedgerIndex ledgerIndex);

      boost::optional <NetClock::time_point> getCloseTimeByHash (
          LedgerHash const& ledgerHash, LedgerIndex ledgerIndex);

      void addHeldTransaction (std::shared_ptr<Transaction> const& trans);
      void fixMismatch (ReadView const& ledger);

      bool haveLedger (std::uint32_t seq);
      void clearLedger (std::uint32_t seq);
      bool getValidatedRange (
          std::uint32_t& minVal, std::uint32_t& maxVal);
      bool getFullValidatedRange (
          std::uint32_t& minVal, std::uint32_t& maxVal);

      void tune (int size, std::chrono::seconds age);
      void sweep ();
      float getCacheHitRate ();

      void checkAccept (std::shared_ptr<Ledger const> const& ledger);
      void checkAccept (uint256 const& hash, std::uint32_t seq);
      void
      consensusBuilt(
          std::shared_ptr<Ledger const> const& ledger,
          uint256 const& consensusHash,
          Json::Value consensus);

      LedgerIndex getBuildingLedger ();
      void setBuildingLedger (LedgerIndex index);

      void tryAdvance ();
      bool newPathRequest (); // Returns true if path request successfully placed.
      bool isNewPathRequest ();
      bool newOrderBookDB (); // Returns true if able to fulfill request.

      bool fixIndex (
          LedgerIndex ledgerIndex, LedgerHash const& ledgerHash);
      void doLedgerCleaner(Json::Value const& parameters);

      beast::PropertyStream::Source& getPropertySource ();

      void clearPriorLedgers (LedgerIndex seq);

      void clearLedgerCachePrior (LedgerIndex seq);

      // ledger replay
      void takeReplay (std::unique_ptr<LedgerReplay> replay);
      std::unique_ptr<LedgerReplay> releaseReplay ();

      // ...
  };
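The doc comment on walkHashBySeq describes walking back along a chain of ledger hashes from a trusted reference ledger. The following is a minimal sketch of that idea, not rippled's implementation. It assumes a simplified model in which every ledger carries the hashes of up to 256 immediate predecessors (rippled's actual skip lists are stored in the ledger state and are more elaborate), and ToyLedger, ToyStore, buildToyChain and this walkHashBySeq are all hypothetical names:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified ledger: its own hash plus the hashes of up to
// kSkipSize immediate predecessors, oldest first (skip.back() is the parent).
struct ToyLedger
{
    std::uint32_t seq;
    std::string hash;
    std::vector<std::string> skip;
};

constexpr std::size_t kSkipSize = 256;  // assumed window size

// Stand-in for the node store: the ledgers we hold locally, keyed by hash.
using ToyStore = std::map<std::string, ToyLedger>;

// Test helper: builds ledgers 1..n with hashes "h1".."hn".
ToyStore buildToyChain(std::uint32_t n)
{
    ToyStore store;
    std::vector<std::string> all;  // all[i] is the hash of ledger i + 1
    for (std::uint32_t seq = 1; seq <= n; ++seq)
    {
        ToyLedger l{seq, "h" + std::to_string(seq), {}};
        std::size_t const first =
            all.size() > kSkipSize ? all.size() - kSkipSize : 0;
        l.skip.assign(all.begin() + static_cast<std::ptrdiff_t>(first), all.end());
        all.push_back(l.hash);
        store.emplace(l.hash, l);
    }
    return store;
}

// Walks back from `ref` to the hash of ledger `index`. Returns std::nullopt if
// `ref` precedes the target or a ledger needed for the walk is not in `store`.
std::optional<std::string>
walkHashBySeq(std::uint32_t index, ToyLedger const& ref, ToyStore const& store)
{
    if (index > ref.seq)
        return std::nullopt;  // reference must not precede the target
    ToyLedger const* cur = &ref;
    while (cur->seq != index)
    {
        std::size_t const back = cur->seq - index;  // at least 1
        if (back <= cur->skip.size())
            return cur->skip[cur->skip.size() - back];  // inside this window
        if (cur->skip.empty())
            return std::nullopt;  // no predecessors recorded
        // Too far back: hop to the oldest ledger this window references.
        auto const it = store.find(cur->skip.front());
        if (it == store.end())
            return std::nullopt;  // missing from the store
        cur = &it->second;
    }
    return cur->hash;
}
```

Walking from ledger 600 back to ledger 10 hops through ledgers 344 and 88 before the target falls inside a window. In this sketch a missing intermediate ledger ends the walk with std::nullopt, whereas the comment on the real method warns that it may throw.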

See the implementation file for details of how these ledger accessor and modification methods work. For example, here is the logic that applies held transactions to the open ledger:

    void
    LedgerMaster::applyHeldTransactions ()
    {
        ScopedLockType sl (m_mutex);

        app_.openLedger().modify(
            [&](OpenView& view, beast::Journal j)
            {
                bool any = false;
                for (auto const& it : mHeldTransactions)
                {
                    ApplyFlags flags = tapNONE;
                    auto const result = app_.getTxQ().apply(
                        app_, view, it.second, flags, j);
                    if (result.second)
                        any = true;
                }
                return any;
            });

        mHeldTransactions.reset (
            app_.openLedger().current()->info().parentHash);
    }
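The shape of applyHeldTransactions can be illustrated with a small stand-alone model: every held transaction is offered to the open view once, the function reports whether anything was applied (the lambda passed to modify returns any for the same purpose), and the held set is then reset with the new parent hash. ToyTx, ToyOpenView, ToyHeldSet and applyHeld are hypothetical; ordering held transactions by (account, sequence) is a simplification of rippled's canonical ordering, the apply callback stands in for TxQ::apply, and the assumption that reset also empties the set is labeled in the code:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for STTx and OpenView.
struct ToyTx
{
    std::string account;
    std::uint32_t seq;
};

struct ToyOpenView
{
    std::vector<ToyTx> applied;
};

// Held transactions ordered by (account, sequence), so each account's
// transactions are retried in sequence order.
class ToyHeldSet
{
public:
    void insert(ToyTx const& tx) { txs_[{tx.account, tx.seq}] = tx; }
    std::size_t size() const { return txs_.size(); }
    std::string const& parent() const { return parent_; }

    // Calls fn on each held transaction in order.
    template <class Fn>
    void forEach(Fn&& fn) const
    {
        for (auto const& kv : txs_)
            fn(kv.second);
    }

    // Modeled on mHeldTransactions.reset(parentHash): re-key the set to the
    // new parent and (assumption in this sketch) drop everything it held.
    void reset(std::string parentHash)
    {
        parent_ = std::move(parentHash);
        txs_.clear();
    }

private:
    std::map<std::pair<std::string, std::uint32_t>, ToyTx> txs_;
    std::string parent_;
};

// Stand-in for TxQ::apply: returns true if the transaction was applied.
using ApplyFn = std::function<bool(ToyOpenView&, ToyTx const&)>;

// Same shape as LedgerMaster::applyHeldTransactions: offer every held
// transaction to the open view, note whether any applied, then reset.
bool applyHeld(ToyHeldSet& held, ToyOpenView& view, ApplyFn const& apply,
    std::string const& newParentHash)
{
    bool any = false;
    held.forEach([&](ToyTx const& tx) {
        if (apply(view, tx))
            any = true;
    });
    held.reset(newParentHash);
    return any;
}
```

Returning any mirrors the excerpt, where the modify callback's return value signals whether the open view actually changed.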

Other core ledger classes include LedgerHistory, which provides access to previously closed and validated ledgers, and TxQ, which manages transactions received from the network. For example, here we can see the logic governing the acceptance of a proposed transaction into the transaction queue.

The consensus system is defined and partially implemented in src/ripple/consensus, where the core Consensus and ConsensusPhase definitions reside. Here we can see that each consensus round is split into three phases:

    /** Phases of consensus for a single ledger round.
        @code
              "close"             "accept"
         open ------- > establish ---------> accepted
           ^               |                    |
           |---------------|                    |
           ^                     "startRound"   |
           |------------------------------------|
       @endcode
       The typical transition goes from open to establish to accepted and
       then a call to startRound begins the process anew. However, if a wrong prior
       ledger is detected and recovered during the establish or accept phase,
       consensus will internally go back to open (see Consensus::handleWrongLedger).
    */
    enum class ConsensusPhase {
        //! We haven't closed our ledger yet, but others might have
        open,

        //! Establishing consensus by exchanging proposals with our peers
        establish,

        //! We have accepted a new last closed ledger and are waiting on a call
        //! to startRound to begin the next consensus round.  No changes
        //! to consensus phase occur while in this phase.
        accepted,
    };
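The diagram above can be encoded as a small transition function. This is an illustrative sketch only: PhaseEvent and nextPhase are hypothetical, and wrongLedger stands in for the recovery path the comment attributes to Consensus::handleWrongLedger; only the establish-to-open back edge drawn in the diagram is modeled.

```cpp
#include <cassert>
#include <stdexcept>

enum class ConsensusPhase { open, establish, accepted };

// Hypothetical events named after the labels in the diagram; wrongLedger
// stands in for recovery from a wrong prior ledger.
enum class PhaseEvent { close, accept, startRound, wrongLedger };

// Returns the phase after event `e`, or throws if the diagram has no such edge.
ConsensusPhase nextPhase(ConsensusPhase p, PhaseEvent e)
{
    switch (e)
    {
        case PhaseEvent::close:
            if (p == ConsensusPhase::open)
                return ConsensusPhase::establish;
            break;
        case PhaseEvent::accept:
            if (p == ConsensusPhase::establish)
                return ConsensusPhase::accepted;
            break;
        case PhaseEvent::startRound:
            if (p == ConsensusPhase::accepted)
                return ConsensusPhase::open;
            break;
        case PhaseEvent::wrongLedger:
            // The back edge drawn from establish to open.
            if (p == ConsensusPhase::establish)
                return ConsensusPhase::open;
            break;
    }
    throw std::logic_error("no such consensus phase transition");
}
```

Throwing on any (phase, event) pair not in the diagram makes unexpected transitions easy to catch in a test harness.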

The first consensus round is started directly from Application, while subsequent checks are run by a "heartbeat timer", also driven from the top level (via processHeartbeatTimer). The entire process can be seen below:

We can see how disputes are created by the createDisputes method, which compares our proposed transaction set against another set:

    template <class Adaptor>
    void
    Consensus<Adaptor>::createDisputes(TxSet_t const& o)
    {
        // Cannot create disputes without our stance
        assert(result_);

        // Only create disputes if this is a new set
        if (!result_->compares.emplace(o.id()).second)
            return;

        // Nothing to dispute if we agree
        if (result_->txns.id() == o.id())
            return;

        JLOG(j_.debug()) << "createDisputes " << result_->txns.id() << " to "
                         << o.id();

        auto differences = result_->txns.compare(o);
        // ...

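Differences between the two transaction sets are computed by RCLTxSet::compare, which diffs the underlying SHAMaps and returns, for each differing transaction ID, true if the transaction is only in our set and false if it is only in the other set: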

    std::map<Tx::ID, bool>
    compare(RCLTxSet const& j) const
    {
        SHAMap::Delta delta;

        // Bound the work we do in case of a malicious
        // map_ from a trusted validator
        map_->compare(*(j.map_), delta, 65536);

        std::map<uint256, bool> ret;
        for (auto const& item : delta)
        {
            assert(
                (item.second.first && !item.second.second) ||
                (item.second.second && !item.second.first));

            ret[item.first] = static_cast<bool>(item.second.first);
        }
        return ret;
    }
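RCLTxSet::compare delegates to SHAMap::compare, which can skip identical subtrees by comparing node hashes. The sketch below swaps that for a plain merge walk over two sorted std::set containers, which is enough to show what the returned map means: true for IDs only in our set, false for IDs only in the other set. TxID and compareSets are hypothetical, and maxDiffs loosely mirrors the 65536 bound. Because the excerpt ignores the return value of SHAMap::compare, the result is presumably just incomplete when that bound is hit, and this sketch behaves the same way.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <string>

using TxID = std::string;  // stand-in for uint256

// Merge-walks two sorted sets and records every ID present in exactly one of
// them: true if only in `ours`, false if only in `theirs`. Stops once
// `maxDiffs` differences have been collected.
std::map<TxID, bool>
compareSets(std::set<TxID> const& ours, std::set<TxID> const& theirs,
    std::size_t maxDiffs = 65536)
{
    std::map<TxID, bool> ret;
    auto a = ours.begin();
    auto b = theirs.begin();
    while ((a != ours.end() || b != theirs.end()) && ret.size() < maxDiffs)
    {
        if (b == theirs.end() || (a != ours.end() && *a < *b))
            ret[*a++] = true;   // only in our set
        else if (a == ours.end() || *b < *a)
            ret[*b++] = false;  // only in their set
        else
        {
            ++a;  // present in both sets: not a difference
            ++b;
        }
    }
    return ret;
}
```

A hash-tree diff matters in practice because two nearly identical sets share most subtrees; the merge walk here always touches every element.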

Each difference is then recorded as a dispute in the consensus result object, as the end of createDisputes shows:

            // ...
            result_->disputes.emplace(txID, std::move(dtx));
        }
        JLOG(j_.debug()) << dc << " differences found";
    }
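Putting the excerpted pieces of createDisputes together, a simplified model looks like this. A set is skipped if it was already compared or if its ID matches ours (in rippled the ID is the SHAMap root hash, so equal IDs mean identical sets); otherwise, in this sketch, each difference becomes a dispute whose initial vote records whether the transaction is in our set. ToyTxSet, ToyDispute, ToyResult and this createDisputes are hypothetical, and whatever the elided portion of the real method does is not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <string>

using TxID = std::string;  // stand-in for uint256

struct ToyTxSet
{
    std::string id;       // stand-in for the set's root hash
    std::set<TxID> txns;
};

struct ToyDispute
{
    bool ourVote;  // true if the transaction is in our proposed set
};

struct ToyResult
{
    ToyTxSet txns;                        // our current position
    std::set<std::string> compares;       // IDs of sets already compared
    std::map<TxID, ToyDispute> disputes;
};

// Returns the number of differences found (0 if the set was skipped).
std::size_t createDisputes(ToyResult& result, ToyTxSet const& other)
{
    // Only create disputes if this is a new set.
    if (!result.compares.insert(other.id).second)
        return 0;
    // Nothing to dispute if we agree.
    if (result.txns.id == other.id)
        return 0;

    std::size_t dc = 0;
    for (auto const& tx : result.txns.txns)
        if (other.txns.count(tx) == 0)
        {
            ++dc;
            result.disputes.emplace(tx, ToyDispute{true});   // only ours
        }
    for (auto const& tx : other.txns)
        if (result.txns.txns.count(tx) == 0)
        {
            ++dc;
            result.disputes.emplace(tx, ToyDispute{false});  // only theirs
        }
    return dc;
}
```

Using emplace means an existing dispute for the same transaction is left untouched if a later set disagrees on it too.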

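Disputes are retained throughout the establish phase. In updateOurPositions, votes from peers whose proposals have gone stale are withdrawn with unVote, and our own vote on each disputed transaction is re-evaluated with updateVote, since the threshold for inclusion increases over time: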

  template <class Adaptor>
  void
  Consensus<Adaptor>::updateOurPositions()
  {
      // We must have a position if we are updating it
      assert(result_);
      ConsensusParms const & parms = adaptor_.parms();

      // Compute a cutoff time
      auto const peerCutoff = now_ - parms.proposeFRESHNESS;
      auto const ourCutoff = now_ - parms.proposeINTERVAL;

      // Verify freshness of peer positions and compute close times
      std::map<NetClock::time_point, int> closeTimeVotes;
      {
          auto it = currPeerPositions_.begin();
          while (it != currPeerPositions_.end())
          {
              Proposal_t const& peerProp = it->second.proposal();
              if (peerProp.isStale(peerCutoff))
              {
                  // peer's proposal is stale, so remove it
                  NodeID_t const& peerID = peerProp.nodeID();
                  JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
                  for (auto& dt : result_->disputes)
                      dt.second.unVote(peerID);
                  it = currPeerPositions_.erase(it);
              }
              else
              {
                  // proposal is still fresh
                  ++closeTimeVotes[asCloseTime(peerProp.closeTime())];
                  ++it;
              }
          }
      }

      // This will stay unseated unless there are any changes
      boost::optional<TxSet_t> ourNewSet;

      // Update votes on disputed transactions
      {
          boost::optional<typename TxSet_t::MutableTxSet> mutableSet;
          for (auto& it : result_->disputes)
          {
              // Because the threshold for inclusion increases,
              //  time can change our position on a dispute
              if (it.second.updateVote(
                      convergePercent_,
                      mode_.get()== ConsensusMode::proposing,
                      parms))
              {
                  if (!mutableSet)
                      mutableSet.emplace(result_->txns);

                  if (it.second.getOurVote())
                  {
                      // now a yes
                      mutableSet->insert(it.second.tx());
                  }
                  else
                  {
                      // now a no
                      mutableSet->erase(it.first);
                  }
              }
          }

          if (mutableSet)
              ourNewSet.emplace(std::move(*mutableSet));
      }
      // ...
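The two halves of updateOurPositions shown above can be modeled in isolation. pruneStale mirrors the first loop: it drops proposals older than a cutoff, withdraws those peers' votes from every dispute with unVote, and tallies close-time votes from the fresh proposals. ToyDispute::updateVote models the second loop for the proposing case only: our own vote is counted alongside the peers' votes, and the result must exceed a required percentage that rises as the round drags on, as the excerpt's comment about an increasing inclusion threshold suggests. All names are hypothetical, the schedule in requiredPct is an illustrative assumption rather than rippled's configured values, and "stale" here means last updated at or before the cutoff.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <string>

using NodeID = std::string;
using Time = std::chrono::seconds;  // seconds since an arbitrary epoch

struct ToyDispute
{
    bool ourVote = false;
    std::map<NodeID, bool> peerVotes;  // peer -> votes to include the tx

    void unVote(NodeID const& peer) { peerVotes.erase(peer); }

    // Proposing case only. Returns true if our vote changed.
    bool updateVote(int requiredPct)
    {
        int yays = 0;
        int nays = 0;
        for (auto const& kv : peerVotes)
        {
            if (kv.second)
                ++yays;
            else
                ++nays;
        }
        // Our own vote counts alongside the peers' votes.
        int const weight = (yays + (ourVote ? 1 : 0)) * 100 / (yays + nays + 1);
        bool const newVote = weight > requiredPct;
        if (newVote == ourVote)
            return false;
        ourVote = newVote;
        return true;
    }
};

// Illustrative schedule (assumption): the longer the round runs, the larger
// the supermajority needed to keep a disputed transaction.
int requiredPct(int convergePercent)
{
    if (convergePercent < 50)
        return 50;
    if (convergePercent < 85)
        return 65;
    if (convergePercent < 200)
        return 70;
    return 95;
}

struct PeerPosition
{
    Time proposedAt;  // when the peer last proposed
    Time closeTime;   // the close time the peer proposed
};

// Drops proposals last updated at or before `cutoff`, withdrawing those
// peers' votes from every dispute, and tallies close times from the rest.
std::map<Time, int> pruneStale(std::map<NodeID, PeerPosition>& peers,
    std::map<std::string, ToyDispute>& disputes, Time cutoff)
{
    std::map<Time, int> closeTimeVotes;
    for (auto it = peers.begin(); it != peers.end();)
    {
        if (it->second.proposedAt <= cutoff)
        {
            for (auto& kv : disputes)
                kv.second.unVote(it->first);
            it = peers.erase(it);  // erase returns the next valid iterator
        }
        else
        {
            ++closeTimeVotes[it->second.closeTime];
            ++it;
        }
    }
    return closeTimeVotes;
}
```

With three yes votes and one no from peers plus our own yes, the weight is 80, so our vote survives a 65 or 70 percent threshold but flips to no once the threshold reaches 95.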

In the next section we will explore how transactions are applied internally.