From 125a2f690a387a18add7bc72577d4e80aa860432 Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Wed, 24 Mar 2021 18:38:48 -0400 Subject: [PATCH] Added json schema for the sequencer configuration. Moved schema getters into base classes. Return reference to self for easier chaining when using config setters. Signed-off-by: Alexander Damian --- quantum/impl/quantum_configuration_impl.h | 31 +++++++---- quantum/quantum_configuration.h | 54 ++++++++++++------- .../quantum_sequencer_configuration_impl.h | 39 +++++++++++++- .../util/quantum_sequencer_configuration.h | 44 ++++++++++----- 4 files changed, 123 insertions(+), 45 deletions(-) diff --git a/quantum/impl/quantum_configuration_impl.h b/quantum/impl/quantum_configuration_impl.h index a1eef43..e8394fd 100644 --- a/quantum/impl/quantum_configuration_impl.h +++ b/quantum/impl/quantum_configuration_impl.h @@ -23,7 +23,7 @@ namespace Bloomberg { namespace quantum { inline -const std::string& Configuration::getJsonSchema() +const std::string& ConfigurationSchemaProvider::getJsonSchema() { static std::string schema = R"JSON( { @@ -85,64 +85,73 @@ const std::string& Configuration::getJsonSchema() } inline -const std::string& Configuration::getJsonSchemaUri() +const std::string& ConfigurationSchemaProvider::getJsonSchemaUri() { static std::string uri = "bloomberg:quantum.json"; return uri; } inline -void Configuration::setNumCoroutineThreads(int num) +Configuration& Configuration::setNumCoroutineThreads(int num) { _numCoroutineThreads = num; + return *this; } inline -void Configuration::setNumIoThreads(int num) +Configuration& Configuration::setNumIoThreads(int num) { _numIoThreads = num; + return *this; } inline -void Configuration::setPinCoroutineThreadsToCores(bool value) +Configuration& Configuration::setPinCoroutineThreadsToCores(bool value) { _pinCoroutineThreadsToCores = value; + return *this; } inline -void Configuration::setLoadBalanceSharedIoQueues(bool value) +Configuration& Configuration::setLoadBalanceSharedIoQueues(bool value) { _loadBalanceSharedIoQueues = value; + return *this; } inline -void Configuration::setLoadBalancePollIntervalMs(std::chrono::milliseconds interval) +Configuration& Configuration::setLoadBalancePollIntervalMs(std::chrono::milliseconds interval) { _loadBalancePollIntervalMs = interval; + return *this; } inline -void Configuration::setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy) +Configuration& Configuration::setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy) { _loadBalancePollIntervalBackoffPolicy = policy; + return *this; } inline -void Configuration::setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs) +Configuration& Configuration::setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs) { _loadBalancePollIntervalNumBackoffs = numBackoffs; + return *this; } inline -void Configuration::setCoroQueueIdRangeForAny(const std::pair& coroQueueIdRangeForAny) +Configuration& Configuration::setCoroQueueIdRangeForAny(const std::pair& coroQueueIdRangeForAny) { _coroQueueIdRangeForAny = coroQueueIdRangeForAny; + return *this; } inline -void Configuration::setCoroutineSharingForAny(bool sharing) +Configuration& Configuration::setCoroutineSharingForAny(bool sharing) { _coroutineSharingForAny = sharing; + return *this; } inline diff --git a/quantum/quantum_configuration.h b/quantum/quantum_configuration.h index b626364..82f48f3 100644 --- a/quantum/quantum_configuration.h +++ b/quantum/quantum_configuration.h @@ -24,70 +24,87 @@ namespace Bloomberg { namespace quantum { +//============================================================================================== +// class ConfigurationSchemaProvider +//============================================================================================== +/// @class ConfigurationSchemaProvider +/// @brief Provides static accessors to a json schema representing a Configuration object +struct ConfigurationSchemaProvider +{ + /// @brief Get the JSON schema corresponding to this configuration object. + /// @return The draft-04 compatible schema. + static const std::string& getJsonSchema(); + + /// @brief Get the schema URI used to resolve remote JSON references '$ref'. + /// @return The URI. + static const std::string& getJsonSchemaUri(); +}; + //============================================================================================== // class Configuration //============================================================================================== /// @class class Configuration. /// @brief Configuration parameters for the Quantum library. -class Configuration +class Configuration : public ConfigurationSchemaProvider { public: enum class BackoffPolicy : int { Linear = QUANTUM_BACKOFF_LINEAR, ///< Linear backoff Exponential = QUANTUM_BACKOFF_EXPONENTIAL ///< Exponential backoff (doubles every time) }; - /// @brief Get the JSON schema corresponding to this configuration object. - /// @return The draft-04 compatible schema. - static const std::string& getJsonSchema(); - - /// @brief Get the schema URI used to resolve remote JSON references '$ref'. - /// @return The URI. - static const std::string& getJsonSchemaUri(); /// @brief Set the number of threads running coroutines. /// @param[in] num The number of threads. Set to -1 to have one coroutine thread per core. /// Default is -1. - void setNumCoroutineThreads(int num); + /// @return A reference to itself + Configuration& setNumCoroutineThreads(int num); /// @brief Set the number of threads running IO tasks. /// @param[in] num The number of threads. Default is 5. - void setNumIoThreads(int num); + /// @return A reference to itself + Configuration& setNumIoThreads(int num); /// @brief Indicate if coroutine threads should be pinned to a core. /// @param[in] value True or False. Default is False. /// @note For best performance, the number of coroutine threads should /// be <= the number of cores in the system. - void setPinCoroutineThreadsToCores(bool value); + /// @return A reference to itself + Configuration& setPinCoroutineThreadsToCores(bool value); - /// @brief Load balancee the shared IO queues. + /// @brief Load balance the shared IO queues. /// @param[in] value If set to true, posting to the 'any' IO queue will result in /// the load being spread among N queues. This mode can provide higher /// throughput if dealing with high task loads. Default is false. /// @note To achieve higher performance, the threads run in polling mode which /// increases CPU usage even when idle. - void setLoadBalanceSharedIoQueues(bool value); + /// @return A reference to itself + Configuration& setLoadBalanceSharedIoQueues(bool value); /// @brief Set the interval between IO thread polls. /// @param[in] interval Interval in milliseconds. Default is 100ms. /// @note Setting this to a higher value means it may take longer to react to the first /// IO task posted, and vice-versa if the interval is lower. - void setLoadBalancePollIntervalMs(std::chrono::milliseconds interval); + /// @return A reference to itself + Configuration& setLoadBalancePollIntervalMs(std::chrono::milliseconds interval); /// @brief Set a backoff policy for the shared queue polling interval. /// @param[in] policy The backoff policy to use. Default is 'Linear'. - void setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy); + /// @return A reference to itself + Configuration& setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy); /// @brief Set the number of backoffs. /// @param[in] numBackoffs The number of backoff increments. Default is 0. /// When the number of backoffs is reached, the poll interval remains unchanged thereafter. - void setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs); + /// @return A reference to itself + Configuration& setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs); /// @brief Sets the range of coroutine queueIds covered by IQueue::QueueId::Any when using Dispatcher::post /// @param[in] coroQueueIdRangeForAny The range [minQueueId, maxQueueId] of queueIds that IQueue::QueueId::Any /// will cover. /// @remark if the provided range is empty or invalid, then the default range of /// std::pair(0, getNumCoroutineThreads()-1) will be used - void setCoroQueueIdRangeForAny(const std::pair& coroQueueIdRangeForAny); + /// @return A reference to itself + Configuration& setCoroQueueIdRangeForAny(const std::pair& coroQueueIdRangeForAny); /// @brief Enables or disables the shared-coro-queue-for-any settings /// @param[in] isSharedCoroQueueForAny sets the shared-coro-queue-for any setting @@ -95,7 +112,8 @@ class Configuration /// (explicit or implicit) a coroutine sent to the Any queue may be executed by /// a different thread. As a result, coroutines using thread-local-storage (e.g., via thread_local), /// will _not_ work as expected. - void setCoroutineSharingForAny(bool sharing); + /// @return A reference to itself + Configuration& setCoroutineSharingForAny(bool sharing); /// @brief Get the number of coroutine threads. /// @return The number of threads. diff --git a/quantum/util/impl/quantum_sequencer_configuration_impl.h b/quantum/util/impl/quantum_sequencer_configuration_impl.h index b73d2f7..2995803 100644 --- a/quantum/util/impl/quantum_sequencer_configuration_impl.h +++ b/quantum/util/impl/quantum_sequencer_configuration_impl.h @@ -35,11 +35,45 @@ struct SequenceKeyData StatsPtr _stats; }; +inline const std::string& +SequencerConfigurationSchemaProvider::getJsonSchema() +{ + static std::string schema = R"JSON( + { + "$schema" : "http://json-schema.org/draft-04/schema#", + "$id" : "bloomberg:sequencer.quantum.json", + "title": "Quantum sequencer settings", + "type": "object", + "properties": { + "controlQueueId": { + "type": "number", + "default": 0 + }, + "bucketCount": { + "type": "number", + "default": 100 + } + }, + "additionalProperties": false, + "required": [] + } + )JSON"; + return schema; +} + +inline const std::string& +SequencerConfigurationSchemaProvider::getJsonSchemaUri() +{ + static std::string uri = "bloomberg:sequencer.quantum.json"; + return uri; +} + template -void +SequencerConfiguration& SequencerConfiguration::setControlQueueId(int controlQueueId) { _controllerQueueId = controlQueueId; + return *this; } template @@ -50,10 +84,11 @@ SequencerConfiguration::getControlQueueI } template -void +SequencerConfiguration& SequencerConfiguration::setBucketCount(size_t bucketCount) { _bucketCount = bucketCount; + return *this; } template diff --git a/quantum/util/quantum_sequencer_configuration.h b/quantum/util/quantum_sequencer_configuration.h index 187ff91..5a89141 100644 --- a/quantum/util/quantum_sequencer_configuration.h +++ b/quantum/util/quantum_sequencer_configuration.h @@ -25,6 +25,22 @@ namespace quantum { struct SequenceKeyData; +//============================================================================================== +// class SequencerConfigurationSchemaProvider +//============================================================================================== +/// @class SequencerConfigurationSchemaProvider +/// @brief Provides static accessors to a json schema representing a SequencerConfiguration object +struct SequencerConfigurationSchemaProvider +{ + /// @brief Get the JSON schema corresponding to this configuration object. + /// @return The draft-04 compatible schema. + static const std::string& getJsonSchema(); + + /// @brief Get the schema URI used to resolve remote JSON references '$ref'. + /// @return The URI. + static const std::string& getJsonSchemaUri(); +}; + //============================================================================================== // class SequencerConfiguration //============================================================================================== @@ -38,21 +54,20 @@ template , class KeyEqual = std::equal_to, class Allocator = std::allocator>> -class SequencerConfiguration +class SequencerConfiguration : public SequencerConfigurationSchemaProvider { -public: - +public: /// @brief Callback for unhandled exceptions in tasks posted to Sequencer /// @param exception pointer to the thrown exception /// @param opaque opaque data passed when posting a task using ExceptionCallback = std::function; - -public: + /// @brief Sets the id of the control queue /// @param controlQueueId the queue id /// @remark Sequencer typically processes tasks with the lower latency when the control queue is - /// dedicated for the sequencer control tasks only, and no other tasks are enqueued into it. - void setControlQueueId(int controlQueueId); + /// dedicated for the sequencer control tasks only, and no other tasks are enqueued into it. + /// @return A reference to itself + SequencerConfiguration& setControlQueueId(int controlQueueId); /// @brief Gets the id of the control queue /// @return the queue id @@ -60,7 +75,8 @@ class SequencerConfiguration /// @brief Sets the minimal number of buckets to be used for the context hash map /// @param bucketCount the bucket number - void setBucketCount(size_t bucketCount); + /// @return A reference to itself + SequencerConfiguration& setBucketCount(size_t bucketCount); /// @brief gets the minimal number of buckets to be used for the context hash map /// @return the bucket number @@ -99,12 +115,12 @@ class SequencerConfiguration const ExceptionCallback& getExceptionCallback() const; private: - int _controllerQueueId{0}; - size_t _bucketCount{0}; - Hash _hash; - KeyEqual _keyEqual; - Allocator _allocator; - ExceptionCallback _exceptionCallback; + int _controllerQueueId{0}; + size_t _bucketCount{100}; + Hash _hash; + KeyEqual _keyEqual; + Allocator _allocator; + ExceptionCallback _exceptionCallback; }; }}