Use PySpark to compute the average of column C in PostgreSQL table A, grouped by dimension B.
First, install PySpark and the PostgreSQL JDBC driver. Then connect to the PostgreSQL database and read table A with the following code:
from pyspark.sql import SparkSession

# Create a SparkSession with the PostgreSQL JDBC driver on the driver's classpath
spark = SparkSession.builder \
    .appName("PostgreSQL to Spark") \
    .config("spark.driver.extraClassPath", "/path/to/postgresql.jar") \
    .getOrCreate()

# JDBC connection details
url = "jdbc:postgresql://localhost:5432/dbname"
table = "A"
properties = {"user": "postgres", "password": "password"}

# Read table A into a Spark DataFrame
df = spark.read.jdbc(url=url, table=table, properties=properties)
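If only columns B and C are needed, the read can be narrowed before any data leaves PostgreSQL: Spark's JDBC source accepts a parenthesized subquery (with an alias) in place of a table name, and the driver class can be named explicitly in the connection properties. A minimal sketch, assuming the same table A and columns B and C as above; the names df_pruned, a_pruned, and properties_with_driver are hypothetical:

# Sketch: push column pruning down to PostgreSQL by reading a subquery
# instead of the whole table. The "driver" entry is optional but explicit.
properties_with_driver = {
    "user": "postgres",
    "password": "password",
    "driver": "org.postgresql.Driver",
}
pruned = "(SELECT B, C FROM A) AS a_pruned"  # JDBC requires an alias on subqueries
df_pruned = spark.read.jdbc(url=url, table=pruned, properties=properties_with_driver)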
Next, use PySpark's aggregation functions to compute the average:
from pyspark.sql.functions import avg

# Dimension column and value column
B = "B"
C = "C"

# Group by B and compute the average of C; collect() returns the rows to the driver
avg_c = df.groupBy(B).agg(avg(C)).collect()
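Note that avg() ignores NULL values, and the aggregate column is named avg(C) by default. If a friendlier column name is wanted before collecting, one option (a sketch; avg_c_named is a hypothetical name, not part of the original flow) is to alias the aggregate:

# Sketch: name the aggregate column explicitly instead of the default "avg(C)"
avg_c_named = df.groupBy(B).agg(avg(C).alias("avg_c")).collect()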
Finally, save the result to another PostgreSQL table:
result_table = "result"

# Rebuild a DataFrame from the collected rows, naming the aggregate column "avg_c"
avg_c_df = spark.createDataFrame(avg_c, [B, "avg_c"])

# Write the result back to PostgreSQL, replacing the table if it already exists
avg_c_df.write.jdbc(url=url, table=result_table, mode="overwrite", properties=properties)
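Be aware that mode="overwrite" drops and recreates the target table. If the table's schema, grants, or indexes should survive, a hedged alternative is the JDBC writer's truncate option (empty the table instead of dropping it); mode="append" instead adds rows without touching existing ones. A sketch under the same url/properties assumptions as above:

# Sketch: truncate the existing table rather than dropping it on overwrite
avg_c_df.write \
    .option("truncate", "true") \
    .jdbc(url=url, table=result_table, mode="overwrite", properties=properties)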
The complete code is as follows:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

# Create a SparkSession with the PostgreSQL JDBC driver on the classpath
spark = SparkSession.builder \
    .appName("PostgreSQL to Spark") \
    .config("spark.driver.extraClassPath", "/path/to/postgresql.jar") \
    .getOrCreate()

# JDBC connection details
url = "jdbc:postgresql://localhost:5432/dbname"
table = "A"
properties = {"user": "postgres", "password": "password"}

# Read table A into a Spark DataFrame
df = spark.read.jdbc(url=url, table=table, properties=properties)

# Dimension column and value column
B = "B"
C = "C"

# Group by B and compute the average of C; collect() returns the rows to the driver
avg_c = df.groupBy(B).agg(avg(C)).collect()

# Rebuild a DataFrame from the collected rows, naming the aggregate column "avg_c"
result_table = "result"
avg_c_df = spark.createDataFrame(avg_c, [B, "avg_c"])

# Write the result back to PostgreSQL, replacing the table if it already exists
avg_c_df.write.jdbc(url=url, table=result_table, mode="overwrite", properties=properties)
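The collect() plus createDataFrame round-trip through the driver is unnecessary for writing: the aggregated DataFrame can be written directly, keeping the whole pipeline distributed. A streamlined sketch under the same connection assumptions (URL, credentials, table and column names as above):

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

# Sketch: same pipeline without collect(); the aggregate stays a DataFrame end to end
spark = SparkSession.builder \
    .appName("PostgreSQL to Spark") \
    .config("spark.driver.extraClassPath", "/path/to/postgresql.jar") \
    .getOrCreate()

url = "jdbc:postgresql://localhost:5432/dbname"
properties = {"user": "postgres", "password": "password"}

df = spark.read.jdbc(url=url, table="A", properties=properties)
result = df.groupBy("B").agg(avg("C").alias("avg_c"))
result.write.jdbc(url=url, table="result", mode="overwrite", properties=properties)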