Skip to content

Commit

Permalink
Merge pull request #148 from accelerated/sequencer-schema
Browse files Browse the repository at this point in the history
Added json schema for the sequencer configuration
  • Loading branch information
Alex Damian authored Mar 25, 2021
2 parents b71dcbd + 125a2f6 commit 09990a1
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 45 deletions.
31 changes: 20 additions & 11 deletions quantum/impl/quantum_configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace Bloomberg {
namespace quantum {

inline
const std::string& Configuration::getJsonSchema()
const std::string& ConfigurationSchemaProvider::getJsonSchema()
{
static std::string schema = R"JSON(
{
Expand Down Expand Up @@ -85,64 +85,73 @@ const std::string& Configuration::getJsonSchema()
}

inline
const std::string& Configuration::getJsonSchemaUri()
const std::string& ConfigurationSchemaProvider::getJsonSchemaUri()
{
static std::string uri = "bloomberg:quantum.json";
return uri;
}

inline
void Configuration::setNumCoroutineThreads(int num)
Configuration& Configuration::setNumCoroutineThreads(int num)
{
_numCoroutineThreads = num;
return *this;
}

inline
void Configuration::setNumIoThreads(int num)
Configuration& Configuration::setNumIoThreads(int num)
{
_numIoThreads = num;
return *this;
}

inline
void Configuration::setPinCoroutineThreadsToCores(bool value)
Configuration& Configuration::setPinCoroutineThreadsToCores(bool value)
{
_pinCoroutineThreadsToCores = value;
return *this;
}

inline
void Configuration::setLoadBalanceSharedIoQueues(bool value)
Configuration& Configuration::setLoadBalanceSharedIoQueues(bool value)
{
_loadBalanceSharedIoQueues = value;
return *this;
}

inline
void Configuration::setLoadBalancePollIntervalMs(std::chrono::milliseconds interval)
Configuration& Configuration::setLoadBalancePollIntervalMs(std::chrono::milliseconds interval)
{
_loadBalancePollIntervalMs = interval;
return *this;
}

inline
void Configuration::setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy)
Configuration& Configuration::setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy)
{
_loadBalancePollIntervalBackoffPolicy = policy;
return *this;
}

inline
void Configuration::setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs)
Configuration& Configuration::setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs)
{
_loadBalancePollIntervalNumBackoffs = numBackoffs;
return *this;
}

inline
void Configuration::setCoroQueueIdRangeForAny(const std::pair<int, int>& coroQueueIdRangeForAny)
Configuration& Configuration::setCoroQueueIdRangeForAny(const std::pair<int, int>& coroQueueIdRangeForAny)
{
_coroQueueIdRangeForAny = coroQueueIdRangeForAny;
return *this;
}

inline
void Configuration::setCoroutineSharingForAny(bool sharing)
Configuration& Configuration::setCoroutineSharingForAny(bool sharing)
{
_coroutineSharingForAny = sharing;
return *this;
}

