diff --git a/Orleans.sln b/Orleans.sln index d5e8786f4a..fdb05321bf 100644 --- a/Orleans.sln +++ b/Orleans.sln @@ -242,6 +242,12 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Orleans.Serialization.FShar EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.MessagePack", "src\Orleans.Serialization.MessagePack\Orleans.Serialization.MessagePack.csproj", "{F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ActivationRebalancing", "ActivationRebalancing", "{B0DC8B8D-29CD-4CA3-A874-471F75595829}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ActivationRebalancing.Cluster", "playground\ActivationRebalancing\ActivationRebalancing.Cluster\ActivationRebalancing.Cluster.csproj", "{2D109E60-E9BF-4F57-BBCD-DF5FA7768B00}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ActivationRebalancing.Frontend", "playground\ActivationRebalancing\ActivationRebalancing.Frontend\ActivationRebalancing.Frontend.csproj", "{DFAF9FFC-EBD9-45F0-A121-010D29A296C1}" +EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ChaoticCluster", "ChaoticCluster", "{2579A7F6-EBE8-485A-BB20-A5D19DB5612B}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ChaoticCluster.AppHost", "playground\ChaoticCluster\ChaoticCluster.AppHost\ChaoticCluster.AppHost.csproj", "{4E79EC4B-2DC4-41E3-9AE6-17C1FFF17B02}" @@ -642,6 +648,14 @@ Global {F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Debug|Any CPU.Build.0 = Debug|Any CPU {F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Release|Any CPU.ActiveCfg = Release|Any CPU {F50F81B6-E9B5-4143-B66B-A1AD913F6E9C}.Release|Any CPU.Build.0 = Release|Any CPU + {2D109E60-E9BF-4F57-BBCD-DF5FA7768B00}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2D109E60-E9BF-4F57-BBCD-DF5FA7768B00}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2D109E60-E9BF-4F57-BBCD-DF5FA7768B00}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2D109E60-E9BF-4F57-BBCD-DF5FA7768B00}.Release|Any CPU.Build.0 = Release|Any CPU + {DFAF9FFC-EBD9-45F0-A121-010D29A296C1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DFAF9FFC-EBD9-45F0-A121-010D29A296C1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DFAF9FFC-EBD9-45F0-A121-010D29A296C1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DFAF9FFC-EBD9-45F0-A121-010D29A296C1}.Release|Any CPU.Build.0 = Release|Any CPU {4E79EC4B-2DC4-41E3-9AE6-17C1FFF17B02}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {4E79EC4B-2DC4-41E3-9AE6-17C1FFF17B02}.Debug|Any CPU.Build.0 = Debug|Any CPU {4E79EC4B-2DC4-41E3-9AE6-17C1FFF17B02}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -774,6 +788,9 @@ Global {84B44F1D-B7FE-40E3-82F0-730A55AC8613} = {316CDCC7-323F-4264-9FC9-667662BB1F80} {B2D53D3C-E44A-4C9B-AAEE-28FB8C1BDF62} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A} {F50F81B6-E9B5-4143-B66B-A1AD913F6E9C} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23} + {B0DC8B8D-29CD-4CA3-A874-471F75595829} = {A41DE3D1-F8AA-4234-BE6F-3C9646A1507A} + {2D109E60-E9BF-4F57-BBCD-DF5FA7768B00} = {B0DC8B8D-29CD-4CA3-A874-471F75595829} + {DFAF9FFC-EBD9-45F0-A121-010D29A296C1} = {B0DC8B8D-29CD-4CA3-A874-471F75595829} {2579A7F6-EBE8-485A-BB20-A5D19DB5612B} = {A41DE3D1-F8AA-4234-BE6F-3C9646A1507A} {4E79EC4B-2DC4-41E3-9AE6-17C1FFF17B02} = {2579A7F6-EBE8-485A-BB20-A5D19DB5612B} {76A549FA-69F1-4967-82B6-161A8B52C86B} = {2579A7F6-EBE8-485A-BB20-A5D19DB5612B} diff --git a/playground/ActivationRebalancing/ActivationRebalancing.Cluster/ActivationRebalancing.Cluster.csproj b/playground/ActivationRebalancing/ActivationRebalancing.Cluster/ActivationRebalancing.Cluster.csproj new file mode 100644 index 0000000000..2a145fa58d --- /dev/null +++ b/playground/ActivationRebalancing/ActivationRebalancing.Cluster/ActivationRebalancing.Cluster.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + true + + + + + + + \ No newline at end of file diff --git a/playground/ActivationRebalancing/ActivationRebalancing.Cluster/Program.cs b/playground/ActivationRebalancing/ActivationRebalancing.Cluster/Program.cs new file mode 100644 index 0000000000..3d36dc2e62 --- /dev/null +++ b/playground/ActivationRebalancing/ActivationRebalancing.Cluster/Program.cs @@ -0,0 +1,170 @@ +using System.Diagnostics; +using System.Net; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Orleans.Configuration; +using Orleans.Runtime.Placement; + +#nullable enable + +// Ledjon: The silos will run in the same process so they will have the same memory usage. +// I previously had 4 console apps to run the example, but didn't want to add so many proj into the solution. +// I am sure with something like Aspire that would be easier, but for now I'll leave them like this. +// You (the reader) feel free to run this in different processes for a more realistic example. + +var host0 = await StartSiloHost(0); +var host1 = await StartSiloHost(1); +var host2 = await StartSiloHost(2); +var host3 = await StartSiloHost(3); +IHost? host5 = null; + +Console.WriteLine("All silos have started."); + +var grainFactory = host0.Services.GetRequiredService(); +var mgmtGrain = grainFactory.GetGrain(0); + +var silos = await mgmtGrain.GetHosts(onlyActive: true); +Debug.Assert(silos.Count == 4); +var addresses = silos.Select(x => x.Key).ToArray(); + +var tasks = new List(); +RequestContext.Set(IPlacementDirector.PlacementHintKey, addresses[0]); +for (var i = 0; i < 300; i++) +{ + tasks.Add(grainFactory.GetGrain(Guid.NewGuid()).Ping()); +} + +RequestContext.Set(IPlacementDirector.PlacementHintKey, addresses[1]); +for (var i = 0; i < 30; i++) +{ + tasks.Add(grainFactory.GetGrain(Guid.NewGuid()).Ping()); +} + +RequestContext.Set(IPlacementDirector.PlacementHintKey, addresses[2]); +for (var i = 0; i < 410; i++) +{ + tasks.Add(grainFactory.GetGrain(Guid.NewGuid()).Ping()); +} + +RequestContext.Set(IPlacementDirector.PlacementHintKey, addresses[3]); +for (var i = 0; i < 120; i++) +{ + tasks.Add(grainFactory.GetGrain(Guid.NewGuid()).Ping()); +} + +var sessionCount = 0; +while (true) +{ + if (sessionCount == 25) + { + RequestContext.Set(IPlacementDirector.PlacementHintKey, addresses[0]); + for (var i = 0; i < 50; i++) + { + tasks.Add(grainFactory.GetGrain(Guid.NewGuid()).Ping()); + } + + RequestContext.Set(IPlacementDirector.PlacementHintKey, addresses[1]); + for (var i = 0; i < 50; i++) + { + tasks.Add(grainFactory.GetGrain(Guid.NewGuid()).Ping()); + } + } + + if (sessionCount == 35) + { + RequestContext.Set(IPlacementDirector.PlacementHintKey, addresses[1]); + for (var i = 0; i < 50; i++) + { + tasks.Add(grainFactory.GetGrain(Guid.NewGuid()).Ping()); + } + + RequestContext.Set(IPlacementDirector.PlacementHintKey, addresses[2]); + for (var i = 0; i < 50; i++) + { + tasks.Add(grainFactory.GetGrain(Guid.NewGuid()).Ping()); + } + } + + if (sessionCount == 40) + { + host5 = await StartSiloHost(4); + } + + if (sessionCount == 45) + { + RequestContext.Set(IPlacementDirector.PlacementHintKey, addresses[2]); + for (var i = 0; i < 50; i++) + { + tasks.Add(grainFactory.GetGrain(Guid.NewGuid()).Ping()); + } + + RequestContext.Set(IPlacementDirector.PlacementHintKey, addresses[3]); + for (var i = 0; i < 50; i++) + { + tasks.Add(grainFactory.GetGrain(Guid.NewGuid()).Ping()); + } + } + + await Task.Delay(5000); // session duration + sessionCount++; + + if (sessionCount > 55) + { + break; + } +} + +Console.WriteLine("Simulation has finished. Press Enter to terminate..."); +Console.ReadLine(); + +await host0.StopAsync(); +await host1.StopAsync(); +await host2.StopAsync(); +await host3.StopAsync(); + +if (host5 != null) +{ + await host5.StopAsync(); +} + +static async Task StartSiloHost(int num) +{ + #pragma warning disable ORLEANSEXP002 + var host = Host.CreateDefaultBuilder() + .ConfigureLogging(builder => builder + .AddFilter("", LogLevel.Error) + .AddFilter("Orleans.Runtime.Placement.Rebalancing", LogLevel.Trace) + .AddConsole()) + .UseOrleans(builder => builder + .Configure(o => + { + o.RebalancerDueTime = TimeSpan.FromSeconds(5); + o.SessionCyclePeriod = TimeSpan.FromSeconds(5); + // uncomment these below, if you want higher migration rate + //o.CycleNumberWeight = 1; + //o.SiloNumberWeight = 0; + }) + .UseLocalhostClustering( + siloPort: EndpointOptions.DEFAULT_SILO_PORT + num, + gatewayPort: EndpointOptions.DEFAULT_GATEWAY_PORT + num, + primarySiloEndpoint: new IPEndPoint(IPAddress.Loopback, EndpointOptions.DEFAULT_SILO_PORT)) + .AddActivationRebalancer()) + .Build(); + #pragma warning restore ORLEANSEXP002 + + await host.StartAsync(); + Console.WriteLine($"Silo{num} started."); + + return host; +} + +public interface IRebalancingTestGrain : IGrainWithGuidKey +{ + Task Ping(); +} + +public class RebalancingTestGrain : Grain, IRebalancingTestGrain +{ + public Task Ping() => Task.CompletedTask; +} \ No newline at end of file diff --git a/playground/ActivationRebalancing/ActivationRebalancing.Frontend/ActivationRebalancing.Frontend.csproj b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/ActivationRebalancing.Frontend.csproj new file mode 100644 index 0000000000..63b5c91516 --- /dev/null +++ b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/ActivationRebalancing.Frontend.csproj @@ -0,0 +1,14 @@ + + + + net8.0 + enable + enable + + + + + + + + diff --git a/playground/ActivationRebalancing/ActivationRebalancing.Frontend/Controllers/StatsController.cs b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/Controllers/StatsController.cs new file mode 100644 index 0000000000..7d7261ae0b --- /dev/null +++ b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/Controllers/StatsController.cs @@ -0,0 +1,36 @@ +using Microsoft.AspNetCore.Mvc; +using Orleans.Runtime; +using Orleans; + +namespace ActivationRebalancing.Frontend.Controllers; + +[ApiController] +[Route("api/[controller]")] +public class StatsController(IClusterClient clusterClient) : ControllerBase +{ + [HttpGet("silos")] + public async Task GetStats() + { + var grainStats = await clusterClient + .GetGrain(0) + .GetDetailedGrainStatistics(); + + var siloData = grainStats.GroupBy(stat => stat.SiloAddress) + .Select(g => new SiloData(g.Key.ToString(), g.Count())) + .ToList(); + + if (siloData.Count == 4) + { + siloData = [.. siloData, new SiloData("x", 0)]; + } + + if (siloData.Count > 5) + { + throw new NotSupportedException("The frontend cant support more than 6 silos"); + } + + return Ok(siloData); + } +} + +public record SiloData(string Host, int Activations); \ No newline at end of file diff --git a/playground/ActivationRebalancing/ActivationRebalancing.Frontend/Program.cs b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/Program.cs new file mode 100644 index 0000000000..2dbec59e39 --- /dev/null +++ b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/Program.cs @@ -0,0 +1,17 @@ +using Orleans.Hosting; + +var builder = WebApplication.CreateBuilder(args); + +builder.UseOrleansClient(clientBuilder => clientBuilder.UseLocalhostClustering()); +builder.Services.AddControllers(); + +var app = builder.Build(); + +var options = new DefaultFilesOptions(); +options.DefaultFileNames.Clear(); +options.DefaultFileNames.Add("index.html"); + +app.UseDefaultFiles(options); +app.UseStaticFiles(); +app.MapControllers(); +app.Run(); diff --git a/playground/ActivationRebalancing/ActivationRebalancing.Frontend/Properties/launchSettings.json b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/Properties/launchSettings.json new file mode 100644 index 0000000000..730bdb6499 --- /dev/null +++ b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/Properties/launchSettings.json @@ -0,0 +1,14 @@ +{ + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "launchUrl": "index.html", + "applicationUrl": "http://localhost:5000", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/playground/ActivationRebalancing/ActivationRebalancing.Frontend/appsettings.Development.json b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/appsettings.Development.json new file mode 100644 index 0000000000..0c208ae918 --- /dev/null +++ b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/playground/ActivationRebalancing/ActivationRebalancing.Frontend/appsettings.json b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/appsettings.json new file mode 100644 index 0000000000..10f68b8c8b --- /dev/null +++ b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/playground/ActivationRebalancing/ActivationRebalancing.Frontend/wwwroot/index.html b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/wwwroot/index.html new file mode 100644 index 0000000000..6ca0d92a03 --- /dev/null +++ b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/wwwroot/index.html @@ -0,0 +1,213 @@ + + + + + + Orleans Activation Rebalancing + + + + +

Orleans Activation Rebalancing

