Storey API#

Targets:#

class storey.targets.CSVTarget(path: str, columns: Optional[List[str]] = None, header: bool = False, infer_columns_from_data: Optional[bool] = None, max_events: int = 128, flush_after_seconds: int = 3, **kwargs)#

Bases: _Batching, _Writer

Writes events to a CSV file.

Parameters:
  • path – path where CSV file will be written.

  • columns – Fields to be written to CSV. Will be written as the file header if header is True. Will be extracted from events when an event is a dictionary (lists will be written as is). Use = notation for renaming fields (e.g. write_this=event_field). Use $ notation to refer to metadata ($key, event_time=$time). Optional. Defaults to None (will be inferred if event is dictionary).

  • header – Whether to write the columns as a CSV header.

  • infer_columns_from_data – Whether to infer columns from the first event, when events are dictionaries. If True, columns will be inferred from data and used in place of explicit columns list if none was provided, or appended to the provided list. If columns is not provided, infer_columns_from_data=True is implied. Optional. Defaults to False if columns is provided, True otherwise.

  • name (string) – Name of this step, as it should appear in logs. Defaults to class name (CSVTarget).

  • max_events (int) – Maximum number of events to write at a time. If None, all events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set). Defaults to 128.

  • flush_after_seconds (int) – Maximum number of seconds to hold events before they are written. If None, all events will be written on flow termination, or after max_events are accumulated (if max_events is set). Defaults to 3.

  • key (str or Event) – Key by which events are batched. Optional.

  • storage_options (dict) – Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc., if using a URL that will be parsed by fsspec, e.g., starting “s3://”, “gcs://”. Optional.
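Example (a minimal sketch; the output path and field names are illustrative, not part of the API):

    from storey import SyncEmitSource, build_flow
    from storey.targets import CSVTarget

    controller = build_flow([
        SyncEmitSource(),
        # header=True writes the columns list as the CSV header
        CSVTarget("out.csv", columns=["name", "number"], header=True),
    ]).run()

    controller.emit({"name": "a", "number": 1})
    controller.emit({"name": "b", "number": 2})
    controller.terminate()
    controller.await_termination()  # buffered events are flushed on termination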

class storey.targets.KafkaTarget(bootstrap_servers: Union[str, List[str]], topic: str, producer_options: Optional[dict] = None, sharding_func: Union[None, int, str, Callable[[Event], Any]] = None, columns: Optional[List[str]] = None, infer_columns_from_data: Optional[bool] = None, full_event: Optional[bool] = None, **kwargs)#

Bases: Flow, _Writer

Writes all incoming events into a Kafka stream.

Parameters:
  • topic – Kafka topic.

  • bootstrap_servers – Kafka bootstrap servers (brokers).

  • producer_options – Extra options to be passed as kwargs to kafka.KafkaProducer.

  • sharding_func – Partition, sharding key field, or function from event to partition or sharding key. Optional. If not set, event key will be used as the sharding key.

  • columns – Fields to be written to topic. Will be extracted from events when an event is a dictionary (other types will be written as is). Use = notation for renaming fields (e.g. write_this=event_field). Use $ notation to refer to metadata ($key, event_time=$time). Optional. Defaults to None (will be inferred if event is dictionary).

  • infer_columns_from_data – Whether to infer columns from the first event, when events are dictionaries. If True, columns will be inferred from data and used in place of explicit columns list if none was provided, or appended to the provided list. If columns is not provided, infer_columns_from_data=True is implied. Optional. Defaults to False if columns is provided, True otherwise.

  • full_event – Enable metadata wrapper for serialized event. Defaults to False.
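Example (a hedged sketch; the broker address and topic are placeholders, and a Kafka client library is assumed to be installed):

    from storey import SyncEmitSource, build_flow
    from storey.targets import KafkaTarget

    controller = build_flow([
        SyncEmitSource(key_field="id"),  # the event key is used as the sharding key
        KafkaTarget(bootstrap_servers="localhost:9092", topic="events"),
    ]).run()

    controller.emit({"id": "user-1", "value": 42})
    controller.terminate()
    controller.await_termination()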

class storey.targets.NoSqlTarget(table: Union[Table, str], columns: Optional[List[Union[str, Tuple[str, str]]]] = None, infer_columns_from_data: Optional[bool] = None, **kwargs)#

Bases: _Writer, Flow

Persists the data in table to its associated storage by key.

Parameters:
  • table – A Table object or name to persist. If a table name is provided, it will be looked up in the context.

  • columns – Fields to be written to the storage. Will be extracted from events when an event is a dictionary (lists will be written as is). Use = notation for renaming fields (e.g. write_this=event_field). Use $ notation to refer to metadata ($key, event_time=$time). Optional. Defaults to None (will be inferred if event is dictionary).

  • infer_columns_from_data – Whether to infer columns from the first event, when events are dictionaries. If True, columns will be inferred from data and used in place of explicit columns list if none was provided, or appended to the provided list. If columns is not provided, infer_columns_from_data=True is implied. Optional. Defaults to False if columns is provided, True otherwise.

  • storage_options (dict) – Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc., if using a URL that will be parsed by fsspec, e.g., starting “s3://”, “gcs://”. Optional.
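Example (a minimal sketch; NoopDriver stands in for a real storage driver such as V3ioDriver, and the table path is illustrative):

    from storey import SyncEmitSource, build_flow
    from storey.drivers import NoopDriver
    from storey.table import Table
    from storey.targets import NoSqlTarget

    table = Table("users", NoopDriver())
    controller = build_flow([
        SyncEmitSource(key_field="id"),  # events are persisted by this key
        NoSqlTarget(table),
    ]).run()

    controller.emit({"id": "u1", "city": "Haifa"})
    controller.terminate()
    controller.await_termination()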

class storey.targets.ParquetTarget(path: str, index_cols: Optional[Union[str, List[str], List[Tuple[str, str]]]] = None, columns: Optional[Union[str, List[str], List[Tuple[str, str]]]] = None, partition_cols: Optional[Union[str, List[str], List[Tuple[str, int]]]] = None, time_field: Union[None, str, int] = None, time_format: Optional[str] = None, infer_columns_from_data: Optional[bool] = None, max_events: Optional[int] = None, flush_after_seconds: Optional[Union[int, float]] = None, **kwargs)#

Bases: _Batching, _Writer

Writes incoming events to parquet files.

Parameters:
  • path – Output path. Can be either a file or directory. This parameter is forwarded as-is to pandas.DataFrame.to_parquet().

  • index_cols – Index columns for writing the data. Use = notation for renaming fields (e.g. write_this=event_field). Use $ notation to refer to metadata ($key, event_time=$time). If None (default), no index is set.

  • columns – Fields to be written to parquet. Will be extracted from events when an event is a dictionary (lists will be written as is). Use = notation for renaming fields (e.g. write_this=event_field). Use $ notation to refer to metadata ($key, event_time=$time). Can be a list of (name, type) tuples in order to set the schema explicitly, e.g. (‘my_field’, ‘str’). Supported types: str, int32, int, float32, float, bool, datetime. Optional. Defaults to None (will be inferred if event is dictionary).

  • infer_columns_from_data – Whether to infer columns from the first event, when events are dictionaries. If True, columns will be inferred from data and used in place of explicit columns list if none was provided, or appended to the provided list. If columns were not provided and infer_columns_from_data=False, PyArrow will infer the schema per file written, which may cause the schemas to differ between files (e.g. if a column is all null in one file but not in another). Optional. Defaults to False.

  • partition_cols – Columns by which to partition the data into directories. The following metadata columns are also supported: $key, $date (e.g. 2020-02-09), $year, $month, $day, $hour, $minute, $second. A column may be specified as a tuple, such as (‘$key’, 64), which means partitioning by the event key hashed into 64 partitions. If None (the default), the data will only be partitioned if the path ends in .parquet or .pq. Otherwise, it will be partitioned by key/year/month/day/hour, where the key is hashed into 256 buckets.

  • time_field – Column to use for time partitioning, if applicable.

  • time_format – If time_field is provided, and the expected value type is str rather than datetime, time strings will be parsed according to this format. If not provided, they will be parsed in accordance with ISO-8601.

  • max_events (int) – Maximum number of events to write at a time. If None (default), all events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set).

  • flush_after_seconds (int) – Maximum number of seconds to hold events before they are written. If None (default), all events will be written on flow termination, or after max_events are accumulated (if max_events is set).

  • storage_options (dict) – Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc., if using a URL that will be parsed by fsspec, e.g., starting “s3://”, “gcs://”. Optional.
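Example (a sketch; the output directory, column names, and batching values are illustrative):

    from storey import SyncEmitSource, build_flow
    from storey.targets import ParquetTarget

    controller = build_flow([
        SyncEmitSource(key_field="id"),
        ParquetTarget(
            "out/",  # a directory path, so output is written as partitioned files
            columns=[("id", "str"), ("value", "float")],  # explicit schema
            partition_cols=["$year", "$month", "$day"],   # metadata-based partitioning
            max_events=1000,
            flush_after_seconds=60,
        ),
    ]).run()

    controller.emit({"id": "u1", "value": 1.5})
    controller.terminate()
    controller.await_termination()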

class storey.targets.StreamTarget(storage: Driver, stream_path: str, sharding_func: Union[None, int, str, Callable[[Event], Any]] = None, batch_size: int = 8, columns: Optional[List[str]] = None, infer_columns_from_data: Optional[bool] = None, shards: int = 1, retention_period_hours: int = 24, full_event: Optional[bool] = None, **kwargs)#

Bases: Flow, _Writer

Writes all incoming events into a V3IO stream.

Parameters:
  • storage – Database driver.

  • stream_path – Path to the V3IO stream.

  • sharding_func – Partition, sharding key field, or function from event to partition or sharding key. Optional. If not set, event key will be used as the sharding key.

  • batch_size – Batch size for each write request.

  • columns – Fields to be written to stream. Will be extracted from events when an event is a dictionary (other types will be written as is). Use = notation for renaming fields (e.g. write_this=event_field). Use $ notation to refer to metadata ($key, event_time=$time). Optional. Defaults to None (will be inferred if event is dictionary).

  • infer_columns_from_data – Whether to infer columns from the first event, when events are dictionaries. If True, columns will be inferred from data and used in place of explicit columns list if none was provided, or appended to the provided list. If columns is not provided, infer_columns_from_data=True is implied. Optional. Defaults to False if columns is provided, True otherwise.

  • shards – If stream doesn’t exist, it will be created with this number of shards. Defaults to 1.

  • retention_period_hours – If stream doesn’t exist, it will be created with this retention time in hours. Defaults to 24.

  • full_event – Enable metadata wrapper for serialized event. Defaults to False.

  • storage_options (dict) – Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc., if using a URL that will be parsed by fsspec, e.g., starting “s3://”, “gcs://”. Optional.

class storey.targets.TSDBTarget(path: str, time_col: str, time_format: Optional[str] = None, columns: Optional[Union[str, List[str]]] = None, infer_columns_from_data: Optional[bool] = None, index_cols: Optional[Union[str, List[str]]] = None, v3io_frames: Optional[str] = None, access_key: Optional[str] = None, rate: str = '', aggr: str = '', aggr_granularity: Optional[str] = None, frames_client=None, **kwargs)#

Bases: _Batching, _Writer

Writes incoming events to TSDB table.

Parameters:
  • path – Path to TSDB table.

  • time_col – Name of the time column.

  • time_format – If time_col is a string column, and its format is not compatible with ISO-8601, use this parameter to determine the expected format.

  • columns – List of column names to be passed to the DataFrame constructor. Use = notation for renaming fields (e.g. write_this=event_field). Use $ notation to refer to metadata ($key, event_time=$time).

  • infer_columns_from_data – Whether to infer columns from the first event, when events are dictionaries. If True, columns will be inferred from data and used in place of explicit columns list if none was provided, or appended to the provided list. If columns is not provided, infer_columns_from_data=True is implied. Optional. Defaults to False if columns is provided, True otherwise.

  • index_cols – List of column names to be used for metric labels.

  • v3io_frames – Frames service url.

  • access_key – Access key to the system.

  • container – Container name for this TSDB table.

  • rate – TSDB table sample rate.

  • aggr – Server-side aggregations for this TSDB table (e.g. ‘sum,count’).

  • aggr_granularity – Granularity of server-side aggregations for this TSDB table (e.g. ‘1h’).

  • frames_client – Frames instance. Allows usage of an existing frames client.

  • max_events (int) – Maximum number of events to write at a time. If None (default), all events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set).

  • flush_after_seconds (int) – Maximum number of seconds to hold events before they are written. If None (default), all events will be written on flow termination, or after max_events are accumulated (if max_events is set).

  • storage_options (dict) – Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc., if using a URL that will be parsed by fsspec, e.g., starting “s3://”, “gcs://”. Optional.

Sources:#

class storey.sources.AsyncAwaitableResult(on_error: Optional[Callable[[BaseException], Coroutine]] = None, expected_number_of_results: int = 1)#

Bases: object

Future result of a computation. Calling await_result() will return with the result once the computation is completed. Same as AwaitableResult but for an async context.

async await_result()#

Returns the result of the computation, once the computation is complete.

class storey.sources.AsyncEmitSource(buffer_size: Optional[int] = None, key_field: Optional[Union[list, str]] = None, max_events_before_commit=None, explicit_ack=False, **kwargs)#

Bases: Flow

Asynchronous entry point into a flow. Produces an AsyncFlowController when run, for use from inside an async def. See SyncEmitSource for use from inside a synchronous context.

Parameters:
  • buffer_size – size of the incoming event buffer. Defaults to 8.

  • key_field – Field to extract and use as the key. Optional.

  • max_events_before_commit – Maximum number of events to be processed before committing offsets.

  • explicit_ack – Whether to explicitly commit offsets. Defaults to False.

  • name (string) – Name of this step, as it should appear in logs. Defaults to class name (AsyncEmitSource).

for additional params, see documentation of Flow

run()#

Starts the flow

class storey.sources.AsyncFlowController(emit_fn, loop_task, await_result, key_field: Optional[str] = None, id_field: Optional[str] = None)#

Bases: FlowControllerBase

Used to emit events into the associated flow, terminate the flow, and await the flow’s termination. To be used from inside an async def.

async await_termination()#

Awaits the termination of the flow. To be called after terminate. Returns the termination result of the flow (if any).

async emit(element: object, key: Optional[Union[str, List[str]]] = None, await_result: Optional[bool] = None, expected_number_of_results: Optional[int] = None) object#

Emits an event into the associated flow.

Parameters:
  • element – The event data, or payload. To set metadata as well, pass an Event object.

  • key – The event key(s) (optional)

  • await_result – Deprecated. Will await a result if a Complete step appears in the flow.

  • expected_number_of_results – Number of times the event will have to pass through a Complete step to be completed (for graph flows).

Returns:

The result received from the flow if a Complete step appears in the flow. None otherwise.

async terminate()#

Terminates the associated flow.
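Example (a minimal async sketch: AsyncEmitSource produces an AsyncFlowController whose emit, terminate, and await_termination are all awaited; the Map step is illustrative):

    import asyncio

    from storey import AsyncEmitSource, Map, build_flow

    async def main():
        controller = build_flow([
            AsyncEmitSource(),
            Map(lambda x: x + 1),
        ]).run()
        await controller.emit(1)
        await controller.terminate()
        await controller.await_termination()

    asyncio.run(main())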

class storey.sources.AwaitableResult(on_error: Optional[Callable[[], None]] = None, expected_number_of_results: int = 1)#

Bases: object

Future result of a computation. Calling await_result() will return with the result once the computation is completed.

await_result()#

Returns the result, once the computation is completed

class storey.sources.CSVSource(paths: Union[List[str], str], header: bool = True, build_dict: bool = False, key_field: Optional[Union[int, str, List[int], List[str]]] = None, time_field: Union[None, str, int] = None, timestamp_format: Optional[str] = None, id_field: Union[None, str, int] = None, type_inference: bool = True, parse_dates: Optional[Union[int, str, List[int], List[str]]] = None, **kwargs)#

Bases: DataframeSource

Reads CSV files as input source for a flow.

Parameters:
  • paths – paths to CSV files

  • header – Deprecated. Whether CSV files have a header. Defaults to True.

  • build_dict – whether to format each record produced from the input file as a dictionary (as opposed to a list). Defaults to False.

  • key_field – the CSV field to be used as the key for events. May be an int (field index) or string (field name) if header is True. Defaults to None (no key). Can be a list of keys.

  • time_field – the CSV field to be parsed as the timestamp for events. May be an int (field index) or string (field name) if header is True. Defaults to None (no timestamp field).

  • timestamp_format – timestamp format as defined in datetime.strptime(). Defaults to ISO-8601 as defined in datetime.fromisoformat().

  • id_field – the CSV field to be used as the ID for events. May be an int (field index) or string (field name) if header is True. Defaults to None (a random ID will be generated per event).

  • type_inference – Deprecated. Whether to infer data types from the data (when True), or read all fields in as strings (when False). Defaults to True.

  • parse_dates – list of columns (names or indices) that will be parsed as date columns.

for additional params, see documentation of Flow
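Example (a sketch; the file path and field names are placeholders):

    from storey import Reduce, build_flow
    from storey.sources import CSVSource

    controller = build_flow([
        CSVSource("data.csv", build_dict=True, key_field="id"),
        # Reduce is a terminal step; its result is returned on termination
        Reduce([], lambda acc, row: acc + [row]),
    ]).run()

    rows = controller.await_termination()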

class storey.sources.DataframeSource(dfs: Union[DataFrame, Iterable[DataFrame]], key_field: Optional[Union[str, List[str]]] = None, id_field: Optional[str] = None, **kwargs)#

Bases: _IterableSource, WithUUID

Use pandas dataframe as input source for a flow.

Parameters:
  • dfs – A pandas dataframe, or dataframes, to be used as input source for the flow.

  • key_field – column to be used as key for events. Can be a list of columns.

  • id_field – column to be used as ID for events.

for additional params, see documentation of Flow
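Example (a sketch: each DataFrame row arrives in the flow as a dictionary of column values; the data is illustrative):

    import pandas as pd

    from storey import Reduce, build_flow
    from storey.sources import DataframeSource

    df = pd.DataFrame({"id": ["a", "b"], "value": [1, 2]})
    controller = build_flow([
        DataframeSource(df, key_field="id"),
        Reduce(0, lambda acc, row: acc + row["value"]),  # sum the 'value' column
    ]).run()

    total = controller.await_termination()  # 3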

class storey.sources.FlowAwaiter(await_termination_fn)#

Bases: object

Future termination result of a flow. Calling await_termination() will wait for the flow to terminate and return its termination result.

await_termination()#

Waits for the flow to terminate and returns the result.

class storey.sources.FlowController(emit_fn, await_termination_fn, return_awaitable_result, key_field: Optional[str] = None, id_field: Optional[str] = None)#

Bases: FlowControllerBase

Used to emit events into the associated flow, terminate the flow, and await the flow’s termination. To be used from a synchronous context.

await_termination()#

Awaits the termination of the flow. To be called after terminate. Returns the termination result of the flow (if any).

emit(element: object, key: Optional[Union[str, List[str]]] = None, return_awaitable_result: Optional[bool] = None, expected_number_of_results: Optional[int] = None)#

Emits an event into the associated flow.

Parameters:
  • element – The event data, or payload. To set metadata as well, pass an Event object.

  • key – The event key(s) (optional)

  • return_awaitable_result – Deprecated. An awaitable result object will be returned if a Complete step appears in the flow.

  • expected_number_of_results – Number of times the event will have to pass through a Complete step to be completed (for graph flows).

Returns:

An AwaitableResult if a Complete step appears in the flow. None otherwise.

terminate()#

Terminates the associated flow.
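Example (a sketch of a synchronous flow that returns a per-event result: with a Complete step in the flow, emit returns an awaitable result; the values are illustrative):

    from storey import Complete, Map, SyncEmitSource, build_flow

    controller = build_flow([
        SyncEmitSource(),
        Map(lambda x: x * 2),
        Complete(),  # makes the per-event result available to the caller
    ]).run()

    result = controller.emit(10).await_result()  # 20
    controller.terminate()
    controller.await_termination()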

class storey.sources.ParquetSource(paths: Union[str, Iterable[str]], columns=None, start_filter: Optional[datetime] = None, end_filter: Optional[datetime] = None, filter_column: Optional[str] = None, **kwargs)#

Bases: DataframeSource

Reads Parquet files as input source for a flow.

Parameters:
  • paths – paths to Parquet files

  • columns – list, default=None. If not None, only these columns will be read from the file.

  • start_filter – datetime. If not None, the results will be filtered by partitions and ‘filter_column’ > start_filter. Default is None.

  • end_filter – datetime. If not None, the results will be filtered by partitions and ‘filter_column’ <= end_filter. Default is None.

  • filter_column – Optional. If not None, the results will be filtered by this column according to start_filter and/or end_filter.

  • key_field – column to be used as key for events. Can be a list of columns.

  • id_field – column to be used as ID for events.
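Example (a sketch of reading a time-bounded slice of a parquet dataset; the path, column name, and dates are placeholders):

    from datetime import datetime

    from storey import Reduce, build_flow
    from storey.sources import ParquetSource

    controller = build_flow([
        ParquetSource(
            "data.parquet",
            start_filter=datetime(2023, 1, 1),
            end_filter=datetime(2023, 2, 1),
            filter_column="timestamp",  # rows are kept if start < timestamp <= end
        ),
        Reduce([], lambda acc, row: acc + [row]),
    ]).run()

    rows = controller.await_termination()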

class storey.sources.SQLSource(db_path: str, table_name: str, key_field: Optional[Union[str, List[str]]] = None, id_field: Optional[str] = None, time_fields: Optional[List[str]] = None, **kwargs)#

Bases: _IterableSource, WithUUID

Use SQL table as input source for a flow.

Parameters:
  • key_field – the primary key of the table.

  • id_field – column to be used as ID for events.

  • db_path – URL string for connecting to the SQL database.

  • table_name – the name of the table to access, from the current database

  • time_fields – list of all fields that are timestamps
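Example (a hedged sketch; the connection string, table name, and column names are placeholders, and SQLAlchemy support is assumed to be installed):

    from storey import Reduce, build_flow
    from storey.sources import SQLSource

    controller = build_flow([
        SQLSource("sqlite:///mydb.db", "events",
                  key_field="id", time_fields=["created_at"]),
        Reduce([], lambda acc, row: acc + [row]),
    ]).run()

    rows = controller.await_termination()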

class storey.sources.SyncEmitSource(buffer_size: Optional[int] = None, key_field: Optional[Union[list, str, int]] = None, max_events_before_commit=None, explicit_ack=False, **kwargs)#

Bases: Flow

Synchronous entry point into a flow. Produces a FlowController when run, for use from inside a synchronous context. See AsyncEmitSource for use from inside an async context.

Parameters:
  • buffer_size – size of the incoming event buffer. Defaults to 8.

  • key_field – Field to extract and use as the key. Optional.

  • max_events_before_commit – Maximum number of events to be processed before committing offsets.

  • explicit_ack – Whether to explicitly commit offsets. Defaults to False.

  • name (string) – Name of this step, as it should appear in logs. Defaults to class name (SyncEmitSource).

for additional params, see documentation of storey.flow.Flow

run()#

Starts the flow

Transformations:#

class storey.transformations.AggregateByKey(aggregates: ~typing.Union[~typing.List[~storey.dtypes.FieldAggregator], ~typing.List[~typing.Dict[str, object]]], table: ~typing.Union[~storey.table.Table, str], key_field: ~typing.Optional[~typing.Union[str, ~typing.List[str], ~typing.Callable[[~storey.dtypes.Event], object]]] = None, time_field: ~typing.Optional[~typing.Union[str, ~typing.Callable[[~storey.dtypes.Event], object]]] = None, emit_policy: ~typing.Union[~storey.dtypes.EmitPolicy, ~typing.Dict[str, object]] = <storey.dtypes.EmitEveryEvent object>, augmentation_fn: ~typing.Optional[~typing.Callable[[~storey.dtypes.Event, ~typing.Dict[str, object]], ~storey.dtypes.Event]] = None, enrich_with: ~typing.Optional[~typing.List[str]] = None, aliases: ~typing.Optional[~typing.Dict[str, str]] = None, use_windows_from_schema: bool = False, time_format: ~typing.Optional[str] = None, **kwargs)#

Bases: Flow

Aggregates the data into the table object provided for later persistence, and outputs an event enriched with the requested aggregation features. Persistence is done via the NoSqlTarget step and based on the Cache object persistence settings.

Parameters:
  • aggregates – List of aggregates to apply for each event. Accepts either a list of FieldAggregator objects or a list of dictionaries describing FieldAggregators.

  • table – A Table object or name for persistence of aggregations. If a table name is provided, it will be looked up in the context object passed in kwargs.

  • key_field – Key field to aggregate by, accepts either a string representing the key field or a key extracting function. Defaults to the key in the event’s metadata. (Optional)

  • time_field – Time field to aggregate by, accepts either a string representing the time field or a time extracting function. Defaults to the processing time in the event’s metadata. (Optional)

  • emit_policy – Policy indicating when the data will be emitted. Defaults to EmitEveryEvent

  • augmentation_fn – Function that augments the features into the event’s body. Defaults to updating a dict. (Optional)

  • enrich_with – List of attributes names from the associated storage object to be fetched and added to every event. (Optional)

  • aliases – Dictionary specifying aliases for enriched or aggregate columns, of the format {‘col_name’: ‘new_col_name’}. (Optional)

  • time_format – If the value of the time field is of type string, this format will be used to parse it, as defined in datetime.strptime(). By default, parsing will follow ISO-8601.
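Example (a hedged sketch of computing sliding-window aggregates per key; NoopDriver stands in for a real storage driver, and all names are illustrative):

    from storey import SyncEmitSource, build_flow
    from storey.drivers import NoopDriver
    from storey.dtypes import FieldAggregator, SlidingWindows
    from storey.table import Table
    from storey.transformations import AggregateByKey

    table = Table("orders", NoopDriver())
    controller = build_flow([
        SyncEmitSource(key_field="user"),
        AggregateByKey(
            [FieldAggregator("total", "amount", ["sum", "count"],
                             SlidingWindows(["1h", "24h"], "10m"))],
            table,
        ),
    ]).run()

    # Each emitted event is enriched with features such as total_sum_1h.
    controller.emit({"user": "u1", "amount": 7.5})
    controller.terminate()
    controller.await_termination()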

class storey.transformations.Assert(**kwargs)#

Bases: Flow

Exposes an API for testing the flow between steps.

class storey.transformations.Batch(max_events: Optional[int] = None, flush_after_seconds: Optional[Union[int, float]] = None, key_field: Optional[Union[str, Callable[[Event], str]]] = None, **kwargs)#

Bases: _Batching

Batches events into lists of up to max_events events. Each emitted list contains max_events events, unless flush_after_seconds seconds have passed since the first event in the batch was received, at which point the batch is emitted with potentially fewer than max_events events.

Parameters:
  • max_events – Maximum number of events per emitted batch. Set to None to emit all events in one batch on flow termination.

  • flush_after_seconds – Maximum number of seconds to wait before a batch is emitted.

  • key_field – The key by which events are grouped. By default (None), events are not grouped. Other options: set to ‘$key’ to group events by the Event.key property, set to a string to group events by Event.body[key], or set to a Callable[[Event], str] to group events by a custom key extractor.
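Example (a sketch; the batch size and timeout are illustrative):

    from storey import Reduce, SyncEmitSource, build_flow
    from storey.transformations import Batch

    controller = build_flow([
        SyncEmitSource(),
        Batch(max_events=2, flush_after_seconds=5),
        Reduce([], lambda acc, batch: acc + [batch]),  # collect the emitted lists
    ]).run()

    for i in range(5):
        controller.emit(i)
    controller.terminate()
    print(controller.await_termination())  # e.g. [[0, 1], [2, 3], [4]]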

class storey.transformations.Choice(choice_array, default=None, **kwargs)#

Bases: Flow

Redirects each input element into at most one of multiple downstreams.

Parameters:
  • choice_array (tuple of (Flow, Function (Event=>boolean))) – a list of (downstream, condition) tuples, where downstream is a step and condition is a function. The first condition in the list to evaluate as true for an input element causes that element to be redirected to that downstream step.

  • default (Flow) – a default step for events that did not match any condition in choice_array. If not set, elements that don’t match any condition will be discarded.

  • name (string) – Name of this step, as it should appear in logs. Defaults to class name (Choice).

  • full_event (boolean) – Whether user functions should receive and/or return Event objects (when True), or only the payload (when False). Defaults to False.

class storey.transformations.Extend(fn, long_running=None, **kwargs)#

Bases: _UnaryFunctionFlow

Adds fields to each incoming event.

Parameters:
  • fn (Function (Event=>Dict)) – Function to transform each event to a dictionary. The fields in the returned dictionary are then added to the original event.

  • long_running (boolean) – Whether fn is a long-running function. Long-running functions are run in an executor to avoid blocking other concurrent processing. Default is False.

  • name (string) – Name of this step, as it should appear in logs. Defaults to class name (Extend).

  • full_event (boolean) – Whether user functions should receive and/or return Event objects (when True), or only the payload (when False). Defaults to False.

class storey.transformations.Filter(fn, long_running=None, **kwargs)#

Bases: _UnaryFunctionFlow

Filters events based on a user-provided function.

Parameters:
  • fn (Function (Event=>boolean)) – Function to decide whether to keep each event.

  • long_running (boolean) – Whether fn is a long-running function. Long-running functions are run in an executor to avoid blocking other concurrent processing. Default is False.

  • name (string) – Name of this step, as it should appear in logs. Defaults to class name (Filter).

  • full_event (boolean) – Whether user functions should receive and/or return Event objects (when True), or only the payload (when False). Defaults to False.

class storey.transformations.FlatMap(fn, long_running=None, **kwargs)#

Bases: _UnaryFunctionFlow

Maps, or transforms, each incoming event into any number of events.

Parameters:
  • fn (Function (Event=>list of Event)) – Function to transform each event to a list of events.

  • long_running (boolean) – Whether fn is a long-running function. Long-running functions are run in an executor to avoid blocking other concurrent processing. Default is False.

  • name (string) – Name of this step, as it should appear in logs. Defaults to class name (FlatMap).

  • full_event (boolean) – Whether user functions should receive and/or return Event objects (when True), or only the payload (when False). Defaults to False.
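Example (a sketch chaining FlatMap, Filter, and Map; the values are illustrative):

    from storey import Filter, FlatMap, Map, Reduce, SyncEmitSource, build_flow

    controller = build_flow([
        SyncEmitSource(),
        FlatMap(lambda x: [x, x + 1]),   # each event becomes two events
        Filter(lambda x: x % 2 == 0),    # keep only even values
        Map(lambda x: x * 10),
        Reduce([], lambda acc, x: acc + [x]),
    ]).run()

    controller.emit(1)
    controller.terminate()
    print(controller.await_termination())  # [20]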

storey.transformations.Flatten(**kwargs)#

Flatten is equivalent to FlatMap(lambda x: x).

class storey.transformations.ForEach(fn, long_running=None, **kwargs)#

Bases: _UnaryFunctionFlow

Applies the given function to each event in the stream, and passes the original event downstream.

class storey.transformations.JoinWithTable(table: Union[Table, str], key_extractor: Union[str, Callable[[Event], str]], attributes: Optional[List[str]] = None, inner_join: bool = False, join_function: Optional[Callable[[Any, Dict[str, object]], Any]] = None, **kwargs)#

Bases: _ConcurrentJobExecution

Joins each event with data from the given table.

Parameters:
  • table – A Table object or name to join with. If a table name is provided, it will be looked up in the context.

  • key_extractor – The name of the column containing the key, or a function that extracts the key from an event, for table access.

  • attributes – A list of attribute names to be queried for. Defaults to all attributes.

  • inner_join – Whether to drop events when the table does not have a matching entry (join_function won’t be called in such a case). Defaults to False.

  • join_function – Joins the original event with relevant data received from the storage. The event is dropped when this function returns None. By default, the event’s body is assumed to be a dict-like object and is updated with the joined data.

  • name – Name of this step, as it should appear in logs. Defaults to class name (JoinWithTable).

  • full_event – Whether user functions should receive and/or return Event objects (when True), or only the payload (when False). Defaults to False.

  • context – Context object that holds global configurations and secrets.
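Example (a hedged sketch; the driver, table path, key column, and attributes are illustrative):

    from storey import SyncEmitSource, build_flow
    from storey.drivers import V3ioDriver
    from storey.table import Table
    from storey.transformations import JoinWithTable

    table = Table("users", V3ioDriver())
    controller = build_flow([
        SyncEmitSource(),
        # looks up each event's user_id in the table and merges the
        # requested attributes into the event body
        JoinWithTable(table, "user_id", attributes=["city", "plan"]),
    ]).run()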

class storey.transformations.Map(fn, long_running=None, **kwargs)#

Bases: _UnaryFunctionFlow

Maps, or transforms, incoming events using a user-provided function.

Parameters:
  • fn (Function (Event=>Event)) – Function to apply to each event

  • long_running (boolean) – Whether fn is a long-running function. Long-running functions are run in an executor to avoid blocking other concurrent processing. Default is False.

  • name (string) – Name of this step, as it should appear in logs. Defaults to class name (Map).

  • full_event (boolean) – Whether user functions should receive and/or return Event objects (when True), or only the payload (when False). Defaults to False.

class storey.transformations.MapClass(long_running=None, **kwargs)#

Bases: Flow

Similar to Map, but instead of a function argument, this class should be extended and its do() method overridden.

class storey.transformations.MapWithState(initial_state, fn, group_by_key=False, **kwargs)#

Bases: _FunctionWithStateFlow

Maps, or transforms, incoming events using a stateful user-provided function and an initial state, which may be a database table.

Parameters:
  • initial_state (dictionary or Table if group_by_key is True. Any object otherwise.) – Initial state for the computation. If group_by_key is True, this must be a dictionary or a Table object.

  • fn (Function ((Event, state)=>(Event, state))) – A function to run on each event and the current state. Must return an event and an updated state.

  • group_by_key (boolean) – Whether the state is computed by key. Optional. Defaults to False.

  • full_event (boolean) – Whether fn will receive and return an Event object or only the body (payload). Optional. Defaults to False (body only).
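Example (a sketch of a stateful transformation: the function receives the current state and returns the transformed event together with the updated state; the field names are illustrative):

    from storey import SyncEmitSource, build_flow
    from storey.transformations import MapWithState

    def count(event, state):
        state += 1
        event["seen_so_far"] = state  # attach the running count to the event
        return event, state

    controller = build_flow([
        SyncEmitSource(),
        MapWithState(0, count),  # initial state is 0
    ]).run()

    controller.emit({"x": 1})
    controller.terminate()
    controller.await_termination()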

class storey.transformations.Partition(predicate: Callable[[Any], bool], **kwargs)#

Bases: Flow

Partitions events by calling a predicate function on each event. Each processed event results in a Partitioned namedtuple of (left=Optional[Event], right=Optional[Event]).

For a given event, if the predicate function results in True, the event is assigned to left. Otherwise, the event is assigned to right.

Parameters:

predicate – A predicate function that results in a boolean.

class storey.transformations.ReifyMetadata(mapping: Iterable[str], **kwargs)#

Bases: Flow

Inserts event metadata into the event body.

Parameters:
  • mapping – Dictionary from event attribute name to entry key in the event body (which must be a dictionary). Alternatively, an iterable of names may be provided, and these will be used as both attribute name and entry key.

  • name (string) – Name of this step, as it should appear in logs. Defaults to class name (ReifyMetadata).

class storey.transformations.SampleWindow(window_size: int, emit_period: EmitPeriod = EmitPeriod.FIRST, emit_before_termination: bool = False, key: Optional[Union[str, Callable[[Event], str]]] = None, **kwargs)#

Bases: Flow

Emits a single event in a window of window_size events, in accordance with emit_period and emit_before_termination.

Parameters:
  • window_size – The size of the window we want to sample a single event from.

  • emit_period – Which event this step emits for each window of window_size events (default: EmitPeriod.FIRST). Available options: EmitPeriod.FIRST – emit the first event of each window of window_size events. EmitPeriod.LAST – emit the last event of each window of window_size events.

  • emit_before_termination – Whether, on the termination signal, the step should emit the last event it has seen (default: False). If True, the last event seen is emitted downstream; if False, it is not.

  • key – The key by which events are sampled. By default (None), events are not sampled by key. Other options: set to ‘$key’ to sample events by the Event.key property, set to a string to sample events by Event.body[key], or set to a Callable[[Event], str] to sample events by a custom key extractor.

class storey.transformations.SendToHttp(request_builder, join_from_response, **kwargs)#

Bases: _ConcurrentJobExecution

Joins each event with data from any HTTP source. Used for event augmentation.

Parameters:
  • request_builder (Function (Event=>HttpRequest)) – Creates an HTTP request from the event. This request is then sent to its destination.

  • join_from_response (Function ((Event, HttpResponse)=>Event)) – Joins the original event with the HTTP response into a new event.

  • name (string) – Name of this step, as it should appear in logs. Defaults to class name (SendToHttp).

  • full_event (boolean) – Whether user functions should receive and/or return Event objects (when True), or only the payload (when False). Defaults to False.

class storey.transformations.ToDataFrame(index: Optional[str] = None, columns: Optional[List[str]] = None, **kwargs)#

Bases: Flow

Creates a pandas DataFrame from events. Can appear in the middle of the flow, as opposed to ReduceToDataFrame.

Parameters:
  • index – Name of the column to be used as index. Optional. If not set, DataFrame will be range indexed.

  • columns – List of column names to be passed as-is to the DataFrame constructor. Optional.

for additional params, see documentation of storey.flow.Flow

Miscellaneous:#

class storey.drivers.Driver#

Bases: object

Abstract class for database connection

class storey.drivers.NeedsV3ioAccess(webapi=None, access_key=None)#

Bases: object

Checks that params for access to V3IO exist and are legal

Parameters:
  • webapi – URL to the web API (https or http). If not set, the V3IO_API environment variable will be used.

  • access_key – V3IO access key. If not set, the V3IO_ACCESS_KEY environment variable will be used.

class storey.drivers.NoopDriver#

Bases: Driver

class storey.drivers.V3ioDriver(webapi: Optional[str] = None, access_key: Optional[str] = None, use_parallel_operations=True, v3io_client_kwargs=None)#

Bases: NeedsV3ioAccess, Driver

Database connection to V3IO.

Parameters:
  • webapi – URL to the web API (https or http). If not set, the V3IO_API environment variable will be used.

  • access_key – V3IO access key. If not set, the V3IO_ACCESS_KEY environment variable will be used.

async close()#

Closes database connection to V3IO

class storey.dtypes.EmissionType(value)#

Bases: Enum

An enumeration.

class storey.dtypes.EmitAfterMaxEvent(max_events: int, timeout_secs: Optional[int] = None, emission_type=EmissionType.All)#

Bases: EmitPolicy

Emit after every max_events events

Parameters:
  • max_events – Number of events after which to emit

  • timeout_secs – Emit after this timeout expires, even if max_events events have not arrived (Optional)

class storey.dtypes.EmitAfterPeriod(delay_in_seconds: Optional[int] = 0, emission_type=EmissionType.All)#

Bases: EmitPolicy

Emit event for next step after each period ends

Parameters:

delay_in_seconds – Delay event emission by this many seconds (Optional)

class storey.dtypes.EmitAfterWindow(delay_in_seconds: Optional[int] = 0, emission_type=EmissionType.All)#

Bases: EmitPolicy

Emit event for next step after each window ends

Parameters:

delay_in_seconds – Delay event emission by this many seconds (Optional)

class storey.dtypes.EmitEveryEvent(emission_type=EmissionType.All)#

Bases: EmitPolicy

Emit every event

class storey.dtypes.Event(body: object, key: Optional[Union[str, List[str]]] = None, processing_time: Union[None, datetime, int, float] = None, id: Optional[str] = None, headers: Optional[dict] = None, method: Optional[str] = None, path: Optional[str] = '/', content_type=None, awaitable_result=None)#

Bases: object

The basic unit of data in storey. All steps receive and emit events.

Parameters:
  • body – the event payload, or data

  • key – Event key. Used by steps that aggregate events by key, such as AggregateByKey. (Optional). Can be list

  • processing_time – Event processing time. Defaults to the time the event was created, UTC. (Optional)

  • id – Event identifier. Usually a unique identifier. (Optional)

  • headers – Request headers (HTTP only) (Optional)

  • method – Request method (HTTP only) (Optional)

  • path – Request path (HTTP only) (Optional)

  • content_type – Request content type (HTTP only) (Optional)

  • awaitable_result (AwaitableResult (Optional)) – Generally not passed directly. (Optional)

class storey.dtypes.FieldAggregator(name: str, field: Optional[Union[str, Callable[[Event], object]]], aggr: List[str], windows: Union[FixedWindows, SlidingWindows], aggr_filter: Optional[Callable[[Event], bool]] = None, max_value: Optional[float] = None)#

Bases: object

FieldAggregator represents a set of aggregation features.

Parameters:
  • name – Name for the feature.

  • field – Field in the event body to aggregate.

  • aggr – List of aggregates to apply. Valid values are: [count, sum, sqr, avg, max, min, last, first, stddev, stdvar]

  • windows – Time windows to aggregate the data by.

  • aggr_filter – Filter specifying which events to aggregate. (Optional)

  • max_value – Maximum value for the aggregation (Optional)

class storey.dtypes.FixedWindow(window: str)#

Bases: WindowBase

Time window representing a fixed time interval. The interval will be divided into 10 periods.

Parameters:

window – Time window in the format [0-9]+[smhd]

class storey.dtypes.FixedWindowType(value)#

Bases: Enum

An enumeration.

class storey.dtypes.FixedWindows(windows: List[str])#

Bases: WindowsBase

List of time windows representing fixed time intervals. For example: 1h will represent 1h windows starting every round hour.

Parameters:

windows – List of time windows in the format [0-9]+[smhd]

exception storey.dtypes.FlowError#

Bases: Exception

class storey.dtypes.LateDataHandling(value)#

Bases: Enum

An enumeration.

exception storey.dtypes.RedisError#

Bases: Exception

class storey.dtypes.SlidingWindow(window: str, period: str)#

Bases: WindowBase

Time window representing a sliding time interval, divided into periods.

Parameters:
  • window – Time window in the format [0-9]+[smhd]

  • period – Period (bucket size) of the window, in the format [0-9]+[smhd]

class storey.dtypes.SlidingWindows(windows: List[str], period: Optional[str] = None)#

Bases: WindowsBase

List of time windows representing sliding time intervals. For example: 1h will represent 1h windows starting from the current time.

Parameters:
  • windows – List of time windows in the format [0-9]+[smhd]

  • period – Period in the format [0-9]+[smhd]

exception storey.dtypes.V3ioError#

Bases: Exception

class storey.table.Table(table_path: str, storage: Driver, partitioned_by_key: bool = True, flush_interval_secs: Optional[int] = 300, max_updates_in_flight: int = 8)#

Bases: object

Table object, represents a single table in a specific storage.

Parameters:
  • table_path – Path to the table in the storage.

  • storage – Storage driver

  • partitioned_by_key – Whether the data is partitioned by key. Based on this indication, storage drivers can optimize writes. Defaults to True.

  • flush_interval_secs – How often the table cache is flushed, in seconds. Set to None to flush after every event. Defaults to 300 (5 minutes).

  • max_updates_in_flight – Maximum number of concurrent updates.

class storey.aggregations.AggregateByKey(aggregates: ~typing.Union[~typing.List[~storey.dtypes.FieldAggregator], ~typing.List[~typing.Dict[str, object]]], table: ~typing.Union[~storey.table.Table, str], key_field: ~typing.Optional[~typing.Union[str, ~typing.List[str], ~typing.Callable[[~storey.dtypes.Event], object]]] = None, time_field: ~typing.Optional[~typing.Union[str, ~typing.Callable[[~storey.dtypes.Event], object]]] = None, emit_policy: ~typing.Union[~storey.dtypes.EmitPolicy, ~typing.Dict[str, object]] = <storey.dtypes.EmitEveryEvent object>, augmentation_fn: ~typing.Optional[~typing.Callable[[~storey.dtypes.Event, ~typing.Dict[str, object]], ~storey.dtypes.Event]] = None, enrich_with: ~typing.Optional[~typing.List[str]] = None, aliases: ~typing.Optional[~typing.Dict[str, str]] = None, use_windows_from_schema: bool = False, time_format: ~typing.Optional[str] = None, **kwargs)#

Bases: Flow

Aggregates the data into the table object provided for later persistence, and outputs an event enriched with the requested aggregation features. Persistence is done via the NoSqlTarget step and based on the Cache object persistence settings.

Parameters:
  • aggregates – List of aggregates to apply for each event. Accepts either a list of FieldAggregator objects or a list of dictionaries describing FieldAggregators.

  • table – A Table object or name for persistence of aggregations. If a table name is provided, it will be looked up in the context object passed in kwargs.

  • key_field – Key field to aggregate by, accepts either a string representing the key field or a key extracting function. Defaults to the key in the event’s metadata. (Optional)

  • time_field – Time field to aggregate by, accepts either a string representing the time field or a time extracting function. Defaults to the processing time in the event’s metadata. (Optional)

  • emit_policy – Policy indicating when the data will be emitted. Defaults to EmitEveryEvent

  • augmentation_fn – Function that augments the features into the event’s body. Defaults to updating a dict. (Optional)

  • enrich_with – List of attributes names from the associated storage object to be fetched and added to every event. (Optional)

  • aliases – Dictionary specifying aliases for enriched or aggregate columns, of the format {‘col_name’: ‘new_col_name’}. (Optional)

  • time_format – If the value of the time field is of type string, this format will be used to parse it, as defined in datetime.strptime(). By default, parsing will follow ISO-8601.

class storey.aggregations.QueryByKey(features: List[str], table: Union[Table, str], key_field: Optional[Union[str, List[str], Callable[[Event], object]]] = None, time_field: Optional[Union[str, List[str], Callable[[Event], object]]] = None, augmentation_fn: Optional[Callable[[Event, Dict[str, object]], Event]] = None, aliases: Optional[Dict[str, str]] = None, fixed_window_type: Optional[FixedWindowType] = FixedWindowType.CurrentOpenWindow, **kwargs)#

Bases: AggregateByKey

Query features by name

Parameters:
  • features – List of features to get.

  • table – A Table object or name for persistence of aggregations. If a table name is provided, it will be looked up in the context object passed in kwargs.

  • key_field – Key field to query by, accepts either a string representing the key field or a key extracting function. Defaults to the key in the event’s metadata. Can be a list of keys. (Optional)

  • time_field – Time field to query by, accepts either a string representing the time field or a time extracting function. Defaults to the processing time in the event’s metadata. (Optional)

  • augmentation_fn – Function that augments the features into the event’s body. Defaults to updating a dict. (Optional)

  • aliases – Dictionary specifying aliases for enriched or aggregate columns, of the format {‘col_name’: ‘new_col_name’}. (Optional)

  • fixed_window_type – Enum specifying how fixed windows are queried. Defaults to FixedWindowType.CurrentOpenWindow. (Optional)
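Example (a hedged sketch of querying back aggregation features; the feature name follows the name_aggregation_window convention produced by AggregateByKey, and NoopDriver stands in for a real storage driver):

    from storey import SyncEmitSource, build_flow
    from storey.aggregations import QueryByKey
    from storey.drivers import NoopDriver
    from storey.table import Table

    table = Table("orders", NoopDriver())
    controller = build_flow([
        SyncEmitSource(key_field="user"),
        # enriches each event with the requested feature for its key
        QueryByKey(["total_sum_1h"], table),
    ]).run()

    controller.emit({"user": "u1"})
    controller.terminate()
    controller.await_termination()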