Usage

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
apiVersion: flinkoperator.k8s.io/v1beta1  
kind: FlinkCluster  
metadata:  
  name: wordcount  
spec:  
  image:  
    name: flink:1.9.2  
  jobManager:  
    ports:  
      ui: 8081  
    resources:  
      limits:  
        memory: "1024Mi"  
        cpu: "200m"  
  taskManager:  
    replicas: 1  
    resources:  
      limits:  
        memory: "1024Mi"  
        cpu: "200m"  
  job:  
    jarFile: /cache/flink-app.jar  
    className: org.apache.flink.streaming.examples.wordcount.WordCount  
    args: ["--input", "./README.txt"]  
    parallelism: 1  
    savepointsDir: /cache/savepoints  
    initContainers:  
      - name: downloader  
        image: curlimages/curl  
        env:  
          - name: JAR_URL  
            value: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.9.2/flink-examples-streaming_2.12-1.9.2-WordCount.jar  
          - name: DEST_PATH  
            value: /cache/flink-app.jar  
        command: ['sh', '-c', 'curl -o ${DEST_PATH} ${JAR_URL}']  
    volumes:  
      - name: cache  
        emptyDir: {}  
    volumeMounts:  
      - mountPath: /cache  
        name: cache  
  flinkProperties:  
    taskmanager.numberOfTaskSlots: "1"

Check job status and events

API

FlinkCluster

1
2
3
4
5
6
7
8
// FlinkCluster is the Schema for the flinkclusters APItype
FlinkCluster struct {
   metav1.TypeMeta   `json:",inline"`
   metav1.ObjectMeta `json:"metadata,omitempty"`

   Spec   FlinkClusterSpec   `json:"spec"`
   Status FlinkClusterStatus `json:"status,omitempty"`
}

FlinkClusterSpec

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// FlinkClusterSpec defines the desired state of FlinkCluster
type FlinkClusterSpec struct {
    Image ImageSpec `json:"image"`
    ServiceAccountName *string `json:"serviceAccountName,omitempty"`
    BatchSchedulerName *string `json:"batchSchedulerName,omitempty"`
    JobManager JobManagerSpec `json:"jobManager"`
    TaskManager TaskManagerSpec `json:"taskManager"`
    Job *JobSpec `json:"job,omitempty"`
    EnvVars []corev1.EnvVar `json:"envVars,omitempty"`
    EnvFrom []corev1.EnvFromSource `json:"envFrom,omitempty"`
    FlinkProperties map[string]string `json:"flinkProperties,omitempty"`
    HadoopConfig *HadoopConfig `json:"hadoopConfig,omitempty"`
    GCPConfig *GCPConfig `json:"gcpConfig,omitempty"`
    LogConfig map[string]string `json:"logConfig,omitempty"`
    RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`
    RecreateOnUpdate *bool `json:"recreateOnUpdate,omitempty"`
}

FlinkClusterStatus

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// FlinkClusterStatus defines the observed state of FlinkCluster
type FlinkClusterStatus struct {
    State string `json:"state"`
    Components FlinkClusterComponentsStatus `json:"components"`
    Control *FlinkClusterControlStatus `json:"control,omitempty"`
    Savepoint *SavepointStatus `json:"savepoint,omitempty"`
    CurrentRevision string `json:"currentRevision,omitempty"`
    NextRevision string `json:"nextRevision,omitempty"`
    CollisionCount *int32 `json:"collisionCount,omitempty"`
    LastUpdateTime string `json:"lastUpdateTime,omitempty"`
}

JobManagerSpec

JobManagerSpec 主要描述了一个 Flink > JobManager 的属性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// JobManagerSpec defines properties of JobManager.type
JobManagerSpec struct {
    Replicas *int32 `json:"replicas,omitempty"`
    AccessScope string `json:"accessScope"`
    Ingress *JobManagerIngressSpec `json:"ingress,omitempty"`
    Ports JobManagerPorts `json:"ports,omitempty"`
    ExtraPorts []NamedPort `json:"extraPorts,omitempty"`
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`
    MemoryOffHeapRatio *int32 `json:"memoryOffHeapRatio,omitempty"`
    MemoryOffHeapMin resource.Quantity `json:"memoryOffHeapMin,omitempty"`
    Volumes []corev1.Volume `json:"volumes,omitempty"`
    VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`
    VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`
    InitContainers []corev1.Container `json:"initContainers,omitempty"`
    NodeSelector map[string]string `json:"nodeSelector,omitempty"`
    Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
    Sidecars []corev1.Container `json:"sidecars,omitempty"`
    PodAnnotations map[string]string `json:"podAnnotations,omitempty"`
    SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
    PodLabels map[string]string `json:"podLabels,omitempty"`
}

