python读取pcap文件并分析(附源码)

更新时间:2023-07-19 04:16:33 阅读: 评论:0

python读取pcap⽂件并分析(附源码)
从可以看到pcap⽂件的格式,前⾯24字节是不⽤管的,⽽每⼀个数据包前都有16字节的说明,其中第8-11字节是caplen字段,也就是这个数据包的长度信息,不包含这16个字节。我们打开⽂件读取时可以直接ek()到第32(24+8)字节,去读取第⼀个数据包的长度,然后再回退12个字节,读取(caplen+16)个长度,作为整个数据包的数据。
读取之后的数据包时,⽤同样的⽅法先向后 ek 8个字节,读取caplen后在回退12个字节,读取(caplen+16)的长度,即为数据包数据。
读取pcap⽂件
⽤open()、read()就可以读取pcap⽂件,但注意open时要⽤'rb'模式
import struct
fd_read = open(read_path, 'rb')
fd_read.ek(32, 0)                        # skip pcap head and packet head
len_c = ad(4)          # caplen
if not len_c:              # read fail or zero
break
转码
读取到caplen后,需要对数据进⾏格式转换,原本是类似 b'\xde\x3d'的格式,我们的⽬的是16进制数,所以⽤到struct模块中的unpack()函数
clen = struct.unpack('i', len_c)
caplen = clen[0] & 0xFFFFFFFF
struct.unpack()的参数中,第⼀个是指解码的模式,有很多种,需要的可以⾃⾏查看,但是有⼀个⼤⼩端的问题,这⾥涉及到了,如果使⽤'i'(感叹号加⼩写字母i),那么得到的数据包packet串就会是逆序的16进制数的排列,如原本应是“68 ca e4 e6”,会变成“e6 e4 ca 68”。⽽在读取数据包前16字节的说明字段时,只有不加感叹号才能得到正确的caplen和时间戳。
具体细节就不多说了,更多的都是简单的处理,可以在评论区交流
#! /usr/bin/env python3
import getopt
import sys
import struct
import re
def symbol_split(ss):
if "&" in ss:
cdt = ss.split('&')
el:
cdt = [ss]
return cdt
def compare_operate(filter, packet):
result = 1
for ft in filter:
ft_item = ft.split()
offt = int(ft_item[0]) * 2 + 32
length = int(ft_item[1]) * 2
s = packet[offt:offt + length]  # original value
d = int(s, 16) & int(ft_item[2], 16)  # after mask
cdt = symbol_split(ft_item[3])
for i in cdt:
if (('>' in i) and (d <= int(i[1:], 16))) or (('<' in i) and (d >= int(i[1:], 16))) or \
(('==' in i) and (d != int(i[2:], 16))) or (('!=' in i) and (d == int(i[2:], 16))):
result = 0
return result
def range_operate(pct_range, packet_num):
range_flag = 1
for r in pct_range:
if (('>' in r) and (packet_num <= int(r[1:]))) or (('!=' in r) and (packet_num == int(r[2:]))) \                or (('==' in r) and (packet_num < int(r[2:]))):
range_flag = 0
break
if (('<' in r) and (packet_num >= int(r[1:]))) or (('==' in r) and (packet_num > int(r[2:]))):            range_flag = 2
break
return range_flag
def decode_pcap(read_path, opt_dict):
packet_num = 0
filter_num = 0
fd_read = open(read_path, 'rb')
if '--save' in opt_dict:
pcap_head = ad(24)
fd_write = open(opt_dict['--save'][0], 'wb')
fd_write.write(pcap_head)
el:
fd_read.ek(24, 0)                        # skip pcap head
while True:
# one packet
# packet head write
packet = ''
packet_num += 1
head_lek = fd_read.ek(8, 1)            # -->8 bytes
if not head_lek:
break
len_c = ad(4)          # caplen
if not len_c:              # read fail or zero
break
clen = struct.unpack('i', len_c)
caplen = clen[0] & 0xFFFFFFFF
# range_flag = 1 ===>filter
# range_flag = 0 ===>less, skip and continue
# range_flag = 2 ===>more, stop and break
if '--pcap_range' in opt_dict:
pct_range = symbol_split(opt_dict['--pcap_range'][0])
range_flag = range_operate(pct_range, packet_num)
if range_flag == 0:
skip_lek = fd_read.ek(caplen + 4, 1)
if not skip_lek:
break
continue
elif range_flag == 2:
break
# print("%d come in"%packet_num)
packet_lek = fd_read.ek(-12, 1)祝福语送朋友
if not packet_lek:
break
head_c = ad(16)
if len(head_c) != 16:
break
# print(head_c)
for k in range(4):
s = struct.unpack('i', head_c[k*4:(k+1)*4])
head = s[0] & 0xFFFFFFFF
packet += str("%08x" % head)
# packet write
pkt_c = ad(caplen)          #caplen
read_num = len(pkt_c)
冰川病毒
if read_num != caplen:
print("Here Read fail")
break
if not pkt_c:              #read fail or zero
print("Read Null")
break
for i in range(0, (caplen+3)//4-1):
c_tuple = struct.unpack('!i', pkt_c[i*4:(i+1)*4])
c_four = c_tuple[0] & 0xFFFFFFFF
packet += str("%08x" % c_four)
looptime = caplen % 4
j = caplen - looptime
while(looptime):
one_byte = pkt_c[j]
saveData = one_byte & 0xFF
packet += str("%02x" % saveData)
j += 1
looptime -= 1
# print(len(packet))
filter_item = opt_dict['--filter']
if compare_operate(filter_item, packet):                    # filter success
check_flag = 1
filter_num += 1                                  # go check
# print(packet)
if '--check' in opt_dict:
check = opt_dict['--check']
if not compare_operate(check, packet):
check_flag = 0
if '--format' in opt_dict:
天然气供暖设备result = opt_dict['--format'][0]
if '&(index)' in result:
xxx17
result = place('&(index)', str(packet_num))
if '&(filter_index)' in result:
result = place('&(filter_index)', str(filter_num))
if '&(time)' in result:
result = place('&(time)', str(int(packet[0:8], 16))+'.'+str(int(packet[8:16], 16)))                if '&(result)' in result:
result = place('&(result)', str(bool(check_flag)))
offt_item = re.findall(r'&[(](.*?)[)]', result, re.S)
# print(format)
for fmt in offt_item:
fmt_item = fmt.split()
offt = int(fmt_item[0]) * 2 + 32
length = int(fmt_item[1]) * 2
result = place('&('+fmt+')', str("%x" % (int(packet[offt:offt + length], 16) & int(fmt_item[2], 16))))
if '--filter_range' in opt_dict:
掼蛋规则flt_range = symbol_split(opt_dict['--filter_range'][0])
flt_range_flag = range_operate(flt_range, filter_num)
关于奋斗的图片
if flt_range_flag == 1:
print(result)
elif flt_range_flag == 2:
break蝶泳腿
el:
print(result)
el:
print(str(filter_num)+"  "+str(packet_num)+"  "+str(bool(check_flag)))
if '--save' in opt_dict:
save_ek = fd_read.ek(-(caplen+16), 1)
if not save_ek:
break
save_c = ad(caplen+16)
if len(save_c) != caplen+16:
break
fd_write.write(save_c)
# print('filter ', filter_num, ' packets')
fd_read.clo()
if '--save' in opt_dict:
fd_write.clo()
德字
opt_dict = {}
long_args = ['filter=', 'save=', 'check=', 'format=', "pcap_range=", "filter_range="]
opts, args = pt(sys.argv[2:], '', long_args)
for opt, val in opts:
if opt in opt_dict:
opt_dict[opt].append(val)
el:
opt_dict[opt] = [val]
decode_pcap(sys.argv[1], opt_dict)

本文发布于:2023-07-19 04:16:33,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/1104349.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:字节   数据包   长度   读取   数据   评论   原本   转换
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图