04-Kube-Controller-Manager Source Code Analysis (EndpointSlice Controller)

This article is based on Kubernetes v1.29.0.

Background

EndpointSlice was introduced as an alpha feature in Kubernetes 1.16, mainly to address the scalability limits of the Endpoints resource and to enable dual-stack Services and topology aware routing. It has been enabled by default since Kubernetes 1.19, and since 1.21 the EndpointSlice API is served as discovery.k8s.io/v1.

Scalability Improvements

Limitations of Endpoints

  1. With Endpoints, a Service has exactly one Endpoints resource. Every time a Pod behind the Service changes, the entire Endpoints object must be sent to every node in the cluster. For large clusters this is a heavy burden, because it means transmitting a large amount of data across the cluster.

  2. Endpoints also limits how many network endpoints can be tracked for a Service. Objects stored in etcd have a default size limit of 1.5MB, which in some cases caps an Endpoints resource at around 5,000 Pod IPs. For most users this is not a problem, but for very large clusters it becomes a serious issue.

For example, in a large cluster with 1,000 nodes and a Service backed by 5,000 Pods, every Pod change requires sending the complete Endpoints resource to the kube-proxy on every node, which amounts to roughly 1.5MB * 1000 = 1.5GB of data transmitted across the cluster per update.

Improvements with EndpointSlices

The EndpointSlice API solves this with a sharding-like approach: instead of tracking all Pod IPs of a Service in a single Endpoints resource, they are split across multiple smaller EndpointSlices. With EndpointSlices, the amount of data transferred per endpoint update is greatly reduced, and kube-proxy can update its iptables/ipvs rules much faster.
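To make the sharding idea concrete, here is a minimal, self-contained sketch that splits a Service's addresses into multiple discovery/v1 EndpointSlice objects. The helper name sliceEndpoints and the 100-endpoints-per-slice limit are illustrative assumptions, not the controller's actual code:

package main

import (
	"fmt"

	discovery "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// sliceEndpoints splits a Service's endpoint addresses into EndpointSlice
// objects of at most maxPerSlice endpoints each (hypothetical helper that
// mirrors the sharding idea, not the controller's implementation).
func sliceEndpoints(svcName, namespace string, addrs []string, maxPerSlice int) []*discovery.EndpointSlice {
	var slices []*discovery.EndpointSlice
	for start := 0; start < len(addrs); start += maxPerSlice {
		end := start + maxPerSlice
		if end > len(addrs) {
			end = len(addrs)
		}
		slice := &discovery.EndpointSlice{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: svcName + "-",
				Namespace:    namespace,
				Labels:       map[string]string{discovery.LabelServiceName: svcName},
			},
			AddressType: discovery.AddressTypeIPv4,
		}
		for _, ip := range addrs[start:end] {
			ready := true
			slice.Endpoints = append(slice.Endpoints, discovery.Endpoint{
				Addresses:  []string{ip},
				Conditions: discovery.EndpointConditions{Ready: &ready},
			})
		}
		slices = append(slices, slice)
	}
	return slices
}

func main() {
	addrs := make([]string, 250)
	for i := range addrs {
		addrs[i] = fmt.Sprintf("10.0.0.%d", i)
	}
	// 250 addresses with at most 100 endpoints per slice -> 3 EndpointSlices.
	fmt.Println(len(sliceEndpoints("web", "default", addrs, 100)))
}

With a 100-endpoint limit, a Service with 5,000 Pods is represented by 50 small slices, and a single Pod change only touches the one slice that contains it.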


Dual-Stack Services

A Service can use IPv4 and IPv6 addresses at the same time, relying on the addressType field of each EndpointSlice to track the addresses per IP family.
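A rough sketch of that mapping, using a hypothetical helper addressTypesForIPFamilies that mirrors the idea behind the getAddressTypesForService call seen later in the reconciler:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
)

// addressTypesForIPFamilies maps each IP family of a dual-stack Service to an
// EndpointSlice address type (illustrative helper, not the controller's code).
func addressTypesForIPFamilies(families []corev1.IPFamily) []discovery.AddressType {
	var types []discovery.AddressType
	for _, f := range families {
		switch f {
		case corev1.IPv4Protocol:
			types = append(types, discovery.AddressTypeIPv4)
		case corev1.IPv6Protocol:
			types = append(types, discovery.AddressTypeIPv6)
		}
	}
	return types
}

func main() {
	// A dual-stack Service tracks IPv4 and IPv6 endpoints in separate
	// EndpointSlices, distinguished by the addressType field.
	fmt.Println(addressTypesForIPFamilies([]corev1.IPFamily{corev1.IPv4Protocol, corev1.IPv6Protocol}))
}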

Topology Aware Routing

Topology aware routing updates kube-proxy to prefer routing requests to endpoints within the same zone or region. It relies on the topology fields (zone and hints) stored for each endpoint in an EndpointSlice.
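As an illustration, an endpoint in a discovery/v1 EndpointSlice can carry both the zone it runs in and hints describing which zones should route to it. The field names below come from discovery/v1; the values are made up:

package main

import (
	"fmt"

	discovery "k8s.io/api/discovery/v1"
)

func main() {
	zone := "us-east-1a"
	ready := true
	// An endpoint records its zone; the controller may add hints telling
	// kube-proxy which zones should send traffic to it.
	ep := discovery.Endpoint{
		Addresses:  []string{"10.0.0.12"},
		Conditions: discovery.EndpointConditions{Ready: &ready},
		Zone:       &zone,
		Hints: &discovery.EndpointHints{
			ForZones: []discovery.ForZone{{Name: "us-east-1a"}},
		},
	}
	fmt.Println(*ep.Zone, ep.Hints.ForZones[0].Name)
}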

newEndpointSliceControllerDescriptor

cmd/kube-controller-manager/app/discovery.go

func newEndpointSliceControllerDescriptor() *ControllerDescriptor {
 return &ControllerDescriptor{
  name:     names.EndpointSliceController,
  aliases:  []string{"endpointslice"},
  initFunc: startEndpointSliceController,
 }
}

startEndpointSliceController

startEndpointSliceController instantiates the endpointslice controller via endpointslicecontroller.NewController and runs it in a goroutine.

func startEndpointSliceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
 go endpointslicecontroller.NewController(
  ctx,
  controllerContext.InformerFactory.Core().V1().Pods(),
  controllerContext.InformerFactory.Core().V1().Services(),
  controllerContext.InformerFactory.Core().V1().Nodes(),
  controllerContext.InformerFactory.Discovery().V1().EndpointSlices(),
  controllerContext.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice,
  controllerContext.ClientBuilder.ClientOrDie("endpointslice-controller"),
  controllerContext.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration,
 ).Run(ctx, int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs))
 return nil, true, nil
}

NewController

NewController initializes the Controller and registers event handlers for the Service, Pod, EndpointSlice, and Node resources.

Parameters:

  1. c.maxEndpointsPerSlice

    Maximum number of endpoints allowed per EndpointSlice

  2. features.TopologyAwareHints

    Feature gate that enables topology aware routing hints

Key fields:

  1. c.triggerTimeTracker

    Tracks the last time the Pods and the Service were updated

  2. c.reconciler

    Core reconciliation logic

func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
 serviceInformer coreinformers.ServiceInformer,
 nodeInformer coreinformers.NodeInformer,
 endpointSliceInformer discoveryinformers.EndpointSliceInformer,
 maxEndpointsPerSlice int32,
 client clientset.Interface,
 endpointUpdatesBatchPeriod time.Duration,
) *Controller {
 broadcaster := record.NewBroadcaster()
 recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"})

 endpointslicemetrics.RegisterMetrics()

 // Initialize the Controller with a rate-limited work queue: per-item exponential backoff combined with a token bucket of 10 qps and a burst of 100
 c := &Controller{
  client: client,
  // This is similar to the DefaultControllerRateLimiter, just with a
  // significantly higher default backoff (1s vs 5ms). This controller
  // processes events that can require significant EndpointSlice changes,
  // such as an update to a Service or Deployment. A more significant
  // rate limit back off here helps ensure that the Controller does not
  // overwhelm the API Server.
  queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
   workqueue.NewItemExponentialFailureRateLimiter(defaultSyncBackOff, maxSyncBackOff),
   // 10 qps, 100 bucket size. This is only for retry speed and its
   // only the overall factor (not per item).
   &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
  ), "endpoint_slice"),
  workerLoopPeriod: time.Second,
 }
 // Watch Service add/update/delete events
 serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  AddFunc: c.onServiceUpdate,
  UpdateFunc: func(old, cur interface{}) {
   c.onServiceUpdate(cur)
  },
  DeleteFunc: c.onServiceDelete,
 })
 c.serviceLister = serviceInformer.Lister()
 c.servicesSynced = serviceInformer.Informer().HasSynced

 // Watch Pod add/update/delete events
 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  AddFunc:    c.addPod,
  UpdateFunc: c.updatePod,
  DeleteFunc: c.deletePod,
 })
 c.podLister = podInformer.Lister()
 c.podsSynced = podInformer.Informer().HasSynced

 c.nodeLister = nodeInformer.Lister()
 c.nodesSynced = nodeInformer.Informer().HasSynced

 logger := klog.FromContext(ctx)
 // Watch EndpointSlice add/update/delete events
 endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  AddFunc: c.onEndpointSliceAdd,
  UpdateFunc: func(oldObj, newObj interface{}) {
   c.onEndpointSliceUpdate(logger, oldObj, newObj)
  },
  DeleteFunc: c.onEndpointSliceDelete,
 })

 c.endpointSliceLister = endpointSliceInformer.Lister()
 c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
 c.endpointSliceTracker = endpointsliceutil.NewEndpointSliceTracker()

 // Maximum number of endpoints per EndpointSlice, 100 by default
 c.maxEndpointsPerSlice = maxEndpointsPerSlice

 // Tracks the last time the Service and its Pods changed; the cached value is used for the last-change-trigger-time annotation
 c.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()

 c.eventBroadcaster = broadcaster
 c.eventRecorder = recorder

 c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod

 // If the TopologyAwareHints feature gate is enabled, also watch Node add/update/delete events
 if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
  nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc: func(obj interface{}) {
    c.addNode(logger, obj)
   },
   UpdateFunc: func(oldObj, newObj interface{}) {
    c.updateNode(logger, oldObj, newObj)
   },
   DeleteFunc: func(obj interface{}) {
    c.deleteNode(logger, obj)
   },
  })

  c.topologyCache = topologycache.NewTopologyCache()
 }

 // Core reconciliation logic of the EndpointSlice controller
 c.reconciler = endpointslicerec.NewReconciler(
  c.client,
  c.nodeLister,
  c.maxEndpointsPerSlice,
  c.endpointSliceTracker,
  c.topologyCache,
  c.eventRecorder,
  controllerName,
 )

 return c
}
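The work queue created above combines a per-item exponential backoff with an overall 10 qps / 100 burst token bucket; workqueue.NewMaxOfRateLimiter always returns the larger of the two delays. Below is a small standalone sketch of that behavior; the 1s/100s backoff values are illustrative, not necessarily the controller's defaultSyncBackOff/maxSyncBackOff constants:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Illustrative backoff values only.
	perItem := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 100*time.Second)
	overall := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}
	limiter := workqueue.NewMaxOfRateLimiter(perItem, overall)

	// Each failed retry of the same key doubles the per-item delay
	// (1s, 2s, 4s, ...); the bucket limiter only dominates when many
	// different keys are being retried at the same time.
	for i := 0; i < 4; i++ {
		fmt.Println(limiter.When("default/web"))
	}
}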

Run

Parameters:

  • ConcurrentServiceEndpointSyncs

    Number of concurrent workers syncing Services and EndpointSlices (the kube-controller-manager flag --concurrent-service-endpoint-syncs)

func (c *Controller) Run(ctx context.Context, workers int) {
 defer utilruntime.HandleCrash()

 // Start events processing pipeline.
 c.eventBroadcaster.StartLogging(klog.Infof)
 c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
 defer c.eventBroadcaster.Shutdown()

 defer c.queue.ShutDown()

 logger := klog.FromContext(ctx)
 logger.Info("Starting endpoint slice controller")
 defer logger.Info("Shutting down endpoint slice controller")

 if !cache.WaitForNamedCacheSync("endpoint_slice", ctx.Done(), c.podsSynced, c.servicesSynced, c.endpointSlicesSynced, c.nodesSynced) {
  return
 }

 // Start the consumer worker goroutines
 logger.V(2).Info("Starting worker threads", "total", workers)
 for i := 0; i < workers; i++ {
  go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
 }

 <-ctx.Done()
}

func (c *Controller) worker(logger klog.Logger) {
 for c.processNextWorkItem(logger) {
 }
}

func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
 cKey, quit := c.queue.Get()
 if quit {
  return false
 }
 defer c.queue.Done(cKey)

 err := c.syncService(logger, cKey.(string))
 c.handleErr(logger, err, cKey)

 return true
}
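handleErr is not shown above. A typical shape for it, reusing the controller's rate-limited queue, is sketched below; the retry limit and log messages are assumptions, not the controller's exact implementation:

// Sketch of the usual workqueue error-handling pattern; the real handleErr
// may differ in its retry limit and logging details.
const maxRetries = 15 // assumed illustrative limit

func (c *Controller) handleErr(logger klog.Logger, err error, key interface{}) {
	if err == nil {
		// Success: reset the per-item exponential backoff for this key.
		c.queue.Forget(key)
		return
	}
	if c.queue.NumRequeues(key) < maxRetries {
		// Requeue with the rate limiter's backoff (see the queue set up in NewController).
		logger.Info("Error syncing endpoint slices for service, retrying", "key", key, "err", err)
		c.queue.AddRateLimited(key)
		return
	}
	// Too many failures: stop retrying and drop the key.
	logger.Info("Dropping service out of the endpoint slice queue", "key", key, "err", err)
	c.queue.Forget(key)
	utilruntime.HandleError(err)
}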

syncService

  1. Get the Service object
  2. List the Pods matching the Service's label selector
  3. List the EndpointSlices owned by the Service (selected by the service-name and managed-by labels, not the Pod selector)
  4. Drop EndpointSlices that are pending deletion
  5. Compute the last change trigger time of the endpoints, later recorded as an EndpointSlice annotation
  6. Hand the Service, Pods, and EndpointSlices to the reconciler (core control logic)
func (c *Controller) syncService(logger klog.Logger, key string) error {
 startTime := time.Now()
 defer func() {
  logger.V(4).Info("Finished syncing service endpoint slices", "key", key, "elapsedTime", time.Since(startTime))
 }()

 namespace, name, err := cache.SplitMetaNamespaceKey(key)
 if err != nil {
  return err
 }

 // 1. Get the Service object
 service, err := c.serviceLister.Services(namespace).Get(name)
 if err != nil {
  if !apierrors.IsNotFound(err) {
   return err
  }

  c.triggerTimeTracker.DeleteService(namespace, name)
  c.reconciler.DeleteService(namespace, name)
  c.endpointSliceTracker.DeleteService(namespace, name)
  // The service has been deleted, return nil so that it won't be retried.
  return nil
 }

 if service.Spec.Type == v1.ServiceTypeExternalName {
  // services with Type ExternalName receive no endpoints from this controller;
  // Ref: https://issues.k8s.io/105986
  return nil
 }

 if service.Spec.Selector == nil {
  // services without a selector receive no endpoint slices from this controller;
  // these services will receive endpoint slices that are created out-of-band via the REST API.
  return nil
 }

 logger.V(5).Info("About to update endpoint slices for service", "key", key)

 // 2. List Pods matching the Service's label selector
 podLabelSelector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
 pods, err := c.podLister.Pods(service.Namespace).List(podLabelSelector)
 if err != nil {
  // Since we're getting stuff from a local cache, it is basically
  // impossible to get this error.
  c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListPods",
   "Error listing Pods for Service %s/%s: %v", service.Namespace, service.Name, err)
  return err
 }

 // 3. List the EndpointSlices owned by this Service (selected by the service-name and managed-by labels)
 esLabelSelector := labels.Set(map[string]string{
  discovery.LabelServiceName: service.Name,
  discovery.LabelManagedBy:   c.reconciler.GetControllerName(),
 }).AsSelectorPreValidated()
 endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector)

 if err != nil {
  // Since we're getting stuff from a local cache, it is basically
  // impossible to get this error.
  c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListEndpointSlices",
   "Error listing Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
  return err
 }

 // Drop EndpointSlices that have been marked for deletion to prevent the controller from getting stuck.
 // 4. Drop EndpointSlices that are pending deletion
 endpointSlices = dropEndpointSlicesPendingDeletion(endpointSlices)

 if c.endpointSliceTracker.StaleSlices(service, endpointSlices) {
  return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date")
 }

 // We call ComputeEndpointLastChangeTriggerTime here to make sure that the
 // state of the trigger time tracker gets updated even if the sync turns out
 // to be no-op and we don't update the EndpointSlice objects.
 // 5. Compute the last change trigger time, later recorded as an EndpointSlice annotation
 lastChangeTriggerTime := c.triggerTimeTracker.
  ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

 // 6. Core reconciliation logic
 err = c.reconciler.Reconcile(logger, service, pods, endpointSlices, lastChangeTriggerTime)
 if err != nil {
  c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToUpdateEndpointSlices",
   "Error updating Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
  return err
 }

 return nil
}
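Note that step 3 does not use the Service's Pod selector: the controller selects only the EndpointSlices it owns, via the kubernetes.io/service-name and endpointslice.kubernetes.io/managed-by labels. Below is a small standalone sketch of the selector being built; the managed-by value endpointslice-controller.k8s.io is the one used by the in-tree controller:

package main

import (
	"fmt"

	discovery "k8s.io/api/discovery/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// discovery.LabelServiceName = "kubernetes.io/service-name"
	// discovery.LabelManagedBy   = "endpointslice.kubernetes.io/managed-by"
	sel := labels.Set(map[string]string{
		discovery.LabelServiceName: "web",
		discovery.LabelManagedBy:   "endpointslice-controller.k8s.io",
	}).AsSelectorPreValidated()

	// Prints the label selector used in step 3 (key order may vary).
	fmt.Println(sel.String())
}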

c.reconciler.Reconcile

func (r *Reconciler) Reconcile(logger klog.Logger, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
 slicesToDelete := []*discovery.EndpointSlice{}                                    // slices that are no longer  matching any address the service has
 errs := []error{}                                                                 // all errors generated in the process of reconciling
 slicesByAddressType := make(map[discovery.AddressType][]*discovery.EndpointSlice) // slices by address type

 // addresses that this service supports [o(1) find]
 // Determine the address types supported by the Service
 serviceSupportedAddressesTypes := getAddressTypesForService(logger, service)

 // loop through slices identifying their address type.
 // slices that no longer match address type supported by services
 // go to delete, other slices goes to the Reconciler machinery
 // for further adjustment
 // Classify existing EndpointSlices by address type; slices whose address type is no longer supported are marked for deletion
 for _, existingSlice := range existingSlices {
  // service no longer supports that address type, add it to deleted slices
  if !serviceSupportedAddressesTypes.Has(existingSlice.AddressType) {
   // If the topology cache is in use, remove any hints tracked for this slice
   if r.topologyCache != nil {
    svcKey, err := ServiceControllerKey(existingSlice)
    if err != nil {
     logger.Info("Couldn't get key to remove EndpointSlice from topology cache", "existingSlice", existingSlice, "err", err)
    } else {
     r.topologyCache.RemoveHints(svcKey, existingSlice.AddressType)
    }
   }

   slicesToDelete = append(slicesToDelete, existingSlice)
   continue
  }

  // add list if it is not on our map
  if _, ok := slicesByAddressType[existingSlice.AddressType]; !ok {
   slicesByAddressType[existingSlice.AddressType] = make([]*discovery.EndpointSlice, 0, 1)
  }

  slicesByAddressType[existingSlice.AddressType] = append(slicesByAddressType[existingSlice.AddressType], existingSlice)
 }

 // reconcile for existing.
 // Reconcile the EndpointSlices for each supported address type
 for addressType := range serviceSupportedAddressesTypes {
  existingSlices := slicesByAddressType[addressType]
  err := r.reconcileByAddressType(logger, service, pods, existingSlices, triggerTime, addressType)
  if err != nil {
   errs = append(errs, err)
  }
 }

 // delete those which are of addressType that is no longer supported
 // by the service
 // Delete EndpointSlices whose address type is no longer supported by the Service
 for _, sliceToDelete := range slicesToDelete {
  err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Delete(context.TODO(), sliceToDelete.Name, metav1.DeleteOptions{})
  if err != nil {
   errs = append(errs, fmt.Errorf("error deleting %s EndpointSlice for Service %s/%s: %w", sliceToDelete.Name, service.Namespace, service.Name, err))
  } else {
   r.endpointSliceTracker.ExpectDeletion(sliceToDelete)
   metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
  }
 }

 return utilerrors.NewAggregate(errs)
}

// reconcileByAddressType takes the set of Pods currently matching the Service selector
// and compares it with the endpoints already present in the Service's existing
// EndpointSlices of the given address type. It creates, updates, or deletes
// EndpointSlices so that the desired set of Pods is represented.
func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
 errs := []error{}

 slicesToCreate := []*discovery.EndpointSlice{}
 slicesToUpdate := []*discovery.EndpointSlice{}
 slicesToDelete := []*discovery.EndpointSlice{}
 events := []*topologycache.EventBuilder{}

 // Build data structures for existing state.
 existingSlicesByPortMap := map[endpointsliceutil.PortMapKey][]*discovery.EndpointSlice{}
 for _, existingSlice := range existingSlices {
  if ownedBy(existingSlice, service) {
   epHash := endpointsliceutil.NewPortMapKey(existingSlice.Ports)
   existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
  } else {
   slicesToDelete = append(slicesToDelete, existingSlice)
  }
 }

 // Build data structures for desired state.
 desiredMetaByPortMap := map[endpointsliceutil.PortMapKey]*endpointMeta{}
 desiredEndpointsByPortMap := map[endpointsliceutil.PortMapKey]endpointsliceutil.EndpointSet{}

 for _, pod := range pods {
  if !endpointsliceutil.ShouldPodBeInEndpoints(pod, true) {
   continue
  }

  endpointPorts := getEndpointPorts(logger, service, pod)
  epHash := endpointsliceutil.NewPortMapKey(endpointPorts)
  if _, ok := desiredEndpointsByPortMap[epHash]; !ok {
   desiredEndpointsByPortMap[epHash] = endpointsliceutil.EndpointSet{}
  }

  if _, ok := desiredMetaByPortMap[epHash]; !ok {
   desiredMetaByPortMap[epHash] = &endpointMeta{
    addressType: addressType,
    ports:       endpointPorts,
   }
  }

  node, err := r.nodeLister.Get(pod.Spec.NodeName)
  if err != nil {
   // we are getting the information from the local informer,
   // an error different than IsNotFound should not happen
   if !errors.IsNotFound(err) {
    return err
   }
   // If the Node specified by the Pod doesn't exist we want to requeue the Service so we
   // retry later, but also update the EndpointSlice without the problematic Pod.
   // Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid
   // situations where a reference from a Pod to a missing node can leave the EndpointSlice
   // stuck forever.
   // On the other side, if the service.Spec.PublishNotReadyAddresses is set we just add the
   // Pod, since the user is explicitly indicating that the Pod address should be published.
   if !service.Spec.PublishNotReadyAddresses {
    logger.Info("skipping Pod for Service, Node not found", "pod", klog.KObj(pod), "service", klog.KObj(service), "node", klog.KRef("", pod.Spec.NodeName))
    errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
    continue
   }
  }
  endpoint := podToEndpoint(pod, node, service, addressType)
  if len(endpoint.Addresses) > 0 {
   desiredEndpointsByPortMap[epHash].Insert(&endpoint)
  }
 }

 spMetrics := metrics.NewServicePortCache()
 totalAdded := 0
 totalRemoved := 0

 // Determine changes necessary for each group of slices by port map.
 for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
  numEndpoints := len(desiredEndpoints)
  pmSlicesToCreate, pmSlicesToUpdate, pmSlicesToDelete, added, removed := r.reconcileByPortMapping(
   logger, service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])

  totalAdded += added
  totalRemoved += removed

  spMetrics.Set(portMap, metrics.EfficiencyInfo{
   Endpoints: numEndpoints,
   Slices:    len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSlicesToDelete),
  })

  slicesToCreate = append(slicesToCreate, pmSlicesToCreate...)
  slicesToUpdate = append(slicesToUpdate, pmSlicesToUpdate...)
  slicesToDelete = append(slicesToDelete, pmSlicesToDelete...)
 }

 // If there are unique sets of ports that are no longer desired, mark
 // the corresponding endpoint slices for deletion.
 for portMap, existingSlices := range existingSlicesByPortMap {
  if _, ok := desiredEndpointsByPortMap[portMap]; !ok {
   slicesToDelete = append(slicesToDelete, existingSlices...)
  }
 }

 // When no endpoint slices would usually exist, we need to add a placeholder.
 if len(existingSlices) == len(slicesToDelete) && len(slicesToCreate) < 1 {
  // Check for existing placeholder slice outside of the core control flow
  placeholderSlice := newEndpointSlice(logger, service, &endpointMeta{ports: []discovery.EndpointPort{}, addressType: addressType}, r.controllerName)
  if len(slicesToDelete) == 1 && placeholderSliceCompare.DeepEqual(slicesToDelete[0], placeholderSlice) {
   // We are about to unnecessarily delete/recreate the placeholder, remove it now.
   slicesToDelete = slicesToDelete[:0]
  } else {
   slicesToCreate = append(slicesToCreate, placeholderSlice)
  }
  spMetrics.Set(endpointsliceutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
   Endpoints: 0,
   Slices:    1,
  })
 }

 metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
 metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totalRemoved))

 serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
 r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)

 // Topology hints are assigned per address type. This means it is
 // theoretically possible for endpoints of one address type to be assigned
 // hints while another endpoints of another address type are not.
 si := &topologycache.SliceInfo{
  ServiceKey:  fmt.Sprintf("%s/%s", service.Namespace, service.Name),
  AddressType: addressType,
  ToCreate:    slicesToCreate,
  ToUpdate:    slicesToUpdate,
  Unchanged:   unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
 }

 // Check the Service annotation service.kubernetes.io/topology-mode to decide whether topology-aware hints should be added
 if r.topologyCache != nil && hintsEnabled(service.Annotations) {
  slicesToCreate, slicesToUpdate, events = r.topologyCache.AddHints(logger, si)
 } else {
  if r.topologyCache != nil {
   if r.topologyCache.HasPopulatedHints(si.ServiceKey) {
    logger.Info("TopologyAwareHints annotation has changed, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
    events = append(events, &topologycache.EventBuilder{
     EventType: corev1.EventTypeWarning,
     Reason:    "TopologyAwareHintsDisabled",
     Message:   topologycache.FormatWithAddressType(topologycache.TopologyAwareHintsDisabled, si.AddressType),
    })
   }
   r.topologyCache.RemoveHints(si.ServiceKey, addressType)
  }
  slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
 }
 err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
 if err != nil {
  errs = append(errs, err)
 }
 for _, event := range events {
  r.eventRecorder.Event(service, event.EventType, event.Reason, event.Message)
 }
 return utilerrors.NewAggregate(errs)

}
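reconcileByAddressType groups the desired endpoints by their resolved port set (PortMapKey) because a Service with a named targetPort can resolve to different container ports on different Pods, and endpoints with different port sets cannot share a slice. A simplified, standalone sketch of that grouping follows; portMapKey here is a hypothetical stand-in for endpointsliceutil.NewPortMapKey:

package main

import (
	"fmt"
	"sort"
	"strings"
)

// portMapKey builds a stable key from a pod's resolved ports, similar in
// spirit to endpointsliceutil.NewPortMapKey (simplified illustration).
func portMapKey(ports map[string]int32) string {
	parts := make([]string, 0, len(ports))
	for name, port := range ports {
		parts = append(parts, fmt.Sprintf("%s=%d", name, port))
	}
	sort.Strings(parts)
	return strings.Join(parts, ",")
}

func main() {
	// A Service with targetPort "http" may resolve to different container
	// ports on different Pods; such Pods end up in different slice groups.
	pods := map[string]map[string]int32{
		"web-1": {"http": 8080},
		"web-2": {"http": 8080},
		"web-3": {"http": 9090}, // different resolved port -> different group
	}
	groups := map[string][]string{}
	for pod, ports := range pods {
		key := portMapKey(ports)
		groups[key] = append(groups[key], pod)
	}
	for key, members := range groups {
		fmt.Println(key, "->", members)
	}
}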

r.finalize

func (r *Reconciler) finalize(
 service *corev1.Service,
 slicesToCreate,
 slicesToUpdate,
 slicesToDelete []*discovery.EndpointSlice,
 triggerTime time.Time,
) error {
 // If there are slices to create and delete, change the creates to updates
 // of the slices that would otherwise be deleted.
 for i := 0; i < len(slicesToDelete); {
  if len(slicesToCreate) == 0 {
   break
  }
  sliceToDelete := slicesToDelete[i]
  slice := slicesToCreate[len(slicesToCreate)-1]
  // Only update EndpointSlices that are owned by this Service and have
  // the same AddressType. We need to avoid updating EndpointSlices that
  // are being garbage collected for an old Service with the same name.
  // The AddressType field is immutable. Since Services also consider
  // IPFamily immutable, the only case where this should matter will be
  // the migration from IP to IPv4 and IPv6 AddressTypes, where there's a
  // chance EndpointSlices with an IP AddressType would otherwise be
  // updated to IPv4 or IPv6 without this check.
  if sliceToDelete.AddressType == slice.AddressType && ownedBy(sliceToDelete, service) {
   slice.Name = sliceToDelete.Name
   slicesToCreate = slicesToCreate[:len(slicesToCreate)-1]
   slicesToUpdate = append(slicesToUpdate, slice)
   slicesToDelete = append(slicesToDelete[:i], slicesToDelete[i+1:]...)
  } else {
   i++
  }
 }

 // Don't create new EndpointSlices if the Service is pending deletion. This
 // is to avoid a potential race condition with the garbage collector where
 // it tries to delete EndpointSlices as this controller replaces them.
 if service.DeletionTimestamp == nil {
  for _, endpointSlice := range slicesToCreate {
   addTriggerTimeAnnotation(endpointSlice, triggerTime)
   createdSlice, err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
   if err != nil {
    // If the namespace is terminating, creates will continue to fail. Simply drop the item.
    if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
     return nil
    }
    return fmt.Errorf("failed to create EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err)
   }
   r.endpointSliceTracker.Update(createdSlice)
   metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
  }
 }

 for _, endpointSlice := range slicesToUpdate {
  addTriggerTimeAnnotation(endpointSlice, triggerTime)
  updatedSlice, err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
  if err != nil {
   return fmt.Errorf("failed to update %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err)
  }
  r.endpointSliceTracker.Update(updatedSlice)
  metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
 }

 for _, endpointSlice := range slicesToDelete {
  err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
  if err != nil {
   return fmt.Errorf("failed to delete %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err)
  }
  r.endpointSliceTracker.ExpectDeletion(endpointSlice)
  metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
 }

 topologyLabel := "Disabled"
 if r.topologyCache != nil && hintsEnabled(service.Annotations) {
  topologyLabel = "Auto"
 }

 numSlicesChanged := len(slicesToCreate) + len(slicesToUpdate) + len(slicesToDelete)
 metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel).Observe(float64(numSlicesChanged))

 return nil
}

Processing Flow

(figure: overall processing flow of the EndpointSlice controller)

References

  1. https://kubernetes.io/blog/2020/09/02/scaling-kubernetes-networking-with-endpointslices/