Skip to content

Module

pglock

pglock.Cancel

Cancel(**filters)

Bases: PrioritizeSideEffect

The side effect for canceling blocking locks when using pglock.prioritize.

Calls cancel_blocking_activity on the blocked lock queryset.

Supply a duration to only cancel queries lasting greater than the duration.

Source code in pglock/core.py
def __init__(self, **filters):
    self.filters = filters

pglock.PrioritizeSideEffect

PrioritizeSideEffect(**filters)

Bases: SideEffect

Base class for pglock.prioritize side effects.

Must override the worker method, which takes a pglock.models.BlockedPGLock queryset of all locks that are blocking the prioritized process.

Return the process IDs or blocked locks that were handled.

Prioritize side effects take optional filters when initialize, which are passed to the underlying pglock.models.BlockedPGLock queryset.

Source code in pglock/core.py
def __init__(self, **filters):
    self.filters = filters

pglock.Raise

Bases: SideEffect

The side effect for raising an error on lock acquisition failure when using pglock.advisory or pglock.model.

pglock.Return

Bases: SideEffect

The side effect for returning the lock status when using pglock.advisory or pglock.model.

pglock.SideEffect

The base class for side effects

pglock.Skip

Bases: SideEffect

The side effect for skipping wrapped code on lock acquisition error when using pglock.advisory as a decorator.

pglock.Terminate

Terminate(**filters)

Bases: PrioritizeSideEffect

The side effect for terminating blocking locks when using pglock.prioritize.

Calls teminate_blocking_activity on the blocked lock queryset.

Supply a duration to only terminate queries lasting greater than the duration.

Source code in pglock/core.py
def __init__(self, **filters):
    self.filters = filters

pglock.advisory

advisory(
    lock_id=None,
    *,
    shared=False,
    using=DEFAULT_DB_ALIAS,
    timeout=_unset,
    side_effect=None
)

Bases: ContextDecorator

Obtain an advisory lock.

When using the default side effect, returns True if the lock was acquired or False if not.

Parameters:

Name Type Description Default
lock_id Union[str, int], default=None

The ID of the lock. When using the decorator, it defaults to the full module path and function name of the wrapped function. It must be supplied to the context manager.

None
shared bool, default=False

When True, creates a shared advisory lock. Consult the Postgres docs <https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS>__ for more information.

False
using str, default="default"

The database to use.

DEFAULT_DB_ALIAS
timeout Union[int, float, timedelta, None]

Set a timeout when waiting for the lock. This timeout only applies to the lock acquisition statement and not the wrapped code. If 0, pg_try_advisory_lock will be used to return immediately. If None, an infinite timeout will be used. When using a timeout, the acquisition status will be returned when running as a context manager. Use the side_effect argument to change the runtime behavior.

_unset
side_effect str

Adjust the runtime behavior when using a timeout. pglock.Return will return the acquisition status when using the context manager. pglock.Raise will raise a django.db.utils.OperationalError if the lock cannot be acquired or a timeout happens. pglock.Skip will skip decoratored code if the lock cannot be acquired. Defaults to pglock.Return when used as a context manager or pglock.Raise when used as a decorator.

None

Raises:

Type Description
OperationalError

When a lock cannot be acquired or a timeout happens when using side_effect=pglock.Raise.

ValueError

If an invalid side_effect is provided or no lock ID is supplied for the context manager.

TypeError

If the lock ID is not a string or int.

Source code in pglock/core.py
def __init__(
    self,
    lock_id=None,
    *,
    shared=False,
    using=DEFAULT_DB_ALIAS,
    timeout=_unset,
    side_effect=None,
):
    """Acquire an advisory lock"""
    self.lock_id = lock_id
    self.using = using
    self.side_effect = side_effect
    self.shared = shared
    self.timeout = _cast_timeout(timeout)

    # Use pg_try_advisory.. when a timeout of 0 has been applied.
    self.nowait = isinstance(self.timeout, dt.timedelta) and not self.timeout

    # "_func" will be set if a function is wrapped
    self._func = None

pglock.advisory_id

advisory_id(lock_id: Union[str, int]) -> Tuple[int, int]

Given a lock ID, return the (classid, objid) tuple that Postgres uses for the advisory lock in the pg_locks table,

Parameters:

Name Type Description Default
lock_id Union[str, int]

The lock ID

required

Returns:

Type Description
Tuple[int, int]

The (classid, objid) tuple

