深入Ingress - Ingress Controller源码解析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# ss -ntlp
State          Recv-Q          Send-Q                   Local Address:Port                    Peer Address:Port         Process                                                            
LISTEN         0               4096                         127.0.0.1:10245                        0.0.0.0:*             users:(("nginx-ingress-c",pid=2671,fd=8))                         
LISTEN         0               511                          127.0.0.1:10246                        0.0.0.0:*             users:(("nginx",pid=2783,fd=17),("nginx",pid=2738,fd=17))         
LISTEN         0               511                          127.0.0.1:10247                        0.0.0.0:*             users:(("nginx",pid=2783,fd=18),("nginx",pid=2738,fd=18))         
LISTEN         0               4096                           0.0.0.0:80                           0.0.0.0:*             users:(("nginx",pid=2783,fd=11),("nginx",pid=2738,fd=11))         
LISTEN         0               4096                           0.0.0.0:8181                         0.0.0.0:*             users:(("nginx",pid=2783,fd=15),("nginx",pid=2738,fd=15))         
LISTEN         0               4096                           0.0.0.0:443                          0.0.0.0:*             users:(("nginx",pid=2783,fd=13),("nginx",pid=2738,fd=13))         
LISTEN         0               4096                                 *:10254                              *:*             users:(("nginx-ingress-c",pid=2671,fd=33))                        
LISTEN         0               4096                              [::]:80                              [::]:*             users:(("nginx",pid=2783,fd=12),("nginx",pid=2738,fd=12))         
LISTEN         0               4096                              [::]:8181                            [::]:*             users:(("nginx",pid=2783,fd=16),("nginx",pid=2738,fd=16))         
LISTEN         0               4096                                 *:8443                               *:*             users:(("nginx-ingress-c",pid=2671,fd=34))                        
LISTEN         0               4096                              [::]:443                             [::]:*             users:(("nginx",pid=2783,fd=14),("nginx",pid=2738,fd=14)) 

## nginx-ingress-controller进程
# ps aux |grep -w 2671
_rpc        2671  0.4  0.2 743856 42000 ?        Ssl  Sep15   5:06 /nginx-ingress-controller --publish-service=ingress-nginx/ingress-nginx-controller --election-id=ingress-controller-leader --controller-class=k8s.io/ingress-nginx --ingress-class=nginx --configmap=ingress-nginx/ingress-nginx-controller --validating-webhook=:8443 --validating-webhook-certificate=/usr/local/certificates/cert --validating-webhook-key=/usr/local/certificates/key
root      284760  0.0  0.0   3100   812 pts/1    S+   04:15   0:00 grep -w 2671
# 

## Nginx Master 进程
# ps aux |grep -w 2738
_rpc        2738  0.0  0.2 145128 35448 ?        S    Sep15   0:00 nginx: master process /usr/local/nginx/sbin/nginx -c /etc/nginx/nginx.conf
root      284662  0.0  0.0   3100   844 pts/1    S+   04:15   0:00 grep -w 2738
# 

## Nginx worker 进程
# ps aux |grep -w 2783
_rpc        2783  0.0  0.2 157240 41060 ?        Sl   Sep15   0:31 nginx: worker process
root      285147  0.0  0.0   3100   832 pts/1    S+   04:17   0:00 grep -w 2783
# 

从上面可以看出,整个 Ingress Nginx Controller Pod 包含两部分:nginx-ingress-controller 和 Nginx。

https://raw.githubusercontent.com/kbsonlong/notes_statics/main/diagrams/ingress-nginx-controller.png

nginx-ingress-controller 如其名,是控制管理(control)Ingress 资源的应用。Ingress Controller 会监听集群中的 Ingress 资源事件,然后根据对应的事件处理逻辑更新 Nginx 的配置。

1 Controller 控制器

编译 Controller

build/build.sh

1
2
3
4
5
6
# Build the controller binary. -trimpath removes local paths for reproducible
# builds; -w -s strip debug info; the -X flags embed release/commit/repo
# metadata into the version package at link time.
${GO_BUILD_CMD} \
  -trimpath -ldflags="-buildid= -w -s \
    -X ${PKG}/version.RELEASE=${TAG} \
    -X ${PKG}/version.COMMIT=${COMMIT_SHA} \
    -X ${PKG}/version.REPO=${REPO_INFO}" \
  -o "${TARGETS_DIR}/nginx-ingress-controller" "${PKG}/cmd/nginx"

从构建命令可以知道 nginx-ingress-controller 的入口在 cmd/nginx

