From 0d9ccf3d1e1df2f6b92506d5d77e19e5efaa6c79 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 23 Sep 2024 20:26:46 -0700 Subject: [PATCH] WIP: add & use in-process test cluster --- src/Orleans.Runtime/Silo/SiloControl.cs | 2 - .../InMemoryTransportConnection.cs | 2 + src/Orleans.TestingHost/InProcTestCluster.cs | 754 ++++++++++++++++++ .../InProcTestClusterBuilder.cs | 153 ++++ .../InProcTestClusterOptions.cs | 92 +++ .../InProcTestSiloSpecificOptions.cs | 55 ++ .../InProcess/InProcessGrainDirectory.cs | 82 ++ .../InProcess/InProcessMembershipTable.cs | 193 +++++ .../InProcessSiloHandle.cs | 9 +- .../BaseInProcessTestClusterFixture.cs | 81 ++ .../RebalancerFixture.cs | 33 + .../RebalancingTestBase.cs | 48 +- .../StatePreservationRebalancingTests.cs | 17 +- 13 files changed, 1464 insertions(+), 57 deletions(-) create mode 100644 src/Orleans.TestingHost/InProcTestCluster.cs create mode 100644 src/Orleans.TestingHost/InProcTestClusterBuilder.cs create mode 100644 src/Orleans.TestingHost/InProcTestClusterOptions.cs create mode 100644 src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs create mode 100644 src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs create mode 100644 src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs create mode 100644 test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs create mode 100644 test/TesterInternal/ActivationRebalancingTests/RebalancerFixture.cs diff --git a/src/Orleans.Runtime/Silo/SiloControl.cs b/src/Orleans.Runtime/Silo/SiloControl.cs index 7370efb880..8ac3041442 100644 --- a/src/Orleans.Runtime/Silo/SiloControl.cs +++ b/src/Orleans.Runtime/Silo/SiloControl.cs @@ -11,7 +11,6 @@ using Orleans.Providers; using Orleans.Runtime.GrainDirectory; using Orleans.Runtime.Placement; -using Orleans.Runtime.GrainDirectory; using Orleans.Runtime.Versions; using Orleans.Runtime.Versions.Compatibility; using Orleans.Runtime.Versions.Selector; @@ -20,7 +19,6 @@ using Orleans.Versions.Compatibility; using Orleans.Versions.Selector; - namespace Orleans.Runtime { internal class SiloControl : SystemTarget, ISiloControl diff --git a/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs b/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs index 01ec045fbf..bf45faeb0d 100644 --- a/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs +++ b/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs @@ -87,4 +87,6 @@ public override async ValueTask DisposeAsync() _connectionClosedTokenSource.Dispose(); } + + public override string ToString() => $"InMem({LocalEndPoint}<->{RemoteEndPoint})"; } diff --git a/src/Orleans.TestingHost/InProcTestCluster.cs b/src/Orleans.TestingHost/InProcTestCluster.cs new file mode 100644 index 0000000000..5256724292 --- /dev/null +++ b/src/Orleans.TestingHost/InProcTestCluster.cs @@ -0,0 +1,754 @@ +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Runtime; +using Orleans.TestingHost.Utils; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Configuration.Memory; +using Orleans.Configuration; +using Microsoft.Extensions.Options; +using Microsoft.Extensions.Hosting; +using Orleans.TestingHost.InMemoryTransport; +using System.Net; +using Orleans.Statistics; +using Orleans.TestingHost.InProcess; +using Orleans.Runtime.Hosting; +using Orleans.GrainDirectory; +using Orleans.Messaging; +using Orleans.Hosting; +using Orleans.Runtime.TestHooks; +using Orleans.Configuration.Internal; +using Orleans.TestingHost.Logging; + +namespace Orleans.TestingHost; + +/// +/// A host class for local testing with Orleans using in-process silos. +/// +public sealed class InProcessTestCluster : IDisposable, IAsyncDisposable +{ + private readonly List _silos = []; + private readonly StringBuilder _log = new(); + private readonly InMemoryTransportConnectionHub _transportHub = new(); + private readonly InProcessGrainDirectory _grainDirectory; + private readonly InProcessMembershipTable _membershipTable; + private bool _disposed; + private int _startedInstances; + + /// + /// Collection of all known silos. + /// + public ReadOnlyCollection Silos + { + get + { + lock (_silos) + { + return new List(_silos).AsReadOnly(); + } + } + } + + /// + /// Options used to configure the test cluster. + /// + /// This is the options you configured your test cluster with, or the default one. + /// If the cluster is being configured via ClusterConfiguration, then this object may not reflect the true settings. + /// + public InProcessTestClusterOptions Options { get; } + + /// + /// The internal client interface. + /// + internal IHost ClientHost { get; private set; } + + /// + /// The internal client interface. + /// + internal IInternalClusterClient InternalClient => ClientHost?.Services.GetRequiredService(); + + /// + /// The client. + /// + public IClusterClient Client => ClientHost?.Services.GetRequiredService(); + + /// + /// The port allocator. + /// + public ITestClusterPortAllocator PortAllocator { get; } + + /// + /// Configures the test cluster plus client in-process. + /// + public InProcessTestCluster( + InProcessTestClusterOptions options, + ITestClusterPortAllocator portAllocator) + { + Options = options; + PortAllocator = portAllocator; + _membershipTable = new(options.ClusterId); + _grainDirectory = new(_membershipTable.GetSiloStatus); + } + + /// + /// Returns the associated with the given . + /// + /// The silo process to the the service provider for. + /// If is one of the existing silos will be picked randomly. + public IServiceProvider GetSiloServiceProvider(SiloAddress silo = null) + { + if (silo != null) + { + var handle = Silos.FirstOrDefault(x => x.SiloAddress.Equals(silo)); + return handle != null ? handle.SiloHost.Services : + throw new ArgumentException($"The provided silo address '{silo}' is unknown."); + } + else + { + var index = Random.Shared.Next(Silos.Count); + return Silos[index].SiloHost.Services; + } + } + + /// + /// Deploys the cluster using the specified configuration and starts the client in-process. + /// + public async Task DeployAsync() + { + if (_silos.Count > 0) throw new InvalidOperationException("Cluster host already deployed."); + + AppDomain.CurrentDomain.UnhandledException += ReportUnobservedException; + + try + { + string startMsg = "----------------------------- STARTING NEW UNIT TEST SILO HOST: " + GetType().FullName + " -------------------------------------"; + WriteLog(startMsg); + await InitializeAsync(); + + if (Options.InitializeClientOnDeploy) + { + await WaitForInitialStabilization(); + } + } + catch (TimeoutException te) + { + FlushLogToConsole(); + throw new TimeoutException("Timeout during test initialization", te); + } + catch (Exception ex) + { + await StopAllSilosAsync(); + + Exception baseExc = ex.GetBaseException(); + FlushLogToConsole(); + + if (baseExc is TimeoutException) + { + throw new TimeoutException("Timeout during test initialization", ex); + } + + // IMPORTANT: + // Do NOT re-throw the original exception here, also not as an internal exception inside AggregateException + // Due to the way MS tests works, if the original exception is an Orleans exception, + // it's assembly might not be loaded yet in this phase of the test. + // As a result, we will get "MSTest: Unit Test Adapter threw exception: Type is not resolved for member XXX" + // and will loose the original exception. This makes debugging tests super hard! + // The root cause has to do with us initializing our tests from Test constructor and not from TestInitialize method. + // More details: http://dobrzanski.net/2010/09/20/mstest-unit-test-adapter-threw-exception-type-is-not-resolved-for-member/ + //throw new Exception( + // string.Format("Exception during test initialization: {0}", + // LogFormatter.PrintException(baseExc))); + throw; + } + } + + private async Task WaitForInitialStabilization() + { + // Poll each silo to check that it knows the expected number of active silos. + // If any silo does not have the expected number of active silos in its cluster membership oracle, try again. + // If the cluster membership has not stabilized after a certain period of time, give up and continue anyway. + var totalWait = Stopwatch.StartNew(); + while (true) + { + var silos = Silos; + var expectedCount = silos.Count; + var remainingSilos = expectedCount; + + foreach (var silo in silos) + { + var hooks = InternalClient.GetTestHooks(silo); + var statuses = await hooks.GetApproximateSiloStatuses(); + var activeCount = statuses.Count(s => s.Value == SiloStatus.Active); + if (activeCount != expectedCount) break; + remainingSilos--; + } + + if (remainingSilos == 0) + { + totalWait.Stop(); + break; + } + + WriteLog($"{remainingSilos} silos do not have a consistent cluster view, waiting until stabilization."); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + if (totalWait.Elapsed < TimeSpan.FromSeconds(60)) + { + WriteLog($"Warning! {remainingSilos} silos do not have a consistent cluster view after {totalWait.ElapsedMilliseconds}ms, continuing without stabilization."); + break; + } + } + } + + /// + /// Get the list of current active silos. + /// + /// List of current silos. + public IEnumerable GetActiveSilos() + { + var additional = new List(); + lock (_silos) + { + additional.AddRange(_silos); + } + + WriteLog("GetActiveSilos: {0} Silos={1}", + additional.Count, Runtime.Utils.EnumerableToString(additional)); + + if (additional.Count > 0) + foreach (var s in additional) + if (s?.IsActive == true) + yield return s; + } + + /// + /// Find the silo handle for the specified silo address. + /// + /// Silo address to be found. + /// SiloHandle of the appropriate silo, or null if not found. + public InProcessSiloHandle GetSiloForAddress(SiloAddress siloAddress) + { + var activeSilos = GetActiveSilos().ToList(); + var ret = activeSilos.Find(s => s.SiloAddress.Equals(siloAddress)); + return ret; + } + + /// + /// Wait for the silo liveness sub-system to detect and act on any recent cluster membership changes. + /// + /// Whether recent membership changes we done by graceful Stop. + public async Task WaitForLivenessToStabilizeAsync(bool didKill = false) + { + var clusterMembershipOptions = Client.ServiceProvider.GetRequiredService>().Value; + TimeSpan stabilizationTime = GetLivenessStabilizationTime(clusterMembershipOptions, didKill); + WriteLog(Environment.NewLine + Environment.NewLine + "WaitForLivenessToStabilize is about to sleep for {0}", stabilizationTime); + await Task.Delay(stabilizationTime); + WriteLog("WaitForLivenessToStabilize is done sleeping"); + } + + /// + /// Get the timeout value to use to wait for the silo liveness sub-system to detect and act on any recent cluster membership changes. + /// + /// + public static TimeSpan GetLivenessStabilizationTime(ClusterMembershipOptions clusterMembershipOptions, bool didKill = false) + { + TimeSpan stabilizationTime = TimeSpan.Zero; + if (didKill) + { + // in case of hard kill (kill and not Stop), we should give silos time to detect failures first. + stabilizationTime = TestingUtils.Multiply(clusterMembershipOptions.ProbeTimeout, clusterMembershipOptions.NumMissedProbesLimit); + } + if (clusterMembershipOptions.UseLivenessGossip) + { + stabilizationTime += TimeSpan.FromSeconds(5); + } + else + { + stabilizationTime += TestingUtils.Multiply(clusterMembershipOptions.TableRefreshTimeout, 2); + } + return stabilizationTime; + } + + /// + /// Start an additional silo, so that it joins the existing cluster. + /// + /// SiloHandle for the newly started silo. + public InProcessSiloHandle StartAdditionalSilo(bool startAdditionalSiloOnNewPort = false) + { + return StartAdditionalSiloAsync(startAdditionalSiloOnNewPort).GetAwaiter().GetResult(); + } + + /// + /// Start an additional silo, so that it joins the existing cluster. + /// + /// SiloHandle for the newly started silo. + public async Task StartAdditionalSiloAsync(bool startAdditionalSiloOnNewPort = false) + { + return (await StartSilosAsync(1, startAdditionalSiloOnNewPort)).Single(); + } + + /// + /// Start a number of additional silo, so that they join the existing cluster. + /// + /// Number of silos to start. + /// + /// List of SiloHandles for the newly started silos. + public async Task> StartSilosAsync(int silosToStart, bool startAdditionalSiloOnNewPort = false) + { + var instances = new List(); + if (silosToStart > 0) + { + var siloStartTasks = Enumerable.Range(_startedInstances, silosToStart) + .Select(instanceNumber => Task.Run(() => StartSiloAsync((short)instanceNumber, Options, startSiloOnNewPort: startAdditionalSiloOnNewPort))).ToArray(); + + try + { + await Task.WhenAll(siloStartTasks); + } + catch (Exception) + { + lock (_silos) + { + _silos.AddRange(siloStartTasks.Where(t => t.Exception == null).Select(t => t.Result)); + } + + throw; + } + + instances.AddRange(siloStartTasks.Select(t => t.Result)); + lock (_silos) + { + _silos.AddRange(instances); + } + } + + return instances; + } + + /// + /// Stop all silos. + /// + public async Task StopSilosAsync() + { + foreach (var instance in _silos.ToList()) + { + await StopSiloAsync(instance); + } + } + + /// + /// Stop cluster client as an asynchronous operation. + /// + /// A representing the asynchronous operation. + public async Task StopClusterClientAsync() + { + var client = ClientHost; + try + { + if (client is not null) + { + await client.StopAsync().ConfigureAwait(false); + } + } + catch (Exception exc) + { + WriteLog("Exception stopping client: {0}", exc); + } + finally + { + await DisposeAsync(client).ConfigureAwait(false); + ClientHost = null; + } + } + + /// + /// Stop all current silos. + /// + public void StopAllSilos() + { + StopAllSilosAsync().GetAwaiter().GetResult(); + } + + /// + /// Stop all current silos. + /// + public async Task StopAllSilosAsync() + { + await StopClusterClientAsync(); + await StopSilosAsync(); + AppDomain.CurrentDomain.UnhandledException -= ReportUnobservedException; + } + + /// + /// Do a semi-graceful Stop of the specified silo. + /// + /// Silo to be stopped. + public async Task StopSiloAsync(InProcessSiloHandle instance) + { + if (instance != null) + { + await StopSiloAsync(instance, true); + lock (_silos) + { + _silos.Remove(instance); + } + } + } + + /// + /// Do an immediate Kill of the specified silo. + /// + /// Silo to be killed. + public async Task KillSiloAsync(InProcessSiloHandle instance) + { + if (instance != null) + { + // do NOT stop, just kill directly, to simulate crash. + await StopSiloAsync(instance, false); + lock (_silos) + { + _silos.Remove(instance); + } + } + } + + /// + /// Performs a hard kill on client. Client will not cleanup resources. + /// + public async Task KillClientAsync() + { + var client = ClientHost; + if (client != null) + { + var cancelled = new CancellationTokenSource(); + cancelled.Cancel(); + try + { + await client.StopAsync(cancelled.Token).ConfigureAwait(false); + } + finally + { + await DisposeAsync(client); + ClientHost = null; + } + } + } + + /// + /// Do a Stop or Kill of the specified silo, followed by a restart. + /// + /// Silo to be restarted. + public async Task RestartSiloAsync(InProcessSiloHandle instance) + { + if (instance != null) + { + var instanceNumber = instance.InstanceNumber; + await StopSiloAsync(instance); + var newInstance = await StartSiloAsync(instanceNumber, Options); + lock (_silos) + { + _silos.Add(newInstance); + } + + return newInstance; + } + + return null; + } + + /// + /// Restart a previously stopped. + /// + /// Silo to be restarted. + public async Task RestartStoppedSecondarySiloAsync(string siloName) + { + if (siloName == null) throw new ArgumentNullException(nameof(siloName)); + var siloHandle = Silos.Single(s => s.Name.Equals(siloName, StringComparison.Ordinal)); + var newInstance = await StartSiloAsync(Silos.IndexOf(siloHandle), Options); + lock (_silos) + { + _silos.Add(newInstance); + } + return newInstance; + } + + /// + /// Initialize the grain client. This should be already done by + /// + public async Task InitializeClientAsync() + { + WriteLog("Initializing Cluster Client"); + + if (ClientHost is not null) + { + await StopClusterClientAsync(); + } + + var hostBuilder = Host.CreateApplicationBuilder(new HostApplicationBuilderSettings + { + EnvironmentName = Environments.Development, + ApplicationName = "TestClusterClient", + DisableDefaults = true, + }); + + hostBuilder.UseOrleansClient(clientBuilder => + { + if (Options.UseTestClusterMembership) + { + clientBuilder.Services.AddSingleton(_membershipTable); + } + + clientBuilder.UseInMemoryConnectionTransport(_transportHub); + }); + + TryConfigureFileLogging(Options, hostBuilder.Services, "TestClusterClient"); + + foreach (var hostDelegate in Options.ClientHostConfigurationDelegates) + { + hostDelegate(hostBuilder); + } + + ClientHost = hostBuilder.Build(); + await ClientHost.StartAsync(); + } + + private async Task InitializeAsync() + { + var silosToStart = Options.InitialSilosCount; + + if (silosToStart > 0) + { + await StartSilosAsync(silosToStart); + } + + WriteLog("Done initializing cluster"); + + if (Options.InitializeClientOnDeploy) + { + await InitializeClientAsync(); + } + } + + public async Task CreateSiloAsync(InProcessTestSiloSpecificOptions siloOptions) + { + var host = await Task.Run(async () => + { + var siloName = siloOptions.SiloName; + + var appBuilder = Host.CreateApplicationBuilder(new HostApplicationBuilderSettings + { + ApplicationName = siloName, + EnvironmentName = Environments.Development, + DisableDefaults = true + }); + + var services = appBuilder.Services; + TryConfigureFileLogging(Options, services, siloName); + + if (Debugger.IsAttached) + { + // Test is running inside debugger - Make timeout ~= infinite + services.Configure(op => op.ResponseTimeout = TimeSpan.FromMilliseconds(1000000)); + } + + appBuilder.UseOrleans(siloBuilder => + { + siloBuilder.Configure(o => + { + o.SiloName = siloOptions.SiloName; + }); + + siloBuilder.Configure(o => + { + o.AdvertisedIPAddress = IPAddress.Loopback; + o.SiloPort = siloOptions.SiloPort; + o.GatewayPort = siloOptions.GatewayPort; + }); + + siloBuilder.Services + .Configure(options => options.ShutdownTimeout = TimeSpan.FromSeconds(30)); + + if (Options.UseTestClusterMembership) + { + services.AddSingleton(_membershipTable); + siloBuilder.AddGrainDirectory(GrainDirectoryAttribute.DEFAULT_GRAIN_DIRECTORY, (_, _) => _grainDirectory); + } + + siloBuilder.UseInMemoryConnectionTransport(_transportHub); + + services.AddSingleton(); + services.AddSingleton(); + if (!Options.UseRealEnvironmentStatistics) + { + services.AddFromExisting(); + } + }); + + foreach (var hostDelegate in Options.SiloHostConfigurationDelegates) + { + hostDelegate(siloOptions, appBuilder); + } + + var host = appBuilder.Build(); + InitializeTestHooksSystemTarget(host); + await host.StartAsync(); + return host; + }); + + return new InProcessSiloHandle + { + Name = siloOptions.SiloName, + SiloHost = host, + SiloAddress = host.Services.GetRequiredService().SiloAddress, + GatewayAddress = host.Services.GetRequiredService().GatewayAddress, + }; + } + + /// + /// Start a new silo in the target cluster + /// + /// The InProcessTestCluster in which the silo should be deployed + /// The instance number to deploy + /// The options to use. + /// Configuration overrides. + /// Whether we start this silo on a new port, instead of the default one + /// A handle to the silo deployed + public static async Task StartSiloAsync(InProcessTestCluster cluster, int instanceNumber, InProcessTestClusterOptions clusterOptions, IReadOnlyList configurationOverrides = null, bool startSiloOnNewPort = false) + { + if (cluster == null) throw new ArgumentNullException(nameof(cluster)); + return await cluster.StartSiloAsync(instanceNumber, clusterOptions, configurationOverrides, startSiloOnNewPort); + } + + /// + /// Starts a new silo. + /// + /// The instance number to deploy + /// The options to use. + /// Configuration overrides. + /// Whether we start this silo on a new port, instead of the default one + /// A handle to the deployed silo. + public async Task StartSiloAsync(int instanceNumber, InProcessTestClusterOptions clusterOptions, IReadOnlyList configurationOverrides = null, bool startSiloOnNewPort = false) + { + var siloOptions = InProcessTestSiloSpecificOptions.Create(this, clusterOptions, instanceNumber, startSiloOnNewPort); + var handle = await CreateSiloAsync(siloOptions); + handle.InstanceNumber = (short)instanceNumber; + Interlocked.Increment(ref _startedInstances); + return handle; + } + + private async Task StopSiloAsync(InProcessSiloHandle instance, bool stopGracefully) + { + try + { + await instance.StopSiloAsync(stopGracefully).ConfigureAwait(false); + } + finally + { + await DisposeAsync(instance).ConfigureAwait(false); + + Interlocked.Decrement(ref _startedInstances); + } + } + + /// + /// Gets the log. + /// + /// The log contents. + public string GetLog() + { + return _log.ToString(); + } + + private void ReportUnobservedException(object sender, UnhandledExceptionEventArgs eventArgs) + { + Exception exception = (Exception)eventArgs.ExceptionObject; + WriteLog("Unobserved exception: {0}", exception); + } + + private void WriteLog(string format, params object[] args) + { + _log.AppendFormat(format + Environment.NewLine, args); + } + + private void FlushLogToConsole() + { + Console.WriteLine(GetLog()); + } + + /// + public async ValueTask DisposeAsync() + { + if (_disposed) + { + return; + } + + await Task.Run(async () => + { + foreach (var handle in Silos) + { + await DisposeAsync(handle).ConfigureAwait(false); + } + + await DisposeAsync(ClientHost).ConfigureAwait(false); + ClientHost = null; + + PortAllocator?.Dispose(); + }); + + _disposed = true; + } + + /// + public void Dispose() + { + if (_disposed) + { + return; + } + + foreach (var handle in Silos) + { + handle.Dispose(); + } + + ClientHost?.Dispose(); + PortAllocator?.Dispose(); + + _disposed = true; + } + + private static async Task DisposeAsync(IDisposable value) + { + if (value is IAsyncDisposable asyncDisposable) + { + await asyncDisposable.DisposeAsync().ConfigureAwait(false); + } + else if (value is IDisposable disposable) + { + disposable.Dispose(); + } + + } + private static void TryConfigureFileLogging(InProcessTestClusterOptions options, IServiceCollection services, string name) + { + if (options.ConfigureFileLogging) + { + var fileName = TestingUtils.CreateTraceFileName(name, options.ClusterId); + services.AddLogging(loggingBuilder => loggingBuilder.AddFile(fileName)); + } + } + + private static void InitializeTestHooksSystemTarget(IHost host) + { + var testHook = host.Services.GetRequiredService(); + var catalog = host.Services.GetRequiredService(); + catalog.RegisterSystemTarget(testHook); + } +} diff --git a/src/Orleans.TestingHost/InProcTestClusterBuilder.cs b/src/Orleans.TestingHost/InProcTestClusterBuilder.cs new file mode 100644 index 0000000000..7156fd2d87 --- /dev/null +++ b/src/Orleans.TestingHost/InProcTestClusterBuilder.cs @@ -0,0 +1,153 @@ +using System; +using System.Globalization; +using System.Linq; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Orleans.Hosting; +using Orleans.Runtime; + +namespace Orleans.TestingHost; + +/// Configuration builder for starting a . +public sealed class InProcessTestClusterBuilder +{ + /// + /// Initializes a new instance of using the default options. + /// + public InProcessTestClusterBuilder() + : this(2) + { + } + + /// + /// Initializes a new instance of overriding the initial silos count. + /// + /// The number of initial silos to deploy. + public InProcessTestClusterBuilder(short initialSilosCount) + { + Options = new InProcessTestClusterOptions + { + InitialSilosCount = initialSilosCount, + ClusterId = CreateClusterId(), + ServiceId = Guid.NewGuid().ToString("N"), + UseTestClusterMembership = true, + InitializeClientOnDeploy = true, + ConfigureFileLogging = true, + AssumeHomogenousSilosForTesting = true + }; + } + + /// + /// Gets the options. + /// + /// The options. + public InProcessTestClusterOptions Options { get; } + + /// + /// The port allocator. + /// + public ITestClusterPortAllocator PortAllocator { get; } = new TestClusterPortAllocator(); + + /// + /// Adds a delegate for configuring silo and client hosts. + /// + public InProcessTestClusterBuilder ConfigureHost(Action configureDelegate) + { + Options.SiloHostConfigurationDelegates.Add((_, hostBuilder) => configureDelegate(hostBuilder)); + Options.ClientHostConfigurationDelegates.Add(configureDelegate); + return this; + } + + /// + /// Adds a delegate to configure silos. + /// + /// The builder. + public InProcessTestClusterBuilder ConfigureSilo(Action configureSiloDelegate) + { + Options.SiloHostConfigurationDelegates.Add((options, hostBuilder) => hostBuilder.UseOrleans(siloBuilder => configureSiloDelegate(options, siloBuilder))); + return this; + } + + /// + /// Adds a delegate to configure silo hosts. + /// + /// The builder. + public InProcessTestClusterBuilder ConfigureSiloHost(Action configureSiloHostDelegate) + { + Options.SiloHostConfigurationDelegates.Add(configureSiloHostDelegate); + return this; + } + + /// + /// Adds a delegate to configure clients. + /// + /// The builder. + public InProcessTestClusterBuilder ConfigureClient(Action configureClientDelegate) + { + Options.ClientHostConfigurationDelegates.Add(hostBuilder => hostBuilder.UseOrleansClient(clientBuilder => configureClientDelegate(clientBuilder))); + return this; + } + + /// + /// Adds a delegate to configure clients hosts. + /// + /// The builder. + public InProcessTestClusterBuilder ConfigureClientHost(Action configureHostDelegate) + { + Options.ClientHostConfigurationDelegates.Add(hostBuilder => configureHostDelegate(hostBuilder)); + return this; + } + + /// + /// Builds this instance. + /// + /// InProcessTestCluster. + public InProcessTestCluster Build() + { + var portAllocator = PortAllocator; + + ConfigureDefaultPorts(); + + var testCluster = new InProcessTestCluster(Options, portAllocator); + return testCluster; + } + + /// + /// Creates a cluster identifier. + /// + /// A new cluster identifier. + public static string CreateClusterId() + { + string prefix = "testcluster-"; + int randomSuffix = Random.Shared.Next(1000); + DateTime now = DateTime.UtcNow; + string DateTimeFormat = @"yyyy-MM-dd\tHH-mm-ss"; + return $"{prefix}{now.ToString(DateTimeFormat, CultureInfo.InvariantCulture)}-{randomSuffix}"; + } + + private void ConfigureDefaultPorts() + { + // Set base ports if none are currently set. + (int baseSiloPort, int baseGatewayPort) = PortAllocator.AllocateConsecutivePortPairs(Options.InitialSilosCount + 3); + if (Options.BaseSiloPort == 0) Options.BaseSiloPort = baseSiloPort; + if (Options.BaseGatewayPort == 0) Options.BaseGatewayPort = baseGatewayPort; + } + + internal class ConfigureStaticClusterDeploymentOptions : IHostConfigurator + { + public void Configure(IHostBuilder hostBuilder) + { + hostBuilder.ConfigureServices((context, services) => + { + var initialSilos = int.Parse(context.Configuration[nameof(InProcessTestClusterOptions.InitialSilosCount)]); + var siloNames = Enumerable.Range(0, initialSilos).Select(GetSiloName).ToList(); + services.Configure(options => options.SiloNames = siloNames); + }); + } + + private static string GetSiloName(int instanceNumber) + { + return instanceNumber == 0 ? Silo.PrimarySiloName : $"Secondary_{instanceNumber}"; + } + } +} \ No newline at end of file diff --git a/src/Orleans.TestingHost/InProcTestClusterOptions.cs b/src/Orleans.TestingHost/InProcTestClusterOptions.cs new file mode 100644 index 0000000000..5f0ae5e509 --- /dev/null +++ b/src/Orleans.TestingHost/InProcTestClusterOptions.cs @@ -0,0 +1,92 @@ +using System; +using System.Collections.Generic; +using Microsoft.Extensions.Hosting; +using Orleans.Configuration; +using Orleans.Hosting; + +namespace Orleans.TestingHost; + +/// +/// Configuration options for test clusters. +/// +public sealed class InProcessTestClusterOptions +{ + /// + /// Gets or sets the cluster identifier. + /// + /// + /// The cluster identifier. + public string ClusterId { get; set; } + + /// + /// Gets or sets the service identifier. + /// + /// + /// The service identifier. + public string ServiceId { get; set; } + + /// + /// Gets or sets the base silo port, which is the port for the first silo. Other silos will use subsequent ports. + /// + /// The base silo port. + internal int BaseSiloPort { get; set; } + + /// + /// Gets or sets the base gateway port, which is the gateway port for the first silo. Other silos will use subsequent ports. + /// + /// The base gateway port. + internal int BaseGatewayPort { get; set; } + + /// + /// Gets or sets a value indicating whether to use test cluster membership. + /// + /// if test cluster membership should be used; otherwise, . + internal bool UseTestClusterMembership { get; set; } + + /// + /// Gets or sets a value indicating whether to use the real environment statistics. + /// + public bool UseRealEnvironmentStatistics { get; set; } + + /// + /// Gets or sets a value indicating whether to initialize the client immediately on deployment. + /// + /// if the client should be initialized immediately on deployment; otherwise, . + public bool InitializeClientOnDeploy { get; set; } + + /// + /// Gets or sets the initial silos count. + /// + /// The initial silos count. + public short InitialSilosCount { get; set; } + + /// + /// Gets or sets a value indicating whether to configure file logging. + /// + /// if file logging should be configured; otherwise, . + public bool ConfigureFileLogging { get; set; } = true; + + /// + /// Gets or sets a value indicating whether to assume homogeneous silos for testing purposes. + /// + /// if the cluster should assume homogeneous silos; otherwise, . + public bool AssumeHomogenousSilosForTesting { get; set; } + + /// + /// Gets or sets a value indicating whether each silo should host a gateway. + /// + /// if each silo should host a gateway; otherwise, . + public bool GatewayPerSilo { get; set; } = true; + + /// + /// Gets the silo host configuration delegates. + /// + /// The silo host configuration delegates. + public List> SiloHostConfigurationDelegates { get; } = []; + + /// + /// Gets the client host configuration delegates. + /// + /// The client host configuration delegates. + public List> ClientHostConfigurationDelegates { get; } = []; +} diff --git a/src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs b/src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs new file mode 100644 index 0000000000..c7904fc0bb --- /dev/null +++ b/src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs @@ -0,0 +1,55 @@ +namespace Orleans.TestingHost; + +/// +/// Configuration overrides for individual silos. +/// +public sealed class InProcessTestSiloSpecificOptions +{ + /// + /// Gets or sets the silo port. + /// + /// The silo port. + public int SiloPort { get; set; } + + /// + /// Gets or sets the gateway port. + /// + /// The gateway port. + public int GatewayPort { get; set; } + + /// + /// Gets or sets the name of the silo. + /// + /// The name of the silo. + public string SiloName { get; set; } + + /// + /// Creates an instance of the class. + /// + /// The test cluster. + /// The test cluster options. + /// The instance number. + /// if set to , assign a new port for the silo. + /// The options. + public static InProcessTestSiloSpecificOptions Create(InProcessTestCluster testCluster, InProcessTestClusterOptions testClusterOptions, int instanceNumber, bool assignNewPort = false) + { + var result = new InProcessTestSiloSpecificOptions + { + SiloName = $"Silo_{instanceNumber}", + }; + + if (assignNewPort) + { + var (siloPort, gatewayPort) = testCluster.PortAllocator.AllocateConsecutivePortPairs(1); + result.SiloPort = siloPort; + result.GatewayPort = (instanceNumber == 0 || testClusterOptions.GatewayPerSilo) ? gatewayPort : 0; + } + else + { + result.SiloPort = testClusterOptions.BaseSiloPort + instanceNumber; + result.GatewayPort = (instanceNumber == 0 || testClusterOptions.GatewayPerSilo) ? testClusterOptions.BaseGatewayPort + instanceNumber : 0; + } + + return result; + } +} diff --git a/src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs b/src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs new file mode 100644 index 0000000000..26b7d48797 --- /dev/null +++ b/src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs @@ -0,0 +1,82 @@ +#nullable enable +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans.GrainDirectory; +using Orleans.Runtime; + +namespace Orleans.TestingHost.InProcess; +internal sealed class InProcessGrainDirectory(Func getSiloStatus) : IGrainDirectory +{ + private readonly ConcurrentDictionary _entries = []; + + public Task Lookup(GrainId grainId) + { + if (_entries.TryGetValue(grainId, out var result) && !IsSiloDead(result)) + { + return Task.FromResult(result); + } + + return Task.FromResult(null); + } + + public Task Register(GrainAddress address, GrainAddress? previousAddress) + { + ArgumentNullException.ThrowIfNull(address); + + var result = _entries.AddOrUpdate( + address.GrainId, + static (grainId, state) => state.Address, + static (grainId, existing, state) => + { + if (existing is null || state.PreviousAddress is { } prev && existing.Matches(prev) || state.Self.IsSiloDead(existing)) + { + return state.Address; + } + + return existing; + }, + (Self: this, Address: address, PreviousAddress: previousAddress)); + + if (result is null || IsSiloDead(result)) + { + return Task.FromResult(null); + } + + return Task.FromResult(result); + } + + public Task Register(GrainAddress address) => Register(address, null); + + public Task Unregister(GrainAddress address) + { + if (!((IDictionary)_entries).Remove(KeyValuePair.Create(address.GrainId, address))) + { + if (_entries.TryGetValue(address.GrainId, out var existing) && (existing.Matches(address) || IsSiloDead(existing))) + { + ((IDictionary)_entries).Remove(KeyValuePair.Create(existing.GrainId, existing)); + } + } + + return Task.CompletedTask; + } + + public Task UnregisterSilos(List siloAddresses) + { + foreach (var entry in _entries) + { + foreach (var silo in siloAddresses) + { + if (silo.Equals(entry.Value.SiloAddress)) + { + ((IDictionary)_entries).Remove(entry); + } + } + } + + return Task.CompletedTask; + } + + private bool IsSiloDead(GrainAddress existing) => existing.SiloAddress is not { } address || getSiloStatus(address) is SiloStatus.Dead or SiloStatus.None; +} diff --git a/src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs b/src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs new file mode 100644 index 0000000000..bfefa09531 --- /dev/null +++ b/src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs @@ -0,0 +1,193 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Orleans.Runtime; +using System.Globalization; +using System.Threading.Tasks; +using Orleans.Messaging; + +namespace Orleans.TestingHost.InProcess; + +/// +/// An in-memory implementation of for testing purposes. +/// +internal sealed class InProcessMembershipTable(string clusterId) : IMembershipTableSystemTarget, IGatewayListProvider +{ + private readonly Table _table = new(); + private readonly string _clusterId = clusterId; + + public TimeSpan MaxStaleness => TimeSpan.Zero; + public bool IsUpdatable => true; + + public Task InitializeMembershipTable(bool tryInitTableVersion) => Task.CompletedTask; + + public Task DeleteMembershipTableEntries(string clusterId) + { + if (string.Equals(_clusterId, clusterId, StringComparison.Ordinal)) + { + _table.Clear(); + } + + return Task.CompletedTask; + } + + public Task ReadRow(SiloAddress key) => Task.FromResult(_table.Read(key)); + + public Task ReadAll() => Task.FromResult(_table.ReadAll()); + + public Task InsertRow(MembershipEntry entry, TableVersion tableVersion) => Task.FromResult(_table.Insert(entry, tableVersion)); + + public Task UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion) => Task.FromResult(_table.Update(entry, etag, tableVersion)); + + public Task UpdateIAmAlive(MembershipEntry entry) + { + _table.UpdateIAmAlive(entry); + return Task.CompletedTask; + } + + public Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate) + { + _table.CleanupDefunctSiloEntries(beforeDate); + return Task.CompletedTask; + } + + public Task InitializeGatewayListProvider() => Task.CompletedTask; + + public Task> GetGateways() + { + var table = _table.ReadAll(); + var result = table.Members + .Where(x => x.Item1.Status == SiloStatus.Active && x.Item1.ProxyPort != 0) + .Select(x => + { + var entry = x.Item1; + return SiloAddress.New(entry.SiloAddress.Endpoint.Address, entry.ProxyPort, entry.SiloAddress.Generation).ToGatewayUri(); + }).ToList(); + return Task.FromResult>(result); + } + + public SiloStatus GetSiloStatus(SiloAddress address) => _table.GetSiloStatus(address); + + private sealed class Table + { + private readonly object _lock = new(); + private readonly Dictionary _table = []; + private TableVersion _tableVersion; + private long _lastETagCounter; + + public Table() + { + _tableVersion = new TableVersion(0, NewETag()); + } + public SiloStatus GetSiloStatus(SiloAddress key) + { + lock (_lock) + { + return _table.TryGetValue(key, out var data) ? data.Entry.Status : SiloStatus.None; + } + } + + public MembershipTableData Read(SiloAddress key) + { + lock (_lock) + { + return _table.TryGetValue(key, out var data) ? + new MembershipTableData(Tuple.Create(data.Entry.Copy(), data.ETag), _tableVersion) + : new MembershipTableData(_tableVersion); + } + } + + public MembershipTableData ReadAll() + { + lock (_lock) + { + return new MembershipTableData(_table.Values.Select(data => Tuple.Create(data.Entry.Copy(), data.ETag)).ToList(), _tableVersion); + } + } + + public TableVersion ReadTableVersion() => _tableVersion; + + public bool Insert(MembershipEntry entry, TableVersion version) + { + lock (_lock) + { + if (_table.TryGetValue(entry.SiloAddress, out var data)) + { + return false; + } + + if (!_tableVersion.VersionEtag.Equals(version.VersionEtag)) + { + return false; + } + + _table[entry.SiloAddress] = (entry.Copy(), _lastETagCounter++.ToString(CultureInfo.InvariantCulture)); + _tableVersion = new TableVersion(version.Version, NewETag()); + return true; + } + } + + public bool Update(MembershipEntry entry, string etag, TableVersion version) + { + lock (_lock) + { + if (!_table.TryGetValue(entry.SiloAddress, out var data)) + { + return false; + } + + if (!data.ETag.Equals(etag) || !_tableVersion.VersionEtag.Equals(version.VersionEtag)) + { + return false; + } + + _table[entry.SiloAddress] = (entry.Copy(), _lastETagCounter++.ToString(CultureInfo.InvariantCulture)); + _tableVersion = new TableVersion(version.Version, NewETag()); + return true; + } + } + + public void UpdateIAmAlive(MembershipEntry entry) + { + lock (_lock) + { + if (!_table.TryGetValue(entry.SiloAddress, out var data)) + { + return; + } + + data.Entry.IAmAliveTime = entry.IAmAliveTime; + _table[entry.SiloAddress] = (data.Entry, NewETag()); + } + } + + public void CleanupDefunctSiloEntries(DateTimeOffset beforeDate) + { + lock (_lock) + { + var entries = _table.Values.ToList(); + foreach (var (entry, _) in entries) + { + if (entry.Status == SiloStatus.Dead + && new DateTime(Math.Max(entry.IAmAliveTime.Ticks, entry.StartTime.Ticks), DateTimeKind.Utc) < beforeDate) + { + _table.Remove(entry.SiloAddress, out _); + continue; + } + } + } + } + + internal void Clear() + { + lock (_lock) + { + _table.Clear(); + } + } + + public override string ToString() => $"Table = {ReadAll()}, ETagCounter={_lastETagCounter}"; + + private string NewETag() => _lastETagCounter++.ToString(CultureInfo.InvariantCulture); + } +} diff --git a/src/Orleans.TestingHost/InProcessSiloHandle.cs b/src/Orleans.TestingHost/InProcessSiloHandle.cs index 7712db2cbd..0191f24034 100644 --- a/src/Orleans.TestingHost/InProcessSiloHandle.cs +++ b/src/Orleans.TestingHost/InProcessSiloHandle.cs @@ -16,7 +16,12 @@ public class InProcessSiloHandle : SiloHandle private bool isActive = true; /// Gets a reference to the silo host. - public IHost SiloHost { get; private set; } + public IHost SiloHost { get; init; } + + /// + /// Gets the silo's service provider. + /// + public IServiceProvider ServiceProvider => SiloHost.Services; /// public override bool IsActive => isActive; @@ -28,7 +33,7 @@ public class InProcessSiloHandle : SiloHandle /// The configuration. /// An optional delegate which is invoked just prior to building the host builder. /// The silo handle. - public static async Task CreateAsync( + public static async Task CreateAsync( string siloName, IConfiguration configuration, Action postConfigureHostBuilder = null) diff --git a/test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs b/test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs new file mode 100644 index 0000000000..78c9b623b4 --- /dev/null +++ b/test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs @@ -0,0 +1,81 @@ +using System.Runtime.ExceptionServices; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.TestingHost; + +namespace TestExtensions; + +public abstract class BaseInProcessTestClusterFixture : Xunit.IAsyncLifetime +{ + private readonly ExceptionDispatchInfo preconditionsException; + + static BaseInProcessTestClusterFixture() + { + TestDefaultConfiguration.InitializeDefaults(); + } + + protected BaseInProcessTestClusterFixture() + { + try + { + CheckPreconditionsOrThrow(); + } + catch (Exception ex) + { + preconditionsException = ExceptionDispatchInfo.Capture(ex); + return; + } + } + + public void EnsurePreconditionsMet() + { + preconditionsException?.Throw(); + } + + protected virtual void CheckPreconditionsOrThrow() { } + + protected virtual void ConfigureTestCluster(InProcessTestClusterBuilder builder) + { + } + + public InProcessTestCluster HostedCluster { get; private set; } + + public IGrainFactory GrainFactory => Client; + + public IClusterClient Client => HostedCluster?.Client; + + public ILogger Logger { get; private set; } + + public string GetClientServiceId() => Client.ServiceProvider.GetRequiredService>().Value.ServiceId; + + public virtual async Task InitializeAsync() + { + EnsurePreconditionsMet(); + var builder = new InProcessTestClusterBuilder(); + builder.ConfigureHost(hostBuilder => TestDefaultConfiguration.ConfigureHostConfiguration(hostBuilder.Configuration)); + ConfigureTestCluster(builder); + + var testCluster = builder.Build(); + await testCluster.DeployAsync().ConfigureAwait(false); + + HostedCluster = testCluster; + Logger = Client.ServiceProvider.GetRequiredService().CreateLogger("Application"); + } + + public virtual async Task DisposeAsync() + { + var cluster = HostedCluster; + if (cluster is null) return; + + try + { + await cluster.StopAllSilosAsync().ConfigureAwait(false); + } + finally + { + await cluster.DisposeAsync().ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/test/TesterInternal/ActivationRebalancingTests/RebalancerFixture.cs b/test/TesterInternal/ActivationRebalancingTests/RebalancerFixture.cs new file mode 100644 index 0000000000..94e70861d4 --- /dev/null +++ b/test/TesterInternal/ActivationRebalancingTests/RebalancerFixture.cs @@ -0,0 +1,33 @@ +using TestExtensions; +using Orleans.Configuration; +using Orleans.TestingHost; +using Microsoft.Extensions.DependencyInjection; + +namespace UnitTests.ActivationRebalancingTests; + +public class RebalancerFixture : BaseInProcessTestClusterFixture +{ + public static readonly TimeSpan RebalancerDueTime = TimeSpan.FromSeconds(5); + public static readonly TimeSpan SessionCyclePeriod = TimeSpan.FromSeconds(3); + + protected override void ConfigureTestCluster(InProcessTestClusterBuilder builder) + { + builder.Options.InitialSilosCount = 4; + builder.Options.UseRealEnvironmentStatistics = true; + builder.ConfigureSilo((options, siloBuilder) +#pragma warning disable ORLEANSEXP002 + => siloBuilder + .Configure(o => + { + o.AssumeHomogenousSilosForTesting = true; + o.ClientGatewayShutdownNotificationTimeout = default; + }) + .Configure(o => + { + o.RebalancerDueTime = RebalancerDueTime; + o.SessionCyclePeriod = SessionCyclePeriod; + }) + .AddActivationRebalancer()); +#pragma warning restore ORLEANSEXP002 + } +} diff --git a/test/TesterInternal/ActivationRebalancingTests/RebalancingTestBase.cs b/test/TesterInternal/ActivationRebalancingTests/RebalancingTestBase.cs index 14ec9a6438..71513e954e 100644 --- a/test/TesterInternal/ActivationRebalancingTests/RebalancingTestBase.cs +++ b/test/TesterInternal/ActivationRebalancingTests/RebalancingTestBase.cs @@ -1,16 +1,14 @@ using TestExtensions; using Xunit.Abstractions; -using Orleans.Configuration; using Orleans.TestingHost; using Orleans.Runtime.Placement; -using Microsoft.Extensions.DependencyInjection; namespace UnitTests.ActivationRebalancingTests; -public abstract class RebalancingTestBase : BaseTestClusterFixture - where TFixture : BaseTestClusterFixture +public abstract class RebalancingTestBase + where TFixture : BaseInProcessTestClusterFixture { - protected TestCluster Cluster { get; } + protected InProcessTestCluster Cluster { get; } protected SiloAddress Silo1 { get; } protected SiloAddress Silo2 { get; } @@ -18,7 +16,7 @@ public abstract class RebalancingTestBase : BaseTestClusterFixture protected SiloAddress Silo4 { get; } internal ITestOutputHelper OutputHelper { get; } - internal new IInternalGrainFactory GrainFactory { get; } + internal IInternalGrainFactory GrainFactory { get; } internal IManagementGrain MgmtGrain { get; } protected RebalancingTestBase(TFixture fixture, ITestOutputHelper output) @@ -32,7 +30,7 @@ protected RebalancingTestBase(TFixture fixture, ITestOutputHelper output) Cluster = fixture.HostedCluster; OutputHelper = output; - GrainFactory = fixture.HostedCluster.InternalGrainFactory; + GrainFactory = (IInternalGrainFactory)fixture.HostedCluster.Client; MgmtGrain = GrainFactory.GetGrain(0); } @@ -57,45 +55,11 @@ protected static int CalculateVariance(int[] values) return (int)variance; } - public override async Task InitializeAsync() + public async Task InitializeAsync() { await GrainFactory .GetGrain(0) .ForceActivationCollection(TimeSpan.Zero); - - await base.InitializeAsync(); - } -} - -public class RebalancerFixture : BaseTestClusterFixture -{ - public static readonly TimeSpan RebalancerDueTime = TimeSpan.FromSeconds(5); - public static readonly TimeSpan SessionCyclePeriod = TimeSpan.FromSeconds(3); - - protected override void ConfigureTestCluster(TestClusterBuilder builder) - { - builder.Options.InitialSilosCount = 4; - builder.Options.UseRealEnvironmentStatistics = true; - builder.AddSiloBuilderConfigurator(); - } - - private class SiloConfigurator : ISiloConfigurator - { - public void Configure(ISiloBuilder siloBuilder) -#pragma warning disable ORLEANSEXP002 - => siloBuilder - .Configure(o => - { - o.AssumeHomogenousSilosForTesting = true; - o.ClientGatewayShutdownNotificationTimeout = default; - }) - .Configure(o => - { - o.RebalancerDueTime = RebalancerDueTime; - o.SessionCyclePeriod = SessionCyclePeriod; - }) - .AddActivationRebalancer(); -#pragma warning restore ORLEANSEXP002 } } diff --git a/test/TesterInternal/ActivationRebalancingTests/StatePreservationRebalancingTests.cs b/test/TesterInternal/ActivationRebalancingTests/StatePreservationRebalancingTests.cs index 0dc13831f0..3ec178a79e 100644 --- a/test/TesterInternal/ActivationRebalancingTests/StatePreservationRebalancingTests.cs +++ b/test/TesterInternal/ActivationRebalancingTests/StatePreservationRebalancingTests.cs @@ -29,7 +29,7 @@ public async Task Should_Migrate_And_Preserve_State_When_Hosting_Silo_Dies() // Move the rebalancer to the first secondary silo, since we will stop it later and we cannot stop // the primary in this test setup. - RequestContext.Set(IPlacementDirector.PlacementHintKey, Cluster.SecondarySilos[0].SiloAddress); + RequestContext.Set(IPlacementDirector.PlacementHintKey, Cluster.Silos[1].SiloAddress); await Cluster.Client.GetGrain(0).Cast().MigrateOnIdle(); RequestContext.Set(IPlacementDirector.PlacementHintKey, null); @@ -70,7 +70,7 @@ public async Task Should_Migrate_And_Preserve_State_When_Hosting_Silo_Dies() OutputHelper.WriteLine($"Cycle {index}: Now stopping Silo{rebalancerHostNum}, which is the host of the rebalancer\n"); - Assert.NotEqual(rebalancerHost, Cluster.Primary.SiloAddress); + Assert.NotEqual(rebalancerHost, Cluster.Silos[0].SiloAddress); await Cluster.StopSiloAsync(Cluster.Silos.First(x => x.SiloAddress.Equals(rebalancerHost))); } @@ -177,21 +177,16 @@ public async Task Should_Migrate_And_Preserve_State_When_Hosting_Silo_Dies() return new(SiloAddress.Zero, 0); } - public class StatePreservationFixture : BaseTestClusterFixture + public class StatePreservationFixture : BaseInProcessTestClusterFixture { public static readonly TimeSpan RebalancerDueTime = TimeSpan.FromSeconds(5); public static readonly TimeSpan SessionCyclePeriod = TimeSpan.FromSeconds(3); - protected override void ConfigureTestCluster(TestClusterBuilder builder) + protected override void ConfigureTestCluster(InProcessTestClusterBuilder builder) { builder.Options.InitialSilosCount = 4; builder.Options.UseRealEnvironmentStatistics = true; - builder.AddSiloBuilderConfigurator(); - } - - private class Configurator : ISiloConfigurator - { - public void Configure(ISiloBuilder siloBuilder) + builder.ConfigureSilo((options, siloBuilder) #pragma warning disable ORLEANSEXP002 => siloBuilder .Configure(o => @@ -205,7 +200,7 @@ public void Configure(ISiloBuilder siloBuilder) o.RebalancerDueTime = RebalancerDueTime; o.SessionCyclePeriod = SessionCyclePeriod; }) - .AddActivationRebalancer(); + .AddActivationRebalancer()); #pragma warning restore ORLEANSEXP002 } }