Source code in pglock/core.py
def advisory_id(lock_id: Union[str, int]) -> Tuple[int, int]:
    """
    Given a lock ID, return the (classid, objid) tuple that Postgres uses
    for the advisory lock in the pg_locks table,

    Args:
        lock_id: The lock ID

    Returns:
        The (classid, objid) tuple
    """
    lock_id = _cast_lock_id(lock_id)
    return lock_id >> 32, lock_id & 0xFFFFFFFF

pglock.model

model(
    *models: Union[str, models.Model],
    mode: str = ACCESS_EXCLUSIVE,
    using: str = DEFAULT_DB_ALIAS,
    timeout: Union[int, float, dt.timedelta, None] = _unset,
    side_effect: SideEffect = Return
) -> bool

Lock model(s).

Parameters:

Name Type Description Default
*models Union[str, Model]

Model paths (e.g. "app_label.Model") or classes to lock.

()
mode str

The lock mode. See the Postgres docs for a list of all modes and what they mean. There is a constant for each one in the pglock module, e.g. pglock.ACCESS_SHARE.

ACCESS_EXCLUSIVE
using str

The database to use.

DEFAULT_DB_ALIAS
timeout Union[int, float, timedelta, None]

Set a timeout when waiting for the lock. If 0, NOWAIT will be used to return immediately. If None, the timeout is infinite. When using a timeout, the acquisition status will be returned. Use the side_effect argument to change the runtime behavior.

_unset
side_effect SideEffect

Adjust the runtime behavior when using a timeout. pglock.Return will return the acquisition status. pglock.Raise will raise a django.db.utils.OperationalError if the lock cannot be acquired or a timeout happens.

Return

Returns:

Type Description
bool

When using the default side effect, returns True if the lock was acquired or False if not.

Raises:

Type Description
OperationalError

If side_effect=pglock.Raise and a lock cannot be acquired or a timeout occurs.

RuntimeError

When running code outside of a transaction.

ValueError

When side_effect is an invalid value or no models are supplied.

TypeError

When timeout is an invalid type.

Source code in pglock/core.py
def model(
    *models: Union[str, models.Model],
    mode: str = ACCESS_EXCLUSIVE,
    using: str = DEFAULT_DB_ALIAS,
    timeout: Union[int, float, dt.timedelta, None] = _unset,
    side_effect: SideEffect = Return,
) -> bool:
    """Lock model(s).

    Args:
        *models: Model paths (e.g. "app_label.Model") or classes to lock.
        mode: The lock mode. See the
            [Postgres docs](https://www.postgresql.org/docs/current/explicit-locking.html#LOCKING-TABLES)
            for a list of all modes and what they mean. There is a constant for each one in the
            `pglock` module, e.g. `pglock.ACCESS_SHARE`.
        using: The database to use.
        timeout: Set a timeout when waiting for the lock. If 0, `NOWAIT` will be used to return
            immediately. If `None`, the timeout is infinite. When using a timeout, the acquisition
            status will be returned. Use the `side_effect` argument to change the runtime behavior.
        side_effect: Adjust the runtime behavior when using a timeout. `pglock.Return` will return
            the acquisition status. `pglock.Raise` will raise a `django.db.utils.OperationalError`
            if the lock cannot be acquired or a timeout happens.

    Returns:
        When using the default side effect, returns `True` if the lock was acquired or `False` if not.

    Raises:
        django.db.utils.OperationalError: If `side_effect=pglock.Raise` and a lock cannot be
            acquired or a timeout occurs.
        RuntimeError: When running code outside of a transaction.
        ValueError: When `side_effect` is an invalid value or no models are supplied.
        TypeError: When `timeout` is an invalid type.
    """  # noqa
    timeout = _cast_timeout(timeout)
    side_effect = side_effect.__class__ if not inspect.isclass(side_effect) else side_effect

    if side_effect not in (Return, Raise):
        raise ValueError("side_effect must be one of pglock.Return or pglock.Raise")

    if not models:
        raise ValueError("Must supply at least one model to pglock.model().")

    if not connections[using].in_atomic_block:
        raise RuntimeError(f'Database "{using}" must be in a transaction to lock models.')

    # Use NOWAIT when a timeout of 0 has been applied.
    nowait = isinstance(timeout, dt.timedelta) and not timeout
    models = [apps.get_model(model) if isinstance(model, str) else model for model in models]
    models = ", ".join(f'"{model._meta.db_table}"' for model in models)
    sql = f'LOCK TABLE {models} IN {mode} MODE {"NOWAIT" if nowait else ""}'

    try:
        with contextlib.ExitStack() as stack:
            if side_effect == Return:
                # If returning True/False, create a savepoint so that
                # the transaction isn't in an errored state when returning.
                stack.enter_context(transaction.atomic(using=using))

            # Set the lock timeout when either `None` or a non-zero
            # timeout has been supplied.
            if timeout is not _unset and not nowait:
                stack.enter_context(lock_timeout(timeout, using=using))

            with connections[using].cursor() as cursor:
                cursor.execute(sql)
                return True
    except OperationalError as exc:
        if side_effect == Return:
            return False
        elif side_effect == Raise:
            raise
        else:
            raise AssertionError from exc