cmd/nginx/main.go

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// main is the entry point of nginx-ingress-controller: it parses flags,
// builds the Kubernetes client, runs preflight checks, wires up metrics and
// health endpoints, then starts the NGINX controller loop until SIGTERM.
func main() {

	klog.InitFlags(nil)

	rand.Seed(time.Now().UnixNano())

	fmt.Println(version.String())

	// Parse command-line flags (plus built-in defaults) into a
	// controller.Configuration instance.
	showVersion, conf, err := ingressflags.ParseFlags()
	if showVersion {
		os.Exit(0)
	}

	if err != nil {
		klog.Fatal(err)
	}

	// Create the directories the controller needs before anything else runs.
	err = file.CreateRequiredDirectories()
	if err != nil {
		klog.Fatal(err)
	}

	// Build the Kubernetes API client from --apiserver-host / --kubeconfig
	// (or in-cluster configuration when neither is set).
	kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
	if err != nil {
		handleFatalInitError(err)
	}

	// If a default backend service was configured, verify it exists.
	if len(conf.DefaultService) > 0 {
		err := checkService(conf.DefaultService, kubeClient)
		if err != nil {
			klog.Fatal(err)
		}

		klog.InfoS("Valid default backend", "service", conf.DefaultService)
	}

	// If a publish service was configured, verify it exists.
	if len(conf.PublishService) > 0 {
		err := checkService(conf.PublishService, kubeClient)
		if err != nil {
			klog.Fatal(err)
		}
	}

	// When watching a single namespace, make sure that namespace exists.
	if conf.Namespace != "" {
		_, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{})
		if err != nil {
			klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err)
		}
	}

	// Generate the self-signed fallback (fake) SSL certificate.
	conf.FakeCertificate = ssl.GetFakeSSLCert()
	klog.InfoS("SSL fake certificate created", "file", conf.FakeCertificate.PemFileName)

	// Version check: the networking/v1 Ingress API requires Kubernetes v1.19+.
	if !k8s.NetworkingIngressAvailable(kubeClient) {
		klog.Fatalf("ingress-nginx requires Kubernetes v1.19.0 or higher")
	}

	// Probe access to IngressClass objects; if RBAC forbids it, disable the
	// IngressClass feature instead of failing.
	_, err = kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		if !errors.IsNotFound(err) {
			if errors.IsForbidden(err) {
				klog.Warningf("No permissions to list and get Ingress Classes: %v, IngressClass feature will be disabled", err)
				conf.IngressClassConfiguration.IgnoreIngressClass = true
			}
		}
	}
	conf.Client = kubeClient

	err = k8s.GetIngressPod(kubeClient)
	if err != nil {
		klog.Fatalf("Unexpected error obtaining ingress-nginx pod: %v", err)
	}
	// Create the Prometheus registry and register Go runtime and process collectors.
	reg := prometheus.NewRegistry()

	reg.MustRegister(collectors.NewGoCollector())
	reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{
		PidFn:        func() (int, error) { return os.Getpid(), nil },
		ReportErrors: true,
	}))

	// mc is the metrics collector instance; a dummy collector is used
	// unless metrics are explicitly enabled.
	mc := metric.NewDummyCollector()
	if conf.EnableMetrics {
		mc, err = metric.NewCollector(conf.MetricsPerHost, conf.ReportStatusClasses, reg, conf.IngressClassConfiguration.Controller, *conf.MetricsBuckets)
		if err != nil {
			klog.Fatalf("Error creating prometheus collector:  %v", err)
		}
	}
	// Pass the ValidationWebhook status to determine if we need to start the collector
	// for the admissionWebhook
	mc.Start(conf.ValidationWebhook)

	// Optionally expose profiling (pprof) endpoints on localhost.
	if conf.EnableProfiling {
		go metrics.RegisterProfiler("127.0.0.1", nginx.ProfilerPort)
	}

	// Instantiate the NGINX controller — the core object of the program.
	ngx := controller.NewNGINXController(conf, mc)

	// HTTP mux serving the health and metrics endpoints.
	mux := http.NewServeMux()
	metrics.RegisterHealthz(nginx.HealthPath, mux, ngx)
	metrics.RegisterMetrics(reg, mux)

	// Presence of /chroot indicates the chrooted image variant; in that case
	// start the internal logger goroutine.
	_, errExists := os.Stat("/chroot")
	if errExists == nil {
		conf.IsChroot = true
		go logger(conf.InternalLoggerAddress)

	}

	// Start the health-check / metrics HTTP server.
	go metrics.StartHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux)

	// Start the controller (which in turn starts the nginx master process).
	go ngx.Start()

	// Block until SIGTERM is received, then exit with the given code.
	process.HandleSigterm(ngx, conf.PostShutdownGracePeriod, func(code int) {
		os.Exit(code)
	})
}

controller 启动过程中传入配置信息以及一些默认配置项,并根据配置项启用 Metrics、HealthCheck、Profile、Logging 等功能。本文主要介绍 Controller 控制器的主流程:ngx := controller.NewNGINXController(conf, mc)

2 创建NGINXController 控制器