TaskManagerSpec

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// TaskManagerSpec defines properties of TaskManager.type
TaskManagerSpec struct {  
    Replicas int32 `json:"replicas"`  
    Ports TaskManagerPorts `json:"ports,omitempty"`  
    ExtraPorts []NamedPort `json:"extraPorts,omitempty"`  
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`  
    MemoryOffHeapRatio *int32 `json:"memoryOffHeapRatio,omitempty"`  
    MemoryOffHeapMin resource.Quantity `json:"memoryOffHeapMin,omitempty"`  
    Volumes []corev1.Volume `json:"volumes,omitempty"`  
    VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`  
    VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`   
    InitContainers []corev1.Container `json:"initContainers,omitempty"`  
    NodeSelector map[string]string `json:"nodeSelector,omitempty"`  
    Tolerations []corev1.Toleration `json:"tolerations,omitempty"`  
    Sidecars []corev1.Container `json:"sidecars,omitempty"`   
    PodAnnotations map[string]string `json:"podAnnotations,omitempty"`  
    SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`  
    PodLabels map[string]string `json:"podLabels,omitempty"`  
}

JobSpec

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// JobSpec defines properties of a Flink job.type 
JobSpec struct {  
    JarFile string `json:"jarFile"`  
    ClassName *string `json:"className,omitempty"`  
    Args []string `json:"args,omitempty"`  
    FromSavepoint *string `json:"fromSavepoint,omitempty"`   
    AllowNonRestoredState *bool `json:"allowNonRestoredState,omitempty"`  
    TakeSavepointOnUpgrade *bool `json:"takeSavepointOnUpgrade,omitempty"`  
    SavepointsDir *string `json:"savepointsDir,omitempty"`   
    AutoSavepointSeconds *int32 `json:"autoSavepointSeconds,omitempty"`  
    SavepointGeneration int32 `json:"savepointGeneration,omitempty"`  
    Parallelism *int32 `json:"parallelism,omitempty"`  
    NoLoggingToStdout *bool `json:"noLoggingToStdout,omitempty"`  
    Volumes []corev1.Volume `json:"volumes,omitempty"`  
    VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`  
    InitContainers []corev1.Container `json:"initContainers,omitempty"`  
    RestartPolicy *JobRestartPolicy `json:"restartPolicy"`   
    CleanupPolicy *CleanupPolicy `json:"cleanupPolicy,omitempty"`  
    CancelRequested *bool `json:"cancelRequested,omitempty"`   
    PodAnnotations map[string]string `json:"podAnnotations,omitempty"`   
    PodLabels map[string]string `json:"podLabels,omitempty"`  
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`  
    SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`  
}

Architecture

  1. The Flink Operator (including CRD and Controller) has been deployed in the cluster.
  2. The user runs kubectl apply -f myjobcluster.yaml which sends a FlinkClusterspec to the API server.
  3. API server validates the spec against on the CRD, then creates a FlinkClusterCR and stores it in etcd.
  4. A FlinkClusterADDED event is triggered by Kubernetes and dispatched to the Flink Controller.

  1. The Flink Controller analyzes the FlinkClusterCR, then calls the API server to create the underlying primitive resources (JobManager service, JobManager deployment, TaskManager deployment).
  2. The controller implements the reconciliation loop: watches the status changes of the primitive resources, updates the status field of the CR accordingly, continuously take actions to drive the observed state to the desired state when needed.

2023-02-01_flink-on-k8s-2
2023-02-01_flink-on-k8s-2

  1. When JobManager deployment, JobManager service, TaskManager deployment are all ready, the controller creates a Flink job submitter which submits the job to Flink REST API through the JobManager service.

  2. The JobSubmitter keeps polling the job status from the Flink REST API, finishes itself when the job is completed or failed.

  3. After the job is done, the controller deletes all the resources (JM, TM) for the job, but the job cluster metadata is kept.

    2023-02-01_flink-on-k8s-3
    2023-02-01_flink-on-k8s-3