# Source code for guardrail.ext.sqlalchemy

# -*- coding: utf-8 -*-
"""SQLAlchemy plugin for guardrail."""

from __future__ import absolute_import

import sqlalchemy as sa

from guardrail.core import models
from guardrail.core import exceptions
from guardrail.core.registry import registry


def _get_primary_column(schema):
    """Return the single primary-key column of a mapped class.

    :param schema: Class that SQLAlchemy's ``inspect`` can resolve to an
        object exposing ``primary_key``
    :returns: The only element of the inspected ``primary_key`` sequence
    :raises RuntimeError: If the primary key does not consist of exactly one
        column. Note that this also covers the case of *no* primary-key
        columns, even though the message only mentions composite keys.
    """
    key_columns = sa.inspection.inspect(schema).primary_key
    if len(key_columns) != 1:
        raise RuntimeError('Composite foreign keys not currently supported')
    return key_columns[0]


class SqlalchemyPermissionManager(models.BasePermissionManager):
    """Permission manager that stores and queries permissions through a
    SQLAlchemy session.

    :param session: Session object used for every query, insert and delete
    :param registry: Registry handed to the base manager; defaults to the
        module-level ``registry`` imported from ``guardrail.core.registry``
    """

    def __init__(self, session, registry=registry):
        super(SqlalchemyPermissionManager, self).__init__(registry)
        self.session = session

    @staticmethod
    def _is_saved(record):
        """Report every record as saved; `record` is not inspected."""
        return True

    @staticmethod
    def _build_query(query, agent, target, schema, Agent=None, Target=None, custom=None):
        """Narrow `query` to the rows of `schema` matching `agent` and `target`.

        :param query: Query to refine
        :param agent: Value compared against ``schema.agent``
        :param target: Value compared against ``schema.target``
        :param schema: Permission schema class providing ``agent`` and
            ``target`` attributes
        :param Agent: When not ``None``, the ``schema.agent == agent`` filter
            is skipped (presumably `custom` is then expected to constrain the
            agent itself -- confirm against callers)
        :param Target: When not ``None``, the ``schema.target == target``
            filter is skipped (same caveat as `Agent`)
        :param custom: Optional callable invoked as
            ``custom(query, agent=..., target=..., schema=...)``; its return
            value becomes the resulting query
        :returns: The refined query
        """
        narrowed = query if Agent is not None else query.filter(schema.agent == agent)
        narrowed = narrowed if Target is not None else narrowed.filter(schema.target == target)
        if custom is None:
            return narrowed
        return custom(narrowed, agent=agent, target=target, schema=schema)

    def _get_permissions(self, agent, target, schema,
                         Agent=None, Target=None, custom=None):
        """Return the set of permission values found for `agent` on `target`.

        Only the ``schema.permission`` column is selected; the remaining
        arguments are forwarded unchanged to :meth:`_build_query`.
        """
        rows = self._build_query(
            self.session.query(schema.permission),
            agent, target, schema,
            Agent=Agent, Target=Target, custom=custom,
        )
        return set(row.permission for row in rows)

    def _has_permission(self, agent, target, schema, permission,
                        Agent=None, Target=None, custom=None):
        """Return ``True`` if at least one row grants `permission`.

        The permission filter is applied first, then the agent/target/custom
        refinement from :meth:`_build_query`; the result is the truthiness of
        the first matching row.
        """
        candidates = self.session.query(schema.permission).filter(
            schema.permission == permission
        )
        candidates = self._build_query(
            candidates, agent, target, schema,
            Agent=Agent, Target=Target, custom=custom,
        )
        first_match = candidates.first()
        return bool(first_match)

    def _add_permission(self, agent, target, schema, permission):
        """Create, add and flush a new permission row.

        :returns: The newly created `schema` instance
        :raises guardrail.core.exceptions.PermissionExists: If flushing raises
            an ``IntegrityError``; the session is rolled back before raising
        """
        record = schema(agent=agent, target=target, permission=permission)
        self.session.add(record)
        try:
            # Flush right away so an integrity violation surfaces here rather
            # than at some later flush or commit.
            self.session.flush()
        except sa.exc.IntegrityError:
            self.session.rollback()
            raise exceptions.PermissionExists()
        else:
            return record

    def _remove_permission(self, agent, target, schema, permission):
        """Delete the rows granting `permission` to `agent` on `target`.

        :raises guardrail.core.exceptions.PermissionNotFound: If the delete
            reports a falsy row count
        """
        doomed = self._build_query(
            self.session.query(schema).filter(schema.permission == permission),
            agent, target, schema,
        )
        if not doomed.delete():
            raise exceptions.PermissionNotFound