+
+
+
+
+
+ + + + diff --git a/playground/ActivationRebalancing/ActivationRebalancing.Frontend/wwwroot/worker.js b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/wwwroot/worker.js new file mode 100644 index 0000000000..90696044c0 --- /dev/null +++ b/playground/ActivationRebalancing/ActivationRebalancing.Frontend/wwwroot/worker.js @@ -0,0 +1,29 @@ +self.onmessage = function (e) { + try { + const data = e.data; + const totalActivations = data.reduce((sum, d) => sum + d.activations, 0); + const densityMatrix = Array.from({ length: 20 }, () => Array(20).fill('white')); + + data.forEach((d, i) => { + const numCells = Math.round(d.activations / totalActivations * 400); + const color = ['#ff0000', '#00ff00', '#0000ff', '#ffff00', '#ff00ff'][i % 5]; + + let cellsFilled = 0; + const maxFillWhiteCellAttempts = 500; + + for (let attempt = 0; attempt < maxFillWhiteCellAttempts && cellsFilled < numCells; attempt++) { + const x = Math.floor(Math.random() * 20); + const y = Math.floor(Math.random() * 20); + if (densityMatrix[y][x] === 'white') { + densityMatrix[y][x] = color; + cellsFilled++; + } + } + + }); + + postMessage({ densityMatrix }); + } catch (error) { + postMessage({ error: error.message }); + } +}; diff --git a/playground/DashboardToy/DashboardToy.Frontend/wwwroot/index.html b/playground/DashboardToy/DashboardToy.Frontend/wwwroot/index.html index 121803cbe7..1dc88976af 100644 --- a/playground/DashboardToy/DashboardToy.Frontend/wwwroot/index.html +++ b/playground/DashboardToy/DashboardToy.Frontend/wwwroot/index.html @@ -3,7 +3,7 @@ - Orleans Activation Repartitioning + Orleans Activation Rebalancing diff --git a/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancer.cs b/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancer.cs new file mode 100644 index 0000000000..49b9257676 --- /dev/null +++ b/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancer.cs @@ -0,0 +1,37 @@ +using System; +using System.Threading.Tasks; + +namespace Orleans.Placement.Rebalancing; + +/// +/// A gateway to interface with the activation rebalancer. +/// +/// This is available only on the silo. +public interface IActivationRebalancer +{ + /// + /// Returns the rebalancing report. + /// The report can lag behind if you choose a session cycle period less than . + /// + /// If set to returns the most current report. + /// Using incurs an asynchronous operation. + ValueTask GetRebalancingReport(bool force = false); + + /// + Task ResumeRebalancing(); + + /// + Task SuspendRebalancing(TimeSpan? duration = null); + + /// + /// Subscribe to activation rebalancer reports. + /// + /// The component that will be notified. + void SubscribeToReports(IActivationRebalancerReportListener listener); + + /// + /// Unsubscribe from activation rebalancer reports. + /// + /// The already subscribed component. + void UnsubscribeFromReports(IActivationRebalancerReportListener listener); +} diff --git a/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancerMonitor.cs b/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancerMonitor.cs new file mode 100644 index 0000000000..a795c15814 --- /dev/null +++ b/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancerMonitor.cs @@ -0,0 +1,20 @@ +using System; +using System.Threading.Tasks; + +#nullable enable + +namespace Orleans.Placement.Rebalancing; + +[Alias("IActivationRebalancerMonitor")] +internal interface IActivationRebalancerMonitor : ISystemTarget, IActivationRebalancer +{ + /// + /// The period on which the must report back to the monitor. + /// + public static readonly TimeSpan WorkerReportPeriod = TimeSpan.FromSeconds(30); + + /// + /// Invoked periodically by the . + /// + [Alias("Report")] Task Report(RebalancingReport report); +} diff --git a/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancerReportListener.cs b/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancerReportListener.cs new file mode 100644 index 0000000000..13bcbdd419 --- /dev/null +++ b/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancerReportListener.cs @@ -0,0 +1,13 @@ +namespace Orleans.Placement.Rebalancing; + +/// +/// Interface for types which listen to rebalancer status changes. +/// +public interface IActivationRebalancerReportListener +{ + /// + /// Triggered when rebalancer has provided a new . + /// + /// Latest report from the rebalancer. + void OnReport(RebalancingReport report); +} \ No newline at end of file diff --git a/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancerWorker.cs b/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancerWorker.cs new file mode 100644 index 0000000000..68f54a132d --- /dev/null +++ b/src/Orleans.Core/Placement/Rebalancing/IActivationRebalancerWorker.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading.Tasks; +using Orleans.Concurrency; + +namespace Orleans.Placement.Rebalancing; + +[Alias("IActivationRebalancerWorker")] +internal interface IActivationRebalancerWorker : IGrainWithIntegerKey +{ + /// + /// Returns the most recent rebalancing report. + /// + /// Acts also as a way to wake up the rebalancer, if its deactivated. + [AlwaysInterleave, Alias("GetReport")] ValueTask GetReport(); + + /// + /// Resumes rebalancing if its suspended, otherwise its a no-op. + /// + [Alias("ResumeRebalancing")] Task ResumeRebalancing(); + + /// + /// Suspends rebalancing if its running, otherwise its a no-op. + /// + /// + /// The amount of time to suspend the rebalancer. + /// means suspend indefinitely. + /// + [Alias("SuspendRebalancing")] Task SuspendRebalancing(TimeSpan? duration); +} \ No newline at end of file diff --git a/src/Orleans.Core/Placement/Rebalancing/IFailedSessionBackoffProvider.cs b/src/Orleans.Core/Placement/Rebalancing/IFailedSessionBackoffProvider.cs new file mode 100644 index 0000000000..0dc9e94188 --- /dev/null +++ b/src/Orleans.Core/Placement/Rebalancing/IFailedSessionBackoffProvider.cs @@ -0,0 +1,12 @@ +using Orleans.Internal; + +namespace Orleans.Placement.Rebalancing; + +/// +/// Determines how long to wait between successive rebalancing sessions, if an aprior session has failed. +/// +/// +/// A session is considered "failed" if n-consecutive number of cycles yielded no significant improvement +/// to the cluster's entropy. +/// +public interface IFailedSessionBackoffProvider : IBackoffProvider { } \ No newline at end of file diff --git a/src/Orleans.Core/Placement/Rebalancing/RebalancingReport.cs b/src/Orleans.Core/Placement/Rebalancing/RebalancingReport.cs new file mode 100644 index 0000000000..ff422d34b8 --- /dev/null +++ b/src/Orleans.Core/Placement/Rebalancing/RebalancingReport.cs @@ -0,0 +1,84 @@ +using System; +using System.Collections.Immutable; +using Orleans.Runtime; + +namespace Orleans.Placement.Rebalancing; + +/// +/// The status of the . +/// +[GenerateSerializer] +public enum RebalancerStatus : byte +{ + /// + /// It is executing. + /// + Executing = 0, + /// + /// It is suspended. + /// + Suspended = 1 +} + +/// +/// A report of the current state of the activation rebalancer. +/// +[GenerateSerializer, Immutable, Alias("RebalancingReport")] +public readonly struct RebalancingReport +{ + /// + /// The silo where the rebalancer is currently located. + /// + [Id(0)] public required SiloAddress Host { get; init; } + + /// + /// The current status of the rebalancer. + /// + [Id(1)] public required RebalancerStatus Status { get; init; } + + /// + /// The amount of time the rebalancer is suspended (if at all). + /// + /// This will always be if is . + [Id(2)] public TimeSpan? SuspensionDuration { get; init; } + + /// + /// The current view of the cluster's imbalance. + /// + /// Range: [0-1] + [Id(3)] public required double ClusterImbalance { get; init; } + + /// + /// Latest rebalancing statistics. + /// + [Id(4)] public required ImmutableArray Statistics { get; init; } +} + +/// +/// Rebalancing statistics for the given . +/// +/// +/// Used for diagnostics / metrics purposes. Note that statistics are an approximation. +[GenerateSerializer, Immutable, Alias("RebalancingStatistics")] +public readonly struct RebalancingStatistics +{ + /// + /// The time these statistics were assembled. + /// + [Id(0)] public required DateTime TimeStamp { get; init; } + + /// + /// The silo to which these statistics belong to. + /// + [Id(1)] public required SiloAddress SiloAddress { get; init; } + + /// + /// The approximate number of activations that have been dispersed from this silo thus far. + /// + [Id(2)] public required ulong DispersedActivations { get; init; } + + /// + /// The approximate number of activations that have been acquired by this silo thus far. + /// + [Id(3)] public required ulong AcquiredActivations { get; init; } +} \ No newline at end of file diff --git a/src/Orleans.Core/Placement/Repartitioning/IActivationRepartitionerSystemTarget.cs b/src/Orleans.Core/Placement/Repartitioning/IActivationRepartitionerSystemTarget.cs index a5964ae495..23cffbbe5a 100644 --- a/src/Orleans.Core/Placement/Repartitioning/IActivationRepartitionerSystemTarget.cs +++ b/src/Orleans.Core/Placement/Repartitioning/IActivationRepartitionerSystemTarget.cs @@ -40,7 +40,7 @@ static IActivationRepartitionerSystemTarget GetReference(IGrainFactory grainFact ValueTask> GetGrainCallFrequencies(); /// - /// For sue in testing only! Flushes buffered messages. + /// For use in testing only! Flushes buffered messages. /// ValueTask FlushBuffers(); } diff --git a/src/Orleans.Core/Runtime/Constants.cs b/src/Orleans.Core/Runtime/Constants.cs index 198908e73a..1d6f2f9153 100644 --- a/src/Orleans.Core/Runtime/Constants.cs +++ b/src/Orleans.Core/Runtime/Constants.cs @@ -26,6 +26,7 @@ internal static class Constants public static readonly GrainType ManifestProviderType = SystemTargetGrainId.CreateGrainType("manifest"); public static readonly GrainType ActivationMigratorType = SystemTargetGrainId.CreateGrainType("migrator"); public static readonly GrainType ActivationRepartitionerType = SystemTargetGrainId.CreateGrainType("repartitioner"); + public static readonly GrainType ActivationRebalancerMonitorType = SystemTargetGrainId.CreateGrainType("rebalancer-monitor"); public static readonly GrainType GrainDirectoryPartition = SystemTargetGrainId.CreateGrainType("dir.grain.part"); public static readonly GrainType GrainDirectory = SystemTargetGrainId.CreateGrainType("dir.grain"); @@ -55,6 +56,7 @@ internal static class Constants {ManifestProviderType, "ManifestProvider"}, {ActivationMigratorType, "ActivationMigrator"}, {ActivationRepartitionerType, "ActivationRepartitioner"}, + {ActivationRebalancerMonitorType, "ActivationRebalancerMonitor"}, {GrainDirectory, "GrainDirectory"}, }.ToFrozenDictionary(); @@ -62,4 +64,4 @@ internal static class Constants public static bool IsSingletonSystemTarget(GrainType id) => SingletonSystemTargetNames.ContainsKey(id); } } - + diff --git a/src/Orleans.Core/SystemTargetInterfaces/ISiloControl.cs b/src/Orleans.Core/SystemTargetInterfaces/ISiloControl.cs index 007f147b29..d7de973c4b 100644 --- a/src/Orleans.Core/SystemTargetInterfaces/ISiloControl.cs +++ b/src/Orleans.Core/SystemTargetInterfaces/ISiloControl.cs @@ -21,6 +21,7 @@ internal interface ISiloControl : ISystemTarget, IVersionManager Task GetDetailedGrainReport(GrainId grainId); Task GetActivationCount(); + Task MigrateRandomActivations(SiloAddress target, int count); Task SendControlCommandToProvider(string providerName, int command, object arg) where T : IControllable; Task> GetActiveGrains(GrainType grainType); diff --git a/src/Orleans.Runtime/Configuration/Options/ActivationRebalancerOptions.cs b/src/Orleans.Runtime/Configuration/Options/ActivationRebalancerOptions.cs new file mode 100644 index 0000000000..570901e5d0 --- /dev/null +++ b/src/Orleans.Runtime/Configuration/Options/ActivationRebalancerOptions.cs @@ -0,0 +1,195 @@ +using System; +using Microsoft.Extensions.Options; +using Orleans.Runtime; + +namespace Orleans.Configuration; + +/// +/// Options for configuring activation rebalancing. +/// +public sealed class ActivationRebalancerOptions +{ + /// + /// The due time for the rebalancer to start the very first session. + /// + public TimeSpan RebalancerDueTime { get; set; } = DEFAULT_REBALANCER_DUE_TIME; + + /// + /// The default value of . + /// + public static readonly TimeSpan DEFAULT_REBALANCER_DUE_TIME = TimeSpan.FromSeconds(60); + + /// + /// The time between two consecutive rebalancing cycles within a session. + /// + /// It must be greater than 2 x . + public TimeSpan SessionCyclePeriod { get; set; } = DEFAULT_SESSION_CYCLE_PERIOD; + + /// + /// The default value of . + /// + public static readonly TimeSpan DEFAULT_SESSION_CYCLE_PERIOD = TimeSpan.FromSeconds(15); + + /// + /// The maximum, consecutive number of cycles, yielding no significant improvement to the cluster's entropy. + /// + /// This value is inclusive, i.e. if this value is 'n', then the 'n+1' cycle will stop the current rebalancing session. + public int MaxStagnantCycles { get; set; } = DEFAULT_MAX_STAGNANT_CYCLES; + + /// + /// The default value of . + /// + public const int DEFAULT_MAX_STAGNANT_CYCLES = 3; + + /// + /// The minimum change in the entropy of the cluster that is considered an improvement. + /// When a total of n-consecutive stagnant cycles pass, during which the change in entropy is less than + /// the quantum, then the current rebalancing session will stop. The change is a normalized value + /// being relative to the maximum possible entropy. + /// + /// Allowed range: (0-0.1] + public double EntropyQuantum { get; set; } = DEFAULT_ENTROPY_QUANTUM; + + /// + /// The default value of . + /// + public const double DEFAULT_ENTROPY_QUANTUM = 0.0001d; + + /// + /// Represents the allowed entropy deviation between the cluster's current entropy, against the theoretical maximum. + /// Values lower than this are practically considered as "maximum", and the current rebalancing session will stop. + /// This acts as a base rate if is set to . + /// + /// Allowed range is: (0-0.1] + public double AllowedEntropyDeviation { get; set; } = DEFAULT_ALLOWED_ENTROPY_DEVIATION; + + /// + /// The default value of . + /// + public const double DEFAULT_ALLOWED_ENTROPY_DEVIATION = 0.0001d; + + /// + /// Determines whether should be scaled dynamically + /// based on the total number of activations. When set to , the allowed entropy + /// deviation will increase logarithmically after reaching , + /// and will cap at . + /// + /// This is in place because a deviation of say 10 activations has far lesser + /// impact on a total of 100,000 activations than it does for say 1,000 activations. + public bool ScaleAllowedEntropyDeviation { get; set; } = DEFAULT_SCALE_ALLOWED_ENTROPY_DEVIATION; + + /// + /// The default value of . + /// + public const bool DEFAULT_SCALE_ALLOWED_ENTROPY_DEVIATION = true; + + /// + /// The maximum value allowed when is . + /// + public const double MAX_SCALED_ENTROPY_DEVIATION = 0.1d; + + /// + /// Determines the number of activations that must be active during any rebalancing cycle, in order for + /// (if, and only if, its ) to begin scaling the . + /// + /// + /// Allowed range: [1000-∞) + /// Values lower than the default are discouraged. + /// + public int ScaledEntropyDeviationActivationThreshold { get; set; } = DEFAULT_SCALED_ENTROPY_DEVIATION_ACTIVATION_THRESHOLD; + + /// + /// The default value of . + /// + public const int DEFAULT_SCALED_ENTROPY_DEVIATION_ACTIVATION_THRESHOLD = 10_000; + + /// + /// Represents the weight that is given to the number of rebalancing cycles that have passed during a rebalancing session. + /// Changing this value has a far greater impact on the migration rate than , and is suitable for controlling the session duration. + /// Pick higher values if you want a faster migration rate. + /// + /// Allowed range: (0-1] + public double CycleNumberWeight { get; set; } = DEFAULT_CYCLE_NUMBER_WEIGHT; + + /// + /// The default value of . + /// + public const double DEFAULT_CYCLE_NUMBER_WEIGHT = 0.1d; + + /// + /// Represents the weight that is given to the number of silos in the cluster during a rebalancing session. + /// Changing this value has a far lesser impact on the migration rate than , and is suitable for fine-tuning. + /// Pick lower values if you want a faster migration rate. + /// + /// Allowed range: [0-1] + public double SiloNumberWeight { get; set; } = DEFAULT_SILO_NUMBER_WEIGHT; + + /// + /// The default value of . + /// + public const double DEFAULT_SILO_NUMBER_WEIGHT = 0.1d; + + /// + /// The maximum allowed number of activations that can be migrated at any given cycle. + /// + public int ActivationMigrationCountLimit { get; set; } = DEFAULT_ACTIVATION_MIGRATION_COUNT_LIMIT; + + /// + /// The default value for . + /// The default is practically no limit. + /// + public const int DEFAULT_ACTIVATION_MIGRATION_COUNT_LIMIT = int.MaxValue; +} + +internal sealed class ActivationRebalancerOptionsValidator( + IOptions options, + IOptions publisherOptions) : IConfigurationValidator +{ + private readonly ActivationRebalancerOptions _options = options.Value; + private readonly DeploymentLoadPublisherOptions _publisherOptions = publisherOptions.Value; + + public void ValidateConfiguration() + { + if (_options.SessionCyclePeriod < 2 * _publisherOptions.DeploymentLoadPublisherRefreshTime) + { + throw new OrleansConfigurationException( + $"{nameof(ActivationRebalancerOptions.SessionCyclePeriod)} must be at least " + + $"{$"2 x {nameof(DeploymentLoadPublisherOptions.DeploymentLoadPublisherRefreshTime)}"}"); + } + + if (_options.MaxStagnantCycles <= 0) + { + throw new OrleansConfigurationException($"{nameof(ActivationRebalancerOptions.MaxStagnantCycles)} must be greater than 0"); + } + + if (_options.EntropyQuantum <= 0d || _options.EntropyQuantum > 0.1d) + { + throw new OrleansConfigurationException($"{nameof(ActivationRebalancerOptions.EntropyQuantum)} must be in greater than 0, and less or equal 0.1"); + } + + if (_options.AllowedEntropyDeviation <= 0d || _options.AllowedEntropyDeviation > 0.1d) + { + throw new OrleansConfigurationException($"{nameof(ActivationRebalancerOptions.AllowedEntropyDeviation)} must be in greater than 0, and less or equal 0.1"); + } + + if (_options.CycleNumberWeight <= 0d || _options.CycleNumberWeight > 1d) + { + throw new OrleansConfigurationException($"{nameof(ActivationRebalancerOptions.CycleNumberWeight)} must be in greater than 0, and less or equal to 1"); + } + + if (_options.SiloNumberWeight < 0d || _options.SiloNumberWeight > 1d) + { + throw new OrleansConfigurationException($"{nameof(ActivationRebalancerOptions.SiloNumberWeight)} must be in greater than or equal to 0, and less or equal to 1"); + } + + if (_options.ActivationMigrationCountLimit < 1) + { + throw new OrleansConfigurationException($"{nameof(ActivationRebalancerOptions.ActivationMigrationCountLimit)} must be greater than 0"); + } + + if (_options.ScaledEntropyDeviationActivationThreshold < 1_000) + { + throw new OrleansConfigurationException($"{nameof(ActivationRebalancerOptions.ScaledEntropyDeviationActivationThreshold)} must be greater than or equal to 1000"); + } + } +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Core/ManagementGrain.cs b/src/Orleans.Runtime/Core/ManagementGrain.cs index 2a6552ea21..41de5ebb61 100644 --- a/src/Orleans.Runtime/Core/ManagementGrain.cs +++ b/src/Orleans.Runtime/Core/ManagementGrain.cs @@ -384,8 +384,8 @@ public async Task> GetGrainCallFrequencies(SiloAddress[ var results = new List(); foreach (var host in hostsIds) { - var siloBalancer = IActivationRepartitionerSystemTarget.GetReference(internalGrainFactory, host); - var frequencies = await siloBalancer.GetGrainCallFrequencies(); + var siloPartitioner = IActivationRepartitionerSystemTarget.GetReference(internalGrainFactory, host); + var frequencies = await siloPartitioner.GetGrainCallFrequencies(); foreach (var frequency in frequencies) { results.Add(new GrainCallFrequency diff --git a/src/Orleans.Runtime/Core/SystemTarget.cs b/src/Orleans.Runtime/Core/SystemTarget.cs index c06a9b8db0..c84bd40bcd 100644 --- a/src/Orleans.Runtime/Core/SystemTarget.cs +++ b/src/Orleans.Runtime/Core/SystemTarget.cs @@ -157,8 +157,7 @@ internal void HandleResponse(Message response) } /// - /// Registers a timer to send regular callbacks to this grain. - /// This timer will keep the current grain from being deactivated. + /// Registers a timer to send regular callbacks to this system target. /// /// The timer callback, which will fire whenever the timer becomes due. /// The state object passed to the callback. @@ -176,13 +175,64 @@ internal void HandleResponse(Message response) /// public IGrainTimer RegisterTimer(Func callback, object state, TimeSpan dueTime, TimeSpan period) { - var ctxt = RuntimeContext.Current; ArgumentNullException.ThrowIfNull(callback); var timer = this.ActivationServices.GetRequiredService() .RegisterGrainTimer(this, static (state, _) => state.Callback(state.State), (Callback: callback, State: state), new() { DueTime = dueTime, Period = period, Interleave = true }); return timer; } + /// + /// Registers a timer to send regular callbacks to this system target. + /// + /// The timer callback, which will fire whenever the timer becomes due. + /// + /// The amount of time to delay before the is invoked. + /// Specify to prevent the timer from starting. + /// Specify to invoke the callback promptly. + /// + /// + /// The time interval between invocations of . + /// Specify to disable periodic signaling. + /// + /// + /// An object which will cancel the timer upon disposal. + /// + public IGrainTimer RegisterGrainTimer(Func callback, TimeSpan dueTime, TimeSpan period) + { + CheckRuntimeContext(); + ArgumentNullException.ThrowIfNull(callback); + var timer = this.ActivationServices.GetRequiredService() + .RegisterGrainTimer(this, (state, ct) => state(ct), callback, new() { DueTime = dueTime, Period = period, Interleave = true }); + return timer; + } + + /// + /// Registers a timer to send regular callbacks to this grain. + /// This timer will keep the current grain from being deactivated. + /// + /// The timer callback, which will fire whenever the timer becomes due. + /// The state object passed to the callback. + /// + /// The amount of time to delay before the is invoked. + /// Specify to prevent the timer from starting. + /// Specify to invoke the callback promptly. + /// + /// + /// The time interval between invocations of . + /// Specify to disable periodic signaling. + /// + /// + /// An object which will cancel the timer upon disposal. + /// + public IGrainTimer RegisterGrainTimer(Func callback, TState state, TimeSpan dueTime, TimeSpan period) + { + CheckRuntimeContext(); + ArgumentNullException.ThrowIfNull(callback); + var timer = this.ActivationServices.GetRequiredService() + .RegisterGrainTimer(this, callback, state, new() { DueTime = dueTime, Period = period, Interleave = true }); + return timer; + } + /// public sealed override string ToString() => $"{this}"; @@ -332,5 +382,21 @@ private void StopAllTimers() timer.Dispose(); } } + + internal void CheckRuntimeContext() + { + var context = RuntimeContext.Current; + if (context is null) + { + ThrowMissingContext(); + void ThrowMissingContext() => throw new InvalidOperationException($"Access violation: attempted to access context '{this}' from null context."); + } + + if (!ReferenceEquals(context, this)) + { + ThrowAccessViolation(context); + void ThrowAccessViolation(IGrainContext currentContext) => throw new InvalidOperationException($"Access violation: attempt to access context '{this}' from different context, '{currentContext}'."); + } + } } } diff --git a/src/Orleans.Runtime/Hosting/ActivationRebalancerExtensions.cs b/src/Orleans.Runtime/Hosting/ActivationRebalancerExtensions.cs new file mode 100644 index 0000000000..8f4af596ac --- /dev/null +++ b/src/Orleans.Runtime/Hosting/ActivationRebalancerExtensions.cs @@ -0,0 +1,54 @@ +using Microsoft.Extensions.DependencyInjection; +using Orleans.Configuration; +using Orleans.Runtime; +using System.Diagnostics.CodeAnalysis; +using Orleans.Configuration.Internal; +using Orleans.Placement.Rebalancing; +using Orleans.Runtime.Placement.Rebalancing; + +namespace Orleans.Hosting; + +#nullable enable + +/// +/// Extensions for configuring activation rebalancing. +/// +public static class ActivationRebalancerExtensions +{ + /// + /// Enables activation rebalancing for the entire cluster. + /// + /// + /// Activation rebalancing attempts to distribute activations around the cluster in such a + /// way that it optimizes both activation count and memory usages across the silos of the cluster. + /// You can read more on activation rebalancing here + /// + [Experimental("ORLEANSEXP002")] + public static ISiloBuilder AddActivationRebalancer(this ISiloBuilder builder) => + builder.AddActivationRebalancer(); + + /// . + /// Custom backoff provider for determining next session after a failed attempt. + [Experimental("ORLEANSEXP002")] + public static ISiloBuilder AddActivationRebalancer(this ISiloBuilder builder) + where TProvider : class, IFailedSessionBackoffProvider => + builder.ConfigureServices(service => service.AddActivationRebalancer()); + + private static IServiceCollection AddActivationRebalancer(this IServiceCollection services) + where TProvider : class, IFailedSessionBackoffProvider + { + services.AddSingleton(); + services.AddFromExisting(); + services.AddFromExisting, ActivationRebalancerMonitor>(); + services.AddTransient(); + + services.AddSingleton(); + services.AddFromExisting(); + if (typeof(TProvider).IsAssignableTo(typeof(ILifecycleParticipant))) + { + services.AddFromExisting(typeof(ILifecycleParticipant), typeof(TProvider)); + } + + return services; + } +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs index b8b3d7e6de..67f98907cd 100644 --- a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs +++ b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs @@ -411,6 +411,7 @@ internal static void AddDefaultServices(ISiloBuilder builder) services.AddSingleton(); services.AddFromExisting(); services.AddFromExisting, ActivationMigrationManager>(); + services.AddSingleton(); ApplyConfiguration(builder); } diff --git a/src/Orleans.Runtime/Placement/GrainMigratabilityChecker.cs b/src/Orleans.Runtime/Placement/GrainMigratabilityChecker.cs new file mode 100644 index 0000000000..9878a8def2 --- /dev/null +++ b/src/Orleans.Runtime/Placement/GrainMigratabilityChecker.cs @@ -0,0 +1,73 @@ +using Orleans.Metadata; +using System; +using System.Collections.Concurrent; +using System.Collections.Frozen; +using System.Runtime.CompilerServices; + +#nullable enable + +namespace Orleans.Runtime.Placement; + +internal sealed class GrainMigratabilityChecker( + PlacementStrategyResolver strategyResolver, + IClusterManifestProvider clusterManifestProvider, + TimeProvider timeProvider) +{ + private readonly GrainManifest _localManifest = clusterManifestProvider.LocalGrainManifest; + private readonly PlacementStrategyResolver _strategyResolver = strategyResolver; + private readonly TimeProvider _timeProvider = timeProvider; + private readonly ConcurrentDictionary _migratableStatuses = new(); + private FrozenDictionary? _migratableStatusesCache; + private long _lastRegeneratedCacheTimestamp = timeProvider.GetTimestamp(); + + public bool IsMigratable(GrainType grainType) + { + var hash = grainType.GetUniformHashCode(); + if (_migratableStatusesCache is { } cache && cache.TryGetValue(hash, out var isMigratable)) + { + return isMigratable; + } + + return IsMigratableRare(grainType, hash); + + bool IsMigratableRare(GrainType grainType, uint hash) + { + // _migratableStatuses holds statuses for each grain type if its migratable type or not, so we can make fast lookups. + // since we don't anticipate a huge number of grain *types*, i think its just fine to have this in place as fast-check. + if (!_migratableStatuses.TryGetValue(hash, out var isMigratable)) + { + isMigratable = !(grainType.IsClient() || grainType.IsSystemTarget() || grainType.IsGrainService() || IsStatelessWorker(grainType) || IsImmovable(grainType)); + _migratableStatuses.TryAdd(hash, isMigratable); + } + + // Regenerate the cache periodically. + var currentTimestamp = _timeProvider.GetTimestamp(); + if (_timeProvider.GetElapsedTime(_lastRegeneratedCacheTimestamp, currentTimestamp) > TimeSpan.FromSeconds(5)) + { + _migratableStatusesCache = _migratableStatuses.ToFrozenDictionary(); + _lastRegeneratedCacheTimestamp = currentTimestamp; + } + + return isMigratable; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + bool IsStatelessWorker(GrainType grainType) => + _strategyResolver.GetPlacementStrategy(grainType).GetType() == typeof(StatelessWorkerPlacement); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + bool IsImmovable(GrainType grainType) + { + if (_localManifest.Grains.TryGetValue(grainType, out var props)) + { + // If there is no 'Immovable' property, it is not immovable. + // If the value fails to parse, assume it's immovable. + // If the value is true, it's immovable. + return props.Properties.TryGetValue(WellKnownGrainTypeProperties.Immovable, out var value) && (!bool.TryParse(value, out var result) || result); + } + + // Assume unknown grains are immovable. + return true; + } + } +} diff --git a/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerMonitor.cs b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerMonitor.cs new file mode 100644 index 0000000000..a1688c03e8 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerMonitor.cs @@ -0,0 +1,165 @@ +using System; +using Orleans.Runtime.Placement.Repartitioning; +using System.Threading.Tasks; +using Orleans.Placement.Rebalancing; +using System.Collections.Immutable; +using Microsoft.Extensions.Logging; +using System.Collections.Generic; +using System.Threading; +using Orleans.Runtime.Scheduler; + +#nullable enable + +namespace Orleans.Runtime.Placement.Rebalancing; + +internal sealed partial class ActivationRebalancerMonitor : SystemTarget, IActivationRebalancerMonitor, ILifecycleParticipant +{ + private IGrainTimer? _monitorTimer; + private RebalancingReport _latestReport; + private long _lastHeartbeatTimestamp; + + private readonly TimeProvider _timeProvider; + private readonly ActivationDirectory _activationDirectory; + private readonly ISiloStatusOracle _siloStatusOracle; + private readonly IActivationRebalancerWorker _rebalancerGrain; + private readonly ILogger _logger; + private readonly List _statusListeners = []; + + // Check on the worker with double the period the worker reports to me. + private readonly static TimeSpan TimerPeriod = 2 * IActivationRebalancerMonitor.WorkerReportPeriod; + + public ActivationRebalancerMonitor( + Catalog catalog, + TimeProvider timeProvider, + ActivationDirectory activationDirectory, + ILoggerFactory loggerFactory, + IGrainFactory grainFactory, + ILocalSiloDetails localSiloDetails, + ISiloStatusOracle siloStatusOracle) + : base(Constants.ActivationRebalancerMonitorType, localSiloDetails.SiloAddress, loggerFactory) + { + _timeProvider = timeProvider; + _activationDirectory = activationDirectory; + _siloStatusOracle = siloStatusOracle; + _logger = loggerFactory.CreateLogger(); + _rebalancerGrain = grainFactory.GetGrain(0); + _lastHeartbeatTimestamp = _timeProvider.GetTimestamp(); + + catalog.RegisterSystemTarget(this); + + _latestReport = new() + { + ClusterImbalance = 1, + Host = SiloAddress.Zero, + Status = RebalancerStatus.Suspended, + SuspensionDuration = Timeout.InfiniteTimeSpan, + Statistics = [] + }; + } + + public void Participate(ISiloLifecycle observer) + { + observer.Subscribe( + nameof(ActivationRepartitioner), + ServiceLifecycleStage.Active, + OnStart, + _ => Task.CompletedTask); + + observer.Subscribe( + nameof(ActivationRepartitioner), + ServiceLifecycleStage.ApplicationServices, + _ => Task.CompletedTask, + OnStop); + } + + private async Task OnStart(CancellationToken cancellationToken) + { + await this.RunOrQueueTask(async () => + { + _monitorTimer = RegisterGrainTimer(async ct => + { + var elapsedSinceHeartbeat = _timeProvider.GetElapsedTime(_lastHeartbeatTimestamp); + if (elapsedSinceHeartbeat >= IActivationRebalancerMonitor.WorkerReportPeriod) + { + LogStartingRebalancer(elapsedSinceHeartbeat, IActivationRebalancerMonitor.WorkerReportPeriod); + _latestReport = await _rebalancerGrain.GetReport().AsTask().WaitAsync(ct); + } + + }, TimerPeriod, TimerPeriod); + + _latestReport = await _rebalancerGrain.GetReport().AsTask().WaitAsync(cancellationToken); + }); + } + + private async Task OnStop(CancellationToken cancellationToken) + { + await this.RunOrQueueTask(() => + { + if (_latestReport is { } report && Silo.IsSameLogicalSilo(report.Host)) + { + if (_activationDirectory.FindTarget(_rebalancerGrain.GetGrainId()) is { } activation) + { + LogMigratingRebalancer(Silo); + activation.Migrate(null, cancellationToken); // migrate it anywhere else + } + } + + _monitorTimer?.Dispose(); + return Task.CompletedTask; + }); + } + + public Task ResumeRebalancing() => _rebalancerGrain.ResumeRebalancing(); + public Task SuspendRebalancing(TimeSpan? duration) => _rebalancerGrain.SuspendRebalancing(duration); + + public async ValueTask GetRebalancingReport(bool force = false) + { + if (force) + { + _latestReport = await _rebalancerGrain.GetReport(); + } + + return _latestReport; + } + + public Task Report(RebalancingReport report) + { + _latestReport = report; + _lastHeartbeatTimestamp = _timeProvider.GetTimestamp(); + + foreach (var listener in _statusListeners) + { + try + { + listener.OnReport(report); + } + catch (Exception ex) + { + _logger.LogError(ex, "An unexpected error occurred while notifying rebalancer listener."); + } + } + + return Task.CompletedTask; + } + + public void SubscribeToReports(IActivationRebalancerReportListener listener) + { + if (!_statusListeners.Contains(listener)) + { + _statusListeners.Add(listener); + } + } + + public void UnsubscribeFromReports(IActivationRebalancerReportListener listener) => + _statusListeners.Remove(listener); + + [LoggerMessage(Level = LogLevel.Trace, Message = + "I have not received a report from the activation rebalancer for the last {Duration} which is more than the " + + "allowed interval {Period}. I will now try to wake it up with the assumption that it has has been stopped ungracefully.")] + private partial void LogStartingRebalancer(TimeSpan duration, TimeSpan period); + + [LoggerMessage(Level = LogLevel.Trace, Message = + "My silo '{Silo}' is stopping now, and I am the host of the activation rebalancer. " + + "I will attempt to migrate the rebalancer to another silo.")] + private partial void LogMigratingRebalancer(SiloAddress silo); +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerWorker.Log.cs b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerWorker.Log.cs new file mode 100644 index 0000000000..4797668357 --- /dev/null +++ b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerWorker.Log.cs @@ -0,0 +1,70 @@ +using System; +using Microsoft.Extensions.Logging; +using Orleans.Statistics; + +namespace Orleans.Runtime.Placement.Rebalancing; + +internal partial class ActivationRebalancerWorker +{ + [LoggerMessage(Level = LogLevel.Trace, Message = "Activation rebalancer has been scheduled to start after {DueTime}.")] + private partial void LogScheduledToStart(TimeSpan dueTime); + + [LoggerMessage(Level = LogLevel.Trace, Message = "I have started a new rebalancing session.")] + private partial void LogSessionStarted(); + + [LoggerMessage(Level = LogLevel.Trace, Message = "I have stopped my current rebalancing session.")] + private partial void LogSessionStopped(); + + [LoggerMessage(Level = LogLevel.Trace, Message = "I have been told to suspend rebalancing indefinitely.")] + private partial void LogSuspended(); + + [LoggerMessage(Level = LogLevel.Trace, Message = "I have been told to suspend rebalancing for {Duration}.")] + private partial void LogSuspendedFor(TimeSpan duration); + + [LoggerMessage(Level = LogLevel.Trace, Message = "Can not continue with rebalancing because there are less than 2 silos.")] + private partial void LogNotEnoughSilos(); + + [LoggerMessage(Level = LogLevel.Trace, Message = "Can not continue with rebalancing because I have statistics information for less than 2 silos.")] + private partial void LogNotEnoughStatistics(); + + [LoggerMessage(Level = LogLevel.Warning, Message = + "Can not continue with rebalancing because at least one of the silos is reporting 0 memory usage. " + + $"This can indicate that there is no implementation of {nameof(IEnvironmentStatisticsProvider)}.")] + private partial void LogInvalidSiloMemory(); + + [LoggerMessage(Level = LogLevel.Trace, Message = "The current rebalancing session has stopped due to {StagnantCycles} stagnant cycles having passed, which is the maximum allowed.")] + private partial void LogMaxStagnantCyclesReached(int stagnantCycles); + + [LoggerMessage(Level = LogLevel.Trace, Message = + "The current rebalancing session has stopped due to a {EntropyDeviation} " + + "entropy deviation between the current {CurrentEntropy} and maximum possible {MaximumEntropy}. " + + "The difference is less than the required {AllowedEntropyDeviation} deviation.")] + private partial void LogMaxEntropyDeviationReached(double entropyDeviation, double currentEntropy, double maximumEntropy, double allowedEntropyDeviation); + + [LoggerMessage(Level = LogLevel.Trace, Message = + "The relative change in entropy {EntropyChange} is less than the quantum {EntropyQuantum}. " + + "This is practically not considered an improvement, therefore this cycle will be marked as stagnant.")] + private partial void LogInsufficientEntropyQuantum(double entropyChange, double entropyQuantum); + + [LoggerMessage(Level = LogLevel.Trace, Message = "Stagnant cycle count has been reset as we are improving now.")] + private partial void LogStagnantCyclesReset(); + + [LoggerMessage(Level = LogLevel.Trace, Message = "Failed session count has been reset as we are improving now.")] + private partial void LogFailedSessionsReset(); + + [LoggerMessage(Level = LogLevel.Trace, Message = + "I have decided to migrate {Delta} activations.\n" + + "Adjusted activations for {LowSilo} will be [{LowSiloPreActivations} -> {LowSiloPostActivations}].\n" + + "Adjusted activations for {HighSilo} will be [{HighSiloPreActivations} -> {HighSiloPostActivations}].")] + private partial void LogSiloMigrations(int delta, + SiloAddress lowSilo, int lowSiloPreActivations, int lowSiloPostActivations, + SiloAddress highSilo, int highSiloPreActivations, int highSiloPostActivations); + + [LoggerMessage(Level = LogLevel.Trace, Message = + "Rebalancing cycle {RebalancingCycle} has finished. " + + "[ Stagnant Cycles: {StagnantCycles} | Previous Entropy: {PreviousEntropy} | " + + "Current Entropy: {CurrentEntropy} | Maximum Entropy: {MaximumEntropy} | Entropy Deviation: {EntropyDeviation} ]")] + private partial void LogCycleOutcome( + int rebalancingCycle, int stagnantCycles, double previousEntropy, + double currentEntropy, double maximumEntropy, double entropyDeviation); +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerWorker.cs b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerWorker.cs new file mode 100644 index 0000000000..32a34e7cfb --- /dev/null +++ b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerWorker.cs @@ -0,0 +1,552 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Diagnostics; +using System.Linq; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Placement; +using Orleans.Placement.Rebalancing; + +#nullable enable + +namespace Orleans.Runtime.Placement.Rebalancing; + +// See: https://www.ledjonbehluli.com/posts/orleans_adaptive_rebalancing/ + +[KeepAlive, Immovable] +internal sealed partial class ActivationRebalancerWorker( + DeploymentLoadPublisher loadPublisher, + ILoggerFactory loggerFactory, + ISiloStatusOracle siloStatusOracle, + IInternalGrainFactory grainFactory, + ILocalSiloDetails localSiloDetails, + IOptions options, + IFailedSessionBackoffProvider backoffProvider) + : Grain, IActivationRebalancerWorker, ISiloStatisticsChangeListener, IGrainMigrationParticipant +{ + private readonly record struct ResourceStatistics(long MemoryUsage, int ActivationCount); + + [GenerateSerializer, Immutable, Alias("RebalancerState")] + internal readonly record struct RebalancerState( + int StagnantCycles, int FailedSessions, + int RebalancingCycle, double LatestEntropy, double EntropyDeviation, + TimeSpan? SuspensionDuration, ImmutableArray Statistics); + + private enum StopReason + { + /// + /// A new session is about to start. + /// + SessionStarting, + /// + /// Current session has stagnated. + /// + SessionStagnated, + /// + /// Current session has completed successfully till end + /// + SessionCompleted, + /// + /// Rebalancer was asked to suspend activity. + /// + RebalancerSuspended + } + + private const string StateKey = "REBALANCER_STATE"; + + private int _stagnantCycles; + private int _failedSessions; + private int _rebalancingCycle; + private double _previousEntropy; + private double _entropyDeviation; + private long _suspendedUntilTs; + private IGrainTimer? _sessionTimer; + private IGrainTimer? _triggerTimer; + private IGrainTimer? _monitorTimer; + + private readonly ActivationRebalancerOptions _options = options.Value; + private readonly Dictionary _siloStatistics = []; + private readonly Dictionary _rebalancingStatistics = []; + private readonly ILogger _logger = loggerFactory.CreateLogger(); + + private TimeSpan? RemainingSuspensionDuration => Runtime.TimeProvider.GetElapsedTime(Runtime.TimeProvider.GetTimestamp(), _suspendedUntilTs) switch + { + { } result when result > TimeSpan.Zero => result, + _ => null + }; + + public override Task OnActivateAsync(CancellationToken cancellationToken) + { + _monitorTimer = this.RegisterGrainTimer(ReportAllMonitors, new() + { + DueTime = TimeSpan.Zero, + Period = IActivationRebalancerMonitor.WorkerReportPeriod, + }); + + _triggerTimer = this.RegisterGrainTimer(TriggerRebalancing, new() + { + Interleave = true, + Period = 0.5 * _options.SessionCyclePeriod, // Make trigger-period half that of the session cycle-period. + DueTime = _options.RebalancerDueTime + }); + + LogScheduledToStart(_options.RebalancerDueTime); + + loadPublisher.SubscribeToStatisticsChangeEvents(this); + + return Task.CompletedTask; + } + + public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken) + { + loadPublisher.UnsubscribeStatisticsChangeEvents(this); + return Task.CompletedTask; + } + + public void OnDehydrate(IDehydrationContext context) + { + context.TryAddValue(StateKey, + new(_stagnantCycles, _failedSessions, _rebalancingCycle, + _previousEntropy, _entropyDeviation, RemainingSuspensionDuration, [.. _rebalancingStatistics.Values])); + } + + public void OnRehydrate(IRehydrationContext context) + { + if (context.TryGetValue(StateKey, out var rebalancerState) && + rebalancerState is { } state) + { + _rebalancingCycle = state.RebalancingCycle; + _stagnantCycles = state.StagnantCycles; + _failedSessions = state.FailedSessions; + _previousEntropy = state.LatestEntropy; + _entropyDeviation = state.EntropyDeviation; + + foreach (var statistics in state.Statistics) + { + if (siloStatusOracle.IsDeadSilo(statistics.SiloAddress)) + { + continue; + } + + _rebalancingStatistics.TryAdd(statistics.SiloAddress, statistics); + } + + if (state.SuspensionDuration is { } value) + { + SuspendFor(value); + } + } + } + + void ISiloStatisticsChangeListener.RemoveSilo(SiloAddress silo) + { + GrainContext.Scheduler.QueueAction(() => + { + _siloStatistics.Remove(silo); + _rebalancingStatistics.Remove(silo); // Remove that silo's rebalancing stats, as it has been removed. + }); + } + + void ISiloStatisticsChangeListener.SiloStatisticsChangeNotification(SiloAddress address, SiloRuntimeStatistics statistics) + { + GrainContext.Scheduler.QueueAction(() + => _siloStatistics[address] = new(statistics.EnvironmentStatistics.MemoryUsageBytes, statistics.ActivationCount)); + } + + public ValueTask GetReport() => new(BuildReport()); + + public async Task ResumeRebalancing() + { + StartSession(); + await ReportAllMonitors(CancellationToken.None); + } + + public async Task SuspendRebalancing(TimeSpan? duration) + { + StopSession(StopReason.RebalancerSuspended, duration); + + if (duration.HasValue) + { + LogSuspendedFor(duration.Value); + } + else + { + LogSuspended(); + } + + await ReportAllMonitors(CancellationToken.None); + } + + private async Task ReportAllMonitors(CancellationToken cancellationToken) + { + var tasks = new List(); + var report = BuildReport(); + + foreach (var silo in siloStatusOracle.GetActiveSilos()) + { + tasks.Add(grainFactory.GetSystemTarget + (Constants.ActivationRebalancerMonitorType, silo).Report(report)); + } + + await Task.WhenAll(tasks).WaitAsync(cancellationToken); + } + + private RebalancingReport BuildReport() + { + var suspensionRemaining = RemainingSuspensionDuration; + + return new RebalancingReport() + { + Host = localSiloDetails.SiloAddress, + Status = suspensionRemaining is { } ? RebalancerStatus.Suspended : RebalancerStatus.Executing, + SuspensionDuration = suspensionRemaining, + ClusterImbalance = _entropyDeviation, + Statistics = [.. _rebalancingStatistics.Values] + }; + } + + private Task TriggerRebalancing() + { + if (_sessionTimer != null) + { + return Task.CompletedTask; + } + + if (RemainingSuspensionDuration.HasValue) + { + return Task.CompletedTask; + } + + StartSession(); + return Task.CompletedTask; + } + + private async Task RunRebalancingCycle(CancellationToken cancellationToken) + { + var siloCount = siloStatusOracle.GetActiveSilos().Length; + if (siloCount < 2) + { + LogNotEnoughSilos(); + return; + } + + var snapshot = _siloStatistics.ToDictionary(); + if (snapshot.Count < 2) + { + LogNotEnoughStatistics(); + return; + } + + if (snapshot.Any(x => x.Value.MemoryUsage == 0)) + { + LogInvalidSiloMemory(); + return; + } + + _rebalancingCycle++; + + if (_stagnantCycles >= _options.MaxStagnantCycles) + { + LogMaxStagnantCyclesReached(_stagnantCycles); + StopSession(StopReason.SessionStagnated); + + return; + } + + var totalActivations = snapshot.Sum(x => x.Value.ActivationCount); + var meanMemoryUsage = ComputeHarmonicMean(snapshot.Values); + var maximumEntropy = Math.Log(siloCount); + var currentEntropy = ComputeEntropy(snapshot.Values, totalActivations, meanMemoryUsage); + var allowedDeviation = ComputeAllowedEntropyDeviation(totalActivations); + var entropyDeviation = (maximumEntropy - currentEntropy) / maximumEntropy; + + _entropyDeviation = entropyDeviation; + + if (entropyDeviation < allowedDeviation) + { + // The deviation from maximum is practically considered "0" i.e: we've reached maximum. + LogMaxEntropyDeviationReached(entropyDeviation, currentEntropy, maximumEntropy, allowedDeviation); + StopSession(StopReason.SessionCompleted); + + return; + } + + // We use the normalized, absolute entropy change, because it is more useful for understanding how significant + // the change is, relative to the maximum possible. Values closer to 1 reflect higher significance than those closer to 0. + // Since max entropy is a function of the natural log of the cluster's size, this value is very robust against changes + // in silo number within the cluster. + + var entropyChange = Math.Abs((currentEntropy - _previousEntropy) / maximumEntropy); + Debug.Assert(entropyChange is >= 0 and <= 1); + + if (entropyChange < _options.EntropyQuantum) + { + // Entropy change is too low to be considered an improvement, chances are we are reaching the maximum, or the system + // is dynamically changing too fast i.e. new activations are being created at a high rate with an imbalanced distribution, + // we need to start "cooling-down". As a matter of fact, entropy could also become negative if the current entropy is less + // than the previous, due to many activation changes happening during this and the previous cycle. + + LogInsufficientEntropyQuantum(entropyChange, _options.EntropyQuantum); + + _stagnantCycles++; + _previousEntropy = currentEntropy; + + return; + } + + if (_stagnantCycles > 0) + { + _stagnantCycles = 0; + LogStagnantCyclesReset(); + } + + if (_failedSessions > 0) + { + _failedSessions = 0; + LogFailedSessionsReset(); + } + + var idealDistributions = snapshot.Select(x => new ValueTuple + // n_i = (N / S) * (M_m / m_i) + (x.Key, ((double)totalActivations / siloCount) * (meanMemoryUsage / x.Value.MemoryUsage))) + .ToDictionary(); + + var alpha = currentEntropy / maximumEntropy; + var scalingFactor = ComputeAdaptiveScaling(siloCount, _rebalancingCycle); + var addressPairs = FormSiloPairs(snapshot); + var migrationTasks = new List(); + + Debug.Assert(addressPairs.Count % 2 == 0); + + for (var i = 0; i < addressPairs.Count; i++) + { + (var lowSilo, var highSilo) = addressPairs[i]; + + var difference = Math.Abs( + (snapshot[lowSilo].ActivationCount - idealDistributions[lowSilo]) - + (snapshot[highSilo].ActivationCount - idealDistributions[highSilo])); + + var delta = (int)(alpha * scalingFactor * (difference / 2)); + if (delta == 0) + { + continue; + } + + var lowCount = snapshot[lowSilo].ActivationCount; + var highCount = snapshot[highSilo].ActivationCount; + + if (delta > highCount) + { + delta = highCount; + } + + if (delta > _options.ActivationMigrationCountLimit) + { + delta = _options.ActivationMigrationCountLimit; + } + + migrationTasks.Add(grainFactory + .GetSystemTarget(Constants.SiloControlType, highSilo) + .MigrateRandomActivations(lowSilo, delta)); + + UpdateStatistics(lowSilo, highSilo, delta); + LogSiloMigrations(delta, lowSilo, lowCount, lowCount + delta, highSilo, highCount, highCount - delta); + } + + if (migrationTasks.Count > 0) + { + await Task.WhenAll(migrationTasks).WaitAsync(cancellationToken); + } + + LogCycleOutcome(_rebalancingCycle, _stagnantCycles, _previousEntropy, currentEntropy, maximumEntropy, entropyDeviation); + _previousEntropy = currentEntropy; + } + + private void UpdateStatistics(SiloAddress lowSilo, SiloAddress highSilo, int delta) + { + Debug.Assert(delta > 0); + var now = Runtime.TimeProvider.GetUtcNow().DateTime; + + ref var lowStats = ref CollectionsMarshal.GetValueRefOrAddDefault(_rebalancingStatistics, lowSilo, out _); + lowStats = new() + { + TimeStamp = now, + SiloAddress = lowSilo, + DispersedActivations = lowStats.DispersedActivations, + AcquiredActivations = lowStats.AcquiredActivations + (ulong)delta + }; + + ref var highStats = ref CollectionsMarshal.GetValueRefOrAddDefault(_rebalancingStatistics, highSilo, out _); + highStats = new() + { + TimeStamp = now, + SiloAddress = highSilo, + DispersedActivations = highStats.DispersedActivations + (ulong)delta, + AcquiredActivations = highStats.AcquiredActivations + }; + } + + private static double ComputeEntropy( + Dictionary.ValueCollection values, + int totalActivations, double meanMemoryUsage) + { + Debug.Assert(totalActivations > 0); + Debug.Assert(meanMemoryUsage > 0); + + var ratios = values.Select(x => + // p_i = (n_i / N) * (m_i / M_m) + ((double)x.ActivationCount / totalActivations) * (x.MemoryUsage / meanMemoryUsage)); + + var ratiosSum = ratios.Sum(); + var normalizedRatios = ratios.Select(r => r / ratiosSum); + + const double epsilon = 1e-10d; + + var entropy = -normalizedRatios.Sum(p => + { + var value = Math.Max(p, epsilon); // Avoid log(0) + return value * Math.Log(value); // - sum(p_i * log(p_i)) + }); + + Debug.Assert(entropy > 0); + + return entropy; + } + + private static double ComputeHarmonicMean(Dictionary.ValueCollection values) + { + var result = 0d; + + foreach (var value in values) + { + var count = value.ActivationCount; + Debug.Assert(count > 0); + result += 1.0 / count; + } + + return values.Count / result; + } + + private double ComputeAllowedEntropyDeviation(int totalActivations) + { + if (!_options.ScaleAllowedEntropyDeviation || totalActivations < _options.ScaledEntropyDeviationActivationThreshold) + { + return _options.AllowedEntropyDeviation; + } + + Debug.Assert(totalActivations > 0); + + var logFactor = (int)Math.Log10(totalActivations / _options.ScaledEntropyDeviationActivationThreshold); + var adjustedDeviation = _options.AllowedEntropyDeviation * Math.Pow(10, logFactor); + + return Math.Min(adjustedDeviation, ActivationRebalancerOptions.MAX_SCALED_ENTROPY_DEVIATION); + } + + private double ComputeAdaptiveScaling(int siloCount, int rebalancingCycle) + { + Debug.Assert(rebalancingCycle > 0); + + var cycleFactor = 1 - Math.Exp(-_options.CycleNumberWeight * rebalancingCycle); + var siloFactor = 1 / (1 + _options.SiloNumberWeight * (siloCount - 1)); + + return (double)(cycleFactor * siloFactor); + } + + private static List<(SiloAddress, SiloAddress)> FormSiloPairs( + Dictionary statistics) + { + var pairs = new List<(SiloAddress, SiloAddress)>(); + var sorted = statistics.OrderBy(x => x.Value.ActivationCount).ToList(); + + var left = 0; + var right = sorted.Count - 1; + + while (left < right) + { + pairs.Add((sorted[left].Key, sorted[right].Key)); + + left++; + right--; + } + + return pairs; + } + + private void StartSession() + { + StopSession(StopReason.SessionStarting); + + _sessionTimer = this.RegisterGrainTimer(RunRebalancingCycle, new() + { + DueTime = TimeSpan.Zero, + Period = _options.SessionCyclePeriod + }); + + LogSessionStarted(); + } + + private void StopSession(StopReason reason, TimeSpan? duration = null) + { + _previousEntropy = 0; + _rebalancingCycle = 0; + _stagnantCycles = 0; + _sessionTimer?.Dispose(); + _sessionTimer = null; + + switch (reason) + { + case StopReason.SessionStarting: + { + _failedSessions = 0; + _suspendedUntilTs = 0; + } + break; + case StopReason.SessionStagnated: + { + _failedSessions++; + SuspendFor(backoffProvider.Next(_failedSessions)); + } + break; + case StopReason.SessionCompleted: + { + _failedSessions = 0; + SuspendFor(_options.SessionCyclePeriod); + } + break; + case StopReason.RebalancerSuspended: + { + _failedSessions = 0; + if (duration.HasValue) + { + SuspendFor(duration.Value); + } + else + { + _suspendedUntilTs = long.MaxValue; + } + } + break; + } + + LogSessionStopped(); + } + + private void SuspendFor(TimeSpan duration) + { + ArgumentOutOfRangeException.ThrowIfLessThan(duration, TimeSpan.Zero); + var now = Runtime.TimeProvider.GetTimestamp(); + var suspendUntil = now + (long)(Runtime.TimeProvider.TimestampFrequency * duration.TotalSeconds); + if (suspendUntil < now) + { + // Clamp overflow at max value. + suspendUntil = long.MaxValue; + } + + _suspendedUntilTs = Math.Max(_suspendedUntilTs, suspendUntil); + } +} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Rebalancing/FailedSessionBackoffProvider.cs b/src/Orleans.Runtime/Placement/Rebalancing/FailedSessionBackoffProvider.cs new file mode 100644 index 0000000000..601579879e --- /dev/null +++ b/src/Orleans.Runtime/Placement/Rebalancing/FailedSessionBackoffProvider.cs @@ -0,0 +1,9 @@ +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Internal; +using Orleans.Placement.Rebalancing; + +namespace Orleans.Runtime.Placement.Rebalancing; + +internal sealed class FailedSessionBackoffProvider(IOptions options) + : FixedBackoff(options.Value.SessionCyclePeriod), IFailedSessionBackoffProvider; \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs b/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs index c7c73af7b9..53c84dc1ec 100644 --- a/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs +++ b/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs @@ -1,9 +1,4 @@ #nullable enable -using Orleans.Metadata; -using System; -using System.Collections.Concurrent; -using System.Collections.Frozen; -using System.Runtime.CompilerServices; namespace Orleans.Runtime.Placement.Repartitioning; @@ -12,18 +7,8 @@ internal interface IRepartitionerMessageFilter bool IsAcceptable(Message message, out bool isSenderMigratable, out bool isTargetMigratable); } -internal sealed class RepartitionerMessageFilter( - PlacementStrategyResolver strategyResolver, - IClusterManifestProvider clusterManifestProvider, - TimeProvider timeProvider) : IRepartitionerMessageFilter +internal sealed class RepartitionerMessageFilter(GrainMigratabilityChecker checker) : IRepartitionerMessageFilter { - private readonly GrainManifest _localManifest = clusterManifestProvider.LocalGrainManifest; - private readonly PlacementStrategyResolver _strategyResolver = strategyResolver; - private readonly TimeProvider _timeProvider = timeProvider; - private readonly ConcurrentDictionary _migratableStatuses = new(); - private FrozenDictionary? _migratableStatusesCache; - private long _lastRegeneratedCacheTimestamp = timeProvider.GetTimestamp(); - public bool IsAcceptable(Message message, out bool isSenderMigratable, out bool isTargetMigratable) { isSenderMigratable = false; @@ -37,61 +22,10 @@ public bool IsAcceptable(Message message, out bool isSenderMigratable, out bool return false; } - isSenderMigratable = IsMigratable(message.SendingGrain.Type); - isTargetMigratable = IsMigratable(message.TargetGrain.Type); + isSenderMigratable = checker.IsMigratable(message.SendingGrain.Type); + isTargetMigratable = checker.IsMigratable(message.TargetGrain.Type); // If both are not migratable types we ignore this. But if one of them is not, then we allow passing, as we wish to move grains closer to them, as with any type of grain. return isSenderMigratable || isTargetMigratable; - - bool IsMigratable(GrainType grainType) - { - var hash = grainType.GetUniformHashCode(); - if (_migratableStatusesCache is { } cache && cache.TryGetValue(hash, out var isMigratable)) - { - return isMigratable; - } - - return IsMigratableRare(grainType, hash); - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - bool IsStatelessWorker(GrainType grainType) => - _strategyResolver.GetPlacementStrategy(grainType).GetType() == typeof(StatelessWorkerPlacement); - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - bool IsImmovable(GrainType grainType) - { - if (_localManifest.Grains.TryGetValue(grainType, out var props)) - { - // If there is no 'Immovable' property, it is not immovable. - // If the value fails to parse, assume it's immovable. - // If the value is true, it's immovable. - return props.Properties.TryGetValue(WellKnownGrainTypeProperties.Immovable, out var value) && (!bool.TryParse(value, out var result) || result); - } - - // Assume unknown grains are immovable. - return true; - } - - bool IsMigratableRare(GrainType grainType, uint hash) - { - // _migratableStatuses holds statuses for each grain type if its migratable type or not, so we can make fast lookups. - // since we don't anticipate a huge number of grain *types*, i think its just fine to have this in place as fast-check. - if (!_migratableStatuses.TryGetValue(hash, out var isMigratable)) - { - isMigratable = !(grainType.IsClient() || grainType.IsSystemTarget() || grainType.IsGrainService() || IsStatelessWorker(grainType) || IsImmovable(grainType)); - _migratableStatuses.TryAdd(hash, isMigratable); - } - - // Regenerate the cache periodically. - var currentTimestamp = _timeProvider.GetTimestamp(); - if (_timeProvider.GetElapsedTime(_lastRegeneratedCacheTimestamp, currentTimestamp) > TimeSpan.FromSeconds(5)) - { - _migratableStatusesCache = _migratableStatuses.ToFrozenDictionary(); - _lastRegeneratedCacheTimestamp = currentTimestamp; - } - - return isMigratable; - } - } } } \ No newline at end of file diff --git a/src/Orleans.Runtime/Silo/SiloControl.cs b/src/Orleans.Runtime/Silo/SiloControl.cs index c8415c8e43..700c57ad03 100644 --- a/src/Orleans.Runtime/Silo/SiloControl.cs +++ b/src/Orleans.Runtime/Silo/SiloControl.cs @@ -12,6 +12,7 @@ using Orleans.Metadata; using Orleans.Providers; using Orleans.Runtime.GrainDirectory; +using Orleans.Runtime.Placement; using Orleans.Runtime.Versions; using Orleans.Runtime.Versions.Compatibility; using Orleans.Runtime.Versions.Selector; @@ -20,7 +21,6 @@ using Orleans.Versions.Compatibility; using Orleans.Versions.Selector; - namespace Orleans.Runtime { internal class SiloControl : SystemTarget, ISiloControl @@ -44,6 +44,7 @@ internal class SiloControl : SystemTarget, ISiloControl private readonly IOptions loadSheddingOptions; private readonly GrainCountStatistics _grainCountStatistics; private readonly GrainPropertiesResolver grainPropertiesResolver; + private readonly GrainMigratabilityChecker _migratabilityChecker; public SiloControl( ILocalSiloDetails localSiloDetails, @@ -61,7 +62,8 @@ public SiloControl( IEnvironmentStatisticsProvider environmentStatisticsProvider, IOptions loadSheddingOptions, GrainCountStatistics grainCountStatistics, - GrainPropertiesResolver grainPropertiesResolver) + GrainPropertiesResolver grainPropertiesResolver, + GrainMigratabilityChecker migratabilityChecker) : base(Constants.SiloControlType, localSiloDetails.SiloAddress, loggerFactory) { this.localSiloDetails = localSiloDetails; @@ -80,6 +82,7 @@ public SiloControl( this.loadSheddingOptions = loadSheddingOptions; _grainCountStatistics = grainCountStatistics; this.grainPropertiesResolver = grainPropertiesResolver; + _migratabilityChecker = migratabilityChecker; } public Task Ping(string message) @@ -156,27 +159,7 @@ public Task>> GetGrainStatistics() public Task> GetDetailedGrainStatistics(string[]? types = null) { if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("GetDetailedGrainStatistics"); - var stats = new List(); - lock (activationDirectory) - { - foreach (var activation in activationDirectory) - { - var data = activation.Value; - if (data == null || data.GrainInstance == null) continue; - - var grainType = RuntimeTypeNameFormatter.Format(data.GrainInstance.GetType()); - if (types == null || types.Contains(grainType)) - { - stats.Add(new DetailedGrainStatistic() - { - GrainType = grainType, - GrainId = data.GrainId, - SiloAddress = Silo - }); - } - } - } - + var stats = GetDetailedGrainStatisticsCore(); return Task.FromResult(stats); } @@ -308,5 +291,61 @@ public Task> GetActiveGrains(GrainType grainType) } return Task.FromResult(results); } + + public Task MigrateRandomActivations(SiloAddress target, int count) + { + ArgumentNullException.ThrowIfNull(target); + ArgumentOutOfRangeException.ThrowIfNegative(count); + var migrationContext = new Dictionary() + { + [IPlacementDirector.PlacementHintKey] = target + }; + + // Loop until we've migrated the desired count of activations or run out of activations to try. + // Note that we have a weak pseudorandom enumeration here, and lossy counting: this is not a precise + // or deterministic operation. + var remainingCount = count; + foreach (var (grainId, grainContext) in activationDirectory) + { + if (!_migratabilityChecker.IsMigratable(grainId.Type)) + { + continue; + } + + if (--remainingCount <= 0) + { + break; + } + + grainContext.Migrate(migrationContext); + } + + return Task.CompletedTask; + } + + private List GetDetailedGrainStatisticsCore(string[]? types = null) + { + var stats = new List(); + lock (activationDirectory) + { + foreach (var activation in activationDirectory) + { + var data = activation.Value; + if (data == null || data.GrainInstance == null) continue; + + var grainType = RuntimeTypeNameFormatter.Format(data.GrainInstance.GetType()); + if (types == null || types.Contains(grainType)) + { + stats.Add(new DetailedGrainStatistic() + { + GrainType = grainType, + GrainId = data.GrainId, + SiloAddress = Silo + }); + } + } + } + return stats; + } } } diff --git a/src/Orleans.TestingHost/TestCluster.cs b/src/Orleans.TestingHost/TestCluster.cs index 7d0a59084b..0ce8a6bb1a 100644 --- a/src/Orleans.TestingHost/TestCluster.cs +++ b/src/Orleans.TestingHost/TestCluster.cs @@ -17,6 +17,7 @@ using Orleans.TestingHost.InMemoryTransport; using Orleans.TestingHost.UnixSocketTransport; using System.Net; +using Orleans.Statistics; namespace Orleans.TestingHost { @@ -142,6 +143,26 @@ public TestCluster( this.CreateSiloAsync = DefaultCreateSiloAsync; } + /// + /// Returns the associated with the given . + /// + /// The silo process to the the service provider for. + /// If is one of the existing silos will be picked randomly. + public IServiceProvider GetSiloServiceProvider(SiloAddress silo = null) + { + if (silo != null) + { + var handle = Silos.FirstOrDefault(x => x.SiloAddress.Equals(silo)); + return handle != null ? ((InProcessSiloHandle)handle).SiloHost.Services : + throw new ArgumentException($"The provided silo address '{silo}' is unknown."); + } + else + { + var index = Random.Shared.Next(Silos.Count); + return ((InProcessSiloHandle)Silos[index]).SiloHost.Services; + } + } + /// /// Deploys the cluster using the specified configuration and starts the client in-process. /// It will start the number of silos defined in . @@ -676,10 +697,20 @@ public async Task DefaultCreateSiloAsync(string siloName, IConfigura default: throw new ArgumentException($"Unsupported {nameof(ConnectionTransportType)}: {transport}"); } + + if (options.UseRealEnvironmentStatistics) + { + var descriptor = siloBuilder.Services.FirstOrDefault(descriptor => descriptor.ServiceType == typeof(IEnvironmentStatisticsProvider)); + if (descriptor != null) + { + siloBuilder.Services.Remove(descriptor); + siloBuilder.Services.AddSingleton(); + } + } }); }); } - + /// /// Start a new silo in the target cluster /// diff --git a/src/Orleans.TestingHost/TestClusterOptions.cs b/src/Orleans.TestingHost/TestClusterOptions.cs index 056339951e..77d21fd980 100644 --- a/src/Orleans.TestingHost/TestClusterOptions.cs +++ b/src/Orleans.TestingHost/TestClusterOptions.cs @@ -42,6 +42,11 @@ public class TestClusterOptions /// if test cluster membership should be used; otherwise, . public bool UseTestClusterMembership { get; set; } + /// + /// Gets or sets a value indicating whether to use the real environment statistics. + /// + public bool UseRealEnvironmentStatistics { get; set; } + /// /// Gets or sets a value indicating whether to initialize the client immediately on deployment. /// @@ -111,6 +116,7 @@ public Dictionary ToDictionary() [nameof(BaseSiloPort)] = this.BaseSiloPort.ToString(), [nameof(BaseGatewayPort)] = this.BaseGatewayPort.ToString(), [nameof(UseTestClusterMembership)] = this.UseTestClusterMembership.ToString(), + [nameof(UseRealEnvironmentStatistics)] = this.UseRealEnvironmentStatistics.ToString(), [nameof(InitializeClientOnDeploy)] = this.InitializeClientOnDeploy.ToString(), [nameof(InitialSilosCount)] = this.InitialSilosCount.ToString(), [nameof(ApplicationBaseDirectory)] = this.ApplicationBaseDirectory, @@ -124,6 +130,8 @@ public Dictionary ToDictionary() { result["Orleans:Clustering:ProviderType"] = "Development"; } + + result["UseRealEnvironmentStatistics"] = UseRealEnvironmentStatistics ? "True" : "False"; if (this.SiloBuilderConfiguratorTypes != null) { diff --git a/test/TesterInternal/ActivationRebalancingTests/ControlRebalancerTests.cs b/test/TesterInternal/ActivationRebalancingTests/ControlRebalancerTests.cs new file mode 100644 index 0000000000..fd175766ef --- /dev/null +++ b/test/TesterInternal/ActivationRebalancingTests/ControlRebalancerTests.cs @@ -0,0 +1,89 @@ +using Microsoft.Extensions.DependencyInjection; +using Orleans.Placement.Rebalancing; +using Xunit; +using Xunit.Abstractions; + +namespace UnitTests.ActivationRebalancingTests; + +[TestCategory("Functional"), TestCategory("ActivationRebalancing")] +public class ControlRebalancerTests(RebalancerFixture fixture, ITestOutputHelper output) + : RebalancingTestBase(fixture, output), IClassFixture +{ + [Fact] + public async Task Rebalancer_Should_Be_Controllable_And_Report_To_Listeners() + { + var serviceProvider = Cluster.GetSiloServiceProvider(); + var rebalancer = serviceProvider.GetRequiredService(); + var report = await rebalancer.GetRebalancingReport(); + var host = report.Host; + + Assert.Equal(RebalancerStatus.Executing, report.Status); + Assert.Null(report.SuspensionDuration); + Assert.NotEqual(SiloAddress.Zero, host); + + // Publish-Subscribe + var listener = new Listener(); + rebalancer.SubscribeToReports(listener); + Assert.False(listener.Report.HasValue); + + await rebalancer.ResumeRebalancing(); + Assert.True(listener.Report.HasValue); + Assert.Equal(RebalancerStatus.Executing, listener.Report.Value.Status); + Assert.Equal(host, listener.Report.Value.Host); + + await rebalancer.SuspendRebalancing(); + Assert.Equal(RebalancerStatus.Suspended, listener.Report.Value.Status); + Assert.True(listener.Report.Value.SuspensionDuration.HasValue); + Assert.Equal(host, listener.Report.Value.Host); + + rebalancer.UnsubscribeFromReports(listener); + await rebalancer.ResumeRebalancing(); + while (report.Status == RebalancerStatus.Suspended) + { + report = await rebalancer.GetRebalancingReport(true); + await Task.Delay(100); + } + // Its actually resumed, but here its still suspended since we unsubscribed + Assert.Equal(RebalancerStatus.Suspended, listener.Report.Value.Status); + + // Request-Reply + var duration = TimeSpan.FromSeconds(5); + await rebalancer.SuspendRebalancing(duration); // Suspend for some time + while (report.Status == RebalancerStatus.Executing) + { + report = await rebalancer.GetRebalancingReport(true); + await Task.Delay(100); + } + + Assert.True(report.SuspensionDuration.HasValue); + // Must be less than the time it was told to be suspended + Assert.True(report.SuspensionDuration.Value < duration); + Assert.Equal(host, report.Host); + + while (report.Status == RebalancerStatus.Suspended) + { + report = await rebalancer.GetRebalancingReport(true); + await Task.Delay(100); + } + + report = await rebalancer.GetRebalancingReport(true); + Assert.False(report.SuspensionDuration.HasValue); + Assert.Equal(host, report.Host); + + await rebalancer.SuspendRebalancing(); // Suspend indefinitely + while (report.Status == RebalancerStatus.Executing) + { + report = await rebalancer.GetRebalancingReport(true); + await Task.Delay(100); + } + report = await rebalancer.GetRebalancingReport(true); + Assert.True(report.SuspensionDuration.HasValue); + Assert.Equal(host, report.Host); + } + + private class Listener : IActivationRebalancerReportListener + { + public RebalancingReport? Report { get; private set; } + public void OnReport(RebalancingReport report) => Report = report; + } +} \ No newline at end of file diff --git a/test/TesterInternal/ActivationRebalancingTests/DynamicRebalancingTests.cs b/test/TesterInternal/ActivationRebalancingTests/DynamicRebalancingTests.cs new file mode 100644 index 0000000000..9f0da5def6 --- /dev/null +++ b/test/TesterInternal/ActivationRebalancingTests/DynamicRebalancingTests.cs @@ -0,0 +1,117 @@ +using Xunit; +using Xunit.Abstractions; + +namespace UnitTests.ActivationRebalancingTests; + +[TestCategory("Functional"), TestCategory("ActivationRebalancing")] +public class DynamicRebalancingTests(RebalancerFixture fixture, ITestOutputHelper output) + : RebalancingTestBase(fixture, output), IClassFixture +{ + [Fact] + public async Task Should_Move_Activations_From_Silo1_And_Silo3_To_Silo2_And_Silo4_While_New_Activations_Are_Created() + { + var tasks = new List(); + + AddTestActivations(tasks, Silo1, 300); + AddTestActivations(tasks, Silo2, 30); + AddTestActivations(tasks, Silo3, 180); + AddTestActivations(tasks, Silo4, 100); + + await Task.WhenAll(tasks); + + var stats = await MgmtGrain.GetDetailedGrainStatistics(); + + var initialSilo1Activations = GetActivationCount(stats, Silo1); + var initialSilo2Activations = GetActivationCount(stats, Silo2); + var initialSilo3Activations = GetActivationCount(stats, Silo3); + var initialSilo4Activations = GetActivationCount(stats, Silo4); + + OutputHelper.WriteLine( + $"Pre-rebalancing activations:\n" + + $"Silo1: {initialSilo1Activations}\n" + + $"Silo2: {initialSilo2Activations}\n" + + $"Silo3: {initialSilo3Activations}\n" + + $"Silo4: {initialSilo4Activations}\n"); + + var silo1Activations = initialSilo1Activations; + var silo2Activations = initialSilo2Activations; + var silo3Activations = initialSilo3Activations; + var silo4Activations = initialSilo4Activations; + + const int extraActivationsSilo1 = 30; + const int extraActivationsSilo2 = 3; + const int extraActivationsSilo3 = 18; + const int extraActivationsSilo4 = 10; + + var index = 0; + var extraRounds = 0; + + while (index < 5) + { + await Task.Delay(RebalancerFixture.SessionCyclePeriod); + + if (index % 2 == 0) + { + tasks.Clear(); + + // add an extra 1/10 of the initial activation count for each silo + AddTestActivations(tasks, Silo1, extraActivationsSilo1); + AddTestActivations(tasks, Silo2, extraActivationsSilo2); + AddTestActivations(tasks, Silo3, extraActivationsSilo3); + AddTestActivations(tasks, Silo4, extraActivationsSilo4); + + await Task.WhenAll(tasks); + + OutputHelper.WriteLine( + $"Added extra activations on cycle {index + 1}:\n" + + $"Silo1: {extraActivationsSilo1}\n" + + $"Silo2: {extraActivationsSilo2}\n" + + $"Silo3: {extraActivationsSilo3}\n" + + $"Silo4: {extraActivationsSilo4}\n"); + + extraRounds++; + } + + stats = await MgmtGrain.GetDetailedGrainStatistics(); + + silo1Activations = GetActivationCount(stats, Silo1); + silo2Activations = GetActivationCount(stats, Silo2); + silo3Activations = GetActivationCount(stats, Silo3); + silo4Activations = GetActivationCount(stats, Silo4); + + index++; + } + + var finalSilo1Activations = initialSilo1Activations + extraRounds * extraActivationsSilo1; + var finalSilo2Activations = initialSilo2Activations + extraRounds * extraActivationsSilo2; + var finalSilo3Activations = initialSilo3Activations + extraRounds * extraActivationsSilo3; + var finalSilo4Activations = initialSilo4Activations + extraRounds * extraActivationsSilo4; + + Assert.True(silo1Activations < finalSilo1Activations, + $"Did not expect Silo1 to have more activations than what it started + added afterwards: " + + $"[{finalSilo1Activations} -> {silo1Activations}]"); + + Assert.True(silo2Activations > finalSilo2Activations, + $"Did not expect Silo2 to have less activations than what it started + added afterwards: " + + $"[{finalSilo2Activations} -> {silo2Activations}]"); + + Assert.True(silo3Activations < finalSilo3Activations, + $"Did not expect Silo3 to have more activations than what it started + added afterwards: " + + $"[{finalSilo3Activations} -> {silo3Activations}]"); + + Assert.True(silo4Activations > finalSilo4Activations, + "Did not expect Silo4 to have less activations than what it started + added afterwards: " + + $"[{finalSilo4Activations} -> {silo4Activations}]"); + + var preVariance = CalculateVariance([finalSilo1Activations, finalSilo2Activations, finalSilo3Activations, finalSilo4Activations]); + var postVariance = CalculateVariance([silo1Activations, silo2Activations, silo3Activations, silo4Activations]); + + OutputHelper.WriteLine( + $"Post-rebalancing activations ({index} cycles):\n" + + $"Silo1: {silo1Activations} | Expected without rebalancing: {finalSilo1Activations}\n" + + $"Silo2: {silo2Activations} | Expected without rebalancing: {finalSilo2Activations}\n" + + $"Silo3: {silo3Activations} | Expected without rebalancing: {finalSilo3Activations}\n" + + $"Silo4: {silo4Activations} | Expected without rebalancing: {finalSilo4Activations}\n" + + $"Variance: {postVariance} | Expected without rebalancing: {preVariance}"); + } +} \ No newline at end of file diff --git a/test/TesterInternal/ActivationRebalancingTests/RebalancerFixture.cs b/test/TesterInternal/ActivationRebalancingTests/RebalancerFixture.cs new file mode 100644 index 0000000000..94e70861d4 --- /dev/null +++ b/test/TesterInternal/ActivationRebalancingTests/RebalancerFixture.cs @@ -0,0 +1,33 @@ +using TestExtensions; +using Orleans.Configuration; +using Orleans.TestingHost; +using Microsoft.Extensions.DependencyInjection; + +namespace UnitTests.ActivationRebalancingTests; + +public class RebalancerFixture : BaseInProcessTestClusterFixture +{ + public static readonly TimeSpan RebalancerDueTime = TimeSpan.FromSeconds(5); + public static readonly TimeSpan SessionCyclePeriod = TimeSpan.FromSeconds(3); + + protected override void ConfigureTestCluster(InProcessTestClusterBuilder builder) + { + builder.Options.InitialSilosCount = 4; + builder.Options.UseRealEnvironmentStatistics = true; + builder.ConfigureSilo((options, siloBuilder) +#pragma warning disable ORLEANSEXP002 + => siloBuilder + .Configure(o => + { + o.AssumeHomogenousSilosForTesting = true; + o.ClientGatewayShutdownNotificationTimeout = default; + }) + .Configure(o => + { + o.RebalancerDueTime = RebalancerDueTime; + o.SessionCyclePeriod = SessionCyclePeriod; + }) + .AddActivationRebalancer()); +#pragma warning restore ORLEANSEXP002 + } +} diff --git a/test/TesterInternal/ActivationRebalancingTests/RebalancingOptionsTests.cs b/test/TesterInternal/ActivationRebalancingTests/RebalancingOptionsTests.cs new file mode 100644 index 0000000000..a880398c91 --- /dev/null +++ b/test/TesterInternal/ActivationRebalancingTests/RebalancingOptionsTests.cs @@ -0,0 +1,67 @@ +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Xunit; + +namespace UnitTests.ActivationRebalancingTests; + +[TestCategory("Functional"), TestCategory("ActivationRebalancing")] +public class RebalancingOptionsTests +{ + [Fact] + public void ConstantsShouldNotChange() + { + Assert.Equal(TimeSpan.FromSeconds(60), ActivationRebalancerOptions.DEFAULT_REBALANCER_DUE_TIME); + Assert.Equal(TimeSpan.FromSeconds(15), ActivationRebalancerOptions.DEFAULT_SESSION_CYCLE_PERIOD); + Assert.Equal(3, ActivationRebalancerOptions.DEFAULT_MAX_STAGNANT_CYCLES); + Assert.Equal(0.0001d, ActivationRebalancerOptions.DEFAULT_ENTROPY_QUANTUM); + Assert.Equal(0.0001d, ActivationRebalancerOptions.DEFAULT_ALLOWED_ENTROPY_DEVIATION); + Assert.Equal(0.1d, ActivationRebalancerOptions.DEFAULT_CYCLE_NUMBER_WEIGHT); + Assert.Equal(0.1d, ActivationRebalancerOptions.DEFAULT_SILO_NUMBER_WEIGHT); + Assert.Equal(0.1d, ActivationRebalancerOptions.MAX_SCALED_ENTROPY_DEVIATION); + Assert.Equal(10_000, ActivationRebalancerOptions.DEFAULT_SCALED_ENTROPY_DEVIATION_ACTIVATION_THRESHOLD); + Assert.Equal(int.MaxValue, ActivationRebalancerOptions.DEFAULT_ACTIVATION_MIGRATION_COUNT_LIMIT); + Assert.True(ActivationRebalancerOptions.DEFAULT_SCALE_ALLOWED_ENTROPY_DEVIATION); + } + + [Theory] + [InlineData(1000, 1, 0, 0.2, 0.2, 0, -0.1, 0, 999)] + [InlineData(2000, 2, -1, 0.05, 0.05, 0.5, 1.1, 10, 500)] + [InlineData(1000, 1, 2, 0, 0.05, 0.5, 0.5, 10, 999)] + [InlineData(1000, 1, 2, 0.05, 0, 0.5, 0.5, 10, 999)] + [InlineData(1000, 1, 2, 0.05, 0.05, -0.1, 0.5, 10, 999)] + [InlineData(1000, 1, 2, 0.05, 0.05, 0.5, 1.1, 10, 999)] + [InlineData(1000, 1, 2, 0.05, 0.05, 0.5, 0.5, 0, 999)] + [InlineData(1000, 1, 2, 0.05, 0.05, 0.5, 0.5, 10, 999)] + + public void InvalidOptionsShouldThrow( + int sessionCyclePeriodMilliseconds, + int publisherRefreshTimeSeconds, + int maxStagnantCycles, + double entropyQuantum, + double allowedEntropyDeviation, + double cycleNumberWeight, + double siloNumberWeight, + int activationMigrationCountLimit, + int scaledEntropyDeviationActivationThreshold) + { + var publisherOptions = new DeploymentLoadPublisherOptions + { + DeploymentLoadPublisherRefreshTime = TimeSpan.FromSeconds(publisherRefreshTimeSeconds) + }; + + var options = new ActivationRebalancerOptions + { + SessionCyclePeriod = TimeSpan.FromMilliseconds(sessionCyclePeriodMilliseconds), + MaxStagnantCycles = maxStagnantCycles, + EntropyQuantum = entropyQuantum, + AllowedEntropyDeviation = allowedEntropyDeviation, + CycleNumberWeight = cycleNumberWeight, + SiloNumberWeight = siloNumberWeight, + ActivationMigrationCountLimit = activationMigrationCountLimit, + ScaledEntropyDeviationActivationThreshold = scaledEntropyDeviationActivationThreshold + }; + + var validator = new ActivationRebalancerOptionsValidator(Options.Create(options), Options.Create(publisherOptions)); + Assert.Throws(validator.ValidateConfiguration); + } +} diff --git a/test/TesterInternal/ActivationRebalancingTests/RebalancingTestBase.cs b/test/TesterInternal/ActivationRebalancingTests/RebalancingTestBase.cs new file mode 100644 index 0000000000..71513e954e --- /dev/null +++ b/test/TesterInternal/ActivationRebalancingTests/RebalancingTestBase.cs @@ -0,0 +1,74 @@ +using TestExtensions; +using Xunit.Abstractions; +using Orleans.TestingHost; +using Orleans.Runtime.Placement; + +namespace UnitTests.ActivationRebalancingTests; + +public abstract class RebalancingTestBase + where TFixture : BaseInProcessTestClusterFixture +{ + protected InProcessTestCluster Cluster { get; } + + protected SiloAddress Silo1 { get; } + protected SiloAddress Silo2 { get; } + protected SiloAddress Silo3 { get; } + protected SiloAddress Silo4 { get; } + + internal ITestOutputHelper OutputHelper { get; } + internal IInternalGrainFactory GrainFactory { get; } + internal IManagementGrain MgmtGrain { get; } + + protected RebalancingTestBase(TFixture fixture, ITestOutputHelper output) + { + var silos = fixture.HostedCluster.GetActiveSilos().Select(h => h.SiloAddress).OrderBy(s => s).ToArray(); + + Silo1 = silos[0]; + Silo2 = silos[1]; + Silo3 = silos[2]; + Silo4 = silos[3]; + + Cluster = fixture.HostedCluster; + OutputHelper = output; + GrainFactory = (IInternalGrainFactory)fixture.HostedCluster.Client; + MgmtGrain = GrainFactory.GetGrain(0); + } + + protected static int GetActivationCount(DetailedGrainStatistic[] stats, SiloAddress silo) => + stats.Count(x => x.SiloAddress.Equals(silo)); + + protected void AddTestActivations(List tasks, SiloAddress silo, int count) + { + RequestContext.Set(IPlacementDirector.PlacementHintKey, silo); + for (var i = 0; i < count; i++) + { + tasks.Add(GrainFactory.GetGrain(Guid.NewGuid()).Ping()); + } + } + + protected static int CalculateVariance(int[] values) + { + var mean = values.Average(); + var sumSqrtDiff = values.Select(x => (x - mean) * (x - mean)).Sum(); + var variance = sumSqrtDiff / (values.Length - 1); + + return (int)variance; + } + + public async Task InitializeAsync() + { + await GrainFactory + .GetGrain(0) + .ForceActivationCollection(TimeSpan.Zero); + } +} + +public interface IRebalancingTestGrain : IGrainWithGuidKey +{ + Task Ping(); +} + +public class RebalancingTestGrain : Grain, IRebalancingTestGrain +{ + public Task Ping() => Task.CompletedTask; +} \ No newline at end of file diff --git a/test/TesterInternal/ActivationRebalancingTests/StatePreservationRebalancingTests.cs b/test/TesterInternal/ActivationRebalancingTests/StatePreservationRebalancingTests.cs new file mode 100644 index 0000000000..3ec178a79e --- /dev/null +++ b/test/TesterInternal/ActivationRebalancingTests/StatePreservationRebalancingTests.cs @@ -0,0 +1,207 @@ +using Microsoft.Extensions.Configuration; +using Orleans.Configuration; +using Orleans.Core.Internal; +using Orleans.Placement.Rebalancing; +using Orleans.Runtime.Placement; +using Orleans.TestingHost; +using TestExtensions; +using Xunit; +using Xunit.Abstractions; +using SPFixture = UnitTests.ActivationRebalancingTests.StatePreservationRebalancingTests.StatePreservationFixture; + +#nullable enable + +namespace UnitTests.ActivationRebalancingTests; + +[TestCategory("Functional"), TestCategory("ActivationRebalancing")] +public class StatePreservationRebalancingTests(SPFixture fixture, ITestOutputHelper output) + : RebalancingTestBase(fixture, output), IClassFixture +{ + private const string ErrorMessage = + "The rebalancer was not found in any of the 4 silos. " + + "Either you have added more silos and not updated this code, " + + "or there is a bug in the rebalancer or monitor"; + + [Fact] + public async Task Should_Migrate_And_Preserve_State_When_Hosting_Silo_Dies() + { + var tasks = new List(); + + // Move the rebalancer to the first secondary silo, since we will stop it later and we cannot stop + // the primary in this test setup. + RequestContext.Set(IPlacementDirector.PlacementHintKey, Cluster.Silos[1].SiloAddress); + await Cluster.Client.GetGrain(0).Cast().MigrateOnIdle(); + RequestContext.Set(IPlacementDirector.PlacementHintKey, null); + + AddTestActivations(tasks, Silo1, 300); + AddTestActivations(tasks, Silo2, 30); + AddTestActivations(tasks, Silo3, 180); + AddTestActivations(tasks, Silo4, 100); + + await Task.WhenAll(tasks); + + var stats = await MgmtGrain.GetDetailedGrainStatistics(); + + var initialSilo1Activations = GetActivationCount(stats, Silo1); + var initialSilo2Activations = GetActivationCount(stats, Silo2); + var initialSilo3Activations = GetActivationCount(stats, Silo3); + var initialSilo4Activations = GetActivationCount(stats, Silo4); + + OutputHelper.WriteLine( + $"Pre-rebalancing activations:\n" + + $"Silo1: {initialSilo1Activations}\n" + + $"Silo2: {initialSilo2Activations}\n" + + $"Silo3: {initialSilo3Activations}\n" + + $"Silo4: {initialSilo4Activations}\n"); + + var silo1Activations = initialSilo1Activations; + var silo2Activations = initialSilo2Activations; + var silo3Activations = initialSilo3Activations; + var silo4Activations = initialSilo4Activations; + + var rebalancerHostNum = 0; + var index = 0; + + while (index < 6) + { + if (index == 3) + { + (var rebalancerHost, rebalancerHostNum) = await FindRebalancerHost(Silo1); + + OutputHelper.WriteLine($"Cycle {index}: Now stopping Silo{rebalancerHostNum}, which is the host of the rebalancer\n"); + + Assert.NotEqual(rebalancerHost, Cluster.Silos[0].SiloAddress); + await Cluster.StopSiloAsync(Cluster.Silos.First(x => x.SiloAddress.Equals(rebalancerHost))); + } + + await Task.Delay(SPFixture.SessionCyclePeriod); + stats = await MgmtGrain.GetDetailedGrainStatistics(); + + silo1Activations = GetActivationCount(stats, Silo1); + silo2Activations = GetActivationCount(stats, Silo2); + silo3Activations = GetActivationCount(stats, Silo3); + silo4Activations = GetActivationCount(stats, Silo4); + + index++; + } + + if (rebalancerHostNum == 1) + { + Assert.True(silo2Activations > initialSilo2Activations, + $"Did not expect Silo2 to have less activations than what it started with: " + + $"[{initialSilo2Activations} -> {silo2Activations}]"); + + Assert.True(silo3Activations < initialSilo3Activations, + $"Did not expect Silo3 to have more activations than what it started with: " + + $"[{initialSilo3Activations} -> {silo3Activations}]"); + } + else if (rebalancerHostNum == 2) + { + Assert.True(silo3Activations < initialSilo3Activations, + $"Did not expect Silo3 to have more activations than what it started with: " + + $"[{initialSilo3Activations} -> {silo3Activations}]"); + + Assert.True(silo4Activations > initialSilo4Activations, + $"Did not expect Silo4 to have less activations than what it started with: " + + $"[{initialSilo4Activations} -> {silo4Activations}]"); + } + else if (rebalancerHostNum == 3) + { + Assert.True(silo1Activations < initialSilo1Activations, + $"Did not expect Silo1 to have more activations than what it started with: " + + $"[{initialSilo1Activations} -> {silo1Activations}]"); + + Assert.True(silo2Activations > initialSilo2Activations, + $"Did not expect Silo2 to have less activations than what it started with: " + + $"[{initialSilo2Activations} -> {silo2Activations}]"); + } + else if (rebalancerHostNum == 4) + { + Assert.True(silo1Activations < initialSilo1Activations, + $"Did not expect Silo1 to have more activations than what it started with: " + + $"[{initialSilo1Activations} -> {silo1Activations}]"); + + Assert.True(silo2Activations > initialSilo2Activations, + $"Did not expect Silo2 to have less activations than what it started with: " + + $"[{initialSilo2Activations} -> {silo2Activations}]"); + } + + OutputHelper.WriteLine( + $"Post-rebalancing activations ({index} cycles):\n" + + $"Silo1: {(rebalancerHostNum == 1 ? "DEAD" : silo1Activations)}\n" + + $"Silo2: {(rebalancerHostNum == 2 ? "DEAD" : silo2Activations)}\n" + + $"Silo3: {(rebalancerHostNum == 3 ? "DEAD" : silo3Activations)}\n" + + $"Silo4: {(rebalancerHostNum == 4 ? "DEAD" : silo4Activations)}\n"); + + (_, rebalancerHostNum) = await FindRebalancerHost(rebalancerHostNum switch + { + 1 => Silo2, + 2 => Silo3, + 3 => Silo4, + 4 => Silo1, + _ => throw new InvalidOperationException(ErrorMessage) + }); + + OutputHelper.WriteLine($"The rebalancer is hosted by Silo{rebalancerHostNum} now"); + } + + private async Task<(SiloAddress, int)> FindRebalancerHost(SiloAddress target) + { + var host = (await GrainFactory + .GetSystemTarget( + Constants.ActivationRebalancerMonitorType, target) + .GetRebalancingReport(true)) + .Host; + + if (host.Equals(Silo1)) + { + return new(host, 1); + } + + if (host.Equals(Silo2)) + { + return new(host, 2); + } + + if (host.Equals(Silo3)) + { + return new(host, 3); + } + + if (host.Equals(Silo4)) + { + return new(host, 4); + } + + Assert.Fail(ErrorMessage); + return new(SiloAddress.Zero, 0); + } + + public class StatePreservationFixture : BaseInProcessTestClusterFixture + { + public static readonly TimeSpan RebalancerDueTime = TimeSpan.FromSeconds(5); + public static readonly TimeSpan SessionCyclePeriod = TimeSpan.FromSeconds(3); + + protected override void ConfigureTestCluster(InProcessTestClusterBuilder builder) + { + builder.Options.InitialSilosCount = 4; + builder.Options.UseRealEnvironmentStatistics = true; + builder.ConfigureSilo((options, siloBuilder) +#pragma warning disable ORLEANSEXP002 + => siloBuilder + .Configure(o => + { + o.ResponseTimeoutWithDebugger = TimeSpan.FromMinutes(1); + o.AssumeHomogenousSilosForTesting = true; + o.ClientGatewayShutdownNotificationTimeout = default; + }) + .Configure(o => + { + o.RebalancerDueTime = RebalancerDueTime; + o.SessionCyclePeriod = SessionCyclePeriod; + }) + .AddActivationRebalancer()); +#pragma warning restore ORLEANSEXP002 + } + } +} diff --git a/test/TesterInternal/ActivationRebalancingTests/StaticRebalancingTests.cs b/test/TesterInternal/ActivationRebalancingTests/StaticRebalancingTests.cs new file mode 100644 index 0000000000..31095f1961 --- /dev/null +++ b/test/TesterInternal/ActivationRebalancingTests/StaticRebalancingTests.cs @@ -0,0 +1,82 @@ +using Xunit; +using Xunit.Abstractions; + +namespace UnitTests.ActivationRebalancingTests; + +[TestCategory("Functional"), TestCategory("ActivationRebalancing")] +public class StaticRebalancingTests(RebalancerFixture fixture, ITestOutputHelper output) + : RebalancingTestBase(fixture, output), IClassFixture +{ + [Fact] + public async Task Should_Move_Activations_From_Silo1_And_Silo3_To_Silo2_And_Silo4() + { + var tasks = new List(); + + AddTestActivations(tasks, Silo1, 300); + AddTestActivations(tasks, Silo2, 30); + AddTestActivations(tasks, Silo3, 180); + AddTestActivations(tasks, Silo4, 100); + + await Task.WhenAll(tasks); + + var stats = await MgmtGrain.GetDetailedGrainStatistics(); + + var initialSilo1Activations = GetActivationCount(stats, Silo1); + var initialSilo2Activations = GetActivationCount(stats, Silo2); + var initialSilo3Activations = GetActivationCount(stats, Silo3); + var initialSilo4Activations = GetActivationCount(stats, Silo4); + + OutputHelper.WriteLine( + $"Pre-rebalancing activations:\n" + + $"Silo1: {initialSilo1Activations}\n" + + $"Silo2: {initialSilo2Activations}\n" + + $"Silo3: {initialSilo3Activations}\n" + + $"Silo4: {initialSilo4Activations}\n"); + + var silo1Activations = initialSilo1Activations; + var silo2Activations = initialSilo2Activations; + var silo3Activations = initialSilo3Activations; + var silo4Activations = initialSilo4Activations; + + var index = 0; + while (index < 3) + { + await Task.Delay(RebalancerFixture.SessionCyclePeriod); + stats = await MgmtGrain.GetDetailedGrainStatistics(); + + silo1Activations = GetActivationCount(stats, Silo1); + silo2Activations = GetActivationCount(stats, Silo2); + silo3Activations = GetActivationCount(stats, Silo3); + silo4Activations = GetActivationCount(stats, Silo4); + + index++; + } + + Assert.True(silo1Activations < initialSilo1Activations, + $"Did not expect Silo1 to have more activations than what it started with: " + + $"[{initialSilo1Activations} -> {silo1Activations}]"); + + Assert.True(silo2Activations > initialSilo2Activations, + $"Did not expect Silo2 to have less activations than what it started with: " + + $"[{initialSilo2Activations} -> {silo2Activations}]"); + + Assert.True(silo3Activations < initialSilo3Activations, + $"Did not expect Silo3 to have more activations than what it started with: " + + $"[{initialSilo3Activations} -> {silo3Activations}]"); + + Assert.True(silo4Activations > initialSilo4Activations, + "Did not expect Silo4 to have less activations than what it started with: " + + $"[{initialSilo4Activations} -> {silo4Activations}]"); + + var preVariance = CalculateVariance([initialSilo1Activations, initialSilo2Activations, initialSilo3Activations, initialSilo4Activations]); + var postVariance = CalculateVariance([silo1Activations, silo2Activations, silo3Activations, silo4Activations]); + + OutputHelper.WriteLine( + $"Post-rebalancing activations ({index} cycles):\n" + + $"Silo1: {silo1Activations}\n" + + $"Silo2: {silo2Activations}\n" + + $"Silo3: {silo3Activations}\n" + + $"Silo4: {silo4Activations}\n" + + $"Variance: {postVariance} | Expected without rebalancing: {preVariance}"); + } +} \ No newline at end of file diff --git a/test/TesterInternal/ActivationRepartitioningTests/TestMessageFilter.cs b/test/TesterInternal/ActivationRepartitioningTests/TestMessageFilter.cs index 19e9205776..fa3206d2fb 100644 --- a/test/TesterInternal/ActivationRepartitioningTests/TestMessageFilter.cs +++ b/test/TesterInternal/ActivationRepartitioningTests/TestMessageFilter.cs @@ -6,12 +6,9 @@ namespace UnitTests.ActivationRepartitioningTests; /// /// Ignores client messages to make testing easier /// -internal sealed class TestMessageFilter( - PlacementStrategyResolver strategyResolver, - IClusterManifestProvider clusterManifestProvider, - TimeProvider timeProvider) : IRepartitionerMessageFilter +internal sealed class TestMessageFilter(GrainMigratabilityChecker checker) : IRepartitionerMessageFilter { - private readonly RepartitionerMessageFilter _messageFilter = new(strategyResolver, clusterManifestProvider, timeProvider); + private readonly RepartitionerMessageFilter _messageFilter = new(checker); public bool IsAcceptable(Message message, out bool isSenderMigratable, out bool isTargetMigratable) => _messageFilter.IsAcceptable(message, out isSenderMigratable, out isTargetMigratable) &&