Flask 应用示例:使用 HDFS 和 Hadoop 进行数据处理
import importlib import json import os
from celery import Celery from flask import Flask, g, make_response, request from flask_sqlalchemy import SQLAlchemy from pyhdfs import HdfsClient from service_hadoop import service_hadoop_select
from core.middleware import middleware_options from router import router, CTX from core.middleware import middleware_check_permissions, toQuery
db = SQLAlchemy()
# Flask needs the import name of this module to locate resources.
app = Flask(__name__, static_folder='static')

# Database connection settings.
# NOTE(review): the database credentials and the secret key are hard-coded
# here; consider loading them from the environment or a config file.
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:root@127.0.0.1:3306/djq'
app.secret_key = 'v&#prqo7t*(!ktb+8r=+83@#_$n1pg_xig$j=6v^r3#$i)wx8@'
db.init_app(app)

# Directory that holds the view controllers (resolved against the current
# working directory, using the platform's path separator).
controller_dir = os.path.join(os.getcwd(), 'services')

# Absolute paths of the modules discovered under ``controller_dir``.
arr = []
def connect_db():
    """Return the engine exposed by the module-level ``db`` object."""
    return db.engine
def connect_hdfs_db():
    """Connect to HDFS and make sure the ``hdfs:///hadoop`` directory exists.

    Returns:
        The ``HdfsClient`` instance, or ``None`` when creating the client or
        checking/creating the directory raised an exception.
    """
    try:
        connection = HdfsClient(hosts='127.0.0.1:50070')
        # Check whether the HDFS working directory exists; create it if not.
        rest = connection.exists('hdfs:///hadoop')
        if rest is False:
            connection.mkdirs('hdfs:///hadoop')
    except Exception as e:
        # Best effort: the caller receives None, but the cause is reported
        # so that a failed connection can actually be diagnosed.
        print("链接失败", repr(e))
        connection = None
    return connection
@app.before_request
def before_request():
    """Attach per-request connections and answer CORS preflight requests.

    The database engine and the HDFS client are stored on ``flask.g`` for
    every request. Requests whose method is ``"Origin"`` or ``"OPTIONS"``
    receive an empty 200 response carrying the CORS headers; every other
    request goes through ``middleware_check_permissions()`` and then falls
    through (returns ``None``).
    """
    # Put the engine on Flask's global object so the MySQL pool code can use it.
    g.db = connect_db()
    # Connect to HDFS.
    g.hdfs = connect_hdfs_db()

    if request.method not in ("Origin", "OPTIONS"):
        middleware_check_permissions()
        return None

    preflight_headers = (
        ('Content-Type', "text/html; charset=utf-8"),
        ('Access-Control-Allow-Origin', "*"),
        # Lets the client send Content-Type and the custom token header.
        ('Access-Control-Allow-Headers', "Content-Type,Accept,Authorization,x-auth-token"),
        # Credentials.
        ("Access-Control-Allow-Credentials", "true"),
        # Lets the client send DELETE, PUT, ...
        ('Access-Control-Allow-Methods', "*"),
        # Maximum lifetime of the preflight result.
        ("Access-Control-Max-Age", "86400"),
    )

    resp = make_response()
    for header_name, header_value in preflight_headers:
        resp.headers[header_name] = header_value
    # NOTE(review): the status used to be set to 204 and then immediately
    # overwritten; only the effective final value (200) is assigned here.
    resp.status_code = 200
    return resp
@app.after_request
def after_request(response):
    """Dispose of the engine stored on ``g.db`` and pass the response through.

    The HDFS client stored on ``g.hdfs`` is not closed here.
    """
    engine = g.db
    engine.dispose()
    return response
def foreach_file(path_name):
    """Append the path of every file found below ``path_name`` to ``arr``.

    The tree is walked with ``os.walk``; each entry is the walked directory
    joined with the file name.
    """
    for folder, _subdirs, filenames in os.walk(path_name):
        arr.extend(os.path.join(folder, filename) for filename in filenames)
def load_module(f):
    """Import the controller module stored in file ``f`` and register its routes.

    The route path is the location of ``f`` relative to ``controller_dir``,
    without the ``.py`` suffix and with ``/`` as separator. The module is
    imported from the ``views`` package, the controller class is looked up
    under the capitalized module name, instantiated, and handed to
    ``load_route``. The ``index`` controller is mounted at the root (empty
    route path).

    Loading is best effort: any failure is reported on stdout and the module
    is skipped, so one broken controller does not stop the others.

    Args:
        f: Path of the controller source file.
    """
    try:
        # Turn ``f`` into a path relative to the controller directory.
        relative = os.path.relpath(f, controller_dir)
        if relative.endswith(".py"):
            relative = relative[: -len(".py")]
        # Normalize both Windows and native separators to "/".
        route_path = relative.replace("\\", "/").replace(os.sep, "/")

        # Module names use dots, never path separators.
        mod = importlib.import_module("views." + route_path.replace("/", "."))
        # The controller class is named after the module file; this assumes
        # controllers in sub-directories follow the same rule - TODO confirm.
        class_name = route_path.rsplit("/", 1)[-1].capitalize()
        cs_controller = getattr(mod, class_name)
        controller = cs_controller()

        load_route(controller, "" if route_path == "index" else route_path)
    except Exception as e:
        # Report which file failed and why instead of only the bare args.
        print(f"failed to load controller module {f!r}: {e!r}")
