什麼是 client-go?

client-go 是 Kubernetes 官方的 Golang client,負責跟 Kubernetes API server 用 REST API 的方式互動。其實 client-go 基本上可以做任何事情,不只是拿來寫 operator,連 kubectl 內部的實作也是用 client-go。至於更專門拿來寫 operator 的框架,包含 controller-runtimekubebuilderoperator-sdk 會在本系列文的後續介紹。

Sample Controller 機制介紹

sample-controller 是 Kubernetes 官方使用 client-go 實作的範例 operator。

為了看懂程式碼,我們必須先了解我們自己寫的 operator 和 client-go 是怎麼互動的。這邊講的東西是 官方文件 的簡化版。

client-go-controller-interaction

上面這張圖出自官方文件,網路上也可以查到許多教學都會提到這張圖。

上半部是 client-go 裡面的東西,看起來很複雜,有什麼 Reflctor、Informer、Indexer 之類的,但其實你只要知道 Informer 就好了。Informer 最主要的功能就是當 Resource 的狀態發生改變的時候通知我們。為什麼不是我們自己打 API 到 Kubernetes API server 就好了呢?這是因為打 API 到 Kubernetes API server 是一個很貴的操作,因此 Informer 內部有維護一個 Informer Cache,減少打 Requests 到 API server 的次數。

下半部是我們要自己寫的部份:

  • Resource Event Handlers:當 Informer 通知我們某個 resource 的狀態改變的時候,我們應該做什麼事,基本上就是把它的 key(namespace + name)放進 workqueue 裡面。
  • Workqueue:這個會存放所有待處理的 object 的 key,我們的 operator 會不斷的從裡面拿東西出來處理,然後試圖把叢集調整成我們想要的狀態,如果失敗的話可能需要再把這個 object key 加回去 workqueue 待會處理。

Sample Controller 程式碼解析

定義 CRD

register.go 定義 GroupName,然後 v1alpha1/types.go 定義 CRD 的 type。可以看到裡面定義了一個 Foo 的資源如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Foo is a specification for a Foo resource
type Foo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   FooSpec   `json:"spec"`
	Status FooStatus `json:"status"`
}

// FooSpec is the spec for a Foo resource
type FooSpec struct {
	DeploymentName string `json:"deploymentName"`
	Replicas       *int32 `json:"replicas"`
}

// FooStatus is the status for a Foo resource
type FooStatus struct {
	AvailableReplicas int32 `json:"availableReplicas"`
}

除了 TypeMetaObjectMeta 這些基本的東西之外,多定義了 SpecStatusSpec 是可以給使用者輸入的資料,定義的是「使用者期望這個資源最後該有的狀態」。Status 則是由我們的 Operator 去寫入值,代表的是「目前該資源的狀態」。

Sample Controller 使用了 Kubernetes 的 code-generator 來生成 CRD 的 typed client、informers、listers 和 deep-copy 函數。所以每次改動 types.go 的時候都要執行 ./hack/update-codegen.sh 來重新生成。

程式入口點

接著來看 main.go ,也就是程式的入口點,其實非常簡單,注意這幾行就好:

1
2
3
4
5
6
7
8
kubeClient, err := kubernetes.NewForConfig(cfg)
exampleClient, err := clientset.NewForConfig(cfg)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
controller := NewController(ctx, kubeClient, exampleClient,
    kubeInformerFactory.Apps().V1().Deployments(),
    exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
controller.Run(ctx, 2)

基本上就是創出 Kubernetes 內建 resource 以及我們自己建的這個 Foo Resource 的 client 和 informers,然後傳給 NewController。之後呼叫 controller.Run

主要邏輯

最後來看最主要的部份, controller.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.enqueueFoo,
    UpdateFunc: func(old, new interface{}) {
        controller.enqueueFoo(new)
    },
})

deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.handleObject,
    UpdateFunc: func(old, new interface{}) {
        newDepl := new.(*appsv1.Deployment)
        oldDepl := old.(*appsv1.Deployment)
        if newDepl.ResourceVersion == oldDepl.ResourceVersion {
            // Periodic resync will send update events for all known Deployments.
            // Two different versions of the same Deployment will always have different RVs.
            return
        }
        controller.handleObject(new)
    },
    DeleteFunc: controller.handleObject,
})

這個就是上面講的 event handler,這邊可以註冊 AddFuncUpdateFuncDeleteFunc,informer 在偵測到 resource 狀態改變的時候會呼叫相對應的函數。可以看到這邊 fooInformer 都只是簡單的呼叫 enqueueFoo,而 deploymentInformer 都是呼叫 handleObject

1
2
3
4
5
6
7
8
func (c *Controller) enqueueFoo(obj interface{}) {
	if objectRef, err := cache.ObjectToName(obj); err != nil {
		utilruntime.HandleError(err)
		return
	} else {
		c.workqueue.Add(objectRef)
	}
}

