splitgraph.engine.postgres package
Submodules
splitgraph.engine.postgres.engine module
Default Splitgraph engine: uses PostgreSQL to store metadata and actual objects and an audit stored procedure to track changes, as well as the Postgres FDW interface to upload/download objects to/from other Postgres engines.
- class splitgraph.engine.postgres.engine.AuditTriggerChangeEngine(name: Optional[str], conn_params: Optional[Dict[str, Optional[str]]] = None, pool: Optional[psycopg2.pool.AbstractConnectionPool] = None, autocommit: bool = False, registry: bool = False, in_fdw: bool = False, check_version: bool = True)
Bases: splitgraph.engine.postgres.engine.PsycopgEngine, splitgraph.engine.ChangeEngine
Change tracking based on an audit trigger stored procedure
- discard_pending_changes(schema: str, table: Optional[str] = None) None
Discard recorded pending changes for a tracked schema / table
- get_changed_tables(schema: str) List[str]
Get list of tables that have changed content
- get_pending_changes(schema: str, table: str, aggregate: bool = False) Union[List[Tuple[int, int]], List[Tuple[Tuple[str, ...], bool, Dict[str, Any], Dict[str, Any]]]]
Return pending changes for a given tracked table
- Parameters
schema – Schema the table belongs to
table – Table to return changes for
aggregate – Whether to aggregate changes or return them completely
- Returns
If aggregate is True: List of tuples of (change_type, number of rows). If aggregate is False: List of (primary_key, change_type, change_data)
- get_tracked_tables() List[Tuple[str, str]]
Return a list of tables that the audit trigger is working on.
- has_pending_changes(schema: str) bool
Return True if the tracked schema has pending changes and False if it doesn’t.
- track_tables(tables: List[Tuple[str, str]]) None
Install the audit trigger on the required tables
- untrack_tables(tables: List[Tuple[str, str]]) None
Remove triggers from tables and delete their pending changes
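A minimal sketch of how the change-tracking methods above might be combined to summarize pending changes per table. It relies only on the method signatures listed above. `summarize_pending_changes` and `_StubEngine` are hypothetical names, used so that the example runs without a database, and the change-type codes in the stub data are invented for illustration.

```python
from typing import Dict, List, Tuple


def summarize_pending_changes(engine, schema: str) -> Dict[str, List[Tuple[int, int]]]:
    """Map each changed table in `schema` to its aggregated pending changes."""
    if not engine.has_pending_changes(schema):
        return {}
    return {
        table: engine.get_pending_changes(schema, table, aggregate=True)
        for table in engine.get_changed_tables(schema)
    }


class _StubEngine:
    """Hypothetical stand-in for an AuditTriggerChangeEngine (no database needed)."""

    def __init__(self, changes):
        self._changes = changes

    def has_pending_changes(self, schema):
        return bool(self._changes.get(schema))

    def get_changed_tables(self, schema):
        return sorted(self._changes.get(schema, {}))

    def get_pending_changes(self, schema, table, aggregate=False):
        return self._changes[schema][table]


# The (change_type, number of rows) values below are invented for illustration.
stub = _StubEngine({"staging": {"fruits": [(0, 2), (1, 1)]}})
summary = summarize_pending_changes(stub, "staging")
```

With a real engine, the same function would be called with an engine instance in place of the stub.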
- class splitgraph.engine.postgres.engine.PostgresEngine(name: Optional[str], conn_params: Optional[Dict[str, Optional[str]]] = None, pool: Optional[psycopg2.pool.AbstractConnectionPool] = None, autocommit: bool = False, registry: bool = False, in_fdw: bool = False, check_version: bool = True)
Bases: splitgraph.engine.postgres.engine.AuditTriggerChangeEngine, splitgraph.engine.ObjectEngine
An implementation of the Postgres engine for Splitgraph
- apply_fragments(objects: List[Tuple[str, str]], target_schema: str, target_table: str, extra_quals: Optional[psycopg2.sql.Composed] = None, extra_qual_args: Optional[Tuple[Any, ...]] = None, schema_spec: Optional[List[splitgraph.core.types.TableColumn]] = None, progress_every: Optional[int] = None) None
Apply multiple fragments to a target table as a single-query batch operation.
- Parameters
objects – List of tuples (object_schema, object_table) that the objects are stored in.
target_schema – Schema to apply the fragment to
target_table – Table to apply the fragment to
extra_quals – Optional, extra SQL (Composable) clauses to filter new rows in the fragment on (e.g. SQL("a = %s"))
extra_qual_args – Optional, a tuple of arguments to use with extra_quals
schema_spec – Optional, list of (ordinal, column_name, column_type, is_pk). If not specified, uses the schema of target_table.
progress_every – If set, will report the materialization progress via tqdm every progress_every objects.
- delete_objects(object_ids: List[str]) None
Delete one or more objects from the engine.
- Parameters
object_ids – IDs of objects to delete
- download_objects(objects: List[str], remote_engine: splitgraph.engine.postgres.engine.PostgresEngine) List[str]
Download objects from the remote engine to the local cache
- Parameters
objects – List of object IDs to download
remote_engine – A remote ObjectEngine to download the objects from.
- Returns
List of object IDs that were downloaded.
- dump_object(object_id: str, stream: _io.TextIOWrapper, schema: str) None
Dump an object into a series of SQL statements
- Parameters
object_id – Object ID
stream – Text stream to dump the object into
schema – Schema the object lives in
- dump_object_creation(object_id: str, schema: str, table: Optional[str] = None, schema_spec: Optional[List[splitgraph.core.types.TableColumn]] = None, if_not_exists: bool = False) bytes
Generate the SQL that remounts a foreign table pointing to a Splitgraph object.
- Parameters
object_id – Name of the object
schema – Schema to create the table in
table – Name of the table to mount
schema_spec – Schema of the table
if_not_exists – Add IF NOT EXISTS to the DDL
- Returns
SQL in bytes format.
- get_change_key(schema: str, table: str) List[Tuple[str, str]]
Returns the key used to identify a row in a change (list of column name, column type). If the tracked table has a PK, we use that; if it doesn’t, the whole row is used.
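The fallback rule described above can be sketched as follows. This is an illustration, not the library's implementation: the `TableColumn` stand-in and its field names are assumptions based on the (ordinal, column_name, column_type, is_pk) layout used elsewhere on this page, and `change_key_sketch` is a hypothetical name.

```python
from typing import List, NamedTuple, Tuple


class TableColumn(NamedTuple):
    """Stand-in for splitgraph.core.types.TableColumn (field names are assumed)."""

    ordinal: int
    name: str
    pg_type: str
    is_pk: bool


def change_key_sketch(schema_spec: List[TableColumn]) -> List[Tuple[str, str]]:
    """Return (column name, column type) pairs identifying a row in a change."""
    pk = [(c.name, c.pg_type) for c in schema_spec if c.is_pk]
    # No primary key: fall back to every column, i.e. the whole row.
    return pk or [(c.name, c.pg_type) for c in schema_spec]


with_pk = [TableColumn(1, "id", "integer", True), TableColumn(2, "name", "text", False)]
without_pk = [TableColumn(1, "id", "integer", False), TableColumn(2, "name", "text", False)]
```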
- get_object_schema(object_id: str) List[splitgraph.core.types.TableColumn]
Get the schema of a given object, returned as a list of (ordinal, column_name, column_type, is_pk).
- Parameters
object_id – ID of the object
- get_object_size(object_id: str) int
Return the on-disk footprint of this object, in bytes.
- Parameters
object_id – ID of the object
- mount_object(object_id: str, table: None = None, schema: str = 'splitgraph_meta', schema_spec: Optional[List[splitgraph.core.types.TableColumn]] = None) None
Mount an object from local storage as a foreign table.
- Parameters
object_id – ID of the object
table – Table to mount the object into
schema – Schema to mount the object into
schema_spec – Schema of the object.
- rename_object(old_object_id: str, new_object_id: str)
- store_fragment(inserted: Any, deleted: Any, schema: str, table: str, source_schema: str, source_table: str, source_schema_spec: Optional[List[splitgraph.core.types.TableColumn]] = None) None
Store a fragment of a changed table in another table
- Parameters
inserted – List of PKs that have been updated/inserted
deleted – List of PKs that have been deleted
schema – Schema to store the change in
table – Table to store the change in
source_schema – Schema the source table is located in
source_table – Name of the source table
source_schema_spec – Schema of the source table (optional)
- store_object(object_id: str, source_query: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], schema_spec: List[splitgraph.core.types.TableColumn], source_query_args=None, overwrite=False) None
Stores a Splitgraph object using a source query in the actual format implemented by this engine.
- Parameters
object_id – Name of the object
source_query – SELECT query that produces data required by the object
schema_spec – Schema of the source table
source_query_args – Arguments to mogrify into the source query.
overwrite – If True, will overwrite the object if it already exists.
- sync_object_mounts() None
Scan through local object storage and synchronize it with the foreign tables in splitgraph_meta (unmounting non-existing objects and mounting existing ones).
- unmount_objects(object_ids: List[str]) None
Unmount objects from splitgraph_meta (this doesn’t delete the physical files).
- upload_objects(objects: List[str], remote_engine: splitgraph.engine.postgres.engine.PostgresEngine) None
Upload objects from the local cache to the remote engine
- Parameters
objects – List of object IDs to upload
remote_engine – A remote ObjectEngine to upload the objects to.
- class splitgraph.engine.postgres.engine.PsycopgEngine(name: Optional[str], conn_params: Optional[Dict[str, Optional[str]]] = None, pool: Optional[psycopg2.pool.AbstractConnectionPool] = None, autocommit: bool = False, registry: bool = False, in_fdw: bool = False, check_version: bool = True)
Bases: splitgraph.engine.SQLEngine
Postgres SQL engine backed by a Psycopg connection.
- close() None
Commit and close the engine’s backing connection
- close_others() None
Close and release all other connections to the connection pool.
- commit() None
Commit the engine’s backing connection
- property connection: Connection
Engine-internal Psycopg connection.
- copy_cursor()
Return a cursor that can be used for copy_expert operations
- delete_database(database: str) None
Helper function to drop a database using the admin connection
- Parameters
database – Database name to drop
- dump_table_sql(schema: str, table_name: str, stream: _io.TextIOWrapper, columns: str = '*', where: str = '', where_args: Optional[Union[List[str], Tuple[str, str]]] = None, target_schema: Optional[str] = None, target_table: Optional[str] = None) None
Dump the table contents in the SQL format
- Parameters
schema – Schema the table is located in
table_name – Name of the table
stream – A file-like object to write the result into.
columns – SQL column spec. Default ‘*’.
where – Optional, an SQL WHERE clause
where_args – Arguments for the optional WHERE clause.
target_schema – Schema to create the table in (default same as schema)
target_table – Name of the table to insert data into (default same as table_name)
- get_primary_keys(schema: str, table: str) List[Tuple[str, str]]
Inspects the Postgres information_schema to get the primary keys for a given table.
- in_fdw
List of notices issued by the server during the previous execution of run_sql.
- initialize(skip_object_handling: bool = False, skip_create_database: bool = False) None
Create the Splitgraph Postgres database and install the audit trigger
- Parameters
skip_object_handling – If True, skips installation of audit triggers and other object management routines for engines that don’t need change tracking or checkouts.
skip_create_database – Don’t create the Splitgraph database
- lock_table(schema: str, table: str) None
Acquire an exclusive lock on a given table, released when the transaction commits / rolls back.
- rollback() None
Roll back the engine’s backing connection
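One way lock_table, commit and rollback might be combined is a small context manager that locks a table, commits on success and rolls back on error. This is a sketch built only on the method signatures above. `locked_table` and `_RecordingEngine` are hypothetical names, and the stub records calls instead of talking to Postgres.

```python
from contextlib import contextmanager


@contextmanager
def locked_table(engine, schema: str, table: str):
    """Lock a table, then commit on success or roll back on error."""
    engine.lock_table(schema, table)
    try:
        yield engine
    except Exception:
        engine.rollback()
        raise
    else:
        engine.commit()


class _RecordingEngine:
    """Hypothetical stand-in that records calls instead of using a connection."""

    def __init__(self):
        self.calls = []

    def lock_table(self, schema, table):
        self.calls.append(("lock_table", schema, table))

    def commit(self):
        self.calls.append(("commit",))

    def rollback(self):
        self.calls.append(("rollback",))


ok_engine = _RecordingEngine()
with locked_table(ok_engine, "public", "fruits"):
    pass  # work on the locked table would go here

failed_engine = _RecordingEngine()
try:
    with locked_table(failed_engine, "public", "fruits"):
        raise ValueError("simulated failure")
except ValueError:
    pass
```

Because the lock is released when the transaction ends, either branch (commit or rollback) also releases it.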
- run_api_call(call: str, *args, schema: str = 'splitgraph_api') Any
- run_api_call_batch(call: str, argslist, schema: str = 'splitgraph_api')
- run_chunked_sql(statement: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], arguments: Sequence[Any], return_shape: Optional[splitgraph.engine.ResultShape] = ResultShape.MANY_MANY, chunk_size: int = 100, chunk_position: int = -1) Any
Because the Splitgraph API limits request size, some SQL calls with variadic arguments can be too long to fit in a single request. This function splits the arguments into chunks, runs the SQL query against each chunk and returns the combined result.
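The chunking idea can be illustrated without a database. The sketch below assumes that chunk_position is the index of the one argument that gets split (with -1 meaning the last one) and that per-chunk results are lists that can be concatenated. The real method also handles return shapes, which are omitted here. `run_chunked_sketch` and `fake_run` are hypothetical names.

```python
from typing import Any, Callable, List, Sequence


def run_chunked_sketch(
    run_one: Callable[[Sequence[Any]], List[Any]],
    arguments: Sequence[Any],
    chunk_size: int = 100,
    chunk_position: int = -1,
) -> List[Any]:
    """Split the argument at chunk_position, run once per chunk, concatenate results."""
    args = list(arguments)
    long_arg = list(args[chunk_position])
    combined: List[Any] = []
    for start in range(0, len(long_arg), chunk_size):
        # Replace the long argument with the current chunk; other arguments are unchanged.
        args[chunk_position] = long_arg[start:start + chunk_size]
        combined.extend(run_one(list(args)))
    return combined


calls = []


def fake_run(args):
    """Hypothetical query runner: records its arguments and echoes the chunked IDs."""
    calls.append(args)
    return [(args[0], object_id) for object_id in args[-1]]


result = run_chunked_sketch(fake_run, ["ns", ["a", "b", "c", "d", "e"]], chunk_size=2)
```

Here five IDs with a chunk size of 2 lead to three calls, and the caller sees a single combined list.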
- run_sql(statement: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], arguments: Optional[Sequence[Any]] = None, return_shape: Optional[splitgraph.engine.ResultShape] = ResultShape.MANY_MANY, named: bool = False) Any
Run an arbitrary SQL statement with some arguments, return an iterator of results. If the statement doesn’t return any results, return None. If named=True, return named tuples when possible.
- run_sql_batch(statement: Union[psycopg2.sql.Composed, str], arguments: Any, schema: Optional[str] = None, max_size=261000) None
Run a parameterized SQL statement against multiple sets of arguments.
- Parameters
statement – Statement to run
arguments – Query arguments
schema – Schema to run the statement in
- property splitgraph_version: Optional[str]
Returns the version of the Splitgraph library installed on the engine and by association the version of the engine itself.
- splitgraph.engine.postgres.engine.add_ud_flag_column(table_schema: List[splitgraph.core.types.TableColumn]) List[splitgraph.core.types.TableColumn]
- splitgraph.engine.postgres.engine.chunk(sequence: Sequence[splitgraph.engine.postgres.engine.T], chunk_size: int = 100) Iterator[List[splitgraph.engine.postgres.engine.T]]
- splitgraph.engine.postgres.engine.get_change_key(schema_spec: List[splitgraph.core.types.TableColumn]) List[Tuple[str, str]]
- splitgraph.engine.postgres.engine.get_conn_str(conn_params: Dict[str, Optional[str]]) str
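The module-level chunk helper listed above has no description. Going by its signature alone, it plausibly yields consecutive lists of at most chunk_size items. The sketch below shows that assumed behaviour under the hypothetical name `chunk_sketch` and is not the library's code.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunk_sketch(sequence: Sequence[T], chunk_size: int = 100) -> Iterator[List[T]]:
    """Yield consecutive lists of at most chunk_size items from sequence."""
    for start in range(0, len(sequence), chunk_size):
        yield list(sequence[start:start + chunk_size])


chunks = list(chunk_sketch(["o1", "o2", "o3", "o4", "o5"], chunk_size=2))
```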