源码解析:Kubernetes 创建 Pod 时,背后发生了什么(一)


本文试图回答以下问题:敲下 kubectl run nginx --image=nginx --replicas=3 命令后Kubernetes 中发生了哪些事情?

要弄清楚这个问题,我们需要:
  1. 了解 Kubernetes 几个核心组件的启动过程,它们分别做了哪些事情,以及
  2. 从客户端发起请求到 Pod ready 的整个过程。


Kubernetes 组件启动过程

首先看几个核心组件的启动过程分别做了哪些事情。

kube-apiserver 启动

调用栈

创建命令行(kube-apiserver)入口:
main                                         // cmd/kube-apiserver/apiserver.go
|-cmd := app.NewAPIServerCommand()          // cmd/kube-apiserver/app/server.go
|  |-RunE := func() {
|      Complete()
|        |-ApplyAuthorization(s.Authorization)
|        |-if TLS:
|            ServiceAccounts.KeyFiles = []string{CertKey.KeyFile}
|      Validate()
|      Run(completedOptions, handlers) // 核心逻辑
|    }
|-cmd.Execute()

kube-apiserver 启动后,会执行到其中的 Run() 方法:
Run()          // cmd/kube-apiserver/app/server.go
|-server = CreateServerChain()
|           |-CreateKubeAPIServerConfig()
|           |   |-buildGenericConfig
|           |   |   |-genericapiserver.NewConfig()     // staging/src/k8s.io/apiserver/pkg/server/config.go
|           |   |   |  |-return &Config{
|           |   |   |       Serializer:             codecs,
|           |   |   |       BuildHandlerChainFunc:  DefaultBuildHandlerChain, // 注册 handler
|           |   |   |    } 
|           |   |   |
|           |   |   |-OpenAPIConfig = DefaultOpenAPIConfig()  // OpenAPI schema
|           |   |   |-kubeapiserver.NewStorageFactoryConfig() // etcd 相关配置
|           |   |   |-APIResourceConfig = genericConfig.MergedResourceConfig
|           |   |   |-storageFactoryConfig.Complete(s.Etcd)
|           |   |   |-storageFactory = completedStorageFactoryConfig.New()
|           |   |   |-s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
|           |   |   |-BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
|           |   |   |-pluginInitializers, admissionPostStartHook = admissionConfig.New()
|           |   |
|           |   |-capabilities.Initialize
|           |   |-controlplane.ServiceIPRange()
|           |   |-config := &controlplane.Config{}
|           |   |-AddPostStartHook("start-kube-apiserver-admission-initializer", admissionPostStartHook)
|           |   |-ServiceAccountIssuerURL = s.Authentication.ServiceAccounts.Issuer
|           |   |-ServiceAccountJWKSURI = s.Authentication.ServiceAccounts.JWKSURI
|           |   |-ServiceAccountPublicKeys = pubKeys
|           |
|           |-createAPIExtensionsServer
|           |-CreateKubeAPIServer
|           |-createAggregatorServer    // cmd/kube-apiserver/app/aggregator.go
|           |   |-aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)   // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
|           |   |  |-apiGroupInfo := NewRESTStorage()
|           |   |  |-GenericAPIServer.InstallAPIGroup(&apiGroupInfo)
|           |   |  |-InstallAPIGroups
|           |   |  |-openAPIModels := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
|           |   |  |-for apiGroupInfo := range apiGroupInfos {
|           |   |  |   s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels)
|           |   |  |   s.DiscoveryGroupManager.AddGroup(apiGroup)
|           |   |  |   s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
|           |   |  |
|           |   |  |-GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
|           |   |  |-GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
|           |   |  |-
|           |   |-
|-prepared = server.PrepareRun()     // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
|            |-GenericAPIServer.AddPostStartHookOrDie
|            |-GenericAPIServer.PrepareRun
|            |  |-routes.OpenAPI{}.Install()
|            |     |-registerResourceHandlers // staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
|            |         |-POST: XX
|            |         |-GET: XX
|            |
|            |-openapiaggregator.BuildAndRegisterAggregator()
|            |-openapiaggregator.NewAggregationController()
|            |-preparedAPIAggregator{}
|-prepared.Run() // staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
|-s.runnable.Run()

一些重要步骤
  1. 创建 server chain。Server aggregation(聚合)是一种支持多 apiserver 的方式,其中包括了一个 generic apiserver,作为默认实现。
  2. 生成 OpenAPI schema,保存到 apiserver 的 Config.OpenAPIConfig 字段
  3. 遍历 schema 中的所有 API group,为每个 API group 配置一个 storage provider,这是一个通用 backend 存储抽象层。
  4. 遍历每个 group 版本,为每个 HTTP route 配置 REST mappings。稍后处理请求时,就能将 requests 匹配到合适的 handler。


controller-manager 启动

调用栈
NewDeploymentController
NewReplicaSetController

kubelet 启动

调用栈
main                                                                            // cmd/kubelet/kubelet.go
|-NewKubeletCommand                                                            // cmd/kubelet/app/server.go
|-Run                                                                        // cmd/kubelet/app/server.go
  |-initForOS                                                               // cmd/kubelet/app/server.go
  |-run                                                                     // cmd/kubelet/app/server.go
    |-initConfigz                                                           // cmd/kubelet/app/server.go
    |-InitCloudProvider
    |-NewContainerManager
    |-ApplyOOMScoreAdj
    |-PreInitRuntimeService
    |-RunKubelet                                                            // cmd/kubelet/app/server.go
    | |-k = createAndInitKubelet                                            // cmd/kubelet/app/server.go
    | |  |-NewMainKubelet
    | |  |  |-watch k8s Service
    | |  |  |-watch k8s Node
    | |  |  |-klet := &Kubelet{}
    | |  |  |-init klet fields
    | |  |
    | |  |-k.BirthCry()
    | |  |-k.StartGarbageCollection()
    | |
    | |-startKubelet(k)                                                     // cmd/kubelet/app/server.go
    |    |-go k.Run()                                                       // -> pkg/kubelet/kubelet.go
    |    |  |-go cloudResourceSyncManager.Run()
    |    |  |-initializeModules
    |    |  |-go volumeManager.Run()
    |    |  |-go nodeLeaseController.Run()
    |    |  |-initNetworkUtil() // setup iptables
    |    |  |-go Until(PerformPodKillingWork, 1*time.Second, neverStop)
    |    |  |-statusManager.Start()
    |    |  |-runtimeClassManager.Start
    |    |  |-pleg.Start()
    |    |  |-syncLoop(updates, kl)                                         // pkg/kubelet/kubelet.go
    |    |
    |    |-k.ListenAndServe
    |
    |-go http.ListenAndServe(healthz)

