Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throttle operator #515

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Rx/v2/examples/doxygen/throttle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "rxcpp/rx.hpp"

#include "rxcpp/rx-test.hpp"
#include "catch.hpp"

SCENARIO("throttle sample"){
printf("//! [throttle sample]\n");
using namespace std::chrono;
auto scheduler = rxcpp::identity_current_thread();
auto start = scheduler.now();
auto period = milliseconds(10);
auto values = rxcpp::observable<>::interval(start, period, scheduler).
take(4).
throttle(period);
values.
subscribe(
[](long v) { printf("OnNext: %ld\n", v); },
[]() { printf("OnCompleted\n"); });
printf("//! [throttle sample]\n");
}
271 changes: 271 additions & 0 deletions Rx/v2/src/rxcpp/operators/rx-throttle.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

/*! \file rx-throttle.hpp

\brief Return an observable that emits a value from the source and then ignores any following items until a particular timespan has passed before emitting another value.

\tparam Duration the type of the time interval
\tparam Coordination the type of the scheduler

\param period the period of time to suppress any emitted items after the first emission
\param coordination the scheduler to manage timeout for each event

\return Observable that emits a value from the source and then ignores any following items until a particular timespan has passed before emitting another value.

\sample
\snippet throttle.cpp throttle sample
\snippet output.txt throttle sample
*/

#if !defined(RXCPP_OPERATORS_RX_THROTTLE_HPP)
#define RXCPP_OPERATORS_RX_THROTTLE_HPP

#include "../rx-includes.hpp"

#include <iostream>

