[Python] Scapy 를 이용한 ICMP, UDP 테스트 Develop Tip

파이썬은 C와 같이 초당 수만, 수십만 개 이상의 패킷을 충분히 처리할 수 있을 정도로
빠르지 않으므로 일반적으로 패킷 관련 작업을 하기에는 적합하지 않을 수 있습니다.

하지만 아주 간단하게 패킷을 만들어 진단을 한다던지 테스트를 진행할 때는
파이썬의 scapy 만한 것이 없습니다.

테스트 환경은 Ubuntu Server 16.04 LTS 또는 18.04 에서 진행하였습니다.
(모하비 맥에서도 같이 테스트 하는데 이상없이 잘 동작합니다)

설치는 간단히 

$ pip install scapy 

로 설치가 가능했는데
경우에 따라 오류가 발생하면,

$ pip install --pre scapy

로 설치하면 잘 되었습니다.

일반적으로 1024 포트 이후의 well-known 포트를 다루지 않는다면,
root 권한이 없어도 되지만 그 이전의 포트 등을 다루려면
root 권한으로 돌려야 합니다.

scapy 는 .py 스크립트로 돌려도 되고,
scapy 를 직접 실행하여 인터엑티브하게 돌려도 됩니다.

scapy 를 직접 실행하면,

$ scapy
INFO: Can't import matplotlib. Won't be able to plot.
INFO: Can't import PyX. Won't be able to use psdump() or pdfdump().
WARNING: No route found for IPv6 destination :: (no default route?)
INFO: Can't import python-cryptography v1.7+. Disabled WEP decryption/encryption. (Dot11)
INFO: Can't import python-cryptography v1.7+. Disabled IPsec encryption/authentication.
WARNING: IPython not available. Using standard Python shell instead.
AutoCompletion, History are disabled.

                     aSPY//YASa
             apyyyyCY//////////YCa       |
            sY//////YSpcs  scpCY//Pp     | Welcome to Scapy
 ayp ayyyyyyySCP//Pp           syY//C    | Version 2.4.2
 AYAsAYYYYYYYY///Ps              cY//S   |
         pCCCCY//p          cSSps y//Y   | https://github.com/secdev/scapy
         SPPPP///a          pP///AC//Y   |
              A//A            cyP////C   | Have fun!
              p///Ac            sC///a   |
              P////YCpc           A//A   | Wanna support scapy? Rate it on
       scccccp///pSP///p          p//Y   | sectools!
      sY/////////y  caa           S//P   | http://sectools.org/tool/scapy/
       cayCyayP//Ya              pY/Ya   |             -- Satoshi Nakamoto
        sY/PsY////YCc          aC//Yp    |
         sc  sccaCY//PCypaapyCP//YSs
                  spCPY//////YPSps
                       ccaacs

>>>

와 같이 뜨고, iPython 인터프리터가 동작합니다.

여기서 작업을 해도 되고,

다음과 같이 

from scapy.all import *
r = sr1(IP(dst="8.8.8.8")/ICMP()/'HelloWorld')
r.show()

파이썬 스크립트로 동작시켜도 됩니다.

위의 스크립트는 아주 간단하게 구글 NAME 서버 (8.8.8.8) 로 ICMP Ping을 해 보고 그 결과를 받아 
화면에 출력하는 것입니다.

$ python icmp_01.py
Begin emission:
..Finished sending 1 packets.
.*
Received 4 packets, got 1 answers, remaining 0 packets
###[ IP ]###
  version   = 4
  ihl       = 5
  tos       = 0x0
  len       = 38
  id        = 0
  flags     =
  frag      = 0
  ttl       = 54
  proto     = icmp
  chksum    = 0xb294
  src       = 8.8.8.8
  dst       = 192.168.1.139
  \options   \
###[ ICMP ]###
     type      = echo-reply
     code      = 0
     chksum    = 0x0
     id        = 0x0
     seq       = 0x0
###[ Raw ]###
        load      = 'HelloWorld'
###[ Padding ]###
           load      = '\x00\x00\x00\x00\xb9n\xe3\xcb'

아주 쉽지요?

이를 만약 libpcap 을 이용하여 C로 만든다 생각하면 
정말 100여 줄은 작성해야 하는데, 이렇게 두줄로 표현이 되다니,
파이썬의 추상화는 정말이지 짱! 입니다.

또 다른 예로, 위의 ICMP 대신 UDP DNS Query 를 해 보겠습니다.

파일 내용은,

$ cat dns_req_01.py
from scapy.all import *

r = sr1(IP(dst="8.8.8.8")/UDP()/DNS(rd=1,qd=DNSQR(qname="www.naver.com")))
r.show()

인데, 이를 실행하면,

(scapy) mcchae@Jerrys-MBP:scapy$ python dns_req_01.py
Begin emission:
...Finished sending 1 packets.
.*
Received 5 packets, got 1 answers, remaining 0 packets
###[ IP ]###
  version   = 4
  ihl       = 5
  tos       = 0x0
  len       = 181
  id        = 31671
  flags     =
  frag      = 0
  ttl       = 120
  proto     = udp
  chksum    = 0xf43d
  src       = 8.8.8.8
  dst       = 192.168.1.139
  \options   \
