coherence.aggregator

EntryAggregator

class coherence.aggregator.EntryAggregator(extractor_or_property: ValueExtractor[T, E] | str | None = None)

Bases: ABC, Generic[R]

An EntryAggregator represents processing that can be directed to occur against some subset of the entries in n cache, resulting in an aggregated result. Common examples of aggregation include functions such as min(), max() and avg(). However, the concept of aggregation applies to any process that needs to evaluate a group of entries to come up with a single answer.

__init__(extractor_or_property: ValueExtractor[T, E] | str | None = None)

Construct an AbstractAggregator that will aggregate values extracted from the cache entries.

Parameters:

extractor_or_property – the extractor that provides values to aggregate or the name of the method that could be invoked via Java reflection and that returns values to aggregate; this parameter can also be a dot-delimited sequence of method names which would result in an aggregator based on the coherence.extractor.ChainedExtractor that is based on an array of corresponding coherence.extractor.UniversalExtractor objects; must not be None

and_then(aggregator: EntryAggregator[R]) EntryAggregator[List[R]]

Returns a coherence.aggregator.CompositeAggregator comprised of this and the provided aggregator.

Parameters:

aggregator – the next aggregator

Returns:

a coherence.aggregator.CompositeAggregator comprised of this and the provided aggregator

AbstractComparableAggregator

class coherence.aggregator.AbstractComparableAggregator(extractor_or_property: ValueExtractor[T, E] | str)

Bases: EntryAggregator[R]

Abstract aggregator that processes values extracted from a set of entries in a Map, with knowledge of how to compare those values. There are two-way to use the AbstractComparableAggregator:

If there are no entries to aggregate, the returned result will be None.

__init__(extractor_or_property: ValueExtractor[T, E] | str)

Construct an AbstractComparableAggregator that will aggregate Java-Comparable values extracted from the cache entries.

Parameters:

extractor_or_property – the extractor that provides values to aggregate or the name of the method that could be invoked via Java reflection and that returns values to aggregate; this parameter can also be a dot-delimited sequence of method names which would result in an aggregator based on the coherence.extractor.ChainedExtractor that is based on an array of corresponding coherence.extractor.UniversalExtractor objects; must not be None

AbstractDoubleAggregator

class coherence.aggregator.AbstractDoubleAggregator(extractor_or_property: ValueExtractor[T, E] | str)

Bases: EntryAggregator[Decimal]

Abstract aggregator that processes numeric values extracted from a set of entries in a Map. All the extracted Number objects will be treated as Java double values and the result of the aggregator is a Double. If the set of entries is empty, a None result is returned.

__init__(extractor_or_property: ValueExtractor[T, E] | str)

Construct an AbstractDoubleAggregator that will aggregate numeric values extracted from the cache entries.

Parameters:

extractor_or_property – the extractor that provides values to aggregate or the name of the method that could be invoked via Java reflection and that returns values to aggregate; this parameter can also be a dot-delimited sequence of method names which would result in an aggregator based on the coherence.extractor.ChainedExtractor that is based on an array of corresponding coherence.extractor.UniversalExtractor objects; must not be None

CompositeAggregator

class coherence.aggregator.CompositeAggregator(aggregators: list[EntryAggregator[R]])

Bases: EntryAggregator[List[R]]

CompositeAggregator provides an ability to execute a collection of aggregators against the same subset of the entries in a Map, resulting in a list of corresponding aggregation results. The size of the returned list will always be equal to the length of the aggregators list.

__init__(aggregators: list[EntryAggregator[R]])

Construct a CompositeAggregator based on a specified coherence.aggregator.EntryAggregator list.

Parameters:

aggregators – an array of coherence.aggregator.EntryAggregator objects; may not be None

MaxAggregator

class coherence.aggregator.MaxAggregator(extractor_or_property: ValueExtractor[T, E] | str)

Bases: AbstractComparableAggregator[R]

Calculates a maximum of numeric values extracted from a set of entries in a Map in a form of a numerical value. All the extracted objects will be treated as numerical values. If the set of entries is empty, a None result is returned.

__init__(extractor_or_property: ValueExtractor[T, E] | str)

Constructs a new MaxAggregator.

Parameters:

extractor_or_property – the extractor that provides values to aggregate or the name of the method that could be invoked via Java reflection and that returns values to aggregate; this parameter can also be a dot-delimited sequence of method names which would result in an aggregator based on the coherence.extractor.ChainedExtractor that is based on an array of corresponding coherence.extractor.UniversalExtractor objects; must not be None

MinAggregator

class coherence.aggregator.MinAggregator(extractor_or_property: ValueExtractor[T, E] | str)

Bases: AbstractComparableAggregator[R]

Calculates a minimum of numeric values extracted from a set of entries in a Map in a form of a numerical value. All the extracted objects will be treated as numerical values. If the set of entries is empty, a None result is returned.

__init__(extractor_or_property: ValueExtractor[T, E] | str)

Constructs a new MinAggregator.

Parameters:

extractor_or_property – the extractor that provides values to aggregate or the name of the method that could be invoked via Java reflection and that returns values to aggregate; this parameter can also be a dot-delimited sequence of method names which would result in an aggregator based on the coherence.extractor.ChainedExtractor that is based on an array of corresponding coherence.extractor.UniversalExtractor objects; must not be None

SumAggregator

class coherence.aggregator.SumAggregator(extractor_or_property: ValueExtractor[T, E] | str)

Bases: AbstractDoubleAggregator

Calculates a sum for values of any numeric type extracted from a set of entries in a Map in a form of a numeric value.

If the set of entries is empty, a ‘None’ result is returned.

__init__(extractor_or_property: ValueExtractor[T, E] | str)

Constructs a new SumAggregator.

Parameters:

extractor_or_property – the extractor that provides values to aggregate or the name of the method that could be invoked via Java reflection and that returns values to aggregate; this parameter can also be a dot-delimited sequence of method names which would result in an aggregator based on the coherence.extractor.ChainedExtractor that is based on an array of corresponding coherence.extractor.UniversalExtractor objects; must not be None

AverageAggregator

class coherence.aggregator.AverageAggregator(extractor_or_property: ValueExtractor[T, E] | str)

Bases: AbstractDoubleAggregator

Calculates an average for values of any numeric type extracted from a set of entries in a Map in a form of a numerical value. All the extracted objects will be treated as numerical values. If the set of entries is empty, a None result is returned.

__init__(extractor_or_property: ValueExtractor[T, E] | str)

Construct an AverageAggregator that will sum numeric values extracted from the cache entries.

Parameters:

extractor_or_property – the extractor that provides values to aggregate or the name of the method that could be invoked via Java reflection and that returns values to aggregate; this parameter can also be a dot-delimited sequence of method names which would result in an aggregator based on the coherence.extractor.ChainedExtractor that is based on an array of corresponding coherence.extractor.UniversalExtractor objects; must not be None

CountAggregator

class coherence.aggregator.CountAggregator

Bases: EntryAggregator[int]

Calculates a number of values in an entry set.

__init__() None

Constructs a new CountAggregator.

DistinctValuesAggregator

class coherence.aggregator.DistinctValuesAggregator(extractor_or_property: ValueExtractor[T, E] | str)

Bases: EntryAggregator[R]

Return the set of unique values extracted from a set of entries in a Map. If the set of entries is empty, an empty array is returned.

This aggregator could be used in combination with coherence.extractor.UniversalExtractor allowing to collect all unique combinations (tuples) of a given set of attributes.

The DistinctValues aggregator covers a simple case of a more generic aggregation pattern implemented by the GroupAggregator, which in addition to collecting all distinct values or tuples, runs an aggregation against each distinct entry set (group).

__init__(extractor_or_property: ValueExtractor[T, E] | str)

Construct a DistinctValuesAggregator that will aggregate numeric values extracted from the cache entries.

Parameters:

extractor_or_property – the extractor that provides values to aggregate or the name of the method that could be invoked via Java reflection and that returns values to aggregate; this parameter can also be a dot-delimited sequence of method names which would result in an aggregator based on the coherence.extractor.ChainedExtractor that is based on an array of corresponding coherence.extractor.UniversalExtractor objects; must not be None

TopAggregator

class coherence.aggregator.TopAggregator(number: int = 0, inverse: bool = False, extractor: ~coherence.extractor.ValueExtractor[~typing.Any, ~typing.Any] = <coherence.extractor.IdentityExtractor object>, comparator: ~coherence.comparator.Comparator | None = None, property_name: str | None = None)