pglock.prioritize

prioritize(
    *,
    interval: Union[int, float, dt.timedelta] = 1,
    periodic: bool = True,
    using: str = DEFAULT_DB_ALIAS,
    timeout: Union[dt.timedelta, int, float, None] = _unset,
    side_effect: PrioritizeSideEffect = Terminate
) -> None

Kill any blocking locks.

pglock.prioritize has a periodic background worker thread that checks for blocking activity and terminates it.

Parameters:

Name Type Description Default
interval Union[int, float, timedelta]

The interval (in seconds) at which the background worker runs.

1
periodic bool

If the worker should be ran periodically. If False, blocking locks are only killed once after the initial interval has happened.

True
using str

The database to use.

DEFAULT_DB_ALIAS
timeout Union[timedelta, int, float, None]

The lock timeout to apply to the wrapped code. This is synonymous with using with with pglock.prioritize(), pglock.timeout(). Although the background worker should properly terminate blocking locks, this serves as a backup option to ensure wrapped code doesn't block for too long. Never use a timeout that is less than interval.

_unset
side_effect PrioritizeSideEffect

The side effect called by the background worker. Supplied a BlockedPGLock queryset of locks blocking the prioritized code. Returns a list of all blocking PIDs that have been handled. The default side effect of pglock.Terminte will terminate blocking processes. pglock.Cancel is another side effect that can be used to cancel blocking processes.

Terminate

Raises:

Type Description
OperationalError

If timeout is used and the timeout expires.

Source code in pglock/core.py
@contextlib.contextmanager
def prioritize(
    *,
    interval: Union[int, float, dt.timedelta] = 1,
    periodic: bool = True,
    using: str = DEFAULT_DB_ALIAS,
    timeout: Union[dt.timedelta, int, float, None] = _unset,
    side_effect: PrioritizeSideEffect = Terminate,
) -> None:
    """Kill any blocking locks.

    `pglock.prioritize` has a periodic background worker thread that checks for blocking activity
    and terminates it.

    Args:
        interval: The interval (in seconds) at which the background worker runs.
        periodic: If the worker should be ran periodically. If False, blocking locks are
            only killed once after the initial interval has happened.
        using: The database to use.
        timeout: The lock timeout to apply to the wrapped code. This is synonymous with using with
            `with pglock.prioritize(), pglock.timeout()`. Although the background worker should
            properly terminate blocking locks, this serves as a backup option
            to ensure wrapped code doesn't block for too long. Never use a `timeout` that
            is less than `interval`.
        side_effect: The side effect called by the background worker. Supplied a `BlockedPGLock`
            queryset of locks blocking the prioritized code. Returns a list of all blocking PIDs
            that have been handled. The default side effect of `pglock.Terminte` will terminate
            blocking processes. `pglock.Cancel` is another side effect that can be used to cancel
            blocking processes.

    Raises:
        django.db.utils.OperationalError: If `timeout` is used and the timeout expires.
    """
    side_effect = side_effect() if inspect.isclass(side_effect) else side_effect

    if isinstance(interval, (int, float)):
        interval = dt.timedelta(seconds=interval)

    if not isinstance(interval, dt.timedelta):
        raise TypeError('"interval" argument must be an int, float, or timedelta instance')

    backend_pid = pgactivity.pid(using=using)
    timer_class = _PeriodicTimer if periodic else threading.Timer
    killer_thread = timer_class(
        interval.total_seconds(), _prioritize_bg_task, (backend_pid, side_effect)
    )
    killer_thread.start()

    try:
        with contextlib.ExitStack() as stack:
            if timeout is not _unset:
                stack.enter_context(lock_timeout(timeout, using=using))

            yield
    finally:
        killer_thread.cancel()

pglock.timeout