2.1 NGINXController

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// NGINXController is the Ingress controller implementation backed by NGINX.
type NGINXController struct {
	// Controller configuration, populated from command-line flags.
	cfg *Configuration

	// Kubernetes event recorder; produces the events shown by `kubectl describe`.
	recorder record.EventRecorder

	// Work queue that serializes configuration sync tasks.
	syncQueue *task.Queue

	// Publishes the controller address back to the Ingress status field.
	// Background (Chinese): https://blog.dianduidian.com/post/kubernetes-ingress%E7%8A%B6%E6%80%81%E4%B8%8A%E6%8A%A5%E6%9C%BA%E5%88%B6/
	syncStatus status.Syncer

	// Rate limiter throttling how often syncs may run.
	syncRateLimiter flowcontrol.RateLimiter

	// stopLock is used to enforce that only a single call to Stop send at
	// a given time. We allow stopping through an HTTP endpoint and
	// allowing concurrent stoppers leads to stack traces.
	stopLock *sync.Mutex

	stopCh chan struct{}

	// Ring channel carrying update events into the controller loop.
	updateCh *channels.RingChannel

	// ngxErrCh is used to detect errors with the NGINX processes
	ngxErrCh chan error

	// runningConfig contains the running configuration in the Backend,
	// used to detect whether a newly generated configuration differs.
	runningConfig *ingress.Configuration

	// Template writer that renders the nginx configuration.
	t ngx_template.Writer

	// Nameservers read from /etc/resolv.conf, used when generating
	// nginx.conf, e.g.: resolver 127.0.0.1 [::1]:5353;
	resolver []net.IP

	// Whether IPv6 is enabled.
	isIPV6Enabled bool

	// True once shutdown has been initiated.
	isShuttingDown bool

	// TCP proxy, used when SSLPassthrough is enabled.
	Proxy *tcpproxy.TCPProxy

	// Local cache of Kubernetes objects.
	store store.Storer

	// Metrics collectors.
	metricCollector    metric.Collector
	admissionCollector metric.Collector

	// Admission (validation) webhook HTTP server.
	validationWebhookServer *http.Server

	// Interface used to start the NGINX process and test configuration files.
	command NginxExecTester
}

2.2 Configuration

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

// Configuration holds the controller settings, populated from the
// command-line flags passed at startup.
type Configuration struct {
	// Kubernetes API server access.
	APIServerHost string
	RootCAFile    string

	KubeConfigFile string

	Client clientset.Interface

	// Informer resync period.
	ResyncPeriod time.Duration

	ConfigMapName  string
	DefaultService string

	// Namespace to watch; empty means cluster-wide.
	Namespace string

	WatchNamespaceSelector labels.Selector

	// +optional
	TCPConfigMapName string
	// +optional
	UDPConfigMapName string

	DefaultSSLCertificate string

	// +optional
	PublishService       string
	PublishStatusAddress string

	// Ingress status-update behavior.
	UpdateStatus           bool
	UseNodeInternalIP      bool
	ElectionID             string
	UpdateStatusOnShutdown bool

	// Health-check endpoint host and listen ports.
	HealthCheckHost string
	ListenPorts     *ngx_config.ListenPorts

	DisableServiceExternalName bool

	EnableSSLPassthrough bool

	// Enable pprof profiling endpoints.
	EnableProfiling bool

	// Prometheus metrics settings.
	EnableMetrics       bool
	MetricsPerHost      bool
	MetricsBuckets      *collectors.HistogramBuckets
	ReportStatusClasses bool

	// Self-signed fallback certificate, generated at startup.
	FakeCertificate *ingress.SSLCert

	// Token-bucket rate limit for configuration syncs.
	SyncRateLimit float32

	DisableCatchAll bool

	IngressClassConfiguration *ingressclass.IngressClassConfiguration

	// Admission (validation) webhook settings.
	ValidationWebhook         string
	ValidationWebhookCertPath string
	ValidationWebhookKeyPath  string
	DisableFullValidationTest bool

	GlobalExternalAuth  *ngx_config.GlobalExternalAuth
	MaxmindEditionFiles *[]string

	MonitorMaxBatchSize int

	// Graceful-shutdown timing.
	PostShutdownGracePeriod int
	ShutdownGracePeriod     int

	// Chroot-image support (internal logger address, chroot flag).
	InternalLoggerAddress string
	IsChroot              bool
	DeepInspector         bool

	DynamicConfigurationRetries int
}

这些配置需要在控制器启动时通过命令行参数传入。

