Reconcile

Reconcileはカスタムコントローラのコアロジックになります。 あるべき状態(ユーザーが作成したカスタムリソース)と、実際のシステムの状態を比較し、差分があればそれを埋めるための処理を実行します。

Reconcileはいつ呼ばれるのか

Reconcile処理は下記のタイミングで呼び出されます。

  • コントローラが扱うリソースが作成、更新、削除されたとき
  • Reconcileに失敗してリクエストが再度キューに積まれたとき
  • コントローラの起動時
  • 外部イベントが発生したとき
  • キャッシュを再同期するとき(デフォルトでは10時間に1回)

このような様々なタイミングで呼び出されるので、Reconcile処理は必ず冪等(同じリクエストで何度呼び出しても同じ結果になること)でなければなりません。

なお、Reconcile処理はデフォルトでは1秒間に10回以上実行されないように制限されています。

監視対象の制御

Reconcileが呼ばれるタイミングを制御するために、NewControllerManagedBy関数を利用します。

tenant_controller.go

return ctrl.NewControllerManagedBy(mgr).
    For(&multitenancyv1.Tenant{}).
    Owns(&corev1.Namespace{}).
    Owns(&rbacv1.ClusterRole{}).
    Owns(&rbacv1.RoleBinding{}).
    Watches(&src, &handler.EnqueueRequestForObject{}).
    WithEventFilter(pred).
    Complete(r)

ForではこのコントローラのReconcile対象となるリソースの型を指定します。

Ownsにはこのコントローラが生成するリソースの型を指定します。 ここではテナントコントローラが生成するnamespaceとClusterRole,RoleBindingを指定しています。 これらのリソースに何らかの変更が発生した際にReconcileが呼び出されるようになります。 ただし、Ownsで指定した型のすべてのリソースの変更をウォッチするわけではなく、テナントリソースがownerReferenceに指定されているリソースのみが監視対象となります。

イベントのフィルタリング

WithEventFilterでは、For, Owns, Watchesで監視対象としたリソースの変更イベントをまとめてフィルタリングすることができます。 後述しますが、ForOwns 個々に、より細かくフィルタリングすることもできます。

下記のようなpredicate.Funcsを用意して、WithEventFilter関数で指定します。

tenant_controller.go

pred := predicate.Funcs{
    CreateFunc:  func(event.CreateEvent) bool { return true },
    DeleteFunc:  func(event.DeleteEvent) bool { return true },
    UpdateFunc:  func(event.UpdateEvent) bool { return true },
    GenericFunc: func(event.GenericEvent) bool { return true },
}

例えばCreateFuncでtrueを返し、DeleteFunc,UpdateFuncでfalseを返すようにすれば、リソースが作成されたときのみReconcileが呼び出されるようにできます。また、引数でイベントの詳細な情報が渡ってくるので、それを利用してより複雑なフィルタリングをおこなうことも可能です。 なお、GenericFuncは後述の外部イベントのフィルタリングに利用します。

重要な注意点として、kube-apiserver に発行した CREATE/UPDATE/PATCH 操作が一対一でイベントにはなりません。 たとえば CREATE 直後に UPDATE すると、イベントとしては CreateFunc しか呼び出されないことがあります。

WithEventFilterを利用するとForOwns,Watchesで指定したすべての監視対象にフィルターが適用されますが、下記のようにForOwns,Watchesのオプションとして個別にフィルターを指定することも可能です。

return ctrl.NewControllerManagedBy(mgr).
    For(&multitenancyv1.Tenant{}, builder.WithPredicates(pred1)).
    Owns(&corev1.Namespace{}, builder.WithPredicates(pred2)).
    Owns(&rbacv1.RoleBinding{}, builder.WithPredicates(pred3)).
    Watches(&src, &handler.EnqueueRequestForObject{}, builder.WithPredicates(pred4)).
    Complete(r)

外部イベントの監視

Watchesでは上記以外の外部イベントを監視したい場合に利用します。

