上半部是 client-go 裡面的東西,看起來很複雜,有什麼 Reflector、Informer、Indexer 之類的,但其實你只要知道 Informer 就好了。Informer 最主要的功能就是當 Resource 的狀態發生改變的時候通知我們。為什麼不是我們自己打 API 到 Kubernetes API server 就好了呢?這是因為打 API 到 Kubernetes API server 是一個很貴的操作,因此 Informer 內部有維護一個 Informer Cache,減少打 Requests 到 API server 的次數。
// Foo is a specification for a Foo resource
typeFoostruct{metav1.TypeMeta`json:",inline"`metav1.ObjectMeta`json:"metadata,omitempty"`SpecFooSpec`json:"spec"`StatusFooStatus`json:"status"`}// FooSpec is the spec for a Foo resource
typeFooSpecstruct{DeploymentNamestring`json:"deploymentName"`Replicas*int32`json:"replicas"`}// FooStatus is the status for a Foo resource
typeFooStatusstruct{AvailableReplicasint32`json:"availableReplicas"`}
// Register event handlers on the Foo and Deployment informers.
//
// Foo events are enqueued directly. Deployment events are routed through
// handleObject, which resolves the owning Foo (if any) and enqueues it,
// so changes to an owned Deployment trigger a re-sync of its Foo.
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: controller.enqueueFoo,
	UpdateFunc: func(old, new interface{}) {
		controller.enqueueFoo(new)
	},
})
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: controller.handleObject,
	UpdateFunc: func(old, new interface{}) {
		newDepl := new.(*appsv1.Deployment)
		oldDepl := old.(*appsv1.Deployment)
		if newDepl.ResourceVersion == oldDepl.ResourceVersion {
			// Periodic resync will send update events for all known Deployments.
			// Two different versions of the same Deployment will always have
			// different RVs, so an equal RV means nothing changed — skip it.
			return
		}
		controller.handleObject(new)
	},
	DeleteFunc: controller.handleObject,
})
func(c*Controller)handleObject(objinterface{}){...ifownerRef:=metav1.GetControllerOf(object);ownerRef!=nil{// If this object is not owned by a Foo, we should not do anything more
// with it.
ifownerRef.Kind!="Foo"{return}foo,err:=c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)iferr!=nil{logger.V(4).Info("Ignore orphaned object","object",klog.KObj(object),"foo",ownerRef.Name)return}c.enqueueFoo(foo)return}}
func(c*Controller)processNextWorkItem(ctxcontext.Context)bool{objRef,shutdown:=c.workqueue.Get()...// Run the syncHandler, passing it the structured reference to the object to be synced.
err:=c.syncHandler(ctx,objRef)iferr==nil{c.workqueue.Forget(objRef)logger.Info("Successfully synced","objectName",objRef)returntrue}utilruntime.HandleErrorWithContext(ctx,err,"Error syncing; requeuing for later retry","objectReference",objRef)c.workqueue.AddRateLimited(objRef)returntrue}
func(c*Controller)syncHandler(ctxcontext.Context,objectRefcache.ObjectName)error{...// Get the Foo resource with this namespace/name
foo,err:=c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)...deploymentName:=foo.Spec.DeploymentName...// Get the deployment with the name specified in Foo.spec
deployment,err:=c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)// If the resource doesn't exist, we'll create it
iferrors.IsNotFound(err){deployment,err=c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(),newDeployment(foo),metav1.CreateOptions{FieldManager:FieldManager})}iferr!=nil{returnerr}// If the Deployment is not controlled by this Foo resource, we should log
// a warning to the event recorder and return error msg.
if!metav1.IsControlledBy(deployment,foo){msg:=fmt.Sprintf(MessageResourceExists,deployment.Name)c.recorder.Event(foo,corev1.EventTypeWarning,ErrResourceExists,msg)returnfmt.Errorf("%s",msg)}// If this number of the replicas on the Foo resource is specified, and the
// number does not equal the current desired replicas on the Deployment, we
// should update the Deployment resource.
iffoo.Spec.Replicas!=nil&&*foo.Spec.Replicas!=*deployment.Spec.Replicas{logger.V(4).Info("Update deployment resource","currentReplicas",*foo.Spec.Replicas,"desiredReplicas",*deployment.Spec.Replicas)deployment,err=c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(),newDeployment(foo),metav1.UpdateOptions{FieldManager:FieldManager})}iferr!=nil{returnerr}// Finally, we update the status block of the Foo resource to reflect the
// current state of the world
err=c.updateFooStatus(foo,deployment)iferr!=nil{returnerr}c.recorder.Event(foo,corev1.EventTypeNormal,SuccessSynced,MessageResourceSynced)returnnil}