小结

以上核心组件启动完成后,就可以从命令行发起请求创建 Pod 了。

kubectl(命令行客户端)

调用栈概览

NewKubectlCommand                                    // staging/src/k8s.io/kubectl/pkg/cmd/cmd.go
|-matchVersionConfig = NewMatchVersionFlags()
|-f = cmdutil.NewFactory(matchVersionConfig)
|      |-clientGetter = matchVersionConfig
|-NewCmdRun(f)                                      // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
|  |-Complete                                       // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
|  |-Run(f)                                         // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
|    |-validate parameters
|    |-generators = GeneratorFn("run")
|    |-runObj = createGeneratedObject(generators)   // staging/src/k8s.io/kubectl/pkg/cmd/run/run.go
|    |           |-obj = generator.Generate()       // -> staging/src/k8s.io/kubectl/pkg/generate/versioned/run.go
|    |           |        |-get pod params
|    |           |        |-pod = v1.Pod{params}
|    |           |        |-return &pod
|    |           |-mapper = f.ToRESTMapper()        // -> staging/src/k8s.io/cli-runtime/pkg/genericclioptions/config_flags.go
|    |           |  |-f.clientGetter.ToRESTMapper() // -> staging/src/k8s.io/kubectl/pkg/cmd/util/factory_client_access.go
|    |           |     |-f.Delegate.ToRESTMapper()  // -> staging/src/k8s.io/kubectl/pkg/cmd/util/kubectl_match_version.go
|    |           |        |-ToRESTMapper            // -> staging/src/k8s.io/cli-runtime/pkg/resource/builder.go
|    |           |        |-delegate()              //    staging/src/k8s.io/cli-runtime/pkg/resource/builder.go
|    |           |--actualObj = resource.NewHelper(mapping).XX.Create(obj)
|    |-PrintObj(runObj.Object)
|
|-NewCmdEdit(f)      // kubectl edit   命令
|-NewCmdScale(f)     // kubectl scale  命令
|-NewCmdCordon(f)    // kubectl cordon 命令
|-NewCmdUncordon(f)
|-NewCmdDrain(f)
|-NewCmdTaint(f)
|-NewCmdExecute(f)
|-...

参数验证(validation)和资源对象生成器(generator)

参数验证

敲下 kubectl 命令后,它首先会做一些客户端侧的验证。 如果命令行参数有问题,例如,镜像名为空或格式不对,这里会直接报错,从而避免了将明显错误的请求发给 kube-apiserver,减轻了后者的压力。

此外,kubectl 还会检查其他一些配置,例如:
  • 是否需要记录(record)这条命令(用于 rollout 或审计)
  • 是否只是测试执行(--dry-run


创建 HTTP 请求

所有查询或修改 Kubernetes 资源的操作都需要与 kube-apiserver 交互,后者会进一步和 etcd 通信。

因此,验证通过之后,kubectl 接下来会创建发送给 kube-apiserver 的 HTTP 请求

Generators

创建 HTTP 请求用到了所谓的 generator文档),它封装了资源的序列化(serialization)操作。 例如,创建 Pod 时用到的 generator 是 BasicPod
// staging/src/k8s.io/kubectl/pkg/generate/versioned/run.go

type BasicPod struct{}

