什麼是 client-go?#
client-go
是 Kubernetes 官方的 Golang client,負責跟 Kubernetes API server 用 REST API 的方式互動。其實 client-go
基本上可以做任何事情,不只是拿來寫 operator,連 kubectl
內部的實作也是用 client-go
。至於更專門拿來寫 operator 的框架,包含
controller-runtime
、
kubebuilder
、
operator-sdk
會在本系列文的後續介紹。
Sample Controller 機制介紹#
sample-controller
是 Kubernetes 官方使用 client-go 實作的範例 operator。
為了看懂程式碼,我們必須先了解我們自己寫的 operator 和 client-go
是怎麼互動的。這邊講的東西是
官方文件
的簡化版。

上面這張圖出自官方文件,網路上也可以查到許多教學都會提到這張圖。
上半部是 client-go
裡面的東西,看起來很複雜,有什麼 Reflctor、Informer、Indexer 之類的,但其實你只要知道 Informer 就好了。Informer 最主要的功能就是當 Resource 的狀態發生改變的時候通知我們。為什麼不是我們自己打 API 到 Kubernetes API server 就好了呢?這是因為打 API 到 Kubernetes API server 是一個很貴的操作,因此 Informer 內部有維護一個 Informer Cache,減少打 Requests 到 API server 的次數。
下半部是我們要自己寫的部份:
- Resource Event Handlers:當 Informer 通知我們某個 resource 的狀態改變的時候,我們應該做什麼事,基本上就是把它的 key(namespace + name)放進 workqueue 裡面。
- Workqueue:這個會存放所有待處理的 object 的 key,我們的 operator 會不斷的從裡面拿東西出來處理,然後試圖把叢集調整成我們想要的狀態,如果失敗的話可能需要再把這個 object key 加回去 workqueue 待會處理。
Sample Controller 程式碼解析#
定義 CRD#
在
register.go
定義 GroupName,然後
v1alpha1/types.go
定義 CRD 的 type。可以看到裡面定義了一個 Foo
的資源如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| // Foo is a specification for a Foo resource
type Foo struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec FooSpec `json:"spec"`
Status FooStatus `json:"status"`
}
// FooSpec is the spec for a Foo resource
type FooSpec struct {
DeploymentName string `json:"deploymentName"`
Replicas *int32 `json:"replicas"`
}
// FooStatus is the status for a Foo resource
type FooStatus struct {
AvailableReplicas int32 `json:"availableReplicas"`
}
|
除了 TypeMeta
和 ObjectMeta
這些基本的東西之外,多定義了 Spec
和 Status
。Spec
是可以給使用者輸入的資料,定義的是「使用者期望這個資源最後該有的狀態」。Status
則是由我們的 Operator 去寫入值,代表的是「目前該資源的狀態」。
Sample Controller 使用了
Kubernetes 的 code-generator
來生成 CRD 的 typed client、informers、listers 和 deep-copy 函數。所以每次改動 types.go
的時候都要執行 ./hack/update-codegen.sh
來重新生成。
程式入口點#
接著來看
main.go
,也就是程式的入口點,其實非常簡單,注意這幾行就好:
1
2
3
4
5
6
7
8
| kubeClient, err := kubernetes.NewForConfig(cfg)
exampleClient, err := clientset.NewForConfig(cfg)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
controller := NewController(ctx, kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
controller.Run(ctx, 2)
|
基本上就是創出 Kubernetes 內建 resource 以及我們自己建的這個 Foo
Resource 的 client 和 informers,然後傳給 NewController
。之後呼叫 controller.Run
。
主要邏輯#
最後來看最主要的部份,
controller.go
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
// Periodic resync will send update events for all known Deployments.
// Two different versions of the same Deployment will always have different RVs.
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
|
這個就是上面講的 event handler,這邊可以註冊 AddFunc
、UpdateFunc
、DeleteFunc
,informer 在偵測到 resource 狀態改變的時候會呼叫相對應的函數。可以看到這邊 fooInformer
都只是簡單的呼叫 enqueueFoo
,而 deploymentInformer
都是呼叫 handleObject
。
1
2
3
4
5
6
7
8
| func (c *Controller) enqueueFoo(obj interface{}) {
if objectRef, err := cache.ObjectToName(obj); err != nil {
utilruntime.HandleError(err)
return
} else {
c.workqueue.Add(objectRef)
}
}
|
enqueueFoo
其實就是把 Foo
這個 object 的 key 存進 workqueue 而已,可以看這裡,非常明顯:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| func (c *Controller) handleObject(obj interface{}) {
...
if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
// If this object is not owned by a Foo, we should not do anything more
// with it.
if ownerRef.Kind != "Foo" {
return
}
foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
logger.V(4).Info("Ignore orphaned object", "object", klog.KObj(object), "foo", ownerRef.Name)
return
}
c.enqueueFoo(foo)
return
}
}
|
這個是 handleObject
的節錄,這邊做的事情是檢查這個 deployment 的 owner 是不是 Foo
,如果不是的話我們就不管他,是的話就把對應的 Foo
的 key 加進 workqueue。這邊要帶到一個觀念叫做
OwnerRefernce
,在 Kubernetes 裡面,某些 object 是其他 object 的 owner,預設行為是當 owner 被刪除的時候,他 own 的其他 object 也會被刪除,例如 ReplicaSet 就是 Pod 的 owner,所以 ReplicaSet 被刪除的時候它管的 pods 也會被刪除。這也是為什麼上面的 fooInformer
沒有加上 DeleteFunc
handler 的原因,因為當 Foo
被刪除的時候我們在這裡想做的事就是把他 own 的對應的所有 deployment 刪掉,但因為我們已經把 deployment 的 owner 設成 Foo
,當 Foo
被刪掉的時候對應的 deployment 本來就也會被刪掉,所以我們不用特別處理。
1
2
3
4
5
6
7
8
9
10
11
12
| func (c *Controller) Run(ctx context.Context, workers int) error {
...
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
...
}
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
|
Run
就是 main.go
呼叫 controller 的入口點,會起多個 goruntine 跑 runWorker
。runWorker
其實就只是跑一個無窮迴圈不斷執行 processNextWorkItem
而已。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
...
// Run the syncHandler, passing it the structured reference to the object to be synced.
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
|
上面是 processNextWorkItem
的節錄,首先就是先從 workqueue 拿一個 object key 出來,然後呼叫 syncHandler
處理,如果成功的話就把它從 workqueue 裡面忘掉,不然就要做 error handling 然後放回去 workqueue 等等處理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
...
// Get the Foo resource with this namespace/name
foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)
...
deploymentName := foo.Spec.DeploymentName
...
// Get the deployment with the name specified in Foo.spec
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager})
}
if err != nil {
return err
}
// If the Deployment is not controlled by this Foo resource, we should log
// a warning to the event recorder and return error msg.
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf("%s", msg)
}
// If this number of the replicas on the Foo resource is specified, and the
// number does not equal the current desired replicas on the Deployment, we
// should update the Deployment resource.
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas)
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager})
}
if err != nil {
return err
}
// Finally, we update the status block of the Foo resource to reflect the
// current state of the world
err = c.updateFooStatus(foo, deployment)
if err != nil {
return err
}
c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}
|
最後是 syncHandler
的節錄,這邊就是我們真正要寫的邏輯,把叢集調整成使用者在 Spec
裡面宣告的狀態。這邊想要的狀態就是在 Spec
裡面宣告的 deployment 有被創出來,且 replica 的數量和 Spec
裡面宣告的一致。
看完之後,是不是覺得我好像只講了 Sample Controller 的程式碼一小部份?其實這是因為 client-go
算是很底層的 library,用來寫 operator 的話有一些缺點:
- 我們真正需要寫自己的邏輯地方沒有那麼多,但還是必須寫上某些固定要寫的東西,導致有些地方有點冗餘。
- 要監聽不同資源的時候,必須對每個資源都宣告 informers、listers 之類重複的東西,例如 Sample Controller 裡面就宣告了
fooInformer
和 deploymentInformer
,要管的資源一多的話非常麻煩。
這些缺點也催生了其他更專門拿來寫 operator 的框架,包含
controller-runtime
、
kubebuilder
、
operator-sdk
等,歡迎關注本系列文的後續介紹來了解這些框架。
參考資料#