Session#

class pystarburst.session.SchemaDiscoveryResult(session, uri, *, catalog_name=None, schema_name='', options='')#

Bases: object

register_discovered_table(if_exists=SaveMode.ERRORIFEXISTS)#

Registers the discovered table in the metastore.

Parameters:

if_exists

How to behave if the table already exists:

  • ERRORIFEXISTS: Raise an exception.

  • IGNORE: Preserve current table/schema, do nothing.

  • OVERWRITE: Unregister current table and re-register it.

Examples

>>> # Run schema discovery
>>> schema_discovery_result = session.discover(uri)
>>> # Register the discovered table
>>> schema_discovery_result.register_discovered_table(if_exists="OVERWRITE")
>>> # Create a Table (DataFrame) object from the discovered table
>>> df = session.table(schema_discovery_result.register_discovered_table(if_exists="IGNORE"))
unregister_discovered_table()#

Unregisters the discovered table from the metastore.

Examples

>>> # Run schema discovery
>>> schema_discovery_result = session.discover(uri)
>>> # Unregister the discovered table
>>> schema_discovery_result.unregister_discovered_table()
class pystarburst.session.Session(conn: ServerConnection, use_endpoint: bool | None = False, type_coercion_mode: TypeCoercionMode = TypeCoercionMode.DEFAULT)#

Bases: object

Establishes a connection with a Trino cluster and provides methods for creating DataFrames.

When you create a Session object, you provide connection parameters to establish a connection with a Trino cluster (e.g., a hostname, a user name, etc.). You can specify these settings in a dict that associates connection parameter names with values. The pystarburst library uses the Trino Python Client to connect to Trino.

To create a Session object from a dict of connection parameters:

>>> connection_parameters = {
...     "host": "<host_name>",
...     "port": "<port_number>",
...     "user": "<user_name>",
...     "roles": {"system": "ROLE{analyst}"},
...     "catalog": "<catalog_name>",
...     "schema": "<schema1_name>",
... }
>>> session = Session.builder.configs(connection_parameters).create() 

A Session contains functions to construct a DataFrame, such as table(), sql(), and read.

A Session object is not thread-safe.

class SessionBuilder#

Bases: object

Provides methods to set connection parameters and create a Session.

config(key: str, value: int | str) → SessionBuilder#

Adds the specified connection parameter to the SessionBuilder configuration.

configs(options: Dict[str, int | str]) → SessionBuilder#

Adds the specified dict of connection parameters to the SessionBuilder configuration.

Note

Calling this method overwrites any existing connection parameters that you have already set in the SessionBuilder.

create() → Session#

Creates a new Session.

builder: SessionBuilder#

Returns a builder you can use to set configuration properties and create a Session object.
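
For example, a minimal sketch that sets connection parameters one at a time and then creates the session (all values are placeholders):

>>> session = (
...     Session.builder
...     .config("host", "<host_name>")
...     .config("port", "<port_number>")
...     .config("user", "<user_name>")
...     .config("catalog", "<catalog_name>")
...     .config("schema", "<schema_name>")
...     .create()
... )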

cache_results(database_name: str = None, schema_name: str = None, table_properties: Dict[str, str | bool | int | float] = None, statement_properties: Dict[str, str] | None = None) → ResultCache#

Creates an instance of ResultCache as a context manager to capture all tables created via the cache() method and delete them on context exit.

While the context manager tries to clean up the data created during the session, there is no guarantee of clean-up in case of a connection or server problem, and tables may remain.

Important notes:
  • The user that runs the session should have permission to create and drop tables in the selected catalog and schema

  • Table data can be seen by other users (depending on permissions setup) which can lead to data leakage

  • We recommend using a catalog that allows the creation of managed tables (an Iceberg or Hive catalog without a location); otherwise, data files are not deleted

  • We recommend storing temporary data in separate storage where data retention is set up

>>> from pystarburst.functions import col
>>> with session.cache_results("iceberg", "default") as cache_results:
...     df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
...     df = cache_results.cache(df)
...     conditions = [col("a") == 3, col("a") == 7]
...     for condition in conditions:
...         df.filter(condition).show()
cancel_all() → None#

Cancel all action methods that are running currently. This does not affect any action methods called in the future.
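
A brief sketch, assuming a tpch catalog is configured on the cluster; the long-running query is started in a background thread purely for illustration (keep in mind that a Session object is not thread-safe):

>>> import threading
>>> df = session.sql("SELECT count(*) FROM tpch.sf100000.lineitem")  # illustrative long-running query
>>> worker = threading.Thread(target=df.collect)
>>> worker.start()
>>> session.cancel_all()  # cancels the running collect(); future action methods are unaffected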

close() → None#

Close this session.
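
A minimal sketch that ensures the session is always closed, using connection_parameters as defined above:

>>> session = Session.builder.configs(connection_parameters).create()
>>> try:
...     rows = session.sql("select 1").collect()
... finally:
...     session.close()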

createDataFrame(data: List | Tuple, schema: StructType | List[str] | None = None) → DataFrame#

Creates a new DataFrame containing the specified values from the local data.

createDataFrame() is an alias of create_dataframe().

Parameters:
  • data – The local data for building a DataFrame. data can only be a list or tuple. Every element in data will constitute a row in the DataFrame.

  • schema – A StructType containing names and data types of columns, or a list of column names, or None. When schema is a list of column names or None, the schema of the DataFrame will be inferred from the data across all rows. To improve performance, provide a schema. This avoids the need to infer data types with large data sets.

Examples

>>> # create a dataframe with a schema
>>> from pystarburst.types import IntegerType, StringType, StructField
>>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
>>> session.create_dataframe([[1, "py"], [3, "trino"]], schema).collect()
[Row(A=1, B='py'), Row(A=3, B='trino')]

>>> # create a dataframe by inferring a schema from the data
>>> from pystarburst import Row
>>> # infer schema
>>> session.create_dataframe([1, 2, 3, 4], schema=["a"]).collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4)]
>>> session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]).collect()
[Row(A=1, B=2), Row(A=3, B=4)]
>>> session.create_dataframe([Row(a=1, b=2, c=3, d=4)]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([{"a": 1}, {"b": 2}]).collect()
[Row(A=1, B=None), Row(A=None, B=2)]
create_dataframe(data: List | Tuple, schema: StructType | List[str] | None = None) → DataFrame#

Creates a new DataFrame containing the specified values from the local data.

createDataFrame() is an alias of create_dataframe().

Parameters:
  • data – The local data for building a DataFrame. data can only be a list or tuple. Every element in data will constitute a row in the DataFrame.

  • schema – A StructType containing names and data types of columns, or a list of column names, or None. When schema is a list of column names or None, the schema of the DataFrame will be inferred from the data across all rows. To improve performance, provide a schema. This avoids the need to infer data types with large data sets.

Examples

>>> # create a dataframe with a schema
>>> from pystarburst.types import IntegerType, StringType, StructField
>>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
>>> session.create_dataframe([[1, "py"], [3, "trino"]], schema).collect()
[Row(A=1, B='py'), Row(A=3, B='trino')]

>>> # create a dataframe by inferring a schema from the data
>>> from pystarburst import Row
>>> # infer schema
>>> session.create_dataframe([1, 2, 3, 4], schema=["a"]).collect()
[Row(A=1), Row(A=2), Row(A=3), Row(A=4)]
>>> session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]).collect()
[Row(A=1, B=2), Row(A=3, B=4)]
>>> session.create_dataframe([Row(a=1, b=2, c=3, d=4)]).collect()
[Row(A=1, B=2, C=3, D=4)]
>>> session.create_dataframe([{"a": 1}, {"b": 2}]).collect()
[Row(A=1, B=None), Row(A=None, B=2)]
discover(uri, *, catalog_name=None, schema_name='', options='')#

Runs the schema discovery feature on the specified location.

Parameters:
  • uri – URI to scan

  • catalog_name – catalog name to use. Must be specified here or in the connection parameters.

  • schema_name – schema name to use; defaults to ‘discovered’ if not provided

  • options – Discovery options

Invoke the register_discovered_table() method on the result object to register the discovered table.

Examples

>>> # Run discovery
>>> schema_discovery_result = session.discover(uri, catalog_name='iceberg', schema_name='test_schema_1', options='discoveryMode=NORMAL')
>>> # Register the discovered table
>>> schema_discovery_result.register_discovered_table(if_exists="OVERWRITE")
>>> # Create a Table (DataFrame) object from the discovered table
>>> df = session.table(schema_discovery_result.register_discovered_table(if_exists="IGNORE"))
get_current_catalog() → str | None#

Returns the name of the current catalog for the Trino session attached to this session. See the example in table().

get_current_roles() → Dict[str, str] | None#

Returns the names of the roles in use for the current session.
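
A brief sketch; the contents of the returned mapping depend on your cluster and connection parameters:

>>> roles = session.get_current_roles()  # e.g. {'system': 'analyst'} if that role is active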

get_current_schema() → str | None#

Returns the name of the current schema for the Python connector session attached to this session. See the example in table().

get_fully_qualified_current_schema() → str#

Returns the fully qualified name of the current schema for the session.
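
A brief sketch; the returned value depends on your connection parameters:

>>> fq_schema = session.get_fully_qualified_current_schema()  # e.g. 'iceberg.sales' for catalog 'iceberg' and schema 'sales'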

query_history() → QueryHistory#

Creates an instance of QueryHistory as a context manager to record queries that are pushed down to the Trino cluster.

>>> with session.query_history() as query_history:
...     df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
...     df = df.filter(df.a == 1)
...     res = df.collect()
>>> assert len(query_history.queries) == 1
range(start: int, end: int | None = None, step: int = 1) → DataFrame#

Creates a new DataFrame from a range of numbers. The resulting DataFrame has a single column named ID, containing elements in a range from start to end (exclusive) with the step value step.

Parameters:
  • start – The start of the range. If end is not specified, start will be used as the value of end.

  • end – The end of the range.

  • step – The step of the range.

Examples

>>> session.range(10).collect()
[Row(ID=0), Row(ID=1), Row(ID=2), Row(ID=3), Row(ID=4), Row(ID=5), Row(ID=6), Row(ID=7), Row(ID=8), Row(ID=9)]
>>> session.range(1, 10).collect()
[Row(ID=1), Row(ID=2), Row(ID=3), Row(ID=4), Row(ID=5), Row(ID=6), Row(ID=7), Row(ID=8), Row(ID=9)]
>>> session.range(1, 10, 2).collect()
[Row(ID=1), Row(ID=3), Row(ID=5), Row(ID=7), Row(ID=9)]
set_role(role: str, catalog: str = 'system') → None#

Specifies the active/current role for the session.

Parameters:
  • role – the role name.

  • catalog – the catalog name (defaults to ‘system’)
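
A short sketch, assuming an analyst role exists on the cluster and is granted to the current user:

>>> session.set_role("analyst")  # sets the system role, equivalent to catalog='system'
>>> roles = session.get_current_roles()  # now reflects the analyst role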

sql(query: str) → DataFrame#

Returns a new DataFrame representing the results of a SQL query. You can use this method to execute a SQL statement. Note that you still need to call DataFrame.collect() to execute this query in Trino.

Parameters:

query – The SQL statement to execute.

Examples

>>> # create a dataframe from a SQL query
>>> df = session.sql("select 1/2")
>>> # execute the query
>>> df.collect()
[Row(1/2=Decimal('0.500000'))]
table(name: str | Iterable[str]) → Table#

Returns a Table that points to the specified table.

Parameters:

name – A string or list of strings that specify the table name or fully-qualified object identifier (database name, schema name, and table name).

Note

If your table name contains special characters, use double quotes to mark it like this, session.table('"my table"'). For fully qualified names, you need to use double quotes separately like this, session.table('"my db"."my schema"."my.table"').

Examples

>>> df1 = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
>>> df1.write.save_as_table("my_table", mode="overwrite")
>>> session.table("my_table").collect()
[Row(A=1, B=2), Row(A=3, B=4)]
>>> current_db = session.get_current_catalog()
>>> current_schema = session.get_current_schema()
>>> session.table([current_db, current_schema, "my_table"]).collect()
[Row(A=1, B=2), Row(A=3, B=4)]
table_function(func_name: str | List[str] | TableFunctionCall, *func_arguments: ColumnOrName, **func_named_arguments: ColumnOrName) → DataFrame#

Creates a new DataFrame from the given Trino SQL table function.

References: Trino SQL functions.

Parameters:
  • func_name – The SQL function name.

  • func_arguments – The positional arguments for the SQL function.

  • func_named_arguments – The named arguments for the SQL function, if it accepts named arguments.

Returns:

A new DataFrame with data from calling the table function.

Example 1

Query a table function by function name:

>>> from pystarburst.functions import lit
>>> session.table_function("sequence", lit(0), lit(4)).collect()
[Row(sequential_number=0), Row(sequential_number=1), Row(sequential_number=2), Row(sequential_number=3), Row(sequential_number=4)]
Example 2

Define a table function variable and query it:

>>> from pystarburst.functions import table_function, lit
>>> sequence = table_function("sequence")
>>> session.table_function(sequence(lit(0), lit(4))).collect()
[Row(sequential_number=0), Row(sequential_number=1), Row(sequential_number=2), Row(sequential_number=3), Row(sequential_number=4)]
use(catalog_schema: str) → None#

Specifies the active/current schema for the session.

Parameters:

catalog_schema – The catalog and/or schema name.
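
A short sketch; the catalog and schema names are placeholders, and the identifier follows the same form as the SQL USE statement:

>>> session.use("iceberg.sales")  # set both the current catalog and schema
>>> session.use("marketing")      # set only the schema within the current catalog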