func (BasicPod) ParamNames() []generate.GeneratorParam {
return []generate.GeneratorParam{
    {Name: "labels", Required: false},
    {Name: "name", Required: true},
    {Name: "image", Required: true},
    ...
}


每个 generator 都实现了一个 Generate() 方法,用于生成一个该资源的运行时对象(runtime object)。 对于 BasicPod,其实现为:
func (BasicPod) Generate(genericParams map[string]interface{}) (runtime.Object, error) {
pod := v1.Pod{
    ObjectMeta: metav1.ObjectMeta{  // metadata 字段
        Name:        name,
        Labels:      labels,
        ...
    },
    Spec: v1.PodSpec{               // spec 字段
        ServiceAccountName: params["serviceaccount"],
        Containers: []v1.Container{
            {
                Name:            name,
                Image:           params["image"]
            },
        },
    },
}

return &pod, nil


API group 和版本协商(version negotiation)

有了 runtime object 之后,kubectl 需要用合适的 API 将请求发送给 kube-apiserver。

API Group

Kubernetes 用 API group 来管理 resource API。 这是一种不同于 monolithic API(所有 API 扁平化)的 API 管理方式。

具体来说,同一资源的不同版本的 API,会放到一个 group 里面。 例如 Deployment 资源的 API group 名为 apps,最新的版本是 v1。这也是为什么 我们在创建 Deployment 时,需要在 yaml 中指定 apiVersion: apps/v1 的原因。

版本协商

生成 runtime object 之后,kubectl 就开始 搜索合适的 API group 和版本
// staging/src/k8s.io/kubectl/pkg/cmd/run/run.go

obj := generator.Generate(params) // 创建运行时对象
mapper := f.ToRESTMapper()        // 寻找适合这个资源(对象)的 API group

然后创建一个正确版本的客户端(versioned client)
// staging/src/k8s.io/kubectl/pkg/cmd/run/run.go

gvks, _ := scheme.Scheme.ObjectKinds(obj)
mapping := mapper.RESTMapping(gvks[0].GroupKind(), gvks[0].Version)

这个客户端能感知资源的 REST 语义。

以上过程称为版本协商。在实现上,kubectl 会 扫描 kube-apiserver 的 /apis 路径(OpenAPI 格式的 schema 文档),获取所有的 API groups。

出于性能考虑,kubectl 会 缓存这份 OpenAPI schema, 路径是 ~/.kube/cache/discovery想查看这个 API discovery 过程,可以删除这个文件, 然后随便执行一条 kubectl 命令,并指定足够大的日志级别(例如 kubectl get ds -v 10)。

发送 HTTP 请求

现在有了 runtime object,也找到了正确的 API,因此接下来就是 将请求真正发送出去
// staging/src/k8s.io/kubectl/pkg/cmd/cmd.go

    actualObj = resource.
        NewHelper(client, mapping).
        DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
        WithFieldManager(o.fieldManager).
        Create(o.Namespace, false, obj)

发送成功后,会以恰当的格式打印返回的消息。

客户端认证(client auth)

前面其实有意漏掉了一步:客户端认证。它发生在发送 HTTP 请求之前。

用户凭证(credentials)一般都放在 kubeconfig 文件中,但这个文件可以位于多个位置, 优先级从高到低:
  • 命令行 --kubeconfig <file>
  • 环境变量 $KUBECONFIG
  • 某些预定义的路径,例如 ~/.kube


这个文件中存储了集群、用户认证等信息,如下面所示:
apiVersion: v1
clusters:
- cluster:
certificate-authority: /etc/kubernetes/pki/ca.crt
server: https://192.168.2.100:443
name: k8s-cluster-1
contexts:
- context:
cluster: k8s-cluster-1
user: default-user
name: default-context
current-context: default-context
kind: Config
preferences: {}
users:
- name: default-user
user:
client-certificate: /etc/kubernetes/pki/admin.crt
client-key: /etc/kubernetes/pki/admin.key

有了这些信息之后,客户端就可以组装 HTTP 请求的认证头了。支持的认证方式有几种:
  • X509 证书:放到 TLS 中发送;
  • Bearer token:放到 HTTP "Authorization" 头中 发送
  • 用户名密码:放到 HTTP basic auth 发送
  • OpenID auth:需要先由用户手动处理,将其转成一个 token,然后和 bearer token 类似发送。


kube-apiserver

请求从客户端发出后,便来到服务端,也就是 kube-apiserver。

调用栈概览

buildGenericConfig
|-genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)  // cmd/kube-apiserver/app/server.go

NewConfig       // staging/src/k8s.io/apiserver/pkg/server/config.go
|-return &Config{
  Serializer:             codecs,
  BuildHandlerChainFunc:  DefaultBuildHandlerChain,
}                          /
                        /
                      /
                    /
DefaultBuildHandlerChain       // staging/src/k8s.io/apiserver/pkg/server/config.go
|-handler := filterlatency.TrackCompleted(apiHandler)
|-handler = genericapifilters.WithAuthorization(handler)
|-handler = genericapifilters.WithAudit(handler)
|-handler = genericapifilters.WithAuthentication(handler)
|-return handler


WithAuthentication
|-withAuthentication
|-resp, ok := AuthenticateRequest(req)
|  |-for h := range authHandler.Handlers {
|      resp, ok := currAuthRequestHandler.AuthenticateRequest(req)
|      if ok {
|          return resp, ok, err
|      }
|    }
|    return nil, false, utilerrors.NewAggregate(errlist)
|
|-audiencesAreAcceptable(apiAuds, resp.Audiences)
|-req.Header.Del("Authorization")
|-req = req.WithContext(WithUser(req.Context(), resp.User))
|-return handler.ServeHTTP(w, req)

认证(Authentication)

kube-apiserver 首先会对请求进行认证(authentication),以确保用户身份是合法的(verify that the requester is who they say they are)。

具体过程:启动时,检查所有的 命令行参数,组织成一个 authenticator list,例如:
  • 如果指定了 --client-ca-file,就会将 x509 证书加到这个列表;
  • 如果指定了 --token-auth-file,就会将 token 加到这个列表;


不同 anthenticator 做的事情有所不同:
  • x509 handler 验证该 HTTP 请求是用 TLS key 加密的,并且有 CA root 证书的签名。
  • bearer token handler 验证请求中带的 token(HTTP Authorization 头中),在 apiserver 的 auth file 中是存在的(--token-auth-file)。
  • basicauth handler 对 basic auth 信息进行校验。


如果认证成功,就会将 Authorization 头从请求中删除,然后在上下文中加上用户信息。这使得后面的步骤(例如鉴权和 admission control)能用到这里已经识别出的用户身份信息。
// staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go

// WithAuthentication creates an http handler that tries to authenticate the given request as a user, and then
// stores any such user found onto the provided context for the request.
// On success, "Authorization" header is removed from the request and handler
// is invoked to serve the request.
func WithAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler,
apiAuds authenticator.Audiences) http.Handler {
return withAuthentication(handler, auth, failed, apiAuds, recordAuthMetrics)
}

func withAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler,
apiAuds authenticator.Audiences, metrics recordMetrics) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    resp, ok := auth.AuthenticateRequest(req) // 遍历所有 authenticator,任何一个成功就返回 OK
    if !ok {
        return failed.ServeHTTP(w, req)       // 所有认证方式都失败了
    }

    if !audiencesAreAcceptable(apiAuds, resp.Audiences) {
        fmt.Errorf("unable to match the audience: %v , accepted: %v", resp.Audiences, apiAuds)
        failed.ServeHTTP(w, req)
        return
    }

    req.Header.Del("Authorization") // 认证成功后,这个 header 就没有用了,可以删掉

    // 将用户信息添加到请求上下文中,供后面的步骤使用
    req = req.WithContext(WithUser(req.Context(), resp.User))
    handler.ServeHTTP(w, req)
})


AuthenticateRequest() 实现:遍历所有 authenticator,任何一个成功就返回 OK。
// staging/src/k8s.io/apiserver/pkg/authentication/request/union/union.go

