Problem

昨天打了 2021 津门杯, 有道 justOCB,考察的是 OCB,之前没接触过。在此之前的 2021 红明谷杯也有类似的题 babyFogery,只不过这次的更难。

打得时候搜到了dawn_whisper 师傅关于 babyFogery 的 wp。发现 babyFogery 没有加解密的次数限制,并且解密时可以给定 header,也会给出解密的结果。但 这次的 justOCB 只有 5 次加密 or 解密的机会,加密时 header 固定为 from user,解密时 header 固定为 from admin,并且不会给出解密结果。

昨天打得时候看很不一样,也没怎么思考,刚好有其他事就放弃了。今天回过头来推了下发现其实还是很简单很有意思的一道题。

题目经过我的改编可以直接运行,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import hashlib
import os
import random
import string
import signal
import socketserver
from hashlib import sha256
from binascii import hexlify, unhexlify
# https://github.com/kravietz/pyOCB
from ocb.aes import AES
from ocb import OCB

FLAG = 'flag{d29581e0-1d8e-40fa-81df-6e3e073a56d8}'


class Task(socketserver.BaseRequestHandler):
def _recvall(self):
BUFF_SIZE = 2048
data = b''
while True:
part = self.request.recv(BUFF_SIZE)
data += part
if len(part) < BUFF_SIZE:
break
return data.strip()

def send(self, msg, newline=True):
try:
if newline: msg += b'\n'
self.request.sendall(msg)
except:
pass

def recv(self, prompt=b'> '):
self.send(prompt, newline=False)
return self._recvall()

def proof_of_work(self):
random.seed(os.urandom(128))
proof = ''.join(random.choices(string.ascii_letters+string.digits, k=20))
_hexdigest = sha256(proof.encode()).hexdigest()
self.send(str.encode('sha256(XXXX+%s) == %s' % (proof[4:], _hexdigest)))
x = self.recv(prompt=b'Give me XXXX: ')
if len(x) != 4 or sha256(x+proof[4:].encode()).hexdigest() != _hexdigest:
return False
return True

def timeout_handler(self, signum, frame):
self.send(b'\n\nTIMEOUT!!!\n')
raise TimeoutError

def encrypt(self, nonce, msg, associate_data):
self.ocb.setNonce(nonce)
tag, cipher = self.ocb.encrypt(bytearray(msg), bytearray(associate_data))
return (bytes(cipher), bytes(tag))

def decrypt(self, nonce, cipher, tag, associate_data):
self.ocb.setNonce(nonce)
authenticated, message = self.ocb.decrypt(bytearray(associate_data), bytearray(cipher), bytearray(tag))
if not authenticated:
self.send(b'No no no!')
return bytes(0)
return bytes(message)

def handle(self):
signal.signal(signal.SIGALRM, self.timeout_handler)
signal.alarm(60)
if not self.proof_of_work():
return

aes = AES(128)
self.ocb = OCB(aes)
key = os.urandom(16)
self.ocb.setKey(key)

for i in range(5):
try:
choice = int(self.recv(b'Your choice: '))
if choice == 1:
nonce = unhexlify(self.recv(b'Your nonce: '))
msg = unhexlify(self.recv(b'Your message: '))
associate_data = b'From user'
ciphertext, tag = self.encrypt(nonce, msg, associate_data)
self.send(b'Your ciphertext: ' + hexlify(ciphertext))
self.send(b'Your tag: ' + hexlify(tag))
elif choice == 2:
nonce = unhexlify(self.recv(b'Your nonce: '))
cipher = unhexlify(self.recv(b'Your ciphertext: '))
tag = unhexlify(self.recv(b'Your tag: '))
associate_data = b'From admin'
msg = self.decrypt(nonce, cipher, tag, associate_data)
if msg == b'I_am_admin_plz_give_me_the_flag':
self.send(b'Your flag: ' + FLAG.encode())
break
else:
self.send(b'Wrong command!')
except Exception as err:
self.send(b'Error!')
print(err)