timeout(
    timeout: Union[dt.timedelta, int, float, None] = _unset,
    *,
    using: str = DEFAULT_DB_ALIAS,
    **timedelta_kwargs: int
)

Set the lock timeout as a decorator or context manager.

A value of None will set an infinite lock timeout. A value of less than a millisecond is not permitted.

Nested invocations will successfully apply and rollback the timeout to the previous value.

Parameters:

Name Type Description Default
timeout Union[timedelta, int, float, None]

The number of seconds as an integer or float. Use a timedelta object to precisely specify the timeout interval. Use None for an infinite timeout.

_unset
using str

The database to use.

DEFAULT_DB_ALIAS
**timedelta_kwargs int

Keyword arguments to directly supply to datetime.timedelta to create an interval. E.g. pglock.timeout(seconds=1, milliseconds=100) will create a timeout of 1100 milliseconds.

{}

Raises:

Type Description
OperationalError

When a timeout occurs

TypeError

When the timeout interval is an incorrect type

Source code in pglock/core.py
@contextlib.contextmanager
def lock_timeout(
    timeout: Union[dt.timedelta, int, float, None] = _unset,
    *,
    using: str = DEFAULT_DB_ALIAS,
    **timedelta_kwargs: int,
):
    """Set the lock timeout as a decorator or context manager.

    A value of `None` will set an infinite lock timeout.
    A value of less than a millisecond is not permitted.

    Nested invocations will successfully apply and rollback the timeout to
    the previous value.

    Args:
        timeout: The number of seconds as an integer or float. Use a timedelta object to
            precisely specify the timeout interval. Use `None` for an infinite timeout.
        using: The database to use.
        **timedelta_kwargs: Keyword arguments to directly supply to
            datetime.timedelta to create an interval. E.g.
            `pglock.timeout(seconds=1, milliseconds=100)`
            will create a timeout of 1100 milliseconds.

    Raises:
        django.db.utils.OperationalError: When a timeout occurs
        TypeError: When the timeout interval is an incorrect type
    """
    if timedelta_kwargs:
        timeout = dt.timedelta(**timedelta_kwargs)
    elif timeout is _unset:
        raise ValueError("Must supply a value to pglock.timeout")

    if timeout is not None:
        timeout = _cast_timeout(timeout)

        if not timeout:
            raise ValueError(
                "Must supply value greater than a millisecond to pglock.timeout or use `None` to"
                " reset the timeout."
            )
    else:
        timeout = dt.timedelta()

    if not hasattr(_timeout, "value"):
        _timeout.value = None

    old_timeout = _timeout.value
    _timeout.value = int(timeout.total_seconds() * 1000)

    try:
        with connections[using].cursor() as cursor:
            cursor.execute(f"SELECT set_config('lock_timeout', '{_timeout.value}', false)")
            yield
    finally:
        _timeout.value = old_timeout

        with connections[using].cursor() as cursor:
            if not _is_transaction_errored(cursor):
                if _timeout.value is None:
                    cursor.execute("SELECT set_config('lock_timeout', NULL, false)")
                else:
                    cursor.execute(f"SELECT set_config('lock_timeout', '{_timeout.value}', false)")

pglock.models

pglock.models.BlockedPGLock

Bases: BasePGLock

Models a blocked lock.

Uses Postgres's pg_blocking_pids function to unnest and denormalize any blocking activity for a lock, returning both the activity and blocking activity as a row.

Attributes:

Name Type Description
blocking_activity ForeignKey[PGActivity]

The activity that's blocking the lock.

pglock.models.BlockedPGLockQuerySet

BlockedPGLockQuerySet(model=None, query=None, using=None, hints=None)

Bases: PGLockQuerySet

The Queryset for the BlockedPGLock model. Inherits PGLockQuerySet

Source code in pglock/models.py
def __init__(self, model=None, query=None, using=None, hints=None):
    if query is None:
        query = PGTableQuery(model)

    super().__init__(model, query, using, hints)

cancel_blocking_activity

cancel_blocking_activity()

Cancel all PIDs in the blocking_activity field of the filtered queryset

Source code in pglock/models.py
def cancel_blocking_activity(self):
    """Cancel all PIDs in the `blocking_activity` field of the filtered queryset"""
    pids = list(self.values_list("blocking_activity_id", flat=True).distinct())
    return pgactivity.cancel(*pids, using=self.db)

terminate_blocking_activity

terminate_blocking_activity()

Terminate all PIDs in the blocking_activity field of the filtered queryset

