Session#
- class pystarburst.session.SchemaDiscoveryResult(session, uri, *, catalog_name=None, schema_name='', options='')#
Bases: object
- register_discovered_table(if_exists=SaveMode.ERRORIFEXISTS)#
Registers the discovered table in the metastore.
- Parameters:
if_exists –
How to behave if the table already exists:
ERRORIFEXISTS: Raise an exception.
IGNORE: Preserve current table/schema, do nothing.
OVERWRITE: Unregister current table and re-register it.
Examples
>>> # Run schema discovery
>>> schema_discovery_result = session.discover(uri)
>>> # Register discovered table:
>>> schema_discovery_result.register_discovered_table(if_exists="OVERWRITE")
>>> # Create Table (DataFrame) object from discovered table:
>>> df = session.table(schema_discovery_result.register_discovered_table(if_exists="IGNORE"))
- unregister_discovered_table()#
Unregisters the discovered table from the metastore.
Examples
>>> # Run schema discovery
>>> schema_discovery_result = session.discover(uri)
>>> # Unregister discovered table:
>>> schema_discovery_result.unregister_discovered_table()
- class pystarburst.session.Session(conn: ServerConnection, use_endpoint: bool | None = False, type_coercion_mode: TypeCoercionMode = TypeCoercionMode.DEFAULT)#
Bases: object
Establishes a connection with a Trino cluster and provides methods for creating DataFrames.
When you create a Session object, you provide connection parameters to establish a connection with a Trino cluster (e.g. a hostname, a user name, etc.). You can specify these settings in a dict that associates connection parameter names with values. The pystarburst library uses the Trino Python Client to connect to Trino.
To create a Session object from a dict of connection parameters:
>>> connection_parameters = {
...     "host": "<host_name>",
...     "port": "<port_number>",
...     "user": "<user_name>",
...     "roles": {"system": "ROLE{analyst}"},
...     "catalog": "<catalog_name>",
...     "schema": "<schema1_name>",
... }
>>> session = Session.builder.configs(connection_parameters).create()
Session contains functions to construct a DataFrame like table(), sql() and read.
A Session object is not thread-safe.
- class SessionBuilder#
Bases: object
Provides methods to set connection parameters and create a Session.
- config(key: str, value: int | str) SessionBuilder #
Adds the specified connection parameter to the SessionBuilder configuration.
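For example, a minimal sketch of setting parameters one at a time before creating the session (the values shown are placeholders, not real connection details; chaining works because config() returns the builder):
>>> builder = Session.builder
>>> builder = builder.config("host", "<host_name>").config("port", 443)
>>> builder = builder.config("user", "<user_name>")
>>> session = builder.create()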
- configs(options: Dict[str, int | str]) SessionBuilder #
Adds the specified dict of connection parameters to the SessionBuilder configuration.
Note
Calling this method overwrites any existing connection parameters that you have already set in the SessionBuilder.
- builder: SessionBuilder#
Returns a builder you can use to set configuration properties and create a Session object.
- cache_results(database_name: str = None, schema_name: str = None, table_properties: Dict[str, str | bool | int | float] = None, statement_properties: Dict[str, str] | None = None) ResultCache #
Creates an instance of ResultCache as a context manager to capture all tables created via the cache method and delete them on context exit.
While the context manager tries to clean up the data created during the session, there is no guarantee of clean-up in case of a connection or server problem, and tables may remain.
- Important notes:
The user that runs the session should have permission to create and drop tables in the selected catalog and schema.
Table data can be seen by other users (depending on the permissions setup), which can lead to data leakage.
We recommend using a catalog that allows the creation of managed tables (an Iceberg or Hive catalog without a location); otherwise, data files won't be deleted.
We recommend storing temporary data in separate storage where data retention is configured.
>>> with session.cache_results("iceberg", "default") as cache_results:
...     df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
...     df = cache_results.cache(df)
...     conditions = [col("a") == 3, col("a") == 7]
...     for condition in conditions:
...         df.filter(condition).show()
- cancel_all() None #
Cancels all action methods that are currently running. This does not affect any action methods called in the future.
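A minimal usage sketch (assuming an established session whose in-flight work is no longer needed):
>>> # If previously started actions (e.g. collect() calls) are no longer needed:
>>> session.cancel_all()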
- close() None #
Closes this session.
- createDataFrame(data: List | Tuple, schema: StructType | List[str] | None = None) DataFrame #
Creates a new DataFrame containing the specified values from the local data.
createDataFrame() is an alias of create_dataframe().
- Parameters:
data – The local data for building a DataFrame. data can only be a list or tuple. Every element in data will constitute a row in the DataFrame.
schema – A StructType containing names and data types of columns, or a list of column names, or None. When schema is a list of column names or None, the schema of the DataFrame will be inferred from the data across all rows. To improve performance, provide a schema. This avoids the need to infer data types with large data sets.
Examples
>>> # create a dataframe with a schema
>>> from pystarburst.types import IntegerType, StringType, StructField, StructType
>>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
>>> session.create_dataframe([[1, "py"], [3, "trino"]], schema).collect()
[Row(A=1, B='py'), Row(A=3, B='trino')]
>>> # create a dataframe by inferring a schema from the data
>>> from pystarburst import Row
>>> # infer schema
>>> session.create_dataframe([1, 2, 3, 4], schema=["a"]).collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4)]
>>> session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]).collect()
[Row(A=1, B=2), Row(A=3, B=4)]
>>> session.create_dataframe([Row(a=1, b=2, c=3, d=4)]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([{"a": 1}, {"b": 2}]).collect()
[Row(A=1, B=None), Row(A=None, B=2)]
- create_dataframe(data: List | Tuple, schema: StructType | List[str] | None = None) DataFrame #
Creates a new DataFrame containing the specified values from the local data.
createDataFrame() is an alias of create_dataframe().
- Parameters:
data – The local data for building a DataFrame. data can only be a list or tuple. Every element in data will constitute a row in the DataFrame.
schema – A StructType containing names and data types of columns, or a list of column names, or None. When schema is a list of column names or None, the schema of the DataFrame will be inferred from the data across all rows. To improve performance, provide a schema. This avoids the need to infer data types with large data sets.
Examples
>>> # create a dataframe with a schema
>>> from pystarburst.types import IntegerType, StringType, StructField, StructType
>>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
>>> session.create_dataframe([[1, "py"], [3, "trino"]], schema).collect()
[Row(A=1, B='py'), Row(A=3, B='trino')]
>>> # create a dataframe by inferring a schema from the data
>>> from pystarburst import Row
>>> # infer schema
>>> session.create_dataframe([1, 2, 3, 4], schema=["a"]).collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4)]
>>> session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]).collect()
[Row(A=1, B=2), Row(A=3, B=4)]
>>> session.create_dataframe([Row(a=1, b=2, c=3, d=4)]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([{"a": 1}, {"b": 2}]).collect()
[Row(A=1, B=None), Row(A=None, B=2)]
- discover(uri, *, catalog_name=None, schema_name='', options='')#
Runs the schema discovery feature on the specified location.
- Parameters:
uri – The URI to scan.
catalog_name – The catalog name to use. Must be specified here or in the connection parameters.
schema_name – The schema name to use; defaults to 'discovered' if not provided.
options – Discovery options.
Invoke the register_discovered_table() method on the result object to register the discovered table.
Examples
>>> # Run discovery:
>>> schema_discovery_result = session.discover(uri, catalog_name='iceberg', schema_name='test_schema_1', options='discoveryMode=NORMAL')
>>> # Register discovered table:
>>> schema_discovery_result.register_discovered_table(if_exists="OVERWRITE")
>>> # Create Table (DataFrame) object from discovered table:
>>> df = session.table(schema_discovery_result.register_discovered_table(if_exists="IGNORE"))
- get_current_catalog() str | None #
Returns the name of the current catalog for the Trino session attached to this session. See the example in table().
- get_current_roles() Dict[str, str] | None #
Returns the names of the roles in use for the current session.
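A minimal sketch (the mapping shown in the comment is illustrative only; the actual contents depend on your connection parameters and any set_role() calls):
>>> roles = session.get_current_roles()  # e.g. {'system': 'analyst'}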
- get_current_schema() str | None #
Returns the name of the current schema for the Python connector session attached to this session. See the example in table().
- get_fully_qualified_current_schema() str #
Returns the fully qualified name of the current schema for the session.
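A minimal sketch (the value in the comment is illustrative; the actual result reflects your current catalog and schema):
>>> fq_schema = session.get_fully_qualified_current_schema()  # e.g. 'iceberg.default'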
- query_history() QueryHistory #
Creates an instance of QueryHistory as a context manager to record queries that are pushed down to the Trino cluster.
>>> with session.query_history() as query_history:
...     df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
...     df = df.filter(df.a == 1)
...     res = df.collect()
>>> assert len(query_history.queries) == 1
- range(start: int, end: int | None = None, step: int = 1) DataFrame #
Creates a new DataFrame from a range of numbers. The resulting DataFrame has a single column named ID, containing elements in a range from start to end (exclusive) with the step value step.
- Parameters:
start – The start of the range. If end is not specified, start will be used as the value of end.
end – The end of the range.
step – The step of the range.
Examples
>>> session.range(10).collect()
[Row(ID=0), Row(ID=1), Row(ID=2), Row(ID=3), Row(ID=4), Row(ID=5), Row(ID=6), Row(ID=7), Row(ID=8), Row(ID=9)]
>>> session.range(1, 10).collect()
[Row(ID=1), Row(ID=2), Row(ID=3), Row(ID=4), Row(ID=5), Row(ID=6), Row(ID=7), Row(ID=8), Row(ID=9)]
>>> session.range(1, 10, 2).collect()
[Row(ID=1), Row(ID=3), Row(ID=5), Row(ID=7), Row(ID=9)]
- set_role(role: str, catalog: str = 'system') None #
Specifies the active/current role for the session.
- Parameters:
role – The role name.
catalog – The catalog name (defaults to 'system').
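A minimal sketch (the role and catalog names are placeholders):
>>> session.set_role("<role_name>")
>>> session.set_role("<role_name>", catalog="<catalog_name>")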
- sql(query: str) DataFrame #
Returns a new DataFrame representing the results of a SQL query. You can use this method to execute a SQL statement. Note that you still need to call DataFrame.collect() to execute this query in Trino.
- Parameters:
query – The SQL statement to execute.
Examples
>>> # create a dataframe from a SQL query
>>> df = session.sql("select 1/2")
>>> # execute the query
>>> df.collect()
[Row(1/2=Decimal('0.500000'))]
- table(name: str | Iterable[str]) Table #
Returns a Table that points to the specified table.
- Parameters:
name – A string or list of strings that specifies the table name or fully-qualified object identifier (database name, schema name, and table name).
Note – If your table name contains special characters, use double quotes to mark it like this: session.table('"my table"'). For fully qualified names, you need to use double quotes separately like this: session.table('"my db"."my schema"."my.table"').
Examples
>>> df1 = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
>>> df1.write.save_as_table("my_table", mode="overwrite")
>>> session.table("my_table").collect()
[Row(A=1, B=2), Row(A=3, B=4)]
>>> current_db = session.get_current_catalog()
>>> current_schema = session.get_current_schema()
>>> session.table([current_db, current_schema, "my_table"]).collect()
[Row(A=1, B=2), Row(A=3, B=4)]
- table_function(func_name: str | List[str] | TableFunctionCall, *func_arguments: ColumnOrName, **func_named_arguments: ColumnOrName) DataFrame #
Creates a new DataFrame from the given Trino SQL table function.
References: Trino SQL functions.
- Parameters:
func_name – The SQL function name.
func_arguments – The positional arguments for the SQL function.
func_named_arguments – The named arguments for the SQL function, if it accepts named arguments.
- Returns:
A new DataFrame with data from calling the table function.
- Example 1
Query a table function by function name:
>>> from pystarburst.functions import lit
>>> session.table_function("sequence", lit(0), lit(4)).collect()
[Row(sequential_number=0), Row(sequential_number=1), Row(sequential_number=2), Row(sequential_number=3), Row(sequential_number=4)]
- Example 2
Define a table function variable and query it:
>>> from pystarburst.functions import table_function, lit
>>> sequence = table_function("sequence")
>>> session.table_function(sequence(lit(0), lit(4))).collect()
[Row(sequential_number=0), Row(sequential_number=1), Row(sequential_number=2), Row(sequential_number=3), Row(sequential_number=4)]
- use(catalog_schema: str) None #
Specifies the active/current schema for the session.
- Parameters:
catalog_schema – The catalog and/or schema name.
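A minimal sketch (names are placeholders; per the parameter description, you can pass either a schema name or a catalog-qualified schema name):
>>> session.use("<schema_name>")
>>> session.use("<catalog_name>.<schema_name>")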