Kubernetes内のリソースの変更だけでなく、外部イベントをトリガーにしてReconcileを呼び出したい場合があります。 例えばGitHubのWebhook呼び出しに応じて処理をおこないたい場合や、外部の状態をポーリングしてその状態の変化によって処理をおこないたい場合などが考えられます。

外部イベント監視の例として、ここでは10秒ごとに起動しテナントリソースがReady状態になっていればイベントを発行する仕組みを実装してみます。

external_event.go

package controllers

import (
    "time"

    multitenancyv1 "github.com/zoetrope/kubebuilder-training/codes/api/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
)

func newExternalEventWatcher() *externalEventWatcher {
    ch := make(chan event.GenericEvent)

    return &externalEventWatcher{
        channel: ch,
    }
}

func (r *externalEventWatcher) InjectClient(c client.Client) error {
    r.client = c
    return nil
}

type externalEventWatcher struct {
    channel chan event.GenericEvent
    client  client.Client
}

func (r externalEventWatcher) Start(ch <-chan struct{}) error {
    ticker := time.NewTicker(10 * time.Second)
    ctx := contextFromStopChannel(ch)

    defer ticker.Stop()
    for {
        select {
        case <-ch:
            return nil
        case <-ticker.C:
            var tenants multitenancyv1.TenantList
            err := r.client.List(ctx, &tenants, client.MatchingFields(map[string]string{conditionReadyField: string(corev1.ConditionTrue)}))
            if err != nil {
                break
            }
            for _, tenant := range tenants.Items {
                r.channel <- event.GenericEvent{
                    Meta: &metav1.ObjectMeta{
                        Name: tenant.Name,
                    },
                }
            }
        }
    }
}

上記のwatcherをmgr.Addでマネージャに登録し、GenericEventをウォッチします。

tenant_controller.go

external := newExternalEventWatcher()
err = mgr.Add(external)
if err != nil {
    return err
}
src := source.Channel{
    Source: external.channel,
}
return ctrl.NewControllerManagedBy(mgr).
    For(&multitenancyv1.Tenant{}).
    Owns(&corev1.Namespace{}).
    Owns(&rbacv1.ClusterRole{}).
    Owns(&rbacv1.RoleBinding{}).
    Watches(&src, &handler.EnqueueRequestForObject{}).
    WithEventFilter(pred).
    Complete(r)

これにより、Ready状態のテナントリソースが存在すると10秒ごとにReconcileが呼び出されるようになります。

Reconcileの実装

いよいよReconcileの本体を実装します。

Reconciler

Reconcileはreconcile.Reconcilerインタフェースを実装することになります。

type Reconciler interface {
    Reconcile(Request) (Result, error)
}

引数として渡ってくるreconcile.Requestには、Forで指定した監視対象のNamespacedNameが含まれています。

このNamespacedNameを利用して、テナントリソースの取得をおこないます。

tenant_controller.go

var tenant multitenancyv1.Tenant
err := r.Get(ctx, req.NamespacedName, &tenant)

なお、OwnsでnamespaceやClusterRole, RoleBindingを監視対象に設定しましたが、これらのリソースの変更によってReconcileが呼び出された場合でも、RequestのNamespacedNameにはこれらのリソースのownerであるテナントリソースの名前が入っています。

戻り値のreconcile.Resultには、Requeue, RequeueAfterというフィールドが含まれています。 この戻り値を利用すると、指定した時間が経過したあとに再度Reconcileを呼び出させることが可能になります。 例えば何らかの時間がかかる処理(コンテナの起動など)を待つ場合に利用できます。

また、Recnocileがエラーを返した場合は、失敗するたびに待ち時間が指数関数的に増加します。

Reconcileは複数のリソースを管理しているため、1つのリソースを処理するために多くの時間をかけるべきではありません。 何らかの待ちが発生する場合は、RequeueRequeueAfterを指定してReconcileをすぐに抜けるようにしましょう。

reconcileNamespaces