Bases: Generic[E, R], EntryAggregator[List[R]]

TopAggregator aggregates the top N extracted values into an array. The extracted values must not be None, but do not need to be unique.

__init__(number: int = 0, inverse: bool = False, extractor: ~coherence.extractor.ValueExtractor[~typing.Any, ~typing.Any] = <coherence.extractor.IdentityExtractor object>, comparator: ~coherence.comparator.Comparator | None = None, property_name: str | None = None)

Constructs a new TopAggregator.

Parameters:
  • number – the maximum number of results to include in the aggregation result.

  • inverse – Result order. By default, results will be ordered in descending order.

  • extractor – The extractor to obtain the values to aggregate. If not explicitly set, this will default to an coherence.extractor.IdentityExtractor.

  • comparator – The coherence.comparator.Comparator to apply against the extracted values.

  • property_name – The property that results will be ordered by.

property ascending: TopAggregator[E, R]

Sort the returned values in ascending order.

Returns:

an instance of coherence.aggregator.TopAggregator

property descending: TopAggregator[E, R]

Sort the returned values in descending order.

Returns:

an instance of coherence.aggregator.TopAggregator

extract(property_name: str) TopAggregator[E, R]

The property name of the value to extract.

Parameters:

property_name – the property name

Returns:

order_by(property_name: str) TopAggregator[E, R]

Order the results based on the values of the specified property.

Parameters:

property_name – the property name

Returns:

an instance of coherence.aggregator.TopAggregator

GroupAggregator

class coherence.aggregator.GroupAggregator(extractor_or_property: ValueExtractor[T, E] | str, aggregator: EntryAggregator[R], filter: Filter | None = None)

Bases: EntryAggregator[R]

The GroupAggregator provides an ability to split a subset of entries in a Map into a collection of non-intersecting subsets and then aggregate them separately and independently. The splitting (grouping) is performed using the results of the underlying coherence.extractor.UniversalExtractor in such a way that two entries will belong to the same group if and only if the result of the corresponding extract call produces the same value or tuple (list of values). After the entries are split into the groups, the underlying aggregator is applied separately to each group. The result of the aggregation by the` GroupAggregator` is a Map that has distinct values (or tuples) as keys and results of the individual aggregation as values. Additionally, those results could be further reduced using an optional coherence.filter.Filter object.

Informally speaking, this aggregator is analogous to the SQL group by and having clauses. Note that the having Filter is applied independently on each server against the partial aggregation results; this generally implies that data affinity is required to ensure that all required data used to generate a given result exists within a single cache partition. In other words, the group by predicate should not span multiple partitions if the having clause is used.

The GroupAggregator is somewhat similar to the DistinctValues aggregator, which returns back a list of distinct values (tuples) without performing any additional aggregation work.

__init__(extractor_or_property: ValueExtractor[T, E] | str, aggregator: EntryAggregator[R], filter: Filter | None = None)

Construct a GroupAggregator based on a specified coherence.extractor.ValueExtractor and underlying coherence.aggregator.EntryAggregator.

Parameters:
  • extractor_or_property – a coherence.extractor.ValueExtractor object that is used to split entries into non-intersecting subsets; may not be None. This parameter can also be a dot-delimited sequence of method names which would result in an aggregator based on the coherence.extractor.ChainedExtractor that is based on an array of corresponding coherence.extractor.UniversalExtractor objects; may not be NONE

  • aggregator – an EntryAggregator object; may not be null

  • filter – an optional Filter object used to filter out results of individual group aggregation results

Timeout

class coherence.aggregator.Timeout(value)

Bases: IntEnum

An enumeration.

DEFAULT: int = 0

A special timeout value to indicate that the corresponding service’s default timeout value should be used.

NONE: int = -1

A special timeout value to indicate that this task or request can run indefinitely.

Schedule

class coherence.aggregator.Schedule(value)

Bases: Enum

An enumeration.

FIRST = 1

Scheduling value indicating that this task is to be queued in front of any equal or lower scheduling priority tasks and executed as soon as any of the worker threads become available.

IMMEDIATE = 2

Scheduling value indicating that this task is to be immediately executed by any idle worker thread; if all of them are active, a new thread will be created to execute this task.

STANDARD = 0

