Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: az awareness via endpointregistry and routesrv - option 4 #3258

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type Config struct {
KubernetesRedisServiceNamespace string `yaml:"kubernetes-redis-service-namespace"`
KubernetesRedisServiceName string `yaml:"kubernetes-redis-service-name"`
KubernetesRedisServicePort int `yaml:"kubernetes-redis-service-port"`
KubernetesZoneAwareEnabled bool `yaml:"kubernetes-zone-aware"`
KubernetesEndpointsURL string `yaml:"kubernetes-endpoints-url"`
KubernetesPodZone string `yaml:"kubernetes-pod-zone"`
KubernetesBackendTrafficAlgorithmString string `yaml:"kubernetes-backend-traffic-algorithm"`
KubernetesBackendTrafficAlgorithm kubernetes.BackendTrafficAlgorithm `yaml:"-"`
KubernetesDefaultLoadBalancerAlgorithm string `yaml:"kubernetes-default-lb-algorithm"`
Expand Down Expand Up @@ -473,6 +476,11 @@ func NewConfig() *Config {
flag.StringVar(&cfg.KubernetesRedisServiceNamespace, "kubernetes-redis-service-namespace", "", "Sets namespace for redis to be used to lookup endpoints")
flag.StringVar(&cfg.KubernetesRedisServiceName, "kubernetes-redis-service-name", "", "Sets name for redis to be used to lookup endpoints")
flag.IntVar(&cfg.KubernetesRedisServicePort, "kubernetes-redis-service-port", 6379, "Sets the port for redis to be used to lookup endpoints")

flag.BoolVar(&cfg.KubernetesZoneAwareEnabled, "kubernetes-zone-aware", false, "Enables Kubernetes zone aware routes, requires -kubernetes-endpoints-url")
flag.StringVar(&cfg.KubernetesEndpointsURL, "kubernetes-endpoints-url", "", "Sets URL to lookup /endpoints from routesrv")
flag.StringVar(&cfg.KubernetesPodZone, "kubernetes-pod-zone", "", "Sets the Zone of the pod, you should set it via Kuibernetes donwards API to enable skipper to know in which zone it runs.")

flag.StringVar(&cfg.KubernetesBackendTrafficAlgorithmString, "kubernetes-backend-traffic-algorithm", kubernetes.TrafficPredicateAlgorithm.String(), "sets the algorithm to be used for traffic splitting between backends: traffic-predicate or traffic-segment-predicate")
flag.StringVar(&cfg.KubernetesDefaultLoadBalancerAlgorithm, "kubernetes-default-lb-algorithm", kubernetes.DefaultLoadBalancerAlgorithm, "sets the default algorithm to be used for load balancing between backend endpoints, available options: roundRobin, consistentHash, random, powerOfRandomNChoices")

Expand Down Expand Up @@ -830,6 +838,9 @@ func (c *Config) ToOptions() skipper.Options {
KubernetesRedisServiceNamespace: c.KubernetesRedisServiceNamespace,
KubernetesRedisServiceName: c.KubernetesRedisServiceName,
KubernetesRedisServicePort: c.KubernetesRedisServicePort,
KubernetesZoneAwareEnabled: c.KubernetesZoneAwareEnabled,
KubernetesEndpointsURL: c.KubernetesEndpointsURL,
KubernetesPodZone: c.KubernetesPodZone,
KubernetesBackendTrafficAlgorithm: c.KubernetesBackendTrafficAlgorithm,
KubernetesDefaultLoadBalancerAlgorithm: c.KubernetesDefaultLoadBalancerAlgorithm,

Expand Down
11 changes: 8 additions & 3 deletions dataclients/kubernetes/clusterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type clusterState struct {
endpointSlices map[definitions.ResourceID]*skipperEndpointSlice
secrets map[definitions.ResourceID]*secret
cachedEndpoints map[endpointID][]string
ridToEpID map[definitions.ResourceID]endpointID
enableEndpointSlices bool
}

Expand Down Expand Up @@ -49,13 +50,17 @@ func (state *clusterState) getServiceRG(namespace, name string) (*service, error
return s, nil
}

// GetEndpointsByService returns the skipper endpoints for kubernetes endpoints or endpointslices.
func (state *clusterState) GetEndpointsByService(namespace, name, protocol string, servicePort *servicePort) []string {
epID := endpointID{
func getEpID(namespace, name, protocol string, servicePort *servicePort) endpointID {
return endpointID{
ResourceID: newResourceID(namespace, name),
Protocol: protocol,
TargetPort: servicePort.TargetPort.String(),
}
}

// GetEndpointsByService returns the skipper endpoints for kubernetes endpoints or endpointslices.
func (state *clusterState) GetEndpointsByService(namespace, name, protocol string, servicePort *servicePort) []string {
epID := getEpID(namespace, name, protocol, servicePort)

state.mu.Lock()
defer state.mu.Unlock()
Expand Down
8 changes: 7 additions & 1 deletion dataclients/kubernetes/ingressv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func convertPathRuleV1(
return nil, err
}

protocol := "http"
servicePort, err := svc.getServicePortV1(svcPort)
if err != nil {
// service definition is wrong or no pods
Expand All @@ -113,7 +114,6 @@ func convertPathRuleV1(
} else if forceKubernetesService {
eps = []string{serviceNameBackend(svcName, ns, servicePort)}
} else {
protocol := "http"
if p, ok := metadata.Annotations[skipperBackendProtocolAnnotationKey]; ok {
protocol = p
}
Expand Down Expand Up @@ -155,6 +155,12 @@ func convertPathRuleV1(
LBAlgorithm: getLoadBalancerAlgorithm(metadata, defaultLoadBalancerAlgorithm),
HostRegexps: hostRegexp,
}
// zone aware
// TODO(sszuecs): lookup and store in clusterState ridToEpID
//epID := getEpID(ns, name, protocol, servicePort)
// r.Id
// ic.state.

setPathV1(pathMode, r, prule.PathType, prule.Path)
traffic.apply(r)
return r, nil
Expand Down
39 changes: 39 additions & 0 deletions dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,45 @@ func mapRoutes(routes []*eskip.Route) (map[string]*eskip.Route, []*eskip.Route)
return routesById, uniqueRoutes
}

type EndpointsMap map[string][]*Endpoint

type Endpoint struct {
Address string `json:"addr"`
Zone string `json:"zone"`
Port int `json:"port"`
}

func endpointMapName(ns, name string, port int) string {
return fmt.Sprintf("%s/%s/%d", ns, name, port)
}

func (c *Client) GetEndpointsMap() EndpointsMap {
if c.state == nil {
return nil
}
result := make(EndpointsMap)

c.mu.Lock()
for resID, epSlice := range c.state.endpointSlices {
for _, port := range epSlice.Ports {
id := endpointMapName(resID.Namespace, resID.Name, port.Port)
eps := make([]*Endpoint, 0, len(epSlice.Endpoints))

for _, ep := range epSlice.Endpoints {
eps = append(eps, &Endpoint{
Address: ep.Address,
Zone: ep.Zone,
Port: port.Port,
})
}
result[id] = eps
}
}
c.mu.Unlock()

return result
}

func (c *Client) loadAndConvert() ([]*eskip.Route, error) {
c.mu.Lock()
state, err := c.ClusterClient.fetchClusterState()
Expand Down
1 change: 0 additions & 1 deletion proxy/healthy_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

type healthyEndpoints struct {
rnd *rand.Rand
endpointRegistry *routing.EndpointRegistry
maxUnhealthyEndpointsRatio float64
}

Expand Down
19 changes: 18 additions & 1 deletion proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,12 @@

// PassiveHealthCheck defines the parameters for the healthy endpoints checker.
PassiveHealthCheck *PassiveHealthCheck

// ZoneAwareEndpoints
ZoneAwareEndpoints bool

// Zone
Zone string
}

type (
Expand Down Expand Up @@ -438,6 +444,7 @@
registry *routing.EndpointRegistry
fadein *fadeIn
heathlyEndpoints *healthyEndpoints
zoneAwareEndpoints *zoneAwareEndpoints

Check failure on line 447 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

undefined: zoneAwareEndpoints

Check failure on line 447 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / tests

undefined: zoneAwareEndpoints

Check failure on line 447 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / tests

undefined: zoneAwareEndpoints

Check failure on line 447 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / tests

undefined: zoneAwareEndpoints

Check failure on line 447 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / tests

undefined: zoneAwareEndpoints

Check failure on line 447 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / check-race

undefined: zoneAwareEndpoints
roundTripper http.RoundTripper
priorityRoutes []PriorityRoute
flags Flags
Expand Down Expand Up @@ -581,6 +588,7 @@
endpoints := rt.LBEndpoints
endpoints = p.fadein.filterFadeIn(endpoints, rt)
endpoints = p.heathlyEndpoints.filterHealthyEndpoints(ctx, endpoints, p.metrics)
endpoints = p.zoneAwareEndpoints.filterZoneEndpoints(ctx, endpoints)

lbctx := &routing.LBContext{
Request: ctx.request,
Expand Down Expand Up @@ -841,10 +849,18 @@
if p.EnablePassiveHealthCheck {
healthyEndpointsChooser = &healthyEndpoints{
rnd: rand.New(loadbalancer.NewLockedSource()),
endpointRegistry: p.EndpointRegistry,
maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio,
}
}

var zoneAwareEP *zoneAwareEndpoints

Check failure on line 856 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

undefined: zoneAwareEndpoints

Check failure on line 856 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / tests

undefined: zoneAwareEndpoints

Check failure on line 856 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / tests

undefined: zoneAwareEndpoints

Check failure on line 856 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / tests

undefined: zoneAwareEndpoints

Check failure on line 856 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / check-race

undefined: zoneAwareEndpoints
if p.ZoneAwareEndpoints {
zoneAwareEP = &zoneAwareEndpoints{

Check failure on line 858 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

undefined: zoneAwareEndpoints

Check failure on line 858 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / tests

undefined: zoneAwareEndpoints

Check failure on line 858 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / tests

undefined: zoneAwareEndpoints

Check failure on line 858 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / tests

undefined: zoneAwareEndpoints

Check failure on line 858 in proxy/proxy.go

View workflow job for this annotation

GitHub Actions / check-race

undefined: zoneAwareEndpoints
zone: p.Zone,
endpointRegistry: p.EndpointRegistry,
}
}

return &Proxy{
routing: p.Routing,
registry: p.EndpointRegistry,
Expand All @@ -853,6 +869,7 @@
endpointRegistry: p.EndpointRegistry,
},
heathlyEndpoints: healthyEndpointsChooser,
zoneAwareEndpoints: zoneAwareEP,
roundTripper: p.CustomHttpRoundTripperWrap(tr),
priorityRoutes: p.PriorityRoutes,
flags: p.Flags,
Expand Down
8 changes: 8 additions & 0 deletions routesrv/eskipbytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type eskipBytes struct {
tracer ot.Tracer
metrics metrics.Metrics
now func() time.Time

// zone aware
epMap []byte
}

// formatAndSet takes a slice of routes and stores them eskip-formatted
Expand Down Expand Up @@ -111,6 +114,11 @@ func (e *eskipBytes) compressLocked(data []byte) []byte {
return buf.Bytes()
}

func (e *eskipBytes) endpointsGetHandler(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(200)
rw.Write(e.epMap)
}

func (e *eskipBytes) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
span := tracing.CreateSpan("serve_routes", r.Context(), e.tracer)
defer span.Finish()
Expand Down
11 changes: 11 additions & 0 deletions routesrv/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type poller struct {
timeout time.Duration
quit chan struct{}

// zone awareness
updateEndpoints func() ([]byte, error)
//epMap []byte

// Preprocessors
defaultFilters *eskip.DefaultFilters
oauth2Config *auth.OAuthConfig
Expand Down Expand Up @@ -100,6 +104,13 @@ func (p *poller) poll(wg *sync.WaitGroup) {

span.Finish()

epMapBytes, err := p.updateEndpoints()
if err != nil {
log.Errorf("Failed to update endpoints: %v", err)
} else {
p.b.epMap = epMapBytes
}

select {
case <-p.quit:
log.Info(LogPollingStopped)
Expand Down
23 changes: 19 additions & 4 deletions routesrv/routesrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package routesrv

import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
Expand Down Expand Up @@ -77,6 +78,7 @@ func New(opts skipper.Options) (*RouteServer, error) {
mux := http.NewServeMux()
mux.Handle("/health", bs)
mux.Handle("/routes", b)

supportHandler := http.NewServeMux()
supportHandler.Handle("/metrics", metricsHandler)
supportHandler.Handle("/metrics/", metricsHandler)
Expand Down Expand Up @@ -113,6 +115,11 @@ func New(opts skipper.Options) (*RouteServer, error) {
mux.Handle("/swarm/redis/shards", rh)
}

if opts.KubernetesZoneAwareEnabled {
log.Infof("Expose /endpoints for zone aware traffic")
mux.HandleFunc("GET /endpoints", b.endpointsGetHandler)
}

rs.server = &http.Server{
Addr: opts.Address,
Handler: mux,
Expand All @@ -128,10 +135,18 @@ func New(opts skipper.Options) (*RouteServer, error) {
}

rs.poller = &poller{
client: dataclient,
timeout: opts.SourcePollTimeout,
b: b,
quit: make(chan struct{}),
client: dataclient,
timeout: opts.SourcePollTimeout,
b: b,
quit: make(chan struct{}),
updateEndpoints: func() ([]byte, error) {
epMap := dataclient.GetEndpointsMap()
epMapBytes, err := json.Marshal(epMap)
if err != nil {
return nil, fmt.Errorf("failed to marshal EndpointsMap: %w", err)
}
return epMapBytes, nil
},
defaultFilters: opts.DefaultFilters,
editRoute: opts.EditRoute,
cloneRoute: opts.CloneRoute,
Expand Down
Loading
Loading