# (File-viewer header from the original listing: "65 lines, No EOL, 2.3 KiB, Python".)
import os
import re
import shlex
import tempfile

from .ssh import SSHClient


class DatabaseManager:
    """Run PostgreSQL administration commands on a remote host over SSH.

    Each statement is written to a local temporary file, copied to the
    server with ``self.client.scp_to`` and executed there with ``psql -f``
    as the ``postgres`` system user (via ``su - postgres``).
    """

    # Identifier made only of word characters / "$": safe to emit unquoted,
    # which keeps PostgreSQL's usual case-folding for such names.
    _PLAIN_IDENT = re.compile(r"[^\W\d][\w$]*")
    # Identifier the caller already wrapped in double quotes (inner quotes
    # must be doubled), e.g. '"My DB"'.
    _QUOTED_IDENT = re.compile(r'"(?:[^"]|"")+"')

    def __init__(self, config):
        """Read connection settings from *config* and build the SSH client.

        Args:
            config: Object exposing ``get(key, default=None)``; the keys read
                are ``database.host`` (default ``'10.32.70.54'``),
                ``database.user`` (default ``'root'``) and
                ``proxmox.ssh_key_path`` (no default).
        """
        # Database server details
        self.host = config.get('database.host', '10.32.70.54')
        self.user = config.get('database.user', 'root')
        # NOTE(review): the key path is read from the "proxmox" section, not
        # "database" -- presumably the same key is shared; confirm.
        self.ssh_key = config.get('proxmox.ssh_key_path')
        self.client = SSHClient(self.host, self.user, self.ssh_key)

    @classmethod
    def _ident(cls, name):
        """Return *name* as a safe SQL identifier.

        Plain identifiers and identifiers that are already double-quoted are
        returned unchanged; anything else is wrapped in double quotes with
        embedded quotes doubled so it cannot terminate the identifier.

        Raises:
            ValueError: If *name* is empty.
        """
        text = str(name)
        if not text:
            raise ValueError("SQL identifier must not be empty")
        if cls._PLAIN_IDENT.fullmatch(text) or cls._QUOTED_IDENT.fullmatch(text):
            return text
        return '"' + text.replace('"', '""') + '"'

    @staticmethod
    def _literal(value):
        """Return *value* as a single-quoted SQL string literal.

        Single quotes are doubled. Assumes the server runs with
        ``standard_conforming_strings = on`` (backslashes are literal).
        """
        return "'" + str(value).replace("'", "''") + "'"

    def exec_sql(self, sql, *, stop_on_error=True):
        """Execute *sql* on the database server and return psql's stdout.

        Args:
            sql: Text to run; may contain SQL statements or psql
                meta-commands such as ``\\l``.
            stop_on_error: When true (default) psql is started with
                ``ON_ERROR_STOP=1`` so that a failing statement produces a
                non-zero exit status. Without it ``psql -f`` exits 0 even
                when statements fail, and the failure would go unnoticed.

        Returns:
            The ``stdout`` attribute of the result of running psql.

        Raises:
            RuntimeError: If psql exits with a non-zero status.
        """
        # Use a temporary file to avoid shell quoting hell. delete=False
        # because the file must be closed before it is copied.
        tf = tempfile.NamedTemporaryFile(mode='w', suffix='.sql', delete=False)
        local_path = tf.name
        try:
            with tf:
                tf.write(sql)

            remote_path = f"/tmp/exec_{os.path.basename(local_path)}"
            quoted_remote = shlex.quote(remote_path)

            psql_cmd = "psql"
            if stop_on_error:
                psql_cmd += " -v ON_ERROR_STOP=1"
            psql_cmd += f" -f {quoted_remote}"
            # Execute the SQL file as postgres user
            cmd = f"su - postgres -c {shlex.quote(psql_cmd)}"

            self.client.scp_to(local_path, remote_path)
            try:
                # Ensure the postgres user can read the file.
                # NOTE(review): 644 in /tmp makes the SQL (which may contain
                # a password) world-readable until it is removed; consider
                # chown postgres + chmod 600.
                self.client.run(f"chmod 644 {quoted_remote}")
                res = self.client.run(cmd)
            finally:
                # Cleanup remote file even when the commands above raise.
                self.client.run(f"rm -f {quoted_remote}")
        finally:
            try:
                os.remove(local_path)
            except FileNotFoundError:
                pass

        if res.returncode != 0:
            raise RuntimeError(f"PostgreSQL command failed: {res.stderr}")
        return res.stdout

    def create_database(self, db_name, owner=None):
        """Create database *db_name*, optionally owned by role *owner*."""
        sql = f"CREATE DATABASE {self._ident(db_name)}"
        if owner:
            sql += f" OWNER {self._ident(owner)}"
        return self.exec_sql(sql + ";")

    def create_user(self, username, password):
        """Create role *username* with the given *password*."""
        # The password is emitted as an escaped string literal so that quotes
        # inside it cannot break out of the statement.
        sql = (
            f"CREATE USER {self._ident(username)} "
            f"WITH PASSWORD {self._literal(password)};"
        )
        return self.exec_sql(sql)

    def grant_privileges(self, db_name, username):
        """Grant all privileges on database *db_name* to *username*."""
        sql = (
            f"GRANT ALL PRIVILEGES ON DATABASE {self._ident(db_name)} "
            f"TO {self._ident(username)};"
        )
        return self.exec_sql(sql)

    def list_databases(self):
        """Return the output of psql's ``\\l`` meta-command."""
        return self.exec_sql(r"\l")

    def list_users(self):
        """Return the output of psql's ``\\du`` meta-command."""
        return self.exec_sql(r"\du")

    def drop_database(self, db_name):
        """Drop database *db_name* if it exists."""
        return self.exec_sql(f"DROP DATABASE IF EXISTS {self._ident(db_name)};")

    def drop_user(self, username):
        """Drop role *username* if it exists."""
        return self.exec_sql(f"DROP USER IF EXISTS {self._ident(username)};")