Scheduling value indicating that this task is to be queued and execute in a natural (based on the request arrival time) order.

PriorityAggregator

class coherence.aggregator.PriorityAggregator(aggregator: EntryAggregator[R], execution_timeout: int = Timeout.DEFAULT, request_timeout: int = Timeout.DEFAULT, scheduling_priority: Schedule = Schedule.STANDARD)

Bases: Generic[R], EntryAggregator[R]

A PriorityAggregator is used to explicitly control the scheduling priority and timeouts for execution of EntryAggregator-based methods.

For example, lets assume that there is an Orders cache that belongs to a partitioned cache service configured with a request-timeout and task-timeout of 5 seconds. Also assume that we are willing to wait longer for a particular aggregation request that scans the entire cache. Then we could override the default timeout values by using the PriorityAggregator as follows:

sumAggr = SumAggregator("cost")
priorityAgg = PriorityAggregator(sumAggr)
priorityAgg.executionTimeout = Timeout.NONE
priorityAgg.requestTimeout = Timeout.NONE
cacheOrders.aggregate(aFilter, priorityAgg)

This is an advanced feature which should be used judiciously.

__init__(aggregator: EntryAggregator[R], execution_timeout: int = Timeout.DEFAULT, request_timeout: int = Timeout.DEFAULT, scheduling_priority: Schedule = Schedule.STANDARD)

Construct a new PriorityAggregator.

Parameters:
  • aggregator – The wrapped coherence.aggregator.EntryAggregator.

  • execution_timeout – The task execution timeout value.

  • request_timeout – The request timeout value.

  • scheduling_priority – The scheduling priority.

property execution_timeout_in_millis: int

Return the execution timeout in milliseconds.

Returns:

the execution timeout

property request_timeout_in_millis: int

Return the request timeout in milliseconds.

Returns:

the request timeout

property scheduling_priority: Schedule

Return the scheduling priority or, if not explicitly set, the default is coherence.aggregator.Schedule.STANDARD

Returns:

the scheduling priority

QueryRecorder

class coherence.aggregator.QueryRecorder(query_type: RecordType)

Bases: EntryAggregator[Any]

This aggregator is used to produce an object that contains an estimated or actual cost of the query execution for a given coherence.filter.Filter.

For example, the following code will print a QueryRecord, containing the estimated query cost and corresponding execution steps:

agent  = QueryRecorder(RecordType.EXPLAIN);
record = cache.aggregate(someFilter, agent);
print(json.dumps(record));
__init__(query_type: RecordType)

Construct a new QueryRecorder.

Parameters:

query_type – the type for this aggregator

EXPLAIN: str = 'EXPLAIN'

String constant for serialization purposes.

TRACE: str = 'TRACE'

String constant for serialization purposes.

ReducerAggregator

class coherence.aggregator.ReducerAggregator(extractor_or_property: ValueExtractor[T, E] | str)

Bases: EntryAggregator[R]

The ReducerAggregator is used to implement functionality similar to coherence.client.NamedMap.getAll( ) API. Instead of returning the complete set of values, it will return a portion of value attributes based on the provided coherence.extractor.ValueExtractor.

This aggregator could be used in combination with {@link MultiExtractor} allowing one to collect tuples that are a subset of the attributes of each object stored in the cache.

__init__(extractor_or_property: ValueExtractor[T, E] | str)

Creates a new ReducerAggregator.

Parameters:

extractor_or_property – the extractor that provides values to aggregate or the name of the method that could be invoked via Java reflection and that returns values to aggregate; this parameter can also be a dot-delimited sequence of method names which would result in an aggregator based on the coherence.extractor.ChainedExtractor that is based on an array of corresponding coherence.extractor.UniversalExtractor objects; must not be None

Aggregators

class coherence.aggregator.Aggregators

Simple Aggregator DSL.

The methods in this class are for the most part simple factory methods for various coherence.aggregator.EntryAggregator classes, but in some cases provide additional type safety. They also tend to make the code more readable, especially if imported statically, so their use is strongly encouraged in lieu of direct construction of coherence.aggregator.EntryAggregator classes.

static average(extractor_or_property: ValueExtractor[T, E] | str) EntryAggregator[Decimal]

Return an aggregator that calculates an average of the numeric values extracted from a set of entries in a Map.

