Skip to content

Commit

Permalink
fix some tests
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Oct 5, 2024
1 parent 17f4450 commit 42ce0f5
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
12 changes: 8 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ type HealthCheckImpl struct {
loadTabletsTrigger chan struct{}
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
healthCheckDialSem *semaphore.Weighted
// nowTimeFunc is used to determine the current time.
nowTimeFunc func() time.Time
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
Expand Down Expand Up @@ -369,6 +371,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
subscribers: make(map[chan *TabletHealth]struct{}),
cellAliases: make(map[string]string),
loadTabletsTrigger: make(chan struct{}),
nowTimeFunc: func() time.Time { return time.Now() },
}
var topoWatchers []*TopologyWatcher
cells := strings.Split(cellsToWatch, ",")
Expand Down Expand Up @@ -420,10 +423,11 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
TabletType: tablet.Type,
}
thc := &tabletHealthCheck{
ctx: ctx,
cancelFunc: cancelFunc,
Tablet: tablet,
Target: target,
ctx: ctx,
cancelFunc: cancelFunc,
nowTimeFunc: hc.nowTimeFunc,
Tablet: tablet,
Target: target,
}

// add to our datastore
Expand Down
23 changes: 19 additions & 4 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ func TestHealthCheckStreamError(t *testing.T) {
resultChan := hc.Subscribe()
fc := createFakeConn(tablet, input)
fc.errCh = make(chan error)
now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }
hc.AddTablet(tablet)

// Immediately after AddTablet() there will be the first notification.
Expand All @@ -311,6 +313,7 @@ func TestHealthCheckStreamError(t *testing.T) {
Target: &querypb.Target{Keyspace: "k", Shard: "s"},
Serving: false,
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
result := <-resultChan
mustMatch(t, want, result, "Wrong TabletHealth data")
Expand All @@ -329,6 +332,7 @@ func TestHealthCheckStreamError(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand Down Expand Up @@ -367,6 +371,8 @@ func TestHealthCheckErrorOnPrimary(t *testing.T) {
resultChan := hc.Subscribe()
fc := createFakeConn(tablet, input)
fc.errCh = make(chan error)
now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }
hc.AddTablet(tablet)

// Immediately after AddTablet() there will be the first notification.
Expand All @@ -375,6 +381,7 @@ func TestHealthCheckErrorOnPrimary(t *testing.T) {
Target: &querypb.Target{Keyspace: "k", Shard: "s"},
Serving: false,
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
result := <-resultChan
mustMatch(t, want, result, "Wrong TabletHealth data")
Expand All @@ -393,6 +400,7 @@ func TestHealthCheckErrorOnPrimary(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
PrimaryTermStartTime: 10,
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand Down Expand Up @@ -438,6 +446,8 @@ func TestHealthCheckErrorOnPrimaryAfterExternalReparent(t *testing.T) {
tablet2.Type = topodatapb.TabletType_REPLICA
input2 := make(chan *querypb.StreamHealthResponse)
createFakeConn(tablet2, input2)
now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }
hc.AddTablet(tablet2)
<-resultChan

Expand Down Expand Up @@ -466,6 +476,7 @@ func TestHealthCheckErrorOnPrimaryAfterExternalReparent(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 0, CpuUsage: 0.2},
PrimaryTermStartTime: 10,
Timestamp: protoutil.TimeToProto(now),
}}
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_PRIMARY})
mustMatch(t, health, a, "unexpected result")
Expand Down Expand Up @@ -557,6 +568,7 @@ func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) {
resultChan := hc.Subscribe()

now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }

hc.AddTablet(tablet)

Expand All @@ -579,10 +591,10 @@ func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) {
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
}
want = &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
Tablet: tablet,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
Expand Down Expand Up @@ -624,6 +636,8 @@ func TestHealthCheckTimeout(t *testing.T) {
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
resultChan := hc.Subscribe()
now := time.Now()
hc.nowTimeFunc = func() time.Time { return now }
hc.AddTablet(tablet)
// Immediately after AddTablet() there will be the first notification.
want := &TabletHealth{
Expand All @@ -649,6 +663,7 @@ func TestHealthCheckTimeout(t *testing.T) {
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
PrimaryTermStartTime: 0,
Timestamp: protoutil.TimeToProto(now),
}
input <- shr
result = <-resultChan
Expand Down
7 changes: 6 additions & 1 deletion go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type tabletHealthCheck struct {
// possibly delete both these
loggedServingState bool
lastResponseTimestamp time.Time // timestamp of the last healthcheck response
// nowTimeFunc provides the current time
nowTimeFunc func() time.Time
}

// String is defined because we want to print a []*tabletHealthCheck array nicely.
Expand Down Expand Up @@ -128,7 +130,10 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
thc.loggedServingState = true
}
thc.Serving = serving
thc.Timestamp = protoutil.TimeToProto(time.Now())

if thc.nowTimeFunc != nil {
thc.Timestamp = protoutil.TimeToProto(thc.nowTimeFunc())
}
}

// stream streams healthcheck responses to callback.
Expand Down

0 comments on commit 42ce0f5

Please sign in to comment.