请问StucturedStreaming 如何保持k8s票据
StructuredStreaming 是 Apache Spark 的一个流式处理引擎,它可以与 Kubernetes 集成使用。在与 Kubernetes 集成使用时,保持 k8s 票据的方法与在其他 Kubernetes 应用程序中保持相同。
一种常见的方法是将 k8s 票据存储在 Kubernetes Secret 中,并在应用程序部署期间将其挂载到容器中。然后,应用程序可以通过读取文件系统中的文件来访问 k8s 票据。在 Spark 中,可以使用以下配置来将 Kubernetes Secret 挂载到容器中:
spark.kubernetes.driver.secrets.[secret-name].mount.path=[mount-path-on-container]
spark.kubernetes.driver.secrets.[secret-name].mount.readOnly=[true|false]
spark.kubernetes.driver.secrets.[secret-name].items=[key1,key2,...,keyN]
其中,secret-name 是 Kubernetes Secret 的名称,mount-path-on-container 是将该 Secret 挂载到容器中的路径,readOnly 指定是否为只读访问,items 指定要从 Secret 中挂载的键列表。
然后,在应用程序中,可以使用以下代码来访问 k8s 票据:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent
class MyStreamingQueryListener extends StreamingQueryListener {
override def onQueryStarted(event: QueryStartedEvent): Unit = {
// Access k8s token here
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
val streamingQuery = df.writeStream
.format("console")
.trigger(ProcessingTime("10 seconds"))
.option("checkpointLocation", "s3a://my-bucket/checkpoints")
.queryName("my-query")
.start()
streamingQuery.addListener(new MyStreamingQueryListener)
在 onQueryStarted 方法中,可以访问 k8s 票据并将其用于认证和授权。可以使用任何 k8s API 客户端库(如 Kubernetes Java 客户端)来访问 k8s API
原文地址: https://www.cveoy.top/t/topic/ckqX 著作权归作者所有。请勿转载和采集!