Problem
昨天打了 2021 津门杯, 有道 justOCB,考察的是 OCB,之前没接触过。在此之前的 2021 红明谷杯也有类似的题 babyFogery,只不过这次的更难。
打得时候搜到了dawn_whisper 师傅关于 babyFogery 的 wp。发现 babyFogery 没有加解密的次数限制,并且解密时可以给定 header,也会给出解密的结果。但 这次的 justOCB 只有 5 次加密 or 解密的机会,加密时 header 固定为 from user,解密时 header 固定为 from admin,并且不会给出解密结果。
昨天打得时候看很不一样,也没怎么思考,刚好有其他事就放弃了。今天回过头来推了下发现其实还是很简单很有意思的一道题。
题目经过我的改编可以直接运行,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 import hashlibimport osimport randomimport stringimport signalimport socketserverfrom hashlib import sha256from binascii import hexlify, unhexlifyfrom ocb.aes import AESfrom ocb import OCBFLAG = 'flag{d29581e0-1d8e-40fa-81df-6e3e073a56d8}' class Task (socketserver.BaseRequestHandler) : def _recvall (self) : BUFF_SIZE = 2048 data = b'' while True : part = self.request.recv(BUFF_SIZE) data += part if len(part) < BUFF_SIZE: break return data.strip() def send (self, msg, newline=True) : try : if newline: msg += b'\n' self.request.sendall(msg) except : pass def recv (self, prompt=b'> ' ) : self.send(prompt, newline=False ) return self._recvall() def proof_of_work (self) : random.seed(os.urandom(128 )) proof = '' .join(random.choices(string.ascii_letters+string.digits, k=20 )) _hexdigest = sha256(proof.encode()).hexdigest() self.send(str.encode('sha256(XXXX+%s) == %s' % (proof[4 :], _hexdigest))) x = self.recv(prompt=b'Give me XXXX: ' ) if len(x) != 4 or sha256(x+proof[4 :].encode()).hexdigest() != _hexdigest: return False return True def timeout_handler (self, signum, frame) : self.send(b'\n\nTIMEOUT!!!\n' ) raise TimeoutError def encrypt (self, nonce, msg, associate_data) : self.ocb.setNonce(nonce) tag, cipher = self.ocb.encrypt(bytearray(msg), bytearray(associate_data)) return (bytes(cipher), bytes(tag)) def decrypt (self, nonce, cipher, tag, associate_data) : self.ocb.setNonce(nonce) authenticated, message = self.ocb.decrypt(bytearray(associate_data), bytearray(cipher), bytearray(tag)) if not authenticated: self.send(b'No no no!' ) return bytes(0 ) return bytes(message) def handle (self) : signal.signal(signal.SIGALRM, self.timeout_handler) signal.alarm(60 ) if not self.proof_of_work(): return aes = AES(128 ) self.ocb = OCB(aes) key = os.urandom(16 ) self.ocb.setKey(key) for i in range(5 ): try : choice = int(self.recv(b'Your choice: ' )) if choice == 1 : nonce = unhexlify(self.recv(b'Your nonce: ' )) msg = unhexlify(self.recv(b'Your message: ' )) associate_data = b'From user' ciphertext, tag = self.encrypt(nonce, msg, associate_data) self.send(b'Your ciphertext: ' + hexlify(ciphertext)) self.send(b'Your tag: ' + hexlify(tag)) elif choice == 2 : nonce = unhexlify(self.recv(b'Your nonce: ' )) cipher = unhexlify(self.recv(b'Your ciphertext: ' )) tag = unhexlify(self.recv(b'Your tag: ' )) associate_data = b'From admin' msg = self.decrypt(nonce, cipher, tag, associate_data) if msg == b'I_am_admin_plz_give_me_the_flag' : self.send(b'Your flag: ' + FLAG.encode()) break else : self.send(b'Wrong command!' ) except Exception as err: self.send(b'Error!' ) print(err) signal.alarm(0 ) self.send(b'Bye!' ) self.request.close() class ForkedServer (socketserver.ForkingMixIn, socketserver.TCPServer) : pass if __name__ == '__main__' : HOST, PORT = '0.0.0.0' , 10000 print(HOST, PORT) server = ForkedServer((HOST, PORT), Task) server.allow_reuse_address = True server.serve_forever()
题目的要求是解密后的明文为:I_am_admin_plz_give_me_the_flag
就 ok 了。密文是可以直接通过加密函得到的,关键在于加解密时的 header 不同,导致解密时给出的 tag 应该为 from admin
的 tag。因此要在 4 次加密的机会中伪造 tag,最后一次机会解密就好。
OCB
OCB 是一种可以同时满足“加密+鉴别”的工作模式。本题也给出了用 python 实现的 AES + OCB 的 源码 。
为了方便,定义 E() 为 AES 的 encrypt(),D() 为 AES 的 decrypt()。
encrypt
设消息的分块数为 n,则:
c i = { 2 i ⋅ E ( n o n c e ) ⨁ E ( 2 i ⋅ E ( n o n c e ) ⨁ m i ) , ( i = 1 , 2 , 3 , . . . , n − 1 ) m n ⨁ E ( 2 n ⋅ E ( n o n c e ) ⨁ l e n g t h ) , ( i = n ) c_i=
\begin{cases}
2^i\cdot E(nonce) \bigoplus E(2^i\cdot E(nonce) \bigoplus m_i), & \quad (i=1,2,3,...,n-1) \\
\\
m_n \bigoplus E(2^n\cdot E(nonce) \bigoplus length), & \quad (i=n)
\end{cases}
c i = ⎩ ⎪ ⎨ ⎪ ⎧ 2 i ⋅ E ( n o n c e ) ⨁ E ( 2 i ⋅ E ( n o n c e ) ⨁ m i ) , m n ⨁ E ( 2 n ⋅ E ( n o n c e ) ⨁ l e n g t h ) , ( i = 1 , 2 , 3 , . . . , n − 1 ) ( i = n )
可以发现最后一个消息块的加密方式有很大不同。
其中:
⋅ \cdot ⋅ 为 G F ( 2 N ) GF(2^N) G F ( 2 N ) 域中的多项式乘法, 其中 N N N 为 分块数 blocksize 的比特长度
l e n g t h length l e n g t h 为最后一个消息块 m n m_n m n 的比特长度。如 blocksize 为 16,m n m_n m n 长度为 15,则 l e n g t h length l e n g t h 为 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x78
2 i ⋅ E ( n o n c e ) 2^i\cdot E(nonce) 2 i ⋅ E ( n o n c e ) 在 OCB 规范中记为 offset
最后一个分块中的 E ( 2 n ⋅ E ( n o n c e ) ⨁ l e n g t h ) E(2^n\cdot E(nonce) \bigoplus length) E ( 2 n ⋅ E ( n o n c e ) ⨁ l e n g t h ) 在 OCB 规范中记为 pad
要计算 tag 要先计算 checksum, 计算方法如下:
c h e c k s u m = ( ⨁ i = 1 n − 1 m i ) ⨁ p a d d i n g ( m n ) checksum=(\bigoplus^{n-1}_{i=1}m_i) \bigoplus padding(m_n)
c h e c k s u m = ( i = 1 ⨁ n − 1 m i ) ⨁ p a d d i n g ( m n )
其中 padding 函数计算方法如下:
p a d d i n g ( m n ) = m n + p a d [ l e n ( m n ) : ] padding(m_n) = m_n + pad[len(m_n):]
p a d d i n g ( m n ) = m n + p a d [ l e n ( m n ) : ]
tag 的计算方法如下:
t a g = E ( 3 ⋅ 2 n ⋅ E ( n o n c e ) ⨁ c h e c k s u m ) tag=E(3\cdot 2^n\cdot E(nonce)\bigoplus checksum)
t a g = E ( 3 ⋅ 2 n ⋅ E ( n o n c e ) ⨁ c h e c k s u m )
如果有 header,则 tag 还要再做一次计算:
t a g = t a g ⨁ p m a c ( h e a d e r ) tag = tag \bigoplus pmac(header)
t a g = t a g ⨁ p m a c ( h e a d e r )
其中 pmac 函数的值只与 header 有关,但需要 AES 加密两次数据。计算方法稍微复杂点,建议直接看源码。
Decrypt
m i = { D ( c i ⨁ 2 i ⋅ E ( n o n c e ) ) ⨁ 2 i ⋅ E ( n o n c e ) , ( i = 1 , 2 , 3 , . . . , n − 1 ) c n ⨁ E ( 2 n ⋅ E ( n o n c e ) ⨁ l e n g t h ) , ( i = n ) m_i=
\begin{cases}
D(c_i\bigoplus 2^i\cdot E(nonce)) \bigoplus 2^i\cdot E(nonce), & \quad (i=1,2,3,...,n-1) \\
\\
c_n \bigoplus E(2^n\cdot E(nonce) \bigoplus length), & \quad (i=n)
\end{cases}
m i = ⎩ ⎪ ⎨ ⎪ ⎧ D ( c i ⨁ 2 i ⋅ E ( n o n c e ) ) ⨁ 2 i ⋅ E ( n o n c e ) , c n ⨁ E ( 2 n ⋅ E ( n o n c e ) ⨁ l e n g t h ) , ( i = 1 , 2 , 3 , . . . , n − 1 ) ( i = n )
计算 check_tag 的方法与加密时相同,然后与给出的 tag 相比较,如果相同则认为校验通过,返回解密后的数据。
Solve
首先可以先让服务器加密只有一个分块的任意数据,则:
c = m ⨁ E ( 2 ⋅ E ( n o n c e ) ⨁ l e n g t h ) c = m \bigoplus E(2 \cdot E(nonce) \bigoplus length)
c = m ⨁ E ( 2 ⋅ E ( n o n c e ) ⨁ l e n g t h )
得到 E ( 2 ⋅ E ( n o n c e ) ⨁ l e n g t h ) E(2 \cdot E(nonce) \bigoplus length) E ( 2 ⋅ E ( n o n c e ) ⨁ l e n g t h ) 。
然后构造两个分块的数据,第一个分块 m 1 m_1 m 1 与 l e n g t h length l e n g t h 相同,第二个分块任意,让服务器加密,则:
c 1 = 2 ⋅ E ( n o n c e ) ⨁ E ( 2 ⋅ E ( n o n c e ) ⨁ l e n g t h ) c_1 = 2\cdot E(nonce) \bigoplus E(2\cdot E(nonce) \bigoplus length)
c 1 = 2 ⋅ E ( n o n c e ) ⨁ E ( 2 ⋅ E ( n o n c e ) ⨁ l e n g t h )
因此可以得到 2 ⋅ E ( n o n c e ) 2\cdot E(nonce) 2 ⋅ E ( n o n c e ) ,再逆运算,得到 E ( n o n c e ) E(nonce) E ( n o n c e ) 。当然不逆运算,直接将 2 ⋅ E ( n o n c e ) 2\cdot E(nonce) 2 ⋅ E ( n o n c e ) 当成一个整体也行。
观察加密时非最后一个分块的算法:
c i = 2 i ⋅ E ( n o n c e ) ⨁ E ( 2 i ⋅ E ( n o n c e ) ⨁ m i ) , ( i = 1 , 2 , 3 , . . . , n − 1 ) c_i = 2^i\cdot E(nonce) \bigoplus E(2^i\cdot E(nonce) \bigoplus m_i), \quad (i=1,2,3,...,n-1)
c i = 2 i ⋅ E ( n o n c e ) ⨁ E ( 2 i ⋅ E ( n o n c e ) ⨁ m i ) , ( i = 1 , 2 , 3 , . . . , n − 1 )
发现 E ( n o n c e ) E(nonce) E ( n o n c e ) 已知,2 i ⋅ E ( n o n c e ) 2^i\cdot E(nonce) 2 i ⋅ E ( n o n c e ) 可求,m i m_i m i 可控,因此构造 m i = 2 i ⋅ E ( n o n c e ) ⨁ m s g m_i = 2^i\cdot E(nonce) \bigoplus msg m i = 2 i ⋅ E ( n o n c e ) ⨁ m s g ,那么就可以求出任意明文 m s g msg m s g 对应的 AES 加密结果 E ( m s g ) E(msg) E ( m s g ) 了。当然由于题目的限制,还剩下 2 次加密的机会,因此对于可以并行加密的要并行加密,否则次数不够。
然后我们就可以计算目标明文的密文,并且伪造 tag 了。计算目标明文的密文只需要一次加密, 计算无 header 的 tag 时需要两次不能并行有依赖关系的加密,计算 p m a c ( h e a d e r ) pmac(header) p m a c ( h e a d e r ) 也需要两次不能并行有依赖关系的加密。这 3 项没有依赖关系,因此可以全都并行起来加密,最后只需要两次加密就行。
最后本地计算出密文、伪造的 tag,让服务器解密拿到 flag。
脚本如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 import osimport itertoolsimport stringimport reimport codecsfrom hashlib import sha256from pwn import remotedef PoW (hash_value, part) : alphabet = string.ascii_letters + string.digits for x in itertools.product(alphabet, repeat=4 ): nonce = '' .join(x) if sha256((nonce + part).encode()).hexdigest() == hash_value: return nonce def times2 (input_data, blocksize=16 ) : assert len(input_data) == blocksize output = bytearray(blocksize) carry = input_data[0 ] >> 7 for i in range(len(input_data) - 1 ): output[i] = ((input_data[i] << 1 ) | (input_data[i + 1 ] >> 7 )) % 256 output[-1 ] = ((input_data[-1 ] << 1 ) ^ (carry * 0x87 )) % 256 assert len(output) == blocksize return output def times3 (input_data) : assert len(input_data) == 16 output = times2(input_data) output = xor_block(output, input_data) assert len(output) == 16 return output def back_times2 (output_data, blocksize=16 ) : assert len(output_data) == blocksize input_data = bytearray(blocksize) carry = output_data[-1 ] & 1 for i in range(len(output_data) - 1 , 0 , -1 ): input_data[i] = (output_data[i] >> 1 ) | ((output_data[i - 1 ] % 2 ) << 7 ) input_data[0 ] = (carry << 7 ) | (output_data[0 ] >> 1 ) if carry: input_data[-1 ] = ((output_data[-1 ] ^ (carry * 0x87 )) >> 1 ) | ((output_data[-2 ] % 2 ) << 7 ) assert len(input_data) == blocksize return input_data def xor_block (input1, input2) : assert len(input1) == len(input2) output = bytearray() for i in range(len(input1)): output.append(input1[i] ^ input2[i]) return output def my_aes_encrypt (sh, nonce, e_nonce, *msgs) : offset = e_nonce send_msg = bytearray() for msg in msgs: offset = times2(offset) send_msg += xor_block(offset, msg) send_msg += bytearray(os.urandom(16 )) ciphertext, tag = server_encrypt(sh, nonce, send_msg) e_msgs = [] offset = e_nonce for i in range(len(msgs)): offset = times2(offset) e_msg = xor_block(ciphertext[i * 16 : (i+1 ) * 16 ], offset) e_msgs.append(e_msg) return e_msgs def server_encrypt (sh, nonce, msg) : sh.recvuntil(b'Your choice: ' ) sh.send(b'1' ) sh.recvuntil(b'Your nonce: ' ) sh.send(nonce.hex()) sh.recvuntil(b'Your message: ' ) sh.send(msg.hex()) sh.recvuntil(b'Your ciphertext: ' ) ciphertext = codecs.decode(sh.recvline().strip(), 'hex' ) sh.recvuntil(b'Your tag: ' ) tag = codecs.decode(sh.recvline().strip(), 'hex' ) return ciphertext, tag def server_decrypt (sh, nonce, ciphertext, tag) : sh.recvuntil(b'Your choice: ' ) sh.send(b'2' ) sh.recvuntil(b'Your nonce: ' ) sh.send(nonce.hex()) sh.recvuntil(b'Your ciphertext: ' ) sh.send(ciphertext.hex()) sh.recvuntil(b'Your tag: ' ) sh.send(tag.hex()) flag = sh.recvline().decode().strip() return flag def main () : sh = remote('127.0.0.1' , 10000 ) line = sh.recvline().decode('l1' ) re_res = re.search(r'sha256\(XXXX\+([0-9a-zA-Z]{16})\) == ([0-9a-z]{64})' , line) part = re_res.group(1 ) hash_value = re_res.group(2 ) nonce = PoW(hash_value, part) sh.sendline(nonce.encode()) print('PoW finish.' ) associate_data_user = b'From user' associate_data_admin = b'From admin' nonce = os.urandom(16 ) msg = bytearray(b'\x00' * 16 ) ciphertext, tag = server_encrypt(sh, nonce, msg) tmp = ciphertext msg = bytearray(b'\x00' * 15 + b'\x80' + os.urandom(16 )) ciphertext, tag = server_encrypt(sh, nonce, msg) twice_e_nonce = xor_block(ciphertext[:16 ], tmp) e_nonce = back_times2(twice_e_nonce) print('E(nonce):' , e_nonce) target_msg = bytearray(b'I_am_admin_plz_give_me_the_flag' ) m1 = target_msg[:16 ] m2 = target_msg[16 :] offset_xor_m1 = xor_block(times2(e_nonce), m1) bit_length = len(m2) * 8 tmp = bytearray(16 ) tmp[-1 ] = bit_length pad = xor_block(tmp, times2(times2(e_nonce))) block_size_0 = bytearray(16 ) e_offset_xor_m1, e_block_size_0, e_pad = my_aes_encrypt(sh, nonce, e_nonce, offset_xor_m1, block_size_0, pad) c1 = xor_block(times2(e_nonce), e_offset_xor_m1) c2 = xor_block(m2, e_pad[:len(m2)]) offset_pmac = times3(times3(times2(times3(times3(e_block_size_0))))) checksum_pmac = bytearray(associate_data_admin) checksum_pmac.append(int('10000000' , 2 )) while len(checksum_pmac) < 16 : checksum_pmac.append(0 ) final_xor = xor_block(offset_pmac, checksum_pmac) checksum = m1 pad = m2 + e_pad[len(m2):] checksum = xor_block(pad, checksum) offset = times3(times2(times2(e_nonce))) tag = xor_block(offset, checksum) tag, auth = my_aes_encrypt(sh, nonce, e_nonce, tag, final_xor) tag = xor_block(tag, auth) print('tag:' , tag) print('c:' , c1 + c2) flag = server_decrypt(sh, nonce, c1 + c2, tag) print(flag) if __name__ == '__main__' : main()
Ref
http://dawn_whisper.hack.best/2021/04/04/Wp_for_%E7%BA%A2%E6%98%8E%E8%B0%B7_crypto/