Parameters:

extractor_or_property – the extractor or method/property name to provide values for aggregation

Returns:

an aggregator that calculates an average of the numeric values extracted from a set of entries in a Map.

static count() EntryAggregator[int]

Return an aggregator that calculates a number of values in an entry set.

Returns:

an aggregator that calculates a number of values in an entry set.

static distinct(extractor_or_property: ValueExtractor[T, E] | str) EntryAggregator[List[R]]

Return an aggregator that calculates the set of distinct values from the entries in a Map.

Parameters:

extractor_or_property – the extractor or method/property name to provide values for aggregation

Returns:

an aggregator that calculates the set of distinct values from the entries in a Map.

static group_by(extractor_or_property: ValueExtractor[T, E] | str, aggregator: EntryAggregator[Any], filter: Filter | None = None) EntryAggregator[Dict[G, T]]

Return a coherence.aggregator.GroupAggregator based on a specified property or method name(s) and an coherence.aggregator.EntryAggregator.

Parameters:
Returns:

a coherence.aggregator.GroupAggregator based on a specified property or method name(s) and an coherence.aggregator.EntryAggregator.

static max(extractor_or_property: ValueExtractor[T, E] | str) EntryAggregator[R]

Return an aggregator that calculates a maximum of the numeric values extracted from a set of entries in a Map.

Parameters:

extractor_or_property – the extractor or method/property name to provide values for aggregation

Returns:

an aggregator that calculates a maximum of the numeric values extracted from a set of entries in a Map

static min(extractor_or_property: ValueExtractor[T, E] | str) EntryAggregator[R]

Return an aggregator that calculates a minimum of the numeric values extracted from a set of entries in a Map.

Parameters:

extractor_or_property – the extractor or method/property name to provide values for aggregation

Returns:

an aggregator that calculates a minimum of the numeric values extracted from a set of entries in a Map.

static priority(aggregator: EntryAggregator[R], execution_timeout: Timeout = Timeout.DEFAULT, request_timeout: Timeout = Timeout.DEFAULT, scheduling_priority: Schedule = Schedule.STANDARD) EntryAggregator[R]

Return a new coherence.aggregator.PriorityAggregator to control scheduling priority of an aggregation operation.

Parameters:
Returns:

a new coherence.aggregator.PriorityAggregator to control scheduling priority of an aggregation operation.

static record(query_type: RecordType = RecordType.EXPLAIN) EntryAggregator[Any]

Returns a new coherence.aggregator.QueryRecorder aggregator which may be used is used to produce an object that contains an estimated or actual cost of the query execution for a given coherence.filter.Filter.

Parameters:

query_type – the coherence.aggregator.RecordType

Returns:

a new coherence.aggregator.QueryRecorder aggregator which may be used is used to produce an object that contains an estimated or actual cost of the query execution for a given coherence.filter.Filter.

static reduce(extractor_or_property: ValueExtractor[T, E] | str) EntryAggregator[Dict[K, Any | List[Any]]]

Return an aggregator that will return the extracted value for each entry in the map.

Parameters:

extractor_or_property – the extractor or method/property name to provide values for aggregation

Returns:

an aggregator that will return the extracted value for each entry in the map.

static script(language: str, script_name: str, characteristics: int = 0, *args: Any) EntryAggregator[R]

Return an aggregator that is implemented in a script using the specified language.

Parameters:
  • language – The language with which the script is written in.

  • script_name – The name of the coherence.aggregator.EntryAggregator that needs to be evaluated.

  • characteristics – Present only for serialization purposes.

  • args – The arguments to be passed to the script for evaluation

Returns:

an aggregator that is implemented in a script using the specified language.

static sum(extractor_or_property: ValueExtractor[T, E] | str) EntryAggregator[Decimal]

Return an aggregator that calculates a sum of the numeric values extracted from a set of entries in a Map.

Parameters:

extractor_or_property – the extractor or method/property name to provide values for aggregation

Returns:

an aggregator that calculates a sum of the numeric values extracted from a set of entries in a Map.

static top(count: int) TopAggregator[Any, Any]

Return an aggregator that aggregates the top N extracted values into an array.

Parameters:

count – the maximum number of results to include in the aggregation result

Returns:

an aggregator that aggregates the top N extracted values into an array.