2.3 实例化 NewNGINXController 控制器

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// NewNGINXController builds a fully wired NGINX Ingress controller: event
// recorder, local store, sync queue, optional status syncer and admission
// webhook server, configuration template, and file watchers.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
	// Event broadcaster: produces the event log seen via `kubectl describe`.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
		Interface: config.Client.CoreV1().Events(config.Namespace),
	})

	// Read nameservers from the pod's /etc/resolv.conf.
	h, err := dns.GetSystemNameServers()
	if err != nil {
		klog.Warningf("Error reading system nameservers: %v", err)
	}

	// Instantiate the NGINXController.
	n := &NGINXController{
		isIPV6Enabled: ing_net.IsIPv6Enabled(),

		resolver:        h,
		cfg:             config,
		syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),

		recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
			Component: "nginx-ingress-controller",
		}),

		stopCh:   make(chan struct{}),
		updateCh: channels.NewRingChannel(1024),

		ngxErrCh: make(chan error),

		stopLock: &sync.Mutex{},

		// Currently running configuration; empty right after startup.
		runningConfig: new(ingress.Configuration),

		Proxy: &tcpproxy.TCPProxy{},

		metricCollector: mc,

		// Object able to invoke `nginx -c nginx.conf` style commands.
		command: NewNginxCommand(),
	}

	// Configure the admission (validation) webhook server when enabled.
	if n.cfg.ValidationWebhook != "" {
		n.validationWebhookServer = &http.Server{
			Addr: config.ValidationWebhook,
			//G112 (CWE-400): Potential Slowloris Attack
			ReadHeaderTimeout: 10 * time.Second,
			Handler:           adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
			TLSConfig:         ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
			// disable http/2
			// https://github.com/kubernetes/kubernetes/issues/80313
			// https://github.com/kubernetes/ingress-nginx/issues/6323#issuecomment-737239159
			TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
		}
	}

	// Create the store (local cache). This object is central: all data
	// caching and interaction with the Kubernetes API goes through it.
	n.store = store.New(
		config.Namespace,
		config.WatchNamespaceSelector,
		config.ConfigMapName,
		config.TCPConfigMapName,
		config.UDPConfigMapName,
		config.DefaultSSLCertificate,
		config.ResyncPeriod,
		config.Client,
		n.updateCh,
		config.DisableCatchAll,
		config.DeepInspector,
		config.IngressClassConfiguration)

	// Create the work queue, registering syncIngress as its handler.
	n.syncQueue = task.NewTaskQueue(n.syncIngress)

	if config.UpdateStatus {
		n.syncStatus = status.NewStatusSyncer(status.Config{
			Client:                 config.Client,
			PublishService:         config.PublishService,
			PublishStatusAddress:   config.PublishStatusAddress,
			IngressLister:          n.store,
			UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
			UseNodeInternalIP:      config.UseNodeInternalIP,
		})
	} else {
		klog.Warning("Update of Ingress status is disabled (flag --update-status)")
	}

	// Callback invoked when the template file changes.
	onTemplateChange := func() {
		// Re-parse the template.
		template, err := ngx_template.NewTemplate(nginx.TemplatePath)
		if err != nil {
			// this error is different from the rest because it must be clear why nginx is not working
			klog.ErrorS(err, "Error loading new template")
			return
		}

		// If the template parsed correctly, install it on the controller and
		// enqueue a template-change task on the sync queue.
		n.t = template
		klog.InfoS("New NGINX configuration template loaded")
		n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
	}

	// Load the configuration template at startup.
	ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
	if err != nil {
		klog.Fatalf("Invalid NGINX configuration template: %v", err)
	}

	n.t = ngxTpl

	// Watch the template file (e.g. /etc/nginx/template/nginx.tmpl) for
	// changes and invoke onTemplateChange when it is modified.

	_, err = file.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
	if err != nil {
		klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err)
	}

	// Collect all regular files under /etc/nginx/geoip/ for watching.
	filesToWatch := []string{}
	err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		if info.IsDir() {
			return nil
		}

		filesToWatch = append(filesToWatch, path)
		return nil
	})

	if err != nil {
		klog.Fatalf("Error creating file watchers: %v", err)
	}

	// When any watched file changes, enqueue a file-change task on the sync queue.
	for _, f := range filesToWatch {
		_, err = file.NewFileWatcher(f, func() {
			klog.InfoS("File changed detected. Reloading NGINX", "path", f)
			n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
		})
		if err != nil {
			klog.Fatalf("Error creating file watcher for %v: %v", f, err)
		}
	}

	return n
}