def load_route(controller, route_path):
    """Register the URL rules described by ``controller.config`` on ``app``.

    ``controller.config`` must provide the keys ``"get"``, ``"post"``,
    ``"get_api"`` and ``"post_api"``, each an iterable of action names. The
    handler for an action is the controller attribute named after the
    capitalized action. Rules are registered in this order: ``Index``,
    ``Api``, page GET actions, page POST actions, API GET actions, API POST
    actions.

    Args:
        controller: Controller instance whose handlers are exposed.
        route_path: URL prefix of the controller ("" for the root controller).
    """
    # Read all four tables up front so a missing key fails before any rule
    # has been added.
    page_get = controller.config["get"]
    page_post = controller.config["post"]
    api_get = controller.config["get_api"]
    api_post = controller.config["post_api"]

    def register(method, url_path, handler_name):
        """Wrap one controller handler with ``router`` and add its URL rule."""
        view = router(method, getattr(controller, handler_name))
        app.add_url_rule(
            "/" + url_path,
            endpoint=url_path + "_" + method,
            view_func=view,
            methods=[method],
        )

    # Catch-all handlers, when the controller defines them.
    if hasattr(controller, "Index"):
        register("all", route_path, "Index")
    if hasattr(controller, "Api"):
        register("all", "api" + "/" + route_path, "Api")

    # Page GET actions: "index" is mounted at "/index"; any other action is
    # registered only when the controller has a non-empty route path.
    for action in page_get:
        if action == "index":
            url_path = action
        elif route_path != "":
            url_path = route_path + "/" + action
        else:
            continue
        register("get", url_path, action.capitalize())

    # Page POST actions: same layout, but without the empty-route-path check.
    for action in page_post:
        url_path = action if action == "index" else route_path + "/" + action
        register("post", url_path, action.capitalize())

    # API actions live under "api/<route_path>"; "index" maps to the prefix.
    for method, actions in (("get", api_get), ("post", api_post)):
        for action in actions:
            api_prefix = "api" + "/" + route_path
            url_path = api_prefix if action == "index" else api_prefix + "/" + action
            register(method, url_path, action.capitalize())
foreach_file(controller_dir)

# Walk the collected module paths and load each one (loading a module also
# registers its routes).
for f in arr:
    # Skip compiled bytecode and package initializers (``__init__`` files).
    if ".pyc" not in f and "__init__" not in f:
        load_module(f)
# Hadoop service endpoint.
def hadoop():
    """Run the "address" job and store its output in HDFS.

    For a POST request the job selected by ``service_hadoop_select("address")``
    dumps its input to a local file, runs with the inline runner, and the
    decoded output is written to ``hdfs:///hadoop/address/<file_name>``. The
    local file is removed afterwards, whether or not the job succeeded.

    Returns:
        The list of decoded output records, or a 503 response when no HDFS
        connection is available. Nothing is returned for non-POST requests.
        NOTE(review): returning a bare list relies on Flask converting it to
        a JSON response (Flask 2.2+) - confirm the deployed Flask version.
    """
    if request.method == 'POST':
        if g.hdfs is None:
            # connect_hdfs_db() hands back None when the connection failed;
            # answer explicitly instead of failing on a None attribute access.
            return make_response(json.dumps({"error": "HDFS is unavailable"}), 503)

        ctx = CTX()
        job = service_hadoop_select("address")
        rest = g.hdfs.exists('hdfs:///hadoop//address')
        if rest is False:
            g.hdfs.mkdirs('hdfs:///hadoop/address')
        # Read the MySQL data and write it to a local file.
        file_path, file_name = job.make_data(ctx)
        try:
            # Feed the file to the selected hadoop job.
            job.load_args(['--runner=inline'] + [file_path])
            output_data = []
            with job.make_runner() as runner:
                # Run hadoop.
                runner.run()
                for chunk in runner.cat_output():
                    if chunk != b'':
                        # NOTE(review): assumes every chunk is one
                        # "<key> <json>" record separated by a single space
                        # - confirm against the job's output protocol.
                        fields = chunk.decode().split(" ")
                        output_data.append(json.loads(fields[1]))
            # Write the computed result to HDFS.
            g.hdfs.create(f'hdfs:///hadoop/address/{file_name}', json.dumps(output_data))
        finally:
            # Always remove the local file, even when the job or the HDFS
            # write failed, so temporary files do not pile up.
            os.remove(file_path)
        return output_data
# Register the route of the hadoop service.
app.add_url_rule("/hadoop", view_func=hadoop, methods=["POST"])

# Start the development server only when this file is run as a script.
if __name__ == '__main__':
    app.run()
该代码使用了Flask框架,实现了一个Web应用程序。其中,Flask提供了路由、请求处理、请求钩子(before_request / after_request)等基础功能,通过SQLAlchemy和pyhdfs库实现了数据存储和访问。代码中虽然导入了Celery,但所示片段并未用它定义或调用任何异步任务。具体实现过程中,使用了装饰器、动态模块加载、异常处理等技术。其中,hadoop()函数实现了一个hadoop服务:读取MySQL数据并写入本地文件,运行hadoop任务进行计算,再将结果写入hdfs文件数据库,最后删除本地文件。
原文地址: https://www.cveoy.top/t/topic/kUZ4 著作权归作者所有。请勿转载和采集!