splitgraph.ingestion.singer package
Subpackages
Submodules
splitgraph.ingestion.singer.common module
- splitgraph.ingestion.singer.common.log_exception(f)
Emit exceptions with the full traceback instead of just the error text.
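The description above suggests a decorator that reports the whole traceback of an exception rather than only its message. The following is a minimal sketch of that idea. It assumes the decorator logs through the standard `logging` module and then re-raises, and this reference confirms neither detail. `log_exception_sketch` is a hypothetical name, not splitgraph's implementation.

```python
import functools
import logging
import traceback
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

logger = logging.getLogger(__name__)


def log_exception_sketch(f: F) -> F:
    """Hypothetical stand-in for log_exception: log the full traceback, then re-raise."""

    @functools.wraps(f)  # keep the wrapped function's name and docstring
    def wrapped(*args: Any, **kwargs: Any) -> Any:
        try:
            return f(*args, **kwargs)
        except Exception:
            # format_exc() renders the complete traceback, not just str(exception)
            logger.error("%s", traceback.format_exc())
            raise  # assumption: the caller still sees the original exception

    return wrapped  # type: ignore[return-value]
```

Re-raising keeps failure semantics unchanged for the caller. The decorator only makes the diagnostic output more complete.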
- splitgraph.ingestion.singer.common.rollback_at_end(func: collections.abc.Callable) → collections.abc.Callable
- splitgraph.ingestion.singer.common.store_ingestion_state(repository: splitgraph.core.repository.Repository, image_hash: str, current_state: Optional[Dict[str, Any]], new_state: str)
splitgraph.ingestion.singer.data_source module
- class splitgraph.ingestion.singer.data_source.GenericSingerDataSource(*args, **kwargs)
Bases: splitgraph.ingestion.singer.data_source.SingerDataSource
- credentials_schema: Dict[str, Any] = {'type': 'object'}
- classmethod get_description() → str
- classmethod get_name() → str
- get_singer_executable()
- params_schema: Dict[str, Any] = {'properties': {'tap_path': {'type': 'string'}}, 'required': ['tap_path'], 'type': 'object'}
- class splitgraph.ingestion.singer.data_source.MySQLSingerDataSource(engine: PostgresEngine, credentials: Credentials, params: Params, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None)
Bases: splitgraph.ingestion.singer.data_source.SingerDataSource
- build_singer_catalog(catalog: Dict[str, Any], tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None)
- credentials_schema: Dict[str, Any] = {'properties': {'password': {'type': 'string'}, 'user': {'type': 'string'}}, 'required': ['user', 'password'], 'type': 'object'}
- classmethod get_description() → str
- classmethod get_name() → str
- get_singer_executable()
- params_schema: Dict[str, Any] = {'properties': {'host': {'type': 'string'}, 'port': {'type': 'integer'}, 'replication_method': {'enum': ['INCREMENTAL', 'LOG_BASED', 'FULL TABLE'], 'type': 'string'}}, 'required': ['host', 'port', 'replication_method'], 'type': 'object'}
- use_legacy_stream_selection = False
- use_properties = True
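The `params_schema` and `credentials_schema` attributes above are JSON Schema documents. The sketch below checks a parameter dictionary against only the subset of JSON Schema those attributes use: `type: object`, per-property `string`/`integer` types, `enum`, and `required`. `check_against_schema` is a hypothetical helper. A real application would more likely use a complete validator such as the `jsonschema` package. The schemas are copied from the MySQLSingerDataSource attributes above, and the host and credential values in the usage note are placeholders.

```python
from typing import Any, Dict, List

# Copied from MySQLSingerDataSource.params_schema / credentials_schema above.
MYSQL_PARAMS_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "host": {"type": "string"},
        "port": {"type": "integer"},
        "replication_method": {
            "type": "string",
            "enum": ["INCREMENTAL", "LOG_BASED", "FULL TABLE"],
        },
    },
    "required": ["host", "port", "replication_method"],
}
MYSQL_CREDENTIALS_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {"password": {"type": "string"}, "user": {"type": "string"}},
    "required": ["user", "password"],
}

_TYPES = {"string": str, "integer": int, "object": dict}


def _type_ok(value: Any, type_name: str) -> bool:
    # bool subclasses int in Python, but JSON Schema does not accept
    # true/false as integers, so reject booleans explicitly.
    if type_name == "integer" and isinstance(value, bool):
        return False
    return isinstance(value, _TYPES[type_name])


def check_against_schema(value: Any, schema: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means `value` passes this subset."""
    if not _type_ok(value, schema.get("type", "object")):
        return ["expected an object"]
    errors = [
        f"missing required key {key!r}"
        for key in schema.get("required", [])
        if key not in value
    ]
    for key, prop in schema.get("properties", {}).items():
        if key not in value:
            continue  # absence is already covered by the "required" check
        if "type" in prop and not _type_ok(value[key], prop["type"]):
            errors.append(f"{key!r} should be of type {prop['type']}")
        if "enum" in prop and value[key] not in prop["enum"]:
            errors.append(f"{key!r} must be one of {prop['enum']}")
    return errors
```

For example, `check_against_schema({"host": "localhost", "port": 3306, "replication_method": "INCREMENTAL"}, MYSQL_PARAMS_SCHEMA)` returns an empty list. Passing `"3306"` as a string for `port` reports a type error.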
- class splitgraph.ingestion.singer.data_source.SingerDataSource(engine: PostgresEngine, credentials: Credentials, params: Params, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None)
Bases: splitgraph.hooks.data_source.base.SyncableDataSource, abc.ABC
- build_singer_catalog(catalog: Dict[str, Any], tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None) → Dict[str, Any]
- get_singer_config()
- abstract get_singer_executable()
- introspect() → IntrospectionResult
- load(repository: splitgraph.core.repository.Repository, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None) → str
- sync(repository: splitgraph.core.repository.Repository, image_hash: Optional[str] = None, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None, use_state: bool = True) → str
- use_legacy_stream_selection = False
- use_properties = False
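Singer taps conventionally read their configuration from a `--config` JSON file, the stream catalog from `--catalog` (older taps use `--properties` instead), and optional bookmarks from `--state`. This reference does not say so explicitly, but `use_properties` may choose between those two catalog flags. Under that assumption, the tap invocation could be assembled roughly as follows. `build_tap_command` and its arguments are hypothetical.

```python
from typing import List, Optional


def build_tap_command(
    executable: str,
    config_path: str,
    catalog_path: str,
    state_path: Optional[str] = None,
    use_properties: bool = False,
) -> List[str]:
    """Hypothetical helper: build a Singer tap command line as an argv list."""
    # Assumption: use_properties=True means the tap expects the older --properties flag.
    catalog_flag = "--properties" if use_properties else "--catalog"
    args = [executable, "--config", config_path, catalog_flag, catalog_path]
    if state_path is not None:
        # Resume from the bookmarks saved by a previous run.
        args += ["--state", state_path]
    return args
```

The resulting list can be passed directly to `subprocess.run` or `subprocess.Popen`. Using an argv list rather than a shell string avoids quoting problems with file paths.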
- splitgraph.ingestion.singer.data_source.select_streams(catalog: Dict[str, Any], tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None, use_legacy_stream_selection=False) → Dict[str, Any]
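In Singer catalogs, a stream is normally selected through the `selected` flag in its metadata entry whose `breadcrumb` is `[]`. Under the legacy convention, `selected: true` goes in the stream's schema instead. The sketch below shows stream selection under those conventions and rests on three assumptions:

- `tables` names streams by their `stream` field.
- Passing `None` selects every stream.
- `use_legacy_stream_selection` switches to the schema-level flag.

`select_streams_sketch` is hypothetical and is not splitgraph's implementation.

```python
import copy
from typing import Any, Dict, Iterable, Optional


def select_streams_sketch(
    catalog: Dict[str, Any],
    tables: Optional[Iterable[str]] = None,
    use_legacy_stream_selection: bool = False,
) -> Dict[str, Any]:
    """Hypothetical: return a copy of `catalog` with streams marked (de)selected."""
    # Iterating a dict yields its keys, so both list- and dict-shaped `tables` work.
    wanted = None if tables is None else set(tables)
    result = copy.deepcopy(catalog)  # leave the caller's catalog untouched
    for stream in result["streams"]:
        selected = wanted is None or stream["stream"] in wanted
        if use_legacy_stream_selection:
            # Legacy Singer convention: flag stored on the stream's JSON schema.
            stream["schema"]["selected"] = selected
            continue
        # Metadata convention: flag on the stream-level (empty breadcrumb) entry.
        metadata = stream.setdefault("metadata", [])
        entry = next((m for m in metadata if m.get("breadcrumb") == []), None)
        if entry is None:
            entry = {"breadcrumb": [], "metadata": {}}
            metadata.append(entry)
        entry.setdefault("metadata", {})["selected"] = selected
    return result
```

Returning a modified deep copy keeps the discovered catalog reusable, for example when a later sync selects a different set of tables.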
splitgraph.ingestion.singer.db_sync module
- class splitgraph.ingestion.singer.db_sync.DbSyncProxy(*args, **kwargs)
Bases: target_postgres.db_sync.DbSync
- create_indices(stream)
- create_schema_if_not_exists(table_columns_cache=None)
- delete_rows(stream)
- load_csv(file, count, size_bytes)
- sync_table()
- splitgraph.ingestion.singer.db_sync.db_sync_wrapper(image: splitgraph.core.image.Image, staging_schema: str)
- splitgraph.ingestion.singer.db_sync.get_key_properties(stream_message)
Extract the primary key from a stream message. Supports both legacy (“key_properties”) and new (“metadata”) Singer taps.
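Singer taps report a stream's primary key in one of two places. The legacy form is a top-level `key_properties` list. The newer form is `table-key-properties` in the metadata entry whose `breadcrumb` is `[]`. The following is a minimal sketch of that two-way lookup. It assumes a non-empty legacy field takes precedence when both are present, which this reference does not specify. `get_key_properties_sketch` and `find_breadcrumb` are hypothetical names, not splitgraph's code.

```python
from typing import Any, Dict, List, Optional


def find_breadcrumb(
    metadata: List[Dict[str, Any]], breadcrumb: List[str]
) -> Optional[Dict[str, Any]]:
    """Return the metadata dict attached to `breadcrumb`, or None if absent."""
    for entry in metadata:
        if entry.get("breadcrumb") == breadcrumb:
            return entry.get("metadata", {})
    return None


def get_key_properties_sketch(stream_message: Dict[str, Any]) -> List[str]:
    """Hypothetical: primary-key columns of a Singer SCHEMA message or catalog stream."""
    # Legacy taps: a non-empty top-level "key_properties" (assumed to win).
    if stream_message.get("key_properties"):
        return list(stream_message["key_properties"])
    # Newer taps: "table-key-properties" on the stream-level (empty breadcrumb) entry.
    stream_metadata = find_breadcrumb(stream_message.get("metadata", []), [])
    if stream_metadata:
        return list(stream_metadata.get("table-key-properties", []))
    return []  # no primary key declared
```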
- splitgraph.ingestion.singer.db_sync.get_sg_schema(stream_schema_message, flattening_max_level=0)
- splitgraph.ingestion.singer.db_sync.get_table_name(stream_schema_message)
- splitgraph.ingestion.singer.db_sync.run_patched_sync(repository: splitgraph.core.repository.Repository, base_image: Optional[splitgraph.core.image.Image], new_image_hash: str, delete_old: bool, failure: str, input_stream: Optional[BinaryIO] = None, output_stream: Optional[TextIO] = None)
- splitgraph.ingestion.singer.db_sync.select_breadcrumb(stream_message, breadcrumb)