Session#
- class pystarburst.session.SchemaDiscoveryResult(session, uri, *, catalog_name=None, schema_name='', options='')#
Bases: object
- register_discovered_table(if_exists=SaveMode.ERRORIFEXISTS)#
Registers the discovered table in the metastore.
- Parameters:
if_exists –
How to behave if the table already exists:
ERRORIFEXISTS: Raise an exception.
IGNORE: Preserve current table/schema, do nothing.
OVERWRITE: Unregister current table and re-register it.
Examples
>>> # Run schema discovery
>>> schema_discovery_result = session.discover(uri)
>>> # Register discovered table:
>>> schema_discovery_result.register_discovered_table(if_exists="OVERWRITE")
>>> # Create Table (DataFrame) object from discovered table:
>>> df = session.table(schema_discovery_result.register_discovered_table(if_exists="IGNORE"))
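The three if_exists modes can be sketched against a toy in-memory registry. This is an illustrative sketch only (the registry, `register` function, and string mode names are hypothetical stand-ins, not the library's implementation):

```python
# Illustrative sketch of the three if_exists behaviors against a toy
# in-memory registry -- not the library's implementation.
registry = {"customers": "schema_v1"}

def register(name, schema, if_exists="ERRORIFEXISTS"):
    if name in registry:
        if if_exists == "ERRORIFEXISTS":
            raise ValueError(f"table {name} already exists")
        if if_exists == "IGNORE":
            return registry[name]   # keep the current table untouched
        del registry[name]          # OVERWRITE: unregister, then re-register
    registry[name] = schema
    return schema

register("customers", "schema_v2", if_exists="IGNORE")     # keeps schema_v1
register("customers", "schema_v2", if_exists="OVERWRITE")  # replaces it
```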
- unregister_discovered_table()#
Unregisters the discovered table from the metastore.
Examples
>>> # Run schema discovery
>>> schema_discovery_result = session.discover(uri)
>>> # Unregister discovered table:
>>> schema_discovery_result.unregister_discovered_table()
- class pystarburst.session.Session(conn: ServerConnection, use_endpoint: bool | None = False, type_coercion_mode: TypeCoercionMode = TypeCoercionMode.DEFAULT)#
Bases: object
Establishes a connection with a Trino cluster and provides methods for creating DataFrames.
When you create a Session object, you provide connection parameters to establish a connection with a Trino cluster (e.g. a hostname, a user name, etc.). You can specify these settings in a dict that associates connection parameter names with values. The pystarburst library uses the Trino Python Client to connect to Trino.
To create a Session object from a dict of connection parameters:
>>> connection_parameters = {
...     "host": "<host_name>",
...     "port": "<port_number>",
...     "user": "<user_name>",
...     "roles": {"system": "ROLE{analyst}"},
...     "catalog": "<catalog_name>",
...     "schema": "<schema1_name>",
... }
>>> session = Session.builder.configs(connection_parameters).create()
Session contains functions to construct a DataFrame like table(), sql() and read.
A Session object is not thread-safe.
- class SessionBuilder#
Bases: object
Provides methods to set connection parameters and create a Session.
- config(key: str, value: int | str) → SessionBuilder#
Adds the specified connection parameter to the SessionBuilder configuration.
- configs(options: Dict[str, int | str]) → SessionBuilder#
Adds the specified dict of connection parameters to the SessionBuilder configuration.
Note
Calling this method overwrites any existing connection parameters that you have already set in the SessionBuilder.
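The overwrite behavior described in the note can be pictured as a plain dict update. This is an assumption made for illustration, not the SessionBuilder's actual internals:

```python
# Sketch of the overwrite behavior described in the note above,
# modeled as a plain dict update -- an assumption for illustration,
# not the SessionBuilder's actual internals.
existing = {"host": "trino.example.com", "user": "alice", "catalog": "hive"}
existing.update({"user": "bob", "schema": "analytics"})  # later values win
# "user" is now "bob"; untouched keys like "host" survive
```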
- builder: SessionBuilder#
Returns a builder you can use to set configuration properties and create a Session object.
- cancel_all() → None#
Cancels all currently running action methods. This does not affect action methods called in the future.
- close() → None#
Close this session.
- createDataFrame(data: List | Tuple, schema: StructType | List[str] | None = None) → DataFrame#
Creates a new DataFrame containing the specified values from the local data. createDataFrame() is an alias of create_dataframe().
- Parameters:
data – The local data for building a DataFrame. data can only be a list or tuple. Every element in data will constitute a row in the DataFrame.
schema – A StructType containing names and data types of columns, or a list of column names, or None. When schema is a list of column names or None, the schema of the DataFrame will be inferred from the data across all rows. To improve performance, provide a schema. This avoids the need to infer data types with large data sets.
Examples
>>> # create a dataframe with a schema
>>> from pystarburst.types import IntegerType, StringType, StructField
>>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
>>> session.create_dataframe([[1, "py"], [3, "trino"]], schema).collect()
[Row(A=1, B='py'), Row(A=3, B='trino')]
>>> # create a dataframe by inferring a schema from the data
>>> from pystarburst import Row
>>> # infer schema
>>> session.create_dataframe([1, 2, 3, 4], schema=["a"]).collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4)]
>>> session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]).collect()
[Row(A=1, B=2), Row(A=3, B=4)]
>>> session.create_dataframe([Row(a=1, b=2, c=3, d=4)]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([{"a": 1}, {"b": 2}]).collect()
[Row(A=1, B=None), Row(A=None, B=2)]
- create_dataframe(data: List | Tuple, schema: StructType | List[str] | None = None) → DataFrame#
Creates a new DataFrame containing the specified values from the local data. createDataFrame() is an alias of create_dataframe().
- Parameters:
data – The local data for building a DataFrame. data can only be a list or tuple. Every element in data will constitute a row in the DataFrame.
schema – A StructType containing names and data types of columns, or a list of column names, or None. When schema is a list of column names or None, the schema of the DataFrame will be inferred from the data across all rows. To improve performance, provide a schema. This avoids the need to infer data types with large data sets.
Examples
>>> # create a dataframe with a schema
>>> from pystarburst.types import IntegerType, StringType, StructField
>>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
>>> session.create_dataframe([[1, "py"], [3, "trino"]], schema).collect()
[Row(A=1, B='py'), Row(A=3, B='trino')]
>>> # create a dataframe by inferring a schema from the data
>>> from pystarburst import Row
>>> # infer schema
>>> session.create_dataframe([1, 2, 3, 4], schema=["a"]).collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4)]
>>> session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]).collect()
[Row(A=1, B=2), Row(A=3, B=4)]
>>> session.create_dataframe([Row(a=1, b=2, c=3, d=4)]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([{"a": 1}, {"b": 2}]).collect()
[Row(A=1, B=None), Row(A=None, B=2)]
- discover(uri, *, catalog_name=None, schema_name='', options='')#
Runs the schema discovery feature on the specified location.
- Parameters:
uri – URI to scan
catalog_name – catalog name to use. Must be specified here, or in connection parameters.
schema_name – schema name to use; defaults to ‘discovered’ if not provided
options – Discovery options
Invoke the register_discovered_table() method on the result object to register the discovered table.
Examples
>>> # Run discovery:
>>> schema_discovery_result = session.discover(uri, catalog_name='iceberg', schema_name='test_schema_1', options='discoveryMode=NORMAL')
>>> # Register discovered table:
>>> schema_discovery_result.register_discovered_table(if_exists="OVERWRITE")
>>> # Create Table (DataFrame) object from discovered table:
>>> df = session.table(schema_discovery_result.register_discovered_table(if_exists="IGNORE"))
- get_current_catalog() → str | None#
Returns the name of the current catalog for the Trino session attached to this session. See the example in table().
- get_current_roles() → Dict[str, str] | None#
Returns the names of the roles in use for the current session.
- get_current_schema() → str | None#
Returns the name of the current schema for the Python connector session attached to this session. See the example in table().
- get_fully_qualified_current_schema() → str#
Returns the fully qualified name of the current schema for the session.
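The shape of the returned name can be sketched as the current catalog and schema joined with a dot. The values below are hypothetical, and the dot composition is an assumption made for illustration:

```python
# Hypothetical illustration: a fully qualified schema name is the
# current catalog and current schema joined with a dot.
catalog = "iceberg"     # e.g. what session.get_current_catalog() might return
schema = "analytics"    # e.g. what session.get_current_schema() might return
fully_qualified = f"{catalog}.{schema}"
```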
- query_history() → QueryHistory#
Create an instance of QueryHistory as a context manager to record queries that are pushed down to the Trino cluster.
>>> with session.query_history() as query_history:
...     df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
...     df = df.filter(df.a == 1)
...     res = df.collect()
>>> assert len(query_history.queries) == 1
- range(start: int, end: int | None = None, step: int = 1) → DataFrame#
Creates a new DataFrame from a range of numbers. The resulting DataFrame has a single column named ID, containing elements in a range from start to end (exclusive) with the step value step.
- Parameters:
start – The start of the range. If end is not specified, start will be used as the value of end.
end – The end of the range.
step – The step of the range.
Examples
>>> session.range(10).collect()
[Row(ID=0), Row(ID=1), Row(ID=2), Row(ID=3), Row(ID=4), Row(ID=5), Row(ID=6), Row(ID=7), Row(ID=8), Row(ID=9)]
>>> session.range(1, 10).collect()
[Row(ID=1), Row(ID=2), Row(ID=3), Row(ID=4), Row(ID=5), Row(ID=6), Row(ID=7), Row(ID=8), Row(ID=9)]
>>> session.range(1, 10, 2).collect()
[Row(ID=1), Row(ID=3), Row(ID=5), Row(ID=7), Row(ID=9)]
- set_role(role: str, catalog: str = 'system') → None#
Specifies the active/current role for the session.
- Parameters:
role – the role name.
catalog – the catalog name (defaults to ‘system’)
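As a rough sketch, the call plausibly corresponds to Trino's SET ROLE statement with a catalog qualifier. The SQL mapping below is an assumption for illustration, not taken from the library source:

```python
# Hypothetical sketch of the statement set_role() maps to -- an
# assumption about the generated SQL, not the library's source.
def set_role_sql(role: str, catalog: str = "system") -> str:
    return f"SET ROLE {role} IN {catalog}"

set_role_sql("analyst")  # "SET ROLE analyst IN system"
```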
- sql(query: str) → DataFrame#
Returns a new DataFrame representing the results of a SQL query. You can use this method to execute a SQL statement. Note that you still need to call DataFrame.collect() to execute this query in Trino.
- Parameters:
query – The SQL statement to execute.
Examples
>>> # create a dataframe from a SQL query
>>> df = session.sql("select 1/2")
>>> # execute the query
>>> df.collect()
[Row(1/2=Decimal('0.500000'))]
- table(name: str | Iterable[str]) → Table#
Returns a Table that points to the specified table.
- Parameters:
name – A string or list of strings that specify the table name or fully-qualified object identifier (database name, schema name, and table name).
Note – If your table name contains special characters, use double quotes to mark it like this: session.table('"my table"'). For fully qualified names, you need to use double quotes separately, like this: session.table('"my db"."my schema"."my.table"').
Examples
>>> df1 = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
>>> df1.write.save_as_table("my_table", mode="overwrite")
>>> session.table("my_table").collect()
[Row(A=1, B=2), Row(A=3, B=4)]
>>> current_db = session.get_current_catalog()
>>> current_schema = session.get_current_schema()
>>> session.table([current_db, current_schema, "my_table"]).collect()
[Row(A=1, B=2), Row(A=3, B=4)]
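The quoting convention from the note above can be sketched with a hypothetical helper. quote_part is illustrative and not part of pystarburst, and isidentifier() is only a rough stand-in for "contains special characters":

```python
# Hypothetical helper illustrating the quoting convention from the
# note above: name parts with special characters get double quotes.
# isidentifier() is a rough stand-in for "needs quoting".
def quote_part(name: str) -> str:
    return name if name.isidentifier() else '"' + name + '"'

fq = ".".join(quote_part(p) for p in ["my db", "my schema", "my.table"])
# fq == '"my db"."my schema"."my.table"'
```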
- table_function(func_name: str | List[str] | TableFunctionCall, *func_arguments: ColumnOrName, **func_named_arguments: ColumnOrName) → DataFrame#
Creates a new DataFrame from the given Trino SQL table function.
References: Trino SQL functions.
- Parameters:
func_name – The SQL function name.
func_arguments – The positional arguments for the SQL function.
func_named_arguments – The named arguments for the SQL function, if it accepts named arguments.
- Returns:
A new DataFrame with data from calling the table function.
- Example 1
Query a table function by function name:
>>> from pystarburst.functions import lit
>>> session.table_function("sequence", lit(0), lit(4)).collect()
[Row(sequential_number=0), Row(sequential_number=1), Row(sequential_number=2), Row(sequential_number=3), Row(sequential_number=4)]
- Example 2
Define a table function variable and query it:
>>> from pystarburst.functions import table_function, lit
>>> sequence = table_function("sequence")
>>> session.table_function(sequence(lit(0), lit(4))).collect()
[Row(sequential_number=0), Row(sequential_number=1), Row(sequential_number=2), Row(sequential_number=3), Row(sequential_number=4)]
- use(catalog_schema: str) → None#
Specifies the active/current schema for the session.
- Parameters:
catalog_schema – The catalog and/or schema name.
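As a sketch, use() plausibly maps to Trino's USE statement, which accepts either a bare schema name or a catalog.schema pair. The SQL mapping below is an assumption for illustration, not taken from the library source:

```python
# Hypothetical sketch of the statement use() maps to -- an assumption,
# not the library's source. Trino's USE statement accepts "schema"
# or "catalog.schema".
def use_sql(catalog_schema: str) -> str:
    return f"USE {catalog_schema}"

use_sql("iceberg.analytics")  # "USE iceberg.analytics"
```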