Source code for dbgdb.db.postgres

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Created by pat on 5/19/18
"""
.. currentmodule:: postgres
.. moduleauthor:: Pat Daburu <pat@daburu.net>

This module contains utility functions to help when working with PostgreSQL
databases.
"""
# pylint: disable=no-member
import json
from pathlib import Path
from urllib.parse import urlparse, ParseResult
from typing import Iterable
from addict import Dict
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT


# Load the Postgres phrasebook.
sql_phrasebook = Dict(
    json.loads(
        (
            Path(__file__).resolve().parent / 'postgres.json'
        ).read_text()
    )['sql']
)


[docs]def connect(url: str, dbname: str = None, autocommit: bool = False): """ Create a connection to a Postgres database. :param url: the Postgres instance URL :param dbname: the target database name (if it differs from the one specified in the URL) :param autocommit: Set the `autocommit` flag on the connection? :return: a psycopg2 connection """ # Parse the URL. (We'll need the pieces to construct an ogr2ogr connection # string.) dbp: ParseResult = urlparse(url) # Create a dictionary to hold the arguments for the connection. (We'll # unpack it later.) cnx_opt = { k: v for k, v in { 'host': dbp.hostname, 'port': int(dbp.port), 'database': dbname if dbname is not None else dbp.path[1:], 'user': dbp.username, 'password': dbp.password }.items() if v is not None } cnx = psycopg2.connect(**cnx_opt) # If the caller requested that the 'autocommit' flag be set... if autocommit: # ...do that now. cnx.autocommit = True return cnx
[docs]def db_exists(url: str, dbname: str = None, admindb: str = 'postgres') -> bool: """ Does a given database on a Postgres instance exist? :param url: the Postgres instance URL :param dbname: the name of the database to test :param admindb: the name of an existing (presumably the main) database :return: `True` if the database exists, otherwise `False` """ # Let's see what we got for the database name. _dbname = dbname # If the caller didn't specify a database name... if not _dbname: # ...let's figure it out from the URL. db: ParseResult = urlparse(url) _dbname = db.path[1:] # Now, let's do this! with connect(url=url, dbname=admindb) as cnx: with cnx.cursor() as crs: # Execute the SQL query that counts the databases with a specified # name. crs.execute( sql_phrasebook.select_db_count.format(_dbname) ) # If the count isn't zero (0) the database exists. return crs.fetchone()[0] != 0
[docs]def create_schema(url: str, schema: str): """ Create a schema in the database. :param url: the URL of the database instance :param schema: the name of the schema """ with connect(url=url) as cnx: with cnx.cursor() as crs: crs.execute(sql_phrasebook.create_schema.format(schema))
[docs]def schema_exists(url: str, schema: str): """ Does a given schema exist within a Postgres database? :param url: the Postgres instance URL and database :param schema: the name of the schema :return: `True` if the schema exists, otherwise `False` """ # If the database specified in the URL doesn't exist... if not db_exists(url=url): # ...it stands to reason that the schema cannot exist. return False # At this point, it looks as thought database exists, so let's check for # the schema. with connect(url=url) as cnx: with cnx.cursor() as crs: # Execute the SQL query that counts the schemas with a specified # name. crs.execute( sql_phrasebook.select_schema_count.format(schema) ) # If the count isn't zero (0) the database exists. return crs.fetchone()[0] != 0
[docs]def select_schema_tables(url: str, schema: str) -> Iterable[str]: """ Select the names of the tables within a given schema. :param url: the URL of the dat :param schema: the name of the schema """ with connect(url=url) as cnx: with cnx.cursor() as crs: crs.execute(sql_phrasebook.select_schema_tables.format(schema)) for row in crs: yield row[0]
[docs]def drop_schema(url: str, schema: str): """ Drop a schema from the database. :param url: the URL of the database instance :param schema: the name of the schema """ with connect(url=url, autocommit=True) as cnx: with cnx.cursor() as crs: # Execute the SQL query that counts the schemas with a specified # name. crs.execute( sql_phrasebook.drop_schema.format(schema) )
[docs]def create_db( url: str, dbname: str, admindb: str = 'postgres'): """ Create a database on a Postgres instance. :param url: the Postgres instance URL :param dbname: the name of the database :param admindb: the name of an existing (presumably the main) database :return: """ with connect(url=url, dbname=admindb) as cnx: cnx.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) with cnx.cursor() as crs: crs.execute(sql_phrasebook.create_db.format(dbname))
[docs]def create_extensions(url: str): """ Create the necessary database extensions. :param url: the URL of the database instance """ with connect(url=url, autocommit=True) as cnx: with cnx.cursor() as crs: # Make sure the extensions are installed. for sql in sql_phrasebook.create_extensions: crs.execute(sql)