* add way more typing
This commit is contained in:
Michel Oosterhof
2021-04-15 14:25:33 +08:00
committed by GitHub
parent 8f3c1c4e79
commit 2e0e3a80e4
53 changed files with 554 additions and 437 deletions

View File

@ -1,12 +1,11 @@
[mypy]
python_version = 3.6
namespace_packages = True
plugins=mypy_zope:plugin
ignore_missing_imports = True
# Increase our expectations
plugins = mypy_zope:plugin
ignore_missing_imports = True
warn_unused_configs = True
no_implicit_optional = True
show_column_numbers = True
show_error_codes = True
@ -16,17 +15,17 @@ warn_redundant_casts = True
warn_return_any = True
warn_unreachable = True
warn_unused_ignores = True
disallow_incomplete_defs = True
disallow_any_unimported = True
# These are too strict for us at the moment
check_untyped_defs = False
disallow_incomplete_defs = False
disallow_untyped_defs = False
disallow_any_decorated = False
disallow_any_explicit = False
disallow_any_expr = False
disallow_any_generics = False
disallow_any_unimported = False
disallow_subclassing_any = False
disallow_untyped_calls = False
disallow_untyped_decorators = False
@ -45,3 +44,6 @@ warn_return_any = False
allow_untyped_defs = True
check_untyped_defs = False
[mypy-twisted.internet.reactor]
allow_untyped_defs = True
check_untyped_defs = False

View File

@ -171,6 +171,8 @@ class PoolServerFactory(Factory):
# pool handling
self.pool_service = None
self.tac = None
# NAT service
self.nat = NATService()

View File