enqueueFoo 其實就是把 Foo 這個 object 的 key 存進 workqueue 而已,可以看這裡,非常明顯:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (c *Controller) handleObject(obj interface{}) {
	...
	if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
		// If this object is not owned by a Foo, we should not do anything more
		// with it.
		if ownerRef.Kind != "Foo" {
			return
		}

		foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
		if err != nil {
			logger.V(4).Info("Ignore orphaned object", "object", klog.KObj(object), "foo", ownerRef.Name)
			return
		}

		c.enqueueFoo(foo)
		return
	}
}

這個是 handleObject 的節錄,這邊做的事情是檢查這個 deployment 的 owner 是不是 Foo,如果不是的話我們就不管他,是的話就把對應的 Foo 的 key 加進 workqueue。這邊要帶到一個觀念叫做 OwnerRefernce ,在 Kubernetes 裡面,某些 object 是其他 object 的 owner,預設行為是當 owner 被刪除的時候,他 own 的其他 object 也會被刪除,例如 ReplicaSet 就是 Pod 的 owner,所以 ReplicaSet 被刪除的時候它管的 pods 也會被刪除。這也是為什麼上面的 fooInformer 沒有加上 DeleteFunc handler 的原因,因為當 Foo 被刪除的時候我們在這裡想做的事就是把他 own 的對應的所有 deployment 刪掉,但因為我們已經把 deployment 的 owner 設成 Foo,當 Foo 被刪掉的時候對應的 deployment 本來就也會被刪掉,所以我們不用特別處理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (c *Controller) Run(ctx context.Context, workers int) error {
	...
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
	}
	...
}

func (c *Controller) runWorker(ctx context.Context) {
	for c.processNextWorkItem(ctx) {
	}
}

Run 就是 main.go 呼叫 controller 的入口點,會起多個 goruntine 跑 runWorkerrunWorker 其實就只是跑一個無窮迴圈不斷執行 processNextWorkItem 而已。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	objRef, shutdown := c.workqueue.Get()

	...

	// Run the syncHandler, passing it the structured reference to the object to be synced.
	err := c.syncHandler(ctx, objRef)
	if err == nil {
		c.workqueue.Forget(objRef)
		logger.Info("Successfully synced", "objectName", objRef)
		return true
	}
	utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
	c.workqueue.AddRateLimited(objRef)
	return true
}

上面是 processNextWorkItem 的節錄,首先就是先從 workqueue 拿一個 object key 出來,然後呼叫 syncHandler 處理,如果成功的話就把它從 workqueue 裡面忘掉,不然就要做 error handling 然後放回去 workqueue 等等處理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
	...

	// Get the Foo resource with this namespace/name
	foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)

	...

	deploymentName := foo.Spec.DeploymentName

	...

	// Get the deployment with the name specified in Foo.spec
	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
	// If the resource doesn't exist, we'll create it
	if errors.IsNotFound(err) {
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager})
	}

	if err != nil {
		return err
	}

	// If the Deployment is not controlled by this Foo resource, we should log
	// a warning to the event recorder and return error msg.
	if !metav1.IsControlledBy(deployment, foo) {
		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
		c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
		return fmt.Errorf("%s", msg)
	}

	// If this number of the replicas on the Foo resource is specified, and the
	// number does not equal the current desired replicas on the Deployment, we
	// should update the Deployment resource.
	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
		logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas)
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager})
	}

	if err != nil {
		return err
	}

	// Finally, we update the status block of the Foo resource to reflect the
	// current state of the world
	err = c.updateFooStatus(foo, deployment)
	if err != nil {
		return err
	}

	c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}

最後是 syncHandler 的節錄,這邊就是我們真正要寫的邏輯,把叢集調整成使用者在 Spec 裡面宣告的狀態。這邊想要的狀態就是在 Spec 裡面宣告的 deployment 有被創出來,且 replica 的數量和 Spec 裡面宣告的一致。

結語

看完之後,是不是覺得我好像只講了 Sample Controller 的程式碼一小部份?其實這是因為 client-go 算是很底層的 library,用來寫 operator 的話有一些缺點:

  • 我們真正需要寫自己的邏輯地方沒有那麼多,但還是必須寫上某些固定要寫的東西,導致有些地方有點冗餘。
  • 要監聽不同資源的時候,必須對每個資源都宣告 informers、listers 之類重複的東西,例如 Sample Controller 裡面就宣告了 fooInformerdeploymentInformer,要管的資源一多的話非常麻煩。

這些缺點也催生了其他更專門拿來寫 operator 的框架,包含 controller-runtimekubebuilderoperator-sdk 等,歡迎關注本系列文的後續介紹來了解這些框架。

參考資料