Skip to content

Commit

Permalink
Merge pull request #64 from klarna-incubator/retry-when-rate-limited
Browse files Browse the repository at this point in the history
Retry when rate-limited
  • Loading branch information
jesperes authored Mar 15, 2023
2 parents 1dec470 + a86ce2b commit e7ca631
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/bec.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,8 @@
{env, [ {bitbucket_username, ""}
, {bitbucket_password, ""}
, {bitbucket_url, "http://localhost:8000" }
, {base_sleep_time, 100}
, {cap_sleep_time, 60000}
, {max_retries, 15}
]}
]}.
55 changes: 50 additions & 5 deletions src/bitbucket_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
-type request() :: {url(), [header()], string(), body()}
| {url(), [header()]}.

-type retry_state() :: #{n := non_neg_integer(),
base_sleep_time := pos_integer(),
cap_sleep_time := pos_integer(),
max_retries := pos_integer() | infinity }.

%%==============================================================================
%% DELETE
%%==============================================================================
Expand Down Expand Up @@ -72,7 +77,7 @@ do_request(Method, Url) ->
Headers = headers(),
Request = {Url, Headers},
ok = ?LOG_DEBUG("HTTP Request: (~p) ~p~n", [Method, Url]),
do_http_request(Method, Request).
do_http_request(Method, Request, default_retry_state()).

-spec do_request(method(), url(), body()) -> {ok, map()} | {error, any()}.
do_request(Method, Url, Body) ->
Expand All @@ -81,11 +86,11 @@ do_request(Method, Url, Body) ->
Request = {Url, Headers, Type, Body},
ok = ?LOG_DEBUG("HTTP Request: (~p) ~p~n~p~n",
[Method, Url, Headers]),
do_http_request(Method, Request).
do_http_request(Method, Request, default_retry_state()).

-spec do_http_request(method(), request()) ->
-spec do_http_request(method(), request(), retry_state()) ->
{ok, map()} | {ok, [map()]} | {error, any()}.
do_http_request(Method, Request) ->
do_http_request(Method, Request, RetryState) ->
HTTPOptions = [{autoredirect, true}],
Options = [],
%% Disable pipelining to avoid the socket getting closed during long runs
Expand All @@ -95,7 +100,19 @@ do_http_request(Method, Request) ->
]),
Result = httpc:request(Method, Request, HTTPOptions, Options),
ok = ?LOG_DEBUG("HTTP Result: ~p~n", [Result]),
handle_result(Result).
case handle_result(Result) of
{error, {retry, E}} ->
case should_retry(RetryState) of
{error, max_retries_exceeded} ->
?LOG_ERROR("Request was rate-limited and max retries was exceeded."),
{error, E};
{ok, RetryState0} ->
?LOG_WARNING("Request was rate-limited, retrying...", []),
do_http_request(Method, Request, RetryState0)
end;
Other ->
Other
end.

-spec headers() -> [{string(), string()}].
headers() ->
Expand All @@ -121,6 +138,8 @@ handle_result({ok, {{_Ver, Status, _Phrase}, _H, Body}}) when Status =:= 200;
Status =:= 202;
Status =:= 204 ->
{ok, decode_body(Body)};
handle_result({ok, {{_Ver, Status, _Phrase}, _H, Body}}) when Status =:= 429 ->
{error, {retry, decode_error(Body)}};
handle_result({ok, {{_Version, _Status, _Phrase}, _Headers, Resp}}) ->
{error, decode_error(Resp)};
handle_result({error, Reason}) ->
Expand All @@ -142,3 +161,29 @@ decode_body(Body0) ->
decode_error(Body) ->
Errors = maps:get(<<"errors">>, decode_body(Body)),
[M || #{<<"message">> := M} <- Errors].

-spec default_retry_state() -> retry_state().
default_retry_state() ->
{ok, BaseSleepTime} = application:get_env(bec, base_sleep_time),
{ok, CapSleepTime} = application:get_env(bec, cap_sleep_time),
{ok, MaxRetries} = application:get_env(bec, max_retries),
#{n => 0,
base_sleep_time => BaseSleepTime,
cap_sleep_time => CapSleepTime,
max_retries => MaxRetries}.