2.4 创建store(本地缓存)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
func New(
	namespace string,
	namespaceSelector labels.Selector,
	configmap, tcp, udp, defaultSSLCertificate string,
	resyncPeriod time.Duration,
	client clientset.Interface,
	updateCh *channels.RingChannel,
	disableCatchAll bool,
	deepInspector bool,
	icConfig *ingressclass.IngressClassConfiguration) Storer {

	// store的具体实现是k8sStore
	store := &k8sStore{
		informers:             &Informer{},
		listers:               &Lister{},
		sslStore:              NewSSLCertTracker(),
		updateCh:              updateCh,
		backendConfig:         ngx_config.NewDefault(),
		syncSecretMu:          &sync.Mutex{},
		backendConfigMu:       &sync.RWMutex{},
		secretIngressMap:      NewObjectRefMap(),
		defaultSSLCertificate: defaultSSLCertificate,
	}

	//  kubectl describe 命令看到的事件日志就是这个库产生的
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
		Interface: client.CoreV1().Events(namespace),
	})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
		Component: "nginx-ingress-controller",
	})

	// 用于提取注释的对象
	// 集中在internal/ingress/annotations目录
	// k8sStore fulfills resolver.Resolver interface
	store.annotations = annotations.NewAnnotationExtractor(store)

	// 将数据再缓存一份用于本地查询,缓存的对象正如其名IngressWithAnnotation
	// 会缓存internal/ingress/types.go:Ingress
	store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)

	// As we currently do not filter out kubernetes objects we list, we can
	// retrieve a huge amount of data from the API server.
	// In a cluster using HELM < v3 configmaps are used to store binary data.
	// If you happen to have a lot of HELM releases in the cluster it will make
	// the memory consumption of nginx-ingress-controller explode.
	// In order to avoid that we filter out labels OWNER=TILLER.
	labelsTweakListOptionsFunc := func(options *metav1.ListOptions) {
		if len(options.LabelSelector) > 0 {
			options.LabelSelector += ",OWNER!=TILLER"
		} else {
			options.LabelSelector = "OWNER!=TILLER"
		}
	}

	// As of HELM >= v3 helm releases are stored using Secrets instead of ConfigMaps.
	// In order to avoid listing those secrets we discard type "helm.sh/release.v1"
	secretsTweakListOptionsFunc := func(options *metav1.ListOptions) {
		helmAntiSelector := fields.OneTermNotEqualSelector("type", "helm.sh/release.v1")
		baseSelector, err := fields.ParseSelector(options.FieldSelector)

		if err != nil {
			options.FieldSelector = helmAntiSelector.String()
		} else {
			options.FieldSelector = fields.AndSelectors(baseSelector, helmAntiSelector).String()
		}
	}

	// 创建informer工厂函数
	// create informers factory, enable and assign required informers
	infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
	)

	// infFactoryConfigmaps, infFactorySecrets
	// create informers factory for configmaps
	infFactoryConfigmaps := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
		informers.WithTweakListOptions(labelsTweakListOptionsFunc),
	)

	// create informers factory for secrets
	infFactorySecrets := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
		informers.WithNamespace(namespace),
		informers.WithTweakListOptions(secretsTweakListOptionsFunc),
	)

	store.informers.Ingress = infFactory.Networking().V1().Ingresses().Informer()
	store.listers.Ingress.Store = store.informers.Ingress.GetStore()

	if !icConfig.IgnoreIngressClass {
		store.informers.IngressClass = infFactory.Networking().V1().IngressClasses().Informer()
		store.listers.IngressClass.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
	}

	store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
	store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

	store.informers.Secret = infFactorySecrets.Core().V1().Secrets().Informer()
	store.listers.Secret.Store = store.informers.Secret.GetStore()

	store.informers.ConfigMap = infFactoryConfigmaps.Core().V1().ConfigMaps().Informer()
	store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()

	store.informers.Service = infFactory.Core().V1().Services().Informer()
	store.listers.Service.Store = store.informers.Service.GetStore()
	// 上面都是为了创建对应的informer对象,以及informer的缓存对象listers,用来查询最新数据

	// 默认监听整个集群,返回true
	// avoid caching namespaces at cluster scope when watching single namespace
	if namespaceSelector != nil && !namespaceSelector.Empty() {
		// cache informers factory for namespaces
		infFactoryNamespaces := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
			informers.WithTweakListOptions(labelsTweakListOptionsFunc),
		)

		store.informers.Namespace = infFactoryNamespaces.Core().V1().Namespaces().Informer()
		store.listers.Namespace.Store = store.informers.Namespace.GetStore()
	}

	watchedNamespace := func(namespace string) bool {
		if namespaceSelector == nil || namespaceSelector.Empty() {
			return true
		}

		item, ok, err := store.listers.Namespace.GetByKey(namespace)
		if !ok {
			klog.Errorf("Namespace %s not existed: %v.", namespace, err)
			return false
		}
		ns, ok := item.(*corev1.Namespace)
		if !ok {
			return false
		}

		return namespaceSelector.Matches(labels.Set(ns.Labels))
	}

	ingDeleteHandler := func(obj interface{}) {
		ing, ok := toIngress(obj)
		if !ok {
			// If we reached here it means the ingress was deleted but its final state is unrecorded.
			tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
			if !ok {
				klog.ErrorS(nil, "Error obtaining object from tombstone", "key", obj)
				return
			}
			ing, ok = tombstone.Obj.(*networkingv1.Ingress)
			if !ok {
				klog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
				return
			}
		}

		if !watchedNamespace(ing.Namespace) {
			return
		}

		_, err := store.GetIngressClass(ing, icConfig)
		if err != nil {
			klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
			return
		}

		if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
			klog.InfoS("Ignoring delete for catch-all because of --disable-catch-all", "ingress", klog.KObj(ing))
			return
		}

		store.listers.IngressWithAnnotation.Delete(ing)

		key := k8s.MetaNamespaceKey(ing)
		store.secretIngressMap.Delete(key)

		updateCh.In() <- Event{
			Type: DeleteEvent,
			Obj:  obj,
		}
	}

	ingEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ing, _ := toIngress(obj)

			if !watchedNamespace(ing.Namespace) {
				return
			}

			ic, err := store.GetIngressClass(ing, icConfig)
			if err != nil {
				klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
				return
			}

			klog.InfoS("Found valid IngressClass", "ingress", klog.KObj(ing), "ingressclass", ic)

			if deepInspector {
				if err := inspector.DeepInspect(ing); err != nil {
					klog.ErrorS(err, "received invalid ingress", "ingress", klog.KObj(ing))
					return
				}
			}
			if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
				klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing))
				return
			}

			recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync")

			store.syncIngress(ing)
			store.updateSecretIngressMap(ing)
			store.syncSecrets(ing)

			updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
		},
		DeleteFunc: ingDeleteHandler,
		UpdateFunc: func(old, cur interface{}) {
			oldIng, _ := toIngress(old)
			curIng, _ := toIngress(cur)

			if !watchedNamespace(oldIng.Namespace) {
				return
			}

			var errOld, errCur error
			var classCur string
			if !icConfig.IgnoreIngressClass {
				_, errOld = store.GetIngressClass(oldIng, icConfig)
				classCur, errCur = store.GetIngressClass(curIng, icConfig)
			}
			if errOld != nil && errCur == nil {
				if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
					klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng))
					return
				}

				klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "ingressclass", classCur)
				recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
			} else if errOld == nil && errCur != nil {
				klog.InfoS("removing ingress because of unknown ingressclass", "ingress", klog.KObj(curIng))
				ingDeleteHandler(old)
				return
			} else if errCur == nil && !reflect.DeepEqual(old, cur) {
				if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
					klog.InfoS("ignoring update for catch-all ingress and delete old one because of --disable-catch-all", "ingress", klog.KObj(curIng))
					ingDeleteHandler(old)
					return
				}

				recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
			} else {
				klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng))
				return
			}

			if deepInspector {
				if err := inspector.DeepInspect(curIng); err != nil {
					klog.ErrorS(err, "received invalid ingress", "ingress", klog.KObj(curIng))
					return
				}
			}

			store.syncIngress(curIng)
			store.updateSecretIngressMap(curIng)
			store.syncSecrets(curIng)

			updateCh.In() <- Event{
				Type: UpdateEvent,
				Obj:  cur,
			}
		},
	}

	ingressClassEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ingressclass := obj.(*networkingv1.IngressClass)
			foundClassByName := false
			if icConfig.IngressClassByName && ingressclass.Name == icConfig.AnnotationValue {
				klog.InfoS("adding ingressclass as ingress-class-by-name is configured", "ingressclass", klog.KObj(ingressclass))
				foundClassByName = true
			}
			if !foundClassByName && ingressclass.Spec.Controller != icConfig.Controller {
				klog.InfoS("ignoring ingressclass as the spec.controller is not the same of this ingress", "ingressclass", klog.KObj(ingressclass))
				return
			}
			err := store.listers.IngressClass.Add(ingressclass)
			if err != nil {
				klog.InfoS("error adding ingressclass to store", "ingressclass", klog.KObj(ingressclass), "error", err)
				return
			}

			updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
		},
		DeleteFunc: func(obj interface{}) {
			ingressclass := obj.(*networkingv1.IngressClass)
			if ingressclass.Spec.Controller != icConfig.Controller {
				klog.InfoS("ignoring ingressclass as the spec.controller is not the same of this ingress", "ingressclass", klog.KObj(ingressclass))
				return
			}
			err := store.listers.IngressClass.Delete(ingressclass)
			if err != nil {
				klog.InfoS("error removing ingressclass from store", "ingressclass", klog.KObj(ingressclass), "error", err)
				return
			}
			updateCh.In() <- Event{
				Type: DeleteEvent,
				Obj:  obj,
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			oic := old.(*networkingv1.IngressClass)
			cic := cur.(*networkingv1.IngressClass)
			if cic.Spec.Controller != icConfig.Controller {
				klog.InfoS("ignoring ingressclass as the spec.controller is not the same of this ingress", "ingressclass", klog.KObj(cic))
				return
			}
			// TODO: In a future we might be interested in parse parameters and use as
			// current IngressClass for this case, crossing with configmap
			if !reflect.DeepEqual(cic.Spec.Parameters, oic.Spec.Parameters) {
				err := store.listers.IngressClass.Update(cic)
				if err != nil {
					klog.InfoS("error updating ingressclass in store", "ingressclass", klog.KObj(cic), "error", err)
					return
				}
				updateCh.In() <- Event{
					Type: UpdateEvent,
					Obj:  cur,
				}
			}
		},
	}

	// secrEventHandler reacts to Secret events: it re-syncs the default SSL
	// certificate when it changed, re-parses every Ingress whose annotations
	// reference the secret, and then notifies the controller main loop
	// through updateCh.
	secrEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			sec := obj.(*corev1.Secret)
			key := k8s.MetaNamespaceKey(sec)

			// NOTE(review): unlike UpdateFunc/DeleteFunc this handler does not
			// filter by watchedNamespace — confirm whether Add events from
			// non-watched namespaces should really be processed.
			if store.defaultSSLCertificate == key {
				store.syncSecret(store.defaultSSLCertificate)
			}

			// find references in ingresses and update local ssl certs
			if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
				klog.InfoS("Secret was added and it is used in ingress annotations. Parsing", "secret", key)
				for _, ingKey := range ings {
					ing, err := store.getIngress(ingKey)
					if err != nil {
						// structured logging, consistent with UpdateFunc (was klog.Errorf)
						klog.ErrorS(err, "could not find Ingress in local store", "ingress", ingKey)
						continue
					}
					store.syncIngress(ing)
					store.syncSecrets(ing)
				}
				updateCh.In() <- Event{
					Type: CreateEvent,
					Obj:  obj,
				}
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			// ignore periodic resyncs where nothing actually changed
			if reflect.DeepEqual(old, cur) {
				return
			}

			sec := cur.(*corev1.Secret)
			key := k8s.MetaNamespaceKey(sec)

			if !watchedNamespace(sec.Namespace) {
				return
			}

			if store.defaultSSLCertificate == key {
				store.syncSecret(store.defaultSSLCertificate)
			}

			// find references in ingresses and update local ssl certs
			if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
				klog.InfoS("secret was updated and it is used in ingress annotations. Parsing", "secret", key)
				for _, ingKey := range ings {
					ing, err := store.getIngress(ingKey)
					if err != nil {
						klog.ErrorS(err, "could not find Ingress in local store", "ingress", ingKey)
						continue
					}
					store.syncSecrets(ing)
					store.syncIngress(ing)
				}
				updateCh.In() <- Event{
					Type: UpdateEvent,
					Obj:  cur,
				}
			}
		},
		DeleteFunc: func(obj interface{}) {
			sec, ok := obj.(*corev1.Secret)
			if !ok {
				// The secret was deleted but its final state is unrecorded:
				// recover the last known object from the tombstone.
				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
				if !ok {
					return
				}

				sec, ok = tombstone.Obj.(*corev1.Secret)
				if !ok {
					return
				}
			}

			if !watchedNamespace(sec.Namespace) {
				return
			}

			// compute the key once and reuse it (the original computed it twice)
			key := k8s.MetaNamespaceKey(sec)
			store.sslStore.Delete(key)

			// find references in ingresses
			if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
				klog.InfoS("secret was deleted and it is used in ingress annotations. Parsing", "secret", key)
				for _, ingKey := range ings {
					ing, err := store.getIngress(ingKey)
					if err != nil {
						// structured logging, consistent with UpdateFunc (was klog.Errorf)
						klog.ErrorS(err, "could not find Ingress in local store", "ingress", ingKey)
						continue
					}
					store.syncIngress(ing)
				}

				updateCh.In() <- Event{
					Type: DeleteEvent,
					Obj:  obj,
				}
			}
		},
	}

	// epEventHandler forwards Endpoints events to the controller main loop.
	// Update events are propagated only when the endpoint subsets actually
	// changed, which avoids churn on periodic informer resyncs.
	epEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			updateCh.In() <- Event{Type: CreateEvent, Obj: obj}
		},
		UpdateFunc: func(old, cur interface{}) {
			prevEP := old.(*corev1.Endpoints)
			currEP := cur.(*corev1.Endpoints)
			if reflect.DeepEqual(prevEP.Subsets, currEP.Subsets) {
				return
			}
			updateCh.In() <- Event{Type: UpdateEvent, Obj: cur}
		},
		DeleteFunc: func(obj interface{}) {
			updateCh.In() <- Event{Type: DeleteEvent, Obj: obj}
		},
	}

	// TODO: add e2e test to verify that changes to one or more configmap trigger an update
	// changeTriggerUpdate reports whether name is one of the configuration
	// configmaps (main, tcp-services, udp-services) whose change must trigger
	// a full configuration update.
	changeTriggerUpdate := func(name string) bool {
		switch name {
		case configmap, tcp, udp:
			return true
		default:
			return false
		}
	}

	// handleCfgMapEvent processes an add/update of a ConfigMap. A change to
	// one of the configuration configmaps records a Kubernetes event, reloads
	// the controller configuration (for the main configmap), re-syncs every
	// known ingress and emits a ConfigurationEvent; otherwise only ingresses
	// whose annotations reference a configmap are re-parsed.
	handleCfgMapEvent := func(key string, cfgMap *corev1.ConfigMap, eventName string) {
		// updates to configuration configmaps can trigger an update
		triggerUpdate := false
		if changeTriggerUpdate(key) {
			triggerUpdate = true
			recorder.Eventf(cfgMap, corev1.EventTypeNormal, eventName, fmt.Sprintf("ConfigMap %v", key))
			if key == configmap {
				// the main configmap carries the controller configuration
				store.setConfig(cfgMap)
			}
		}

		ings := store.listers.IngressWithAnnotation.List()
		for _, ingKey := range ings {
			// distinct name: the original declared `key` here, shadowing the
			// `key` parameter for the rest of the loop body
			ingMapKey := k8s.MetaNamespaceKey(ingKey)
			ing, err := store.getIngress(ingMapKey)
			if err != nil {
				klog.Errorf("could not find Ingress %v in local store: %v", ingMapKey, err)
				continue
			}

			// ingresses whose annotations reference a configmap are always
			// re-parsed, regardless of which configmap changed
			if parser.AnnotationsReferencesConfigmap(ing) {
				store.syncIngress(ing)
				continue
			}

			if triggerUpdate {
				store.syncIngress(ing)
			}
		}

		if triggerUpdate {
			updateCh.In() <- Event{
				Type: ConfigurationEvent,
				Obj:  cfgMap,
			}
		}
	}

	// cmEventHandler dispatches ConfigMap add/update events to
	// handleCfgMapEvent, tagging each with its event name.
	cmEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			cm := obj.(*corev1.ConfigMap)
			handleCfgMapEvent(k8s.MetaNamespaceKey(cm), cm, "CREATE")
		},
		UpdateFunc: func(old, cur interface{}) {
			// skip resync callbacks where nothing changed
			if reflect.DeepEqual(old, cur) {
				return
			}
			cm := cur.(*corev1.ConfigMap)
			handleCfgMapEvent(k8s.MetaNamespaceKey(cm), cm, "UPDATE")
		},
	}

	// serviceHandler forwards Service events to the controller main loop.
	// Add/Delete are only relevant for ExternalName services; updates are
	// propagated for any service that actually changed.
	serviceHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if obj.(*corev1.Service).Spec.Type != corev1.ServiceTypeExternalName {
				return
			}
			updateCh.In() <- Event{Type: CreateEvent, Obj: obj}
		},
		DeleteFunc: func(obj interface{}) {
			if obj.(*corev1.Service).Spec.Type != corev1.ServiceTypeExternalName {
				return
			}
			updateCh.In() <- Event{Type: DeleteEvent, Obj: obj}
		},
		UpdateFunc: func(old, cur interface{}) {
			// NOTE(review): unlike Add/Delete, updates are not filtered by
			// ExternalName — presumably intentional since any service change
			// can affect generated upstreams; confirm against upstream.
			prevSvc := old.(*corev1.Service)
			currSvc := cur.(*corev1.Service)
			if reflect.DeepEqual(prevSvc, currSvc) {
				return
			}
			updateCh.In() <- Event{Type: UpdateEvent, Obj: cur}
		},
	}
	// The functions above are the event handlers; each registers responses
	// for the three informer callbacks (AddFunc, DeleteFunc, UpdateFunc).
	// On success the logic generally follows three steps:
	// 1. run the business logic for the event
	// 2. sync data into the local store (syncIngress, syncSecrets, ...)
	// 3. push an Event into updateCh, handing it to the controller main loop

	store.informers.Ingress.AddEventHandler(ingEventHandler)
	if !icConfig.IgnoreIngressClass {
		store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
	}
	store.informers.Endpoint.AddEventHandler(epEventHandler)
	store.informers.Secret.AddEventHandler(secrEventHandler)
	store.informers.ConfigMap.AddEventHandler(cmEventHandler)
	store.informers.Service.AddEventHandler(serviceHandler)

	// The provided helm chart creates a default configmap; it is read here
	// immediately rather than waiting for the informer cache to fill.
	// do not wait for informers to read the configmap configuration
	ns, name, _ := k8s.ParseNameNS(configmap)
	cm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		klog.Warningf("Unexpected error reading configuration configmap: %v", err)
	}

	// NOTE(review): setConfig is invoked even when the Get above failed —
	// confirm setConfig tolerates an empty/zero ConfigMap in that case.
	store.setConfig(cm)
	return store
}

