diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index deedb3027a900..5c117569262dd 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -140,6 +140,12 @@ under the License.
+
+ org.apache.flink
+ flink-kubernetes_${scala.binary.version}
+ ${project.version}
+
+
diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh
index a5168fe9562c8..dd56450b24be0 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-console.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh
@@ -56,6 +56,8 @@ case $SERVICE in
;;
esac
+CLASS_TO_RUN=${FLINK_CLASS_TO_RUN:-$CLASS_TO_RUN}
+
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
diff --git a/flink-kubernetes/README.md b/flink-kubernetes/README.md
new file mode 100644
index 0000000000000..53e2d8abfe811
--- /dev/null
+++ b/flink-kubernetes/README.md
@@ -0,0 +1,71 @@
+# Flink On Kubernetes
+
+## Job manager
+Job manager is run as a pod of corresponding deployment which is supposed to be prepared beforehand.
+To create the deployment and a service for it, the following templates (under the `flink-kubernetes/templates` path)
+should be used:
+```
+jobmanager-deployment.yaml
+jobmanager-service.yaml
+```
+Example:
+```
+kubectl create -f jobmanager-deployment.yaml
+kubectl create -f jobmanager-service.yaml
+```
+This creates a deployment with one job manager and a service that exposes the job manager
+using one of the supported service types (ClusterIP, NodePort, LoadBalancer, ExternalName).
+
+## Task Manager
+A task manager is a short-lived entity that is created (and deleted) by the job manager for a particular slot.
+No deployments/jobs/services are created for a task manager, only pods.
+A template for a task manager is hardcoded into the implementation
+(`org.apache.flink.kubernetes.client.DefaultKubernetesClient`).
+
+For every slot request the job manager passes a resource profile to a resource manager
+(`org.apache.flink.kubernetes.KubernetesResourceManager`). The resource profile contains specific hardware requirements
+(CPU, Memory and other). All these requirements are included into the pod template as labels thus they could be used for
+binding specific pods onto specific VMs.
+
+## Resource Profile
+A resource profile might be set to a `StreamTransformation` by calling a corresponding method. A resource profile has
+cpu cores, heap memory, direct memory, native memory, network memory and extended resources (GPU and user defined).
+Only `StreamTransformation.minResources` is used for a pod template.
+
+### Resource Profile Configuration Example
+TBD
+
+## Kubernetes Resource Management
+Resource management uses the default service account that every pod contains. It should have admin privileges to be able
+to create and delete pods:
+```
+kubectl create clusterrolebinding serviceaccounts-cluster-admin \
+ --clusterrole=cluster-admin \
+ --group=system:serviceaccounts
+```
+
+## Build and run
+The implementation is based on existing mechanism of packaging (maven) and containerization (docker).
+
+Prepare a package first:
+```mvn clean package -DskipTests```
+Then it needs to be containerized:
+```cd flink-contrib/docker-flink/
+sh build.sh --from-local-dist --image-name flink-mmx
+```
+And uploaded to gcp (for example):
+```
+docker tag flink-mmx gcr.io/metamarkets-prod-xpn-host/flink-mmx
+gcloud docker -- push gcr.io/metamarkets-prod-xpn-host/flink-mmx
+```
+If minikube is used then a container image should be uploaded into minikube node.
+So before building a container image a docker env is supposed to be exported:
+```
+eval $(minikube docker-env)
+```
+Job manager deployment and service:
+```
+cd ../../flink-kubernetes/templates/
+kubectl create -f jobmanager-deployment.yaml
+kubectl create -f jobmanager-service.yaml
+```
diff --git a/flink-kubernetes/pom.xml b/flink-kubernetes/pom.xml
new file mode 100644
index 0000000000000..fec076c9a6ba2
--- /dev/null
+++ b/flink-kubernetes/pom.xml
@@ -0,0 +1,215 @@
+
+
+ 4.0.0
+
+
+ org.apache.flink
+ flink-parent
+ 1.7-SNAPSHOT
+ ..
+
+
+ flink-kubernetes_${scala.binary.version}
+ flink-kubernetes
+ jar
+
+
+ 1.0.1
+
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${project.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-runtime_${scala.binary.version}
+ ${project.version}
+ provided
+
+
+
+ org.scala-lang
+ scala-library
+
+
+
+ io.kubernetes
+ client-java
+ 4.0.0
+ compile
+
+
+
+
+
+ org.apache.flink
+ flink-test-utils-junit
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+
+
+ dependency-convergence
+
+ enforce
+
+
+ true
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+ scala-compile-first
+ process-resources
+
+ compile
+
+
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+
+ -Xms128m
+ -Xmx512m
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-eclipse-plugin
+ 2.8
+
+ true
+
+ org.scala-ide.sdt.core.scalanature
+ org.eclipse.jdt.core.javanature
+
+
+ org.scala-ide.sdt.core.scalabuilder
+
+
+ org.scala-ide.sdt.launching.SCALA_CONTAINER
+ org.eclipse.jdt.launching.JRE_CONTAINER
+
+
+ org.scala-lang:scala-library
+ org.scala-lang:scala-compiler
+
+
+ **/*.scala
+ **/*.java
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 1.7
+
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+ src/main/scala
+
+
+
+
+
+ add-test-source
+ generate-test-sources
+
+ add-test-source
+
+
+
+ src/test/scala
+
+
+
+
+
+
+
+
+ org.scalastyle
+ scalastyle-maven-plugin
+
+ ${project.basedir}/../tools/maven/scalastyle-config.xml
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
+
+
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/FlinkKubernetesOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/FlinkKubernetesOptions.java
new file mode 100644
index 0000000000000..e0a6b81a4bce9
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/FlinkKubernetesOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Parameters that will be used in Flink on k8s cluster.
+ * */
+public class FlinkKubernetesOptions {
+
+ // Immutable holder: the wrapped configuration is never replaced after construction.
+ private final Configuration configuration;
+
+ public FlinkKubernetesOptions(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
new file mode 100644
index 0000000000000..6e92b087e91bd
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
@@ -0,0 +1,126 @@
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.client.DefaultKubernetesClient;
+import org.apache.flink.kubernetes.client.KubernetesClient;
+import org.apache.flink.kubernetes.client.exception.KubernetesClientException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Kubernetes Resource Manager.
+ */
+public class KubernetesResourceManager extends ResourceManager {
+ public static final String FLINK_TM_IMAGE = "FLINK_TM_IMAGE";
+ public static final String FLINK_NAMESPACE = "FLINK_NAMESPACE";
+ public static final String FLINK_TM_RESOURCE_ID = "FLINK_TM_RESOURCE_ID";
+ public static final String FLINK_CLASS_TO_RUN = "FLINK_CLASS_TO_RUN";
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+ private final Map environment;
+
+ /**
+ * Client to communicate with the Node manager and launch TaskExecutor processes.
+ */
+ private KubernetesClient nodeManagerClient;
+
+ public KubernetesResourceManager(
+ RpcService rpcService,
+ String resourceManagerEndpointId,
+ ResourceID resourceId,
+ Configuration flinkConfig,
+ Map env,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ SlotManager slotManager,
+ MetricRegistry metricRegistry,
+ JobLeaderIdService jobLeaderIdService,
+ ClusterInformation clusterInformation,
+ FatalErrorHandler fatalErrorHandler,
+ JobManagerMetricGroup jobManagerMetricGroup
+ ) {
+ super(
+ rpcService,
+ resourceManagerEndpointId,
+ resourceId,
+ highAvailabilityServices,
+ heartbeatServices,
+ slotManager,
+ metricRegistry,
+ jobLeaderIdService,
+ clusterInformation,
+ fatalErrorHandler,
+ jobManagerMetricGroup
+ );
+ this.environment = env;
+ }
+
+ @Override
+ protected void initialize() throws ResourceManagerException {
+ try {
+ nodeManagerClient = new DefaultKubernetesClient(environment);
+ } catch (IOException e) {
+ throw new ResourceManagerException("Error while initializing K8s client", e);
+ }
+ }
+
+ @Override
+ protected void internalDeregisterApplication(
+ ApplicationStatus finalStatus, @Nullable String optionalDiagnostics
+ ) {
+ LOG.info("Shutting down and cleaning the cluster up.");
+ nodeManagerClient.stopAndCleanupCluster(null);
+ }
+
+ @Override
+ public Collection startNewWorker(ResourceProfile resourceProfile) {
+ LOG.info("Starting a new worker.");
+ try {
+ nodeManagerClient.createClusterPod(resourceProfile);
+ } catch (KubernetesClientException e) {
+ throw new RuntimeException("Could not start a new worker", e);
+ }
+ return Collections.singletonList(resourceProfile);
+ }
+
+ @Override
+ public boolean stopWorker(ResourceID worker) {
+ LOG.info("Stopping worker [{}].", worker.getResourceID());
+ try {
+ nodeManagerClient.terminateClusterPod(worker);
+ return true;
+ } catch (Exception e) {
+ LOG.error("Could not terminate a worker", e);
+ return false;
+ }
+ }
+
+ @Override
+ protected ResourceID workerStarted(ResourceID resourceID) {
+ // Hooray it started!
+ return resourceID;
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java
new file mode 100644
index 0000000000000..8d20d3547e9fb
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java
@@ -0,0 +1,99 @@
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+
+/**
+ * This class is the executable entry point for running a TaskExecutor in a Kubernetes container.
+ * It duplicates an entry point of {@link org.apache.flink.runtime.taskexecutor.TaskManagerRunner}
+ */
+public class KubernetesTaskManagerRunner {
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesTaskManagerRunner.class);
+
+ /**
+ * The process environment variables.
+ */
+ private static final Map ENV = System.getenv();
+
+ /**
+ * The exit code returned if the initialization of the Kubernetes task executor runner failed.
+ */
+ private static final int INIT_ERROR_EXIT_CODE = 31;
+
+ // ------------------------------------------------------------------------
+ // Program entry point
+ // ------------------------------------------------------------------------
+
+ /**
+ * The entry point for the Kubernetes task executor runner.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args) throws Exception {
+ EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+ long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
+
+ if (maxOpenFileHandles != -1L) {
+ LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
+ } else {
+ LOG.info("Cannot determine the maximum number of open file descriptors");
+ }
+
+ final Configuration configuration = TaskManagerRunner.loadConfiguration(args);
+
+ try {
+ FileSystem.initialize(configuration);
+ } catch (IOException e) {
+ throw new IOException("Error while setting the default " +
+ "filesystem scheme from configuration.", e);
+ }
+
+ SecurityUtils.install(new SecurityConfiguration(configuration));
+
+ LOG.debug("All environment variables: {}", ENV); // DEBUG only: env vars may contain secrets
+
+ try {
+
+ SecurityUtils.getInstalledContext().runSecured((Callable) () -> {
+ TaskManagerRunner.runTaskManager(configuration, new ResourceID(getContainerId()));
+ return null;
+ });
+ } catch (Throwable t) {
+ final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
+ // make sure that whatever happens ends up in the log
+ LOG.error("Kubernetes TaskManager initialization failed.", strippedThrowable);
+ System.exit(INIT_ERROR_EXIT_CODE);
+ }
+ }
+
+ private static String getContainerId() {
+ if (ENV.containsKey(KubernetesResourceManager.FLINK_TM_RESOURCE_ID)) {
+ return ENV.get(KubernetesResourceManager.FLINK_TM_RESOURCE_ID);
+ } else {
+ LOG.warn("ResourceID env variable {} is not found. Generating resource id",
+ KubernetesResourceManager.FLINK_TM_RESOURCE_ID);
+ return ResourceID.generate().getResourceIdString();
+ }
+ }
+
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java
new file mode 100644
index 0000000000000..a9d4ec50c5cf2
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java
@@ -0,0 +1,192 @@
+package org.apache.flink.kubernetes.client;
+
+import org.apache.flink.kubernetes.KubernetesResourceManager;
+import org.apache.flink.kubernetes.client.exception.KubernetesClientException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import com.google.gson.JsonSyntaxException;
+import io.kubernetes.client.ApiClient;
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1Container;
+import io.kubernetes.client.models.V1ContainerPort;
+import io.kubernetes.client.models.V1DeleteOptions;
+import io.kubernetes.client.models.V1EnvVar;
+import io.kubernetes.client.models.V1ObjectMeta;
+import io.kubernetes.client.models.V1Pod;
+import io.kubernetes.client.models.V1PodSpec;
+import io.kubernetes.client.util.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import scala.NotImplementedError;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Kubernetes Client.
+ * It uses default service client to operate with kubernetes abstractions.
+ */
+public class DefaultKubernetesClient implements KubernetesClient {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultKubernetesClient.class);
+
+ private final CoreV1Api coreV1Api;
+ private final String flinkTMImageName;
+ private final String flinkNamespace;
+ private final Map podResourceProfiles = new HashMap<>();
+
+ public DefaultKubernetesClient(Map environment) throws IOException {
+ //TODO: add a property to specify a user
+ // Default user is used for managing deployments and their pods
+ // Make sure default user has enough permissions for doing that
+ final ApiClient apiClient = Config.defaultClient();
+ io.kubernetes.client.Configuration.setDefaultApiClient(apiClient);
+ this.coreV1Api = new CoreV1Api(apiClient);
+ this.flinkNamespace = checkNotNull(environment.get(KubernetesResourceManager.FLINK_NAMESPACE));
+ this.flinkTMImageName = checkNotNull(environment.get(KubernetesResourceManager.FLINK_TM_IMAGE));
+ }
+
+ @Override
+ public Endpoint createClusterService() {
+ throw new UnsupportedOperationException("createClusterService is not implemented yet");
+ }
+
+ @Override
+ public void createClusterPod(ResourceProfile resourceProfile) throws KubernetesClientException {
+ final ResourceID resourceID = ResourceID.generate();
+ final String podName = getPodName(resourceID);
+ LOG.info("Creating a cluster pod [{}] for a resource profile [{}]", podName, resourceProfile);
+ //TODO: Place template into a resource file or somewhere into config file
+ V1Pod body = new V1Pod()
+ .apiVersion("v1")
+ .kind("Pod")
+ .metadata(
+ new V1ObjectMeta()
+ .name(podName)
+ .labels(new HashMap() {{
+ put("app", "flink");
+ put("component", "taskmanager");
+ put("role", "taskmanager");
+ put("ResourceId", resourceID.getResourceIdString());
+ put("CpuCores", String.valueOf(resourceProfile.getCpuCores()));
+ put("MemoryInMB", String.valueOf(resourceProfile.getMemoryInMB()));
+ put("HeapMemoryInMB", String.valueOf(resourceProfile.getHeapMemoryInMB()));
+ put("DirectMemoryInMB", String.valueOf(resourceProfile.getDirectMemoryInMB()));
+ put("NativeMemoryInMB", String.valueOf(resourceProfile.getNativeMemoryInMB()));
+ put("NetworkMemoryInMB", String.valueOf(resourceProfile.getNetworkMemoryInMB()));
+ put("OperatorsMemoryInMB", String.valueOf(resourceProfile.getOperatorsMemoryInMB()));
+ resourceProfile.getExtendedResources().forEach(
+ (key, resource) -> {
+ put(key, resource.getName());
+ put(resource.getName(), String.valueOf(resource.getValue()));
+ }
+ );
+ }}
+ ))
+ .spec(
+ //TODO: Add resource spec (CPU, Memory) and an option to turn the feature on/off
+ new V1PodSpec()
+ .containers(Collections.singletonList(
+ new V1Container()
+ .name("taskmanager")
+ .image(flinkTMImageName)
+ .imagePullPolicy("IfNotPresent")
+ .args(Collections.singletonList("taskmanager"))
+ .ports(Arrays.asList(
+ new V1ContainerPort().containerPort(6121).name("data"),
+ new V1ContainerPort().containerPort(6122).name("rpc"),
+ new V1ContainerPort().containerPort(6125).name("query")
+ ))
+ .env(Arrays.asList(
+ new V1EnvVar()
+ .name("JOB_MANAGER_RPC_ADDRESS")
+ .value("flink-jobmanager"),
+ new V1EnvVar()
+ .name(KubernetesResourceManager.FLINK_TM_RESOURCE_ID)
+ .value(resourceID.getResourceIdString()),
+ new V1EnvVar()
+ .name(KubernetesResourceManager.FLINK_CLASS_TO_RUN)
+ .value("org.apache.flink.kubernetes.KubernetesTaskManagerRunner")
+ ))
+ )));
+ try {
+ coreV1Api.createNamespacedPod(flinkNamespace, body, false, null, null);
+ podResourceProfiles.put(podName, resourceProfile);
+ } catch (ApiException e) {
+ final String message = String.format("Cannot create a pod for resource profile [%s]", resourceProfile);
+ throw new KubernetesClientException(message, e);
+ }
+ }
+
+ /**
+ * Terminates a cluster pod.
+ * @param resourceID the pod's resource id as assigned by Flink
+ * @throws KubernetesClientException if the pod cannot be deleted
+ */
+ @Override
+ public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientException {
+ final String podName = getPodName(resourceID);
+ LOG.info("Terminating a cluster pod [{}] for a resource profile [{}]", podName, resourceID);
+ deleteNamespacedPod(podName);
+ podResourceProfiles.remove(podName);
+ }
+
+ /**
+ * Stops and cleans up a cluster.
+ * @param clusterId cluster id
+ */
+ @Override
+ public void stopAndCleanupCluster(String clusterId) {
+ LOG.info("Stopping the cluster and deleting all its task manager pods");
+ podResourceProfiles.forEach(
+ (podName, resourceProfile) -> {
+ try {
+ deleteNamespacedPod(podName);
+ } catch (KubernetesClientException e) {
+ LOG.warn("Could not delete a pod [{}]", podName);
+ }
+ }
+ );
+ podResourceProfiles.clear();
+ }
+
+ @Override
+ public Endpoint getResetEndpoint(String flinkClusterId) {
+ throw new UnsupportedOperationException("getResetEndpoint is not implemented yet");
+ }
+
+ @Override
+ public void close() {
+ // No resources to release. Throwing here would escape callers' catch (Exception) blocks
+ }
+
+ private String getPodName(ResourceID resourceId) {
+ return "flink-taskmanager-" + resourceId.getResourceIdString();
+ }
+
+ private void deleteNamespacedPod(String podName) throws KubernetesClientException {
+ try {
+ V1DeleteOptions body = new V1DeleteOptions().gracePeriodSeconds(0L).orphanDependents(false);
+ coreV1Api.deleteNamespacedPod(podName, flinkNamespace, body, null, null, null, null, null);
+ } catch (ApiException e) {
+ if ("Not Found".equals(e.getMessage())) {
+ LOG.warn("Could not delete a pod [{}] as it was not found", podName);
+ } else {
+ final String message =
+ String.format("Could not delete a pod [%s] in namespace [%s]", podName, flinkNamespace);
+ throw new KubernetesClientException(message, e);
+ }
+ } catch (JsonSyntaxException e) {
+ // It's a known issue until the Swagger spec is updated to OpenAPI 3.0
+ // https://github.com/kubernetes-client/java/issues/86
+ // Simply ignoring the exception
+ }
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java
new file mode 100644
index 0000000000000..21cae78619cf9
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.client;
+
+/**
+ * Represents an endpoint.
+ * */
+public class Endpoint {
+
+ private final String address;
+
+ private final int port;
+
+ public Endpoint(String address, int port) {
+ this.address = address;
+ this.port = port;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public int getPort() {
+ return port;
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java
new file mode 100644
index 0000000000000..9e925ac04d89b
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.client;
+
+import org.apache.flink.kubernetes.client.exception.KubernetesClientException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+/**
+ * The client to talk with kubernetes.
+ * */
+public interface KubernetesClient extends AutoCloseable {
+
+ /**
+ * Create kubernetes services and expose endpoints for access outside cluster.
+ */
+ Endpoint createClusterService();
+
+ /**
+ * Create cluster pod.
+ */
+ void createClusterPod(ResourceProfile resourceProfile) throws KubernetesClientException;
+
+
+ /**
+ * Terminate a cluster pod.
+ * @param resourceID cluster pod id
+ */
+ void terminateClusterPod(ResourceID resourceID) throws KubernetesClientException;
+
+ /**
+ * Stop the cluster and clean up all its resources, including services, auxiliary services and all running pods.
+ * */
+ void stopAndCleanupCluster(String clusterId);
+
+ /**
+ * Retrieve the REST endpoint of the given Flink cluster id.
+ */
+ Endpoint getResetEndpoint(String flinkClusterId);
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java
new file mode 100644
index 0000000000000..2283e2733df88
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java
@@ -0,0 +1,14 @@
+package org.apache.flink.kubernetes.client.exception;
+
+/**
+ * Kubernetes Client Exception.
+ */
+public class KubernetesClientException extends Exception {
+ public KubernetesClientException(String message) {
+ super(message);
+ }
+
+ public KubernetesClientException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java
new file mode 100644
index 0000000000000..8d80958334989
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.cluster;
+
+import org.apache.flink.client.deployment.ClusterDeploymentException;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.kubernetes.FlinkKubernetesOptions;
+import org.apache.flink.kubernetes.client.Endpoint;
+import org.apache.flink.kubernetes.client.KubernetesClient;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Kubernetes specific {@link ClusterDescriptor} implementation.
+ * This class is responsible for cluster creation from scratch
+ * and communication with its api
+ */
+public class KubernetesClusterDescriptor implements ClusterDescriptor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusterDescriptor.class);
+
+ private static final String CLUSTER_ID_PREFIX = "flink-session-cluster-";
+
+ private static final String CLUSTER_DESCRIPTION = "Kubernetes cluster";
+
+ private FlinkKubernetesOptions options;
+
+ private KubernetesClient client;
+
+ public KubernetesClusterDescriptor(@Nonnull FlinkKubernetesOptions options, @Nonnull KubernetesClient client) {
+ this.options = options;
+ this.client = client;
+ }
+
+ @Override
+ public String getClusterDescription() {
+ return CLUSTER_DESCRIPTION;
+ }
+
+ @Override
+ public ClusterClient retrieve(String clusterId) throws ClusterRetrieveException {
+ try {
+ Endpoint clusterEndpoint = client.getResetEndpoint(clusterId);
+ return createClusterEndpoint(clusterEndpoint, clusterId);
+ } catch (Exception e) {
+ throw new ClusterRetrieveException("Could not create the RestClusterClient", e);
+ }
+ }
+
+ @Override
+ public ClusterClient deploySessionCluster(ClusterSpecification clusterSpecification)
+ throws ClusterDeploymentException {
+ String clusterId = generateClusterId();
+ return deployClusterInternal(clusterId, null);
+ }
+
+ @Override
+ public ClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) {
+ throw new UnsupportedOperationException("Per-job cluster deployment is not supported yet");
+ }
+
+ @Override
+ public void killCluster(String clusterId) throws FlinkException {
+ try {
+ client.stopAndCleanupCluster(clusterId);
+ } catch (Exception e) {
+ throw new FlinkException(String.format("Could not kill Kubernetes cluster [%s]", clusterId), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close Kubernetes client: {}", e.toString());
+ }
+ }
+
+ private String generateClusterId() {
+ return CLUSTER_ID_PREFIX + UUID.randomUUID();
+ }
+
+ private ClusterClient createClusterEndpoint(Endpoint clusterEndpoint, String clusterId) throws Exception {
+ Configuration configuration = new Configuration(options.getConfiguration());
+ configuration.setString(JobManagerOptions.ADDRESS, clusterEndpoint.getAddress());
+ configuration.setInteger(JobManagerOptions.PORT, clusterEndpoint.getPort());
+ return new RestClusterClient<>(configuration, clusterId);
+ }
+
+ @Nonnull
+ private ClusterClient deployClusterInternal(String clusterId, List args) throws ClusterDeploymentException {
+ try {
+ Endpoint clusterEndpoint = client.createClusterService();
+ client.createClusterPod(ResourceProfile.UNKNOWN);
+ return createClusterEndpoint(clusterEndpoint, clusterId);
+ } catch (Exception e) {
+ tryKillCluster(clusterId);
+ throw new ClusterDeploymentException(String.format("Could not create Kubernetes cluster [%s]", clusterId), e);
+ }
+ }
+
+ /**
+ * Try to kill cluster without throw exception.
+ */
+ private void tryKillCluster(String clusterId) {
+ try {
+ killCluster(clusterId);
+ } catch (Exception e) {
+ LOG.error("Could not kill a cluster [{}]: {}", clusterId, e.toString());
+ }
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java
new file mode 100644
index 0000000000000..4aec9d670f925
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.KubernetesResourceManager;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * Kubernetes Resource Manager Factory.
+ */
+public enum KubernetesResourceManagerFactory implements ResourceManagerFactory {
+ INSTANCE;
+
+ @Override
+ public ResourceManager createResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler,
+ ClusterInformation clusterInformation,
+ @Nullable String webInterfaceUrl,
+ JobManagerMetricGroup jobManagerMetricGroup
+ ) throws Exception {
+ final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration
+ .fromConfiguration(configuration);
+ final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+ rmServicesConfiguration,
+ highAvailabilityServices,
+ rpcService.getScheduledExecutor()
+ );
+
+ return new KubernetesResourceManager(
+ rpcService,
+ ResourceManager.RESOURCE_MANAGER_NAME,
+ resourceId,
+ configuration,
+ System.getenv(),
+ highAvailabilityServices,
+ heartbeatServices,
+ rmRuntimeServices.getSlotManager(),
+ metricRegistry,
+ rmRuntimeServices.getJobLeaderIdService(),
+ clusterInformation,
+ fatalErrorHandler,
+ jobManagerMetricGroup
+ );
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java
new file mode 100644
index 0000000000000..e19c4da1251dd
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
+import org.apache.flink.runtime.entrypoint.EntrypointClusterConfigurationParserFactory;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+
+/**
+ * Entrypoint for a Kubernetes session cluster.
+ */
+public class KubernetesSessionClusterEntrypoint extends SessionClusterEntrypoint {
+
+ private KubernetesSessionClusterEntrypoint(Configuration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+ return new SessionDispatcherResourceManagerComponentFactory(KubernetesResourceManagerFactory.INSTANCE);
+ }
+
+ public static void main(String[] args) {
+ // startup checks and logging
+ EnvironmentInformation.logEnvironmentInfo(LOG, KubernetesSessionClusterEntrypoint.class.getSimpleName(), args);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+ EntrypointClusterConfiguration entrypointClusterConfiguration = null;
+ final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
+
+ try {
+ entrypointClusterConfiguration = commandLineParser.parse(args);
+ } catch (FlinkParseException e) {
+ LOG.error("Could not parse command line arguments {}.", args, e);
+ commandLineParser.printHelp(KubernetesSessionClusterEntrypoint.class.getSimpleName());
+ System.exit(1);
+ }
+
+ LOG.info("Started {}.", KubernetesSessionClusterEntrypoint.class.getSimpleName());
+
+ Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
+ final KubernetesSessionClusterEntrypoint entrypoint =
+ new KubernetesSessionClusterEntrypoint(configuration);
+ ClusterEntrypoint.runClusterEntrypoint(entrypoint);
+ }
+}
diff --git a/flink-kubernetes/templates/jobmanager-deployment.yaml b/flink-kubernetes/templates/jobmanager-deployment.yaml
new file mode 100644
index 0000000000000..3802f31c7d869
--- /dev/null
+++ b/flink-kubernetes/templates/jobmanager-deployment.yaml
@@ -0,0 +1,40 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: flink-jobmanager
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: flink
+ component: jobmanager
+ template:
+ metadata:
+ labels:
+ app: flink
+ component: jobmanager
+ spec:
+ containers:
+ - name: jobmanager
+ image: flink-mmx:latest
+ imagePullPolicy: IfNotPresent
+ args:
+ - jobmanager
+ ports:
+ - containerPort: 6123
+ name: rpc
+ - containerPort: 6124
+ name: blob
+ - containerPort: 6125
+ name: query
+ - containerPort: 8081
+ name: ui
+ env:
+ - name: JOB_MANAGER_RPC_ADDRESS
+ value: flink-jobmanager
+ - name: FLINK_CLASS_TO_RUN
+ value: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
+ - name: FLINK_NAMESPACE
+ value: default
+ - name: FLINK_TM_IMAGE
+ value: flink-mmx:latest
diff --git a/flink-kubernetes/templates/jobmanager-service.yaml b/flink-kubernetes/templates/jobmanager-service.yaml
new file mode 100644
index 0000000000000..57182037fdf4c
--- /dev/null
+++ b/flink-kubernetes/templates/jobmanager-service.yaml
@@ -0,0 +1,23 @@
+apiVersion: v1
+kind: Service
+metadata:
+ name: flink-jobmanager
+spec:
+ ports:
+ - name: rpc
+ port: 6123
+ # nodePort: 6123
+ - name: blob
+ port: 6124
+ # nodePort: 6124
+ - name: query
+ port: 6125
+ # nodePort: 6125
+ - name: ui
+ port: 8081
+ nodePort: 30000
+ type: NodePort
+ selector:
+ app: flink
+ component: jobmanager
+
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 5b133e7b624fb..4b5d6d9461518 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -300,7 +300,7 @@ public String toString() {
'}';
}
- static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
+ public static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
Map copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());
return new ResourceProfile(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e3b501e52e837..55de1c1f38ba5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -515,7 +515,7 @@ public CompletableFuture allocateAndAssignSlotForExecution(
toSchedule,
queued,
new SlotProfile(
- ResourceProfile.UNKNOWN,
+ ResourceProfile.fromResourceSpec(vertex.getJobVertex().getJobVertex().getMinResources(), 0),
preferredLocations,
previousAllocationIDs,
allPreviousExecutionGraphAllocationIds),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
index 282fd2ccf4e3b..5d7f54d52d508 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -67,4 +68,10 @@ public int getPhysicalSlotNumber() {
public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
+
+ @Override
+ public ResourceProfile getResourceProfile() {
+ // SimpleSlotContext tracks no resource information, so match any request.
+ return ResourceProfile.ANY;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
index 8777edd2fe7b3..1e35ac8d64c4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmaster;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
/**
@@ -35,4 +36,11 @@ public interface SlotContext extends SlotInfo {
* @return The gateway that can be used to send messages to the TaskManager.
*/
TaskManagerGateway getTaskManagerGateway();
+
+ /**
+ * Gets the resource profile of the underlying allocated slot.
+ *
+ * @return The resource profile that can be used to define if the slot has the requested resources
+ */
+ ResourceProfile getResourceProfile();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index af5582752d928..83613bec73267 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -188,8 +188,13 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy
slotProfile,
() -> resolvedRootSlotsValues.stream().flatMap(Collection::stream),
(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
- (MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
- MultiTaskSlotLocality::of);
+ (MultiTaskSlot multiTaskSlot) ->
+ !multiTaskSlot.contains(groupId) && multiTaskSlot.getSlotContextFuture()
+ .join()
+ .getResourceProfile()
+ .isMatching(slotProfile.getResourceProfile()),
+ MultiTaskSlotLocality::of
+ );
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index e06be5329cf5a..30f68d92ecde0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -309,7 +309,7 @@ public Void call() throws Exception {
}
}
- private static Configuration loadConfiguration(String[] args) throws FlinkParseException {
+ public static Configuration loadConfiguration(String[] args) throws FlinkParseException {
final CommandLineParser commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());
final ClusterConfiguration clusterConfiguration;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 683b0cde52f7a..0dd5eedd1980e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -162,7 +162,7 @@ public TaskManagerLocation addTaskManager(int numberSlots) {
final SlotOffer slotOffer = new SlotOffer(
new AllocationID(),
i,
- ResourceProfile.UNKNOWN);
+ ResourceProfile.ANY);
slotOffers.add(slotOffer);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
index c5beda44d17ec..72d92b5ef771e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
@@ -319,5 +320,11 @@ public int getPhysicalSlotNumber() {
public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
+
+ @Override
+ public ResourceProfile getResourceProfile() {
+ // Test stub: report ANY so the slot matches every requested profile.
+ return ResourceProfile.ANY;
+ }
}
}
diff --git a/pom.xml b/pom.xml
index 5fd305a7514d5..faf2115db56a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@ under the License.
flink-metrics
flink-yarn
flink-yarn-tests
+ flink-kubernetes
flink-fs-tests
flink-docs