import asyncio
import re

import aiomysql

class Database:
    """Small async CRUD helper built on an ``aiomysql`` connection pool.

    Table names and the column names taken from ``data`` dicts or dict
    conditions are validated and backtick-quoted, and every value is bound
    through ``%s`` placeholders. Column definitions, select expressions and
    string conditions are placed into the statement verbatim, so they must
    come from trusted code and never from user input.
    """

    # One dot-separated part of an identifier: word characters or ``$`` only,
    # which rules out quotes, backticks, whitespace and statement separators.
    _IDENTIFIER_RE = re.compile(r"[\w$]+")

    def __init__(self, host, port, user, password, db):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db = db
        self.pool = None

    async def connect(self):
        """Create the connection pool unless one already exists.

        The pool is created with the ``utf8mb4`` charset, ``DictCursor`` as
        cursor class and autocommit enabled.
        """
        if self.pool is not None:
            # A second call would otherwise replace (and leak) the first pool.
            return
        self.pool = await aiomysql.create_pool(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            db=self.db,
            charset='utf8mb4',
            cursorclass=aiomysql.DictCursor,
            autocommit=True,
        )

    async def close(self):
        """Close the pool and wait for its connections to shut down."""
        if self.pool is None:
            return
        self.pool.close()
        await self.pool.wait_closed()
        self.pool = None

    @classmethod
    def _quote_identifier(cls, name):
        """Return ``name`` backtick-quoted, or raise ``ValueError`` if unsafe.

        A dotted name such as ``schema.table`` is quoted part by part.
        """
        parts = name.split(".") if isinstance(name, str) else [""]
        if not all(cls._IDENTIFIER_RE.fullmatch(part) for part in parts):
            raise ValueError(f"invalid SQL identifier: {name!r}")
        return ".".join(f"`{part}`" for part in parts)

    @classmethod
    def _build_where(cls, condition, params=None):
        """Turn ``condition`` into ``(" WHERE ...", args)``.

        Args:
            condition: ``None`` (no clause), a dict mapping column names to
                values (combined with ``AND``; a ``None`` value becomes
                ``IS NULL``), or a trusted SQL string.
            params: Values for the ``%s`` placeholders of a string condition.

        Raises:
            ValueError: For an empty condition, an invalid column name, or
                ``params`` combined with a non-string condition.
            TypeError: For any other condition type.
        """
        if params is not None and not isinstance(condition, str):
            raise ValueError("params can only be used with a string condition")
        if condition is None:
            return "", ()
        if isinstance(condition, dict):
            if not condition:
                raise ValueError("condition must not be empty")
            clauses = []
            args = []
            for column, value in condition.items():
                quoted = cls._quote_identifier(column)
                if value is None:
                    # "= NULL" never matches in SQL; IS NULL is what is meant.
                    clauses.append(f"{quoted} IS NULL")
                else:
                    clauses.append(f"{quoted}=%s")
                    args.append(value)
            return " WHERE " + " AND ".join(clauses), tuple(args)
        if isinstance(condition, str):
            if not condition.strip():
                raise ValueError("condition must not be empty")
            if params is None:
                args = ()
            elif isinstance(params, (str, bytes)):
                # A lone string is one value, not a sequence of characters.
                args = (params,)
            else:
                args = tuple(params)
            return f" WHERE {condition}", args
        raise TypeError("condition must be None, a dict or a str")

    async def _execute(self, sql, args=(), *, fetch=False):
        """Run one statement on a pooled connection.

        Returns the fetched rows when ``fetch`` is true, otherwise ``None``.

        Raises:
            RuntimeError: If ``connect()`` has not been awaited yet.
        """
        if self.pool is None:
            raise RuntimeError("connect() must be awaited before running queries")
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Pass None rather than an empty tuple so a statement without
                # bound values is not %-formatted (a literal '%' stays legal).
                await cur.execute(sql, args or None)
                if fetch:
                    return await cur.fetchall()
        return None

    async def create_table(self, table_name, columns):
        """Create ``table_name`` if it does not exist.

        Args:
            table_name: Table name (validated and quoted).
            columns: Column definitions: an iterable of strings such as
                ``"id INT PRIMARY KEY"``, a single such string, or a dict
                mapping column name to its type definition. Definitions are
                used verbatim.

        Raises:
            ValueError: For an invalid name or an empty column list.
        """
        if isinstance(columns, str):
            definitions = [columns]
        elif isinstance(columns, dict):
            definitions = [
                f"{self._quote_identifier(name)} {spec}"
                for name, spec in columns.items()
            ]
        else:
            definitions = list(columns)
        if not definitions:
            raise ValueError("at least one column definition is required")
        table = self._quote_identifier(table_name)
        sql = f"CREATE TABLE IF NOT EXISTS {table} ({','.join(definitions)})"
        await self._execute(sql)

    async def insert(self, table_name, data):
        """Insert one row; ``data`` maps column names to values.

        Raises:
            ValueError: For an invalid table or column name.
        """
        table = self._quote_identifier(table_name)
        column_sql = ",".join(self._quote_identifier(key) for key in data)
        placeholders = ",".join(["%s"] * len(data))
        sql = f"INSERT INTO {table} ({column_sql}) VALUES ({placeholders})"
        await self._execute(sql, tuple(data.values()))

    async def update(self, table_name, data, condition, *, params=None):
        """Update the rows matching ``condition`` with the values in ``data``.

        Args:
            table_name: Table name (validated and quoted).
            data: Dict mapping column names to their new values.
            condition: Dict of equality filters or a trusted SQL string.
            params: Values for the ``%s`` placeholders of a string condition.

        Raises:
            ValueError: For empty ``data``, a missing condition or an invalid
                table or column name.
        """
        if not data:
            raise ValueError("data must contain at least one column")
        where_sql, where_args = self._build_where(condition, params)
        if not where_sql:
            raise ValueError("update() requires a condition")
        table = self._quote_identifier(table_name)
        assignments = ",".join(f"{self._quote_identifier(key)}=%s" for key in data)
        sql = f"UPDATE {table} SET {assignments}{where_sql}"
        await self._execute(sql, tuple(data.values()) + where_args)

    async def delete(self, table_name, condition, *, params=None):
        """Delete the rows matching ``condition``.

        ``condition`` and ``params`` have the same meaning as in ``update``.

        Raises:
            ValueError: For a missing condition or an invalid name.
        """
        where_sql, where_args = self._build_where(condition, params)
        if not where_sql:
            raise ValueError("delete() requires a condition")
        table = self._quote_identifier(table_name)
        await self._execute(f"DELETE FROM {table}{where_sql}", where_args)

    async def select(self, table_name, columns=None, condition=None, limit=None,
                     *, params=None):
        """Fetch rows from ``table_name`` and return ``cursor.fetchall()``.

        Args:
            table_name: Table name (validated and quoted).
            columns: ``None`` or empty for ``*``, a single expression string,
                or an iterable of expressions. Used verbatim.
            condition: ``None``, a dict of equality filters, or a trusted SQL
                string.
            limit: Optional maximum row count; coerced with ``int()``.
            params: Values for the ``%s`` placeholders of a string condition.

        Raises:
            ValueError: For an invalid name, an empty condition or a negative
                limit.
        """
        if not columns:
            column_sql = "*"
        elif isinstance(columns, str):
            column_sql = columns
        else:
            column_sql = ",".join(columns)
        where_sql, where_args = self._build_where(condition, params)
        sql = f"SELECT {column_sql} FROM {self._quote_identifier(table_name)}{where_sql}"
        if limit is not None:
            # Inlined as a checked integer instead of a bound value so that a
            # string condition containing a literal '%' keeps working.
            row_limit = int(limit)
            if row_limit < 0:
                raise ValueError("limit must not be negative")
            sql += f" LIMIT {row_limit}"
        return await self._execute(sql, where_args, fetch=True)
使用 aiomysql 接口封装一个数据库类,包含创建表、更新数据、查询数据、插入数据等接口函数。调用时只需提供表名、字段和数据,即可自动完成 SQL 语句的拼接;字符串拼接要最大化效率。接口参数应尽可能灵活、方便调用,代码需要考虑安全性。请用 Python 实现。

原文地址: https://www.cveoy.top/t/topic/ggy 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录