signal.alarm(0)

self.send(b'Bye!')
self.request.close()


class ForkedServer(socketserver.ForkingMixIn, socketserver.TCPServer):
pass


if __name__ == '__main__':
HOST, PORT = '0.0.0.0', 10000
print(HOST, PORT)
server = ForkedServer((HOST, PORT), Task)
server.allow_reuse_address = True
server.serve_forever()

题目的要求是解密后的明文为:I_am_admin_plz_give_me_the_flag 就 ok 了。密文是可以直接通过加密函得到的,关键在于加解密时的 header 不同,导致解密时给出的 tag 应该为 from admin 的 tag。因此要在 4 次加密的机会中伪造 tag,最后一次机会解密就好。

OCB

OCB 是一种可以同时满足“加密+鉴别”的工作模式。本题也给出了用 python 实现的 AES + OCB 的 源码

  • nonce 应为不可重复的值,用完一次应当丢弃。

  • header 类似于署名、备注,如果有的话在计算 tag 时会加入进行加密计算。

  • tag 是加密时将 nonce、header 及明文通过一定算法及加密得到的校验值,用来在解密时进行校验。

为了方便,定义 E() 为 AES 的 encrypt(),D() 为 AES 的 decrypt()。

encrypt

设消息的分块数为 n,则:

ci={2iE(nonce)E(2iE(nonce)mi),(i=1,2,3,...,n1)mnE(2nE(nonce)length),(i=n)c_i= \begin{cases} 2^i\cdot E(nonce) \bigoplus E(2^i\cdot E(nonce) \bigoplus m_i), & \quad (i=1,2,3,...,n-1) \\ \\ m_n \bigoplus E(2^n\cdot E(nonce) \bigoplus length), & \quad (i=n) \end{cases}

可以发现最后一个消息块的加密方式有很大不同。

其中:

  • \cdotGF(2N)GF(2^N) 域中的多项式乘法, 其中 NN 为 分块数 blocksize 的比特长度

  • lengthlength 为最后一个消息块 mnm_n 的比特长度。如 blocksize 为 16,mnm_n 长度为 15,则 lengthlength\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x78

  • 2iE(nonce)2^i\cdot E(nonce) 在 OCB 规范中记为 offset

  • 最后一个分块中的 E(2nE(nonce)length)E(2^n\cdot E(nonce) \bigoplus length) 在 OCB 规范中记为 pad

要计算 tag 要先计算 checksum, 计算方法如下:

checksum=(i=1n1mi)padding(mn)checksum=(\bigoplus^{n-1}_{i=1}m_i) \bigoplus padding(m_n)

其中 padding 函数计算方法如下:

padding(mn)=mn+pad[len(mn):]padding(m_n) = m_n + pad[len(m_n):]

tag 的计算方法如下:

tag=E(32nE(nonce)checksum)tag=E(3\cdot 2^n\cdot E(nonce)\bigoplus checksum)

如果有 header,则 tag 还要再做一次计算:

tag=tagpmac(header)tag = tag \bigoplus pmac(header)

其中 pmac 函数的值只与 header 有关,但需要 AES 加密两次数据。计算方法稍微复杂点,建议直接看源码。

Decrypt

mi={D(ci2iE(nonce))2iE(nonce),(i=1,2,3,...,n1)cnE(2nE(nonce)length),(i=n)m_i= \begin{cases} D(c_i\bigoplus 2^i\cdot E(nonce)) \bigoplus 2^i\cdot E(nonce), & \quad (i=1,2,3,...,n-1) \\ \\ c_n \bigoplus E(2^n\cdot E(nonce) \bigoplus length), & \quad (i=n) \end{cases}

计算 check_tag 的方法与加密时相同,然后与给出的 tag 相比较,如果相同则认为校验通过,返回解密后的数据。

Solve

首先可以先让服务器加密只有一个分块的任意数据,则:

c=mE(2E(nonce)length)c = m \bigoplus E(2 \cdot E(nonce) \bigoplus length)

得到 E(2E(nonce)length)E(2 \cdot E(nonce) \bigoplus length)

然后构造两个分块的数据,第一个分块 m1m_1lengthlength 相同,第二个分块任意,让服务器加密,则:

c1=2E(nonce)E(2E(nonce)length)c_1 = 2\cdot E(nonce) \bigoplus E(2\cdot E(nonce) \bigoplus length)

因此可以得到 2E(nonce)2\cdot E(nonce) ,再逆运算,得到 E(nonce)E(nonce)。当然不逆运算,直接将 2E(nonce)2\cdot E(nonce) 当成一个整体也行。

观察加密时非最后一个分块的算法:

ci=2iE(nonce)E(2iE(nonce)mi),(i=1,2,3,...,n1)c_i = 2^i\cdot E(nonce) \bigoplus E(2^i\cdot E(nonce) \bigoplus m_i), \quad (i=1,2,3,...,n-1)

发现 E(nonce)E(nonce) 已知,2iE(nonce)2^i\cdot E(nonce) 可求,mim_i 可控,因此构造 mi=2iE(nonce)msgm_i = 2^i\cdot E(nonce) \bigoplus msg,那么就可以求出任意明文 msgmsg 对应的 AES 加密结果 E(msg)E(msg)了。当然由于题目的限制,还剩下 2 次加密的机会,因此对于可以并行加密的要并行加密,否则次数不够。

然后我们就可以计算目标明文的密文,并且伪造 tag 了。计算目标明文的密文只需要一次加密, 计算无 header 的 tag 时需要两次不能并行有依赖关系的加密,计算 pmac(header)pmac(header) 也需要两次不能并行有依赖关系的加密。这 3 项没有依赖关系,因此可以全都并行起来加密,最后只需要两次加密就行。

最后本地计算出密文、伪造的 tag,让服务器解密拿到 flag。

flag

脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import itertools
import string
import re
import codecs
from hashlib import sha256
from pwn import remote


def PoW(hash_value, part):
alphabet = string.ascii_letters + string.digits
for x in itertools.product(alphabet, repeat=4):
nonce = ''.join(x)
if sha256((nonce + part).encode()).hexdigest() == hash_value:
return nonce


def times2(input_data, blocksize=16):
assert len(input_data) == blocksize
output = bytearray(blocksize)
carry = input_data[0] >> 7
for i in range(len(input_data) - 1):
output[i] = ((input_data[i] << 1) | (input_data[i + 1] >> 7)) % 256
output[-1] = ((input_data[-1] << 1) ^ (carry * 0x87)) % 256
assert len(output) == blocksize
return output


def times3(input_data):
assert len(input_data) == 16
output = times2(input_data)
output = xor_block(output, input_data)
assert len(output) == 16
return output


def back_times2(output_data, blocksize=16):
assert len(output_data) == blocksize
input_data = bytearray(blocksize)
carry = output_data[-1] & 1
for i in range(len(output_data) - 1, 0, -1):
input_data[i] = (output_data[i] >> 1) | ((output_data[i - 1] % 2) << 7)
input_data[0] = (carry << 7) | (output_data[0] >> 1)
if carry:
input_data[-1] = ((output_data[-1] ^ (carry * 0x87)) >> 1) | ((output_data[-2] % 2) << 7)
assert len(input_data) == blocksize
return input_data


def xor_block(input1, input2):
assert len(input1) == len(input2)
output = bytearray()
for i in range(len(input1)):
output.append(input1[i] ^ input2[i])
return output


def my_aes_encrypt(sh, nonce, e_nonce, *msgs):
offset = e_nonce
send_msg = bytearray()
for msg in msgs:
offset = times2(offset)
send_msg += xor_block(offset, msg)
send_msg += bytearray(os.urandom(16))

ciphertext, tag = server_encrypt(sh, nonce, send_msg)

e_msgs = []
offset = e_nonce
for i in range(len(msgs)):
offset = times2(offset)
e_msg = xor_block(ciphertext[i * 16: (i+1) * 16], offset)
e_msgs.append(e_msg)

return e_msgs


def server_encrypt(sh, nonce, msg):
sh.recvuntil(b'Your choice: ')
sh.send(b'1')
sh.recvuntil(b'Your nonce: ')
sh.send(nonce.hex())
sh.recvuntil(b'Your message: ')
sh.send(msg.hex())
sh.recvuntil(b'Your ciphertext: ')
ciphertext = codecs.decode(sh.recvline().strip(), 'hex')
sh.recvuntil(b'Your tag: ')
tag = codecs.decode(sh.recvline().strip(), 'hex')
return ciphertext, tag


def server_decrypt(sh, nonce, ciphertext, tag):
sh.recvuntil(b'Your choice: ')
sh.send(b'2')
sh.recvuntil(b'Your nonce: ')
sh.send(nonce.hex())
sh.recvuntil(b'Your ciphertext: ')
sh.send(ciphertext.hex())
sh.recvuntil(b'Your tag: ')
sh.send(tag.hex())
flag = sh.recvline().decode().strip()
return flag


def main():
sh = remote('127.0.0.1', 10000)

line = sh.recvline().decode('l1')
re_res = re.search(r'sha256\(XXXX\+([0-9a-zA-Z]{16})\) == ([0-9a-z]{64})', line)
part = re_res.group(1)
hash_value = re_res.group(2)
nonce = PoW(hash_value, part)
sh.sendline(nonce.encode())
print('PoW finish.')

associate_data_user = b'From user'
associate_data_admin = b'From admin'

nonce = os.urandom(16)

# get E(nonce)
msg = bytearray(b'\x00' * 16)
ciphertext, tag = server_encrypt(sh, nonce, msg)
tmp = ciphertext

msg = bytearray(b'\x00' * 15 + b'\x80' + os.urandom(16))
ciphertext, tag = server_encrypt(sh, nonce, msg)
twice_e_nonce = xor_block(ciphertext[:16], tmp)
e_nonce = back_times2(twice_e_nonce)
print('E(nonce):', e_nonce)

target_msg = bytearray(b'I_am_admin_plz_give_me_the_flag')
m1 = target_msg[:16]
m2 = target_msg[16:]

# 2^i.E(nonce) xor m1
offset_xor_m1 = xor_block(times2(e_nonce), m1)

# pad
bit_length = len(m2) * 8
tmp = bytearray(16)
tmp[-1] = bit_length
pad = xor_block(tmp, times2(times2(e_nonce)))

# [0] * block_size
block_size_0 = bytearray(16)

e_offset_xor_m1, e_block_size_0, e_pad = my_aes_encrypt(sh, nonce, e_nonce, offset_xor_m1, block_size_0, pad)

c1 = xor_block(times2(e_nonce), e_offset_xor_m1)
c2 = xor_block(m2, e_pad[:len(m2)])

offset_pmac = times3(times3(times2(times3(times3(e_block_size_0)))))
checksum_pmac = bytearray(associate_data_admin)
checksum_pmac.append(int('10000000', 2))
while len(checksum_pmac) < 16:
checksum_pmac.append(0)
final_xor = xor_block(offset_pmac, checksum_pmac)

# tag
checksum = m1
pad = m2 + e_pad[len(m2):]
checksum = xor_block(pad, checksum)
offset = times3(times2(times2(e_nonce)))
tag = xor_block(offset, checksum)

tag, auth = my_aes_encrypt(sh, nonce, e_nonce, tag, final_xor)

tag = xor_block(tag, auth)
print('tag:', tag)
print('c:', c1 + c2)

flag = server_decrypt(sh, nonce, c1 + c2, tag)
print(flag)


if __name__ == '__main__':
main()

Ref

  • http://dawn_whisper.hack.best/2021/04/04/Wp_for_%E7%BA%A2%E6%98%8E%E8%B0%B7_crypto/