func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req) (*Response, bool) {
for currAuthRequestHandler := range authHandler.Handlers {
    resp, ok := currAuthRequestHandler.AuthenticateRequest(req)
    if ok {
        return resp, ok, err
    }
}

return nil, false, utilerrors.NewAggregate(errlist)


鉴权(Authorization)

发送者身份(认证)是一个问题,但他是否有权限执行这个操作(鉴权),是另一个问题。因此确认发送者身份之后,还需要进行鉴权。

鉴权的过程与认证非常相似,也是逐个匹配 authorizer 列表中的 authorizer:如果都失败了, 返回 Forbidden 并停止 进一步处理。如果成功,就继续。

内置的 几种 authorizer 类型
  • webhook:与其他服务交互,验证是否有权限。
  • ABAC:根据静态文件中规定的策略(policies)来进行鉴权。
  • RBAC:根据 role 进行鉴权,其中 role 是 Kubernetes 管理员提前配置的。
  • Node:确保 node clients,例如 kubelet,只能访问本机内的资源。


要看它们的具体做了哪些事情,可以查看它们各自的 Authorize() 方法。

Admission control

至此,认证和鉴权都通过了。但这还没结束,Kubernetes 中的其它组件还需要对请求进行检查,其中就包括 admission controllers

与鉴权的区别:
  • 鉴权(authorization)在前面,关注的是用户是否有操作权限
  • Admission controllers 在更后面,对请求进行拦截和过滤,确保它们符合一些更广泛的集群规则和限制, 是将请求对象持久化到 etcd 之前的最后堡垒


工作方式:
  • 与认证和鉴权类似,也是遍历一个列表,
  • 但有一点核心区别:任何一个 controller 检查没通过,请求就会失败


设计:可扩展
  • 每个 controller 作为一个 plugin 存放在 plugin/pkg/admission 目录
  • 设计时已经考虑,只需要实现很少的几个接口
  • 但注意,admission controller 最终会编译到 Kubernetes 的二进制文件(而非独立的 plugin binary)


类型:

Admission controllers 通常按不同目的分类,包括:资源管理、安全管理、默认值管 理、引用一致性(referential consistency)等类型。

例如,下面是资源管理类的几个 controller:
  • InitialResources:为容器设置默认的资源限制(基于过去的使用量);
  • LimitRanger:为容器的 requests and limits 设置默认值,或对特定资源设置上限(例如,内存默认 512MB,最高不超过 2GB)。
  • ResourceQuota:资源配额。


写入 etcd

至此,Kubernetes 已经完成对请求的验证,允许它进行接下来的处理。

kube-apiserver 将对请求进行反序列化,构造 runtime objects( kubectl generator 的反过程),并将它们持久化到 etcd。下面详细看这个过程。

调用栈概览

对于本文创建 Pod 的请求,相应的入口是 POST handler ,它又会进一步将请求委托给一个创建具体资源的 handler。
registerResourceHandlers // staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
|-case POST:

```

```
// staging/src/k8s.io/apiserver/pkg/endpoints/installer.go

    switch () {
    case "POST": // Create a resource.
        var handler restful.RouteFunction
        if isNamedCreater {
            handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
        } else {
            handler = restfulCreateResource(creater, reqScope, admit)
        }

        handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, .., handler)
        article := GetArticleForNoun(kind, " ")
        doc := "create" + article + kind
        if isSubresource {
            doc = "create " + subresource + " of" + article + kind
        }

        route := ws.POST(action.Path).To(handler).
            Doc(doc).
            Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
            Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
            Returns(http.StatusOK, "OK", producedObject).
            Returns(http.StatusCreated, "Created", producedObject).
            Returns(http.StatusAccepted, "Accepted", producedObject).
            Reads(defaultVersionedObject).
            Writes(producedObject)

        AddObjectParams(ws, route, versionedCreateOptions)
        addParams(route, action.Params)
        routes = append(routes, route)
    }

    for route := range routes {
        route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
            Group:   reqScope.Kind.Group,
            Version: reqScope.Kind.Version,
            Kind:    reqScope.Kind.Kind,
        })
        route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
        ws.Route(route)
    } 

kube-apiserver 请求处理过程

从 apiserver 的请求处理函数开始:
// staging/src/k8s.io/apiserver/pkg/server/handler.go

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path

// check to see if our webservices want to claim this path
for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
    switch {
    case ws.RootPath() == "/apis":
        if path == "/apis" || path == "/apis/" {
            return d.goRestfulContainer.Dispatch(w, req)
        }

    case strings.HasPrefix(path, ws.RootPath()):
        if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
            return d.goRestfulContainer.Dispatch(w, req)
        }
    }
}

// if we didn't find a match, then we just skip gorestful altogether
d.nonGoRestfulMux.ServeHTTP(w, req)


如果能匹配到请求(例如匹配到前面注册的路由),它将 分派给相应的 handler;否则,fall back 到 path-based handlerGET /apis 到达的就是这里);

基于 path 的 handlers:
// staging/src/k8s.io/apiserver/pkg/server/mux/pathrecorder.go

func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
    return exactHandler.ServeHTTP(w, r)
}

for prefixHandler := range h.prefixHandlers {
    if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
        return prefixHandler.handler.ServeHTTP(w, r)
    }
}

h.notFoundHandler.ServeHTTP(w, r)


如果还是没有找到路由,就会 fallback 到 non-gorestful handler,最终可能是一个 not found handler。

对于我们的场景,会匹配到一条已经注册的、名为 createHandler 为的路由。

Create handler 处理过程

// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go

func createHandler(r rest.NamedCreater, scope *RequestScope, admit Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
    namespace, name := scope.Namer.Name(req) // 获取资源的 namespace 和 name(etcd item key)
    s := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)

    body := limitedReadBody(req, scope.MaxRequestBodyBytes)
    obj, gvk := decoder.Decode(body, &defaultGVK, original)

    admit = admission.WithAudit(admit, ae)

    requestFunc := func() (runtime.Object, error) {
        return r.Create(
            name,
            obj,
            rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
        )
    }

    result := finishRequest(ctx, func() (runtime.Object, error) {
        if scope.FieldManager != nil {
            liveObj := scope.Creater.New(scope.Kind)
            obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
            admit = fieldmanager.NewManagedFieldsValidatingAdmissionController(admit)
        }

        admit.(admission.MutationInterface)
        mutatingAdmission.Handles(admission.Create)
        mutatingAdmission.Admit(ctx, admissionAttributes, scope)

        return requestFunc()
    })

    code := http.StatusCreated
    status, ok := result.(*metav1.Status)
    transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
}


  1. 首先解析 HTTP request,然后执行基本的验证,例如保证 JSON 与 versioned API resource 期望的是一致的;
  2. 执行审计和最终 admission;

  3. 将资源最终写到 etcd,这会进一步调用到 storage provider

    etcd key 的格式一般是<namespace>/<name>(例如:default/nginx-0),但这个也是可配置的。

  4. 最后,storage provider 执行一次 get 操作,确保对象真的创建成功了。如果有额外的收尾任务(additional finalization),会执行 post-create handlers 和 decorators。
  5. 返回 生成的 HTTP response。