@ -5,6 +5,7 @@ import os
import random
import subprocess
import time
from typing import Set
def ping(guest_ip):
@ -45,7 +46,7 @@ def generate_network_table(seed=None):
# number of IPs per network is 253 (2-254)
# generate random MACs, set ensures they are unique
macs = set()
macs: Set[str] = set()
while len(macs) < 253:
macs.add(
"48:d2:24:bf:"

View File

@ -1,4 +1,4 @@
# setup version
from ._version import __version__ as version
__version__ = version.short()
__version__: str = version.short()

View File

@ -5,7 +5,9 @@ Provides cowrie version information.
# This file is auto-generated! Do not edit!
# Use `python -m incremental.update cowrie` to change this file.
from typing import List
from incremental import Version
__version__ = Version("cowrie", 2, 2, 0)
__all__ = ["__version__"]
__all__: List[str] = ["__version__"]

View File

@ -349,7 +349,7 @@ class command_ps(HoneyPotCommand):
output_array.append(output)
else:
output_array = (
output_array = [
(
"USER ",
" PID",
@ -779,17 +779,17 @@ class command_ps(HoneyPotCommand):
" 0:00 ",
"ps %s" % " ".join(self.args),
),
)
]
output = output_array
for i in range(len(output)):
for i in range(len(output_array)):
if i != 0:
if "a" not in args and output[i][_user].strip() != user:
if "a" not in args and output_array[i][_user].strip() != user:
continue
elif (
"a" not in args
and "x" not in args
and output[i][_tty].strip() != "pts/0"
and output_array[i][_tty].strip() != "pts/0"
):
continue
line = [_pid, _tty, _time, _command]
@ -809,7 +809,7 @@ class command_ps(HoneyPotCommand):
_time,
_command,
]
s = "".join([output[i][x] for x in line])
s = "".join([output_array[i][x] for x in line])
if "w" not in args:
s = s[
: (
@ -876,7 +876,7 @@ commands["passwd"] = command_passwd
class command_shutdown(HoneyPotCommand):
def start(self):
if len(self.args) and self.args[0].strip().count("--help"):
output = (
output = [
"Usage: shutdown [-akrhHPfnc] [-t secs] time [warning message]",
"-a: use /etc/shutdown.allow ",
"-k: don't really shutdown, only warn. ",
@ -890,7 +890,7 @@ class command_shutdown(HoneyPotCommand):
"-c: cancel a running shutdown. ",
"-t secs: delay between warning and kill signal. ",
'** the "time" argument is mandatory! (try "now") **',
)
]
for line in output:
self.write(f"{line}\n")
self.exit()
@ -1049,7 +1049,7 @@ class command_php(HoneyPotCommand):
self.write(f"{line}\n")
self.exit()
elif self.args[0] == "-h":
output = (
output = [
"Usage: php [options] [-f] <file> [--] [args...]",
" php [options] -r <code> [--] [args...]",
" php [options] [-B <begin_code>] -R <code> [-E <end_code>] [--] [args...]",
@ -1088,7 +1088,7 @@ class command_php(HoneyPotCommand):
" --re <name> Show information about extension <name>.",
" --ri <name> Show configuration for extension <name>.",
"",
)
]
for line in output:
self.write(f"{line}\n")
self.exit()

View File

@ -25,28 +25,28 @@ class command_gcc(HoneyPotCommand):
# Random binary data, which looks awesome. You could change this to whatever you want, but this
# data will be put in the actual file and thus exposed to our hacker when he\she cats the file.
RANDOM_DATA = (
"\x6a\x00\x48\x89\xe5\x48\x83\xe4\xf0\x48\x8b\x7d\x08\x48\x8d\x75\x10\x89\xfa"
"\x83\xc2\x01\xc1\xe2\x03\x48\x01\xf2\x48\x89\xd1\xeb\x04\x48\x83\xc1\x08\x48"
"\x83\x39\x00\x75\xf6\x48\x83\xc1\x08\xe8\x0c\x00\x00\x00\x89\xc7\xe8\xb9\x00"
"\x00\x00\xf4\x90\x90\x90\x90\x55\x48\x89\xe5\x48\x83\xec\x40\x89\x7d\xfc\x48"
"\x89\x75\xf0\x48\x8b\x45\xf0\x48\x8b\x00\x48\x83\xf8\x00\x75\x0c\xb8\x00\x00"
"\x00\x00\x89\xc7\xe8\x8c\x00\x00\x00\x48\x8b\x45\xf0\x48\x8b\x40\x08\x30\xc9"
"\x48\x89\xc7\x88\xc8\xe8\x7e\x00\x00\x00\x89\xc1\x89\x4d\xdc\x48\x8d\x0d\xd8"
"\x01\x00\x00\x48\x89\xcf\x48\x89\x4d\xd0\xe8\x72\x00\x00\x00\x8b\x4d\xdc\x30"
"\xd2\x48\x8d\x3d\xa4\x00\x00\x00\x89\xce\x88\x55\xcf\x48\x89\xc2\x8a\x45\xcf"
"\xe8\x53\x00\x00\x00\x8b\x45\xdc\x88\x05\xc3\x01\x00\x00\x8b\x45\xdc\xc1\xe8"
"\x08\x88\x05\xb8\x01\x00\x00\x8b\x45\xdc\xc1\xe8\x10\x88\x05\xad\x01\x00\x00"
"\x8b\x45\xdc\xc1\xe8\x18\x88\x05\xa2\x01\x00\x00\x48\x8b\x45\xd0\x48\x89\x45"
"\xe0\x48\x8b\x45\xe0\xff\xd0\x8b\x45\xec\x48\x83\xc4\x40\x5d\xc3\xff\x25\x3e"
"\x01\x00\x00\xff\x25\x40\x01\x00\x00\xff\x25\x42\x01\x00\x00\xff\x25\x44\x01"
"\x00\x00\x4c\x8d\x1d\x1d\x01\x00\x00\x41\x53\xff\x25\x0d\x01\x00\x00\x90\x68"
"\x00\x00\x00\x00\xe9\xe6\xff\xff\xff\x68\x0c\x00\x00\x00\xe9\xdc\xff\xff\xff"
"\x68\x1d\x00\x00\x00\xe9\xd2\xff\xff\xff\x68\x2b\x00\x00\x00\xe9\xc8\xff\xff"
"\xff\x01\x00\x00\x00\x1c\x00\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x00\x00"
"\x00\x00\x1c\x00\x00\x00\x02\x00\x00\x00\x00\x0e\x00\x00\x34\x00\x00\x00\x34"
"\x00\x00\x00\xf5\x0e\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x03\x00\x00\x00"
"\x0c\x00\x02\x00\x14\x00\x02\x00\x00\x00\x00\x01\x40\x00\x00\x00\x00\x00\x00"
"\x01\x00\x00\x00"
b"\x6a\x00\x48\x89\xe5\x48\x83\xe4\xf0\x48\x8b\x7d\x08\x48\x8d\x75\x10\x89\xfa"
b"\x83\xc2\x01\xc1\xe2\x03\x48\x01\xf2\x48\x89\xd1\xeb\x04\x48\x83\xc1\x08\x48"
b"\x83\x39\x00\x75\xf6\x48\x83\xc1\x08\xe8\x0c\x00\x00\x00\x89\xc7\xe8\xb9\x00"
b"\x00\x00\xf4\x90\x90\x90\x90\x55\x48\x89\xe5\x48\x83\xec\x40\x89\x7d\xfc\x48"
b"\x89\x75\xf0\x48\x8b\x45\xf0\x48\x8b\x00\x48\x83\xf8\x00\x75\x0c\xb8\x00\x00"
b"\x00\x00\x89\xc7\xe8\x8c\x00\x00\x00\x48\x8b\x45\xf0\x48\x8b\x40\x08\x30\xc9"
b"\x48\x89\xc7\x88\xc8\xe8\x7e\x00\x00\x00\x89\xc1\x89\x4d\xdc\x48\x8d\x0d\xd8"
b"\x01\x00\x00\x48\x89\xcf\x48\x89\x4d\xd0\xe8\x72\x00\x00\x00\x8b\x4d\xdc\x30"
b"\xd2\x48\x8d\x3d\xa4\x00\x00\x00\x89\xce\x88\x55\xcf\x48\x89\xc2\x8a\x45\xcf"
b"\xe8\x53\x00\x00\x00\x8b\x45\xdc\x88\x05\xc3\x01\x00\x00\x8b\x45\xdc\xc1\xe8"
b"\x08\x88\x05\xb8\x01\x00\x00\x8b\x45\xdc\xc1\xe8\x10\x88\x05\xad\x01\x00\x00"
b"\x8b\x45\xdc\xc1\xe8\x18\x88\x05\xa2\x01\x00\x00\x48\x8b\x45\xd0\x48\x89\x45"
b"\xe0\x48\x8b\x45\xe0\xff\xd0\x8b\x45\xec\x48\x83\xc4\x40\x5d\xc3\xff\x25\x3e"
b"\x01\x00\x00\xff\x25\x40\x01\x00\x00\xff\x25\x42\x01\x00\x00\xff\x25\x44\x01"
b"\x00\x00\x4c\x8d\x1d\x1d\x01\x00\x00\x41\x53\xff\x25\x0d\x01\x00\x00\x90\x68"
b"\x00\x00\x00\x00\xe9\xe6\xff\xff\xff\x68\x0c\x00\x00\x00\xe9\xdc\xff\xff\xff"
b"\x68\x1d\x00\x00\x00\xe9\xd2\xff\xff\xff\x68\x2b\x00\x00\x00\xe9\xc8\xff\xff"
b"\xff\x01\x00\x00\x00\x1c\x00\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x00\x00"
b"\x00\x00\x1c\x00\x00\x00\x02\x00\x00\x00\x00\x0e\x00\x00\x34\x00\x00\x00\x34"
b"\x00\x00\x00\xf5\x0e\x00\x00\x00\x00\x00\x00\x34\x00\x00\x00\x03\x00\x00\x00"
b"\x0c\x00\x02\x00\x14\x00\x02\x00\x00\x00\x00\x01\x40\x00\x00\x00\x00\x00\x00"
b"\x01\x00\x00\x00"
)
def start(self):

View File

@ -114,10 +114,10 @@ usage: nc [-46bCDdhjklnrStUuvZz] [-I length] [-i interval] [-O length]
self.exit()
def recv_data(self):
data = ""
data = b""
while 1:
packet = self.s.recv(1024)
if packet == "":
if packet == b"":
break
else:
data += packet

View File

@ -60,7 +60,7 @@ class command_wget(HoneyPotCommand):
def start(self):
try:
optlist, args = getopt.getopt(self.args, "cqO:P:", "header=")
optlist, args = getopt.getopt(self.args, "cqO:P:", ["header="])
except getopt.GetoptError:
self.errorWrite("Unrecognized option\n")
self.exit()

View File

@ -24,6 +24,8 @@ or:
import hashlib
import os
import tempfile
from types import TracebackType
from typing import Any, Optional, Tuple, Type
from twisted.python import log
@ -32,35 +34,41 @@ from cowrie.core.config import CowrieConfig
class Artifact:
artifactDir = CowrieConfig.get("honeypot", "download_path")
artifactDir: str = CowrieConfig.get("honeypot", "download_path")
def __init__(self, label):
self.label = label
def __init__(self, label: str) -> None:
self.label: str = label
self.fp = tempfile.NamedTemporaryFile(dir=self.artifactDir, delete=False)
self.tempFilename = self.fp.name
self.closed = False
self.closed: bool = False
self.shasum = ""
self.shasumFilename = ""
self.shasum: str = ""
self.shasumFilename: str = ""
def __enter__(self):
def __enter__(self) -> Any:
return self.fp
def __exit__(self, exception_type, exception_value, trace):
def __exit__(
self,
etype: Optional[Type[BaseException]],
einst: Optional[BaseException],
etrace: Optional[TracebackType],
) -> bool:
self.close()
return True
def write(self, bytes):
def write(self, bytes: bytes) -> None:
self.fp.write(bytes)
def fileno(self):
def fileno(self) -> Any:
return self.fp.fileno()
def close(self, keepEmpty=False):
size = self.fp.tell()
def close(self, keepEmpty: bool = False) -> Optional[Tuple[str, str]]:
size: int = self.fp.tell()
if size == 0 and not keepEmpty:
os.remove(self.fp.name)
return
return None
self.fp.seek(0)
data = self.fp.read()

View File

@ -11,12 +11,13 @@ import re
from collections import OrderedDict
from os import path
from random import randint
from typing import Any, Dict, List, Pattern, Tuple, Union
from twisted.python import log
from cowrie.core.config import CowrieConfig
_USERDB_DEFAULTS = [
_USERDB_DEFAULTS: List[str] = [
"root:x:!root",
"root:x:!123456",
"root:x:!/honeypot/i",
@ -31,25 +32,28 @@ class UserDB:
By Walter de Jong <walter@sara.nl>
"""
def __init__(self):
self.userdb = OrderedDict()
def __init__(self) -> None:
self.userdb: Dict[
Tuple[Union[Pattern, bytes], Union[Pattern, bytes]], bool
] = OrderedDict()
self.load()
def load(self):
def load(self) -> None:
"""
load the user db
"""
dblines: List[str]
try:
with open(
"{}/userdb.txt".format(CowrieConfig.get("honeypot", "etc_path"))
) as db:
userdb = db.readlines()
dblines = db.readlines()
except OSError:
log.msg("Could not read etc/userdb.txt, default database activated")
userdb = _USERDB_DEFAULTS
dblines = _USERDB_DEFAULTS
for user in userdb:
for user in dblines:
if not user.startswith("#"):
try:
login = user.split(":")[0].encode("utf8")
@ -59,8 +63,12 @@ class UserDB:
else:
self.adduser(login, password)
def checklogin(self, thelogin, thepasswd, src_ip="0.0.0.0"):
def checklogin(
self, thelogin: bytes, thepasswd: bytes, src_ip: str = "0.0.0.0"
) -> bool:
for credentials, policy in self.userdb.items():
login: Union[bytes, Pattern]
passwd: Union[bytes, Pattern]
login, passwd = credentials
if self.match_rule(login, thelogin):
@ -69,13 +77,15 @@ class UserDB:
return False
def match_rule(self, rule, input):
if type(rule) is bytes:
def match_rule(
self, rule: Union[bytes, Pattern], input: bytes
) -> Union[bool, bytes]:
if isinstance(rule, bytes):
return rule in [b"*", input]
else:
return bool(rule.search(input))
def re_or_str(self, rule):
def re_or_bytes(self, rule: bytes) -> Union[Pattern, bytes]:
"""
Convert a /.../ type rule to a regex, otherwise return the string as-is
@ -88,7 +98,7 @@ class UserDB:
return rule
def adduser(self, login, passwd):
def adduser(self, login: bytes, passwd: bytes) -> None:
"""
All arguments are bytes
@ -97,7 +107,7 @@ class UserDB:
@param passwd: password
@type passwd: bytes
"""
login = self.re_or_str(login)
user = self.re_or_bytes(login)
if passwd[0] == ord("!"):
policy = False
@ -105,8 +115,8 @@ class UserDB:
else:
policy = True
passwd = self.re_or_str(passwd)
self.userdb[(login, passwd)] = policy
p = self.re_or_bytes(passwd)
self.userdb[(user, p)] = policy
class AuthRandom:
@ -115,14 +125,16 @@ class AuthRandom:
Users will be authenticated after a random number of attempts.
"""
def __init__(self):
def __init__(self) -> None:
# Default values
self.mintry, self.maxtry, self.maxcache = 2, 5, 10
self.mintry: int = 2
self.maxtry: int = 5
self.maxcache: int = 10
# Are there auth_class parameters?
if CowrieConfig.has_option("honeypot", "auth_class_parameters"):
parameters = CowrieConfig.get("honeypot", "auth_class_parameters")
parlist = parameters.split(",")
parameters: str = CowrieConfig.get("honeypot", "auth_class_parameters")
parlist: List[str] = parameters.split(",")
if len(parlist) == 3:
self.mintry = int(parlist[0])
self.maxtry = int(parlist[1])
@ -131,13 +143,14 @@ class AuthRandom:
if self.maxtry < self.mintry:
self.maxtry = self.mintry + 1
log.msg(f"maxtry < mintry, adjusting maxtry to: {self.maxtry}")
self.uservar = {}
self.uservar_file = "{}/auth_random.json".format(
self.uservar: Dict[Any, Any] = {}
self.uservar_file: str = "{}/auth_random.json".format(
CowrieConfig.get("honeypot", "state_path")
)
self.loadvars()
def loadvars(self):
def loadvars(self) -> None:
"""
Load user vars from json file
"""
@ -148,7 +161,7 @@ class AuthRandom:
except Exception:
self.uservar = {}
def savevars(self):
def savevars(self) -> None:
"""
Save the user vars to json file
"""
@ -157,7 +170,7 @@ class AuthRandom:
with open(self.uservar_file, "w") as fp:
json.dump(data, fp)
def checklogin(self, thelogin, thepasswd, src_ip):
def checklogin(self, thelogin: bytes, thepasswd: bytes, src_ip: str) -> bool:
"""
Every new source IP will have to try a random number of times between
'mintry' and 'maxtry' before succeeding to login.
@ -168,8 +181,8 @@ class AuthRandom:
Variables are saved in 'uservar.json' in the data directory.
"""
auth = False
userpass = str(thelogin) + ":" + str(thepasswd)
auth: bool = False
userpass: str = str(thelogin) + ":" + str(thepasswd)
if "cache" not in self.uservar:
self.uservar["cache"] = []
@ -219,8 +232,8 @@ class AuthRandom:
return auth
ipinfo["try"] += 1
attempts = ipinfo["try"]
need = ipinfo["max"]
attempts: int = ipinfo["try"]
need: int = ipinfo["max"]
log.msg(f"login attempt: {attempts}")
# Check if enough login attempts are tried

View File

@ -43,8 +43,10 @@
# cowrie.session.file_download
# cowrie.session.file_upload
from typing import Dict
def formatCef(logentry):
def formatCef(logentry: Dict[str, str]) -> str:
"""
Take logentry and turn into CEF string
"""

View File

@ -9,9 +9,10 @@ This module contains code to deal with Cowrie's configuration
import configparser
from os import environ
from os.path import abspath, dirname, exists, join
from typing import List, Union
def to_environ_key(key):
def to_environ_key(key: str) -> str:
return key.upper()
@ -21,19 +22,19 @@ class EnvironmentConfigParser(configparser.ConfigParser):
# TODO: def sections()
"""
def has_option(self, section, option):
def has_option(self, section: str, option: str) -> bool:
if to_environ_key("_".join(("cowrie", section, option))) in environ:
return True
return super().has_option(section, option)
def get(self, section, option, raw=False, **kwargs):
key = to_environ_key("_".join(("cowrie", section, option)))
def get(self, section: str, option: str, *, raw: bool = False, **kwargs) -> str: # type: ignore
key: str = to_environ_key("_".join(("cowrie", section, option)))
if key in environ:
return environ[key]
return super().get(section, option, raw=raw, **kwargs)
def readConfigFile(cfgfile):
def readConfigFile(cfgfile: Union[str, List[str]]) -> configparser.ConfigParser:
"""
Read config files and return ConfigParser object
@ -45,7 +46,7 @@ def readConfigFile(cfgfile):
return parser
def get_config_path():
def get_config_path() -> List[str]:
"""Get absolute path to the config file"""
current_path = abspath(dirname(__file__))
root = "/".join(current_path.split("/")[:-3])
@ -62,6 +63,7 @@ def get_config_path():
return found_confs
print("Config file not found")
return []
CowrieConfig = readConfigFile(get_config_path())

View File

@ -68,16 +68,16 @@ class PluggableAuthenticationModulesIP:
Twisted removed IPAM in 15, adding in Cowrie now
"""
def __init__(self, username, pamConversion, ip):
self.username = username
self.pamConversion = pamConversion
self.ip = ip
def __init__(self, username: str, pamConversion: bool, ip: str) -> None:
self.username: str = username
self.pamConversion: bool = pamConversion
self.ip: str = ip
@implementer(IUsername)
class Username:
def __init__(self, username):
self.username = username
def __init__(self, username: str):
self.username: str = username
@implementer(IUsernamePasswordIP)
@ -86,10 +86,10 @@ class UsernamePasswordIP:
This credential interface also provides an IP address
"""
def __init__(self, username, password, ip):
self.username = username
self.password = password
self.ip = ip
def __init__(self, username: str, password: str, ip: str) -> None:
self.username: str = username
self.password: str = password
self.ip: str = ip
def checkPassword(self, password):
def checkPassword(self, password: str) -> bool:
return self.password == password

View File

@ -32,6 +32,7 @@ import re
import socket
import time
from os import environ
from typing import Dict, Pattern, Union
from twisted.internet import reactor
from twisted.logger import formatTime
@ -63,7 +64,7 @@ in UTC.
"""
def convert(input):
def convert(input: Union[str, list, dict, bytes]) -> Union[str, list, dict]:
"""
This converts a nested dictionary with bytes in it to string
"""
@ -84,15 +85,19 @@ class Output(metaclass=abc.ABCMeta):
methods: stop, start and write
"""
def __init__(self):
self.sessions = {}
self.ips = {}
def __init__(self) -> None:
self.sessions: Dict[str, str] = {}
self.ips: Dict[str, str] = {}
# Need these for each individual transport, or else the session numbers overlap
self.sshRegex = re.compile(".*SSHTransport,([0-9]+),[0-9a-f:.]+$")
self.telnetRegex = re.compile(".*TelnetTransport,([0-9]+),[0-9a-f:.]+$")
self.sensor = CowrieConfig.get(
self.sshRegex: Pattern = re.compile(".*SSHTransport,([0-9]+),[0-9a-f:.]+$")
self.telnetRegex: Pattern = re.compile(
".*TelnetTransport,([0-9]+),[0-9a-f:.]+$"
)
self.sensor: str = CowrieConfig.get(
"honeypot", "sensor_name", fallback=socket.gethostname()
)
self.timeFormat: str
# use Z for UTC (Zulu) time, it's shorter.
if "TZ" in environ and environ["TZ"] == "UTC":
@ -101,41 +106,41 @@ class Output(metaclass=abc.ABCMeta):
self.timeFormat = "%Y-%m-%dT%H:%M:%S.%f%z"
# Event trigger so that stop() is called by the reactor when stopping
reactor.addSystemEventTrigger("before", "shutdown", self.stop)
reactor.addSystemEventTrigger("before", "shutdown", self.stop) # type: ignore
self.start()
def logDispatch(self, *msg, **kw):
def logDispatch(self, **kw: str) -> None:
"""
Use logDispatch when the HoneypotTransport prefix is not available.
Here you can explicitly set the sessionIds to tie the sessions together
"""
ev = kw
ev["message"] = msg
# ev["message"] = msg
self.emit(ev)
@abc.abstractmethod
def start(self):
def start(self) -> None:
"""
Abstract method to initialize output plugin
"""
pass
@abc.abstractmethod
def stop(self):
def stop(self) -> None:
"""
Abstract method to shut down output plugin
"""
pass
@abc.abstractmethod
def write(self, event):
def write(self, event: dict) -> None:
"""
Handle a general event within the output plugin
"""
pass
def emit(self, event):
def emit(self, event: dict) -> None:
"""
This is the main emit() hook that gets called by the the Twisted logging
@ -144,6 +149,9 @@ class Output(metaclass=abc.ABCMeta):
- 'sessionno' or 'session'
- 'message' or 'format'
"""
sessionno: str
ev: dict
# Ignore stdout and stderr in output plugins
if "printed" in event:
return
@ -164,7 +172,7 @@ class Output(metaclass=abc.ABCMeta):
if "message" not in event and "format" not in event:
return
ev = convert(event)
ev = convert(event) # type: ignore
ev["sensor"] = self.sensor
if "isError" in ev:
@ -199,7 +207,7 @@ class Output(metaclass=abc.ABCMeta):
return
# Extract session id from the twisted log prefix
elif "system" in ev:
sessionno = 0
sessionno = "0"
telnetmatch = self.telnetRegex.match(ev["system"])
if telnetmatch:
sessionno = "T{}".format(telnetmatch.groups()[0])
@ -207,7 +215,7 @@ class Output(metaclass=abc.ABCMeta):
sshmatch = self.sshRegex.match(ev["system"])
if sshmatch:
sessionno = "S{}".format(sshmatch.groups()[0])
if sessionno == 0:
if sessionno == "0":
return
if sessionno in self.ips:

View File

@ -40,7 +40,7 @@ from cowrie.telnet import session
@implementer(IRealm)
class HoneyPotRealm:
def __init__(self):
def __init__(self) -> None:
pass
def requestAvatar(self, avatarId, mind, *interfaces):

View File

@ -16,7 +16,7 @@ TYPE_INPUT, TYPE_OUTPUT, TYPE_INTERACT = 1, 2, 3
TTYSTRUCT = "<iLiiLL"
def ttylog_open(logfile, stamp):
def ttylog_open(logfile: str, stamp: float) -> None:
"""
Initialize new tty log
@ -28,7 +28,9 @@ def ttylog_open(logfile, stamp):
f.write(struct.pack(TTYSTRUCT, OP_OPEN, 0, 0, 0, sec, usec))
def ttylog_write(logfile, length, direction, stamp, data=None):
def ttylog_write(
logfile: str, length: int, direction: int, stamp: float, data: bytes
) -> None:
"""
Write to tty log
@ -44,7 +46,7 @@ def ttylog_write(logfile, length, direction, stamp, data=None):
f.write(data)
def ttylog_close(logfile, stamp):
def ttylog_close(logfile: str, stamp: float) -> None:
"""
Close tty log
@ -56,22 +58,28 @@ def ttylog_close(logfile, stamp):
f.write(struct.pack(TTYSTRUCT, OP_CLOSE, 0, 0, 0, sec, usec))
def ttylog_inputhash(logfile):
def ttylog_inputhash(logfile: str) -> str:
"""
Create unique hash of the input parts of tty log
@param logfile: logfile name
"""
ssize = struct.calcsize(TTYSTRUCT)
inputbytes = b""
ssize: int = struct.calcsize(TTYSTRUCT)
inputbytes: bytes = b""
with open(logfile, "rb") as fd:
while 1:
try:
(op, _tty, length, direction, _sec, _usec) = struct.unpack(
op: int
_tty: int
length: int
direction: int
_sec: int
_usec: int
op, _tty, length, direction, _sec, _usec = struct.unpack(
TTYSTRUCT, fd.read(ssize)
)
data = fd.read(length)
data: bytes = fd.read(length)
except struct.error:
break
@ -79,5 +87,5 @@ def ttylog_inputhash(logfile):
continue
inputbytes = inputbytes + data
shasum = hashlib.sha256(inputbytes).hexdigest()
shasum: str = hashlib.sha256(inputbytes).hexdigest()
return shasum

View File

@ -2,62 +2,70 @@
# Copyright (c) 2010-2014 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
import configparser
from typing import BinaryIO, List
from twisted.application import internet
from twisted.internet import endpoints
def durationHuman(seconds):
def durationHuman(duration: float) -> str:
"""
Turn number of seconds into human readable string
"""
seconds = int(round(seconds))
seconds: int = int(round(duration))
minutes: int
minutes, seconds = divmod(seconds, 60)
hours: int
hours, minutes = divmod(minutes, 60)
days: float
days, hours = divmod(hours, 24)
years: float
years, days = divmod(days, 365.242199)
syears = str(years)
sseconds = str(seconds).rjust(2, "0")
sminutes = str(minutes).rjust(2, "0")
shours = str(hours).rjust(2, "0")
syears: str = str(years)
sseconds: str = str(seconds).rjust(2, "0")
sminutes: str = str(minutes).rjust(2, "0")
shours: str = str(hours).rjust(2, "0")
duration = []
sduration: List[str] = []
if years > 0:
duration.append("{} year{} ".format(syears, "s" * (years != 1)))
sduration.append("{} year{} ".format(syears, "s" * (years != 1)))
else:
if days > 0:
duration.append("{} day{} ".format(days, "s" * (days != 1)))
sduration.append("{} day{} ".format(days, "s" * (days != 1)))
if hours > 0:
duration.append(f"{shours}:")
sduration.append(f"{shours}:")
if minutes >= 0:
duration.append(f"{sminutes}:")
sduration.append(f"{sminutes}:")
if seconds >= 0:
duration.append(f"{sseconds}")
sduration.append(f"{sseconds}")
return "".join(duration)
return "".join(sduration)
def tail(the_file, lines_2find=20):
def tail(the_file: BinaryIO, lines_2find: int = 20) -> List[bytes]:
"""
From http://stackoverflow.com/questions/136168/get-last-n-lines-of-a-file-with-python-similar-to-tail
"""
lines_found: int = 0
total_bytes_scanned: int = 0
the_file.seek(0, 2)
bytes_in_file = the_file.tell()
lines_found, total_bytes_scanned = 0, 0
bytes_in_file: int = the_file.tell()
while lines_2find + 1 > lines_found and bytes_in_file > total_bytes_scanned:
byte_block = min(1024, bytes_in_file - total_bytes_scanned)
byte_block: int = min(1024, bytes_in_file - total_bytes_scanned)
the_file.seek(-(byte_block + total_bytes_scanned), 2)
total_bytes_scanned += byte_block
lines_found += the_file.read(1024).count("\n")
lines_found += the_file.read(1024).count(b"\n")
the_file.seek(-total_bytes_scanned, 2)
line_list = list(the_file.readlines())
line_list: List[bytes] = list(the_file.readlines())
return line_list[-lines_2find:]
# We read at least 21 line breaks from the bottom, block by block for speed
# 21 to ensure we don't get a half line
def uptime(total_seconds):
def uptime(total_seconds: float) -> str:
"""
Gives a human-readable uptime string
Thanks to http://thesmithfam.org/blog/2005/11/19/python-uptime-script/
@ -66,19 +74,19 @@ def uptime(total_seconds):
total_seconds = float(total_seconds)
# Helper vars:
MINUTE = 60
HOUR = MINUTE * 60
DAY = HOUR * 24
MINUTE: int = 60
HOUR: int = MINUTE * 60
DAY: int = HOUR * 24
# Get the days, hours, etc:
days = int(total_seconds / DAY)
hours = int((total_seconds % DAY) / HOUR)
minutes = int((total_seconds % HOUR) / MINUTE)
days: int = int(total_seconds / DAY)
hours: int = int((total_seconds % DAY) / HOUR)
minutes: int = int((total_seconds % HOUR) / MINUTE)
# 14 days, 3:53
# 11 min
s = ""
s: str = ""
if days > 0:
s += str(days) + " " + (days == 1 and "day" or "days") + ", "
if len(s) > 0 or hours > 0:
@ -88,7 +96,13 @@ def uptime(total_seconds):
return s
def get_endpoints_from_section(cfg, section, default_port):
def get_endpoints_from_section(
cfg: configparser.ConfigParser, section: str, default_port: int
) -> List[str]:
listen_addr: str
listen_port: int
listen_endpoints: List[str] = []
if cfg.has_option(section, "listen_endpoints"):
return cfg.get(section, "listen_endpoints").split()
@ -102,7 +116,6 @@ def get_endpoints_from_section(cfg, section, default_port):
else:
listen_port = default_port
listen_endpoints = []
for i in listen_addr.split():
listen_endpoints.append(f"tcp:{listen_port}:interface={i}")

View File

@ -113,7 +113,7 @@ class Output(cowrie.core.output.Output):
# ToDo Compress to opimise the space and if your sending to remote db
with open(entry["ttylog"]) as ttylog:
entry["ttylogpath"] = entry["ttylog"]
entry["ttylog"] = ttylog.read().hex()
entry["ttylog"] = ttylog.read().encode().hex()
self.insert_one(self.col_ttylog, entry)
elif eventid == "cowrie.client.fingerprint":

View File

@ -15,7 +15,7 @@ import cowrie.core.output
from cowrie.core.config import CowrieConfig
class XMPPLoggerProtocol(muc.MUCClient):
class XMPPLoggerProtocol(muc.MUCClient): # type: ignore
def __init__(self, rooms, server, nick):
muc.MUCClient.__init__(self)
self.server = rooms.host

View File

@ -24,6 +24,8 @@ class HoneyPotCommand:
This is the super class for all commands in cowrie/commands
"""
safeoutfile: str = ""
def __init__(self, protocol, *args):
self.protocol = protocol
self.args = list(args)
@ -77,13 +79,13 @@ class HoneyPotCommand:
)
self.writefn = self.write_to_failed
self.outfile = None
self.safeoutfile = None
self.safeoutfile = ""
except fs.PermissionDenied:
# The outfile locates in a file-system that doesn't allow file creation
self.errorWrite("-bash: %s: Permission denied\n" % self.outfile)
self.writefn = self.write_to_failed
self.outfile = None
self.safeoutfile = None
self.safeoutfile = ""
else:
with open(self.safeoutfile, "ab"):
@ -141,7 +143,7 @@ class HoneyPotCommand:
self.exit()
def call(self):
self.write(b"Hello World! [%s]\n" % (repr(self.args),))
self.write("Hello World! [{}]\n".format(repr(self.args)).encode("utf8"))
def exit(self):
"""

View File

@ -1,6 +1,7 @@
# Copyright (c) 2009-2014 Upi Tamminen <desaster@gmail.com>
# See the COPYRIGHT file for more information
# Todo, use os.stat_result, which contains the stat 10-tuple instead of the custom object.
import errno
import fnmatch
@ -10,6 +11,7 @@ import pickle
import re
import stat
import time
from typing import Any, Dict, List, Optional
from twisted.python import log
@ -30,7 +32,37 @@ from cowrie.core.config import CowrieConfig
T_LINK, T_DIR, T_FILE, T_BLK, T_CHR, T_SOCK, T_FIFO = list(range(0, 7))
SPECIAL_PATHS = ["/sys", "/proc", "/dev/pts"]
SPECIAL_PATHS: List[str] = ["/sys", "/proc", "/dev/pts"]
class _statobj:
"""
Transform a tuple into a stat object
"""
def __init__(
self,
st_mode: int,
st_ino: int,
st_dev: int,
st_nlink: int,
st_uid: int,
st_gid: int,
st_size: int,
st_atime: float,
st_mtime: float,
st_ctime: float,
) -> None:
self.st_mode: int = st_mode
self.st_ino: int = st_ino
self.st_dev: int = st_dev
self.st_nlink: int = st_nlink
self.st_uid: int = st_uid
self.st_gid: int = st_gid
self.st_size: int = st_size
self.st_atime: float = st_atime
self.st_mtime: float = st_mtime
self.st_ctime: float = st_ctime
class TooManyLevels(Exception):
@ -81,7 +113,9 @@ class PermissionDenied(Exception):
class HoneyPotFilesystem:
def __init__(self, fs, arch, home):
def __init__(self, fs: str, arch: str, home: str) -> None:
self.fs: List
try:
with open(CowrieConfig.get("shell", "filesystem"), "rb") as f:
@ -94,21 +128,21 @@ class HoneyPotFilesystem:
exit(2)
# Keep track of arch so we can return appropriate binary
self.arch = arch
self.home = home
self.arch: str = arch
self.home: str = home
# Keep track of open file descriptors
self.tempfiles = {}
self.filenames = {}
self.tempfiles: Dict[int, str] = {}
self.filenames: Dict[int, str] = {}
# Keep count of new files, so we can have an artificial limit
self.newcount = 0
self.newcount: int = 0
# Get the honeyfs path from the config file and explore it for file
# contents:
self.init_honeyfs(CowrieConfig.get("honeypot", "contents_path"))
def init_honeyfs(self, honeyfs_path):
def init_honeyfs(self, honeyfs_path: str) -> None:
"""
Explore the honeyfs at 'honeyfs_path' and set all A_REALFILE attributes on
the virtual filesystem.
@ -116,17 +150,18 @@ class HoneyPotFilesystem:
for path, directories, filenames in os.walk(honeyfs_path):
for filename in filenames:
realfile_path = os.path.join(path, filename)
virtual_path = "/" + os.path.relpath(realfile_path, honeyfs_path)
realfile_path: str = os.path.join(path, filename)
virtual_path: str = "/" + os.path.relpath(realfile_path, honeyfs_path)
f = self.getfile(virtual_path, follow_symlinks=False)
if f and f[A_TYPE] == T_FILE:
self.update_realfile(f, realfile_path)
def resolve_path(self, pathspec, cwd):
def resolve_path(self, pathspec: str, cwd: str) -> str:
"""
This function does not need to be in this class, it has no dependencies
"""
cwdpieces: List[str] = []
# If a path within home directory is specified, convert it to an absolute path
if pathspec.startswith("~/"):
@ -137,35 +172,36 @@ class HoneyPotFilesystem:
pieces = path.rstrip("/").split("/")
if path[0] == "/":
cwd = []
cwdpieces = []
else:
cwd = [x for x in cwd.split("/") if len(x) and x is not None]
cwdpieces = [x for x in cwd.split("/") if len(x) and x is not None]
while 1:
if not len(pieces):
break
piece = pieces.pop(0)
if piece == "..":
if len(cwd):
cwd.pop()
if len(cwdpieces):
cwdpieces.pop()
continue
if piece in (".", ""):
continue
cwd.append(piece)
cwdpieces.append(piece)
return "/{}".format("/".join(cwd))
return "/{}".format("/".join(cwdpieces))
def resolve_path_wc(self, path, cwd):
def resolve_path_wc(self, path: str, cwd: str) -> List[str]:
"""
Resolve_path with wildcard support (globbing)
"""
pieces = path.rstrip("/").split("/")
pieces: List[str] = path.rstrip("/").split("/")
cwdpieces: List[str]
if len(pieces[0]):
cwd = [x for x in cwd.split("/") if len(x) and x is not None]
cwdpieces = [x for x in cwd.split("/") if len(x) and x is not None]
path = path[1:]
else:
cwd, pieces = [], pieces[1:]
found = []
cwdpieces, pieces = [], pieces[1:]
found: List[str] = []
def foo(p, cwd):
if not len(p):
@ -180,14 +216,14 @@ class HoneyPotFilesystem:
for match in matches:
foo(p[1:], cwd + [match])
foo(pieces, cwd)
foo(pieces, cwdpieces)
return found
def get_path(self, path, follow_symlinks=True):
def get_path(self, path: str, follow_symlinks: bool = True) -> Any:
"""
This returns the Cowrie file system objects for a directory
"""
cwd = self.fs
cwd: List = self.fs
for part in path.split("/"):
if not len(part):
continue
@ -195,7 +231,12 @@ class HoneyPotFilesystem:
for c in cwd[A_CONTENTS]:
if c[A_NAME] == part:
if c[A_TYPE] == T_LINK:
cwd = self.getfile(c[A_TARGET], follow_symlinks=follow_symlinks)
f = self.getfile(c[A_TARGET], follow_symlinks=follow_symlinks)
if f is None:
ok = False
break
else:
cwd = f
else:
cwd = c
ok = True
@ -204,25 +245,27 @@ class HoneyPotFilesystem:
raise FileNotFound
return cwd[A_CONTENTS]
def exists(self, path):
def exists(self, path: str) -> bool:
"""
Return True if path refers to an existing path.
Returns False for broken symbolic links.
"""
f = self.getfile(path, follow_symlinks=True)
f: Any = self.getfile(path, follow_symlinks=True)
if f is not False:
return True
return False
def lexists(self, path):
def lexists(self, path: str) -> bool:
"""
Return True if path refers to an existing path.
Returns True for broken symbolic links.
"""
f = self.getfile(path, follow_symlinks=False)
f: Any = self.getfile(path, follow_symlinks=False)
if f is not False:
return True
return False
def update_realfile(self, f, realfile):
def update_realfile(self, f: Any, realfile: str) -> None:
if (
not f[A_REALFILE]
@ -233,18 +276,20 @@ class HoneyPotFilesystem:
):
f[A_REALFILE] = realfile
def getfile(self, path, follow_symlinks=True):
def getfile(self, path: str, follow_symlinks: bool = True) -> Optional[List]:
"""
This returns the Cowrie file system object for a path
"""
if path == "/":
return self.fs
pieces = path.strip("/").split("/")
cwd = ""
p = self.fs
pieces: List[str] = path.strip("/").split("/")
cwd: str = ""
p: Optional[List] = self.fs
for piece in pieces:
if not isinstance(p, list):
return None
if piece not in [x[A_NAME] for x in p[A_CONTENTS]]:
return False
return None
for x in p[A_CONTENTS]:
if x[A_NAME] == piece:
if piece == pieces[-1] and not follow_symlinks:
@ -252,34 +297,35 @@ class HoneyPotFilesystem:
elif x[A_TYPE] == T_LINK:
if x[A_TARGET][0] == "/":
# Absolute link
p = self.getfile(
fileobj = self.getfile(
x[A_TARGET], follow_symlinks=follow_symlinks
)
else:
# Relative link
p = self.getfile(
fileobj = self.getfile(
"/".join((cwd, x[A_TARGET])),
follow_symlinks=follow_symlinks,
)
if not p:
if not fileobj:
# Broken link
return False
return None
p = fileobj
else:
p = x
# cwd = '/'.join((cwd, piece))
return p
def file_contents(self, target):
def file_contents(self, target: str) -> bytes:
"""
Retrieve the content of a file in the honeyfs
It follows links.
It tries A_REALFILE first and then tries honeyfs directory
Then return the executable header for executables
"""
path = self.resolve_path(target, os.path.dirname(target))
path: str = self.resolve_path(target, os.path.dirname(target))
if not path or not self.exists(path):
raise FileNotFound
f = self.getfile(path)
f: Any = self.getfile(path)
if f[A_TYPE] == T_DIR:
raise IsADirectoryError
elif f[A_TYPE] == T_FILE and f[A_REALFILE]:
@ -288,32 +334,50 @@ class HoneyPotFilesystem:
# Zero-byte file lacking A_REALFILE backing: probably empty.
# (The exceptions to this are some system files in /proc and /sys,
# but it's likely better to return nothing than suspiciously fail.)
return ""
return b""
elif f[A_TYPE] == T_FILE and f[A_MODE] & stat.S_IXUSR:
return open(
CowrieConfig.get("honeypot", "share_path") + "/arch/" + self.arch,
"rb",
).read()
else:
return b""
def mkfile(self, path, uid, gid, size, mode, ctime=None):
def mkfile(
self,
path: str,
uid: int,
gid: int,
size: int,
mode: int,
ctime: Optional[float] = None,
) -> bool:
if self.newcount > 10000:
return False
if ctime is None:
ctime = time.time()
_path = os.path.dirname(path)
_path: str = os.path.dirname(path)
if any([_path.startswith(_p) for _p in SPECIAL_PATHS]):
raise PermissionDenied
_dir = self.get_path(_path)
outfile = os.path.basename(path)
outfile: str = os.path.basename(path)
if outfile in [x[A_NAME] for x in _dir]:
_dir.remove([x for x in _dir if x[A_NAME] == outfile][0])
_dir.append([outfile, T_FILE, uid, gid, size, mode, ctime, [], None, None])
self.newcount += 1
return True
def mkdir(self, path, uid, gid, size, mode, ctime=None):
def mkdir(
self,
path: str,
uid: int,
gid: int,
size: int,
mode: int,
ctime: Optional[float] = None,
) -> None:
if self.newcount > 10000:
raise OSError(errno.EDQUOT, os.strerror(errno.EDQUOT), path)
if ctime is None:
@ -329,30 +393,36 @@ class HoneyPotFilesystem:
)
self.newcount += 1
def isfile(self, path):
def isfile(self, path: str) -> bool:
"""
Return True if path is an existing regular file. This follows symbolic
links, so both islink() and isfile() can be true for the same path.
"""
try:
f = self.getfile(path)
f: Any = self.getfile(path)
except Exception:
return False
return f[A_TYPE] == T_FILE
if f[A_TYPE] == T_FILE:
return True
else:
return False
def islink(self, path):
def islink(self, path: str) -> bool:
"""
Return True if path refers to a directory entry that is a symbolic
link. Always False if symbolic links are not supported by the python
runtime.
"""
try:
f = self.getfile(path)
f: Any = self.getfile(path)
except Exception:
return False
return f[A_TYPE] == T_LINK
if f[A_TYPE] == T_LINK:
return True
else:
return False
def isdir(self, path):
def isdir(self, path: str) -> bool:
"""
Return True if path is an existing directory.
This follows symbolic links, so both islink() and isdir() can be true for the same path.
@ -374,7 +444,7 @@ class HoneyPotFilesystem:
Below additions for SFTP support, try to keep functions here similar to os.*
"""
def open(self, filename, openFlags, mode):
def open(self, filename: str, openFlags: int, mode: int) -> Optional[int]:
"""
#log.msg("fs.open %s" % filename)
@ -394,8 +464,8 @@ class HoneyPotFilesystem:
"""
if openFlags & os.O_WRONLY == os.O_WRONLY or openFlags & os.O_RDWR == os.O_RDWR:
# strip executable bit
hostmode = mode & ~(111)
hostfile = "{}/{}_sftp_{}".format(
hostmode: int = mode & ~(111)
hostfile: str = "{}/{}_sftp_{}".format(
CowrieConfig.get("honeypot", "download_path"),
time.strftime("%Y%m%d-%H%M%S"),
re.sub("[^A-Za-z0-9]", "_", filename),
@ -407,24 +477,30 @@ class HoneyPotFilesystem:
self.filenames[fd] = filename
return fd
# TODO: throw exception
elif openFlags & os.O_RDONLY == os.O_RDONLY:
return None
# TODO: throw exception
return None
def read(self, fd, size):
def read(self, fd: int, n: int) -> bytes:
# this should not be called, we intercept at readChunk
raise NotImplementedError
def write(self, fd, string):
def write(self, fd: int, string: bytes) -> int:
return os.write(fd, string)
def close(self, fd):
def close(self, fd: int) -> None:
if not fd:
return True
return
if self.tempfiles[fd] is not None:
shasum = hashlib.sha256(open(self.tempfiles[fd], "rb").read()).hexdigest()
shasumfile = CowrieConfig.get("honeypot", "download_path") + "/" + shasum
shasum: str = hashlib.sha256(
open(self.tempfiles[fd], "rb").read()
).hexdigest()
shasumfile: str = (
CowrieConfig.get("honeypot", "download_path") + "/" + shasum
)
if os.path.exists(shasumfile):
os.remove(self.tempfiles[fd])
else:
@ -439,33 +515,33 @@ class HoneyPotFilesystem:
)
del self.tempfiles[fd]
del self.filenames[fd]
return os.close(fd)
os.close(fd)
def lseek(self, fd, offset, whence):
def lseek(self, fd: int, offset: int, whence: int) -> int:
if not fd:
return True
return os.lseek(fd, offset, whence)
def mkdir2(self, path):
def mkdir2(self, path: str) -> None:
"""
FIXME mkdir() name conflicts with existing mkdir
"""
dir = self.getfile(path)
dir: Any = self.getfile(path)
if dir:
raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
self.mkdir(path, 0, 0, 4096, 16877)
def rmdir(self, path):
path = path.rstrip("/")
name = os.path.basename(path)
parent = os.path.dirname(path)
dir = self.getfile(path, follow_symlinks=False)
def rmdir(self, path: str) -> bool:
p: str = path.rstrip("/")
name: str = os.path.basename(p)
parent: str = os.path.dirname(p)
dir: Any = self.getfile(p, follow_symlinks=False)
if not dir:
raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), path)
raise OSError(errno.EEXIST, os.strerror(errno.EEXIST), p)
if dir[A_TYPE] != T_DIR:
raise OSError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), path)
if len(self.get_path(path)) > 0:
raise OSError(errno.ENOTEMPTY, os.strerror(errno.ENOTEMPTY), path)
raise OSError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), p)
if len(self.get_path(p)) > 0:
raise OSError(errno.ENOTEMPTY, os.strerror(errno.ENOTEMPTY), p)
pdir = self.get_path(parent, follow_symlinks=True)
for i in pdir[:]:
if i[A_NAME] == name:
@ -473,20 +549,20 @@ class HoneyPotFilesystem:
return True
return False
def utime(self, path, atime, mtime):
p = self.getfile(path)
def utime(self, path: str, atime: float, mtime: float) -> None:
p: Optional[List] = self.getfile(path)
if not p:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
p[A_CTIME] = mtime
def chmod(self, path, perm):
p = self.getfile(path)
def chmod(self, path: str, perm: int) -> None:
p: Optional[List] = self.getfile(path)
if not p:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
p[A_MODE] = stat.S_IFMT(p[A_MODE]) | perm
def chown(self, path, uid, gid):
p = self.getfile(path)
def chown(self, path: str, uid: int, gid: int) -> None:
p: Optional[List] = self.getfile(path)
if not p:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
if uid != -1:
@ -494,25 +570,25 @@ class HoneyPotFilesystem:
if gid != -1:
p[A_GID] = gid
def remove(self, path):
p = self.getfile(path, follow_symlinks=False)
def remove(self, path: str) -> None:
p: Optional[List] = self.getfile(path, follow_symlinks=False)
if not p:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
self.get_path(os.path.dirname(path)).remove(p)
def readlink(self, path):
p = self.getfile(path, follow_symlinks=False)
def readlink(self, path: str) -> str:
p: Optional[List] = self.getfile(path, follow_symlinks=False)
if not p:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
if not (p[A_MODE] & stat.S_IFLNK):
raise OSError
return p[A_TARGET]
return p[A_TARGET] # type: ignore
def symlink(self, targetPath, linkPath):
def symlink(self, targetPath: str, linkPath: str) -> None:
raise NotImplementedError
def rename(self, oldpath, newpath):
old = self.getfile(oldpath)
def rename(self, oldpath: str, newpath: str) -> None:
old: Optional[List] = self.getfile(oldpath)
if not old:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
new = self.getfile(newpath)
@ -523,23 +599,24 @@ class HoneyPotFilesystem:
old[A_NAME] = os.path.basename(newpath)
self.get_path(os.path.dirname(newpath)).append(old)
def listdir(self, path):
names = [x[A_NAME] for x in self.get_path(path)]
def listdir(self, path: str) -> List[str]:
names: List[str] = [x[A_NAME] for x in self.get_path(path)]
return names
def lstat(self, path):
def lstat(self, path: str) -> _statobj:
return self.stat(path, follow_symlinks=False)
def stat(self, path, follow_symlinks=True):
def stat(self, path: str, follow_symlinks: bool = True) -> _statobj:
p: Optional[List]
if path == "/":
p = {
A_TYPE: T_DIR,
A_UID: 0,
A_GID: 0,
A_SIZE: 4096,
A_MODE: 16877,
A_CTIME: time.time(),
}
# TODO: shouldn't this be a list?
p = []
p[A_TYPE] = T_DIR
p[A_UID] = 0
p[A_GID] = 0
p[A_SIZE] = 4096
p[A_MODE] = 16877
p[A_CTIME] = time.time()
else:
p = self.getfile(path, follow_symlinks=follow_symlinks)
@ -559,43 +636,13 @@ class HoneyPotFilesystem:
p[A_CTIME],
)
def realpath(self, path):
def realpath(self, path: str) -> str:
return path
def update_size(self, filename, size):
f = self.getfile(filename)
def update_size(self, filename: str, size: int) -> None:
f: Optional[List] = self.getfile(filename)
if not f:
return
if f[A_TYPE] != T_FILE:
return
f[A_SIZE] = size
class _statobj:
"""
Transform a tuple into a stat object
"""
def __init__(
self,
st_mode,
st_ino,
st_dev,
st_nlink,
st_uid,
st_gid,
st_size,
st_atime,
st_mtime,
st_ctime,
):
self.st_mode = st_mode
self.st_ino = st_ino
self.st_dev = st_dev
self.st_nlink = st_nlink
self.st_uid = st_uid
self.st_gid = st_gid
self.st_size = st_size
self.st_atime = st_atime
self.st_mtime = st_mtime
self.st_ctime = st_ctime