2.4.1 监听范围和资源

file: internal/ingress/controller/store/store.go

  • Informer type
1
2
3
4
5
6
7
8
9
// Informer groups the cache.SharedIndexInformer instances through which the
// controller watches each Kubernetes resource type it reacts to.
type Informer struct {
	Ingress      cache.SharedIndexInformer
	IngressClass cache.SharedIndexInformer
	Endpoint     cache.SharedIndexInformer
	Service      cache.SharedIndexInformer
	Secret       cache.SharedIndexInformer
	ConfigMap    cache.SharedIndexInformer
	Namespace    cache.SharedIndexInformer
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    // By default the whole Kubernetes cluster is watched. When
    // --watch-namespace or --watch-namespace-selector is set, the controller
    // only reacts to resource events from the matching namespaces.
	// avoid caching namespaces at cluster scope when watching single namespace
	if namespaceSelector != nil && !namespaceSelector.Empty() {
		// cache informers factory for namespaces
		infFactoryNamespaces := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
			informers.WithTweakListOptions(labelsTweakListOptionsFunc),
		)

		store.informers.Namespace = infFactoryNamespaces.Core().V1().Namespaces().Informer()
		store.listers.Namespace.Store = store.informers.Namespace.GetStore()
	}

	// watchedNamespace reports whether the given namespace is selected by the
	// configured namespace label selector (always true when no selector is
	// in effect).
	watchedNamespace := func(namespace string) bool {
		if namespaceSelector == nil || namespaceSelector.Empty() {
			return true
		}

		// NOTE(review): err is only surfaced when ok is false; a non-nil err
		// together with ok==true would be silently ignored — confirm intended.
		item, ok, err := store.listers.Namespace.GetByKey(namespace)
		if !ok {
			klog.Errorf("Namespace %s not existed: %v.", namespace, err)
			return false
		}
		ns, ok := item.(*corev1.Namespace)
		if !ok {
			return false
		}

		return namespaceSelector.Matches(labels.Set(ns.Labels))
	}
  • Handler

