Skip to content

Commit

Permalink
Added json schema for the sequencer configuration.
Browse files Browse the repository at this point in the history
Moved schema getters into base classes.
Return reference to self for easier chaining when using config setters.

Signed-off-by: Alexander Damian <[email protected]>
  • Loading branch information
Alexander Damian committed Mar 25, 2021
1 parent 342ba0e commit 125a2f6
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 45 deletions.
31 changes: 20 additions & 11 deletions quantum/impl/quantum_configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace Bloomberg {
namespace quantum {

inline
const std::string& Configuration::getJsonSchema()
const std::string& ConfigurationSchemaProvider::getJsonSchema()
{
static std::string schema = R"JSON(
{
Expand Down Expand Up @@ -85,64 +85,73 @@ const std::string& Configuration::getJsonSchema()
}

inline
const std::string& Configuration::getJsonSchemaUri()
const std::string& ConfigurationSchemaProvider::getJsonSchemaUri()
{
static std::string uri = "bloomberg:quantum.json";
return uri;
}

inline
void Configuration::setNumCoroutineThreads(int num)
Configuration& Configuration::setNumCoroutineThreads(int num)
{
_numCoroutineThreads = num;
return *this;
}

inline
void Configuration::setNumIoThreads(int num)
Configuration& Configuration::setNumIoThreads(int num)
{
_numIoThreads = num;
return *this;
}

inline
void Configuration::setPinCoroutineThreadsToCores(bool value)
Configuration& Configuration::setPinCoroutineThreadsToCores(bool value)
{
_pinCoroutineThreadsToCores = value;
return *this;
}

inline
void Configuration::setLoadBalanceSharedIoQueues(bool value)
Configuration& Configuration::setLoadBalanceSharedIoQueues(bool value)
{
_loadBalanceSharedIoQueues = value;
return *this;
}

inline
void Configuration::setLoadBalancePollIntervalMs(std::chrono::milliseconds interval)
Configuration& Configuration::setLoadBalancePollIntervalMs(std::chrono::milliseconds interval)
{
_loadBalancePollIntervalMs = interval;
return *this;
}

inline
void Configuration::setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy)
Configuration& Configuration::setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy)
{
_loadBalancePollIntervalBackoffPolicy = policy;
return *this;
}

inline
void Configuration::setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs)
Configuration& Configuration::setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs)
{
_loadBalancePollIntervalNumBackoffs = numBackoffs;
return *this;
}

inline
void Configuration::setCoroQueueIdRangeForAny(const std::pair<int, int>& coroQueueIdRangeForAny)
Configuration& Configuration::setCoroQueueIdRangeForAny(const std::pair<int, int>& coroQueueIdRangeForAny)
{
_coroQueueIdRangeForAny = coroQueueIdRangeForAny;
return *this;
}

inline
void Configuration::setCoroutineSharingForAny(bool sharing)
Configuration& Configuration::setCoroutineSharingForAny(bool sharing)
{
_coroutineSharingForAny = sharing;
return *this;
}

