Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: dataclient kubernetes zone awareness #3238

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions dataclients/kubernetes/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestClientGetEndpointAddresses(t *testing.T) {

// client.LoadAll() is not called

addrs := client.GetEndpointAddresses("namespace1", "service1")
addrs := client.GetEndpointAddresses("", "namespace1", "service1")
assert.Nil(t, addrs)
})

Expand All @@ -55,13 +55,13 @@ func TestClientGetEndpointAddresses(t *testing.T) {
_, err := client.LoadAll()
require.NoError(t, err)

addrs := client.GetEndpointAddresses("namespace1", "service1")
addrs := client.GetEndpointAddresses("", "namespace1", "service1")
assert.Equal(t, []string{"42.0.1.2", "42.0.1.3"}, addrs)

// test subsequent call returns the expected values even when previous result was modified
addrs[0] = "modified"

addrs = client.GetEndpointAddresses("namespace1", "service1")
addrs = client.GetEndpointAddresses("", "namespace1", "service1")
assert.Equal(t, []string{"42.0.1.2", "42.0.1.3"}, addrs)
})

Expand All @@ -76,13 +76,13 @@ func TestClientGetEndpointAddresses(t *testing.T) {
_, err := client.LoadAll()
require.NoError(t, err)

addrs := client.GetEndpointAddresses("namespace1", "service1")
addrs := client.GetEndpointAddresses("", "namespace1", "service1")
assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)

// test subsequent call returns the expected values even when previous result was modified
addrs[0] = "modified"

addrs = client.GetEndpointAddresses("namespace1", "service1")
addrs = client.GetEndpointAddresses("", "namespace1", "service1")
assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)
})
}
Expand All @@ -94,7 +94,7 @@ func TestClientLoadEndpointAddresses(t *testing.T) {
"testdata/ingressV1/ingress-data/lb-target-multi.yaml",
)

addrs, err := client.LoadEndpointAddresses("namespace1", "service1")
addrs, err := client.LoadEndpointAddresses("", "namespace1", "service1")
assert.NoError(t, err)
assert.Equal(t, []string{"42.0.1.2", "42.0.1.3"}, addrs)
})
Expand All @@ -107,7 +107,7 @@ func TestClientLoadEndpointAddresses(t *testing.T) {
"testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-ready.yaml",
)

addrs, err := client.LoadEndpointAddresses("namespace1", "service1")
addrs, err := client.LoadEndpointAddresses("", "namespace1", "service1")
assert.NoError(t, err)
assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)
})
Expand Down
8 changes: 6 additions & 2 deletions dataclients/kubernetes/clusterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func collectReadyEndpoints(endpointSlices *endpointSliceList) map[definitions.Re
}