-spec should_retry(RetryState :: retry_state()) ->
{ok, retry_state()} | {error, max_retries_exceeded}.
should_retry(#{n := N, max_retries := Max}) when N >= Max ->
{error, max_retries_exceeded};
should_retry(#{ n := N
, base_sleep_time := BaseSleepTime
, cap_sleep_time := CapSleepTime} = RetryState0) ->
Sleep = calculate_sleep_time(N, BaseSleepTime, CapSleepTime),
?LOG_DEBUG("Sleep-time: ~w ms", [Sleep]),
timer:sleep(Sleep),
{ok, RetryState0#{ n => N + 1 }}.

calculate_sleep_time(N, BaseSleepTime, CapSleepTime) ->
Temp = min(CapSleepTime, BaseSleepTime bsl N),
Temp div 2 + rand:uniform(Temp div 2).
43 changes: 43 additions & 0 deletions test/bitbucket_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
, get_wz_branch_reviewers/1
, get_wz_branch_reviewers_when_none_configured/1
, get_wz_branch_reviewers_with_mandatory/1

, test_retry_on_rate_limiting/1
, test_max_retries_exceeded/1
]).

-include_lib("common_test/include/ct.hrl").
Expand All @@ -22,6 +25,7 @@
-define(REPO_SLUG , "a_repo").

init_per_suite(Config) ->
bec_test_utils:init_logging(),
{ok, Started} = application:ensure_all_started(bec),
ok = meck:new(httpc, [unlink]),
[{started, Started}|Config].
Expand Down Expand Up @@ -69,6 +73,31 @@ init_per_testcase(get_wz_branch_reviewers_with_mandatory, Config) ->
{ok, {{"1.1", 200, "OK"}, [], Body}}
end,
ok = meck:expect(httpc, request, Fun),
Config;
init_per_testcase(test_retry_on_rate_limiting, Config) ->
FunRateLimited = fun(_, _, _, _) ->
{ok, {{"1.1", 429, "Rate limited"}, [], rate_limited_body()}}
end,
FunOk = fun(_, _, _, _) ->
{ok, {{"1.1", 200, "OK"}, [], ""}}
end,

NumRateLimits = 4,

ok = meck:expect(httpc, request, 4,
meck:seq(lists:duplicate(NumRateLimits, FunRateLimited) ++ [FunOk])),
Config;
init_per_testcase(test_max_retries_exceeded, Config) ->
FunRateLimited = fun(_, _, _, _) ->
{ok, {{"1.1", 429, "Rate limited"}, [], rate_limited_body()}}
end,
FunOk = fun(_, _, _, _) ->
{ok, {{"1.1", 200, "OK"}, [], ""}}
end,

NumRateLimits = 4,
ok = meck:expect(httpc, request, 4,
meck:seq(lists:duplicate(NumRateLimits, FunRateLimited) ++ [FunOk])),
Config.

end_per_testcase(_TestCase, _Config) ->
Expand All @@ -82,6 +111,8 @@ all() ->
, get_wz_branch_reviewers
, get_wz_branch_reviewers_when_none_configured
, get_wz_branch_reviewers_with_mandatory
, test_retry_on_rate_limiting
, test_max_retries_exceeded
].

get_default_branch(_Config) ->
Expand Down Expand Up @@ -164,3 +195,15 @@ get_wz_branch_reviewers_with_mandatory(_Config) ->
body(Config, TestCase) ->
DataDir = ?config(data_dir, Config),
file:read_file(filename:join([DataDir, atom_to_list(TestCase) ++ ".json"])).

test_retry_on_rate_limiting(_Config) ->
Result = bitbucket:set_default_branch(?PROJECT_KEY, ?REPO_SLUG, "develop"),
?assertEqual(ok, Result).

test_max_retries_exceeded(_Config) ->
application:set_env(bec, max_retries, 2),
Result = bitbucket:set_default_branch(?PROJECT_KEY, ?REPO_SLUG, "develop"),
?assertMatch({error, [<<"Rate limited">>]}, Result).

rate_limited_body() ->
<<"{\"errors\": [{\"message\": \"Rate limited\"}]}">>.

0 comments on commit e7ca631

Please sign in to comment.