Skip to content

Middlewares

Compatibility

Middlewares are a subject to change, as those are representing internal functions.

They don't follow semantic versioning and can introduce breaking changes in minor versions. Patch versions, on the other hand, are guaranteed to not to introduce any breaking changes.

Any breaking changes will be mentioned in release notes.

Some of Repid functions are wrapped to emit events before and after its execution. You can subscribe to those events with your own middlewares.

To create a middleware, you can either use class-based or function-based approach.

Class-based

For example, let's create a middleware which will report errors to Sentry.

import os
import sentry_sdk
from repid import Repid, Connection, InMemoryMessageBroker
from repid.actor import ActorResult

app = Repid(Connection(InMemoryMessageBroker()))  # (1)


class SentryMiddleware:
    def __init__(self, sentry_dsn: str) -> None:
        sentry_sdk.init(dsn=sentry_dsn)

    def after_actor_run(self, result: ActorResult) -> None:  # (2)
        if not result.success:
            sentry_sdk.capture_exception(result.exception)  # (3)


sentry_middleware_instance = SentryMiddleware(os.environ.get("SENTRY_DSN"))  # (4)

app.connection.middleware.add_middleware(sentry_middleware_instance)  # (5)
  1. Create a Repid app and a Connection as you normally would
  2. Create a specifically named method, which will receive specifically named arguments
  3. Report exceptions to Sentry
  4. Initialize the middleware
  5. Pass middleware to Repid's connection

Function-based

Let's replicate the example above using function-based approach.

import os
import sentry_sdk
from repid import Repid, Connection, InMemoryMessageBroker
from repid.actor import ActorResult

app = Repid(Connection(InMemoryMessageBroker()))  # (1)

sentry_sdk.init(dsn=os.environ.get("SENTRY_DSN"))


def after_actor_run(result: ActorResult) -> None:  # (2)
    if not result.success:
        sentry_sdk.capture_exception(result.exception)  # (3)


app.connection.middleware.add_subscriber(after_actor_run)  # (4)
  1. Create a Repid app and a Connection as you normally would
  2. Create a specifically named function, which will receive specifically named arguments
  3. Report exceptions to Sentry
  4. Subscribe the function to the appropriate event

Naming convention, arguments, async

Only functions named accordingly to emitted events (see glossary below) will be executed.

You can optionally include any of the suggested arguments. Only included arguments will be passed to the subscriber. The "after_" functions also contain a special argument result, which represents the return of the function emitting the signal.

You can use both asynchronous & synchronous functions. The latter will be executed in threads to not to block async loop.

Events glossary

Here is the full list of events, their arguments and type hints, which you can subscribe to.

Consume

  • before_consume()
  • after_consume(result: tuple[RoutingKeyT, EncodedPayloadT, ParametersT])

Enqueue

  • before_enqueue(key: RoutingKeyT, payload: EncodedPayloadT, params: ParametersT | None)
  • after_enqueue(key: RoutingKeyT, payload: EncodedPayloadT,params: ParametersT | None, result: None)

Queue Declare

  • before_queue_declare(queue_name: str)
  • after_queue_declare(queue_name: str, result: None)

Queue Flush

  • before_queue_flush(queue_name: str)
  • after_queue_flush(queue_name: str, result: None)

Queue Delete

  • before_queue_delete(queue_name: str)
  • after_queue_delete(queue_name: str, result: None)

Ack

  • before_ack(key: RoutingKeyT)
  • after_ack(key: RoutingKeyT, result: None)

Nack

  • before_nack(key: RoutingKeyT)
  • after_nack(key: RoutingKeyT, result: None)

Reject

  • before_reject(key: RoutingKeyT)
  • after_reject(key: RoutingKeyT, result: None)

Requeue

  • before_requeue(key: RoutingKeyT, payload: str, params: ParametersT | None)
  • after_requeue(key: RoutingKeyT, payload: str, params: ParametersT | None, result: None)

Get Bucket

  • before_get_bucket(id_: str)
  • after_get_bucket(id_: str, result: BucketT | None)

Store Bucket

  • before_store_bucket(id_: str, payload: BucketT)
  • after_store_bucket(id_: str, payload: BucketT, result: None)

Delete Bucket

  • before_delete_bucket(id_: str)
  • after_delete_bucket(id_: str, result: None)

Actor Run

  • before_actor_run(actor: ActorData, key: RoutingKeyT, parameters: ParametersT,payload: str)
  • after_actor_run(actor: ActorData, key: RoutingKeyT, parameters: ParametersT,payload: str, result: ActorResult | None)