diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index ce2d16b1c19..7513fdedc17 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -120,6 +120,7 @@ type clusterImplBalancer struct { // childState/drops/requestCounter keeps the state used by the most recently // generated picker. childState balancer.State + mu sync.Mutex dropCategories []DropConfig // The categories for drops. drops []*dropper requestCounterCluster string // The cluster name for the request counter. @@ -254,9 +255,8 @@ func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState) b.config = newConfig + b.mu.Lock() b.inhibitPickerUpdates = true - defer func() { b.inhibitPickerUpdates = false }() - b.telemetryLabels = newConfig.TelemetryLabels dc := b.handleDropAndRequestCount(newConfig) if dc != nil && b.childState.Picker != nil { @@ -265,6 +265,8 @@ func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState) Picker: b.newPicker(dc), }) } + b.inhibitPickerUpdates = false + b.mu.Unlock() // Addresses and sub-balancer config are sent to sub-balancer. return b.child.UpdateClientConnState(balancer.ClientConnState{ @@ -318,8 +320,9 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer func (b *clusterImplBalancer) Close() { b.serializer.TrySchedule(func(_ context.Context) { b.child.Close() + b.mu.Lock() b.childState = balancer.State{} - + b.mu.Unlock() if b.cancelLoadReport != nil { b.cancelLoadReport() b.cancelLoadReport = nil @@ -339,18 +342,20 @@ func (b *clusterImplBalancer) ExitIdle() { // Override methods to accept updates from the child LB. func (b *clusterImplBalancer) UpdateState(state balancer.State) { - b.serializer.TrySchedule(func(context.Context) { - b.childState = state - if !b.inhibitPickerUpdates { - b.ClientConn.UpdateState(balancer.State{ - ConnectivityState: b.childState.ConnectivityState, - Picker: b.newPicker(&dropConfigs{ - drops: b.drops, - requestCounter: b.requestCounter, - requestCountMax: b.requestCountMax, - }), - }) - } + b.mu.Lock() + b.childState = state + if b.inhibitPickerUpdates { + b.mu.Unlock() + return + } + b.mu.Unlock() + b.ClientConn.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: b.newPicker(&dropConfigs{ + drops: b.drops, + requestCounter: b.requestCounter, + requestCountMax: b.requestCountMax, + }), }) }