テナントリソースに記述されたnamespaceを作成します。

tenant_controller.go

func (r *TenantReconciler) reconcileNamespaces(ctx context.Context, log logr.Logger, tenant multitenancyv1.Tenant) (bool, error) {
    //! [matching-fields]
    var namespaces corev1.NamespaceList
    err := r.List(ctx, &namespaces, client.MatchingFields(map[string]string{ownerControllerField: tenant.Name}))
    //! [matching-fields]
    if err != nil {
        log.Error(err, "unable to fetch namespaces")
        return false, err
    }
    namespaceNames := make(map[string]corev1.Namespace)
    for _, ns := range namespaces.Items {
        namespaceNames[ns.Name] = ns
    }

    updated := false
    for _, ns := range tenant.Spec.Namespaces {
        name := tenant.Spec.NamespacePrefix + ns
        if _, ok := namespaceNames[name]; ok {
            delete(namespaceNames, name)
            continue
        }
        target := corev1.Namespace{
            ObjectMeta: metav1.ObjectMeta{
                Name: name,
            },
        }
        //! [controller-reference]
        err = ctrl.SetControllerReference(&tenant, &target, r.Scheme)
        //! [controller-reference]
        if err != nil {
            log.Error(err, "unable to set owner reference", "name", name)
            return updated, err
        }
        log.Info("creating the new namespace", "name", name)
        err = r.Create(ctx, &target, &client.CreateOptions{})
        if err != nil {
            log.Error(err, "unable to create the namespace", "name", name)
            return updated, err
        }
        addedNamespaces.Inc()
        updated = true
        delete(namespaceNames, name)
    }

    for _, ns := range namespaceNames {
        log.Info("deleting the new namespace", "name", ns.Name)
        err = r.Delete(ctx, &ns, &client.DeleteOptions{})
        if err != nil {
            log.Error(err, "unable to delete the namespace", "name", ns.Name)
            return updated, err
        }
        removedNamespaces.Inc()
        updated = true
    }

    return updated, nil
}

reconcileRBAC

ClusterRoleとRoleBindingを作成し、テナントの管理対象のnamespaceに管理者権限を付与します。

tenant_controller.go

func (r *TenantReconciler) reconcileRBAC(ctx context.Context, log logr.Logger, tenant multitenancyv1.Tenant) (bool, error) {
    updated := false
    for _, ns := range tenant.Spec.Namespaces {
        //! [create-or-update]
        name := tenant.Spec.NamespacePrefix + ns

        role := &rbacv1.ClusterRole{}
        role.SetName(name + "-admin-role")
        op, err := ctrl.CreateOrUpdate(ctx, r.Client, role, func() error {
            role.Rules = []rbacv1.PolicyRule{
                {
                    Verbs:         []string{"get", "list", "watch", "update", "patch", "delete"},
                    APIGroups:     []string{multitenancyv1.GroupVersion.Group},
                    Resources:     []string{"tenants"},
                    ResourceNames: []string{tenant.Name},
                },
                {
                    Verbs:         []string{"get", "list", "watch"},
                    APIGroups:     []string{""},
                    Resources:     []string{"namespaces"},
                    ResourceNames: []string{name},
                },
            }
            return ctrl.SetControllerReference(&tenant, role, r.Scheme)
        })
        //! [create-or-update]
        if err != nil {
            log.Error(err, "unable to create-or-update RoleBinding")
            return updated, err
        }

        if op != controllerutil.OperationResultNone {
            updated = true
            log.Info("reconcile RoleBinding successfully", "op", op)
        }

        rb := &rbacv1.RoleBinding{}
        rb.SetNamespace(name)
        rb.SetName(name + "-admin-rolebinding")

        op, err = ctrl.CreateOrUpdate(ctx, r.Client, rb, func() error {
            rb.RoleRef = rbacv1.RoleRef{
                APIGroup: "rbac.authorization.k8s.io",
                Kind:     "ClusterRole",
                Name:     name + "-admin-role",
            }
            rb.Subjects = []rbacv1.Subject{tenant.Spec.Admin}
            return ctrl.SetControllerReference(&tenant, rb, r.Scheme)
        })
        if err != nil {
            log.Error(err, "unable to create-or-update RoleBinding")
            return updated, err
        }

        if op != controllerutil.OperationResultNone {
            updated = true
            log.Info("reconcile RoleBinding successfully", "op", op)
        }
    }
    return updated, nil
}