Source code in pglock/models.py
def terminate_blocking_activity(self):
    """Terminate all PIDs in the `blocking_activity` field of the filtered queryset"""
    pids = list(self.values_list("blocking_activity_id", flat=True).distinct())
    return pgactivity.terminate(*pids, using=self.db)

pglock.models.PGLock

Bases: BasePGLock

Wraps Postgres's pg_locks view.

Attributes:

Name Type Description
type CharField

The type of lock. One of RELATION, EXTEND, FROZENID, PAGE, TUPLE, TRANSACTIONID, VIRTUALXID, SPECTOKEN, OBJECT, USERLOCK, or ADVISORY.

activity ForeignKey[PGActivity]

The activity from pg_stats_activity this lock references.

mode CharField

The mode of lock. One of ACCESS_SHARE, ROW_SHARE, ROW_EXCLUSIVE, SHARE_UPDATE_EXCLUSIVE, SHARE, SHARE_ROW_EXCLUSIVE, EXCLUSIVE, ACCESS_EXCLUSIVE.

granted BooleanField

True if the lock has been granted, False if the lock is blocked by another.

wait_start DateTimeField

When the lock started waiting. Only available in Postgres 14 and up.

wait_duration DurationField

How long the lock has been blocked. Only available in Postgres 14 and up

rel_kind CharField

The kind of relation being locked. One of TABLE, INDEX, SEQUENCE, TOAST, VIEW, MATERIALIZED_VIEW, COMPOSITE_TYPE, FOREIGN_TABLE, PARTITIONED_TABLE, or PARTITIONED_INDEX.

rel_name CharField

The name of the relation. E.g. the table name when rel_kind=TABLE.

pglock.models.PGLockQuerySet

PGLockQuerySet(model=None, query=None, using=None, hints=None)

Bases: PGTableQuerySet

The Queryset for the PGLock model.

Source code in pglock/models.py
def __init__(self, model=None, query=None, using=None, hints=None):
    if query is None:
        query = PGTableQuery(model)

    super().__init__(model, query, using, hints)

cancel_activity

cancel_activity() -> List[int]

Cancel all PIDs in the activity field of the filtered queryset

Source code in pglock/models.py
def cancel_activity(self) -> List[int]:
    """Cancel all PIDs in the `activity` field of the filtered queryset"""
    pids = list(self.values_list("activity_id", flat=True).distinct())
    return pgactivity.cancel(*pids, using=self.db)

config

config(name: str, **overrides: Any) -> models.QuerySet

Use a config name from settings.PGLOCK_CONFIGS to apply filters. Config overrides can be provided in the keyword arguments.

Parameters:

Name Type Description Default
name str

Name of the config. Must be a key from settings.PGLOCK_CONFIGS.

required
**overrides Any

Any overrides to apply to the final config dictionary.

{}

Returns:

Type Description
QuerySet

The configuration

Source code in pglock/models.py
def config(self, name: str, **overrides: Any) -> models.QuerySet:
    """
    Use a config name from `settings.PGLOCK_CONFIGS` to apply filters.
    Config overrides can be provided in the keyword arguments.

    Args:
        name: Name of the config. Must be a key from `settings.PGLOCK_CONFIGS`.
        **overrides: Any overrides to apply to the final config dictionary.

    Returns:
        The configuration
    """
    qset = self

    cfg = config.get(name, **overrides)

    qset = qset.using(cfg.get("database", DEFAULT_DB_ALIAS))
    qset = qset.pid(*cfg.get("pids", []))
    qset = qset.on(*cfg.get("on", []))

    for f in cfg.get("filters", []) or []:
        key, val = f.split("=", 1)
        qset = qset.filter(**{key: val})

    return qset

on

on(*relations: Union[models.Model, str]) -> models.QuerySet

Set the relations to filter against.

Currently model names or classes are accepted.

Source code in pglock/models.py
def on(self, *relations: Union[models.Model, str]) -> models.QuerySet:
    """Set the relations to filter against.

    Currently model names or classes are accepted.
    """
    qs = self._clone()
    qs.query.relations = relations
    return qs

terminate_activity

terminate_activity() -> List[int]

Terminate all PIDs in the activity field of the filtered queryset

Source code in pglock/models.py
def terminate_activity(self) -> List[int]:
    """Terminate all PIDs in the `activity` field of the filtered queryset"""
    pids = list(self.values_list("activity_id", flat=True).distinct())
    return pgactivity.terminate(*pids, using=self.db)