以上过程可以看出,apiserver 做了大量的事情。

总结:至此我们的 Pod 资源已经在 etcd 中了。但是,此时 kubectl get pods -n <ns> 还看不见它。

Initializers

对象持久化到 etcd 之后,apiserver 并未将其置位对外可见,它也不会立即就被调度,而是要先等一些 initializers 运行完成。

Initializer

Initializer 是与特定资源类型(resource type)相关的 controller
  • 负责在该资源对外可见之前对它们执行一些处理
  • 如果一种资源类型没有注册任何 initializer,这个步骤就会跳过,资源对外立即可见


这是一种非常强大的特性,使得我们能执行一些通用的启动初始化(bootstrap)操作。例如:
  • 向 Pod 注入 sidecar、暴露 80 端口,或打上特定的 annotation。
  • 向某个 namespace 内的所有 Pod 注入一个存放了测试证书(test certificates)的 volume。
  • 禁止创建长度小于 20 个字符的 Secret (例如密码)。


InitializerConfiguration

可以用 InitializerConfiguration 声明对哪些资源类型(resource type)执行哪些 initializer

例如,要实现所有 Pod 创建时都运行一个自定义的 initializer custom-pod-initializer,可以用下面的 yaml:
apiVersion: admissionregistration.k8s.io/v1alpha1
kind: InitializerConfiguration
metadata:
name: custom-pod-initializer
initializers:
- name: podimage.example.com
rules:
  - apiGroups:
      - ""
    apiVersions:
      - v1
    resources:
      - pods

创建以上配置(kubectl create -f xx.yaml)之后,Kubernetes 会将 custom-pod-initializer 追加到每个 Pod 的 metadata.initializers.pending 字段。

在此之前需要启动 initializer controller,它会:
  • 定期扫描是否有新 Pod 创建;
  • 检测到它的名字出现在 Pod 的 pending 字段时,就会执行它的处理逻辑;
  • 执行完成之后,它会将自己的名字从 pending list 中移除。


pending list 中的 initializers,每次只有第一个 initializer 能执行。 当所有 initializer 执行完成,pending 字段为空之后,就认为 这个对象已经完成初始化了(considered initialized)。

细心的同学可能会有疑问:前面说这个对象还没有对外可见,那用户空间的 initializer controller 又是如何能检测并操作这个对象的呢?答案是: kube-apiserver 提供了一个 ?includeUninitialized 查询参数,它会返回所有对象, 包括那些还未完成初始化的(uninitialized ones)。

Control loops(控制循环)

至此,对象已经在 etcd 中了,所有的初始化步骤也已经完成了。 下一步是设置资源拓扑(resource topology)。例如,一个 Deployment 其实就是一组 ReplicaSet,而一个 ReplicaSet 就是一组 Pod。 K8s 是如何根据一个 HTTP 请求创建出这个层级关系的呢?靠的是 Kubernetes 内置的控制器(controllers)。

Kubernetes 中大量使用 “controllers”:
  • 一个 controller 就是一个异步脚本(an asynchronous script),
  • 不断检查资源的当前状态(current state)和期望状态(desired state)是否一致,
  • 如果不一致就尝试将其变成期望状态,这个过程称为 reconcile


每个 controller 负责的东西都比较少,所有 controller 并行运行, 由 kube-controller-manager 统一管理

Deployments controller

Deployments controller 启动

当一个 Deployment record 存储到 etcd 并(被 initializers)初始化之后, kube-apiserver 就会将其置为对外可见的。此后, Deployment controller 监听了 Deployment 资源的变动,因此此时就会检测到这个新创建的资源。
// pkg/controller/deployment/deployment_controller.go

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer DeploymentInformer, rsInformer ReplicaSetInformer,
podInformer PodInformer, client clientset.Interface) (*DeploymentController, error) {

dc := &DeploymentController{
    client:        client,
    queue:         workqueue.NewNamedRateLimitingQueue(),
}
dc.rsControl = controller.RealRSControl{ // ReplicaSet controller
    KubeClient: client,
    Recorder:   dc.eventRecorder,
}

// 注册 Deployment 事件回调函数
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addDeployment,    // 有 Deployment 创建时触发
    UpdateFunc: dc.updateDeployment,
    DeleteFunc: dc.deleteDeployment,
})
// 注册 ReplicaSet 事件回调函数
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addReplicaSet,
    UpdateFunc: dc.updateReplicaSet,
    DeleteFunc: dc.deleteReplicaSet,
})
// 注册 Pod 事件回调函数
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    DeleteFunc: dc.deletePod,
})

dc.syncHandler = dc.syncDeployment
dc.enqueueDeployment = dc.enqueue

return dc, nil


创建 Deployment:回调函数处理

在本文场景中,触发的是 controller 注册的 addDeployment() 回调函数 其所做的工作就是将 deployment 对象放到一个内部队列:
// pkg/controller/deployment/deployment_controller.go

