Rippled Audit

The Job Queue

The JobQueue class is instantiated as part of the top-level Application class and is responsible for running tasks asynchronously. It accomplishes this with an internal instance of the Workers class (which in turn manages an internal thread pool), the Job class (a generic wrapper around a unit of asynchronous logic), and coroutines.
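
For illustration, here is a minimal sketch (not taken from the rippled sources) of how a component might queue work. The Application reference app, the job name, and the choice of the jtCLIENT job type (declared in JobTypes.h) are assumptions made for the example; the handler signature follows the addJob declaration in the header excerpt below.

    // Hypothetical usage sketch: schedule a lambda on the JobQueue's thread pool.
    #include <ripple/app/main/Application.h>
    #include <ripple/core/JobQueue.h>

    void scheduleExampleWork (ripple::Application& app)
    {
        // The handler runs later on one of the Workers' threads; it must
        // accept a Job& and return void, per the addJob declaration below.
        app.getJobQueue().addJob (
            ripple::jtCLIENT, "exampleJob",
            [] (ripple::Job&)
            {
                // ... do asynchronous work here ...
            });
    }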

JobQueue Class - src/ripple/core/JobQueue.h

    class JobQueue
        : public Stoppable
        , private Workers::Callback
    {
        // ... skipping internal coroutine definition for brevity (see below)

        template <typename JobHandler,
            typename = std::enable_if_t<std::is_same<decltype(
                std::declval<JobHandler&&>()(std::declval<Job&>())), void>::value>>
        bool addJob (JobType type, std::string const& name, JobHandler&& jobHandler);

        template <class F>
        std::shared_ptr<Coro> postCoro (JobType t, std::string const& name, F&& f);

        int getJobCount (JobType t) const;

        int getJobCountTotal (JobType t) const;

        int getJobCountGE (JobType t) const;

        void setThreadCount (int c, bool const standaloneMode);

        std::unique_ptr <LoadEvent>
        makeLoadEvent (JobType t, std::string const& name);

        void addLoadEvents (JobType t, int count, std::chrono::milliseconds elapsed);

        bool isOverloaded ();

        Json::Value getJson (int c = 0);

        void rendezvous();

    private:
        friend class Coro;

        using JobDataMap = std::map <JobType, JobTypeData>;

        beast::Journal m_journal;
        mutable std::mutex m_mutex;
        std::uint64_t m_lastJob;
        std::set <Job> m_jobSet;
        JobDataMap m_jobData;
        JobTypeData m_invalidJobData;

        int m_processCount;

        int nSuspend_ = 0;

        Workers m_workers;
        Job::CancelCallback m_cancelCallback;

        std::condition_variable cv_;

        void collect();

        JobTypeData& getJobTypeData (JobType type);

        void onStop() override;

        void checkStopped (std::lock_guard <std::mutex> const& lock);

        bool addRefCountedJob (
            JobType type, std::string const& name, JobFunction const& func);

        void queueJob (Job const& job, std::lock_guard <std::mutex> const& lock);

        void getNextJob (Job& job);

        void finishJob (JobType type);

        void processTask (int instance) override;

        int getJobLimit (JobType type);

        void onChildrenStopped () override;
    };

In the header excerpt above, m_jobSet holds the set of queued Jobs, and m_workers is the embedded Workers instance that executes them.

Jobs are placed on the queue by many components throughout the system. Several of these components execute their work on the job queue via coroutines (see the Coro class below).

The Workers class, shown below, manages an encapsulated thread pool that runs the jobs added to it in parallel.

Workers Class - src/ripple/core/impl/Workers.h

    class Workers
    {
        // ... comments removed for brevity

    public:
        explicit Workers (Callback& callback,
                          perf::PerfLog& perfLog,
                          std::string const& threadNames = "Worker",
                          int numberOfThreads =
                             static_cast<int>(std::thread::hardware_concurrency()));

        ~Workers ();

        int getNumberOfThreads () const noexcept;

        void setNumberOfThreads (int numberOfThreads);

        void pauseAllThreadsAndWait ();

        void addTask ();

        int numberOfCurrentlyRunningTasks () const noexcept;

    private:
        struct PausedTag
        {
            explicit PausedTag() = default;
        };

        class Worker
            : public beast::LockFreeStack <Worker>::Node
            , public beast::LockFreeStack <Worker, PausedTag>::Node
        {
        public:
            Worker (Workers& workers,
                std::string const& threadName,
                int const instance);

            ~Worker ();

            void notify ();

        private:
            void run ();

        private:
            Workers& m_workers;
            std::string const threadName_;
            int const instance_;

            std::thread thread_;
            std::mutex mutex_;
            std::condition_variable wakeup_;
            int wakeCount_;               // how many times to un-pause
            bool shouldExit_;
        };

    private:
        static void deleteWorkers (beast::LockFreeStack <Worker>& stack);

    private:
        Callback& m_callback;
        perf::PerfLog& perfLog_;
        std::string m_threadNames;                   // The name to give each thread
        beast::WaitableEvent m_allPaused;            // signaled when all threads paused
        semaphore m_semaphore;                       // each pending task is 1 resource
        int m_numberOfThreads;                       // how many we want active now
        std::atomic <int> m_activeCount;             // to know when all are paused
        std::atomic <int> m_pauseCount;              // how many threads need to pause now
        std::atomic <int> m_runningTaskCount;        // how many calls to processTask() active
        beast::LockFreeStack <Worker> m_everyone;           // holds all created workers
        beast::LockFreeStack <Worker, PausedTag> m_paused;  // holds just paused workers
    };

In the Workers class above, m_everyone holds the collection of Worker instances, each of which owns its own thread (thread_).

The Workers::Callback reference (m_callback) is populated with a handle to the JobQueue itself. When the JobQueue receives work, it notifies the Workers pool via the addTask method; an idle worker thread then wakes up and invokes the callback's (i.e. the JobQueue's) processTask method to actually execute the task.
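
To make this dispatch flow concrete, below is a small self-contained sketch of the same pattern; it is not rippled code, and the names MiniCallback, MiniWorkers, and PrintQueue are hypothetical. addTask() records that one unit of work is available and wakes an idle worker, which then calls back into its owner's processTask(). The real Workers class builds the same shape out of its custom semaphore, pause handling, and lock-free stacks.

    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <vector>

    // Plays the role of Workers::Callback: the owner (e.g. a job queue)
    // implements processTask() to pop and run its next unit of work.
    struct MiniCallback
    {
        virtual void processTask (int instance) = 0;
        virtual ~MiniCallback () = default;
    };

    class MiniWorkers
    {
    public:
        MiniWorkers (MiniCallback& callback, int numberOfThreads)
            : callback_ (callback)
        {
            for (int i = 0; i < numberOfThreads; ++i)
                threads_.emplace_back ([this, i] { run (i); });
        }

        ~MiniWorkers ()
        {
            {
                std::lock_guard<std::mutex> lock (mutex_);
                stopping_ = true;
            }
            wakeup_.notify_all ();
            for (auto& thread : threads_)
                thread.join ();
        }

        // Analogous to Workers::addTask(): note that one task is pending and
        // wake a single idle worker thread to handle it.
        void addTask ()
        {
            {
                std::lock_guard<std::mutex> lock (mutex_);
                ++pendingTasks_;
            }
            wakeup_.notify_one ();
        }

    private:
        void run (int instance)
        {
            for (;;)
            {
                {
                    std::unique_lock<std::mutex> lock (mutex_);
                    wakeup_.wait (lock, [this] {
                        return pendingTasks_ > 0 || stopping_; });
                    if (pendingTasks_ == 0 && stopping_)
                        return;
                    --pendingTasks_;
                }
                // Hand control back to the owner, just as the Worker run
                // loop below does with m_callback.processTask (instance_).
                callback_.processTask (instance);
            }
        }

        MiniCallback& callback_;
        std::mutex mutex_;
        std::condition_variable wakeup_;
        int pendingTasks_ = 0;
        bool stopping_ = false;
        std::vector<std::thread> threads_;
    };

    // Tiny demonstration: queue four tasks onto two worker threads.
    struct PrintQueue : MiniCallback
    {
        void processTask (int instance) override
        {
            std::cout << "task executed by worker " << instance << "\n";
        }
    };

    int main ()
    {
        PrintQueue queue;
        MiniWorkers workers (queue, 2);
        for (int i = 0; i < 4; ++i)
            workers.addTask ();
        std::this_thread::sleep_for (std::chrono::milliseconds (100));
        // ~MiniWorkers drains any remaining tasks and joins the threads.
    }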

Individual Worker Run Loop - Executes JobQueue Task

    void Workers::Worker::run ()
    {
        bool shouldExit = true;
        do
        {
            // Increment the count of active workers, and if
            // we are the first one then reset the "all paused" event
            //
            if (++m_workers.m_activeCount == 1)
                m_workers.m_allPaused.reset ();

            for (;;)
            {
                // Put the name back in case the callback changed it
                beast::setCurrentThreadName (threadName_);

                // Acquire a task or "internal task."
                //
                m_workers.m_semaphore.wait ();

                // See if there's a pause request. This
                // counts as an "internal task."
                //
                int pauseCount = m_workers.m_pauseCount.load ();

                if (pauseCount > 0)
                {
                    // Try to decrement
                    pauseCount = --m_workers.m_pauseCount;

                    if (pauseCount >= 0)
                    {
                        // We got paused
                        break;
                    }
                    else
                    {
                        // Undo our decrement
                        ++m_workers.m_pauseCount;
                    }
                }

                // We couldn't pause so we must have gotten
                // unblocked in order to process a task.
                //
                ++m_workers.m_runningTaskCount;
                m_workers.m_callback.processTask (instance_);
                --m_workers.m_runningTaskCount;
            }

            // Any worker that goes into the paused list must
            // guarantee that it will eventually block on its
            // event object.
            //
            m_workers.m_paused.push_front (this);

            // Decrement the count of active workers, and if we
            // are the last one then signal the "all paused" event.
            //
            if (--m_workers.m_activeCount == 0)
                m_workers.m_allPaused.signal ();

            // Set inactive thread name.
            beast::setCurrentThreadName ("(" + threadName_ + ")");

            // [1] We will be here when the paused list is popped
            //
            // We block on our condition_variable, wakeup_, a requirement of being
            // put into the paused list.
            //
            // wakeup_ will get signaled by either Worker::notify() or ~Worker.
            {
                std::unique_lock <std::mutex> lock {mutex_};
                wakeup_.wait (lock, [this] {return this->wakeCount_ > 0;});

                shouldExit = shouldExit_;
                --wakeCount_;
            }
        } while (! shouldExit);
    }

The JobQueue itself is a Stoppable child of NodeStoreScheduler, and thus its lifecycle is managed along with the other entities in the Stoppable hierarchy. See the last section for details on this process.

The last component of this subsystem is the Coro class, which wraps boost::coroutines::asymmetric_coroutine to provide a mechanism for scheduling, suspending, and resuming asynchronous logic.

Coro Class - src/ripple/core/JobQueue.h

    class Coro : public std::enable_shared_from_this<Coro>
    {
        // ... comments removed for brevity
        private:
            detail::LocalValues lvs_;
            JobQueue& jq_;
            JobType type_;
            std::string name_;
            bool running_;
            std::mutex mutex_;
            std::mutex mutex_run_;
            std::condition_variable cv_;
            boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
            boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
        #ifndef NDEBUG
            bool finished_ = false;
        #endif

        public:
            template <class F>
            Coro(Coro_create_t, JobQueue&, JobType,
                std::string const&, F&&);

            Coro(Coro const&) = delete;
            Coro& operator= (Coro const&) = delete;

            ~Coro();

            void yield() const;

            bool post();

            void resume();

            bool runnable() const;

            void expectEarlyExit();

            void join();
    };
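
As with addJob, here is a hedged usage sketch of the coroutine interface; it is not taken from the rippled sources, and the Application reference app, the job name, and the use of jtCLIENT are assumptions made for the example. The lambda passed to postCoro receives the Coro itself, so it can yield while waiting on an asynchronous result and be rescheduled once some other code calls post().

    // Hypothetical usage sketch: run work as a coroutine so it can suspend.
    #include <memory>

    #include <ripple/app/main/Application.h>
    #include <ripple/core/JobQueue.h>

    void scheduleCoroutineWork (ripple::Application& app)
    {
        app.getJobQueue().postCoro (
            ripple::jtCLIENT, "exampleCoro",
            [] (std::shared_ptr<ripple::JobQueue::Coro> coro)
            {
                // ... start an asynchronous operation whose completion
                // handler captures coro and calls coro->post() ...

                coro->yield ();   // suspend until post()/resume() is invoked

                // Execution continues here once the coroutine is rescheduled
                // onto a JobQueue worker thread.
            });
    }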

Continue on to the next section to start diving into the components that use the JobQueue to process work asynchronously, namely the HTTP Server Handler.