mirror of
https://github.com/cowrie/cowrie.git
synced 2025-07-01 18:07:27 -04:00
Mypy6 (#1542)
* lots of fixes for python typing * fixed base64 and added testing * removed convert() from logging, should ease up CPU usage * clear up bytes/string confusion.
This commit is contained in:
@ -4,7 +4,7 @@ import random
|
||||
|
||||
import psutil
|
||||
|
||||
command = {}
|
||||
command: dict = {}
|
||||
command["command"] = {}
|
||||
command["command"]["ps"] = []
|
||||
|
||||
|
30
mypy.ini
30
mypy.ini
@ -4,19 +4,21 @@ python_version = 3.6
|
||||
namespace_packages = True
|
||||
plugins = mypy_zope:plugin
|
||||
|
||||
ignore_missing_imports = True
|
||||
warn_unused_configs = True
|
||||
no_implicit_optional = True
|
||||
show_column_numbers = True
|
||||
show_error_codes = True
|
||||
strict_optional = True
|
||||
warn_no_return = True
|
||||
warn_redundant_casts = True
|
||||
warn_return_any = True
|
||||
warn_unreachable = True
|
||||
warn_unused_ignores = True
|
||||
disallow_incomplete_defs = True
|
||||
disallow_any_unimported = True
|
||||
ignore_missing_imports = True
|
||||
warn_unused_configs = True
|
||||
no_implicit_optional = True
|
||||
show_column_numbers = True
|
||||
show_error_codes = True
|
||||
strict_optional = True
|
||||
warn_no_return = True
|
||||
warn_redundant_casts = True
|
||||
warn_return_any = True
|
||||
warn_unreachable = True
|
||||
warn_unused_ignores = True
|
||||
disallow_incomplete_defs = True
|
||||
disallow_any_unimported = True
|
||||
strict_equality = True
|
||||
disallow_untyped_decorators = True
|
||||
|
||||
# These are too strict for us at the moment
|
||||
|
||||
@ -28,8 +30,6 @@ disallow_any_expr = False
|
||||
disallow_any_generics = False
|
||||
disallow_subclassing_any = False
|
||||
disallow_untyped_calls = False
|
||||
disallow_untyped_decorators = False
|
||||
strict_equality = False
|
||||
|
||||
# Disable some checks until the effected modules fully adopt mypy
|
||||
|
||||
|
@ -988,7 +988,7 @@ class command_yes(HoneyPotCommand):
|
||||
|
||||
def y(self):
|
||||
if len(self.args):
|
||||
self.write("{}\n".format(" ".join(self.args, "\n")))
|
||||
self.write("{}\n".format(" ".join(self.args)))
|
||||
else:
|
||||
self.write("y\n")
|
||||
self.scheduled = reactor.callLater(0.01, self.y)
|
||||
|
@ -1,4 +1,6 @@
|
||||
import base64
|
||||
import getopt
|
||||
import sys
|
||||
|
||||
from twisted.python import log
|
||||
|
||||
@ -12,6 +14,9 @@ class command_base64(HoneyPotCommand):
|
||||
author: Ivan Korolev (@fe7ch)
|
||||
"""
|
||||
|
||||
mode: str = "e"
|
||||
ignore: bool
|
||||
|
||||
def start(self):
|
||||
self.mode = "e"
|
||||
self.ignore = False
|
||||
@ -106,22 +111,23 @@ Try 'base64 --help' for more information.
|
||||
|
||||
self.exit()
|
||||
|
||||
def dojob(self, s):
|
||||
def dojob(self, s: bytes) -> None:
|
||||
if self.ignore:
|
||||
s = "".join(
|
||||
s = b"".join(
|
||||
[
|
||||
i
|
||||
i.to_bytes(1, sys.byteorder)
|
||||
for i in s
|
||||
if i
|
||||
in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
|
||||
in b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
|
||||
]
|
||||
)
|
||||
|
||||
if self.mode == "e":
|
||||
self.write(s.encode("base64"))
|
||||
self.writeBytes(base64.b64encode(s))
|
||||
self.writeBytes(b"\n")
|
||||
else:
|
||||
try:
|
||||
self.write(s.decode("base64"))
|
||||
self.writeBytes(base64.b64decode(s))
|
||||
except Exception:
|
||||
self.errorWrite("base64: invalid input\n")
|
||||
|
||||
|
@ -256,14 +256,14 @@ class command_curl(HoneyPotCommand):
|
||||
def download(self, url, fakeoutfile, outputfile, *args, **kwargs):
|
||||
try:
|
||||
parsed = compat.urllib_parse.urlparse(url)
|
||||
scheme = parsed.scheme
|
||||
host = parsed.hostname.decode("utf8")
|
||||
port = parsed.port or (443 if scheme == "https" else 80)
|
||||
scheme: bytes = parsed.scheme
|
||||
host: str = parsed.hostname.decode("utf8")
|
||||
port: int = parsed.port or (443 if scheme == "https" else 80)
|
||||
if scheme != b"http" and scheme != b"https":
|
||||
raise NotImplementedError
|
||||
except Exception:
|
||||
self.errorWrite(
|
||||
f'curl: (1) Protocol "{scheme}" not supported or disabled in libcurl\n'
|
||||
f'curl: (1) Protocol "{scheme.encode("utf8")}" not supported or disabled in libcurl\n'
|
||||
)
|
||||
self.exit()
|
||||
return None
|
||||
|
@ -7,6 +7,7 @@ This module ...
|
||||
|
||||
|
||||
import getopt
|
||||
from typing import Dict
|
||||
|
||||
from cowrie.shell.command import HoneyPotCommand
|
||||
|
||||
@ -80,7 +81,7 @@ class command_free(HoneyPotCommand):
|
||||
# Write the output to screen
|
||||
self.write(FREE_OUTPUT.format(**raw_mem_stats))
|
||||
|
||||
def get_free_stats(self):
|
||||
def get_free_stats(self) -> Dict[str, int]:
|
||||
"""
|
||||
Get the free stats from /proc
|
||||
"""
|
||||
@ -94,7 +95,7 @@ class command_free(HoneyPotCommand):
|
||||
"Shmem",
|
||||
"MemAvailable",
|
||||
]
|
||||
mem_info_map = {}
|
||||
mem_info_map: Dict[str, int] = {}
|
||||
with open("/proc/meminfo") as proc_file:
|
||||
for line in proc_file:
|
||||
tokens = line.split(":")
|
||||
|
@ -179,6 +179,8 @@ class command_head(HoneyPotCommand):
|
||||
head command
|
||||
"""
|
||||
|
||||
n: int = 10
|
||||
|
||||
def head_application(self, contents):
|
||||
i = 0
|
||||
contentsplit = contents.split(b"\n")
|
||||
|
@ -33,7 +33,7 @@ class FTP(ftplib.FTP):
|
||||
(self.host, self.port), self.timeout, self.source_address
|
||||
)
|
||||
self.af = self.sock.family
|
||||
self.file = self.sock.makefile("rb")
|
||||
self.file = self.sock.makefile(mode="rb")
|
||||
self.welcome = self.getresp()
|
||||
return self.welcome
|
||||
|
||||
@ -81,6 +81,15 @@ class command_ftpget(HoneyPotCommand):
|
||||
"""
|
||||
|
||||
download_path = CowrieConfig.get("honeypot", "download_path")
|
||||
verbose: bool
|
||||
host: str
|
||||
port: int
|
||||
username: str
|
||||
password: str
|
||||
remote_path: str
|
||||
remote_dir: str
|
||||
remote_file: str
|
||||
artifactFile: Artifact
|
||||
|
||||
def help(self):
|
||||
self.write(
|
||||
|
@ -185,7 +185,7 @@ gcc version {} (Debian {}-5)""".format(
|
||||
self.exit()
|
||||
|
||||
def generate_file(self, outfile):
|
||||
data = ""
|
||||
data = b""
|
||||
# TODO: make sure it is written to temp file, not downloads
|
||||
tmp_fname = "{}_{}_{}_{}".format(
|
||||
time.strftime("%Y%m%d%H%M%S"),
|
||||
|
@ -48,6 +48,12 @@ local_networks = [
|
||||
|
||||
|
||||
class command_nc(HoneyPotCommand):
|
||||
"""
|
||||
netcat
|
||||
"""
|
||||
|
||||
s: socket.socket
|
||||
|
||||
def help(self):
|
||||
self.write(
|
||||
"""This is nc from the netcat-openbsd package. An alternative nc is available
|
||||
@ -122,7 +128,7 @@ usage: nc [-46bCDdhjklnrStUuvZz] [-I length] [-i interval] [-O length]
|
||||
else:
|
||||
data += packet
|
||||
|
||||
self.write(data)
|
||||
self.writeBytes(data)
|
||||
self.s.close()
|
||||
self.exit()
|
||||
|
||||
|
@ -7,6 +7,7 @@ import hashlib
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
from typing import Any
|
||||
|
||||
from twisted.internet import reactor
|
||||
|
||||
@ -16,6 +17,17 @@ commands = {}
|
||||
|
||||
|
||||
class command_ping(HoneyPotCommand):
|
||||
"""
|
||||
ping command
|
||||
"""
|
||||
|
||||
host: str
|
||||
ip: str
|
||||
count: int
|
||||
max: int
|
||||
running: bool
|
||||
scheduled: Any
|
||||
|
||||
def valid_ip(self, address):
|
||||
try:
|
||||
socket.inet_aton(address)
|
||||
@ -24,7 +36,7 @@ class command_ping(HoneyPotCommand):
|
||||
return False
|
||||
|
||||
def start(self):
|
||||
self.host = None
|
||||
self.host = ""
|
||||
self.max = 0
|
||||
self.running = False
|
||||
|
||||
|
@ -52,6 +52,8 @@ class command_scp(HoneyPotCommand):
|
||||
"honeypot", "download_path_uniq", fallback=download_path
|
||||
)
|
||||
|
||||
out_dir: str = ""
|
||||
|
||||
def help(self):
|
||||
self.write(
|
||||
"""usage: scp [-12346BCpqrv] [-c cipher] [-F ssh_config] [-i identity_file]
|
||||
|
@ -7,6 +7,7 @@ import hashlib
|
||||
import re
|
||||
import socket
|
||||
import time
|
||||
from typing import Callable, List
|
||||
|
||||
from twisted.internet import reactor
|
||||
from twisted.python import log
|
||||
@ -29,6 +30,13 @@ OUTPUT = [
|
||||
|
||||
|
||||
class command_ssh(HoneyPotCommand):
|
||||
"""
|
||||
ssh
|
||||
"""
|
||||
|
||||
host: str
|
||||
callbacks: List[Callable]
|
||||
|
||||
def valid_ip(self, address):
|
||||
try:
|
||||
socket.inet_aton(address)
|
||||
|
@ -1,9 +1,5 @@
|
||||
import tftpy
|
||||
|
||||
try:
|
||||
from tftpy.TftpPacketTypes import TftpPacketDAT, TftpPacketOACK
|
||||
except ImportError:
|
||||
from tftpy import TftpPacketDAT, TftpPacketOACK
|
||||
from tftpy.TftpPacketTypes import TftpPacketDAT, TftpPacketOACK
|
||||
|
||||
from twisted.python import log
|
||||
|
||||
|
@ -32,14 +32,14 @@ class command_ulimit(HoneyPotCommand):
|
||||
# Parse options
|
||||
for o, a in opts:
|
||||
if o in ("-c"):
|
||||
self.do_ulimit(key="core", value=a)
|
||||
self.do_ulimit(key="core", value=int(a))
|
||||
return
|
||||
elif o in ("-a"):
|
||||
self.do_ulimit(key="all")
|
||||
return
|
||||
self.do_ulimit()
|
||||
|
||||
def do_ulimit(self, key="core"):
|
||||
def do_ulimit(self, key: str = "core", value: int = 0) -> None:
|
||||
pass
|
||||
|
||||
|
||||
|
@ -55,8 +55,9 @@ class command_wget(HoneyPotCommand):
|
||||
wget command
|
||||
"""
|
||||
|
||||
limit_size = CowrieConfig.getint("honeypot", "download_limit_size", fallback=0)
|
||||
downloadPath = CowrieConfig.get("honeypot", "download_path")
|
||||
limit_size: int = CowrieConfig.getint("honeypot", "download_limit_size", fallback=0)
|
||||
downloadPath: str = CowrieConfig.get("honeypot", "download_path")
|
||||
quiet: bool = False
|
||||
|
||||
def start(self):
|
||||
try:
|
||||
@ -67,7 +68,7 @@ class command_wget(HoneyPotCommand):
|
||||
return
|
||||
|
||||
if len(args):
|
||||
url = args[0].strip()
|
||||
url: str = args[0].strip()
|
||||
else:
|
||||
self.errorWrite("wget: missing URL\n")
|
||||
self.errorWrite("Usage: wget [OPTION]... [URL]...\n\n")
|
||||
@ -75,7 +76,7 @@ class command_wget(HoneyPotCommand):
|
||||
self.exit()
|
||||
return
|
||||
|
||||
self.outfile = None
|
||||
self.outfile: str = None
|
||||
self.quiet = False
|
||||
for opt in optlist:
|
||||
if opt[0] == "-O":
|
||||
@ -215,7 +216,11 @@ class command_wget(HoneyPotCommand):
|
||||
|
||||
def error(self, error, url):
|
||||
# we need to handle 301 redirects separately
|
||||
if hasattr(error, "webStatus") and error.webStatus.decode() == "301":
|
||||
if (
|
||||
hasattr(error, "webStatus")
|
||||
and error.webStatus
|
||||
and error.webStatus.decode() == "301"
|
||||
):
|
||||
self.errorWrite(f"{error.webStatus.decode()} {error.webMessage.decode()}\n")
|
||||
https_url = error.getErrorMessage().replace("301 Moved Permanently to ", "")
|
||||
self.errorWrite(f"Location {https_url} [following]\n")
|
||||
|
@ -24,7 +24,7 @@ class command_faked_package_class_factory:
|
||||
def getCommand(name):
|
||||
class command_faked_installation(HoneyPotCommand):
|
||||
def call(self):
|
||||
self.write(b"{}: Segmentation fault\n".format(name))
|
||||
self.write("{}: Segmentation fault\n".format(name))
|
||||
|
||||
return command_faked_installation
|
||||
|
||||
|
0
src/cowrie/core/__init__.py
Normal file
0
src/cowrie/core/__init__.py
Normal file
@ -172,7 +172,7 @@ class Output(metaclass=abc.ABCMeta):
|
||||
if "message" not in event and "format" not in event:
|
||||
return
|
||||
|
||||
ev = convert(event) # type: ignore
|
||||
ev: Dict[str, any] = event # type: ignore
|
||||
ev["sensor"] = self.sensor
|
||||
|
||||
if "isError" in ev:
|
||||
|
0
src/cowrie/insults/__init__.py
Normal file
0
src/cowrie/insults/__init__.py
Normal file
@ -46,6 +46,10 @@ class Output(cowrie.core.output.Output):
|
||||
cuckoo output
|
||||
"""
|
||||
|
||||
api_user: str
|
||||
api_passwd: str
|
||||
url_base: bytes
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start output plugin
|
||||
@ -96,7 +100,7 @@ class Output(cowrie.core.output.Output):
|
||||
try:
|
||||
print(f"Looking for tasks for: {sha256}")
|
||||
res = requests.get(
|
||||
urljoin(self.url_base, f"/files/view/sha256/{sha256}"),
|
||||
urljoin(self.url_base, f"/files/view/sha256/{sha256}".encode("utf-8")),
|
||||
verify=False,
|
||||
auth=HTTPBasicAuth(self.api_user, self.api_passwd),
|
||||
timeout=60,
|
||||
@ -120,7 +124,7 @@ class Output(cowrie.core.output.Output):
|
||||
files = {"file": (fileName, open(artifact, "rb").read())}
|
||||
try:
|
||||
res = requests.post(
|
||||
urljoin(self.url_base, "tasks/create/file").encode("utf-8"),
|
||||
urljoin(self.url_base, b"tasks/create/file"),
|
||||
files=files,
|
||||
auth=HTTPBasicAuth(self.api_user, self.api_passwd),
|
||||
verify=False,
|
||||
@ -143,7 +147,7 @@ class Output(cowrie.core.output.Output):
|
||||
data = {"url": scanUrl}
|
||||
try:
|
||||
res = requests.post(
|
||||
urljoin(self.url_base, "tasks/create/url").encode("utf-8"),
|
||||
urljoin(self.url_base, b"tasks/create/url"),
|
||||
data=data,
|
||||
auth=HTTPBasicAuth(self.api_user, self.api_passwd),
|
||||
verify=False,
|
||||
|
@ -1,6 +1,8 @@
|
||||
# Simple elasticsearch logger
|
||||
|
||||
|
||||
from typing import Any, Dict
|
||||
|
||||
from elasticsearch import Elasticsearch, NotFoundError
|
||||
|
||||
import cowrie.core.output
|
||||
@ -12,43 +14,38 @@ class Output(cowrie.core.output.Output):
|
||||
elasticsearch output
|
||||
"""
|
||||
|
||||
index: str
|
||||
pipeline: str
|
||||
|
||||
def start(self):
|
||||
self.host = CowrieConfig.get("output_elasticsearch", "host")
|
||||
self.port = CowrieConfig.get("output_elasticsearch", "port")
|
||||
host = CowrieConfig.get("output_elasticsearch", "host")
|
||||
port = CowrieConfig.get("output_elasticsearch", "port")
|
||||
self.index = CowrieConfig.get("output_elasticsearch", "index")
|
||||
self.type = CowrieConfig.get("output_elasticsearch", "type")
|
||||
self.pipeline = CowrieConfig.get("output_elasticsearch", "pipeline")
|
||||
# new options (creds + https)
|
||||
self.username = CowrieConfig.get(
|
||||
"output_elasticsearch", "username", fallback=None
|
||||
)
|
||||
self.password = CowrieConfig.get(
|
||||
"output_elasticsearch", "password", fallback=None
|
||||
)
|
||||
self.use_ssl = CowrieConfig.getboolean(
|
||||
"output_elasticsearch", "ssl", fallback=False
|
||||
)
|
||||
self.ca_certs = CowrieConfig.get(
|
||||
"output_elasticsearch", "ca_certs", fallback=None
|
||||
)
|
||||
self.verify_certs = CowrieConfig.getboolean(
|
||||
username = CowrieConfig.get("output_elasticsearch", "username", fallback=None)
|
||||
password = CowrieConfig.get("output_elasticsearch", "password", fallback=None)
|
||||
use_ssl = CowrieConfig.getboolean("output_elasticsearch", "ssl", fallback=False)
|
||||
ca_certs = CowrieConfig.get("output_elasticsearch", "ca_certs", fallback=None)
|
||||
verify_certs = CowrieConfig.getboolean(
|
||||
"output_elasticsearch", "verify_certs", fallback=True
|
||||
)
|
||||
|
||||
options = {}
|
||||
options: Dict[str, Any] = {}
|
||||
# connect
|
||||
if (self.username is not None) and (self.password is not None):
|
||||
options["http_auth"] = (self.username, self.password)
|
||||
if self.use_ssl:
|
||||
if (username is not None) and (password is not None):
|
||||
options["http_auth"] = (username, password)
|
||||
if use_ssl:
|
||||
options["scheme"] = "https"
|
||||
options["use_ssl"] = self.use_ssl
|
||||
options["use_ssl"] = use_ssl
|
||||
options["ssl_show_warn"] = False
|
||||
options["verify_certs"] = self.verify_certs
|
||||
if self.verify_certs:
|
||||
options["ca_certs"] = self.ca_certs
|
||||
options["verify_certs"] = verify_certs
|
||||
if verify_certs:
|
||||
options["ca_certs"] = ca_certs
|
||||
|
||||
# connect
|
||||
self.es = Elasticsearch(f"{self.host}:{self.port}", **options)
|
||||
self.es = Elasticsearch(f"{host}:{port}", **options)
|
||||
# self.es = Elasticsearch('{0}:{1}'.format(self.host, self.port))
|
||||
|
||||
self.check_index()
|
||||
@ -95,7 +92,7 @@ class Output(cowrie.core.output.Output):
|
||||
try:
|
||||
# check if the geoip pipeline exists. An error
|
||||
# is raised if the pipeline does not exist
|
||||
self.es.ingest.get_pipeline(self.pipeline)
|
||||
self.es.ingest.get_pipeline(id=self.pipeline)
|
||||
except NotFoundError:
|
||||
# geoip pipeline
|
||||
body = {
|
||||
|
@ -23,8 +23,8 @@ class Output(cowrie.core.output.Output):
|
||||
"""
|
||||
Start output plugin
|
||||
"""
|
||||
self.apiKey = CowrieConfig().get("output_greynoise", "api_key", fallback=None)
|
||||
self.debug = CowrieConfig().getboolean(
|
||||
self.apiKey = CowrieConfig.get("output_greynoise", "api_key", fallback=None)
|
||||
self.debug = CowrieConfig.getboolean(
|
||||
"output_greynoise", "debug", fallback=False
|
||||
)
|
||||
|
||||
|
@ -50,6 +50,8 @@ class Output(cowrie.core.output.Output):
|
||||
TODO: use `treq`
|
||||
"""
|
||||
|
||||
apiKey: str
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start output plugin
|
||||
|
@ -36,6 +36,8 @@ class Output(cowrie.core.output.Output):
|
||||
The decision is done by searching for the SHA 256 sum in all matching attributes.
|
||||
"""
|
||||
|
||||
debug: bool
|
||||
|
||||
@ignore_warnings
|
||||
def start(self):
|
||||
"""
|
||||
|
@ -45,6 +45,7 @@ class Output(cowrie.core.output.Output):
|
||||
"""
|
||||
|
||||
db = None
|
||||
debug: bool = False
|
||||
|
||||
def start(self):
|
||||
self.debug = CowrieConfig.getboolean("output_mysql", "debug", fallback=False)
|
||||
|
@ -22,8 +22,8 @@ class Output(cowrie.core.output.Output):
|
||||
"""
|
||||
Initialize pymisp module and ObjectWrapper (Abstract event and object creation)
|
||||
"""
|
||||
host = CowrieConfig.get("output_redis", "host")
|
||||
port = CowrieConfig.get("output_redis", "port")
|
||||
host: str = CowrieConfig.get("output_redis", "host")
|
||||
port: int = CowrieConfig.getint("output_redis", "port")
|
||||
|
||||
try:
|
||||
db = CowrieConfig.get("output_redis", "db")
|
||||
|
@ -14,6 +14,8 @@ class Output(cowrie.core.output.Output):
|
||||
Output plugin used for reverse DNS lookup
|
||||
"""
|
||||
|
||||
timeout: int = 3
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start Output Plugin
|
||||
@ -35,6 +37,8 @@ class Output(cowrie.core.output.Output):
|
||||
"""
|
||||
Create log messages for connect events
|
||||
"""
|
||||
if result is None:
|
||||
return
|
||||
payload = result[0][0].payload
|
||||
log.msg(
|
||||
eventid="cowrie.reversedns.connect",
|
||||
@ -50,6 +54,8 @@ class Output(cowrie.core.output.Output):
|
||||
"""
|
||||
Create log messages for forward events
|
||||
"""
|
||||
if result is None:
|
||||
return
|
||||
payload = result[0][0].payload
|
||||
log.msg(
|
||||
eventid="cowrie.reversedns.forward",
|
||||
|
@ -8,11 +8,8 @@ JSON log file is still recommended way to go
|
||||
|
||||
|
||||
import json
|
||||
|
||||
try:
|
||||
from BytesIO import BytesIO
|
||||
except ImportError:
|
||||
from io import BytesIO
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
|
||||
from twisted.internet import reactor
|
||||
from twisted.internet.ssl import ClientContextFactory
|
||||
@ -29,6 +26,10 @@ class Output(cowrie.core.output.Output):
|
||||
Splunk HEC output
|
||||
"""
|
||||
|
||||
token: str
|
||||
agent: Any
|
||||
url: bytes
|
||||
|
||||
def start(self):
|
||||
self.token = CowrieConfig.get("output_splunk", "token")
|
||||
self.url = CowrieConfig.get("output_splunk", "url").encode("utf8")
|
||||
|
@ -1,4 +1,5 @@
|
||||
import sqlite3
|
||||
from typing import Any
|
||||
|
||||
from twisted.enterprise import adbapi
|
||||
from twisted.internet import defer
|
||||
@ -13,6 +14,8 @@ class Output(cowrie.core.output.Output):
|
||||
sqlite output
|
||||
"""
|
||||
|
||||
db: Any
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start sqlite3 logging module using Twisted ConnectionPool.
|
||||
|
@ -34,6 +34,7 @@ Send SSH logins to Virustotal
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
from typing import Any
|
||||
from urllib.parse import urlencode, urlparse
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
@ -58,6 +59,13 @@ class Output(cowrie.core.output.Output):
|
||||
virustotal output
|
||||
"""
|
||||
|
||||
apiKey: str
|
||||
debug: bool = False
|
||||
commenttext: str
|
||||
agent: Any
|
||||
scan_url: bool
|
||||
scan_file: bool
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start output plugin
|
||||
@ -89,7 +97,7 @@ class Output(cowrie.core.output.Output):
|
||||
"""
|
||||
pass
|
||||
|
||||
def write(self, entry):
|
||||
def write(self, entry: dict) -> None:
|
||||
if entry["eventid"] == "cowrie.session.file_download":
|
||||
if self.scan_url and "url" in entry:
|
||||
log.msg("Checking url scan report at VT")
|
||||
|
0
src/cowrie/pool_interface/__init__.py
Normal file
0
src/cowrie/pool_interface/__init__.py
Normal file
0
src/cowrie/python/__init__.py
Normal file
0
src/cowrie/python/__init__.py
Normal file
0
src/cowrie/shell/__init__.py
Normal file
0
src/cowrie/shell/__init__.py
Normal file
@ -11,6 +11,7 @@ import re
|
||||
import shlex
|
||||
import stat
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
from twisted.internet import error
|
||||
from twisted.python import failure, log
|
||||
@ -33,7 +34,7 @@ class HoneyPotCommand:
|
||||
self.fs = self.protocol.fs
|
||||
self.data = None # output data
|
||||
self.input_data = None # used to store STDIN data passed via PIPE
|
||||
self.writefn = self.protocol.pp.outReceived
|
||||
self.writefn: Callable = self.protocol.pp.outReceived
|
||||
self.errorWritefn = self.protocol.pp.errReceived
|
||||
# MS-DOS style redirect handling, inside the command
|
||||
# TODO: handle >>, 2>, etc
|
||||
@ -95,23 +96,23 @@ class HoneyPotCommand:
|
||||
else:
|
||||
self.safeoutfile = p[fs.A_REALFILE]
|
||||
|
||||
def write(self, data):
|
||||
def write(self, data: str) -> None:
|
||||
"""
|
||||
Write a string to the user on stdout
|
||||
"""
|
||||
return self.writefn(data.encode("utf8"))
|
||||
self.writefn(data.encode("utf8"))
|
||||
|
||||
def writeBytes(self, data):
|
||||
def writeBytes(self, data: bytes) -> None:
|
||||
"""
|
||||
Like write() but input is bytes
|
||||
"""
|
||||
return self.writefn(data)
|
||||
self.writefn(data)
|
||||
|
||||
def errorWrite(self, data):
|
||||
def errorWrite(self, data: str) -> None:
|
||||
"""
|
||||
Write errors to the user on stderr
|
||||
"""
|
||||
return self.errorWritefn(data.encode("utf8"))
|
||||
self.errorWritefn(data.encode("utf8"))
|
||||
|
||||
def check_arguments(self, application, args):
|
||||
files = []
|
||||
@ -128,7 +129,7 @@ class HoneyPotCommand:
|
||||
def set_input_data(self, data):
|
||||
self.input_data = data
|
||||
|
||||
def write_to_file(self, data):
|
||||
def write_to_file(self, data: bytes) -> None:
|
||||
with open(self.safeoutfile, "ab") as f:
|
||||
f.write(data)
|
||||
self.writtenBytes += len(data)
|
||||
@ -137,15 +138,15 @@ class HoneyPotCommand:
|
||||
def write_to_failed(self, data):
|
||||
pass
|
||||
|
||||
def start(self):
|
||||
def start(self) -> None:
|
||||
if self.write != self.write_to_failed:
|
||||
self.call()
|
||||
self.exit()
|
||||
|
||||
def call(self):
|
||||
self.write("Hello World! [{}]\n".format(repr(self.args)).encode("utf8"))
|
||||
def call(self) -> None:
|
||||
self.write("Hello World! [{}]\n".format(repr(self.args)))
|
||||
|
||||
def exit(self):
|
||||
def exit(self) -> None:
|
||||
"""
|
||||
Sometimes client is disconnected and command exits after. So cmdstack is gone
|
||||
"""
|
||||
@ -172,25 +173,25 @@ class HoneyPotCommand:
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def handle_CTRL_C(self):
|
||||
def handle_CTRL_C(self) -> None:
|
||||
log.msg("Received CTRL-C, exiting..")
|
||||
self.write("^C\n")
|
||||
self.exit()
|
||||
|
||||
def lineReceived(self, line):
|
||||
def lineReceived(self, line: str) -> None:
|
||||
log.msg(f"QUEUED INPUT: {line}")
|
||||
# FIXME: naive command parsing, see lineReceived below
|
||||
# line = "".join(line)
|
||||
self.protocol.cmdstack[0].cmdpending.append(shlex.split(line, posix=True))
|
||||
|
||||
def resume(self):
|
||||
def resume(self) -> None:
|
||||
pass
|
||||
|
||||
def handle_TAB(self):
|
||||
def handle_TAB(self) -> None:
|
||||
pass
|
||||
|
||||
def handle_CTRL_D(self):
|
||||
def handle_CTRL_D(self) -> None:
|
||||
pass
|
||||
|
||||
def __repr__(self):
|
||||
def __repr__(self) -> str:
|
||||
return str(self.__class__.__name__)
|
||||
|
@ -113,9 +113,9 @@ class PermissionDenied(Exception):
|
||||
|
||||
|
||||
class HoneyPotFilesystem:
|
||||
def __init__(self, fs: str, arch: str, home: str) -> None:
|
||||
def __init__(self, arch: str, home: str) -> None:
|
||||
|
||||
self.fs: List
|
||||
self.fs: List[Any]
|
||||
|
||||
try:
|
||||
with open(CowrieConfig.get("shell", "filesystem"), "rb") as f:
|
||||
@ -153,7 +153,9 @@ class HoneyPotFilesystem:
|
||||
realfile_path: str = os.path.join(path, filename)
|
||||
virtual_path: str = "/" + os.path.relpath(realfile_path, honeyfs_path)
|
||||
|
||||
f = self.getfile(virtual_path, follow_symlinks=False)
|
||||
f: Optional[List[Any]] = self.getfile(
|
||||
virtual_path, follow_symlinks=False
|
||||
)
|
||||
if f and f[A_TYPE] == T_FILE:
|
||||
self.update_realfile(f, realfile_path)
|
||||
|
||||
@ -223,7 +225,7 @@ class HoneyPotFilesystem:
|
||||
"""
|
||||
This returns the Cowrie file system objects for a directory
|
||||
"""
|
||||
cwd: List = self.fs
|
||||
cwd: List[Any] = self.fs
|
||||
for part in path.split("/"):
|
||||
if not len(part):
|
||||
continue
|
||||
@ -250,8 +252,8 @@ class HoneyPotFilesystem:
|
||||
Return True if path refers to an existing path.
|
||||
Returns False for broken symbolic links.
|
||||
"""
|
||||
f: Any = self.getfile(path, follow_symlinks=True)
|
||||
if f is not False:
|
||||
f: Optional[List[Any]] = self.getfile(path, follow_symlinks=True)
|
||||
if f is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
@ -260,8 +262,8 @@ class HoneyPotFilesystem:
|
||||
Return True if path refers to an existing path.
|
||||
Returns True for broken symbolic links.
|
||||
"""
|
||||
f: Any = self.getfile(path, follow_symlinks=False)
|
||||
if f is not False:
|
||||
f: Optional[List[Any]] = self.getfile(path, follow_symlinks=False)
|
||||
if f is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
@ -276,7 +278,7 @@ class HoneyPotFilesystem:
|
||||
):
|
||||
f[A_REALFILE] = realfile
|
||||
|
||||
def getfile(self, path: str, follow_symlinks: bool = True) -> Optional[List]:
|
||||
def getfile(self, path: str, follow_symlinks: bool = True) -> Optional[List[Any]]:
|
||||
"""
|
||||
This returns the Cowrie file system object for a path
|
||||
"""
|
||||
@ -284,7 +286,7 @@ class HoneyPotFilesystem:
|
||||
return self.fs
|
||||
pieces: List[str] = path.strip("/").split("/")
|
||||
cwd: str = ""
|
||||
p: Optional[List] = self.fs
|
||||
p: Optional[List[Any]] = self.fs
|
||||
for piece in pieces:
|
||||
if not isinstance(p, list):
|
||||
return None
|
||||
@ -399,9 +401,11 @@ class HoneyPotFilesystem:
|
||||
links, so both islink() and isfile() can be true for the same path.
|
||||
"""
|
||||
try:
|
||||
f: Any = self.getfile(path)
|
||||
f: Optional[List[Any]] = self.getfile(path)
|
||||
except Exception:
|
||||
return False
|
||||
if f is None:
|
||||
return False
|
||||
if f[A_TYPE] == T_FILE:
|
||||
return True
|
||||
else:
|
||||
@ -414,9 +418,11 @@ class HoneyPotFilesystem:
|
||||
runtime.
|
||||
"""
|
||||
try:
|
||||
f: Any = self.getfile(path)
|
||||
f: Optional[List[Any]] = self.getfile(path)
|
||||
except Exception:
|
||||
return False
|
||||
if f is None:
|
||||
return False
|
||||
if f[A_TYPE] == T_LINK:
|
||||
return True
|
||||
else:
|
||||
@ -433,7 +439,7 @@ class HoneyPotFilesystem:
|
||||
dir = self.getfile(path)
|
||||
except Exception:
|
||||
dir = None
|
||||
if dir is None or dir is False:
|
||||
if dir is None:
|
||||
return False
|
||||
if dir[A_TYPE] == T_DIR:
|
||||
return True
|
||||
@ -526,7 +532,7 @@ class HoneyPotFilesystem:
|
||||
"""
|
||||
FIXME mkdir() name conflicts with existing mkdir
|
||||
"""
|
||||
dir: Any = self.getfile(path)
|
||||
dir: Optional[List[Any]] = self.getfile(path)
|
||||
if dir:
|
||||
raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
|
||||
self.mkdir(path, 0, 0, 4096, 16877)
|
||||
@ -550,19 +556,19 @@ class HoneyPotFilesystem:
|
||||
return False
|
||||
|
||||
def utime(self, path: str, atime: float, mtime: float) -> None:
|
||||
p: Optional[List] = self.getfile(path)
|
||||
p: Optional[List[Any]] = self.getfile(path)
|
||||
if not p:
|
||||
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
|
||||
p[A_CTIME] = mtime
|
||||
|
||||
def chmod(self, path: str, perm: int) -> None:
|
||||
p: Optional[List] = self.getfile(path)
|
||||
p: Optional[List[Any]] = self.getfile(path)
|
||||
if not p:
|
||||
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
|
||||
p[A_MODE] = stat.S_IFMT(p[A_MODE]) | perm
|
||||
|
||||
def chown(self, path: str, uid: int, gid: int) -> None:
|
||||
p: Optional[List] = self.getfile(path)
|
||||
p: Optional[List[Any]] = self.getfile(path)
|
||||
if not p:
|
||||
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
|
||||
if uid != -1:
|
||||
@ -571,13 +577,13 @@ class HoneyPotFilesystem:
|
||||
p[A_GID] = gid
|
||||
|
||||
def remove(self, path: str) -> None:
|
||||
p: Optional[List] = self.getfile(path, follow_symlinks=False)
|
||||
p: Optional[List[Any]] = self.getfile(path, follow_symlinks=False)
|
||||
if not p:
|
||||
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
|
||||
self.get_path(os.path.dirname(path)).remove(p)
|
||||
|
||||
def readlink(self, path: str) -> str:
|
||||
p: Optional[List] = self.getfile(path, follow_symlinks=False)
|
||||
p: Optional[List[Any]] = self.getfile(path, follow_symlinks=False)
|
||||
if not p:
|
||||
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
|
||||
if not (p[A_MODE] & stat.S_IFLNK):
|
||||
@ -588,7 +594,7 @@ class HoneyPotFilesystem:
|
||||
raise NotImplementedError
|
||||
|
||||
def rename(self, oldpath: str, newpath: str) -> None:
|
||||
old: Optional[List] = self.getfile(oldpath)
|
||||
old: Optional[List[Any]] = self.getfile(oldpath)
|
||||
if not old:
|
||||
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
|
||||
new = self.getfile(newpath)
|
||||
@ -607,7 +613,7 @@ class HoneyPotFilesystem:
|
||||
return self.stat(path, follow_symlinks=False)
|
||||
|
||||
def stat(self, path: str, follow_symlinks: bool = True) -> _statobj:
|
||||
p: Optional[List]
|
||||
p: Optional[List[Any]]
|
||||
if path == "/":
|
||||
# TODO: shouldn't this be a list?
|
||||
p = []
|
||||
@ -640,7 +646,7 @@ class HoneyPotFilesystem:
|
||||
return path
|
||||
|
||||
def update_size(self, filename: str, size: int) -> None:
|
||||
f: Optional[List] = self.getfile(filename)
|
||||
f: Optional[List[Any]] = self.getfile(filename)
|
||||
if not f:
|
||||
return
|
||||
if f[A_TYPE] != T_FILE:
|
||||
|
@ -362,8 +362,8 @@ class HoneyPotShell:
|
||||
this should probably not go through ctrl-d, but use processprotocol to close stdin
|
||||
"""
|
||||
log.msg("received eof, sending ctrl-d to command")
|
||||
if self.cmdstack:
|
||||
self.cmdstack[-1].handle_CTRL_D()
|
||||
if self.protocol.cmdstack:
|
||||
self.protocol.cmdstack[-1].handle_CTRL_D()
|
||||
|
||||
def handle_CTRL_C(self):
|
||||
self.protocol.lineBuffer = []
|
||||
|
@ -57,6 +57,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
|
||||
self.kippoIP = None
|
||||
self.clientIP = None
|
||||
self.sessionno = None
|
||||
self.factory = None
|
||||
|
||||
if self.fs.exists(user.avatar.home):
|
||||
self.cwd = user.avatar.home
|
||||
@ -79,12 +80,12 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
|
||||
Send log directly to factory, avoiding normal log dispatch
|
||||
"""
|
||||
args["sessionno"] = self.sessionno
|
||||
pt = self.getProtoTransport()
|
||||
pt.factory.logDispatch(**args)
|
||||
self.factory.logDispatch(**args)
|
||||
|
||||
def connectionMade(self):
|
||||
pt = self.getProtoTransport()
|
||||
|
||||
self.factory = pt.factory
|
||||
self.sessionno = pt.transport.sessionno
|
||||
self.realClientIP = pt.transport.getPeer().host
|
||||
self.realClientPort = pt.transport.getPeer().port
|
||||
@ -106,13 +107,11 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
|
||||
self.kippoIP = CowrieConfig.get("honeypot", "internet_facing_ip")
|
||||
else:
|
||||
try:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.connect(("8.8.8.8", 80))
|
||||
self.kippoIP = s.getsockname()[0]
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
|
||||
s.connect(("8.8.8.8", 80))
|
||||
self.kippoIP = s.getsockname()[0]
|
||||
except Exception:
|
||||
self.kippoIP = "192.168.0.1"
|
||||
finally:
|
||||
s.close()
|
||||
|
||||
def timeoutConnection(self):
|
||||
"""
|
||||
|
@ -29,7 +29,7 @@
|
||||
|
||||
from binascii import crc32
|
||||
from random import randint, seed
|
||||
from typing import Dict, Union
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
from twisted.python import log
|
||||
|
||||
@ -44,11 +44,12 @@ class Passwd:
|
||||
"""
|
||||
|
||||
passwd_file = "{}/etc/passwd".format(CowrieConfig.get("honeypot", "contents_path"))
|
||||
passwd: List[Dict[str, Any]] = []
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self.load()
|
||||
|
||||
def load(self):
|
||||
def load(self) -> None:
|
||||
"""
|
||||
Load /etc/passwd
|
||||
"""
|
||||
@ -107,25 +108,25 @@ class Passwd:
|
||||
# f.write('%s:%d:%s\n' % (login, uid, passwd))
|
||||
raise NotImplementedError
|
||||
|
||||
def getpwnam(self, name):
|
||||
def getpwnam(self, name: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get passwd entry for username
|
||||
"""
|
||||
for _ in self.passwd:
|
||||
if name == _["pw_name"]:
|
||||
return _
|
||||
for e in self.passwd:
|
||||
if e["pw_name"] == name:
|
||||
return e
|
||||
raise KeyError("getpwnam(): name not found in passwd file: " + name)
|
||||
|
||||
def getpwuid(self, uid):
|
||||
def getpwuid(self, uid: int) -> Dict[str, Any]:
|
||||
"""
|
||||
Get passwd entry for uid
|
||||
"""
|
||||
for _ in self.passwd:
|
||||
if uid == _["pw_uid"]:
|
||||
return _
|
||||
for e in self.passwd:
|
||||
if uid == e["pw_uid"]:
|
||||
return e
|
||||
raise KeyError("getpwuid(): uid not found in passwd file: " + str(uid))
|
||||
|
||||
def setpwentry(self, name):
|
||||
def setpwentry(self, name: str) -> Dict[str, Any]:
|
||||
"""
|
||||
If the user is not in /etc/passwd, creates a new user entry for the session
|
||||
"""
|
||||
@ -134,7 +135,7 @@ class Passwd:
|
||||
seed_id = crc32(name.encode("utf-8"))
|
||||
seed(seed_id)
|
||||
|
||||
e = {}
|
||||
e: Dict[str, Any] = {}
|
||||
e["pw_name"] = name
|
||||
e["pw_passwd"] = "x"
|
||||
e["pw_gecos"] = 0
|
||||
@ -153,11 +154,12 @@ class Group:
|
||||
"""
|
||||
|
||||
group_file = "{}/etc/group".format(CowrieConfig.get("honeypot", "contents_path"))
|
||||
group: List[Dict[str, Any]]
|
||||
|
||||
def __init__(self):
|
||||
self.load()
|
||||
|
||||
def load(self):
|
||||
def load(self) -> None:
|
||||
"""
|
||||
Load /etc/group
|
||||
"""
|
||||
@ -187,7 +189,7 @@ class Group:
|
||||
|
||||
self.group.append(e)
|
||||
|
||||
def save(self):
|
||||
def save(self) -> None:
|
||||
"""
|
||||
Save the group db
|
||||
Note: this is subject to races between cowrie instances, but hey ...
|
||||
@ -197,20 +199,20 @@ class Group:
|
||||
# f.write('%s:%d:%s\n' % (login, uid, passwd))
|
||||
raise NotImplementedError
|
||||
|
||||
def getgrnam(self, name):
|
||||
def getgrnam(self, name: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get group entry for groupname
|
||||
"""
|
||||
for _ in self.group:
|
||||
if name == _["gr_name"]:
|
||||
return _
|
||||
for e in self.group:
|
||||
if name == e["gr_name"]:
|
||||
return e
|
||||
raise KeyError("getgrnam(): name not found in group file: " + name)
|
||||
|
||||
def getgrgid(self, uid):
|
||||
def getgrgid(self, uid: int) -> Dict[str, Any]:
|
||||
"""
|
||||
Get group entry for gid
|
||||
"""
|
||||
for _ in self.group:
|
||||
if uid == _["gr_gid"]:
|
||||
return _
|
||||
for e in self.group:
|
||||
if uid == e["gr_gid"]:
|
||||
return e
|
||||
raise KeyError("getgruid(): uid not found in group file: " + str(uid))
|
||||
|
@ -74,7 +74,7 @@ class CowrieServer:
|
||||
"""
|
||||
Do this so we can trigger it later. Not all sessions need file system
|
||||
"""
|
||||
self.fs = fs.HoneyPotFilesystem(None, self.arch, home)
|
||||
self.fs = fs.HoneyPotFilesystem(self.arch, home)
|
||||
|
||||
try:
|
||||
self.process = self.getCommandOutput(
|
||||
|
0
src/cowrie/ssh/__init__.py
Normal file
0
src/cowrie/ssh/__init__.py
Normal file
0
src/cowrie/ssh_proxy/__init__.py
Normal file
0
src/cowrie/ssh_proxy/__init__.py
Normal file
0
src/cowrie/ssh_proxy/protocols/__init__.py
Normal file
0
src/cowrie/ssh_proxy/protocols/__init__.py
Normal file
@ -28,7 +28,7 @@
|
||||
|
||||
|
||||
class BaseProtocol:
|
||||
data = ""
|
||||
data = b""
|
||||
packetSize = 0
|
||||
name = ""
|
||||
uuid = ""
|
||||
|
0
src/cowrie/telnet/__init__.py
Normal file
0
src/cowrie/telnet/__init__.py
Normal file
0
src/cowrie/telnet_proxy/__init__.py
Normal file
0
src/cowrie/telnet_proxy/__init__.py
Normal file
0
src/cowrie/test/__init__.py
Normal file
0
src/cowrie/test/__init__.py
Normal file
@ -17,7 +17,7 @@ class FakeServer:
|
||||
self.arch = "linux-x64-lsb"
|
||||
self.hostname = "unitTest"
|
||||
|
||||
self.fs = fs.HoneyPotFilesystem(None, "arch", "/root")
|
||||
self.fs = fs.HoneyPotFilesystem("arch", "/root")
|
||||
self.process = None
|
||||
|
||||
|
||||
|
50
src/cowrie/test/test_base64.py
Normal file
50
src/cowrie/test/test_base64.py
Normal file
@ -0,0 +1,50 @@
|
||||
# -*- test-case-name: Cowrie Test Cases -*-
|
||||
|
||||
# Copyright (c) 2020 Peter Sufliarsky
|
||||
# See LICENSE for details.
|
||||
|
||||
"""
|
||||
Tests for general shell interaction and base64 command
|
||||
"""
|
||||
|
||||
|
||||
import os
|
||||
|
||||
from twisted.trial import unittest
|
||||
|
||||
from cowrie.shell import protocol
|
||||
from cowrie.test import fake_server, fake_transport
|
||||
|
||||
os.environ["COWRIE_HONEYPOT_DATA_PATH"] = "../data"
|
||||
os.environ["COWRIE_HONEYPOT_DOWNLOAD_PATH"] = "/tmp"
|
||||
os.environ["COWRIE_SHELL_FILESYSTEM"] = "../share/cowrie/fs.pickle"
|
||||
|
||||
TRY_CHMOD_HELP_MSG = b"Try 'base64 --help' for more information.\n"
|
||||
PROMPT = b"root@unitTest:~# "
|
||||
|
||||
|
||||
class ShellBase64CommandTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.proto = protocol.HoneyPotInteractiveProtocol(
|
||||
fake_server.FakeAvatar(fake_server.FakeServer())
|
||||
)
|
||||
self.tr = fake_transport.FakeTransport("1.1.1.1", "1111")
|
||||
self.proto.makeConnection(self.tr)
|
||||
self.tr.clear()
|
||||
|
||||
def test_base64_command_001(self):
|
||||
"""
|
||||
Missing operand
|
||||
"""
|
||||
self.proto.lineReceived(b"echo cowrie | base64")
|
||||
self.assertEqual(self.tr.value(), b"Y293cmllCg==\n" + PROMPT)
|
||||
|
||||
def test_base64_command_002(self):
|
||||
"""
|
||||
Missing operand
|
||||
"""
|
||||
self.proto.lineReceived(b"echo Y293cmllCg== | base64 -d")
|
||||
self.assertEqual(self.tr.value(), b"cowrie\n" + PROMPT)
|
||||
|
||||
def tearDown(self):
|
||||
self.proto.connectionLost("tearDown From Unit Test")
|
Reference in New Issue
Block a user