def _reference_column(schema, **kwargs):
    """Build an integer column holding a foreign key to `schema`.

    The foreign key points at the column returned by
    :func:`_get_primary_column` and is declared with ``CASCADE`` for both
    ``onupdate`` and ``ondelete``.

    :param schema: Mapped class whose primary-key column is referenced
    :param kwargs: Extra keyword arguments passed straight to ``sa.Column``
    :returns: The new ``sa.Column``
    """
    foreign_key = sa.ForeignKey(
        _get_primary_column(schema),
        onupdate='CASCADE',
        ondelete='CASCADE',
    )
    return sa.Column(sa.Integer, foreign_key, **kwargs)


class SqlalchemyPermissionSchemaFactory(models.BasePermissionSchemaFactory):
    """Permission schema factory for use with SQLAlchemy.

    :param tuple bases: Base classes for created schema classes
    :param bool cascade: Database backend supports `ON DELETE` and `ON UPDATE`
        cascades; see :meth:`_update_parents` for details
    """

    def __init__(self, bases, cascade=False):
        super(SqlalchemyPermissionSchemaFactory, self).__init__(bases)
        self.cascade = cascade

    @staticmethod
    def _get_table_name(schema):
        """Return the ``__tablename__`` of `schema`."""
        return schema.__tablename__

    def _update_parents(self, agent, target, schema):
        """Create a many-to-many `relationship` between the `agent` and
        `target` schemas, using the created `schema` as the join table.

        The relationship is attached to `agent` as
        ``targets_<schema.__tablename__>``, with a backref named
        ``agents_<schema.__tablename__>``.

        Note: Use the `passive_deletes` and `passive_updates` flags only if
        the database backend supports the ON UPDATE and ON DELETE cascades.
        These options are not supported in SQLite, or in MySQL using the
        MyISAM storage engine.
        """
        attr = 'targets_{0}'.format(schema.__tablename__)
        backref = 'agents_{0}'.format(schema.__tablename__)
        relation = sa.orm.relationship(
            target,
            secondary=schema.__table__,
            backref=backref,
            # Both flags follow the `cascade` option given to the factory.
            passive_deletes=self.cascade,
            passive_updates=self.cascade,
        )
        setattr(agent, attr, relation)

    def _make_schema_dict(self, agent, target):
        """Return the attribute dict describing a permission schema that links
        `agent` and `target`.

        The dict defines the table name (from ``self._make_table_name``,
        which is not defined in this class -- presumably inherited), a
        uniqueness constraint over ``(agent_id, target_id, permission)``, an
        integer primary key, indexed non-null foreign-key columns with
        matching relationships for both sides, and an indexed non-null string
        ``permission`` column.
        """
        return dict(
            __tablename__=self._make_table_name(agent, target),
            __table_args__=(
                sa.UniqueConstraint('agent_id', 'target_id', 'permission'),
            ),
            id=sa.Column(sa.Integer, primary_key=True),
            agent_id=_reference_column(agent, nullable=False, index=True),
            agent=sa.orm.relationship(agent),
            target_id=_reference_column(target, nullable=False, index=True),
            target=sa.orm.relationship(target),
            permission=sa.Column(sa.String, nullable=False, index=True),
        )
class SqlalchemyLoader(models.BaseLoader):
    """Loader that fetches a single `schema` record through a SQLAlchemy
    session.

    :param schema: Mapped class to query
    :param session: Session used to run the query
    :param column: Column to filter on; defaults to the primary-key column of
        `schema` as resolved by :func:`_get_primary_column`
    :param str kwarg: Name of the keyword argument whose value is compared
        against `column` when the loader is called
    """

    def __init__(self, schema, session, column=None, kwarg='id'):
        column = column if column is not None else _get_primary_column(schema)
        super(SqlalchemyLoader, self).__init__(schema, column, kwarg)
        self.session = session

    def __call__(self, *args, **kwargs):
        """Return the first record whose column equals ``kwargs[self.kwarg]``.

        Positional arguments are accepted but ignored. A missing keyword is
        looked up with ``dict.get`` and therefore compared as ``None``.
        Relies on ``self.schema``, ``self.column`` and ``self.kwarg``, which
        are presumably set by the base class initializer -- confirm.

        :returns: The first matching record, or ``None`` if there is none
        """
        return self.session.query(
            self.schema
        ).filter(
            self.column == kwargs.get(self.kwarg)
        ).first()