inline
Expand Down
54 changes: 36 additions & 18 deletions quantum/quantum_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,78 +24,96 @@
namespace Bloomberg {
namespace quantum {

//==============================================================================================
// class ConfigurationSchemaProvider
//==============================================================================================
/// @class ConfigurationSchemaProvider
/// @brief Provides static accessors to a json schema representing a Configuration object
struct ConfigurationSchemaProvider
{
/// @brief Get the JSON schema corresponding to this configuration object.
/// @return The draft-04 compatible schema.
static const std::string& getJsonSchema();

/// @brief Get the schema URI used to resolve remote JSON references '$ref'.
/// @return The URI.
static const std::string& getJsonSchemaUri();
};

//==============================================================================================
// class Configuration
//==============================================================================================
/// @class class Configuration.
/// @brief Configuration parameters for the Quantum library.
class Configuration
class Configuration : public ConfigurationSchemaProvider
{
public:
enum class BackoffPolicy : int {
Linear = QUANTUM_BACKOFF_LINEAR, ///< Linear backoff
Exponential = QUANTUM_BACKOFF_EXPONENTIAL ///< Exponential backoff (doubles every time)
};
/// @brief Get the JSON schema corresponding to this configuration object.
/// @return The draft-04 compatible schema.
static const std::string& getJsonSchema();

/// @brief Get the schema URI used to resolve remote JSON references '$ref'.
/// @return The URI.
static const std::string& getJsonSchemaUri();

/// @brief Set the number of threads running coroutines.
/// @param[in] num The number of threads. Set to -1 to have one coroutine thread per core.
/// Default is -1.
void setNumCoroutineThreads(int num);
/// @return A reference to itself
Configuration& setNumCoroutineThreads(int num);

/// @brief Set the number of threads running IO tasks.
/// @param[in] num The number of threads. Default is 5.
void setNumIoThreads(int num);
/// @return A reference to itself
Configuration& setNumIoThreads(int num);

/// @brief Indicate if coroutine threads should be pinned to a core.
/// @param[in] value True or False. Default is False.
/// @note For best performance, the number of coroutine threads should
/// be <= the number of cores in the system.
void setPinCoroutineThreadsToCores(bool value);
/// @return A reference to itself
Configuration& setPinCoroutineThreadsToCores(bool value);

/// @brief Load balancee the shared IO queues.
/// @brief Load balance the shared IO queues.
/// @param[in] value If set to true, posting to the 'any' IO queue will result in
/// the load being spread among N queues. This mode can provide higher
/// throughput if dealing with high task loads. Default is false.
/// @note To achieve higher performance, the threads run in polling mode which
/// increases CPU usage even when idle.
void setLoadBalanceSharedIoQueues(bool value);
/// @return A reference to itself
Configuration& setLoadBalanceSharedIoQueues(bool value);

/// @brief Set the interval between IO thread polls.
/// @param[in] interval Interval in milliseconds. Default is 100ms.
/// @note Setting this to a higher value means it may take longer to react to the first
/// IO task posted, and vice-versa if the interval is lower.
void setLoadBalancePollIntervalMs(std::chrono::milliseconds interval);
/// @return A reference to itself
Configuration& setLoadBalancePollIntervalMs(std::chrono::milliseconds interval);

/// @brief Set a backoff policy for the shared queue polling interval.
/// @param[in] policy The backoff policy to use. Default is 'Linear'.
void setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy);
/// @return A reference to itself
Configuration& setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy);

/// @brief Set the number of backoffs.
/// @param[in] numBackoffs The number of backoff increments. Default is 0.
/// When the number of backoffs is reached, the poll interval remains unchanged thereafter.
void setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs);
/// @return A reference to itself
Configuration& setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs);

/// @brief Sets the range of coroutine queueIds covered by IQueue::QueueId::Any when using Dispatcher::post
/// @param[in] coroQueueIdRangeForAny The range [minQueueId, maxQueueId] of queueIds that IQueue::QueueId::Any
/// will cover.
/// @remark if the provided range is empty or invalid, then the default range of
/// std::pair<int, int>(0, getNumCoroutineThreads()-1) will be used
void setCoroQueueIdRangeForAny(const std::pair<int, int>& coroQueueIdRangeForAny);
/// @return A reference to itself
Configuration& setCoroQueueIdRangeForAny(const std::pair<int, int>& coroQueueIdRangeForAny);

/// @brief Enables or disables the shared-coro-queue-for-any settings
/// @param[in] isSharedCoroQueueForAny sets the shared-coro-queue-for any setting
/// @warning When the coroutine sharing feature is enabled, then after each yield
/// (explicit or implicit) a coroutine sent to the Any queue may be executed by
/// a different thread. As a result, coroutines using thread-local-storage (e.g., via thread_local),
/// will _not_ work as expected.
void setCoroutineSharingForAny(bool sharing);
/// @return A reference to itself
Configuration& setCoroutineSharingForAny(bool sharing);

/// @brief Get the number of coroutine threads.
/// @return The number of threads.
Expand Down
39 changes: 37 additions & 2 deletions quantum/util/impl/quantum_sequencer_configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,45 @@ struct SequenceKeyData
StatsPtr _stats;
};