View File

@ -6,6 +6,7 @@ import copy
import os
import re
import shlex
from typing import List
from twisted.internet import error
from twisted.python import failure, log
@ -34,7 +35,7 @@ class HoneyPotShell:
# Add these special characters that are not in the default lexer
self.lexer.wordchars += "@%{}=$:+^,()`"
tokens = []
tokens: List[str] = []
while True:
try:

View File

@ -48,7 +48,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
def __init__(self, user):
self.user = user
self.environ = user.environ
self.hostname = user.server.hostname
self.hostname: str = user.server.hostname
self.fs = user.server.fs
self.pp = None
self.logintime = None
@ -74,13 +74,13 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
"""
return self.terminal.transport.session.conn.transport
def logDispatch(self, *msg, **args):
def logDispatch(self, **args):
"""
Send log directly to factory, avoiding normal log dispatch
"""
args["sessionno"] = self.sessionno
pt = self.getProtoTransport()
pt.factory.logDispatch(*msg, **args)
pt.factory.logDispatch(**args)
def connectionMade(self):
pt = self.getProtoTransport()

View File

@ -29,6 +29,7 @@
from binascii import crc32
from random import randint, seed
from typing import Dict, Union
from twisted.python import log
@ -79,7 +80,7 @@ class Passwd:
pw_shell,
) = line.split(":")
e = {}
e: Dict[str, Union[str, int]] = {}
e["pw_name"] = pw_name
e["pw_passwd"] = pw_passwd
e["pw_gecos"] = pw_gecos
@ -176,7 +177,7 @@ class Group:
(gr_name, gr_passwd, gr_gid, gr_mem) = line.split(":")
e = {}
e: Dict[str, Union[str, int]] = {}
e["gr_name"] = gr_name
try:
e["gr_gid"] = int(gr_gid)

View File

@ -27,7 +27,7 @@ class CowrieSSHChannel(channel.SSHChannel):
bytesReceivedLimit = 0
bytesWritten = 0
name = b"cowrie-ssh-channel"
startTime = None
startTime: float = 0.0
ttylogPath = CowrieConfig.get("honeypot", "log_path")
downloadPath = CowrieConfig.get("honeypot", "download_path")
ttylogEnabled = CowrieConfig.getboolean("honeypot", "ttylog", fallback=True)
@ -69,7 +69,7 @@ class CowrieSSHChannel(channel.SSHChannel):
def closed(self):
log.msg(
eventid="cowrie.log.closed",
format="Closing TTY Log: %(ttylog)s after %(duration)d seconds",
format="Closing TTY Log: %(ttylog)s after %(duration)f seconds",
ttylog=self.ttylogFile,
size=self.bytesReceived + self.bytesWritten,
duration=time.time() - self.startTime,

View File

@ -8,10 +8,12 @@ This module contains ...
import time
from configparser import NoOptionError
from typing import Optional
from twisted.conch.openssh_compat import primes
from twisted.conch.ssh import factory
from twisted.conch.ssh import keys
from twisted.cred import portal as tp
from twisted.python import log
from cowrie.core.config import CowrieConfig
@ -34,6 +36,7 @@ class CowrieSSHFactory(factory.SSHFactory):
privateKeys = None
publicKeys = None
primes = None
portal: Optional[tp.Portal] = None # gets set by plugin
tac = None # gets set later
ourVersionString = CowrieConfig.get(
"ssh", "version", fallback="SSH-2.0-OpenSSH_6.0p1 Debian-4+deb7u2"
@ -50,13 +53,13 @@ class CowrieSSHFactory(factory.SSHFactory):
}
super().__init__()
def logDispatch(self, *msg, **args):
def logDispatch(self, **args):
"""
Special delivery to the loggers to avoid scope problems
"""
args["sessionno"] = "S{}".format(args["sessionno"])
for output in self.tac.output_plugins:
output.logDispatch(*msg, **args)
output.logDispatch(**args)
def startFactory(self):
# For use by the uptime command

View File

@ -55,8 +55,8 @@ def getDSAKeys():
with open(privateKeyFile, "w+b") as f:
f.write(privateKeyString)
else:
with open(publicKeyFile) as f:
with open(publicKeyFile, "rb") as f:
publicKeyString = f.read()
with open(privateKeyFile) as f:
with open(privateKeyFile, "rb") as f:
privateKeyString = f.read()
return publicKeyString, privateKeyString

View File

@ -24,13 +24,13 @@ from cowrie.core.config import CowrieConfig
class HoneyPotSSHTransport(transport.SSHServerTransport, TimeoutMixin):
startTime = None
gotVersion = False
startTime: float = 0.0
gotVersion: bool = False
ipv4rex = re.compile(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$")
auth_timeout = CowrieConfig.getint(
auth_timeout: int = CowrieConfig.getint(
"honeypot", "authentication_timeout", fallback=120
)
interactive_timeout = CowrieConfig.getint(
interactive_timeout: int = CowrieConfig.getint(
"honeypot", "interactive_timeout", fallback=300
)
@ -48,8 +48,8 @@ class HoneyPotSSHTransport(transport.SSHServerTransport, TimeoutMixin):
Called when the connection is made from the other side.
We send our version, but wait with sending KEXINIT
"""
self.transportId = uuid.uuid4().hex[:12]
src_ip = self.transport.getPeer().host
self.transportId: str = uuid.uuid4().hex[:12]
src_ip: str = self.transport.getPeer().host
ipv4_search = self.ipv4rex.search(src_ip)
if ipv4_search is not None:

View File

@ -1,16 +1,16 @@
import struct
def string_to_hex(message):
def string_to_hex(message: str) -> bytes:
b = message.encode("utf-8")
size = struct.pack(">L", len(b))
return size + b
def bin_string_to_hex(message):
def bin_string_to_hex(message: bytes) -> bytes:
size = struct.pack(">L", len(message))
return size + message
def int_to_hex(value):
def int_to_hex(value: int) -> bytes:
return struct.pack(">L", value)

View File

@ -7,8 +7,9 @@ Telnet Transport and Authentication for the Honeypot
import time
from typing import Optional
from twisted.cred import portal as tp
from twisted.internet import protocol
from twisted.python import log
@ -26,6 +27,7 @@ class HoneyPotTelnetFactory(protocol.ServerFactory):
"""
tac = None
portal: Optional[tp.Portal] = None # gets set by Twisted plugin
def __init__(self, backend, pool_handler):
self.backend = backend
@ -33,13 +35,13 @@ class HoneyPotTelnetFactory(protocol.ServerFactory):
super().__init__()
# TODO logging clarity can be improved: see what SSH does
def logDispatch(self, *msg, **args):
def logDispatch(self, **args):
"""
Special delivery to the loggers to avoid scope problems
"""
args["sessionno"] = "T{}".format(str(args["sessionno"]))
for output in self.tac.output_plugins:
output.logDispatch(*msg, **args)
output.logDispatch(**args)
def startFactory(self):
try:

View File

@ -1,4 +0,0 @@
# -*- test-case-name: Cowrie Test Cases -*-
# Copyright (c) 2016 Dave Germiquet
# See LICENSE for details.

View File

@ -36,35 +36,35 @@ class ShellEchoCommandTests(unittest.TestCase):
Test $0, full input line contents
"""
self.proto.lineReceived(b'echo "test test" | awk "{ print $0 }"\n')
self.assertEquals(self.tr.value(), b"test test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_awk_command_002(self):
"""
Test $1, first agument
"""
self.proto.lineReceived(b'echo "test" | awk "{ print $1 }"\n')
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_awk_command_003(self):
"""
Test $1 $2 space separated
"""
self.proto.lineReceived(b'echo "test test" | awk "{ print $1 $2 }"\n')
self.assertEquals(self.tr.value(), b"test test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_awk_command_004(self):
"""
Test $1,$2 comma separated
"""
self.proto.lineReceived(b'echo "test test" | awk "{ print $1,$2 }"\n')
self.assertEquals(self.tr.value(), b"test test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_awk_command_005(self):
"""
Test $1$2 not separated
"""
self.proto.lineReceived(b'echo "test test" | awk "{ print $1$2 }"\n')
self.assertEquals(self.tr.value(), b"testtest\n" + PROMPT)
self.assertEqual(self.tr.value(), b"testtest\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -45,18 +45,18 @@ class ShellBaseCommandsTests(unittest.TestCase):
def test_hostname_command(self):
self.proto.lineReceived(b"hostname unitChanged\n")
self.assertEquals(self.tr.value(), b"root@unitChanged:~# ")
self.assertEqual(self.tr.value(), b"root@unitChanged:~# ")
# def test_reset_command(self):
# self.proto.lineReceived(b'reset')
# def test_ps_command(self):
# self.proto.lineReceived(b'ps\n')
# self.assertEquals(self.tr.value().decode('utf8'), "\n".join(self.data['results']['ps']))
# self.assertEqual(self.tr.value().decode('utf8'), "\n".join(self.data['results']['ps']))
def test_id_command(self):
self.proto.lineReceived(b"id\n")
self.assertEquals(
self.assertEqual(
self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT
)
@ -64,7 +64,7 @@ class ShellBaseCommandsTests(unittest.TestCase):
self.proto.lineReceived(b"passwd\n")
self.proto.lineReceived(b"changeme\n")
self.proto.lineReceived(b"changeme\n")
self.assertEquals(
self.assertEqual(
self.tr.value(),
b"Enter new UNIX password: Retype new UNIX password: passwd: password updated successfully\n"
+ PROMPT,
@ -95,7 +95,7 @@ class ShellBaseCommandsTests(unittest.TestCase):
def test_sh_command(self):
self.proto.lineReceived(b"sh -c id\n")
self.assertEquals(
self.assertEqual(
self.tr.value(), b"uid=0(root) gid=0(root) groups=0(root)\n" + PROMPT
)
@ -105,16 +105,16 @@ class ShellBaseCommandsTests(unittest.TestCase):
def test_chattr_command(self):
self.proto.lineReceived(b"chattr\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_umask_command(self):
self.proto.lineReceived(b"umask\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_set_command(self):
self.proto.lineReceived(b"set\n")
self.assertEquals(
self.assertEqual(
self.tr.value(),
b"COLUMNS=80\nHOME=/root\nLINES=25\nLOGNAME=root\nPATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin\nTMOUT=1800\nUSER=root\n"
+ PROMPT,
@ -122,47 +122,47 @@ class ShellBaseCommandsTests(unittest.TestCase):
def test_unset_command(self):
self.proto.lineReceived(b"unset\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_export_command(self):
self.proto.lineReceived(b"export\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_alias_command(self):
self.proto.lineReceived(b"alias\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_jobs_command(self):
self.proto.lineReceived(b"jobs\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_kill_command(self):
self.proto.lineReceived(b"/bin/kill\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_pkill_command(self):
self.proto.lineReceived(b"/bin/pkill\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_killall_command(self):
self.proto.lineReceived(b"/bin/killall\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_killall5_command(self):
self.proto.lineReceived(b"/bin/killall5\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_su_command(self):
self.proto.lineReceived(b"su\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_chown_command(self):
self.proto.lineReceived(b"chown\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_chgrp_command(self):
self.proto.lineReceived(b"chgrp\n")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")
@ -246,11 +246,11 @@ class ShellPipeCommandsTests(unittest.TestCase):
def test_shell_pipe_with_cat_tail(self):
self.proto.lineReceived(b"echo test | tail -n 1\n")
self.assertEquals(self.tr.value(), PROMPT + b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), PROMPT + b"test\n" + PROMPT)
def test_shell_pipe_with_cat_head(self):
self.proto.lineReceived(b"echo test | head -n 1\n")
self.assertEquals(self.tr.value(), PROMPT + b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), PROMPT + b"test\n" + PROMPT)
# def test_shell_busybox_with_cat_and_sudo_grep(self):
# self.proto.lineReceived(b'busybox cat /proc/cpuinfo | sudo grep cpu \n')

View File

@ -36,7 +36,7 @@ class ShellCatCommandTests(unittest.TestCase):
No such file
"""
self.proto.lineReceived(b"cat nonExisting\n")
self.assertEquals(
self.assertEqual(
self.tr.value(), b"cat: nonExisting: No such file or directory\n" + PROMPT
)
@ -45,7 +45,7 @@ class ShellCatCommandTests(unittest.TestCase):
argument - (stdin)
"""
self.proto.lineReceived(b"echo test | cat -\n")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_cat_command_003(self):
"""
@ -54,7 +54,7 @@ class ShellCatCommandTests(unittest.TestCase):
self.proto.lineReceived(b"echo 1 | cat\n")
self.proto.lineReceived(b"echo 2\n")
self.proto.handle_CTRL_D()
self.assertEquals(self.tr.value(), b"1\n" + PROMPT + b"2\n" + PROMPT)
self.assertEqual(self.tr.value(), b"1\n" + PROMPT + b"2\n" + PROMPT)
def test_cat_command_004(self):
"""
@ -63,7 +63,7 @@ class ShellCatCommandTests(unittest.TestCase):
self.proto.lineReceived(b"cat\n")
self.proto.lineReceived(b"test\n")
self.proto.handle_CTRL_C()
self.assertEquals(self.tr.value(), b"test\n^C\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n^C\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -37,7 +37,7 @@ class ShellChmodCommandTests(unittest.TestCase):
Missing operand
"""
self.proto.lineReceived(b"chmod")
self.assertEquals(
self.assertEqual(
self.tr.value(), b"chmod: missing operand\n" + TRY_CHMOD_HELP_MSG + PROMPT
)
@ -46,7 +46,7 @@ class ShellChmodCommandTests(unittest.TestCase):
Missing operand
"""
self.proto.lineReceived(b"chmod -x")
self.assertEquals(
self.assertEqual(
self.tr.value(), b"chmod: missing operand\n" + TRY_CHMOD_HELP_MSG + PROMPT
)
@ -55,7 +55,7 @@ class ShellChmodCommandTests(unittest.TestCase):
Missing operand after ...
"""
self.proto.lineReceived(b"chmod +x")
self.assertEquals(
self.assertEqual(
self.tr.value(),
b"chmod: missing operand after \xe2\x80\x98+x\xe2\x80\x99\n"
+ TRY_CHMOD_HELP_MSG
@ -67,7 +67,7 @@ class ShellChmodCommandTests(unittest.TestCase):
Invalid option
"""
self.proto.lineReceived(b"chmod -A")
self.assertEquals(
self.assertEqual(
self.tr.value(),
b"chmod: invalid option -- 'A'\n" + TRY_CHMOD_HELP_MSG + PROMPT,
)
@ -77,7 +77,7 @@ class ShellChmodCommandTests(unittest.TestCase):
Unrecognized option
"""
self.proto.lineReceived(b"chmod --A")
self.assertEquals(
self.assertEqual(
self.tr.value(),
b"chmod: unrecognized option '--A'\n" + TRY_CHMOD_HELP_MSG + PROMPT,
)
@ -87,7 +87,7 @@ class ShellChmodCommandTests(unittest.TestCase):
No such file or directory
"""
self.proto.lineReceived(b"chmod -x abcd")
self.assertEquals(
self.assertEqual(
self.tr.value(),
b"chmod: cannot access 'abcd': No such file or directory\n" + PROMPT,
)
@ -97,7 +97,7 @@ class ShellChmodCommandTests(unittest.TestCase):
Invalid mode
"""
self.proto.lineReceived(b"chmod abcd efgh")
self.assertEquals(
self.assertEqual(
self.tr.value(),
b"chmod: invalid mode: \xe2\x80\x98abcd\xe2\x80\x99\n"
+ TRY_CHMOD_HELP_MSG
@ -109,56 +109,56 @@ class ShellChmodCommandTests(unittest.TestCase):
Valid directory .ssh
"""
self.proto.lineReceived(b"chmod +x .ssh")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_009(self):
"""
Valid directory .ssh recursive
"""
self.proto.lineReceived(b"chmod -R +x .ssh")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_010(self):
"""
Valid directory /root/.ssh
"""
self.proto.lineReceived(b"chmod +x /root/.ssh")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_011(self):
"""
Valid directory ~/.ssh
"""
self.proto.lineReceived(b"chmod +x ~/.ssh")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_012(self):
"""
chmod a+x
"""
self.proto.lineReceived(b"chmod a+x .ssh")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_013(self):
"""
chmod ug+x
"""
self.proto.lineReceived(b"chmod ug+x .ssh")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_014(self):
"""
chmod 777
"""
self.proto.lineReceived(b"chmod 777 .ssh")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_chmod_command_015(self):
"""
chmod 0775
"""
self.proto.lineReceived(b"chmod 0755 .ssh")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -36,182 +36,182 @@ class ShellEchoCommandTests(unittest.TestCase):
Basic test
"""
self.proto.lineReceived(b'echo "test"\n')
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_002(self):
"""
argument splitting and recombining
"""
self.proto.lineReceived(b"echo test test\n")
self.assertEquals(self.tr.value(), b"test test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test test\n" + PROMPT)
def test_echo_command_003(self):
"""
echo -n
"""
self.proto.lineReceived(b'echo -n "test test"\n')
self.assertEquals(self.tr.value(), b"test test" + PROMPT)
self.assertEqual(self.tr.value(), b"test test" + PROMPT)
def test_echo_command_004(self):
"""
echo -n
"""
self.proto.lineReceived(b'echo -n "test test"\n')
self.assertEquals(self.tr.value(), b"test test" + PROMPT)
self.assertEqual(self.tr.value(), b"test test" + PROMPT)
def test_echo_command_005(self):
"""
echo test >> test; cat test
"""
self.proto.lineReceived(b"echo test > test5; cat test5")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_006(self):
"""
echo -n
"""
self.proto.lineReceived(b'echo "\\n"\n')
self.assertEquals(self.tr.value(), b"\\n\n" + PROMPT)
self.assertEqual(self.tr.value(), b"\\n\n" + PROMPT)
def test_echo_command_007(self):
"""
echo test >> test; cat test
"""
self.proto.lineReceived(b"echo test >> test7; cat test7")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_008(self):
"""
echo test > test; echo test >> test; cat test
"""
self.proto.lineReceived(b"echo test > test8; echo test >> test8; cat test8")
self.assertEquals(self.tr.value(), b"test\ntest\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\ntest\n" + PROMPT)
def test_echo_command_009(self):
"""
echo test | grep test
"""
self.proto.lineReceived(b"echo test | grep test")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_010(self):
"""
echo test | grep test
"""
self.proto.lineReceived(b"echo test | grep test2")
self.assertEquals(self.tr.value(), PROMPT)
self.assertEqual(self.tr.value(), PROMPT)
def test_echo_command_011(self):
"""
echo test > test011; cat test011 | grep test
"""
self.proto.lineReceived(b"echo test > test011; cat test011 | grep test")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_012(self):
"""
echo test > test012; grep test test012
"""
self.proto.lineReceived(b"echo test > test012; grep test test012")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_013(self):
"""
echo "ls""ls"
"""
self.proto.lineReceived(b'echo "ls""ls"')
self.assertEquals(self.tr.value(), b"lsls\n" + PROMPT)
self.assertEqual(self.tr.value(), b"lsls\n" + PROMPT)
def test_echo_command_014(self):
"""
echo '"ls"'
"""
self.proto.lineReceived(b"echo '\"ls\"'")
self.assertEquals(self.tr.value(), b'"ls"\n' + PROMPT)
self.assertEqual(self.tr.value(), b'"ls"\n' + PROMPT)
def test_echo_command_015(self):
"""
echo "'ls'"
"""
self.proto.lineReceived(b"echo \"'ls'\"")
self.assertEquals(self.tr.value(), b"'ls'\n" + PROMPT)
self.assertEqual(self.tr.value(), b"'ls'\n" + PROMPT)
def test_echo_command_016(self):
"""
echo -e "\x6b\x61\x6d\x69"
"""
self.proto.lineReceived(b'echo -e "\x6b\x61\x6d\x69"')
self.assertEquals(self.tr.value(), b"kami\n" + PROMPT)
self.assertEqual(self.tr.value(), b"kami\n" + PROMPT)
def test_echo_command_017(self):
"""
echo -e "\x6b\x61\x6d\x69"
"""
self.proto.lineReceived(b"echo echo test | bash")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_018(self):
"""
echo $(echo test)
"""
self.proto.lineReceived(b"echo $(echo test)")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_019(self):
"""
echo $(echo $(echo test))
"""
self.proto.lineReceived(b"echo $(echo $(echo test))")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_020(self):
"""
echo test_$(echo test)_test
"""
self.proto.lineReceived(b"echo test_$(echo test)_test")
self.assertEquals(self.tr.value(), b"test_test_test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test_test_test\n" + PROMPT)
def test_echo_command_021(self):
"""
echo test_$(echo test)_test_$(echo test)_test
"""
self.proto.lineReceived(b"echo test_$(echo test)_test_$(echo test)_test")
self.assertEquals(self.tr.value(), b"test_test_test_test_test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test_test_test_test_test\n" + PROMPT)
def test_echo_command_022(self):
"""
echo test; (echo test)
"""
self.proto.lineReceived(b"echo test; (echo test)")
self.assertEquals(self.tr.value(), b"test\ntest\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\ntest\n" + PROMPT)
def test_echo_command_023(self):
"""
echo `echo test`
"""
self.proto.lineReceived(b"echo `echo test`")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_echo_command_024(self):
"""
echo test_`echo test`_test
"""
self.proto.lineReceived(b"echo test_`echo test`_test")
self.assertEquals(self.tr.value(), b"test_test_test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test_test_test\n" + PROMPT)
def test_echo_command_025(self):
"""
echo test_`echo test`_test_`echo test`_test
"""
self.proto.lineReceived(b"echo test_`echo test`_test_`echo test`_test")
self.assertEquals(self.tr.value(), b"test_test_test_test_test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test_test_test_test_test\n" + PROMPT)
def test_echo_command_026(self):
"""
echo "TEST1: `echo test1`, TEST2: `echo test2`"
"""
self.proto.lineReceived(b'echo "TEST1: `echo test1`, TEST2: `echo test2`"')
self.assertEquals(self.tr.value(), b"TEST1: test1, TEST2: test2\n" + PROMPT)
self.assertEqual(self.tr.value(), b"TEST1: test1, TEST2: test2\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -32,7 +32,7 @@ class ShellftpgetCommandTests(unittest.TestCase):
Basic test
"""
self.proto.lineReceived(b"ftpget\n")
self.assertEquals(
self.assertEqual(
self.tr.value(),
b"""BusyBox v1.20.2 (2016-06-22 15:12:53 EDT) multi-call binary.

View File

@ -36,9 +36,7 @@ class ShellTeeCommandTests(unittest.TestCase):
No such file
"""
self.proto.lineReceived(b"tee /a/b/c/d\n")
self.assertEquals(
self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n"
)
self.assertEqual(self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n")
def test_tee_command_002(self):
"""
@ -46,7 +44,7 @@ class ShellTeeCommandTests(unittest.TestCase):
"""
self.proto.lineReceived(b"tee /a/b/c/d\n")
self.proto.handle_CTRL_C()
self.assertEquals(
self.assertEqual(
self.tr.value(), b"tee: /a/b/c/d: No such file or directory\n^C\n" + PROMPT
)
@ -57,14 +55,14 @@ class ShellTeeCommandTests(unittest.TestCase):
self.proto.lineReceived(b"tee a\n")
self.proto.lineReceived(b"test\n")
self.proto.handle_CTRL_D()
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_tee_command_004(self):
"""
test handle of stdin
"""
self.proto.lineReceived(b"echo test | tee\n")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_tee_command_005(self):
"""
@ -73,7 +71,7 @@ class ShellTeeCommandTests(unittest.TestCase):
self.proto.lineReceived(b"tee\n")
self.proto.lineReceived(b"test\n")
self.proto.handle_CTRL_D()
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -36,7 +36,7 @@ class ShellTftpCommandTests(unittest.TestCase):
Basic test
"""
self.proto.lineReceived(b"tftp\n")
self.assertEquals(
self.assertEqual(
self.tr.value(),
b"usage: tftp [-h] [-c C C] [-l L] [-g G] [-p P] [-r R] [hostname]\n"
+ PROMPT,

View File

@ -36,14 +36,14 @@ class ShellUniqCommandTests(unittest.TestCase):
echo test | uniq
"""
self.proto.lineReceived(b"echo test | uniq\n")
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_uniq_command_002(self):
"""
echo -e "test\ntest\ntest" | uniq
"""
self.proto.lineReceived(b'echo -e "test\ntest\ntest" | uniq\n')
self.assertEquals(self.tr.value(), b"test\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n" + PROMPT)
def test_uniq_command_003(self):
"""
@ -54,7 +54,7 @@ class ShellUniqCommandTests(unittest.TestCase):
self.proto.lineReceived(b"test\n")
self.proto.lineReceived(b"test\n")
self.proto.handle_CTRL_D()
self.assertEquals(self.tr.value(), b"test\n\n" + PROMPT)
self.assertEqual(self.tr.value(), b"test\n\n" + PROMPT)
def tearDown(self):
self.proto.connectionLost("tearDown From Unit Test")

View File

@ -29,7 +29,7 @@
import os
import sys
from typing import List
from typing import Callable, ClassVar, Dict, List
from backend_pool.pool_server import PoolServerFactory
@ -55,7 +55,7 @@ from cowrie.core.utils import create_endpoint_services, get_endpoints_from_secti
from cowrie.pool_interface.handler import PoolHandler
if __twisted_version__.major < 17:
if __twisted_version__.major < 20:
raise ImportError(
"Your version of Twisted is too old. Please ensure your virtual environment is set up correctly."
)
@ -68,11 +68,11 @@ class Options(usage.Options):
# The '-c' parameters is currently ignored
optParameters: List[str] = []
optFlags = [["help", "h", "Display this help and exit."]]
optFlags: List[List[str]] = [["help", "h", "Display this help and exit."]]
@provider(ILogObserver)
def importFailureObserver(event):
def importFailureObserver(event: Dict) -> None:
if "failure" in event and event["failure"].type is ImportError:
log.err(
"ERROR: %s. Please run `pip install -U -r requirements.txt` "
@ -86,27 +86,29 @@ globalLogPublisher.addObserver(importFailureObserver)
@implementer(IServiceMaker, IPlugin)
class CowrieServiceMaker:
tapname = "cowrie"
description = "She sells sea shells by the sea shore."
tapname: ClassVar[str] = "cowrie"
description: ClassVar[str] = "She sells sea shells by the sea shore."
options = Options
output_plugins = None
output_plugins: List[Callable] = []
topService: service.Service
def __init__(self):
self.topService = None
def __init__(self) -> None:
self.pool_handler = None
# ssh is enabled by default
self.enableSSH = CowrieConfig.getboolean("ssh", "enabled", fallback=True)
self.enableSSH: bool = CowrieConfig.getboolean("ssh", "enabled", fallback=True)
# telnet is disabled by default
self.enableTelnet = CowrieConfig.getboolean("telnet", "enabled", fallback=False)
self.enableTelnet: bool = CowrieConfig.getboolean(
"telnet", "enabled", fallback=False
)
# pool is disabled by default, but need to check this setting in case user only wants to run the pool
self.pool_only = CowrieConfig.getboolean(
self.pool_only: bool = CowrieConfig.getboolean(
"backend_pool", "pool_only", fallback=False
)
def makeService(self, options):
def makeService(self, options: Dict) -> service.Service:
"""
Construct a TCPServer from a factory defined in Cowrie.
"""
@ -126,7 +128,7 @@ Makes a Cowrie SSH/Telnet honeypot.
print("ERROR: You must not run cowrie as root!")
sys.exit(1)
tz = CowrieConfig.get("honeypot", "timezone", fallback="UTC")
tz: str = CowrieConfig.get("honeypot", "timezone", fallback="UTC")
# `system` means use the system time zone
if tz != "system":
os.environ["TZ"] = tz
@ -161,7 +163,7 @@ Makes a Cowrie SSH/Telnet honeypot.
continue
if CowrieConfig.getboolean(x, "enabled") is False:
continue
engine = x.split("_")[1]
engine: str = x.split("_")[1]
try:
output = __import__(
f"cowrie.output.{engine}", globals(), locals(), ["output"]
@ -186,15 +188,19 @@ Makes a Cowrie SSH/Telnet honeypot.
# initialise VM pool handling - only if proxy AND pool set to enabled, and pool is to be deployed here
# or also enabled if pool_only is true
backend_type = CowrieConfig.get("honeypot", "backend", fallback="shell")
proxy_backend = CowrieConfig.get("proxy", "backend", fallback="simple")
backend_type: str = CowrieConfig.get("honeypot", "backend", fallback="shell")
proxy_backend: str = CowrieConfig.get("proxy", "backend", fallback="simple")
if (backend_type == "proxy" and proxy_backend == "pool") or self.pool_only:
# in this case we need to set some kind of pool connection
local_pool = CowrieConfig.get("proxy", "pool", fallback="local") == "local"
pool_host = CowrieConfig.get("proxy", "pool_host", fallback="127.0.0.1")
pool_port = CowrieConfig.getint("proxy", "pool_port", fallback=6415)
local_pool: bool = (
CowrieConfig.get("proxy", "pool", fallback="local") == "local"
)
pool_host: str = CowrieConfig.get(
"proxy", "pool_host", fallback="127.0.0.1"
)
pool_port: int = CowrieConfig.getint("proxy", "pool_port", fallback=6415)
if local_pool or self.pool_only:
# start a pool locally
@ -211,7 +217,7 @@ Makes a Cowrie SSH/Telnet honeypot.
# either way (local or remote) we set up a client to the pool
# unless this instance has no SSH and Telnet (pool only)
if (self.enableTelnet or self.enableSSH) and not self.pool_only:
self.pool_handler = PoolHandler(pool_host, pool_port, self)
self.pool_handler = PoolHandler(pool_host, pool_port, self) # type: ignore
else:
# we initialise the services directly
@ -219,14 +225,14 @@ Makes a Cowrie SSH/Telnet honeypot.
return self.topService
def pool_ready(self):
backend = CowrieConfig.get("honeypot", "backend", fallback="shell")
def pool_ready(self) -> None:
backend: str = CowrieConfig.get("honeypot", "backend", fallback="shell")
# this method is never called if self.pool_only is False,
# since we do not start the pool handler that would call it
if self.enableSSH:
factory = cowrie.ssh.factory.CowrieSSHFactory(backend, self.pool_handler)
factory.tac = self
factory.tac = self # type: ignore
factory.portal = portal.Portal(core.realm.HoneyPotRealm())
factory.portal.registerChecker(core.checkers.HoneypotPublicKeyChecker())
factory.portal.registerChecker(core.checkers.HoneypotPasswordChecker())
@ -247,7 +253,7 @@ Makes a Cowrie SSH/Telnet honeypot.
if self.enableTelnet:
f = cowrie.telnet.factory.HoneyPotTelnetFactory(backend, self.pool_handler)
f.tac = self
f.tac = self # type: ignore
f.portal = portal.Portal(core.realm.HoneyPotRealm())
f.portal.registerChecker(core.checkers.HoneypotPasswordChecker())