02-Kube-Controller-Manager源码分析(主流程)

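本文基于 Kubernetes 1.29.0 版本。(注:文中部分代码片段——例如以字符串字面量注册控制器的 NewControllerInitializers——可能摘自更早的版本;1.29 中对应逻辑应为基于 ControllerDescriptor 的实现,阅读时请以对应版本的源码为准。)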

本文主要分析 kubernetes/cmd/kube-controller-manager 部分,该部分主要涉及各类 controller 的参数解析及初始化,例如 deployment controller 和 statefulset controller。

代码结构

tree  cmd/kube-controller-manager 
cmd/kube-controller-manager
├── OWNERS
├── app
│   ├── apps.go # 包含:startDeploymentController、startReplicaSetController、startStatefulSetController、startDaemonSetController
│   ├── autoscaling.go # startHPAController
│   ├── batch.go
│   ├── bootstrap.go
│   ├── certificates.go
│   ├── certificates_test.go
│   ├── cloudproviders.go
│   ├── config  # config: controller manager执行的上下文
│   │   └── config.go
│   ├── controllermanager.go
│   ├── controllermanager_test.go
│   ├── core.go
│   ├── core_test.go
│   ├── discovery.go
│   ├── flags_providerless.go
│   ├── flags_providers.go
│   ├── import_known_versions.go
│   ├── networking.go
│   ├── options   # 包含不同controller的option参数
│   │   ├── attachdetachcontroller.go
│   │   ├── cronjobcontroller.go
│   │   ├── csrsigningcontroller.go
│   │   ├── daemonsetcontroller.go
│   │   ├── deploymentcontroller.go
│   │   ├── deprecatedcontroller.go
│   │   ├── endpointcontroller.go
│   │   ├── endpointslicecontroller.go
│   │   ├── endpointslicemirroringcontroller.go
│   │   ├── ephemeralcontroller.go
│   │   ├── garbagecollectorcontroller.go
│   │   ├── hpacontroller.go
│   │   ├── jobcontroller.go
│   │   ├── legacyserviceaccounttokencleaner.go
│   │   ├── namespacecontroller.go
│   │   ├── nodeipamcontroller.go
│   │   ├── nodelifecyclecontroller.go
│   │   ├── options.go
│   │   ├── options_test.go
│   │   ├── persistentvolumebindercontroller.go
│   │   ├── podgccontroller.go
│   │   ├── replicasetcontroller.go
│   │   ├── replicationcontroller.go
│   │   ├── resourcequotacontroller.go
│   │   ├── serviceaccountcontroller.go
│   │   ├── statefulsetcontroller.go
│   │   ├── ttlafterfinishedcontroller.go
│   │   └── validatingadmissionpolicycontroller.go
│   ├── plugins.go
│   ├── plugins_providerless.go
│   ├── plugins_providers.go
│   ├── policy.go
│   ├── rbac.go
│   ├── testing
│   │   └── testserver.go
│   └── validatingadmissionpolicystatus.go
├── controller-manager.go   # main入口函数
└── names
    └── controller_names.go

5 directories, 55 files

Main函数

func main() {
 command := app.NewControllerManagerCommand()
 code := cli.Run(command)
 os.Exit(code)
}

NewControllerManagerCommand 函数

NewControllerManagerCommand 使用 Cobra 命令行框架。

// NewControllerManagerCommand creates a *cobra.Command object with default parameters
//
// In this excerpt ("..." marks code elided from upstream) it first builds the
// default KubeControllerManagerOptions, logging the error and exiting with
// code 1 if that fails. Its RunE then: prints the version if requested,
// validates and applies the logging options, prints the final flags, builds
// the controller-manager config from KnownControllers() and the
// disabled-by-default controller list, registers feature-gate metrics, and
// hands the completed config to Run.
func NewControllerManagerCommand() *cobra.Command {
// Build the default option set for kube-controller-manager (per-controller
// option structs such as ServiceControllerOptions, DaemonSetControllerOptions).
    s, err := options.NewKubeControllerManagerOptions()
 if err != nil {
  klog.Background().Error(err, "Unable to initialize command options")
  klog.FlushAndExit(klog.ExitFlushTimeout, 1)
 }

 cmd := &cobra.Command{
  Use: "kube-controller-manager",
        ...
  RunE: func(cmd *cobra.Command, args []string) error {
   verflag.PrintAndExitIfRequested()

   // Activate logging as soon as possible, after that
   // show flags with the final logging configuration.
   if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
    return err
   }
   cliflag.PrintFlags(cmd.Flags())
   // Build the runtime config from the options, given the set of all known
   // controllers and the list of controllers that are disabled by default.
   c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
   if err != nil {
    return err
   }
   // add feature enablement metrics
   utilfeature.DefaultMutableFeatureGate.AddMetrics()
   // Complete the config and start the controller manager.
   return Run(context.Background(), c.Complete())
  },
        ...
 }
    ...
}

NewKubeControllerManagerOptions

初始化controllerManager的参数,例如ServiceControllerOptions、DaemonSetControllerOptions等

 // Build the default kube-controller-manager options. On error, log it, then
 // flush logs and exit the process with code 1 via klog.FlushAndExit.
 s, err := options.NewKubeControllerManagerOptions()
 if err != nil {
  klog.Background().Error(err, "Unable to initialize command options")
  klog.FlushAndExit(klog.ExitFlushTimeout, 1)
 }

Run 函数

// Run starts kube-controller-manager with the completed configuration.
// ("..." marks code elided from upstream in this excerpt.)
//
// It defines a local run closure that builds the ControllerContext, starts
// all controllers, starts the informer factories, and then blocks until ctx
// is done. Without leader election, run is called directly and Run returns
// nil once it returns. With leader election, run is invoked from the
// OnStartedLeading callback of leaderElectAndRun (launched in a goroutine),
// losing leadership exits the process, and Run blocks on stopCh.
func Run(ctx context.Context, c *config.CompletedConfig) error {
    ...
    run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {
  // Build the shared ControllerContext (clients, informers, REST mapper,
  // cloud provider, ...) that every controller receives.
  controllerContext, err := CreateControllerContext(logger, c, rootClientBuilder, clientBuilder, ctx.Done())
  if err != nil {
   logger.Error(err, "Error building controller context")
   klog.FlushAndExit(klog.ExitFlushTimeout, 1)
  }
  // Select the controller init functions for this loop mode.
  controllerInitializers := initializersFunc(controllerContext.LoopMode)
  // Start every controller; any failure is fatal for the process.
  if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
   logger.Error(err, "Error starting controllers")
   klog.FlushAndExit(klog.ExitFlushTimeout, 1)
  }

  // Informer factories are started only after StartControllers returns
  // (presumably so informers requested by controller constructors are already
  // registered); closing InformersStarted then signals that they are running.
  controllerContext.InformerFactory.Start(stopCh)
  controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
  close(controllerContext.InformersStarted)

  <-ctx.Done()
 }
    ...
    // No leader election: call the run closure above directly.
    if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
  run(ctx, saTokenControllerInitFunc, NewControllerInitializers)
  return nil
 }
    ...
    // Leader election: acquire the lock, then start controllers as leader.
    go leaderElectAndRun(ctx, c, id, electionChecker,
  c.ComponentConfig.Generic.LeaderElection.ResourceLock,
  c.ComponentConfig.Generic.LeaderElection.ResourceName,
  leaderelection.LeaderCallbacks{
   OnStartedLeading: func(ctx context.Context) {
    // Default to all controller init functions.
    initializersFunc := NewControllerInitializers
    if leaderMigrator != nil {
     // If leader migration is enabled, we should start only non-migrated controllers
     //  for the main lock.
     initializersFunc = createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
     logger.Info("leader migration: starting main controllers.")
    }
    // NOTE(review): the non-leader-election path passes
    // saTokenControllerInitFunc; this excerpt passes startSATokenController.
    // Presumably the same value from the elided code — confirm against upstream.
    run(ctx, startSATokenController, initializersFunc)
   },
   OnStoppedLeading: func() {
    logger.Error(nil, "leaderelection lost")
    klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   },
  })
    ...
    ...
    <-stopCh
 return nil
}

核心

  // Build the ControllerContext shared by all controllers.
  // (Excerpt: the check of err returned here is omitted; the full run
  // closure logs the error and exits with code 1.)
  controllerContext, err := CreateControllerContext(logger, c, rootClientBuilder, clientBuilder, ctx.Done())

  // Start every controller; on failure log the error and exit with code 1.
  if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
   logger.Error(err, "Error starting controllers")
   klog.FlushAndExit(klog.ExitFlushTimeout, 1)
  }

CreateControllerContext

构建各个 controller 运行所需资源的上下文(ControllerContext);各个 controller 启动时都以该 context 作为入参。

// CreateControllerContext builds the ControllerContext that is passed to every
// controller started by kube-controller-manager.
//
// It creates a typed shared informer factory and a metadata informer factory,
// waits for the API server to become healthy (WaitForAPIServer with a 10s
// argument), sets up a deferred-discovery RESTMapper that is reset every 30s
// until stop is closed, collects the available API resources, and creates the
// cloud provider. On any failure it returns a zero ControllerContext together
// with the error.
//
// The WaitForAPIServer error is wrapped with %w (not %v) so callers can still
// inspect the underlying cause with errors.Is / errors.As.
func CreateControllerContext(logger klog.Logger, s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
	versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
	// Shared informer factory for typed API objects.
	// NOTE(review): ResyncPeriod(s) is invoked separately for each factory; if
	// it yields a randomized period, the factories may differ — presumably intended.
	sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

	// Informer factory backed by the metadata client.
	metadataClient := metadata.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("metadata-informers"))
	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, ResyncPeriod(s)())

	// If apiserver is not running we should wait for some time and fail only then. This is particularly
	// important when we start apiserver and controller manager at the same time.
	if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
		return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %w", err)
	}

	// Use a discovery client capable of being refreshed.
	discoveryClient := rootClientBuilder.DiscoveryClientOrDie("controller-discovery")
	cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
	// Periodically reset the mapper's cached discovery data (presumably so API
	// groups added later become resolvable); wait.Until stops when stop closes.
	go wait.Until(func() {
		restMapper.Reset()
	}, 30*time.Second, stop)

	availableResources, err := GetAvailableResources(rootClientBuilder)
	if err != nil {
		return ControllerContext{}, err
	}

	// createCloudProvider receives everything the cloud provider needs as
	// explicit arguments and also returns the controller loop mode.
	cloud, loopMode, err := createCloudProvider(logger, s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
		s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
	if err != nil {
		return ControllerContext{}, err
	}

	// Assemble the ControllerContext from everything built above.
	ctx := ControllerContext{
		ClientBuilder:                   clientBuilder,
		InformerFactory:                 sharedInformers,
		ObjectOrMetadataInformerFactory: informerfactory.NewInformerFactory(sharedInformers, metadataInformers),
		ComponentConfig:                 s.ComponentConfig,
		RESTMapper:                      restMapper,
		AvailableResources:              availableResources,
		Cloud:                           cloud,
		LoopMode:                        loopMode,
		InformersStarted:                make(chan struct{}),
		ResyncPeriod:                    ResyncPeriod(s),
		ControllerManagerMetrics:        controllersmetrics.NewControllerManagerMetrics("kube-controller-manager"),
	}
	controllersmetrics.Register()
	return ctx, nil
}

NewControllerInitializers

StartControllers 的入参,定义了各个 controller 的名称及其对应的启动函数。

// NewControllerInitializers returns the registry of controller init functions,
// keyed by controller name.
//
// Most controllers are registered unconditionally. "service", "route" and
// "cloud-node-lifecycle" are added only when loopMode is IncludeCloudLoops.
// "storage-version-gc" requires both the APIServerIdentity and
// StorageVersionAPI feature gates. "resource-claim-controller" requires the
// DynamicResourceAllocation feature gate. Registering the same name twice
// panics, because that is a programming error.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}

	// All of the controllers must have unique names, or else we will explode.
	// register stores fn under name and panics if name is already present.
	register := func(name string, fn InitFunc) {
		if _, found := controllers[name]; found {
			panic(fmt.Sprintf("controller name %q was registered twice", name))
		}
		controllers[name] = fn
	}

	// Unconditionally registered controllers (endpoint, deployment, replicaset, ...).
	register("endpoint", startEndpointController)
	// EndpointSlice controller.
	register("endpointslice", startEndpointSliceController)
	register("endpointslicemirroring", startEndpointSliceMirroringController)
	register("replicationcontroller", startReplicationController)
	register("podgc", startPodGCController)
	register("resourcequota", startResourceQuotaController)
	register("namespace", startNamespaceController)
	register("serviceaccount", startServiceAccountController)
	register("garbagecollector", startGarbageCollectorController)
	register("daemonset", startDaemonSetController)
	register("job", startJobController)
	// Deployment controller.
	register("deployment", startDeploymentController)
	register("replicaset", startReplicaSetController)
	register("horizontalpodautoscaling", startHPAController)
	register("disruption", startDisruptionController)
	register("statefulset", startStatefulSetController)
	register("cronjob", startCronJobController)
	register("csrsigning", startCSRSigningController)
	register("csrapproving", startCSRApprovingController)
	register("csrcleaner", startCSRCleanerController)
	register("ttl", startTTLController)
	register("bootstrapsigner", startBootstrapSignerController)
	register("tokencleaner", startTokenCleanerController)
	register("nodeipam", startNodeIpamController)
	register("nodelifecycle", startNodeLifecycleController)
	// Cloud-dependent controllers run only in IncludeCloudLoops mode.
	if loopMode == IncludeCloudLoops {
		register("service", startServiceController)
		register("route", startRouteController)
		register("cloud-node-lifecycle", startCloudNodeLifecycleController)
		// TODO: volume controller into the IncludeCloudLoops only set.
	}
	// PersistentVolume binder controller.
	register("persistentvolume-binder", startPersistentVolumeBinderController)
	// Volume attach/detach controller.
	register("attachdetach", startAttachDetachController)
	// Volume expansion controller (started by startVolumeExpandController).
	register("persistentvolume-expander", startVolumeExpandController)
	// Note: "Aggregration" is the actual spelling of the referenced function name.
	register("clusterrole-aggregation", startClusterRoleAggregrationController)
	// PVC protection controller.
	register("pvc-protection", startPVCProtectionController)
	// PV protection controller.
	register("pv-protection", startPVProtectionController)
	register("ttl-after-finished", startTTLAfterFinishedController)
	register("root-ca-cert-publisher", startRootCACertPublisher)
	register("ephemeral-volume", startEphemeralVolumeController)
	// Feature-gated controllers.
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&
		utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
		register("storage-version-gc", startStorageVersionGCController)
	}
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
		register("resource-claim-controller", startResourceClaimController)
	}

	return controllers
}

StartControllers

启动各个控制器

// StartControllers starts the service-account token controller first (when
// startSATokenController is non-nil), then initializes the cloud provider (when
// one is configured), and then calls the init function of every enabled
// controller in the controllers map. Before each enabled controller it sleeps
// for a jittered ControllerStartInterval. It returns the first error produced
// by the SA token controller or by any init function.
// ("..." marks code elided from upstream in this excerpt.)
func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
 unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
 logger := klog.FromContext(ctx)

 // Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
 // If this fails, just return here and fail since other controllers won't be able to get credentials.
 if startSATokenController != nil {
  if _, _, err := startSATokenController(ctx, controllerCtx); err != nil {
   return err
  }
 }

 // Initialize the cloud provider with a reference to the clientBuilder only after token controller
 // has started in case the cloud provider uses the client builder.
 if controllerCtx.Cloud != nil {
  controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, ctx.Done())
 }

 // Health checkers for the started controllers; presumably populated in the
 // elided part of the loop below.
 var controllerChecks []healthz.HealthChecker

 // Each controller is passed a context where the logger has the name of
 // the controller set through WithName. That name then becomes the prefix of
 // all log messages emitted by that controller.
 //
 // In this loop, an explicit "controller" key is used instead, for two reasons:
 // - while contextual logging is alpha, klog.LoggerWithName is still a no-op,
 //   so we cannot rely on it yet to add the name
 // - it allows distinguishing between log entries emitted by the controller
 //   and those emitted for it - this is a bit debatable and could be revised.
 //
 // Start each registered controller one at a time. controllers is a Go map,
 // so iteration order is unspecified: controllers are NOT started in the
 // order they were registered.
 for controllerName, initFn := range controllers {
  // Skip controllers for which IsControllerEnabled reports false.
  if !controllerCtx.IsControllerEnabled(controllerName) {
   logger.Info("Warning: controller is disabled", "controller", controllerName)
   continue
  }

  // Stagger controller start-up by a jittered interval.
  time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

  logger.V(1).Info("Starting controller", "controller", controllerName)
  // Call the init function with a context whose logger is named after the controller.
  ctrl, started, err := initFn(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx)
  if err != nil {
   logger.Error(err, "Error starting controller", "controller", controllerName)
   return err
  }
  ...
 }
    ...
 return nil
}

调用流程

(此处原为"调用流程"示意图,图片标识为 202401021309653,未能随正文一同导出。)