ステータスの更新

最初にステータスを更新するためのヘルパー関数を用意しておきます。

status.go

package controllers

import (
    "time"

    multitenancyv1 "github.com/zoetrope/kubebuilder-training/codes/api/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func setCondition(conditions *[]multitenancyv1.TenantCondition, newCondition multitenancyv1.TenantCondition) {
    if conditions == nil {
        conditions = &[]multitenancyv1.TenantCondition{}
    }
    current := findCondition(*conditions, newCondition.Type)
    if current == nil {
        newCondition.LastTransitionTime = metav1.NewTime(time.Now())
        *conditions = append(*conditions, newCondition)
        return
    }
    if current.Status != newCondition.Status {
        current.Status = newCondition.Status
        current.LastTransitionTime = metav1.NewTime(time.Now())
    }
    current.Reason = newCondition.Reason
    current.Message = newCondition.Message
}

func findCondition(conditions []multitenancyv1.TenantCondition, conditionType multitenancyv1.TenantConditionType) *multitenancyv1.TenantCondition {
    for _, c := range conditions {
        if c.Type == conditionType {
            return &c
        }
    }
    return nil
}

コントローラが扱うリソースに何も変更が加えられなかった場合は、ステータスを更新する必要もないでしょう。 そこで下記のような関数を用意し、namespaceとRBACのどちらかに変更が加えられたことをわかるようにしておきます。

tenant_controller.go

func (r *TenantReconciler) reconcile(ctx context.Context, log logr.Logger, tenant multitenancyv1.Tenant) (bool, error) {
    nsUpdated, err := r.reconcileNamespaces(ctx, log, tenant)
    if err != nil {
        return nsUpdated, err
    }
    rbUpdated, err := r.reconcileRBAC(ctx, log, tenant)
    if err != nil {
        return rbUpdated, err
    }
    return nsUpdated || rbUpdated, nil
}

上記の関数の戻り値に応じてステータスの更新をおこないます。

tenant_controller.go

updated, err := r.reconcile(ctx, log, tenant)
if err != nil {
    log.Error(err, "unable to reconcile", "name", tenant.Name)
    r.Recorder.Eventf(&tenant, corev1.EventTypeWarning, "Failed", "failed to reconcile: %s", err.Error())
    setCondition(&tenant.Status.Conditions, multitenancyv1.TenantCondition{
        Type:    multitenancyv1.ConditionReady,
        Status:  corev1.ConditionFalse,
        Reason:  "Failed",
        Message: err.Error(),
    })
    stErr := r.Status().Update(ctx, &tenant)
    if stErr != nil {
        log.Error(stErr, "failed to update status", "name", tenant.Name)
    }
    return ctrl.Result{}, err
}

currentCond := findCondition(tenant.Status.Conditions, multitenancyv1.ConditionReady)
if updated || currentCond == nil || currentCond.Status != corev1.ConditionTrue {
    r.Recorder.Event(&tenant, corev1.EventTypeNormal, "Updated", "the tenant was updated")
    setCondition(&tenant.Status.Conditions, multitenancyv1.TenantCondition{
        Type:   multitenancyv1.ConditionReady,
        Status: corev1.ConditionTrue,
    })
    err = r.Status().Update(ctx, &tenant)
    if err != nil {
        log.Error(err, "failed to update status", "name", tenant.Name)
        return ctrl.Result{}, err
    }
}

これにより、ユーザーはテナントリソースのステータスを確認することが可能になります。

results matching ""

    No results matching ""