func (dc *DeploymentController) addDeployment(obj interface{}) {
d := obj.(*apps.Deployment)
dc.enqueueDeployment(d)


主处理循环

worker 不断遍历这个 queue,从中 dequeue item 并进行处理:
// pkg/controller/deployment/deployment_controller.go

func (dc *DeploymentController) worker() {
for dc.processNextWorkItem() {
}
}

func (dc *DeploymentController) processNextWorkItem() bool {
key, quit := dc.queue.Get()
dc.syncHandler(key.(string)) // dc.syncHandler = dc.syncDeployment
}

// syncDeployment will sync the deployment with the given key.
func (dc *DeploymentController) syncDeployment(key string) error {
namespace, name := cache.SplitMetaNamespaceKey(key)

deployment := dc.dLister.Deployments(namespace).Get(name)
d := deployment.DeepCopy()

// 获取这个 Deployment 的所有 ReplicaSets, while reconciling ControllerRef through adoption/orphaning.
rsList := dc.getReplicaSetsForDeployment(d)

// 获取这个 Deployment 的所有 pods, grouped by their ReplicaSet
podMap := dc.getPodMapForDeployment(d, rsList)

if d.DeletionTimestamp != nil { // 这个 Deployment 已经被标记,等待被删除
    return dc.syncStatusOnly(d, rsList)
}

dc.checkPausedConditions(d)
if d.Spec.Paused { // pause 状态
    return dc.sync(d, rsList)
}

if getRollbackTo(d) != nil {
    return dc.rollback(d, rsList)
}

scalingEvent := dc.isScalingEvent(d, rsList)
if scalingEvent {
    return dc.sync(d, rsList)
}

switch d.Spec.Strategy.Type {
case RecreateDeploymentStrategyType:             // re-create
    return dc.rolloutRecreate(d, rsList, podMap)
case RollingUpdateDeploymentStrategyType:        // rolling-update
    return dc.rolloutRolling(d, rsList)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)


controller 会通过 label selector 从 kube-apiserver 查询 与这个 deployment 关联的 ReplicaSet 或 Pod records(然后发现没有)。

如果发现当前状态与预期状态不一致,就会触发同步过程(synchronization process)。 这个同步过程是无状态的,也就是说,它并不区分是新记录还是老记录,一视同仁。

执行扩容(scale up)

如上,发现 Pod 不存在之后,它会开始扩容过程(scaling process):
// pkg/controller/deployment/sync.go

// scale up/down 或新创建(pause)时都会执行到这里
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {

newRS, oldRSs := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
dc.scale(d, newRS, oldRSs)

// Clean up the deployment when it's paused and no rollback is in flight.
if d.Spec.Paused && getRollbackTo(d) == nil {
    dc.cleanupDeployment(oldRSs, d)
}

allRSs := append(oldRSs, newRS)
return dc.syncDeploymentStatus(allRSs, newRS, d)


大致步骤:
  1. Rolling out (例如 creating)一个 ReplicaSet resource
  2. 分配一个 label selector
  3. 初始版本好(revision number)置为 1


ReplicaSet 的 PodSpec,以及其他一些 metadata 是从 Deployment 的 manifest 拷过来的。

最后会更新 deployment 状态,然后重新进入 reconciliation 循环,直到 deployment 进入预期的状态。

小结

由于 Deployment controller 只负责 ReplicaSet 的创建,因此下一步 (ReplicaSet -> Pod)要由 reconciliation 过程中的另一个 controller —— ReplicaSet controller 来完成。

ReplicaSets controller

上一步周,Deployments controller 已经创建了 Deployment 的第一个 ReplicaSet,但此时还没有任何 Pod。 下面就轮到 ReplicaSet controller 出场了。 它的任务是监控 ReplicaSet 及其依赖资源(Pods)的生命周期,实现方式也是注册事件回调函数。

ReplicaSets controller 启动
// pkg/controller/replicaset/replica_set.go

func NewReplicaSetController(rsInformer ReplicaSetInformer, podInformer PodInformer,
kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {

return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
    apps.SchemeGroupVersion.WithKind("ReplicaSet"),
    "replicaset_controller",
    "replicaset",
    controller.RealPodControl{
        KubeClient: kubeClient,
    },
)
}

// 抽象出 NewBaseController() 是为了代码复用,例如 NewReplicationController() 也会调用这个函数。
func NewBaseController(rsInformer, podInformer, kubeClient clientset.Interface, burstReplicas int,
gvk GroupVersionKind, metricOwnerName, queueName, podControl PodControlInterface) *ReplicaSetController {

rsc := &ReplicaSetController{
    kubeClient:       kubeClient,
    podControl:       podControl,
    burstReplicas:    burstReplicas,
    expectations:     controller.NewUIDTrackingControllerExpectations(NewControllerExpectations()),
    queue:            workqueue.NewNamedRateLimitingQueue()
}

rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    rsc.addRS,
    UpdateFunc: rsc.updateRS,
    DeleteFunc: rsc.deleteRS,
})
rsc.rsLister = rsInformer.Lister()

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: rsc.addPod,
    UpdateFunc: rsc.updatePod,
    DeleteFunc: rsc.deletePod,
})
rsc.podLister = podInformer.Lister()

rsc.syncHandler = rsc.syncReplicaSet
return rsc


创建 ReplicaSet:回调函数处理

主处理循环

当一个 ReplicaSet 被(Deployment controller)创建之后。
// pkg/controller/replicaset/replica_set.go

// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {

namespace, name := cache.SplitMetaNamespaceKey(key)
rs := rsc.rsLister.ReplicaSets(namespace).Get(name)

selector := metav1.LabelSelectorAsSelector(rs.Spec.Selector)

// 包括那些不匹配 rs selector,但有 stale controller ref 的 pod
allPods := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
filteredPods := controller.FilterActivePods(allPods) // Ignore inactive pods.
filteredPods = rsc.claimPods(rs, selector, filteredPods)

if rsNeedsSync && rs.DeletionTimestamp == nil { // 需要同步,并且没有被标记待删除
    rsc.manageReplicas(filteredPods, rs)        // *主处理逻辑*
}

newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
updatedRS := updateReplicaSetStatus(AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)


RS controller 检查 ReplicaSet 的状态, 发现当前状态和期望状态之间有偏差(skew),因此接下来调用 manageReplicas() 来 reconcile 这个状态,在这里做的事情就是增加这个 ReplicaSet 的 Pod 数量。
// pkg/controller/replicaset/replica_set.go

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey := controller.KeyFunc(rs)

if diff < 0 {
    diff *= -1
    if diff > rsc.burstReplicas {
        diff = rsc.burstReplicas
    }

    rsc.expectations.ExpectCreations(rsKey, diff)
    successfulCreations := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() {
        return rsc.podControl.CreatePodsWithControllerRef( // 扩容
            // 调用栈 CreatePodsWithControllerRef -> createPod() -> Client.CoreV1().Pods().Create()
            rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
    })

    // The skipped pods will be retried later. The next controller resync will retry the slow start process.
    if skippedPods := diff - successfulCreations; skippedPods > 0 {
        for i := 0; i < skippedPods; i++ {
            // Decrement the expected number of creates because the informer won't observe this pod
            rsc.expectations.CreationObserved(rsKey)
        }
    }
    return err
} else if diff > 0 {
    if diff > rsc.burstReplicas {
        diff = rsc.burstReplicas
    }

    relatedPods := rsc.getIndirectlyRelatedPods(rs)
    podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
    rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

    for _, pod := range podsToDelete {
        go func(targetPod *v1.Pod) {
            rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs) // 缩容
        }(pod)
    }
}

