Skip to content

Commit

Permalink
Wait for migrating activations to become active or terminated (#8527)
Browse files Browse the repository at this point in the history
* Wait for migrating activations to become active or terminated

* Fix migration test code
  • Loading branch information
ReubenBond authored Jul 6, 2023
1 parent a9c8310 commit b6525b5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 6 deletions.
33 changes: 30 additions & 3 deletions src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,43 @@ public ActivationMigrationManager(
}
}

public ValueTask AcceptMigratingGrains(List<GrainMigrationPackage> migratingGrains)
public async ValueTask AcceptMigratingGrains(List<GrainMigrationPackage> migratingGrains)
{
var activations = new List<ActivationData>();
foreach (var package in migratingGrains)
{
// If the activation does not exist, create it and provide it with the migration context while doing so.
// If the activation already exists or cannot be created, it is too late to perform migration, so ignore the request.
_catalog.GetOrCreateActivation(package.GrainId, requestContextData: null, package.MigrationContext);
var context = _catalog.GetOrCreateActivation(package.GrainId, requestContextData: null, package.MigrationContext);
if (context is ActivationData activation)
{
activations.Add(activation);
}
}

return default;
while (true)
{
var allActiveOrTerminal = true;
foreach (var activation in activations)
{
lock (activation)
{
if (activation.State is not ActivationState.Valid or ActivationState.Invalid or ActivationState.FailedToActivate)
{
allActiveOrTerminal = false;
break;
}
}
}

if (allActiveOrTerminal)
{
break;
}

// Wait a short amount of time and poll the activations again.
await Task.Delay(5);
}
}

public ValueTask MigrateAsync(SiloAddress targetSilo, GrainId grainId, MigrationContext migrationContext)
Expand Down
21 changes: 18 additions & 3 deletions test/DefaultCluster.Tests/Migration/MigrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ public async Task DirectedGrainMigrationTest()
RequestContext.Set(IPlacementDirector.PlacementHintKey, targetHost);
await grain.Cast<IGrainManagementExtension>().MigrateOnIdle();

var newAddress = await grain.GetGrainAddress();
GrainAddress newAddress;
do
{
newAddress = await grain.GetGrainAddress();
} while (newAddress.ActivationId == originalAddress.ActivationId);

var newHost = newAddress.SiloAddress;
Assert.Equal(targetHost, newHost);

Expand Down Expand Up @@ -91,7 +96,12 @@ public async Task DirectedGrainMigrationTest_GrainOfT()
RequestContext.Set(IPlacementDirector.PlacementHintKey, targetHost);
await grain.Cast<IGrainManagementExtension>().MigrateOnIdle();

var newAddress = await grain.GetGrainAddress();
GrainAddress newAddress;
do
{
newAddress = await grain.GetGrainAddress();
} while (newAddress.ActivationId == originalAddress.ActivationId);

var newHost = newAddress.SiloAddress;
Assert.Equal(targetHost, newHost);

Expand Down Expand Up @@ -121,7 +131,12 @@ public async Task DirectedGrainMigrationTest_IPersistentStateOfT()
RequestContext.Set(IPlacementDirector.PlacementHintKey, targetHost);
await grain.Cast<IGrainManagementExtension>().MigrateOnIdle();

var newAddress = await grain.GetGrainAddress();
GrainAddress newAddress;
do
{
newAddress = await grain.GetGrainAddress();
} while (newAddress.ActivationId == originalAddress.ActivationId);

var newHost = newAddress.SiloAddress;
Assert.Equal(targetHost, newHost);

Expand Down

0 comments on commit b6525b5

Please sign in to comment.