inline const std::string&
SequencerConfigurationSchemaProvider::getJsonSchema()
{
static std::string schema = R"JSON(
{
"$schema" : "http://json-schema.org/draft-04/schema#",
"$id" : "bloomberg:sequencer.quantum.json",
"title": "Quantum sequencer settings",
"type": "object",
"properties": {
"controlQueueId": {
"type": "number",
"default": 0
},
"bucketCount": {
"type": "number",
"default": 100
}
},
"additionalProperties": false,
"required": []
}
)JSON";
return schema;
}

inline const std::string&
SequencerConfigurationSchemaProvider::getJsonSchemaUri()
{
static std::string uri = "bloomberg:sequencer.quantum.json";
return uri;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
void
SequencerConfiguration<SequenceKey, Hash, KeyEqual, Allocator>&
SequencerConfiguration<SequenceKey, Hash, KeyEqual, Allocator>::setControlQueueId(int controlQueueId)
{
_controllerQueueId = controlQueueId;
return *this;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
Expand All @@ -50,10 +84,11 @@ SequencerConfiguration<SequenceKey, Hash, KeyEqual, Allocator>::getControlQueueI
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
void
SequencerConfiguration<SequenceKey, Hash, KeyEqual, Allocator>&
SequencerConfiguration<SequenceKey, Hash, KeyEqual, Allocator>::setBucketCount(size_t bucketCount)
{
_bucketCount = bucketCount;
return *this;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
Expand Down
44 changes: 30 additions & 14 deletions quantum/util/quantum_sequencer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ namespace quantum {

struct SequenceKeyData;

//==============================================================================================
// class SequencerConfigurationSchemaProvider
//==============================================================================================
/// @class SequencerConfigurationSchemaProvider
/// @brief Provides static accessors to a json schema representing a SequencerConfiguration object
struct SequencerConfigurationSchemaProvider
{
/// @brief Get the JSON schema corresponding to this configuration object.
/// @return The draft-04 compatible schema.
static const std::string& getJsonSchema();

/// @brief Get the schema URI used to resolve remote JSON references '$ref'.
/// @return The URI.
static const std::string& getJsonSchemaUri();
};

//==============================================================================================
// class SequencerConfiguration
//==============================================================================================
Expand All @@ -38,29 +54,29 @@ template <class SequenceKey,
class Hash = std::hash<SequenceKey>,
class KeyEqual = std::equal_to<SequenceKey>,
class Allocator = std::allocator<std::pair<const SequenceKey, SequenceKeyData>>>
class SequencerConfiguration
class SequencerConfiguration : public SequencerConfigurationSchemaProvider
{
public:

public:
/// @brief Callback for unhandled exceptions in tasks posted to Sequencer
/// @param exception pointer to the thrown exception
/// @param opaque opaque data passed when posting a task
using ExceptionCallback = std::function<void(std::exception_ptr exception, void* opaque)>;

public:

/// @brief Sets the id of the control queue
/// @param controlQueueId the queue id
/// @remark Sequencer typically processes tasks with the lower latency when the control queue is
/// dedicated for the sequencer control tasks only, and no other tasks are enqueued into it.
void setControlQueueId(int controlQueueId);
/// dedicated for the sequencer control tasks only, and no other tasks are enqueued into it.
/// @return A reference to itself
SequencerConfiguration& setControlQueueId(int controlQueueId);

/// @brief Gets the id of the control queue
/// @return the queue id
int getControlQueueId() const;

/// @brief Sets the minimal number of buckets to be used for the context hash map
/// @param bucketCount the bucket number
void setBucketCount(size_t bucketCount);
/// @return A reference to itself
SequencerConfiguration& setBucketCount(size_t bucketCount);

/// @brief gets the minimal number of buckets to be used for the context hash map
/// @return the bucket number
Expand Down Expand Up @@ -99,12 +115,12 @@ class SequencerConfiguration
const ExceptionCallback& getExceptionCallback() const;

private:
int _controllerQueueId{0};
size_t _bucketCount{0};
Hash _hash;
KeyEqual _keyEqual;
Allocator _allocator;
ExceptionCallback _exceptionCallback;
int _controllerQueueId{0};
size_t _bucketCount{100};
Hash _hash;
KeyEqual _keyEqual;
Allocator _allocator;
ExceptionCallback _exceptionCallback;
};

}}
Expand Down

0 comments on commit 125a2f6

Please sign in to comment.