return nil


增加 Pod 数量的操作比较小心,每次最多不超过 burst count(这个配置是从 ReplicaSet 的父对象 Deployment 那里继承来的)。

另外,创建 Pods 的过程是 批处理的, “慢启动”操,开始时是 SlowStartInitialBatchSize,每执行成功一批,下次的 batch size 就翻倍。 这样设计是为了避免给 kube-apiserver 造成不必要的压力,例如,如果由于 quota 不足,这批 pod 大部分都会失败,那 这种方式只会有一小批请求到达 kube-apiserver,而如果一把全上的话,请求全部会打过去。 同样是失败,这种失败方式比较优雅。

Owner reference

Kubernetes 通过 Owner Reference(子资源中的一个字段,指向的是其父资源的 ID)维护对象层级(hierarchy)。这可以带来两方面好处:
  1. 实现了 cascading deletion,即父对象被 GC 时会确保 GC 子对象;
  2. 父对象之间不会出现竞争子对象的情况(例如,两个父对象认为某个子对象都是自己的)


另一个隐藏的好处是:Owner Reference 是有状态的:如果 controller 重启,重启期间不会影响 系统的其他部分,因为资源拓扑(resource topology)是独立于 controller 的。 这种隔离设计也体现在 controller 自己的设计中:controller 不应该操作 其他 controller 的资源(resources they don’t explicitly own)。

有时也可能会出现“孤儿”资源(“orphaned” resources)的情况,例如:
  1. 父资源删除了,子资源还在;
  2. GC 策略导致子资源无法被删除。


这种情况发生时,controller 会确保孤儿资源会被某个新的父资源收养。 多个父资源都可以竞争成为孤儿资源的父资源,但只有一个会成功(其余的会收到一个 validation 错误)。

Informers

很多 controller(例如 RBAC authorizer 或 Deployment controller)需要将集群信息拉到本地。

例如 RBAC authorizer 中,authenticator 会将用户信息保存到请求上下文中。随后, RBAC authorizer 会用这个信息获取 etcd 中所有与这个用户相关的 role 和 role bindings。

那么,controller 是如何访问和修改这些资源的?在 Kubernetes 中,这是通过 informer 机制实现的。

informer 是一种 controller 订阅存储(etcd)事件的机制,能方便地获取它们感兴趣的资源。
  • 这种方式除了提供一种很好的抽象之外,还负责处理缓存(caching,非常重要,因为可 以减少 kube-apiserver 连接数,降低 controller 测和 kube-apiserver 侧的序列化 成本)问题。
  • 此外,这种设计还使得 controller 的行为是 threadsafe 的,避免影响其他组件或服务。


关于 informer 和 controller 的联合工作机制,可参考 这篇博客

Scheduler(调度器)

以上 controllers 执行完各自的处理之后,etcd 中已经有了一个 Deployment、一个 ReplicaSet 和三个 Pods,可以通过 kube-apiserver 查询到。 但此时,这三个 Pod 还卡在 Pending 状态,因为它们还没有被调度到任何节点另外一个 controller —— 调度器 —— 负责做这件事情。

scheduler 作为控制平面的一个独立服务运行,但工作方式与其他 controller 是一样的: 监听事件,然后尝试 reconcile 状态。

调用栈概览
Run // pkg/scheduler/scheduler.go 
|-SchedulingQueue.Run()
|
|-scheduleOne()
 |-bind
 |  |-RunBindPlugins
 |     |-runBindPlugins
 |        |-Bind
 |-sched.Algorithm.Schedule(pod)
    |-findNodesThatFitPod
    |-prioritizeNodes
    |-selectHost

调度过程
// pkg/scheduler/core/generic_scheduler.go

// 将 Pod 调度到指定 node list 中的某台 Node 上
func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework,
state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {

feasibleNodes, diagnosis := g.findNodesThatFitPod(ctx, fwk, state, pod) // 过滤可用 nodes
if len(feasibleNodes) == 0 {
    return result, &framework.FitError{}
}

if len(feasibleNodes) == 1 { // 可用 node 只有一个,就选它了
    return ScheduleResult{SuggestedHost:  feasibleNodes[0].Name}, nil
}

priorityList := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
host := g.selectHost(priorityList)

return ScheduleResult{
    SuggestedHost:  host,
    EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
    FeasibleNodes:  len(feasibleNodes),
}, err
}