https://raw.githubusercontent.com/kbsonlong/notes_statics/main/images20220918155511.png

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
ingDeleteHandler := func (obj interface{})  {
   ...
   ...
   updateCh.In() <- Event{
			Type: DeleteEvent,
			Obj:  obj,
		}
}

ingEventHandler := cache.ResourceEventHandlerFuncs {
	AddFunc: func(obj interface{}) {
		...
		...
		updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
	},
	DeleteFunc: ingDeleteHandler,
	UpdateFunc: func(old, cur interface{}) {
		...
		...
		updateCh.In() <- Event{
					Type: UpdateEvent,
					Obj:  cur,
				}
	}
}

从上面函数中我们可以获取几个重要信息:

  • 控制器监听了Ingress、IngressClass、Endpoint、Secret、ConfigMap、Service、Namespace等资源。
  • 调用syncIngress函数处理监听到资源
  • 监听到资源变化后生成Event并通过updateCh通道发送出去。

在controller的创建过程中controller只是创建了自身并监控本地文件的更新,并没有涉及与k8s集群交互的部分,而是通过将交互接口全部抽象到store对象,ingress, configmap, service等资源的事件响应也全部放在store里面,store负责判断对象是否应该传递给NGINXController的主循环, 然后将数据缓存到本地, 以便后面让NGINXController对象查询,NGINXController对象主要负责怎么将资源的变更同步给nginx,比如怎么渲染模板,是否动态更新数据给nginx。

2.5 syncIngress

注意有两个 syncIngress 方法,分别属于 NGINXController 和 store

2.5.1 NGINXController 入口

1
n.syncQueue = task.NewTaskQueue(n.syncIngress)

2.5.2 store 入口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
		AddFunc: func(obj interface{}) {
			...
			...
			store.syncIngress(ing) //
			...
			updateCh.In() <- Event{
				Type: CreateEvent,
				Obj:  obj,
			}
		}

3 Nginx 反向代理

3.1 nginx.tmpl

文件内容太大,详细请查看源文件

注解内容

参考资料

0%