inline
Expand Down
54 changes: 36 additions & 18 deletions quantum/quantum_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,78 +24,96 @@
namespace Bloomberg {
namespace quantum {

//==============================================================================================
// class ConfigurationSchemaProvider
//==============================================================================================
/// @class ConfigurationSchemaProvider
/// @brief Provides static accessors to a json schema representing a Configuration object
struct ConfigurationSchemaProvider
{
/// @brief Get the JSON schema corresponding to this configuration object.
/// @return The draft-04 compatible schema.
static const std::string& getJsonSchema();

/// @brief Get the schema URI used to resolve remote JSON references '$ref'.
/// @return The URI.
static const std::string& getJsonSchemaUri();
};

//==============================================================================================
// class Configuration
//==============================================================================================
/// @class class Configuration.
/// @brief Configuration parameters for the Quantum library.
class Configuration
class Configuration : public ConfigurationSchemaProvider
{
public:
enum class BackoffPolicy : int {
Linear = QUANTUM_BACKOFF_LINEAR, ///< Linear backoff
Exponential = QUANTUM_BACKOFF_EXPONENTIAL ///< Exponential backoff (doubles every time)
};
/// @brief Get the JSON schema corresponding to this configuration object.
/// @return The draft-04 compatible schema.
static const std::string& getJsonSchema();

/// @brief Get the schema URI used to resolve remote JSON references '$ref'.
/// @return The URI.
static const std::string& getJsonSchemaUri();

/// @brief Set the number of threads running coroutines.
/// @param[in] num The number of threads. Set to -1 to have one coroutine thread per core.
/// Default is -1.
void setNumCoroutineThreads(int num);
/// @return A reference to itself
Configuration& setNumCoroutineThreads(int num);

/// @brief Set the number of threads running IO tasks.
/// @param[in] num The number of threads. Default is 5.
void setNumIoThreads(int num);
/// @return A reference to itself
Configuration& setNumIoThreads(int num);

/// @brief Indicate if coroutine threads should be pinned to a core.
/// @param[in] value True or False. Default is False.
/// @note For best performance, the number of coroutine threads should
/// be <= the number of cores in the system.
void setPinCoroutineThreadsToCores(bool value);
/// @return A reference to itself
Configuration& setPinCoroutineThreadsToCores(bool value);

/// @brief Load balancee the shared IO queues.
/// @brief Load balance the shared IO queues.
/// @param[in] value If set to true, posting to the 'any' IO queue will result in
/// the load being spread among N queues. This mode can provide higher
/// throughput if dealing with high task loads. Default is false.
/// @note To achieve higher performance, the threads run in polling mode which
/// increases CPU usage even when idle.
void setLoadBalanceSharedIoQueues(bool value);
/// @return A reference to itself
Configuration& setLoadBalanceSharedIoQueues(bool value);

/// @brief Set the interval between IO thread polls.
/// @param[in] interval Interval in milliseconds. Default is 100ms.
/// @note Setting this to a higher value means it may take longer to react to the first
/// IO task posted, and vice-versa if the interval is lower.
void setLoadBalancePollIntervalMs(std::chrono::milliseconds interval);
/// @return A reference to itself
Configuration& setLoadBalancePollIntervalMs(std::chrono::milliseconds interval);

/// @brief Set a backoff policy for the shared queue polling interval.
/// @param[in] policy The backoff policy to use. Default is 'Linear'.
void setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy);
/// @return A reference to itself
Configuration& setLoadBalancePollIntervalBackoffPolicy(BackoffPolicy policy);

/// @brief Set the number of backoffs.
/// @param[in] numBackoffs The number of backoff increments. Default is 0.
/// When the number of backoffs is reached, the poll interval remains unchanged thereafter.
void setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs);
/// @return A reference to itself
Configuration& setLoadBalancePollIntervalNumBackoffs(size_t numBackoffs);

/// @brief Sets the range of coroutine queueIds covered by IQueue::QueueId::Any when using Dispatcher::post
/// @param[in] coroQueueIdRangeForAny The range [minQueueId, maxQueueId] of queueIds that IQueue::QueueId::Any
/// will cover.
/// @remark if the provided range is empty or invalid, then the default range of
/// std::pair<int, int>(0, getNumCoroutineThreads()-1) will be used
void setCoroQueueIdRangeForAny(const std::pair<int, int>& coroQueueIdRangeForAny);
/// @return A reference to itself
Configuration& setCoroQueueIdRangeForAny(const std::pair<int, int>& coroQueueIdRangeForAny);

/// @brief Enables or disables the shared-coro-queue-for-any settings
/// @param[in] isSharedCoroQueueForAny sets the shared-coro-queue-for any setting
/// @warning When the coroutine sharing feature is enabled, then after each yield
/// (explicit or implicit) a coroutine sent to the Any queue may be executed by
/// a different thread. As a result, coroutines using thread-local-storage (e.g., via thread_local),
/// will _not_ work as expected.
void setCoroutineSharingForAny(bool sharing);
/// @return A reference to itself
Configuration& setCoroutineSharingForAny(bool sharing);

/// @brief Get the number of coroutine threads.
/// @return The number of threads.
Expand Down
39 changes: 37 additions & 2 deletions quantum/util/impl/quantum_sequencer_configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,45 @@ struct SequenceKeyData
StatsPtr _stats;
};