// loadEndpointAddresses returns the list of all addresses for the given service using endpoints or endpointslices API.
func (c *clusterClient) loadEndpointAddresses(namespace, name string) ([]string, error) {
func (c *clusterClient) loadEndpointAddresses(zone, namespace, name string) ([]string, error) {
var result []string
if c.enableEndpointSlices {
url := fmt.Sprintf(EndpointSlicesNamespaceFmt, namespace) +
Expand All @@ -579,7 +579,11 @@ func (c *clusterClient) loadEndpointAddresses(namespace, name string) ([]string,
}

for _, eps := range ready {
result = eps.addresses()
if zone != "" {
result = eps.addressesByZone(zone)
} else {
result = eps.addresses()
}
break
}
} else {
Expand Down
16 changes: 10 additions & 6 deletions dataclients/kubernetes/clusterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (state *clusterState) getServiceRG(namespace, name string) (*service, error
}

// GetEndpointsByService returns the skipper endpoints for kubernetes endpoints or endpointslices.
func (state *clusterState) GetEndpointsByService(namespace, name, protocol string, servicePort *servicePort) []string {
func (state *clusterState) GetEndpointsByService(zone, namespace, name, protocol string, servicePort *servicePort) []string {
epID := endpointID{
ResourceID: newResourceID(namespace, name),
Protocol: protocol,
Expand All @@ -66,7 +66,7 @@ func (state *clusterState) GetEndpointsByService(namespace, name, protocol strin
var targets []string
if state.enableEndpointSlices {
if eps, ok := state.endpointSlices[epID.ResourceID]; ok {
targets = eps.targetsByServicePort("TCP", protocol, servicePort)
targets = eps.targetsByServicePort(zone, "TCP", protocol, servicePort)
} else {
return nil
}
Expand All @@ -84,7 +84,7 @@ func (state *clusterState) GetEndpointsByService(namespace, name, protocol strin
}

// getEndpointAddresses returns the list of all addresses for the given service using endpoints or endpointslices.
func (state *clusterState) getEndpointAddresses(namespace, name string) []string {
func (state *clusterState) getEndpointAddresses(zone, namespace, name string) []string {
rID := newResourceID(namespace, name)

state.mu.Lock()
Expand All @@ -93,7 +93,11 @@ func (state *clusterState) getEndpointAddresses(namespace, name string) []string
var addresses []string
if state.enableEndpointSlices {
if eps, ok := state.endpointSlices[rID]; ok {
addresses = eps.addresses()
if zone != "" {
addresses = eps.addressesByZone(zone)
} else {
addresses = eps.addresses()
}
} else {
return nil
}
Expand All @@ -110,7 +114,7 @@ func (state *clusterState) getEndpointAddresses(namespace, name string) []string
}

// GetEndpointsByTarget returns the skipper endpoints for kubernetes endpoints or endpointslices.
func (state *clusterState) GetEndpointsByTarget(namespace, name, protocol, scheme string, target *definitions.BackendPort) []string {
func (state *clusterState) GetEndpointsByTarget(zone, namespace, name, protocol, scheme string, target *definitions.BackendPort) []string {
epID := endpointID{
ResourceID: newResourceID(namespace, name),
Protocol: protocol,
Expand All @@ -126,7 +130,7 @@ func (state *clusterState) GetEndpointsByTarget(namespace, name, protocol, schem
var targets []string
if state.enableEndpointSlices {
if eps, ok := state.endpointSlices[epID.ResourceID]; ok {
targets = eps.targetsByServiceTarget(protocol, scheme, target)
targets = eps.targetsByServiceTarget(zone, protocol, scheme, target)
} else {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion dataclients/kubernetes/clusterstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func benchmarkCachedEndpoints(b *testing.B, n int) {
b.ResetTimer()
dummy := []string{}
for i := 0; i < b.N; i++ {
dummy = cs.GetEndpointsByTarget("default", "foo-0", "TCP", "http", &definitions.BackendPort{})
dummy = cs.GetEndpointsByTarget("", "default", "foo-0", "TCP", "http", &definitions.BackendPort{})
}
dummy2 = dummy
}
Expand Down
28 changes: 26 additions & 2 deletions dataclients/kubernetes/endpointslices.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (eps *skipperEndpointSlice) getPort(protocol, pName string, pValue int) int

return port
}
func (eps *skipperEndpointSlice) targetsByServicePort(protocol, scheme string, servicePort *servicePort) []string {
func (eps *skipperEndpointSlice) targetsByServicePort(zone, protocol, scheme string, servicePort *servicePort) []string {
var port int
if servicePort.Name != "" {
port = eps.getPort(protocol, servicePort.Name, servicePort.Port)
Expand All @@ -58,21 +58,45 @@ func (eps *skipperEndpointSlice) targetsByServicePort(protocol, scheme string, s
}

result := make([]string, 0, len(eps.Endpoints))
resultByZone := make([]string, 0, len(eps.Endpoints))
for _, ep := range eps.Endpoints {
if ep.Zone == zone {
resultByZone = append(resultByZone, formatEndpointString(ep.Address, scheme, port))
}
result = append(result, formatEndpointString(ep.Address, scheme, port))
}
if len(resultByZone) >= minEndpointsByZone {
return resultByZone
}
return result
}

func (eps *skipperEndpointSlice) targetsByServiceTarget(protocol, scheme string, serviceTarget *definitions.BackendPort) []string {
func (eps *skipperEndpointSlice) targetsByServiceTarget(zone, protocol, scheme string, serviceTarget *definitions.BackendPort) []string {
pName, _ := serviceTarget.Value.(string)
pValue, _ := serviceTarget.Value.(int)
port := eps.getPort(protocol, pName, pValue)

result := make([]string, 0, len(eps.Endpoints))
resultByZone := make([]string, 0, len(eps.Endpoints))
for _, ep := range eps.Endpoints {
if ep.Zone == zone {
resultByZone = append(resultByZone, formatEndpointString(ep.Address, scheme, port))
}
result = append(result, formatEndpointString(ep.Address, scheme, port))
}
if len(resultByZone) >= minEndpointsByZone {
return resultByZone
}
return result
}

func (eps *skipperEndpointSlice) addressesByZone(zone string) []string {
result := make([]string, 0, len(eps.Endpoints))
for _, ep := range eps.Endpoints {
if ep.Zone == zone {
result = append(result, ep.Address)
}
}
return result
}

Expand Down
1 change: 1 addition & 0 deletions dataclients/kubernetes/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type ingressContext struct {
defaultFilters defaultFilters
certificateRegistry *certregistry.CertRegistry
calculateTraffic func([]*weightedIngressBackend) map[string]backendTraffic
zone string
}

type ingress struct {
Expand Down
3 changes: 2 additions & 1 deletion dataclients/kubernetes/ingressv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func convertPathRuleV1(
protocol = p
}

eps = state.GetEndpointsByService(ns, svcName, protocol, servicePort)
eps = state.GetEndpointsByService(ic.zone, ns, svcName, protocol, servicePort)
}
if len(eps) == 0 {
ic.logger.Tracef("Target endpoints not found, shuntroute for %s:%s", svcName, svcPort)
Expand Down Expand Up @@ -370,6 +370,7 @@ func (ing *ingress) convertDefaultBackendV1(
}

eps = state.GetEndpointsByService(
ic.zone,
ns,
svcName,
protocol,
Expand Down
13 changes: 9 additions & 4 deletions dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
servicePortEnvVar = "KUBERNETES_SERVICE_PORT"
httpRedirectRouteID = "kube__redirect"
defaultEastWestDomain = "skipper.cluster.local"
minEndpointsByZone = 3
)

// PathMode values are used to control the ingress path interpretation. The path mode can
Expand Down Expand Up @@ -240,6 +241,10 @@ type Options struct {
// DefaultLoadBalancerAlgorithm sets the default algorithm to be used for load balancing between backend endpoints,
// available options: roundRobin, consistentHash, random, powerOfRandomNChoices
DefaultLoadBalancerAlgorithm string

// TopologyZone
// TODO: explain its use
TopologyZone string
}

// Client is a Skipper DataClient implementation used to create routes based on Kubernetes Ingress settings.
Expand Down Expand Up @@ -576,18 +581,18 @@ func (c *Client) fetchDefaultFilterConfigs() defaultFilters {

// GetEndpointAddresses returns the list of all addresses for the given service
// loaded by previous call to LoadAll or LoadUpdate.
func (c *Client) GetEndpointAddresses(ns, name string) []string {
func (c *Client) GetEndpointAddresses(zone, ns, name string) []string {
c.mu.Lock()
defer c.mu.Unlock()
if c.state == nil {
return nil
}
return c.state.getEndpointAddresses(ns, name)
return c.state.getEndpointAddresses(zone, ns, name)
}

// LoadEndpointAddresses returns the list of all addresses for the given service.
func (c *Client) LoadEndpointAddresses(namespace, name string) ([]string, error) {
return c.ClusterClient.loadEndpointAddresses(namespace, name)
func (c *Client) LoadEndpointAddresses(zone, namespace, name string) ([]string, error) {
return c.ClusterClient.loadEndpointAddresses(zone, namespace, name)
}

func compareStringList(a, b []string) []string {
Expand Down
3 changes: 3 additions & 0 deletions dataclients/kubernetes/routegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type routeGroupContext struct {
calculateTraffic func([]*definitions.BackendReference) map[string]backendTraffic
defaultLoadBalancerAlgorithm string
certificateRegistry *certregistry.CertRegistry
zone string
}

type routeContext struct {
Expand Down Expand Up @@ -187,6 +188,7 @@ func applyServiceBackend(ctx *routeGroupContext, backend *definitions.SkipperBac
}

eps := ctx.state.GetEndpointsByTarget(
ctx.zone,
namespaceString(ctx.routeGroup.Metadata.Namespace),
s.Meta.Name,
"TCP",
Expand Down Expand Up @@ -569,6 +571,7 @@ func (r *routeGroups) convert(s *clusterState, df defaultFilters, loggingEnabled
calculateTraffic: getBackendTrafficCalculator[*definitions.BackendReference](r.options.BackendTrafficAlgorithm),
defaultLoadBalancerAlgorithm: r.options.DefaultLoadBalancerAlgorithm,
certificateRegistry: cr,
zone: r.options.TopologyZone,
}

ri, err := transformRouteGroup(ctx)
Expand Down
2 changes: 1 addition & 1 deletion routesrv/redishandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (rh *RedisHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

func getRedisAddresses(opts *skipper.Options, kdc *kubernetes.Client, m metrics.Metrics) func() ([]byte, error) {
return func() ([]byte, error) {
a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
a := kdc.GetEndpointAddresses("", opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
log.Debugf("Redis updater called and found %d redis endpoints: %v", len(a), a)
m.UpdateGauge("redis_endpoints", float64(len(a)))

Expand Down
7 changes: 5 additions & 2 deletions skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ type Options struct {
// KubernetesRedisServicePort to be used to lookup ring shards dynamically
KubernetesRedisServicePort int

// KubernetesTopologyZone TODO
KubernetesTopologyZone string

// KubernetesForceService overrides the default Skipper functionality to route traffic using Kubernetes Endpoints,
// instead using Kubernetes Services.
KubernetesForceService bool
Expand Down Expand Up @@ -1423,14 +1426,14 @@ func getKubernetesRedisAddrUpdater(opts *Options, kdc *kubernetes.Client, loaded
// has polled the data once or kdc.GetEndpointAdresses should be blocking
// call to kubernetes API
return func() ([]string, error) {
a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
a := kdc.GetEndpointAddresses("", opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
log.Debugf("GetEndpointAddresses found %d redis endpoints", len(a))

return joinPort(a, opts.KubernetesRedisServicePort), nil
}
} else {
return func() ([]string, error) {
a, err := kdc.LoadEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
a, err := kdc.LoadEndpointAddresses("", opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
log.Debugf("LoadEndpointAddresses found %d redis endpoints, err: %v", len(a), err)

return joinPort(a, opts.KubernetesRedisServicePort), err
Expand Down
Loading