kubernetes源码分析-deployment分析
1. deployemt启动注册:
cmd/kube-controller-manager/
秦岭水泥
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
.....
controllers["deployment"] = startDeploymentController
....
}
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
return nil, fal, nil
}
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
return nil, true, nil
}
NewControllerInitializers 注册 startDeploymentController
2. NewDeploymentController 定义在pkg/controller/deployment/
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.P odInformer, client clientt.Interface) (*DeploymentController, error) {
......
dc.syncHandler = dc.syncDeployment
......
return dc, nil
}
// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()
klog.Infof("Starting deployment controller")
defer klog.Infof("Shutting down deployment controller")
if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(dc.worker, time.Second, stopCh)
}
<-stopCh
}
调⽤run启动, handler处理响应, 实际通过dc.syncHandler = dc.syncDeployment 指向syncDeployment
3. syncDeployment定义:
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
startTime := time.Now()
klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
defer func() {
klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(2).Infof("Deployment %v has been deleted", key)
return nil
}
if err != nil {
return err
}电脑怎么连无线网
// Deep-copy otherwi we are mutating our cache.
// TODO: Deep-copy only when needed.
d := deployment.DeepCopy()
everything := metav1.LabelSelector{}
if reflect.DeepEqual(d.Spec.Selector, &everything) {
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is lecting all pods. A non-empty lector is required.") if d.Status.ObrvedGeneration < d.Generation {
d.Status.ObrvedGeneration = d.Generation
dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
}
return nil
}
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
rsList, err := dc.getReplicaSetsForDeployment(d)
if err != nil {
return err
}
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// Current us of the podMap are:
//
// * check if a Pod is labeled correctly with the pod-template-hash label.
// * check that no old Pods are running in the middle of Recreate Deployments.
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return err
}
if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(d, rsList)
}
// Update deployment conditions with an Unknown condition when pausing/resuming
// a deployment. In this way, we can be sure that we won't timeout when a ur
// resumes a Deployment with a t progressDeadlineSeconds.
if err = dc.checkPaudConditions(d); err != nil {
大通草
return err
return err
}
if d.Spec.Paud {
return dc.sync(d, rsList)
}
/
/ rollback is not re-entrant in ca the underlying replica ts are updated with a new
// revision so we should ensure that we won't proceed to update replica ts until we
// make sure that the deployment has cleaned up its rollback spec in subquent enqueues.
雨果带你看世界if getRollbackTo(d) != nil {
llback(d, rsList)
}
scalingEvent, err := dc.isScalingEvent(d, rsList)
if err != nil {
return err
}
if scalingEvent {
return dc.sync(d, rsList)
}
switch d.Spec.Strategy.Type {
ca apps.RecreateDeploymentStrategyType:
lloutRecreate(d, rsList, podMap)
ca apps.RollingUpdateDeploymentStrategyType:
lloutRolling(d, rsList)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
4. Deployment删除
if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(d, rsList)
}
通过标记deletiontimestamp判断, 删除调⽤syncStatusOnly 更新状态:
// syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
func (dc *DeploymentController) syncStatusOnly(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, fal)
if err != nil {
return err工程劳务合同
}
allRSs := append(oldRSs, newRS)
return dc.syncDeploymentStatus(allRSs, newRS, d)
}
注释// syncStatusOnly only updates Deployments Status and doesn't take any mutating actions. 说明只更新状态, 不进⾏实际的删除操作
getAllReplicaSetsAndSyncRevision 获取所有的replicationt以及版本信息列表
将rs的列表的new以及old⽣成为所有的列表allrss
dc.syncDeploymentStatus(allRSs, newRS, d) 同步状态信息
// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error { newStatus := calculateStatus(allRSs, newRS, d)
if reflect.DeepEqual(d.Status, newStatus) {
return nil
}
newDeployment := d
newDeployment.Status = newStatus
_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{}) return err
}
calculateStatus 计算状态
/ calculateStatus calculates the latest status for the provided deployment by looking into the provided replica ts.
func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) apps.DeploymentStatus {
availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
unavailableReplicas := totalReplicas - availableReplicas
// If unavailableReplicas is negative, then that means the Deployment has more available replicas running than
// desired, e.g. whenever it scales down. In such a ca we should simply default unavailableReplicas to zero.
if unavailableReplicas < 0 {
unavailableReplicas = 0
}
status := apps.DeploymentStatus{
// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
ObrvedGeneration: deployment.Generation,
Replicas: deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
UpdatedReplicas: deploymentutil.GetActualReplicaCountForReplicaSets([]*apps.ReplicaSet{newRS}),
ReadyReplicas: deploymentutil.GetReadyReplicaCountForReplicaSets(allRSs),
AvailableReplicas: availableReplicas,
UnavailableReplicas: unavailableReplicas,
CollisionCount: deployment.Status.CollisionCount,
}
// Copy conditions one by one so we won't mutate the original object.
conditions := deployment.Status.Conditions
for i := range conditions {
status.Conditions = append(status.Conditions, conditions[i])
}
if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
minAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable , "Deployment has minimum availability.")
deploymentutil.SetDeploymentCondition(&status, *minAvailability)
} el {
noMinAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionFal, deploymentutil.MinimumReplicasUnav ailable, "Deployment does not have minimum availability.")
deploymentutil.SetDeploymentCondition(&status, *noMinAvailability)
}
return status
}
获取到最新的status信息
然后调⽤dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus 更新状态,执⾏对应操作
对于删除操作,实际实在 kube-controller-manager 的垃圾回收器(garbagecollector controller)中完成的
5. Deployment 暂停和恢复
if err = dc.checkPaudConditions(d); err != nil {
物联网工程技术>桂花树长什么样
return err
}
if d.Spec.Paud {
return dc.sync(d, rsList)
}
检查是否可以更新pasud暂停
func (dc *DeploymentController) checkPaudConditions(d *apps.Deployment) error {
if !deploymentutil.HasProgressDeadline(d) {
return nil
}
cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
// If we have reported lack of progress, do not overwrite it with a paud condition.
return nil
}
paudCondExists := cond != nil && cond.Reason == deploymentutil.PaudDeployReason
needsUpdate := fal
if d.Spec.Paud && !paudCondExists {
condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PaudDeployReason, "Deployment is paud")
deploymentutil.SetDeploymentCondition(&d.Status, *condition)
needsUpdate = true
} el if !d.Spec.Paud && paudCondExists {
condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReaso n, "Deployment is resumed")
deploymentutil.SetDeploymentCondition(&d.Status, *condition)
needsUpdate = true
}
if !needsUpdate {
return nil
}
var err error
_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
return err
}
调⽤sync进⾏状态更新
6. Deployment 回滚
if getRollbackTo(d) != nil {
llback(d, rsList)
}
/ rollback the deployment to the specified revision. In any ca cleanup the rollback spec.
func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
if err != nil {
return err
}
allRSs := append(allOldRSs, newRS)
rollbackTo := getRollbackTo(d)
// If rollback revision is 0, rollback to the last revision
if rollbackTo.Revision == 0 {
if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
// If we still can't find the last revision, gives up rollback
十五的月亮打一成语// Gives up rollback
return dc.updateDeploymentAndClearRollbackTo(d)
}
}
for _, rs := range allRSs {
v, err := deploymentutil.Revision(rs)
if err != nil {
klog.V(4).Infof("Unable to extract revision from deployment's replica t %q: %v", rs.Name, err)