Table#

Table methods#

class pystarburst.table.DeleteResult(rows_deleted: int)#

Bases: NamedTuple

Result of deleting rows in a Table.

rows_deleted: int#

The number of rows deleted.

class pystarburst.table.MergeResult(rows_affected: int)#

Bases: NamedTuple

Result of merging a DataFrame into a Table.

rows_affected: int#

The number of rows inserted, updated or deleted.

class pystarburst.table.Table(table_name: str, session: Session)#

Bases: DataFrame

Represents a lazily-evaluated Table. It extends DataFrame so all DataFrame operations can be applied to it.

You can create a Table object by calling Session.table() with the name of the table in Trino. See examples in Session.table().
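As a minimal sketch, assuming a connected Session named session (the catalog, schema, and column names here are illustrative):

```python
# Assumes an existing pystarburst Session `session` connected to a Trino
# cluster; the table and column names are illustrative.
t = session.table("my_catalog.my_schema.orders")

# Table extends DataFrame, so all DataFrame operations apply and are
# evaluated lazily until an action such as collect() runs the query.
recent = t.filter(t["order_date"] >= "2024-01-01").select("order_id", "total")
rows = recent.collect()
```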

delete(condition: Column | None = None, *, statement_properties: Dict[str, str] | None = None) DeleteResult#

Deletes rows in a Table and returns a DeleteResult, representing the number of rows deleted.

Parameters:

condition – An optional Column object representing the condition that rows must satisfy to be deleted. If no condition is provided, all rows are deleted.

Examples

>>> target_df = session.create_dataframe([(1, 1),(1, 2),(2, 1),(2, 2),(3, 1),(3, 2)], schema=["a", "b"])
>>> target_df.write.save_as_table("my_table", mode="overwrite")
>>> t = session.table("my_table")

>>> # delete all rows in a table
>>> t.delete()
DeleteResult(rows_deleted=6)
>>> t.collect()
[]

>>> # delete all rows where column "a" has value 1
>>> target_df.write.save_as_table("my_table", mode="overwrite")
>>> t.delete(t["a"] == 1)
DeleteResult(rows_deleted=2)
>>> t.collect()
[Row(A=2, B=1), Row(A=2, B=2), Row(A=3, B=1), Row(A=3, B=2)]

drop_table() None#

Drops the table from the Trino cluster, if it exists.

Note that subsequent operations such as DataFrame.select(), DataFrame.collect() on this Table instance and the derived DataFrame will raise errors because the underlying table in the Trino cluster no longer exists.
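A minimal sketch of this lifecycle, assuming a connected Session named session (the table name is illustrative):

```python
# Assumes an existing pystarburst Session `session`; "my_table" is illustrative.
df = session.create_dataframe([(1, "a"), (2, "b")], schema=["id", "name"])
df.write.save_as_table("my_table", mode="overwrite")

t = session.table("my_table")
t.drop_table()  # removes the underlying table from the Trino cluster

# Any further operation on `t` or on DataFrames derived from it now raises,
# because the backing table no longer exists:
# t.collect()  # error: table does not exist
```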

is_cached: bool#

Whether the table is cached.

merge(source: DataFrame, join_expr: Column, clauses: Iterable[WhenMatchedClause | WhenNotMatchedClause], *, statement_properties: Dict[str, str] | None = None) MergeResult#

Merges this Table with DataFrame source on the specified join expression and a list of matched or not-matched clauses, and returns a MergeResult, representing the number of rows inserted, updated and deleted by this merge action. See MERGE for details.

Parameters:
  • source – A DataFrame to join with this Table. It can also be another Table.

  • join_expr – A Column object representing the expression on which to join this Table and source.

  • clauses – A list of matched or not-matched clauses specifying the actions to perform when the values from this Table and source match or do not match on join_expr. These actions can only be instances of WhenMatchedClause and WhenNotMatchedClause, and are performed sequentially in list order.

Examples

>>> from pystarburst.functions import when_matched, when_not_matched
>>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=["key", "value"])
>>> target_df.write.save_as_table("my_table", mode="overwrite")
>>> target = session.table("my_table")
>>> source = session.create_dataframe([(10, "new"), (12, "new"), (13, "old")], schema=["key", "value"])
>>> target.merge(source, target["key"] == source["key"],
...              [when_matched().update({"value": source["value"]}), when_not_matched().insert({"key": source["key"]})])
MergeResult(rows_affected=3)
>>> target.collect()
[Row(KEY=13, VALUE=None), Row(KEY=12, VALUE=None), Row(KEY=10, VALUE='new'), Row(KEY=10, VALUE='new'), Row(KEY=11, VALUE='old')]

sample(frac: float, *, sampling_method: str | None = None) DataFrame#

Samples rows based on a percentage of rows to be returned.

Sampling with a seed is not supported on views or subqueries. Because this method operates on tables, it can support a seed; this is the main difference between DataFrame.sample() and this method.

Parameters:
  • frac – The percentage of rows to be sampled.

  • sampling_method

    Specifies the sampling method to use:

    • BERNOULLI: Includes each row with a probability of p/100. Similar to flipping a weighted coin for each row.

    • SYSTEM: Includes each block of rows with a probability of p/100. Similar to flipping a weighted coin for each block of rows. This method does not support fixed-size sampling.

    The default is None, in which case the Trino cluster uses BERNOULLI.

Note

  • SYSTEM sampling is often faster than BERNOULLI sampling.
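A short sketch contrasting the two sampling methods, assuming a connected Session named session and that frac is expressed as a fraction between 0 and 1 (the table name is illustrative):

```python
# Assumes an existing pystarburst Session `session`; "numbers" is illustrative.
df = session.create_dataframe([(i,) for i in range(1000)], schema=["n"])
df.write.save_as_table("numbers", mode="overwrite")
t = session.table("numbers")

# BERNOULLI: each row is kept independently with ~10% probability.
bern = t.sample(0.1, sampling_method="BERNOULLI")

# SYSTEM: whole blocks of rows are kept with ~10% probability; often faster,
# but the returned row count varies more because sampling is per block.
syst = t.sample(0.1, sampling_method="SYSTEM")

# Both counts are approximate, near 100 rows for this data.
print(len(bern.collect()), len(syst.collect()))
```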

table_name: str#

The table name.

update(assignments: Dict[str, ColumnOrLiteral], condition: Column | None = None, *, statement_properties: Dict[str, str] | None = None) UpdateResult#

Updates rows in the Table with the specified assignments and returns an UpdateResult, representing the number of rows modified and the number of multi-joined rows modified.

Parameters:
  • assignments – A dict that associates the names of columns with the values that should be updated. The value of assignments can either be a literal value or a Column object.

  • condition – An optional Column object representing the condition that rows must satisfy to be updated. If no condition is provided, all rows are updated.

Examples

>>> target_df = session.create_dataframe([(1, 1),(1, 2),(2, 1),(2, 2),(3, 1),(3, 2)], schema=["a", "b"])
>>> target_df.write.save_as_table("my_table", mode="overwrite")
>>> t = session.table("my_table")

>>> # update all rows in column "b" to 0 and all rows in column "a"
>>> # to the summation of column "a" and column "b"
>>> t.update({"b": 0, "a": t.a + t.b})
UpdateResult(rows_updated=6, multi_joined_rows_updated=0)
>>> t.collect()
[Row(A=2, B=0), Row(A=3, B=0), Row(A=3, B=0), Row(A=4, B=0), Row(A=4, B=0), Row(A=5, B=0)]

>>> # update all rows in column "b" to 0 where column "a" has value 1
>>> target_df.write.save_as_table("my_table", mode="overwrite")
>>> t.update({"b": 0}, t["a"] == 1)
UpdateResult(rows_updated=2, multi_joined_rows_updated=0)
>>> t.collect()
[Row(A=1, B=0), Row(A=1, B=0), Row(A=2, B=1), Row(A=2, B=2), Row(A=3, B=1), Row(A=3, B=2)]

class pystarburst.table.UpdateResult(rows_updated: int)#

Bases: NamedTuple

Result of updating rows in a Table.

rows_updated: int#

The number of rows modified.

class pystarburst.table.WhenMatchedClause(condition: Column | None = None)#

Bases: object

A matched clause for the Table.merge() action. It matches all remaining rows in the target Table that satisfy join_expr while also satisfying condition, if it is provided. You can use functions.when_matched() to instantiate this class.

Parameters:

condition – An optional Column object representing the specified condition.

delete()#

Defines a delete action for the matched clause and returns an updated WhenMatchedClause with the new delete action added.

Examples

>>> # Adds a matched clause where a row in source is matched
>>> # if its key is equal to the key of any row in target.
>>> # For all such rows, delete them.
>>> from pystarburst.functions import when_matched
>>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=["key", "value"])
>>> target_df.write.save_as_table("my_table", mode="overwrite")
>>> target = session.table("my_table")
>>> source = session.create_dataframe([(10, "new")], schema=["key", "value"])
>>> target.merge(source, target["key"] == source["key"], [when_matched().delete()])
MergeResult(rows_affected=2)
>>> target.collect() # the rows are deleted
[Row(KEY=11, VALUE='old')]

Note

An exception will be raised if this method or WhenMatchedClause.update() is called more than once on the same WhenMatchedClause object.

update(assignments: Dict[str, ColumnOrLiteral]) WhenMatchedClause#

Defines an update action for the matched clause and returns an updated WhenMatchedClause with the new update action added.

Parameters:

assignments – A dict that associates the names of columns with the values that should be updated. Each value can be a literal value or a Column object.

Examples

>>> # Adds a matched clause where a row in source is matched
>>> # if its key is equal to the key of any row in target.
>>> # For all such rows, update its value to the value of the
>>> # corresponding row in source.
>>> from pystarburst.functions import when_matched
>>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=["key", "value"])
>>> target_df.write.save_as_table("my_table", mode="overwrite")
>>> target = session.table("my_table")
>>> source = session.create_dataframe([(10, "new")], schema=["key", "value"])
>>> target.merge(source, target["key"] == source["key"], [when_matched().update({"value": source["value"]})])
MergeResult(rows_affected=2)
>>> target.collect() # the value in the table is updated
[Row(KEY=10, VALUE='new'), Row(KEY=10, VALUE='new'), Row(KEY=11, VALUE='old')]

Note

An exception will be raised if this method or WhenMatchedClause.delete() is called more than once on the same WhenMatchedClause object.

class pystarburst.table.WhenNotMatchedClause(condition: Column | None = None)#

Bases: object

A not-matched clause for the Table.merge() action. It matches all remaining rows in the target Table that do not satisfy join_expr but satisfy condition, if it is provided. You can use functions.when_not_matched() to instantiate this class.

Parameters:

condition – An optional Column object representing the specified condition.

insert(assignments: Iterable[ColumnOrLiteral] | Dict[str, ColumnOrLiteral]) WhenNotMatchedClause#

Defines an insert action for the not-matched clause and returns an updated WhenNotMatchedClause with the new insert action added.

Parameters:

assignments – A list of values or a dict that associates the names of columns with the values that should be inserted. The value of assignments can either be a literal value or a Column object.

Examples

>>> # Adds a not-matched clause where a row in source is not matched
>>> # if its key does not equal the key of any row in target.
>>> # For all such rows, insert a row into target whose key and value
>>> # are assigned to the key and value of the unmatched row.
>>> from pystarburst.functions import when_not_matched
>>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=["key", "value"])
>>> target_df.write.save_as_table("my_table", mode="overwrite")
>>> target = session.table("my_table")
>>> source = session.create_dataframe([(12, "new")], schema=["key", "value"])
>>> target.merge(source, target["key"] == source["key"], [when_not_matched().insert([source["key"], source["value"]])])
MergeResult(rows_affected=1)
>>> target.collect() # the rows are inserted
[Row(KEY=12, VALUE='new'), Row(KEY=10, VALUE='old'), Row(KEY=10, VALUE='too_old'), Row(KEY=11, VALUE='old')]

>>> # For all such rows, insert a row into target whose key is
>>> # assigned to the key of the not matched row.
>>> target_df.write.save_as_table("my_table", mode="overwrite")
>>> target.merge(source, target["key"] == source["key"], [when_not_matched().insert({"key": source["key"]})])
MergeResult(rows_affected=1)
>>> target.collect() # the rows are inserted
[Row(KEY=12, VALUE=None), Row(KEY=10, VALUE='old'), Row(KEY=10, VALUE='too_old'), Row(KEY=11, VALUE='old')]

Note

An exception will be raised if this method is called more than once on the same WhenNotMatchedClause object.