###[ UDP ]###
     sport     = domain
     dport     = domain
     len       = 161
     chksum    = 0xd11c
###[ DNS ]###
        id        = 0
        qr        = 1
        opcode    = QUERY
        aa        = 0
        tc        = 0
        rd        = 1
        ra        = 1
        z         = 0
        ad        = 0
        cd        = 0
        rcode     = ok
        qdcount   = 1
        ancount   = 4
        nscount   = 0
        arcount   = 0
        \qd        \
         |###[ DNS Question Record ]###
         |  qname     = 'www.naver.com.'
         |  qtype     = A
         |  qclass    = IN
        \an        \
         |###[ DNS Resource Record ]###
         |  rrname    = 'www.naver.com.'
         |  type      = CNAME
         |  rclass    = IN
         |  ttl       = 1598
         |  rdlen     = 25
         |  rdata     = 'www.naver.com.nheos.com.'
         |###[ DNS Resource Record ]###
         |  rrname    = 'www.naver.com.nheos.com.'
         |  type      = CNAME
         |  rclass    = IN
         |  ttl       = 1469
         |  rdlen     = 27
         |  rdata     = 'www.naver.com.edgekey.net.'
         |###[ DNS Resource Record ]###
         |  rrname    = 'www.naver.com.edgekey.net.'
         |  type      = CNAME
         |  rclass    = IN
         |  ttl       = 6638
         |  rdlen     = 24
         |  rdata     = 'e6030.a.akamaiedge.net.'
         |###[ DNS Resource Record ]###
         |  rrname    = 'e6030.a.akamaiedge.net.'
         |  type      = A
         |  rclass    = IN
         |  ttl       = 19
         |  rdlen     = 4
         |  rdata     = '23.50.3.12'
        ns        = None
        ar        = None

우왕! dig 명령이 필요 없을 정도네요.

이제 약간 더 고급 응용으로 SIP REGISTER 또는 INVITE 명령을 수행하는 DNS 패킷을 생성하고
asterisk 서버에 던저 보는 소스를 만들어 봅니다.


from datetime import datetime
from scapy.all import IP, UDP, sr1
from random import randrange
from time import sleep, time

user = '9195551212'
sip_host = "10.211.55.114"
s_port = 5060
d_port = 5060

pkt = IP(dst=sip_host)
client = pkt.src
callid = str(randrange(10000,99999))
# for invite
sip_invite = (
    "INVITE sip:" + user + "@" + sip_host + ";user=phone SIP/2.0\r\n"
    "Via: SIP/2.0/UDP " + client + ";branch=z9hG4bKac1450533991\r\n"
    "Max-Forwards: 70\r\n"
    "From: <sip:" + user + "@" + client + ">;tag=1c1450530943\r\n"
    "To: <sip:" + user + "@" + sip_host + ";user=phone>\r\n"
    "Call-ID: " + callid + "@" + client + "\r\n"
    "CSeq: 1 INVITE\r\n"
    "Contact: <sip:" + user + "@" + client + ":5060>\r\n"
    "Supported: em,100rel,timer,replaces,path,resource-priority,sdp-anat\r\n"
    "Allow: REGISTER,OPTIONS,INVITE,ACK,CANCEL,BYE,NOTIFY,PRACK,REFER,INFO,SUBSCRIBE,UPDATE\r\n"
    "User-agent: GW/v.6.20A.027.012\r\n"
    "Content-Type: application/sdp\r\n"
    "Content-Length: 0\r\n\r\n"
)
# for register
sip_register = (
    "REGISTER sip:" + sip_host + " SIP/2.0\r\n"
    "Via: SIP/2.0/UDP " + client + ";branch=z9hG4bKac730294792\r\n"
    "Max-Forwards: 70\r\n"
    "From: <sip:" + user + "@" + client + ":" + str(s_port) + ";tag=1c730289287>\r\n"
    "To: <sip:" + user + "@" + sip_host + ">\r\n"
    "Call-ID: " + callid + "@" + client + "\r\n"
    "CSeq: 112 REGISTER\r\n"
    "Contact: <sip:" + callid + "@" + client + ":" + str(s_port) + ">;expires=180\r\n"
    "Supported: path\r\n"
    "Allow: REGISTER,OPTIONS,INVITE,ACK,CANCEL,BYE,NOTIFY,PRACK,REFER,INFO,SUBSCRIBE,UPDATE\r\n"
    "Expires: 180\r\n"
    "User-Agent: GW/v.6.20A.027.012\r\n"
    "Content-Length: 0\r\n\r\n"
)

pkt = IP(dst=sip_host)/UDP(sport=s_port, dport=d_port)/sip_register
r = sr1(pkt, timeout=0.5, verbose=0)
if r:
    r.show()

pkt = IP(dst=sip_host)/UDP(sport=s_port, dport=d_port)/sip_invite
r = sr1(pkt, timeout=0.5, verbose=0)
if r:
    r.show()

위와 같은 scapy 를 이용하면 해당 서비스가 잘 돌고 있는지
진단할 수 있는 코드를 정말 간단하게 생성할 수 있다는
장점이 있습니다.


어느 분께는 도움이 되셨기를 ..










핑백

덧글

댓글 입력 영역

구글애드텍스트