Rippled Audit

The Server Handler

Like JobQueue, ServerHandler is a Stoppable object, instantiated as a child of the NetworkOPs class. It provides access to the primary JSON-RPC HTTP interface that clients use to communicate with the rippled server.

(NetworkOPs is responsible for high-level XRP-network operations: processing transactions, establishing consensus, writing to the network, etc. We will explore it further in a subsequent section.)

At a high level, the ServerHandler class (simply mapped to ServerHandlerImp) owns a reference to the Server interface (implemented by the concrete ServerImpl class). ServerImpl instantiates a Door for each port the server is configured to listen on. Each Door is responsible for accepting new connections and creating a Peer instance for each new client connection. The actual Peer class that is instantiated depends on the mechanism through which the connection was established, plain HTTP or SSL, both of which may be upgraded to websockets.

ServerHandler is registered with Server, and subsequently with the Door and Peer classes, so that when client messages arrive from the network the ServerHandler::onRequest callback can be called. That callback dispatches to the RPC::doCommand proxy method, and ultimately to the handler class that satisfies the client request. We will explore the RPC handlers in the next section.
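
The ownership chain described above can be sketched in a few lines of standalone C++. This is only an illustration, not rippled's code: the Handler, Peer, Door and Server types below are hypothetical stand-ins, networking is replaced by plain method calls, and requests are plain strings.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for ServerHandler: receives every client request.
struct Handler
{
    std::vector<std::string> seen;  // requests observed, kept for illustration

    std::string onRequest(std::string const& request)
    {
        seen.push_back(request);
        return "handled:" + request;
    }
};

// Hypothetical stand-in for a Peer: one instance per client connection.
class Peer
{
public:
    Peer(Handler& handler, bool ssl) : handler_(handler), ssl_(ssl) {}

    // Called when data arrives from the client; forwards it to the handler.
    std::string receive(std::string const& request)
    {
        return handler_.onRequest(request);
    }

    bool ssl() const { return ssl_; }

private:
    Handler& handler_;
    bool ssl_;
};

// Hypothetical stand-in for a Door: one instance per configured port.
class Door
{
public:
    Door(Handler& handler, int port) : handler_(handler), port_(port) {}

    // Simulates accepting a connection and creating the Peer that serves it.
    std::shared_ptr<Peer> accept(bool ssl)
    {
        auto peer = std::make_shared<Peer>(handler_, ssl);
        peers_.push_back(peer);
        return peer;
    }

    int port() const { return port_; }
    std::size_t peerCount() const { return peers_.size(); }

private:
    Handler& handler_;
    int port_;
    std::vector<std::shared_ptr<Peer>> peers_;
};

// Hypothetical stand-in for ServerImpl: owns one Door per port.
class Server
{
public:
    explicit Server(Handler& handler) : handler_(handler) {}

    Door& open(int port)
    {
        doors_.push_back(std::make_unique<Door>(handler_, port));
        return *doors_.back();
    }

    std::size_t doorCount() const { return doors_.size(); }

private:
    Handler& handler_;
    std::vector<std::unique_ptr<Door>> doors_;
};
```

In this sketch the handler reference is passed down from Server to Door to Peer at construction time, which is the role the registration step plays in the text: every Peer can reach the same onRequest callback without knowing anything about RPC.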

Handled commands are executed on the JobQueue via coroutines, which frees the rippled thread pool to handle other work.
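
The idea of handing a command to a queue rather than running it inline can be illustrated with a very small sketch. It is deliberately simplified and makes several assumptions: SimpleJobQueue is a hypothetical name, it is single-threaded, and it uses plain callables instead of the coroutines and worker threads that rippled's JobQueue relies on.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical, single-threaded stand-in for a job queue.
class SimpleJobQueue
{
    struct Job
    {
        std::string name;            // label, useful for logging
        std::function<void()> work;  // the deferred command
    };

public:
    // Called from the network side: record the work and return immediately.
    void post(std::string name, std::function<void()> work)
    {
        jobs_.push_back(Job{std::move(name), std::move(work)});
    }

    // Called from a worker: run queued jobs in FIFO order.
    // Returns the number of jobs executed.
    std::size_t runPending()
    {
        std::size_t ran = 0;
        while (!jobs_.empty())
        {
            Job job = std::move(jobs_.front());
            jobs_.pop_front();
            job.work();
            ++ran;
        }
        return ran;
    }

    std::size_t pending() const { return jobs_.size(); }

private:
    std::deque<Job> jobs_;
};
```

The point of the sketch is the split between post, which returns immediately to the caller that received the request, and runPending, which does the actual work later. A coroutine-based queue additionally lets a job suspend itself partway through and resume later, which this sketch does not attempt.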

Door#do_accept - src/ripple/server/impl/Door.h

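Accepts client connections via boost::asio::ip::tcp::acceptor and then invokes Door#create. When the port allows both SSL and plain connections, a Detector is created first instead, so the protocol can be determined before a Peer is created.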

    template<class Handler>
    void
    Door<Handler>::
    do_accept(boost::asio::yield_context do_yield)
    {
        for(;;)
        {
            error_code ec;
            endpoint_type remote_address;
            socket_type socket (acceptor_.get_io_service());
            acceptor_.async_accept (socket, remote_address, do_yield[ec]);
            if (ec && ec != boost::asio::error::operation_aborted)
            {
                JLOG(j_.error()) <<
                    "accept: " << ec.message();
            }
            if (ec == boost::asio::error::operation_aborted)
                break;
            if (ec)
                continue;

            if (ssl_ && plain_)
            {
                if(auto sp = ios().template emplace<Detector>(
                    port_, handler_, std::move(socket),
                        remote_address, j_))
                    sp->run();
            }
            else if (ssl_ || plain_)
            {
                create(ssl_, boost::asio::null_buffers{},
                    std::move(socket), remote_address);
            }
        }
    }
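
The branching inside the accept loop is easier to see when it is pulled out into a pure function. The sketch below is an illustration under assumptions: AcceptAction and nextAction are hypothetical names, and std::errc::operation_canceled is used as a standard-library stand-in for boost::asio::error::operation_aborted so that the example has no Boost dependency.

```cpp
#include <system_error>

// Hypothetical summary of what the loop does after each async_accept.
enum class AcceptAction
{
    Stop,        // acceptor was closed: break out of the loop
    Retry,       // some other error: it is logged, then the loop continues
    Detect,      // port allows SSL and plain: hand the socket to a Detector
    CreatePeer,  // exactly one protocol allowed: call create() directly
    Ignore       // neither flag set: the socket is simply dropped
};

AcceptAction nextAction(std::error_code const& ec, bool ssl, bool plain)
{
    if (ec == std::errc::operation_canceled)
        return AcceptAction::Stop;
    if (ec)
        return AcceptAction::Retry;
    if (ssl && plain)
        return AcceptAction::Detect;
    if (ssl || plain)
        return AcceptAction::CreatePeer;
    return AcceptAction::Ignore;
}
```

Note the ordering: the aborted case is checked before the generic error case, which matches the listing, where only non-aborted errors are logged and an aborted accept ends the loop silently.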

Door#create - src/ripple/server/impl/Door.h

Creating new SSLHTTPPeer / PlainHTTPPeer instances on new client connections

    template<class Handler>
    template<class ConstBufferSequence>
    void
    Door<Handler>::
    create(bool ssl, ConstBufferSequence const& buffers,
        socket_type&& socket, endpoint_type remote_address)
    {
        if (ssl)
        {
            if(auto sp = ios().template emplace<SSLHTTPPeer<Handler>>(
                port_, handler_, j_, remote_address,
                    buffers, std::move(socket)))
                sp->run();
            return;
        }
        if(auto sp = ios().template emplace<PlainHTTPPeer<Handler>>(
            port_, handler_, j_, remote_address,
                buffers, std::move(socket)))
            sp->run();
    }

PlainHTTPPeer#websocketUpgrade - src/ripple/server/impl/PlainHTTPPeer.h

A plain HTTP connection being upgraded to a websocket. This is invoked from ServerHandlerImp#onHandoff, which in turn is invoked from PlainHTTPPeer#do_request, which is invoked after reading client data (BaseHTTPPeer::do_read). Upgrading SSL connections works in a similar way.

    template<class Handler>
    std::shared_ptr<WSSession>
    PlainHTTPPeer<Handler>::
    websocketUpgrade()
    {
        auto ws = this->ios().template emplace<PlainWSPeer<Handler>>(
            this->port_, this->handler_, this->remote_address_,
                std::move(this->message_), std::move(stream_),
                    this->journal_);
        return ws;
    }
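
The key move in the upgrade is that the HTTP peer gives up its stream and the already-read request, and the websocket peer takes them over. The following sketch shows that ownership transfer in isolation. All types here (Stream, WsPeer, HttpPeer) are hypothetical simplifications and do not reflect the real class hierarchy.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical socket wrapper; the int stands in for a real connection.
struct Stream
{
    int fd;
};

// Hypothetical websocket peer: takes over the request and the stream.
class WsPeer
{
public:
    WsPeer(std::string firstRequest, std::unique_ptr<Stream> stream)
        : request_(std::move(firstRequest)), stream_(std::move(stream))
    {
    }

    bool connected() const { return stream_ != nullptr; }
    std::string const& request() const { return request_; }

private:
    std::string request_;
    std::unique_ptr<Stream> stream_;
};

// Hypothetical HTTP peer that can be upgraded exactly once.
class HttpPeer
{
public:
    HttpPeer(std::string request, std::unique_ptr<Stream> stream)
        : message_(std::move(request)), stream_(std::move(stream))
    {
    }

    // Moves the request and the stream into a new websocket peer.
    // After this call the HTTP peer no longer owns a stream.
    std::shared_ptr<WsPeer> websocketUpgrade()
    {
        return std::make_shared<WsPeer>(
            std::move(message_), std::move(stream_));
    }

    bool ownsStream() const { return stream_ != nullptr; }

private:
    std::string message_;
    std::unique_ptr<Stream> stream_;
};
```

Using std::move for both members mirrors the listing, which moves message_ and stream_ into the new peer. Because the stream is handed over rather than copied, only one object is ever responsible for the connection.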

ServerHandlerImp#processSession - src/ripple/rpc/impl/ServerHandlerImp.cpp

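Invoked from ServerHandlerImp#onRequest, which is invoked when client data is received. The overload shown here takes a WSSession, so it handles requests that arrive over a websocket session.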

(The listing below covers lines 379-460 of ServerHandlerImp.cpp.)

    Json::Value
    ServerHandlerImp::processSession(
        std::shared_ptr<WSSession> const& session,
            std::shared_ptr<JobQueue::Coro> const& coro,
                Json::Value const& jv)
    {
        auto is = std::static_pointer_cast<WSInfoSub> (session->appDefined);
        if (is->getConsumer().disconnect())
        {
            session->close();
            // FIX: This rpcError is not delivered since the session
            // was just closed.
            return rpcError(rpcSLOW_DOWN);
        }

        // Requests without "command" are invalid.
        Json::Value jr(Json::objectValue);
        Resource::Charge loadType = Resource::feeReferenceRPC;
        try
        {
            if ((!jv.isMember(jss::command) && !jv.isMember(jss::method)) ||
                (jv.isMember(jss::command) && !jv[jss::command].isString()) ||
                (jv.isMember(jss::method) && !jv[jss::method].isString()) ||
                (jv.isMember(jss::command) && jv.isMember(jss::method) &&
                 jv[jss::command].asString() != jv[jss::method].asString()))
            {
                jr[jss::type] = jss::response;
                jr[jss::status] = jss::error;
                jr[jss::error] = jss::missingCommand;
                jr[jss::request] = jv;
                if (jv.isMember (jss::id))
                    jr[jss::id]  = jv[jss::id];
                if (jv.isMember(jss::jsonrpc))
                    jr[jss::jsonrpc] = jv[jss::jsonrpc];
                if (jv.isMember(jss::ripplerpc))
                    jr[jss::ripplerpc] = jv[jss::ripplerpc];

                is->getConsumer().charge(Resource::feeInvalidRPC);
                return jr;
            }

            auto required = RPC::roleRequired(jv.isMember(jss::command) ?
                                              jv[jss::command].asString() :
                                              jv[jss::method].asString());
            auto role = requestRole(
                required,
                session->port(),
                jv,
                beast::IP::from_asio(session->remote_endpoint().address()),
                is->user());
            if (Role::FORBID == role)
            {
                loadType = Resource::feeInvalidRPC;
                jr[jss::result] = rpcError (rpcFORBIDDEN);
            }
            else
            {
                RPC::Context context{
                    app_.journal("RPCHandler"),
                    jv,
                    app_,
                    loadType,
                    app_.getOPs(),
                    app_.getLedgerMaster(),
                    is->getConsumer(),
                    role,
                    coro,
                    is,
                    {is->user(), is->forwarded_for()}
                    };
                RPC::doCommand(context, jr[jss::result]);
            }
        }
        catch (std::exception const& ex)
        {
            jr[jss::result] = RPC::make_error(rpcINTERNAL);
            JLOG(m_journal.error())
               << "Exception while processing WS: " << ex.what() << "\n"
               << "Input JSON: " << Json::Compact{Json::Value{jv}};
        }

        // ...

Near the end of the listing, in the final else branch, an RPC::Context is assembled and passed to RPC::doCommand. This call is the invocation of the RPC subsystem.
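
The long condition at the top of the try block decides whether a request names its command acceptably. The sketch below restates that rule as a small standalone function. It is an approximation under assumptions: Field and Request are hypothetical stand-ins for Json::Value, and only the "command" and "method" keys are modelled.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for a JSON field: records whether the value is a
// string and, if so, its text.
struct Field
{
    bool isString;
    std::string text;
};

using Request = std::map<std::string, Field>;

// Returns true when the request names a command in an acceptable way:
// at least one of "command" / "method" is present, each one that is
// present is a string, and if both are present they agree.
bool hasValidCommand(Request const& jv)
{
    auto const command = jv.find("command");
    auto const method = jv.find("method");
    bool const hasCommand = command != jv.end();
    bool const hasMethod = method != jv.end();

    if (!hasCommand && !hasMethod)
        return false;
    if (hasCommand && !command->second.isString)
        return false;
    if (hasMethod && !method->second.isString)
        return false;
    if (hasCommand && hasMethod &&
        command->second.text != method->second.text)
        return false;
    return true;
}

// Picks the command name the way the listing does: "command" is preferred,
// "method" is the fallback. Call only after hasValidCommand returned true.
std::string commandName(Request const& jv)
{
    auto const command = jv.find("command");
    return command != jv.end() ? command->second.text
                               : jv.at("method").text;
}
```

In the listing, a request that fails this check is answered with a missingCommand error and the client is charged Resource::feeInvalidRPC. A request that passes has its command name used to look up the required role before the command is dispatched.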

All in all, the complete Server lifecycle / workflow can be seen below:

Continue on to the next section to learn the details of the RPC dispatch and handler mechanism.