func (r *Reconciler) Reconcile(logger klog.Logger, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
	slicesToDelete := []*discovery.EndpointSlice{} // slices that are no longer matching any address the service has
	errs := []error{} // all errors generated in the process of reconciling
	slicesByAddressType := make(map[discovery.AddressType][]*discovery.EndpointSlice) // slices by address type

	// addresses that this service supports [o(1) find]
	// Determine the address types (IPv4/IPv6) the Service supports.
	serviceSupportedAddressesTypes := getAddressTypesForService(logger, service)

	// loop through slices identifying their address type.
	// slices that no longer match address type supported by services
	// go to delete, other slices goes to the Reconciler machinery
	// for further adjustment
	// Classify the existing EndpointSlices by the address types the Service supports.
	for _, existingSlice := range existingSlices {
		// service no longer supports that address type, add it to deleted slices
		if !serviceSupportedAddressesTypes.Has(existingSlice.AddressType) {
			// If the topology cache is in use, drop the hints cached for this slice's address type.
			if r.topologyCache != nil {
				svcKey, err := ServiceControllerKey(existingSlice)
				if err != nil {
					logger.Info("Couldn't get key to remove EndpointSlice from topology cache", "existingSlice", existingSlice, "err", err)
				} else {
					r.topologyCache.RemoveHints(svcKey, existingSlice.AddressType)
				}
			}
			slicesToDelete = append(slicesToDelete, existingSlice)
			continue
		}
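
		// The branch above covers, for example, a Service that moves from
		// dual-stack back to single-stack: slices of the dropped IP family are
		// queued for deletion.
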
		// add list if it is not on our map
		if _, ok := slicesByAddressType[existingSlice.AddressType]; !ok {
			slicesByAddressType[existingSlice.AddressType] = make([]*discovery.EndpointSlice, 0, 1)
		}

		slicesByAddressType[existingSlice.AddressType] = append(slicesByAddressType[existingSlice.AddressType], existingSlice)
	}

	// reconcile for existing.
	// Run the per-address-type reconciliation for every address type the Service supports.
	for addressType := range serviceSupportedAddressesTypes {
		existingSlices := slicesByAddressType[addressType]
		err := r.reconcileByAddressType(logger, service, pods, existingSlices, triggerTime, addressType)
		if err != nil {
			errs = append(errs, err)
		}
	}

	// delete those which are of addressType that is no longer supported
	// by the service
	for _, sliceToDelete := range slicesToDelete {
		err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Delete(context.TODO(), sliceToDelete.Name, metav1.DeleteOptions{})
		if err != nil {
			errs = append(errs, fmt.Errorf("error deleting %s EndpointSlice for Service %s/%s: %w", sliceToDelete.Name, service.Namespace, service.Name, err))
		} else {
			r.endpointSliceTracker.ExpectDeletion(sliceToDelete)
			metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
		}
	}

	return utilerrors.NewAggregate(errs)
}

// reconcileByAddressType takes the set of Pods currently matching the service
// selector and compares them with the endpoints already present in any existing
// EndpointSlices (of the given address type) for the Service. It creates,
// updates, or deletes EndpointSlices so that the desired set of Pods is
// represented by EndpointSlices.
func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
	errs := []error{}

	slicesToCreate := []*discovery.EndpointSlice{}
	slicesToUpdate := []*discovery.EndpointSlice{}
	slicesToDelete := []*discovery.EndpointSlice{}
	events := []*topologycache.EventBuilder{}

	// Build data structures for existing state.
	existingSlicesByPortMap := map[endpointsliceutil.PortMapKey][]*discovery.EndpointSlice{}
	for _, existingSlice := range existingSlices {
		if ownedBy(existingSlice, service) {
			epHash := endpointsliceutil.NewPortMapKey(existingSlice.Ports)
			existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
		} else {
			slicesToDelete = append(slicesToDelete, existingSlice)
		}
	}
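	// Slices that are not owned by this Service (ownedBy checks the slice's
	// OwnerReferences) were queued for deletion above and are removed when
	// finalize runs.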

	// Build data structures for desired state.
	desiredMetaByPortMap := map[endpointsliceutil.PortMapKey]*endpointMeta{}
	desiredEndpointsByPortMap := map[endpointsliceutil.PortMapKey]endpointsliceutil.EndpointSet{}

	for _, pod := range pods {
		if !endpointsliceutil.ShouldPodBeInEndpoints(pod, true) {
			continue
		}

		endpointPorts := getEndpointPorts(logger, service, pod)
		epHash := endpointsliceutil.NewPortMapKey(endpointPorts)
		if _, ok := desiredEndpointsByPortMap[epHash]; !ok {
			desiredEndpointsByPortMap[epHash] = endpointsliceutil.EndpointSet{}
		}

		if _, ok := desiredMetaByPortMap[epHash]; !ok {
			desiredMetaByPortMap[epHash] = &endpointMeta{
				addressType: addressType,
				ports:       endpointPorts,
			}
		}

		node, err := r.nodeLister.Get(pod.Spec.NodeName)
		if err != nil {
			// we are getting the information from the local informer,
			// an error different than IsNotFound should not happen
			if !errors.IsNotFound(err) {
				return err
			}
			// If the Node specified by the Pod doesn't exist we want to requeue the Service so we
			// retry later, but also update the EndpointSlice without the problematic Pod.
			// Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid
			// situations where a reference from a Pod to a missing node can leave the EndpointSlice
			// stuck forever.
			// On the other side, if the service.Spec.PublishNotReadyAddresses is set we just add the
			// Pod, since the user is explicitly indicating that the Pod address should be published.
			if !service.Spec.PublishNotReadyAddresses {
				logger.Info("skipping Pod for Service, Node not found", "pod", klog.KObj(pod), "service", klog.KObj(service), "node", klog.KRef("", pod.Spec.NodeName))
				errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
				continue
			}
		}

		endpoint := podToEndpoint(pod, node, service, addressType)
		if len(endpoint.Addresses) > 0 {
			desiredEndpointsByPortMap[epHash].Insert(&endpoint)
		}
	}
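	// spMetrics records, per unique port set, how many endpoints and slices
	// exist for this Service; together with the per-sync totals below it feeds
	// the controller's efficiency metrics.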

	spMetrics := metrics.NewServicePortCache()
	totalAdded := 0
	totalRemoved := 0

	// Determine changes necessary for each group of slices by port map.
	for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
		numEndpoints := len(desiredEndpoints)
		pmSlicesToCreate, pmSlicesToUpdate, pmSlicesToDelete, added, removed := r.reconcileByPortMapping(
			logger, service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])

		totalAdded += added
		totalRemoved += removed

		spMetrics.Set(portMap, metrics.EfficiencyInfo{
			Endpoints: numEndpoints,
			Slices:    len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSlicesToDelete),
		})

		slicesToCreate = append(slicesToCreate, pmSlicesToCreate...)
		slicesToUpdate = append(slicesToUpdate, pmSlicesToUpdate...)
		slicesToDelete = append(slicesToDelete, pmSlicesToDelete...)
	}

	// If there are unique sets of ports that are no longer desired, mark
	// the corresponding endpoint slices for deletion.
	for portMap, existingSlices := range existingSlicesByPortMap {
		if _, ok := desiredEndpointsByPortMap[portMap]; !ok {
			slicesToDelete = append(slicesToDelete, existingSlices...)
		}
	}

	// When no endpoint slices would usually exist, we need to add a placeholder.
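	// A selector-based Service is expected to always have at least one
	// EndpointSlice, even when no Pods match, so an empty placeholder slice is
	// kept (or created) instead of leaving the Service with none.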
	if len(existingSlices) == len(slicesToDelete) && len(slicesToCreate) < 1 {
		// Check for existing placeholder slice outside of the core control flow
		placeholderSlice := newEndpointSlice(logger, service, &endpointMeta{ports: []discovery.EndpointPort{}, addressType: addressType}, r.controllerName)
		if len(slicesToDelete) == 1 && placeholderSliceCompare.DeepEqual(slicesToDelete[0], placeholderSlice) {
			// We are about to unnecessarily delete/recreate the placeholder, remove it now.
			slicesToDelete = slicesToDelete[:0]
		} else {
			slicesToCreate = append(slicesToCreate, placeholderSlice)
		}
		spMetrics.Set(endpointsliceutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
			Endpoints: 0,
			Slices:    1,
		})
	}

	metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
	metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totalRemoved))

	serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
	r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)

	// Topology hints are assigned per address type. This means it is
	// theoretically possible for endpoints of one address type to be assigned
	// hints while endpoints of another address type are not.
	si := &topologycache.SliceInfo{
		ServiceKey:  fmt.Sprintf("%s/%s", service.Namespace, service.Name),
		AddressType: addressType,
		ToCreate:    slicesToCreate,
		ToUpdate:    slicesToUpdate,
		Unchanged:   unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
	}

	// The Service annotation `service.kubernetes.io/topology-mode` decides whether
	// topology-aware hints are added to the slices.
	if r.topologyCache != nil && hintsEnabled(service.Annotations) {
		slicesToCreate, slicesToUpdate, events = r.topologyCache.AddHints(logger, si)
	} else {
		if r.topologyCache != nil {
			if r.topologyCache.HasPopulatedHints(si.ServiceKey) {
				logger.Info("TopologyAwareHints annotation has changed, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
				events = append(events, &topologycache.EventBuilder{
					EventType: corev1.EventTypeWarning,
					Reason:    "TopologyAwareHintsDisabled",
					Message:   topologycache.FormatWithAddressType(topologycache.TopologyAwareHintsDisabled, si.AddressType),
				})
			}
			r.topologyCache.RemoveHints(si.ServiceKey, addressType)
		}
		slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
	}
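
	// finalize performs the actual API writes: it creates, updates, and deletes
	// the computed EndpointSlices (reusing slices that were marked for deletion
	// as targets for creates where possible, to save API calls).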
	err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
	if err != nil {
		errs = append(errs, err)
	}

	for _, event := range events {
		r.eventRecorder.Event(service, event.EventType, event.Reason, event.Message)
	}

	return utilerrors.NewAggregate(errs)
}
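
Both the existing and the desired endpoints above are grouped by a port-map key rather than handled as one flat set. The reason is that the ports listed in an EndpointSlice apply to every endpoint in that slice, so Pods whose ports resolve differently (typically because the Service uses a named targetPort that maps to different container ports) cannot share a slice. Below is a minimal, self-contained sketch of that grouping idea; it uses plain structs and a string key purely for illustration, not the real discovery.EndpointPort type or endpointsliceutil.NewPortMapKey (which hashes the port set instead).

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type port struct {
	name     string
	number   int32
	protocol string
}

type endpoint struct {
	address string
	ports   []port // ports as resolved for this particular Pod
}

// portMapKey builds a deterministic key from a port set, analogous in spirit
// to endpointsliceutil.NewPortMapKey.
func portMapKey(ports []port) string {
	parts := make([]string, 0, len(ports))
	for _, p := range ports {
		parts = append(parts, fmt.Sprintf("%s/%s/%d", p.name, p.protocol, p.number))
	}
	sort.Strings(parts)
	return strings.Join(parts, ",")
}

func main() {
	// Three Pods behind the same Service with targetPort "http": two containers
	// expose it on 8080, one on 9090, so the latter cannot share a slice.
	endpoints := []endpoint{
		{address: "10.0.0.1", ports: []port{{name: "http", number: 8080, protocol: "TCP"}}},
		{address: "10.0.0.2", ports: []port{{name: "http", number: 9090, protocol: "TCP"}}},
		{address: "10.0.0.3", ports: []port{{name: "http", number: 8080, protocol: "TCP"}}},
	}

	// Group the desired endpoints by their resolved port set, as
	// desiredEndpointsByPortMap does in the reconciler.
	desiredByPortMap := map[string][]endpoint{}
	for _, ep := range endpoints {
		key := portMapKey(ep.ports)
		desiredByPortMap[key] = append(desiredByPortMap[key], ep)
	}

	for key, eps := range desiredByPortMap {
		fmt.Printf("slice group %q -> %d endpoint(s)\n", key, len(eps))
	}
}
```

Running the sketch prints two groups: the 8080 endpoints can share one EndpointSlice, while the 9090 endpoint needs its own, which is exactly why the reconciler keys create/update/delete decisions on the port map.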