inline const std::string&
SequencerConfigurationSchemaProvider::getJsonSchema()
{
static std::string schema = R"JSON(
{
"$schema" : "http://json-schema.org/draft-04/schema#",
"$id" : "bloomberg:sequencer.quantum.json",
"title": "Quantum sequencer settings",
"type": "object",
"properties": {
"controlQueueId": {
"type": "number",
"default": 0
},
"bucketCount": {
"type": "number",
"default": 100
}
},
"additionalProperties": false,
"required": []
}
)JSON";
return schema;
}

inline const std::string&
SequencerConfigurationSchemaProvider::getJsonSchemaUri()
{
static std::string uri = "bloomberg:sequencer.quantum.json";
return uri;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
void
SequencerConfiguration<SequenceKey, Hash, KeyEqual, Allocator>&
SequencerConfiguration<SequenceKey, Hash, KeyEqual, Allocator>::setControlQueueId(int controlQueueId)
{
_controllerQueueId = controlQueueId;
return *this;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
Expand All @@ -50,10 +84,11 @@ SequencerConfiguration<SequenceKey, Hash, KeyEqual, Allocator>::getControlQueueI
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
void
SequencerConfiguration<SequenceKey, Hash, KeyEqual, Allocator>&
SequencerConfiguration<SequenceKey, Hash, KeyEqual, Allocator>::setBucketCount(size_t bucketCount)
{
_bucketCount = bucketCount;
return *this;
}

template <class SequenceKey, class Hash, class KeyEqual, class Allocator>
Expand Down
44 changes: 30 additions & 14 deletions quantum/util/quantum_sequencer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ namespace quantum {

struct SequenceKeyData;

//==============================================================================================
// class SequencerConfigurationSchemaProvider
//==============================================================================================
/// @class SequencerConfigurationSchemaProvider
/// @brief Provides static accessors to a json schema representing a SequencerConfiguration object
struct SequencerConfigurationSchemaProvider
{
/// @brief Get the JSON schema corresponding to this configuration object.
/// @return The draft-04 compatible schema.
static const std::string& getJsonSchema();

/// @brief Get the schema URI used to resolve remote JSON references '$ref'.
/// @return The URI.
static const std::string& getJsonSchemaUri();
};

//==============================================================================================
// class SequencerConfiguration
//==============================================================================================
Expand All @@ -38,29 +54,29 @@ template <class SequenceKey,
class Hash = std::hash<SequenceKey>,
class KeyEqual = std::equal_to<SequenceKey>,
class Allocator = std::allocator<std::pair<const SequenceKey, SequenceKeyData>>>
class SequencerConfiguration
class SequencerConfiguration : public SequencerConfigurationSchemaProvider
{
public:

public:
/// @brief Callback for unhandled exceptions in tasks posted to Sequencer
/// @param exception pointer to the thrown exception
/// @param opaque opaque data passed when posting a task
using ExceptionCallback = std::function<void(std::exception_ptr exception, void* opaque)>;

public:

/// @brief Sets the id of the control queue
/// @param controlQueueId the queue id
/// @remark Sequencer typically processes tasks with the lower latency when the control queue is
/// dedicated for the sequencer control tasks only, and no other tasks are enqueued into it.
void setControlQueueId(int controlQueueId);
/// dedicated for the sequencer control tasks only, and no other tasks are enqueued into it.
/// @return A reference to itself
SequencerConfiguration& setControlQueueId(int controlQueueId);

/// @brief Gets the id of the control queue
/// @return the queue id
int getControlQueueId() const;

/// @brief Sets the minimal number of buckets to be used for the context hash map
/// @param bucketCount the bucket number
void setBucketCount(size_t bucketCount);
/// @return A reference to itself
SequencerConfiguration& setBucketCount(size_t bucketCount);

/// @brief gets the minimal number of buckets to be used for the context hash map
/// @return the bucket number
Expand Down Expand Up @@ -99,12 +115,12 @@ class SequencerConfiguration
const ExceptionCallback& getExceptionCallback() const;

private:
int _controllerQueueId{0};
size_t _bucketCount{0};
Hash _hash;
KeyEqual _keyEqual;
Allocator _allocator;
ExceptionCallback _exceptionCallback;
int _controllerQueueId{0};
size_t _bucketCount{100};
Hash _hash;
KeyEqual _keyEqual;
Allocator _allocator;
ExceptionCallback _exceptionCallback;
};

}}
Expand Down

0 comments on commit 09990a1

Please sign in to comment.