pyrate_limiter.abstracts.bucket module

Implement the abstract classes in this module to create a working bucket for Limiter to use.

class pyrate_limiter.abstracts.bucket.AbstractBucket

Bases: ABC

Base bucket interface.

Assumption: len(rates) is always > 0.

TODO: allow empty rates.

abstract count()

Count the number of items in the bucket.

Return type:

Union[int, Awaitable[int]]

failing_rate = None

abstract flush()

Flush the whole bucket. Implementations must also clear failing_rate after flushing.

Return type:

Optional[Awaitable[None]]

abstract leak(current_timestamp=None)

Leak the bucket: remove items that are outdated.

Return type:

Union[int, Awaitable[int]]

abstract peek(index)

Peek at the rate-item at a specific index, counting in latest-to-earliest order.

NOTE: Peeking from the start of the queue (earliest-to-latest) is not supported because there is no reliable way to tell how many outdated items are still in the queue.

Return type:

Union[RateItem, None, Awaitable[Optional[RateItem]]]
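The latest-to-earliest indexing described for peek can be illustrated with a minimal sketch. It assumes a list-backed store that keeps items oldest first and uses plain integer timestamps in place of RateItem; the helper name peek_latest_first is hypothetical and not part of the library.

```python
from typing import List, Optional


def peek_latest_first(timestamps: List[int], index: int) -> Optional[int]:
    """Return the item `index` positions back from the newest one, or None.

    Hypothetical helper: `timestamps` is assumed to be stored oldest first.
    """
    if index < 0 or index >= len(timestamps):
        return None
    # Index 0 maps to the last (newest) element, index 1 to the one before it.
    return timestamps[-1 - index]


# Items stored oldest-to-newest, as a simple list-backed bucket might hold them.
stored = [100, 250, 400, 900]

newest = peek_latest_first(stored, 0)
second_newest = peek_latest_first(stored, 1)
out_of_range = peek_latest_first(stored, 10)
```

Counting from the newest end means the result does not depend on how many outdated items still sit at the front of the store.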

abstract put(item)

Put an item (typically the current time) in the bucket. Return True if successful, otherwise False.

Return type:

Union[bool, Awaitable[bool]]
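Because the methods above are typed as returning either a plain value or an awaitable, calling code that must support both sync and async buckets has to normalize the result. The following is one possible sketch using only the standard library; put_sync, put_async, and resolve are hypothetical stand-ins, not library functions.

```python
import asyncio
import inspect
from typing import Awaitable, Union


def put_sync(item: int) -> bool:
    # Stand-in for a synchronous bucket's put().
    return True


async def put_async(item: int) -> bool:
    # Stand-in for an asynchronous bucket's put().
    return True


async def resolve(result: Union[bool, Awaitable[bool]]) -> bool:
    """Await the result only if it is awaitable; otherwise return it as-is."""
    if inspect.isawaitable(result):
        return await result
    return result


async def main():
    # Both call styles go through the same helper.
    return await resolve(put_sync(1)), await resolve(put_async(1))


results = asyncio.run(main())
```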

rates

waiting(item)

Calculate the time until the bucket becomes available to consume an item again.

Return type:

Union[int, Awaitable[int]]
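To make the interface above concrete, here is a minimal list-backed sketch of the put, leak, flush, count, and peek methods. It is deliberately standalone: Rate and RateItem are simplified stand-ins defined in the block, ListBucket does not subclass AbstractBucket, timestamps and intervals are assumed to be in milliseconds, and storing one entry per unit of weight is a choice of this sketch rather than a statement about the library.

```python
import time
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Rate:
    # Stand-in: at most `limit` items per `interval` (assumed milliseconds).
    limit: int
    interval: int


@dataclass
class RateItem:
    # Stand-in for the library's RateItem.
    name: str
    timestamp: int
    weight: int = 1


class ListBucket:
    """Hypothetical in-memory bucket mirroring the abstract interface."""

    def __init__(self, rates: List[Rate]):
        assert len(rates) > 0  # mirrors the documented assumption
        self.rates = rates
        self.items: List[RateItem] = []
        self.failing_rate: Optional[Rate] = None

    def put(self, item: RateItem) -> bool:
        for rate in self.rates:
            window_start = item.timestamp - rate.interval
            used = sum(1 for s in self.items if s.timestamp >= window_start)
            if used + item.weight > rate.limit:
                self.failing_rate = rate  # remember which rate rejected the item
                return False
        self.failing_rate = None
        self.items.extend([item] * item.weight)  # one entry per unit of weight
        return True

    def leak(self, current_timestamp: Optional[int] = None) -> int:
        if current_timestamp is None:
            current_timestamp = int(time.time() * 1000)
        # Anything older than the longest interval can no longer affect a rate.
        cutoff = current_timestamp - max(r.interval for r in self.rates)
        before = len(self.items)
        self.items = [s for s in self.items if s.timestamp >= cutoff]
        return before - len(self.items)

    def flush(self) -> None:
        self.items.clear()
        self.failing_rate = None  # clear the failing rate as well

    def count(self) -> int:
        return len(self.items)

    def peek(self, index: int) -> Optional[RateItem]:
        # Latest-to-earliest order: index 0 is the newest item.
        if 0 <= index < len(self.items):
            return self.items[-1 - index]
        return None


bucket = ListBucket([Rate(limit=2, interval=1000)])
first = bucket.put(RateItem("a", timestamp=0))
second = bucket.put(RateItem("a", timestamp=10))
third = bucket.put(RateItem("a", timestamp=20))  # exceeds 2 items per 1000 ms
failing = bucket.failing_rate
leaked = bucket.leak(current_timestamp=1500)  # drops items older than t=500
remaining = bucket.count()
bucket.flush()
cleared = bucket.failing_rate is None
```

A real implementation would subclass AbstractBucket and use the library's own Rate and RateItem types; the sketch only shows how the methods relate to one another.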

class pyrate_limiter.abstracts.bucket.BucketFactory

Bases: ABC

Abstract BucketFactory class. Users are expected to implement or override this class with their own bucket-routing and bucket-creation logic.

create(clock, bucket_class, *args, **kwargs)

Create a bucket dynamically.

Return type:

AbstractBucket

dispose(bucket)

Delete a bucket from the factory

Return type:

bool

abstract get(item)

Get the corresponding bucket to this item

Return type:

Union[AbstractBucket, Awaitable[AbstractBucket]]

get_buckets()

Iterate over all buckets in the factory.

Return type:

List[AbstractBucket]

property leak_interval

Retrieve leak-interval from inner Leaker task

schedule_leak(new_bucket, associated_clock)

Schedule the buckets’ leaking and reset the bucket’s failing rate.

Return type:

None

abstract wrap_item(name, weight=1)

Add the current timestamp to the received item, using any clock backend, and turn it into a RateItem. Can return either a coroutine or a RateItem instance.

Return type:

Union[RateItem, Awaitable[RateItem]]
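The two abstract methods, wrap_item and get, carry the routing logic a user is expected to supply. The sketch below shows one way that logic could look, under loud assumptions: RateItem is a simplified stand-in, buckets are represented by plain lists, the wall clock stands in for a clock backend, and PrefixRoutingFactory (with its "prefix:name" routing rule) is hypothetical and does not subclass BucketFactory.

```python
import time
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class RateItem:
    # Stand-in for the library's RateItem.
    name: str
    timestamp: int
    weight: int = 1


class PrefixRoutingFactory:
    """Hypothetical factory: one bucket per name prefix (text before ':')."""

    def __init__(self) -> None:
        # A plain list stands in for a real bucket in this sketch.
        self.buckets: Dict[str, List[RateItem]] = {}

    def wrap_item(self, name: str, weight: int = 1) -> RateItem:
        # Stamp the item with the current time (assumed milliseconds).
        now_ms = int(time.time() * 1000)
        return RateItem(name, now_ms, weight)

    def get(self, item: RateItem) -> List[RateItem]:
        # Route by prefix, creating the bucket on first use.
        key = item.name.split(":", 1)[0]
        return self.buckets.setdefault(key, [])


factory = PrefixRoutingFactory()
item_a = factory.wrap_item("api:search", weight=2)
item_b = factory.wrap_item("api:login")
item_c = factory.wrap_item("db:query")

same_bucket = factory.get(item_a) is factory.get(item_b)
different_bucket = factory.get(item_a) is not factory.get(item_c)
```

An async variant would declare wrap_item and get with async def and return the same values, matching the Awaitable branches of the documented return types.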

class pyrate_limiter.abstracts.bucket.Leaker(leak_interval)

Bases: Thread

Responsible for scheduling buckets’ leaking in the background, either through a daemon thread (for sync buckets) or through an asyncio.Task (for async buckets).

aio_leak_task = None
async_buckets = None
clocks = None
daemon = True

deregister(bucket_id)

Deregister a bucket

Return type:

bool

leak_async()
leak_interval = 10000
name = "PyrateLimiter's Leaker"

register(bucket, clock)

Register a new bucket with its associated clock

run()

Overrides the original method of Thread. Not meant to be called directly.

Return type:

None

start()

Overrides the original method of Thread. Call this to start leaking sync buckets.

Return type:

None

sync_buckets = None
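The background-leaking idea behind Leaker can be sketched for the synchronous case with a daemon thread that periodically calls leak on every registered bucket. This is a simplified illustration, not the library's implementation: SimpleLeaker, CountingBucket, the stop method, the lock, and the millisecond unit for the interval are all assumptions of the sketch, and the async path is omitted.

```python
import threading
import time
from typing import Dict


class CountingBucket:
    """Stand-in bucket that only records how often leak() was called."""

    def __init__(self) -> None:
        self.leak_calls = 0

    def leak(self, current_timestamp=None) -> int:
        self.leak_calls += 1
        return 0


class SimpleLeaker(threading.Thread):
    """Hypothetical leaker: calls leak() on registered buckets at an interval."""

    def __init__(self, leak_interval_ms: int) -> None:
        super().__init__(daemon=True)  # do not block interpreter exit
        self.leak_interval_ms = leak_interval_ms
        self.buckets: Dict[int, CountingBucket] = {}
        self._lock = threading.Lock()
        self._stop_event = threading.Event()

    def register(self, bucket: CountingBucket) -> None:
        with self._lock:
            self.buckets[id(bucket)] = bucket

    def deregister(self, bucket_id: int) -> bool:
        # True if the bucket was registered, False otherwise.
        with self._lock:
            return self.buckets.pop(bucket_id, None) is not None

    def run(self) -> None:
        # wait() returns False on timeout, True once stop() has been called.
        while not self._stop_event.wait(self.leak_interval_ms / 1000):
            with self._lock:
                buckets = list(self.buckets.values())
            now_ms = int(time.time() * 1000)
            for bucket in buckets:
                bucket.leak(now_ms)

    def stop(self) -> None:
        self._stop_event.set()


bucket = CountingBucket()
leaker = SimpleLeaker(leak_interval_ms=10)
leaker.register(bucket)
leaker.start()
time.sleep(0.1)  # give the thread time for a few leak cycles
leaker.stop()
leaker.join(timeout=1)

removed = leaker.deregister(id(bucket))
removed_again = leaker.deregister(id(bucket))
```

Waiting on an Event instead of calling time.sleep lets the thread exit promptly when stop is called, which keeps the sketch easy to shut down in tests.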