namespace rxcpp {

namespace operators {

namespace detail {

template<class... AN>
struct throttle_invalid_arguments {};

template<class... AN>
struct throttle_invalid : public rxo::operator_base<throttle_invalid_arguments<AN...>> {
using type = observable<throttle_invalid_arguments<AN...>, throttle_invalid<AN...>>;
};
template<class... AN>
using throttle_invalid_t = typename throttle_invalid<AN...>::type;

template<class T, class Duration, class Coordination>
struct throttle
{
typedef rxu::decay_t<T> source_value_type;
typedef rxu::decay_t<Coordination> coordination_type;
typedef typename coordination_type::coordinator_type coordinator_type;
typedef rxu::decay_t<Duration> duration_type;

struct throttle_values
{
throttle_values(duration_type p, coordination_type c)
: period(p)
, coordination(c)
{
}

duration_type period;
coordination_type coordination;
};
throttle_values initial;

throttle(duration_type period, coordination_type coordination)
: initial(period, coordination)
{
}

template<class Subscriber>
struct throttle_observer
{
typedef throttle_observer<Subscriber> this_type;
typedef rxu::decay_t<T> value_type;
typedef rxu::decay_t<Subscriber> dest_type;
typedef observer<T, this_type> observer_type;

struct throttle_subscriber_values : public throttle_values
{
throttle_subscriber_values(composite_subscription cs, dest_type d, throttle_values v, coordinator_type c)
: throttle_values(v)
, cs(std::move(cs))
, dest(std::move(d))
, coordinator(std::move(c))
, worker(coordinator.get_worker())
, throttled(false)
{
}

composite_subscription cs;
dest_type dest;
coordinator_type coordinator;
rxsc::worker worker;
mutable bool throttled;
};
typedef std::shared_ptr<throttle_subscriber_values> state_type;
state_type state;

throttle_observer(composite_subscription cs, dest_type d, throttle_values v, coordinator_type c)
: state(std::make_shared<throttle_subscriber_values>(throttle_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
{
auto localState = state;

auto disposer = [=](const rxsc::schedulable&){
localState->cs.unsubscribe();
localState->dest.unsubscribe();
localState->worker.unsubscribe();
};
auto selectedDisposer = on_exception(
[&](){ return localState->coordinator.act(disposer); },
localState->dest);
if (selectedDisposer.empty()) {
return;
}

localState->dest.add([=](){
localState->worker.schedule(selectedDisposer.get());
});
localState->cs.add([=](){
localState->worker.schedule(selectedDisposer.get());
});
}

static std::function<void(const rxsc::schedulable&)> reset_throttle(state_type state) {
auto reset = [state](const rxsc::schedulable&) {
state->throttled = false;
};

auto selectedReset = on_exception(
[&](){ return state->coordinator.act(reset); },
state->dest);
if (selectedReset.empty()) {
return std::function<void(const rxsc::schedulable&)>();
}

return std::function<void(const rxsc::schedulable&)>(selectedReset.get());
}

void on_next(T v) const {
auto localState = state;

const auto tp = localState->worker.now().time_since_epoch();
std::cout << "on_next(" << v << ") at " << tp.count() / 1000000 << " throttled: " << (localState->throttled ? "true" : "false") << std::endl;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove std::cout usage


if (!localState->throttled) {
localState->throttled = true;

state->dest.on_next(v);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the call to on_next is not necessarily on the same worker. the following needs to be scheduled on the worker.

            if (!localState->throttled) {
                localState->throttled = true;

                state->dest.on_next(v);
            }

It would be more efficient to check throttled first and then only schedule state->dest.on_next(v); when throttled == false. To do that throttled would need to be std::atomic<bool>.

auto work = [v, localState](const rxsc::schedulable&) {
auto produce_time = localState->worker.now() + localState->period;

std::cout << "scheduling unthrottle for " << (produce_time.time_since_epoch().count() / 1000000) << std::endl;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove std::cout usage


localState->worker.schedule(produce_time, reset_throttle(localState));
};
auto selectedWork = on_exception(
[&](){return localState->coordinator.act(work);},
localState->dest);
if (selectedWork.empty()) {
return;
}
localState->worker.schedule(selectedWork.get());
}
}

void on_error(rxu::error_ptr e) const {
auto localState = state;
auto work = [e, localState](const rxsc::schedulable&) {
localState->dest.on_error(e);
};
auto selectedWork = on_exception(
[&](){ return localState->coordinator.act(work); },
localState->dest);
if (selectedWork.empty()) {
return;
}
localState->worker.schedule(selectedWork.get());
}

void on_completed() const {
auto localState = state;
auto work = [localState](const rxsc::schedulable&) {
localState->dest.on_completed();
};
auto selectedWork = on_exception(
[&](){ return localState->coordinator.act(work); },
localState->dest);
if (selectedWork.empty()) {
return;
}
localState->worker.schedule(selectedWork.get());
}

static subscriber<T, observer_type> make(dest_type d, throttle_values v) {
auto cs = composite_subscription();
auto coordinator = v.coordination.create_coordinator();

return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
}
};

template<class Subscriber>
auto operator()(Subscriber dest) const
-> decltype(throttle_observer<Subscriber>::make(std::move(dest), initial)) {
return throttle_observer<Subscriber>::make(std::move(dest), initial);
}
};

}

/*! @copydoc rx-throttle.hpp
*/
template<class... AN>
auto throttle(AN&&... an)
-> operator_factory<throttle_tag, AN...> {
return operator_factory<throttle_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}

}

template<>
struct member_overload<throttle_tag>
{
template<class Observable, class Duration,
class Enabled = rxu::enable_if_all_true_type_t<
is_observable<Observable>,
rxu::is_duration<Duration>>,
class SourceValue = rxu::value_type_t<Observable>,
class Throttle = rxo::detail::throttle<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
static auto member(Observable&& o, Duration&& d)
-> decltype(o.template lift<SourceValue>(Throttle(std::forward<Duration>(d), identity_current_thread()))) {
return o.template lift<SourceValue>(Throttle(std::forward<Duration>(d), identity_current_thread()));
}

template<class Observable, class Coordination, class Duration,
class Enabled = rxu::enable_if_all_true_type_t<
is_observable<Observable>,
is_coordination<Coordination>,
rxu::is_duration<Duration>>,
class SourceValue = rxu::value_type_t<Observable>,
class Throttle = rxo::detail::throttle<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
static auto member(Observable&& o, Coordination&& cn, Duration&& d)
-> decltype(o.template lift<SourceValue>(Throttle(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
return o.template lift<SourceValue>(Throttle(std::forward<Duration>(d), std::forward<Coordination>(cn)));
}

template<class Observable, class Coordination, class Duration,
class Enabled = rxu::enable_if_all_true_type_t<
is_observable<Observable>,
is_coordination<Coordination>,
rxu::is_duration<Duration>>,
class SourceValue = rxu::value_type_t<Observable>,
class Throttle = rxo::detail::throttle<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
static auto member(Observable&& o, Duration&& d, Coordination&& cn)
-> decltype(o.template lift<SourceValue>(Throttle(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
return o.template lift<SourceValue>(Throttle(std::forward<Duration>(d), std::forward<Coordination>(cn)));
}

template<class... AN>
static operators::detail::throttle_invalid_t<AN...> member(const AN&...) {
std::terminate();
return {};
static_assert(sizeof...(AN) == 10000, "throttle takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
}
};

}

#endif
1 change: 1 addition & 0 deletions Rx/v2/src/rxcpp/rx-includes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@
#include "operators/rx-take_until.hpp"
#include "operators/rx-take_while.hpp"
#include "operators/rx-tap.hpp"
#include "operators/rx-throttle.hpp"
#include "operators/rx-time_interval.hpp"
#include "operators/rx-timeout.hpp"
#include "operators/rx-timestamp.hpp"
Expand Down
11 changes: 11 additions & 0 deletions Rx/v2/src/rxcpp/rx-observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,17 @@ class observable
return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...);
}

/*! @copydoc rx-throttle.hpp
*/
template<class... AN>
auto throttle(AN&&... an) const
/// \cond SHOW_SERVICE_MEMBERS
-> decltype(observable_member(throttle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
return observable_member(throttle_tag{}, *this, std::forward<AN>(an)...);
}

/*! @copydoc rx-repeat.hpp
*/
template<class... AN>
Expand Down
7 changes: 7 additions & 0 deletions Rx/v2/src/rxcpp/rx-operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,13 @@ struct tap_tag {
};
};

struct throttle_tag {
template<class Included>
struct include_header{
static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-throttle.hpp>");
};
};

struct timeout_tag {
template<class Included>
struct include_header{
Expand Down
1 change: 1 addition & 0 deletions Rx/v2/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/take_until.cpp
${TEST_DIR}/operators/take_while.cpp
${TEST_DIR}/operators/tap.cpp
${TEST_DIR}/operators/throttle.cpp
${TEST_DIR}/operators/time_interval.cpp
${TEST_DIR}/operators/timeout.cpp
${TEST_DIR}/operators/timestamp.cpp
Expand Down
Loading