// Filters nodes that fit the pod based on the framework filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework,
state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {

diagnosis := framework.Diagnosis{
    NodeToStatusMap:      make(framework.NodeToStatusMap),
    UnschedulablePlugins: sets.NewString(),
}

// Run "prefilter" plugins.
s := fwk.RunPreFilterPlugins(ctx, state, pod)
allNodes := g.nodeInfoSnapshot.NodeInfos().List()

if len(pod.Status.NominatedNodeName) > 0 && featureGate.Enabled(features.PreferNominatedNode) {
    feasibleNodes := g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
    if len(feasibleNodes) != 0 {
        return feasibleNodes, diagnosis, nil
    }
}

feasibleNodes := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
feasibleNodes = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
return feasibleNodes, diagnosis, nil


它会过滤 过滤 PodSpect 中 NodeName 字段为空的 Pods,尝试为这样的 Pods 挑选一个 Node 调度上去。

调度算法

下面简单看下内置的默认调度算法。

注册默认 predicates

这些 predicates 其实都是函数,被调用到时,执行相应的 过滤。例如,如果 PodSpec 里面显式要求了 CPU 或 RAM 资源,而一个 node 无法满足这些条件, 那就会将这个 Node 从备选列表中删除。
// pkg/scheduler/algorithmprovider/registry.go

// NewRegistry returns an algorithm provider registry instance.
func NewRegistry() Registry {
defaultConfig := getDefaultConfig()
applyFeatureGates(defaultConfig)

caConfig := getClusterAutoscalerConfig()
applyFeatureGates(caConfig)

return Registry{
    schedulerapi.SchedulerDefaultProviderName: defaultConfig,
    ClusterAutoscalerProvider:                 caConfig,
}
}

func getDefaultConfig() *schedulerapi.Plugins {
plugins := &schedulerapi.Plugins{
    PreFilter: schedulerapi.PluginSet{...},
    Filter: schedulerapi.PluginSet{
        Enabled: []schedulerapi.Plugin{
            {Name: nodename.Name},        // 指定 node name 调度
            {Name: tainttoleration.Name}, // 指定 toleration 调度
            {Name: nodeaffinity.Name},    // 指定 node affinity 调度
            ...
        },
    },
    PostFilter: schedulerapi.PluginSet{...},
    PreScore: schedulerapi.PluginSet{...},
    Score: schedulerapi.PluginSet{
        Enabled: []schedulerapi.Plugin{
            {Name: interpodaffinity.Name, Weight: 1},
            {Name: nodeaffinity.Name, Weight: 1},
            {Name: tainttoleration.Name, Weight: 1},
            ...
        },
    },
    Reserve: schedulerapi.PluginSet{...},
    PreBind: schedulerapi.PluginSet{...},
    Bind: schedulerapi.PluginSet{...},
}

return plugins


plugin 的实现见:pkg/scheduler/framework/plugins/,以 nodename filter 为例:
// pkg/scheduler/framework/plugins/nodename/node_name.go

// Filter invoked at the filter extension point.
func (pl *NodeName) Filter(ctx context.Context, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
if !Fits(pod, nodeInfo) {
    return framework.NewStatus(UnschedulableAndUnresolvable, ErrReason)
}
return nil
}

// 如果 Pod 没有指定 NodeName,或者指定的 NodeName 等于该 Node 的 name,返回 true;其他返回 false
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
return len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == nodeInfo.Node().Name


对筛选出的 node 排序

选择了合适的 Nodes 之后,接下来会执行一系列 priority function 对这些 Nodes 进行排序。 例如,如果算法是希望将 Pods 尽量分散到整个集群,那 priority 会选择资源尽量空闲的节点。

这些函数会给每个 Node 打分,得分最高的 Node 会被选中,调度到该节点。
// pkg/scheduler/core/generic_scheduler.go

// 运行打分插件(score plugins)对 nodes 进行排序。
func (g *genericScheduler) prioritizeNodes(ctx context.Context, fwk framework.Framework,
state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node,) (framework.NodeScoreList, error) {

// 如果没有指定 priority 配置,所有 node 将都得 1 分。
if len(g.extenders) == 0 && !fwk.HasScorePlugins() {
    result := make(framework.NodeScoreList, 0, len(nodes))
    for i := range nodes {
        result = append(result, framework.NodeScore{ Name:  nodes[i].Name, Score: 1 })
    }
    return result, nil
}

preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)       // PreScoe 插件
scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)  // Score 插件

result := make(framework.NodeScoreList, 0, len(nodes))
for i := range nodes {
    result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
    for j := range scoresMap {
        result[i].Score += scoresMap[j][i].Score
    }
}

if len(g.extenders) != 0 && nodes != nil {
    combinedScores := make(map[string]int64, len(nodes))
    for i := range g.extenders {
        if !g.extenders[i].IsInterested(pod) {
            continue
        }
        go func(extIndex int) {
            prioritizedList, weight := g.extenders[extIndex].Prioritize(pod, nodes)
            for i := range *prioritizedList {
                host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                combinedScores[host] += score * weight
            }
        }(i)
    }

    for i := range result {
        result[i].Score += combinedScores[result[i].Name] * (MaxNodeScore / MaxExtenderPriority)
    }
}

return result, nil


创建 v1.Binding 对象

算法选出一个 Node 之后,调度器会 创建一个 Binding 对象, Pod 的 ObjectReference 字段的值就是选中的 Node 的名字
// pkg/scheduler/framework/runtime/framework.go

func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState,
pod *v1.Pod, nodeName string) *framework.Status {

if !state.ShouldRecordPluginMetrics() {
    return bp.Bind(ctx, state, pod, nodeName)
}

status := bp.Bind(ctx, state, pod, nodeName)
return status


// pkg/scheduler/framework/plugins/defaultbinder/default_binder.go

// Bind binds pods to nodes using the k8s client.
func (b DefaultBinder) Bind(ctx, state *CycleState, p *v1.Pod, nodeName string) *framework.Status {
binding := &v1.Binding{
    ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
    Target:     v1.ObjectReference{Kind: "Node", Name: nodeName}, // ObjectReference 字段为 nodeName
}

b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})


如上,最后 ClientSet().CoreV1().Pods(binding.Namespace).Bind() 通过一个 POST 请求发给 apiserver

kube-apiserver 更新 Pod 对象

kube-apiserver 收到这个 Binding object 请求后,registry 反序列化对象,更新 Pod 对象的下列字段:
  • 设置 NodeName
  • 添加 annotations
  • 设置 PodScheduled status 为 True


// pkg/registry/core/pod/storage/storage.go

func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podID, oldMachine, machine string,
annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) {

podKey := r.store.KeyFunc(ctx, podID)
r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, nil,
    storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {

    pod, ok := obj.(*api.Pod)
    pod.Spec.NodeName = machine
    if pod.Annotations == nil {
        pod.Annotations = make(map[string]string)
    }
    for k, v := range annotations {
        pod.Annotations[k] = v
    }
    podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{
        Type:   api.PodScheduled,
        Status: api.ConditionTrue,
    })

    return pod, nil
}), dryRun, nil)


自定义调度器

predicate 和 priority function 都是可扩展的,可以通过 --policy-config-file 指定。

Kubernetes 还可以自定义调度器(自己实现调度逻辑)。如果 PodSpec 中 schedulerName 字段不为空,Kubernetes 就会将这个 Pod 的调度权交给指定的调度器。

小结

总结一下前面已经完成的步骤:
  1. HTTP 请求通过了认证、鉴权、admission control
  2. Deployment, ReplicaSet 和 Pod resources 已经持久化到 etcd
  3. 一系列 initializers 已经执行完毕
  4. 每个 Pod 也已经调度到了合适的 Node 上


但是,到目前为止,我们看到的所有东西(状态),还只是存在于 etcd 中的元数据。 下一步就是将这些状态同步到计算节点上,然后计算节点上的 agent(kubelet)就开始干活了。

原文链接:

0 个评论

要回复文章请先登录注册