more typing (#1795)

* more typing
This commit is contained in:
Michel Oosterhof
2022-12-15 11:46:15 +08:00
committed by GitHub
parent 9098c5378d
commit b56e257c29
78 changed files with 501 additions and 463 deletions

View File

@ -18,6 +18,10 @@ disallow_any_unimported = true
strict_equality = true
disallow_untyped_decorators = true
disallow_subclassing_any = true
warn_unused_ignores = true
# Getting these passing should be easy
strict_concatenate = true
# These are too strict for us at the moment
@ -28,4 +32,6 @@ disallow_any_explicit = false
disallow_any_expr = false
disallow_any_generics = false
disallow_untyped_calls = false
warn_unused_ignores = false
[tool.pylint."MESSAGES CONTROL"]
disable = [ "R0912", "C0103", "C0114", "C0115", "C0116", "C0301" ]

View File

@ -1,8 +1,8 @@
Sphinx==5.3.0
flake8==6.0.0
mypy-extensions==0.4.3; platform_python_implementation=='CPython' and python_version>'3.8'
mypy-zope==0.3.9; platform_python_implementation=='CPython' and python_version>'3.8'
mypy==0.971; platform_python_implementation=='CPython' and python_version>'3.8'
mypy-zope==0.3.11; platform_python_implementation=='CPython' and python_version>'3.8'
mypy==0.981; platform_python_implementation=='CPython' and python_version>'3.8'
pathspec==0.10.1
pipdeptree==2.2.1
pre-commit==2.20.0
@ -20,4 +20,4 @@ twistedchecker==0.7.4
types-python-dateutil==2.8.19; python_version>'3.8'
types-redis==4.3.20; python_version>'3.8'
types-requests==2.28.9; python_version>'3.8'
yamllint==1.27.1
yamllint==1.27.1

View File

@ -2,12 +2,12 @@ from __future__ import annotations
from threading import Lock
from twisted.internet import protocol
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
class ClientProtocol(protocol.Protocol):
def dataReceived(self, data):
self.server_protocol.transport.write(data)
def dataReceived(self, data: bytes) -> None:
self.server_protocol.transport.write(data) # type: ignore
def connectionLost(self, reason):
self.server_protocol.transport.loseConnection()
@ -32,7 +32,7 @@ class ServerProtocol(protocol.Protocol):
self.buffer = []
def connectionMade(self):
reactor.connectTCP(self.dst_ip, self.dst_port, ClientFactory(self)) # type: ignore[attr-defined]
reactor.connectTCP(self.dst_ip, self.dst_port, ClientFactory(self))
def dataReceived(self, data):
self.buffer.append(data)
@ -40,7 +40,7 @@ class ServerProtocol(protocol.Protocol):
def sendData(self):
if not self.client_protocol:
reactor.callLater(0.5, self.sendData) # type: ignore[attr-defined]
reactor.callLater(0.5, self.sendData)
return
for packet in self.buffer:
@ -88,10 +88,10 @@ class NATService:
self.bindings[guest_id][2]._realPortNumber,
)
else:
nat_ssh = reactor.listenTCP( # type: ignore[attr-defined]
nat_ssh = reactor.listenTCP(
0, ServerFactory(dst_ip, ssh_port), interface="0.0.0.0"
)
nat_telnet = reactor.listenTCP( # type: ignore[attr-defined]
nat_telnet = reactor.listenTCP(
0, ServerFactory(dst_ip, telnet_port), interface="0.0.0.0"
)
self.bindings[guest_id] = [1, nat_ssh, nat_telnet]

View File

@ -6,7 +6,7 @@ import os
import time
from threading import Lock
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.internet import threads
from twisted.python import log
@ -101,7 +101,7 @@ class PoolService:
"backend_pool", "recycle_period", fallback=-1
)
if recycle_period > 0:
reactor.callLater(recycle_period, self.restart_pool) # type: ignore[attr-defined]
reactor.callLater(recycle_period, self.restart_pool)
def stop_pool(self):
# lazy import to avoid exception if not using the backend_pool and libvirt not installed (#1185)
@ -339,7 +339,7 @@ class PoolService:
self.__producer_mark_available()
# sleep until next iteration
self.loop_next_call = reactor.callLater( # type: ignore[attr-defined]
self.loop_next_call = reactor.callLater(
self.loop_sleep_time, self.producer_loop
)

View File

@ -2,7 +2,7 @@ from __future__ import annotations
from twisted.conch.ssh import channel, common, connection, transport, userauth
from twisted.internet import defer, protocol
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
class PasswordAuth(userauth.SSHUserAuthClient):
@ -99,6 +99,6 @@ def execute_ssh(host, port, username, password, command, callback=None):
done_deferred = defer.Deferred()
factory = ClientCommandFactory(username, password, command, done_deferred, callback)
reactor.connectTCP(host, port, factory) # type: ignore[attr-defined]
reactor.connectTCP(host, port, factory)
return done_deferred

View File

@ -6,7 +6,7 @@ from typing import Optional
from twisted.conch.telnet import StatefulTelnetProtocol, TelnetTransport
from twisted.internet import defer
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.internet.protocol import ClientFactory
@ -67,7 +67,7 @@ class TelnetClient(StatefulTelnetProtocol):
# start countdown to command done (when reached, consider the output was completely received and close)
if not self.done_callback:
self.done_callback = reactor.callLater(0.5, self.close) # type: ignore[attr-defined]
self.done_callback = reactor.callLater(0.5, self.close) # type: ignore
else:
self.done_callback.reset(0.5)
@ -76,7 +76,7 @@ class TelnetClient(StatefulTelnetProtocol):
Sends a command via Telnet using line mode
"""
self.command = command.encode()
self.sendLine(self.command)
self.sendLine(self.command) # ignore: attr-defined
def close(self):
"""
@ -129,7 +129,7 @@ class TelnetClientCommand:
factory = TelnetFactory(
username, password, self.prompt, self.command, done_deferred, self.callback
)
reactor.connectTCP(host, port, factory) # type: ignore[attr-defined]
reactor.connectTCP(host, port, factory)
return done_deferred

View File

@ -6,7 +6,7 @@ from __future__ import annotations
import random
from typing import Optional
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from cowrie.shell.command import HoneyPotCommand
@ -64,7 +64,7 @@ class Command_adduser(HoneyPotCommand):
self.do_output()
def do_output(self):
def do_output(self) -> None:
if self.item == len(self.output):
self.item = 7
self.schedule_next()
@ -81,7 +81,7 @@ class Command_adduser(HoneyPotCommand):
self.item += 1
self.schedule_next()
def schedule_next(self):
def schedule_next(self) -> None:
self.scheduled = reactor.callLater(0.5 + random.random() * 1, self.do_output) # type: ignore[attr-defined]
def lineReceived(self, line: str) -> None:

View File

@ -6,9 +6,9 @@ from __future__ import annotations
import random
import re
from typing import Any, Dict
from typing import Any, Callable, Dict, Optional
from twisted.internet import defer, reactor # type: ignore
from twisted.internet import defer, reactor
from twisted.internet.defer import inlineCallbacks
from cowrie.shell.command import HoneyPotCommand
@ -18,9 +18,9 @@ commands = {}
class Command_faked_package_class_factory:
@staticmethod
def getCommand(name):
def getCommand(name: str) -> Callable:
class Command_faked_installation(HoneyPotCommand):
def call(self):
def call(self) -> None:
self.write(f"{name}: Segmentation fault\n")
return Command_faked_installation
@ -47,14 +47,14 @@ class Command_aptget(HoneyPotCommand):
else:
self.do_locked()
def sleep(self, time, time2=None):
def sleep(self, time: float, time2: Optional[float] = None) -> defer.Deferred:
d: defer.Deferred = defer.Deferred()
if time2:
time = random.randint(time * 100, time2 * 100) / 100.0
time = random.randint(int(time * 100), int(time2 * 100.0)) / 100.0
reactor.callLater(time, d.callback, None) # type: ignore[attr-defined]
return d
def do_version(self):
def do_version(self) -> None:
self.write(
"""apt 1.0.9.8.1 for amd64 compiled on Jun 10 2015 09:42:06
Supported modules:
@ -71,7 +71,7 @@ Supported modules:
)
self.exit()
def do_help(self):
def do_help(self) -> None:
self.write(
"""apt 1.0.9.8.1 for amd64 compiled on Jun 10 2015 09:42:06
Usage: apt-get [options] command
@ -187,7 +187,7 @@ pages for more information and options.
yield self.sleep(2)
self.exit()
def do_moo(self):
def do_moo(self) -> None:
self.write(" (__)\n")
self.write(" (oo)\n")
self.write(" /------\\/\n")
@ -197,7 +197,7 @@ pages for more information and options.
self.write('...."Have you mooed today?"...\n')
self.exit()
def do_locked(self):
def do_locked(self) -> None:
self.errorWrite(
"E: Could not open lock file /var/lib/apt/lists/lock - open (13: Permission denied)\n"
)

View File

@ -12,6 +12,7 @@ from __future__ import annotations
import getopt
import re
from typing import Match, Optional
from twisted.python import log
@ -88,7 +89,7 @@ class Command_awk(HoneyPotCommand):
self.output(self.input_data)
self.exit()
def awk_parser(self, program):
def awk_parser(self, program: str) -> list[dict[str, str]]:
"""
search for awk execution patterns, either direct {} code or only executed for a certain regex
{ }
@ -101,23 +102,23 @@ class Command_awk(HoneyPotCommand):
code.append({"regex": m[1], "code": m[2]})
return code
def awk_print(self, words):
def awk_print(self, words: str) -> None:
"""
This is the awk `print` command that operates on a single line only
"""
self.write(words)
self.write("\n")
def output(self, input):
def output(self, inb: Optional[bytes]) -> None:
"""
This is the awk output.
"""
if "decode" in dir(input):
input = input.decode("utf-8")
if not isinstance(input, str):
pass
if inb:
inp = inb.decode("utf-8")
else:
return
inputlines = input.split("\n")
inputlines = inp.split("\n")
if inputlines[-1] == "":
inputlines.pop()
for inputline in inputlines:
@ -127,7 +128,7 @@ class Command_awk(HoneyPotCommand):
words = inputline.split()
words.insert(0, inputline)
def repl(m):
def repl(m: Match) -> str:
try:
return words[int(m.group(1))]
except IndexError:
@ -148,7 +149,7 @@ class Command_awk(HoneyPotCommand):
# print("LINE2: {}".format(line))
self.awk_print(line)
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
"""
This function logs standard input from the user send to awk
"""
@ -159,15 +160,15 @@ class Command_awk(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
self.output(line)
self.output(line.encode())
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
"""
ctrl-d is end-of-file, time to terminate
"""
self.exit()
def help(self):
def help(self) -> None:
self.write(
"""Usage: awk [POSIX or GNU style options] -f progfile [--] file ...
Usage: awk [POSIX or GNU style options] [--] 'program' file ...
@ -212,7 +213,7 @@ Examples:
"""
)
def version(self):
def version(self) -> None:
self.write(
"""GNU Awk 4.1.4, API: 1.1 (GNU MPFR 4.0.1, GNU MP 6.1.2)
Copyright (C) 1989, 1991-2016 Free Software Foundation.

View File

@ -11,9 +11,9 @@ import getopt
import random
import re
import time
from typing import Callable
from typing import Callable, Optional
from twisted.internet import error, reactor # type: ignore
from twisted.internet import error, reactor
from twisted.python import failure, log
from cowrie.core import utils
@ -24,7 +24,7 @@ commands: dict[str, Callable] = {}
class Command_whoami(HoneyPotCommand):
def call(self):
def call(self) -> None:
self.write(f"{self.protocol.user.username}\n")
@ -35,7 +35,7 @@ commands["users"] = Command_whoami
class Command_help(HoneyPotCommand):
def call(self):
def call(self) -> None:
self.write(
"""GNU bash, version 4.2.37(1)-release (x86_64-pc-linux-gnu)
These shell commands are defined internally. Type `help' to see this list.
@ -90,7 +90,7 @@ commands["help"] = Command_help
class Command_w(HoneyPotCommand):
def call(self):
def call(self) -> None:
self.write(
" {} up {}, 1 user, load average: 0.00, 0.00, 0.00\n".format(
time.strftime("%H:%M:%S"), utils.uptime(self.protocol.uptime())
@ -114,7 +114,7 @@ commands["w"] = Command_w
class Command_who(HoneyPotCommand):
def call(self):
def call(self) -> None:
self.write(
"%-8s pts/0 %s %s (%s)\n"
% (
@ -131,7 +131,7 @@ commands["who"] = Command_who
class Command_echo(HoneyPotCommand):
def call(self):
def call(self) -> None:
newline = True
escape_decode = False
@ -166,8 +166,8 @@ class Command_echo(HoneyPotCommand):
string += "\n"
if escape_decode:
string = codecs.escape_decode(string)[0]
self.writeBytes(string)
data: bytes = codecs.escape_decode(string)[0] # type: ignore
self.writeBytes(data)
else:
self.write(string)
@ -180,7 +180,7 @@ commands["echo"] = Command_echo
class Command_printf(HoneyPotCommand):
def call(self):
def call(self) -> None:
if not len(self.args):
self.write("printf: usage: printf [-v var] format [arguments]\n")
else:
@ -198,7 +198,8 @@ class Command_printf(HoneyPotCommand):
if s.endswith("\\c"):
s = s[:-2]
self.writeBytes(codecs.escape_decode(s)[0])
data: bytes = codecs.escape_decode(s)[0] # type: ignore
self.writeBytes(data)
commands["/usr/bin/printf"] = Command_printf
@ -206,11 +207,11 @@ commands["printf"] = Command_printf
class Command_exit(HoneyPotCommand):
def call(self):
def call(self) -> None:
stat = failure.Failure(error.ProcessDone(status=""))
self.protocol.terminal.transport.processEnded(stat)
def exit(self):
def exit(self) -> None:
pass
@ -219,7 +220,7 @@ commands["logout"] = Command_exit
class Command_clear(HoneyPotCommand):
def call(self):
def call(self) -> None:
self.protocol.terminal.reset()
@ -230,7 +231,7 @@ commands["reset"] = Command_clear
class Command_hostname(HoneyPotCommand):
def call(self):
def call(self) -> None:
if len(self.args):
if self.protocol.user.username == "root":
self.protocol.hostname = self.args[0]
@ -245,7 +246,7 @@ commands["hostname"] = Command_hostname
class Command_ps(HoneyPotCommand):
def call(self):
def call(self) -> None:
user = self.protocol.user.username
args = ""
if len(self.args):
@ -827,7 +828,7 @@ commands["ps"] = Command_ps
class Command_id(HoneyPotCommand):
def call(self):
def call(self) -> None:
u = self.protocol.user
self.write(
"uid={}({}) gid={}({}) groups={}({})\n".format(
@ -841,17 +842,17 @@ commands["id"] = Command_id
class Command_passwd(HoneyPotCommand):
def start(self):
def start(self) -> None:
self.write("Enter new UNIX password: ")
self.protocol.password_input = True
self.callbacks = [self.ask_again, self.finish]
self.passwd = None
self.passwd: Optional[str] = None
def ask_again(self, line):
def ask_again(self, line: str) -> None:
self.passwd = line
self.write("Retype new UNIX password: ")
def finish(self, line):
def finish(self, line: str) -> None:
self.protocol.password_input = False
if line != self.passwd or self.passwd == "*":
@ -860,7 +861,7 @@ class Command_passwd(HoneyPotCommand):
self.write("passwd: password updated successfully\n")
self.exit()
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.command.success",
realm="passwd",
@ -876,7 +877,7 @@ commands["passwd"] = Command_passwd
class Command_shutdown(HoneyPotCommand):
def start(self):
def start(self) -> None:
if len(self.args) and self.args[0].strip().count("--help"):
output = [
"Usage: shutdown [-akrhHPfnc] [-t secs] time [warning message]",
@ -924,7 +925,7 @@ class Command_shutdown(HoneyPotCommand):
self.write("Try `shutdown --help' for more information.\n")
self.exit()
def finish(self):
def finish(self) -> None:
stat = failure.Failure(error.ProcessDone(status=""))
self.protocol.terminal.transport.processEnded(stat)
@ -938,7 +939,7 @@ commands["halt"] = Command_shutdown
class Command_reboot(HoneyPotCommand):
def start(self):
def start(self) -> None:
self.write("\n")
self.write(
f"Broadcast message from root@{self.protocol.hostname} (pts/0) ({time.ctime()}):\n\n"
@ -946,7 +947,7 @@ class Command_reboot(HoneyPotCommand):
self.write("The system is going down for reboot NOW!\n")
reactor.callLater(3, self.finish) # type: ignore[attr-defined]
def finish(self):
def finish(self) -> None:
stat = failure.Failure(error.ProcessDone(status=""))
self.protocol.terminal.transport.processEnded(stat)
@ -956,7 +957,7 @@ commands["reboot"] = Command_reboot
class Command_history(HoneyPotCommand):
def call(self):
def call(self) -> None:
try:
if len(self.args) and self.args[0] == "-c":
self.protocol.historyLines = []
@ -975,7 +976,7 @@ commands["history"] = Command_history
class Command_date(HoneyPotCommand):
def call(self):
def call(self) -> None:
time = datetime.datetime.utcnow()
self.write("{}\n".format(time.strftime("%a %b %d %H:%M:%S UTC %Y")))
@ -985,17 +986,17 @@ commands["date"] = Command_date
class Command_yes(HoneyPotCommand):
def start(self):
def start(self) -> None:
self.y()
def y(self):
def y(self) -> None:
if len(self.args):
self.write("{}\n".format(" ".join(self.args)))
else:
self.write("y\n")
self.scheduled = reactor.callLater(0.01, self.y) # type: ignore[attr-defined]
def handle_CTRL_C(self):
def handle_CTRL_C(self) -> None:
self.scheduled.cancel()
self.exit()
@ -1005,7 +1006,7 @@ commands["yes"] = Command_yes
class Command_sh(HoneyPotCommand):
def call(self):
def call(self) -> None:
if len(self.args) and self.args[0].strip() == "-c":
line = " ".join(self.args[1:])
@ -1023,7 +1024,7 @@ class Command_sh(HoneyPotCommand):
# TODO: handle spawning multiple shells, support other sh flags
def execute_commands(self, cmds):
def execute_commands(self, cmds: str) -> None:
# self.input_data holds commands passed via PIPE
# create new HoneyPotShell for our a new 'sh' shell
self.protocol.cmdstack.append(HoneyPotShell(self.protocol, interactive=False))
@ -1042,47 +1043,48 @@ commands["sh"] = Command_sh
class Command_php(HoneyPotCommand):
HELP = "Usage: php [options] [-f] <file> [--] [args...]\n" \
" php [options] -r <code> [--] [args...]\n" \
" php [options] [-B <begin_code>] -R <code> [-E <end_code>] [--] [args...]\n" \
" php [options] [-B <begin_code>] -F <file> [-E <end_code>] [--] [args...]\n" \
" php [options] -- [args...]\n" \
" php [options] -a\n" \
"\n" \
" -a Run interactively\n" \
" -c <path>|<file> Look for php.ini file in this directory\n" \
" -n No php.ini file will be used\n" \
" -d foo[=bar] Define INI entry foo with value 'bar'\n" \
" -e Generate extended information for debugger/profiler\n" \
" -f <file> Parse and execute <file>.\n" \
" -h This help\n" \
" -i PHP information\n" \
" -l Syntax check only (lint)\n" \
" -m Show compiled in modules\n" \
" -r <code> Run PHP <code> without using script tags <?..?>\n" \
" -B <begin_code> Run PHP <begin_code> before processing input lines\n" \
" -R <code> Run PHP <code> for every input line\n" \
" -F <file> Parse and execute <file> for every input line\n" \
" -E <end_code> Run PHP <end_code> after processing all input lines\n" \
" -H Hide any passed arguments from external tools.\n" \
" -s Output HTML syntax highlighted source.\n" \
" -v Version number\n" \
" -w Output source with stripped comments and whitespace.\n" \
" -z <file> Load Zend extension <file>.\n" \
"\n" \
" args... Arguments passed to script. Use -- args when first argument\n" \
" starts with - or script is read from stdin\n" \
"\n" \
" --ini Show configuration file names\n" \
"\n" \
" --rf <name> Show information about function <name>.\n" \
" --rc <name> Show information about class <name>.\n" \
" --re <name> Show information about extension <name>.\n" \
" --ri <name> Show configuration for extension <name>.\n" \
"\n"
HELP = (
"Usage: php [options] [-f] <file> [--] [args...]\n"
" php [options] -r <code> [--] [args...]\n"
" php [options] [-B <begin_code>] -R <code> [-E <end_code>] [--] [args...]\n"
" php [options] [-B <begin_code>] -F <file> [-E <end_code>] [--] [args...]\n"
" php [options] -- [args...]\n"
" php [options] -a\n"
"\n"
" -a Run interactively\n"
" -c <path>|<file> Look for php.ini file in this directory\n"
" -n No php.ini file will be used\n"
" -d foo[=bar] Define INI entry foo with value 'bar'\n"
" -e Generate extended information for debugger/profiler\n"
" -f <file> Parse and execute <file>.\n"
" -h This help\n"
" -i PHP information\n"
" -l Syntax check only (lint)\n"
" -m Show compiled in modules\n"
" -r <code> Run PHP <code> without using script tags <?..?>\n"
" -B <begin_code> Run PHP <begin_code> before processing input lines\n"
" -R <code> Run PHP <code> for every input line\n"
" -F <file> Parse and execute <file> for every input line\n"
" -E <end_code> Run PHP <end_code> after processing all input lines\n"
" -H Hide any passed arguments from external tools.\n"
" -s Output HTML syntax highlighted source.\n"
" -v Version number\n"
" -w Output source with stripped comments and whitespace.\n"
" -z <file> Load Zend extension <file>.\n"
"\n"
" args... Arguments passed to script. Use -- args when first argument\n"
" starts with - or script is read from stdin\n"
"\n"
" --ini Show configuration file names\n"
"\n"
" --rf <name> Show information about function <name>.\n"
" --rc <name> Show information about class <name>.\n"
" --re <name> Show information about extension <name>.\n"
" --ri <name> Show configuration for extension <name>.\n"
"\n"
)
VERSION = "PHP 5.3.5 (cli)\n" \
"Copyright (c) 1997-2010 The PHP Group\n"
VERSION = "PHP 5.3.5 (cli)\n" "Copyright (c) 1997-2010 The PHP Group\n"
def start(self) -> None:
if len(self.args):
@ -1093,10 +1095,12 @@ class Command_php(HoneyPotCommand):
self.exit()
def lineReceived(self, line: str) -> None:
log.msg(eventid="cowrie.command.success",
realm="php",
input=line,
format="INPUT (%(realm)s): %(input)s")
log.msg(
eventid="cowrie.command.success",
realm="php",
input=line,
format="INPUT (%(realm)s): %(input)s",
)
def handle_CTRL_D(self) -> None:
self.exit()
@ -1107,7 +1111,7 @@ commands["php"] = Command_php
class Command_chattr(HoneyPotCommand):
def call(self):
def call(self) -> None:
if len(self.args) < 1:
self.write("Usage: chattr [-RVf] [-+=AacDdeijsSu] [-v version] files...\n")
return
@ -1130,7 +1134,7 @@ class Command_set(HoneyPotCommand):
# Basic functionaltly (show only), need enhancements
# This will show ALL environ vars, not only the global ones
# With enhancements it should work like env when -o posix is used
def call(self):
def call(self) -> None:
for i in sorted(list(self.environ.keys())):
self.write(f"{i}={self.environ[i]}\n")
@ -1139,7 +1143,7 @@ commands["set"] = Command_set
class Command_nop(HoneyPotCommand):
def call(self):
def call(self) -> None:
pass

View File

@ -18,7 +18,7 @@ class Command_base64(HoneyPotCommand):
mode: str = "e"
ignore: bool
def start(self):
def start(self) -> None:
self.mode = "e"
self.ignore = False
@ -140,7 +140,7 @@ Try 'base64 --help' for more information.
self.dojob(line)
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()

View File

@ -61,11 +61,11 @@ class Command_busybox(HoneyPotCommand):
The command should never call self.exit(), cause it will corrupt cmdstack
"""
def help(self):
def help(self) -> None:
for ln in busybox_help:
self.errorWrite(f"{ln}\n")
def call(self):
def call(self) -> None:
if len(self.args) == 0:
self.help()
return

View File

@ -9,6 +9,7 @@ from __future__ import annotations
import getopt
from typing import Optional
from twisted.python import log
@ -26,7 +27,7 @@ class Command_cat(HoneyPotCommand):
number = False
linenumber = 1
def start(self):
def start(self) -> None:
try:
optlist, args = getopt.gnu_getopt(
self.args, "AbeEnstTuv", ["help", "number", "version"]
@ -71,21 +72,14 @@ class Command_cat(HoneyPotCommand):
self.output(self.input_data)
self.exit()
def output(self, input):
def output(self, inb: Optional[bytes]) -> None:
"""
This is the cat output, with optional line numbering
"""
if input is None:
if inb is None:
return
if isinstance(input, str):
input = input.encode("utf8")
elif isinstance(input, bytes):
pass
else:
log.msg(f"unusual cat input {repr(input)}")
lines = input.split(b"\n")
lines = inb.split(b"\n")
if lines[-1] == b"":
lines.pop()
for line in lines:
@ -94,7 +88,7 @@ class Command_cat(HoneyPotCommand):
self.linenumber = self.linenumber + 1
self.writeBytes(line + b"\n")
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
"""
This function logs standard input from the user send to cat
"""
@ -105,15 +99,15 @@ class Command_cat(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
self.output(line)
self.output(line.encode("utf-8"))
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
"""
ctrl-d is end-of-file, time to terminate
"""
self.exit()
def help(self):
def help(self) -> None:
self.write(
"""Usage: cat [OPTION]... [FILE]...
Concatenate FILE(s) to standard output.

View File

@ -47,7 +47,7 @@ TRY_CHMOD_HELP_MSG = "Try 'chmod --help' for more information.\n"
class Command_chmod(HoneyPotCommand):
def call(self):
def call(self) -> None:
# parse the command line arguments
opts, mode, files, getopt_err = self.parse_args()
if getopt_err:

View File

@ -18,7 +18,7 @@ commands = {}
class Command_chpasswd(HoneyPotCommand):
def help(self):
def help(self) -> None:
output = (
"Usage: chpasswd [options]",
"",
@ -35,7 +35,7 @@ class Command_chpasswd(HoneyPotCommand):
for line in output:
self.write(line + "\n")
def chpasswd_application(self, contents):
def chpasswd_application(self, contents: bytes) -> None:
c = 1
try:
for line in contents.split(b"\n"):
@ -55,7 +55,7 @@ class Command_chpasswd(HoneyPotCommand):
except Exception:
self.write(f"chpasswd: line {c}: missing new password\n")
def start(self):
def start(self) -> None:
try:
opts, args = getopt.getopt(
self.args,
@ -85,7 +85,7 @@ class Command_chpasswd(HoneyPotCommand):
self.chpasswd_application(self.input_data)
self.exit()
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.command.input",
realm="chpasswd",
@ -94,7 +94,7 @@ class Command_chpasswd(HoneyPotCommand):
)
self.chpasswd_application(line.encode())
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()

View File

@ -18,7 +18,7 @@ commands = {}
class Command_crontab(HoneyPotCommand):
def help(self):
def help(self) -> None:
output = (
"usage: crontab [-u user] file",
" crontab [-u user] [-i] {-e | -l | -r}",
@ -31,7 +31,7 @@ class Command_crontab(HoneyPotCommand):
for line in output:
self.write(line + "\n")
def start(self):
def start(self) -> None:
try:
opts, args = getopt.getopt(self.args, "u:elri")
except getopt.GetoptError as err:
@ -62,7 +62,7 @@ class Command_crontab(HoneyPotCommand):
if len(self.args):
pass
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.command.input",
realm="crontab",
@ -70,7 +70,7 @@ class Command_crontab(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()

View File

@ -302,7 +302,7 @@ class Command_curl(HoneyPotCommand):
deferred = treq.get(url=url, allow_redirects=False, headers=headers, timeout=10)
return deferred
def handle_CTRL_C(self):
def handle_CTRL_C(self) -> None:
self.write("^C\n")
self.exit()
@ -410,12 +410,16 @@ class Command_curl(HoneyPotCommand):
return
elif response.check(error.ConnectingCancelledError) is not None:
self.write(f"curl: (7) Failed to connect to {self.host} port {self.port}: Operation timed out\n")
self.write(
f"curl: (7) Failed to connect to {self.host} port {self.port}: Operation timed out\n"
)
self.exit()
return
elif response.check(error.ConnectionRefusedError) is not None:
self.write(f"curl: (7) Failed to connect to {self.host} port {self.port}: Connection refused\n")
self.write(
f"curl: (7) Failed to connect to {self.host} port {self.port}: Connection refused\n"
)
self.exit()
return

View File

@ -24,7 +24,7 @@ class Command_dd(HoneyPotCommand):
ddargs: dict[str, str] = {}
def start(self):
def start(self) -> None:
if not self.args or self.args[0] == ">":
return
@ -83,14 +83,14 @@ class Command_dd(HoneyPotCommand):
self.exit(success=bSuccess)
def exit(self, success=True):
def exit(self, success: bool = True) -> None:
if success is True:
self.write("0+0 records in\n")
self.write("0+0 records out\n")
self.write("0 bytes transferred in 0.695821 secs (0 bytes/sec)\n")
HoneyPotCommand.exit(self)
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.session.input",
realm="dd",
@ -98,11 +98,11 @@ class Command_dd(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()
def parse_size(param):
def parse_size(param: str) -> int:
"""
Parse dd arguments that indicate block sizes
Return 0 in case of illegal input

View File

@ -12,7 +12,7 @@ commands = {}
class Command_du(HoneyPotCommand):
def message_help(self):
def message_help(self) -> str:
return """Usage: du [OPTION]... [FILE]...
or: du [OPTION]... --files0-from=F
Summarize disk usage of the set of FILEs, recursively for directories.
@ -76,7 +76,7 @@ Report du translation bugs to <http://translationproject.org/team/>
Full documentation at: <http://www.gnu.org/software/coreutils/du>
or available locally via: info '(coreutils) du invocation'\n"""
def call(self):
def call(self) -> None:
self.showHidden = False
self.showDirectories = False
path = self.protocol.cwd
@ -91,7 +91,7 @@ or available locally via: info '(coreutils) du invocation'\n"""
else:
self.du_show(path, all=True)
def du_show(self, path, all=False):
def du_show(self, path: str, all: bool = False) -> None:
try:
if self.protocol.fs.isdir(path) and not self.showDirectories:
files = self.protocol.fs.get_path(path)[:]

View File

@ -26,7 +26,7 @@ For complete documentation, run: info coreutils 'env invocation'
class Command_env(HoneyPotCommand):
def call(self):
def call(self) -> None:
# This only show environ vars, not the shell vars. Need just to mimic real systems
for i in list(self.protocol.environ.keys()):
self.write(f"{i}={self.protocol.environ[i]}\n")

View File

@ -9,7 +9,7 @@ commands = {}
class Command_ethtool(HoneyPotCommand):
def call(self):
def call(self) -> None:
func = self.do_ethtool_help
for x in self.args:
if x.startswith("lo"):
@ -20,7 +20,7 @@ class Command_ethtool(HoneyPotCommand):
func = self.do_ethtool_eth1
func()
def do_ethtool_help(self):
def do_ethtool_help(self) -> None:
"""
No real help output.
"""
@ -29,14 +29,14 @@ class Command_ethtool(HoneyPotCommand):
For more information run ethtool -h\n"""
)
def do_ethtool_lo(self):
def do_ethtool_lo(self) -> None:
self.write(
"""Settings for lo:
Link detected: yes\n"""
)
def do_ethtool_eth0(self):
def do_ethtool_eth0(self) -> None:
self.write(
"""Settings for eth0:
Supported ports: [ TP MII ]
@ -68,7 +68,7 @@ Current message level: 0x00000033 (51)
Link detected: yes\n"""
)
def do_ethtool_eth1(self):
def do_ethtool_eth1(self) -> None:
self.write(
"""Settings for eth1:
Cannot get device settings: No such device

View File

@ -25,7 +25,7 @@ class Command_free(HoneyPotCommand):
free
"""
def call(self):
def call(self) -> None:
# Parse options or display no files
try:
opts, args = getopt.getopt(self.args, "mh")
@ -43,7 +43,7 @@ class Command_free(HoneyPotCommand):
return
self.do_free()
def do_free(self, fmt="kilobytes"):
def do_free(self, fmt: str = "kilobytes") -> None:
"""
print free statistics
"""

View File

@ -27,22 +27,22 @@ class Command_grep(HoneyPotCommand):
grep command
"""
def grep_get_contents(self, filename, match):
def grep_get_contents(self, filename: str, match: str) -> None:
try:
contents = self.fs.file_contents(filename)
self.grep_application(contents, match)
except Exception:
self.errorWrite(f"grep: {filename}: No such file or directory\n")
def grep_application(self, contents, match):
match = os.path.basename(match).replace('"', "").encode("utf8")
matches = re.compile(match)
def grep_application(self, contents: bytes, match: str) -> None:
bmatch = os.path.basename(match).replace('"', "").encode("utf8")
matches = re.compile(bmatch)
contentsplit = contents.split(b"\n")
for line in contentsplit:
if matches.search(line):
self.writeBytes(line + b"\n")
def help(self):
def help(self) -> None:
self.writeBytes(
b"usage: grep [-abcDEFGHhIiJLlmnOoPqRSsUVvwxZ] [-A num] [-B num] [-C[num]]\n"
)
@ -54,7 +54,7 @@ class Command_grep(HoneyPotCommand):
)
self.writeBytes(b"\t[--null] [pattern] [file ...]\n")
def start(self):
def start(self) -> None:
if not self.args:
self.help()
self.exit()
@ -87,7 +87,7 @@ class Command_grep(HoneyPotCommand):
self.exit()
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.command.input",
realm="grep",
@ -95,7 +95,7 @@ class Command_grep(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()
@ -109,9 +109,10 @@ class Command_tail(HoneyPotCommand):
"""
tail command
"""
n: int = 10
def tail_get_contents(self, filename):
def tail_get_contents(self, filename: str) -> None:
try:
contents = self.fs.file_contents(filename)
self.tail_application(contents)
@ -120,7 +121,7 @@ class Command_tail(HoneyPotCommand):
f"tail: cannot open `{filename}' for reading: No such file or directory\n"
)
def tail_application(self, contents):
def tail_application(self, contents: bytes) -> None:
contentsplit = contents.split(b"\n")
lines = int(len(contentsplit))
if lines < self.n:
@ -132,7 +133,7 @@ class Command_tail(HoneyPotCommand):
self.write("\n")
i += 1
def start(self):
def start(self) -> None:
if not self.args or self.args[0] == ">":
return
else:
@ -158,7 +159,7 @@ class Command_tail(HoneyPotCommand):
self.exit()
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.command.input",
realm="tail",
@ -166,7 +167,7 @@ class Command_tail(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()
@ -182,7 +183,7 @@ class Command_head(HoneyPotCommand):
n: int = 10
def head_application(self, contents):
def head_application(self, contents: bytes) -> None:
i = 0
contentsplit = contents.split(b"\n")
for line in contentsplit:
@ -190,7 +191,7 @@ class Command_head(HoneyPotCommand):
self.writeBytes(line + b"\n")
i += 1
def head_get_file_contents(self, filename):
def head_get_file_contents(self, filename: str) -> None:
try:
contents = self.fs.file_contents(filename)
self.head_application(contents)
@ -199,7 +200,7 @@ class Command_head(HoneyPotCommand):
f"head: cannot open `{filename}' for reading: No such file or directory\n"
)
def start(self):
def start(self) -> None:
self.n = 10
if not self.args or self.args[0] == ">":
return
@ -226,7 +227,7 @@ class Command_head(HoneyPotCommand):
self.head_application(self.input_data)
self.exit()
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.command.input",
realm="head",
@ -234,7 +235,7 @@ class Command_head(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()
@ -248,7 +249,7 @@ class Command_cd(HoneyPotCommand):
cd command
"""
def call(self):
def call(self) -> None:
if not self.args or self.args[0] == "~":
pname = self.protocol.user.avatar.home
else:
@ -278,7 +279,7 @@ class Command_rm(HoneyPotCommand):
rm command
"""
def help(self):
def help(self) -> None:
self.write(
"""Usage: rm [OPTION]... [FILE]...
Remove (unlink) the FILE(s).
@ -319,10 +320,10 @@ Full documentation at: <http://www.gnu.org/software/coreutils/rm>
or available locally via: info '(coreutils) rm invocation'\n"""
)
def paramError(self):
def paramError(self) -> None:
self.errorWrite("Try 'rm --help' for more information\n")
def call(self):
def call(self) -> None:
recursive = False
force = False
verbose = False
@ -392,7 +393,7 @@ class Command_cp(HoneyPotCommand):
cp command
"""
def call(self):
def call(self) -> None:
if not len(self.args):
self.errorWrite("cp: missing file operand\n")
self.errorWrite("Try `cp --help' for more information.\n")
@ -407,8 +408,9 @@ class Command_cp(HoneyPotCommand):
if opt[0] in ("-r", "-a", "-R"):
recursive = True
def resolv(pname):
return self.fs.resolve_path(pname, self.protocol.cwd)
def resolv(pname: str) -> str:
rsv: str = self.fs.resolve_path(pname, self.protocol.cwd)
return rsv
if len(args) < 2:
self.errorWrite(
@ -468,7 +470,7 @@ class Command_mv(HoneyPotCommand):
mv command
"""
def call(self):
def call(self) -> None:
if not len(self.args):
self.errorWrite("mv: missing file operand\n")
self.errorWrite("Try `mv --help' for more information.\n")
@ -480,8 +482,9 @@ class Command_mv(HoneyPotCommand):
self.errorWrite("Unrecognized option\n")
self.exit()
def resolv(pname):
return self.fs.resolve_path(pname, self.protocol.cwd)
def resolv(pname: str) -> str:
rsv: str = self.fs.resolve_path(pname, self.protocol.cwd)
return rsv
if len(args) < 2:
self.errorWrite(
@ -541,7 +544,7 @@ class Command_mkdir(HoneyPotCommand):
mkdir command
"""
def call(self):
def call(self) -> None:
for f in self.args:
pname = self.fs.resolve_path(f, self.protocol.cwd)
if self.fs.exists(pname):
@ -565,7 +568,7 @@ class Command_rmdir(HoneyPotCommand):
rmdir command
"""
def call(self):
def call(self) -> None:
for f in self.args:
pname = self.fs.resolve_path(f, self.protocol.cwd)
try:
@ -603,7 +606,7 @@ class Command_pwd(HoneyPotCommand):
pwd command
"""
def call(self):
def call(self) -> None:
self.write(self.protocol.cwd + "\n")
@ -616,7 +619,7 @@ class Command_touch(HoneyPotCommand):
touch command
"""
def call(self):
def call(self) -> None:
if not len(self.args):
self.errorWrite("touch: missing file operand\n")
self.errorWrite("Try `touch --help' for more information.\n")

View File

@ -6,6 +6,7 @@ import ftplib
import getopt
import os
import socket
from typing import Optional, Tuple, Union
from twisted.python import log
@ -21,13 +22,19 @@ class FTP(ftplib.FTP):
self.source_address = kwargs.pop("source_address", None)
ftplib.FTP.__init__(self, *args, **kwargs)
def connect(self, host="", port=0, timeout=-999, source_address=None):
def connect(
self,
host: str = "",
port: int = 0,
timeout: float = -999.0,
source_address: Optional[Tuple[str, int]] = None,
) -> str:
if host != "":
self.host = host
if port > 0:
self.port = port
if timeout != -999:
self.timeout = timeout
if timeout != -999.0:
self.timeout: int = int(timeout)
if source_address is not None:
self.source_address = source_address
self.sock = socket.create_connection(
@ -38,8 +45,10 @@ class FTP(ftplib.FTP):
self.welcome = self.getresp()
return self.welcome
def ntransfercmd(self, cmd, rest=None):
size = None
def ntransfercmd(
self, cmd: str, rest: Union[int, str, None] = None
) -> Tuple[socket.socket, int]:
size = 0
if self.passiveserver:
host, port = self.makepasv()
conn = socket.create_connection(
@ -72,7 +81,9 @@ class FTP(ftplib.FTP):
finally:
sock.close()
if resp[:3] == "150":
size = ftplib.parse150(resp)
sz = ftplib.parse150(resp)
if sz:
size = sz
return conn, size
@ -92,7 +103,7 @@ class Command_ftpget(HoneyPotCommand):
remote_file: str
artifactFile: Artifact
def help(self):
def help(self) -> None:
self.write(
"""BusyBox v1.20.2 (2016-06-22 15:12:53 EDT) multi-call binary.
@ -107,7 +118,7 @@ Download a file via FTP
-P NUM Port\n\n"""
)
def start(self):
def start(self) -> None:
try:
optlist, args = getopt.getopt(self.args, "cvu:p:P:")
except getopt.GetoptError:
@ -222,7 +233,7 @@ Download a file via FTP
self.exit()
def ftp_download(self):
def ftp_download(self) -> bool:
out_addr = ("", 0)
if CowrieConfig.has_option("honeypot", "out_addr"):
out_addr = (CowrieConfig.get("honeypot", "out_addr"), 0)

View File

@ -8,7 +8,7 @@ import random
import re
import time
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from cowrie.core.config import CowrieConfig
@ -53,7 +53,7 @@ class Command_gcc(HoneyPotCommand):
scheduled: Deferred
def start(self):
def start(self) -> None:
"""
Parse as much as possible from a GCC syntax and generate the output
that is requested. The file that is generated can be read (and will)
@ -138,7 +138,7 @@ class Command_gcc(HoneyPotCommand):
else:
self.no_files()
def handle_CTRL_C(self):
def handle_CTRL_C(self) -> None:
"""
Make sure the scheduled call will be canceled
"""
@ -146,7 +146,7 @@ class Command_gcc(HoneyPotCommand):
if getattr(self, "scheduled", False):
self.scheduled.cancel()
def no_files(self):
def no_files(self) -> None:
"""
Notify user there are no input files, and exit
"""
@ -156,7 +156,7 @@ compilation terminated.\n"""
)
self.exit()
def version(self, short):
def version(self, short: bool) -> None:
"""
Print long or short version, and exit
"""
@ -187,7 +187,7 @@ gcc version {} (Debian {}-5)""".format(
self.write(f"{data}\n")
self.exit()
def generate_file(self, outfile):
def generate_file(self, outfile: str) -> None:
data = b""
# TODO: make sure it is written to temp file, not downloads
tmp_fname = "{}_{}_{}_{}".format(
@ -222,7 +222,7 @@ gcc version {} (Debian {}-5)""".format(
# Segfault command
class segfault_command(HoneyPotCommand):
def call(self):
def call(self) -> None:
self.write("Segmentation fault\n")
# Trick the 'new compiled file' as an segfault
@ -231,14 +231,14 @@ gcc version {} (Debian {}-5)""".format(
# Done
self.exit()
def arg_missing(self, arg):
def arg_missing(self, arg: str) -> None:
"""
Print missing argument message, and exit
"""
self.write(f"{Command_gcc.APP_NAME}: argument to '{arg}' is missing\n")
self.exit()
def help(self):
def help(self) -> None:
"""
Print help info, and exit
"""

View File

@ -3,6 +3,8 @@
from __future__ import annotations
from typing import Tuple
from random import randint, randrange
from cowrie.shell.command import HoneyPotCommand
@ -29,27 +31,27 @@ commands = {}
class Command_ifconfig(HoneyPotCommand):
@staticmethod
def generate_packets():
def generate_packets() -> int:
return randrange(222222, 555555)
@staticmethod
def convert_bytes_to_mx(bytes_eth0):
def convert_bytes_to_mx(bytes_eth0: int) -> str:
mb = float(bytes_eth0) / 1000 / 1000
return f"{mb:.1f}"
def calculate_rx(self):
def calculate_rx(self) -> Tuple[int, str]:
rx_bytes = randrange(111111111, 555555555)
return rx_bytes, self.convert_bytes_to_mx(rx_bytes)
def calculate_tx(self):
def calculate_tx(self) -> Tuple[int, str]:
rx_bytes = randrange(11111111, 55555555)
return rx_bytes, self.convert_bytes_to_mx(rx_bytes)
def calculate_lo(self):
def calculate_lo(self) -> Tuple[int, str]:
lo_bytes = randrange(11111111, 55555555)
return lo_bytes, self.convert_bytes_to_mx(lo_bytes)
def call(self):
def call(self) -> None:
rx_bytes_eth0, rx_mb_eth0 = self.calculate_rx()
tx_bytes_eth0, tx_mb_eth0 = self.calculate_tx()
lo_bytes, lo_mb = self.calculate_lo()

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import optparse
from typing import Any
from typing import Any, Optional
from cowrie.shell.command import HoneyPotCommand
@ -12,21 +12,21 @@ commands = {}
class OptionParsingError(RuntimeError):
def __init__(self, msg):
def __init__(self, msg: str) -> None:
self.msg = msg
class OptionParsingExit(Exception):
def __init__(self, status, msg):
def __init__(self, status: int, msg: Optional[str]) -> None:
self.msg = msg
self.status = status
class ModifiedOptionParser(optparse.OptionParser):
def error(self, msg):
def error(self, msg: str) -> None:
raise OptionParsingError(msg)
def exit(self, status=0, msg=None):
def exit(self, status: int = 0, msg: Optional[str] = None) -> None:
raise OptionParsingExit(status, msg)
@ -49,10 +49,11 @@ class Command_iptables(HoneyPotCommand):
current_table: dict[str, list[Any]]
def user_is_root(self):
return self.protocol.user.username == "root"
def user_is_root(self) -> bool:
out: bool = self.protocol.user.username == "root"
return out
def start(self):
def start(self) -> None:
"""
Emulate iptables commands, including permission checking.
@ -187,7 +188,7 @@ class Command_iptables(HoneyPotCommand):
# Done
self.exit()
def setup_table(self, table):
def setup_table(self, table: str) -> bool:
"""
Called during startup to make sure the current environment has some
fake rules in memory.
@ -261,7 +262,7 @@ Perhaps iptables or your kernel needs to be upgraded.\n""".format(
# Failed
return False
def is_valid_chain(self, chain):
def is_valid_chain(self, chain: str) -> bool:
# Verify chain existence. Requires valid table first
if chain not in list(self.current_table.keys()):
self.write(
@ -273,14 +274,14 @@ Perhaps iptables or your kernel needs to be upgraded.\n""".format(
# Exists
return True
def show_version(self):
def show_version(self) -> None:
"""
Show version and exit
"""
self.write(f"{Command_iptables.APP_NAME} {Command_iptables.APP_VERSION}\n")
self.exit()
def show_help(self):
def show_help(self) -> None:
"""
Show help and exit
"""
@ -354,7 +355,7 @@ Options:
)
self.exit()
def list_rules(self, chain):
def list_rules(self, chain: str) -> None:
"""
List current rules as commands
"""
@ -382,7 +383,7 @@ Options:
else:
self.no_permission()
def list(self, chain):
def list(self, chain: str) -> None:
"""
List current rules
"""
@ -423,7 +424,7 @@ Options:
else:
self.no_permission()
def flush(self, chain):
def flush(self, chain: str) -> None:
"""
Mark rules as flushed
"""
@ -446,7 +447,7 @@ Options:
else:
self.no_permission()
def no_permission(self):
def no_permission(self) -> None:
self.write(
f"{Command_iptables.APP_NAME} {Command_iptables.APP_VERSION}: "
+ "can't initialize iptables table 'filter': "
@ -455,7 +456,7 @@ Options:
)
self.exit()
def no_command(self):
def no_command(self) -> None:
"""
Print no command message and exit
"""
@ -467,7 +468,7 @@ Options:
)
self.exit()
def unknown_option(self, option):
def unknown_option(self, option: OptionParsingExit) -> None:
"""
Print unknown option message and exit
"""
@ -479,7 +480,7 @@ Options:
)
self.exit()
def bad_argument(self, argument):
def bad_argument(self, argument: str) -> None:
"""
Print bad argument and exit
"""

View File

@ -12,7 +12,7 @@ commands = {}
class Command_last(HoneyPotCommand):
def call(self):
def call(self) -> None:
line = list(self.args)
while len(line):
arg = line.pop(0)

View File

@ -16,19 +16,21 @@ commands = {}
class Command_ls(HoneyPotCommand):
def uid2name(self, uid):
def uid2name(self, uid: int) -> str:
try:
return Passwd().getpwuid(uid)["pw_name"]
name: str = Passwd().getpwuid(uid)["pw_name"]
return name
except Exception:
return str(uid)
def gid2name(self, gid):
def gid2name(self, gid: int) -> str:
try:
return Group().getgrgid(gid)["gr_name"]
group: str = Group().getgrgid(gid)["gr_name"]
return group
except Exception:
return str(gid)
def call(self):
def call(self) -> None:
path = self.protocol.cwd
paths = []
self.showHidden = False
@ -87,7 +89,7 @@ class Command_ls(HoneyPotCommand):
return
return files
def do_ls_normal(self, path):
def do_ls_normal(self, path: str) -> None:
files = self.get_dir_files(path)
if not files:
return
@ -112,7 +114,7 @@ class Command_ls(HoneyPotCommand):
count += 1
self.write("\n")
def do_ls_l(self, path):
def do_ls_l(self, path: str) -> None:
files = self.get_dir_files(path)
if not files:
return

View File

@ -13,29 +13,31 @@ long = int
commands = {}
def makeMask(n):
def makeMask(n: int) -> int:
"""
return a mask of n bits as a long integer
"""
return (long(2) << n - 1) - 1
def dottedQuadToNum(ip):
def dottedQuadToNum(ip: str) -> int:
"""
convert decimal dotted quad string to long integer
this will throw builtins.OSError on failure
"""
return struct.unpack("I", socket.inet_aton(ip))[0]
ip32bit: bytes = socket.inet_aton(ip)
num: int = struct.unpack("I", ip32bit)[0]
return num
def networkMask(ip, bits):
def networkMask(ip: str, bits: int) -> int:
"""
Convert a network address to a long integer
"""
return dottedQuadToNum(ip) & makeMask(bits)
def addressInNetwork(ip, net):
def addressInNetwork(ip: int, net: int) -> int:
"""
Is an address in a network
"""
@ -49,7 +51,7 @@ class Command_nc(HoneyPotCommand):
s: socket.socket
def help(self):
def help(self) -> None:
self.write(
"""This is nc from the netcat-openbsd package. An alternative nc is available
in the netcat-traditional package.
@ -59,7 +61,7 @@ usage: nc [-46bCDdhjklnrStUuvZz] [-I length] [-i interval] [-O length]
[-x proxy_address[:port]] [destination] [port]\n"""
)
def start(self):
def start(self) -> None:
try:
optlist, args = getopt.getopt(
self.args, "46bCDdhklnrStUuvZzI:i:O:P:p:q:s:T:V:w:X:x:"
@ -113,7 +115,7 @@ usage: nc [-46bCDdhjklnrStUuvZz] [-I length] [-i interval] [-O length]
except Exception:
self.exit()
def recv_data(self):
def recv_data(self) -> None:
data = b""
while 1:
packet = self.s.recv(1024)
@ -126,16 +128,16 @@ usage: nc [-46bCDdhjklnrStUuvZz] [-I length] [-i interval] [-O length]
self.s.close()
self.exit()
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
if hasattr(self, "s"):
self.s.send(line.encode("utf8"))
def handle_CTRL_C(self):
def handle_CTRL_C(self) -> None:
self.write("^C\n")
if hasattr(self, "s"):
self.s.close()
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
if hasattr(self, "s"):
self.s.close()

View File

@ -10,7 +10,7 @@ commands = {}
class Command_netstat(HoneyPotCommand):
def show_version(self):
def show_version(self) -> None:
self.write("net-tools 1.60\n")
self.write("netstat 1.42 (2001-04-15)\n")
self.write(
@ -25,7 +25,7 @@ class Command_netstat(HoneyPotCommand):
+ "+FR +ROSE +ASH +SIT +FDDI +HIPPI +HDLC/LAPB +EUI64\n"
)
def show_help(self):
def show_help(self) -> None:
self.write(
"""
usage: netstat [-vWeenNcCF] [<Af>] -r netstat {-V|--version|-h|--help}
@ -64,7 +64,7 @@ usage: netstat [-vWeenNcCF] [<Af>] -r netstat {-V|--version|-h|--help}
"""
)
def do_netstat_route(self):
def do_netstat_route(self) -> None:
self.write(
"""Kernel IP routing table
Destination Gateway Genmask Flags MSS Window irtt Iface\n"""
@ -88,7 +88,7 @@ Destination Gateway Genmask Flags MSS Window irtt Iface\n
self.write(f"{l1}\n")
self.write(f"{l2}\n")
def do_netstat_normal(self):
def do_netstat_normal(self) -> None:
self.write(
"""Active Internet connections (w/o servers)
Proto Recv-Q Send-Q Local Address Foreign Address State\n"""
@ -180,7 +180,7 @@ unix 2 [ ] DGRAM 9570
unix 3 [ ] STREAM CONNECTED 8619 @/com/ubuntu/upstart\n"""
)
def call(self):
def call(self) -> None:
self.show_all = False
self.show_numeric = False
self.show_listen = False

View File

@ -10,7 +10,7 @@ commands = {}
class Command_nohup(HoneyPotCommand):
def call(self):
def call(self) -> None:
if not len(self.args):
self.write("nohup: missing operand\n")
self.write("Try `nohup --help' for more information.\n")

View File

@ -17,7 +17,7 @@ commands = {}
class Command_perl(HoneyPotCommand):
def version(self):
def version(self) -> None:
output = (
"",
"This is perl 5, version 14, subversion 2 (v5.14.2) built for x86_64-linux-thread-multi",
@ -35,7 +35,7 @@ class Command_perl(HoneyPotCommand):
for line in output:
self.write(line + "\n")
def help(self):
def help(self) -> None:
output = (
"",
"Usage: perl [switches] [--] [programfile] [arguments]",
@ -72,7 +72,7 @@ class Command_perl(HoneyPotCommand):
for line in output:
self.write(line + "\n")
def start(self):
def start(self) -> None:
try:
opts, args = getopt.gnu_getopt(
self.args, "acfhnpsStTuUvwWXC:D:e:E:F:i:I:l:m:M:V:X:"
@ -110,7 +110,7 @@ class Command_perl(HoneyPotCommand):
if not len(self.args):
pass
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.command.input",
realm="perl",
@ -118,7 +118,7 @@ class Command_perl(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()

View File

@ -10,7 +10,7 @@ import re
import socket
from typing import Any
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from cowrie.shell.command import HoneyPotCommand
@ -29,14 +29,14 @@ class Command_ping(HoneyPotCommand):
running: bool
scheduled: Any
def valid_ip(self, address):
def valid_ip(self, address: str) -> bool:
try:
socket.inet_aton(address)
return True
except Exception:
return False
def start(self):
def start(self) -> None:
self.host = ""
self.max = 0
self.running = False
@ -88,7 +88,7 @@ class Command_ping(HoneyPotCommand):
self.scheduled = reactor.callLater(0.2, self.showreply) # type: ignore[attr-defined]
self.count = 0
def showreply(self):
def showreply(self) -> None:
ms = 40 + random.random() * 10
self.write(
"64 bytes from {} ({}): icmp_seq={} ttl=50 time={:.1f} ms\n".format(
@ -104,7 +104,7 @@ class Command_ping(HoneyPotCommand):
else:
self.scheduled = reactor.callLater(1, self.showreply) # type: ignore[attr-defined]
def printstatistics(self):
def printstatistics(self) -> None:
self.write(f"--- {self.host} ping statistics ---\n")
self.write(
"%d packets transmitted, %d received, 0%% packet loss, time 907ms\n"
@ -112,7 +112,7 @@ class Command_ping(HoneyPotCommand):
)
self.write("rtt min/avg/max/mdev = 48.264/50.352/52.441/2.100 ms\n")
def handle_CTRL_C(self):
def handle_CTRL_C(self) -> None:
if self.running is False:
return HoneyPotCommand.handle_CTRL_C(self)
else:

View File

@ -17,11 +17,11 @@ commands = {}
class Command_python(HoneyPotCommand):
def version(self):
def version(self) -> None:
ver = "Python 2.7.11+"
self.write(ver + "\n")
def help(self):
def help(self) -> None:
output = (
"usage: python [option] ... [-c cmd | -m mod | file | -] [arg] ...",
"Options and arguments (and corresponding environment variables):",
@ -71,7 +71,7 @@ class Command_python(HoneyPotCommand):
for line in output:
self.write(line + "\n")
def start(self):
def start(self) -> None:
try:
opts, args = getopt.gnu_getopt(
self.args, "BdEhiORsStuvVx3c:m:Q:W:", ["help", "version"]
@ -120,7 +120,7 @@ class Command_python(HoneyPotCommand):
if not len(self.args):
pass
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.command.input",
realm="python",
@ -128,7 +128,7 @@ class Command_python(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()

View File

@ -55,14 +55,14 @@ class Command_scp(HoneyPotCommand):
out_dir: str = ""
def help(self):
def help(self) -> None:
self.write(
"""usage: scp [-12346BCpqrv] [-c cipher] [-F ssh_config] [-i identity_file]
[-l limit] [-o ssh_option] [-P port] [-S program]
[[user@]host1:]file1 ... [[user@]host2:]file2\n"""
)
def start(self):
def start(self) -> None:
try:
optlist, args = getopt.getopt(self.args, "12346BCpqrvfstdv:cFiloPS:")
except getopt.GetoptError:
@ -95,7 +95,7 @@ class Command_scp(HoneyPotCommand):
self.write("\x00")
self.write("\x00")
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.session.file_download",
realm="scp",
@ -104,7 +104,7 @@ class Command_scp(HoneyPotCommand):
)
self.protocol.terminal.write("\x00")
def drop_tmp_file(self, data, name):
def drop_tmp_file(self, data: bytes, name: str) -> None:
tmp_fname = "{}-{}-{}-scp_{}".format(
time.strftime("%Y%m%d-%H%M%S"),
self.protocol.getProtoTransport().transportId,
@ -117,7 +117,7 @@ class Command_scp(HoneyPotCommand):
with open(self.safeoutfile, "wb+") as f:
f.write(data)
def save_file(self, data, fname):
def save_file(self, data: bytes, fname: str) -> None:
self.drop_tmp_file(data, fname)
if os.path.exists(self.safeoutfile):
@ -197,7 +197,7 @@ class Command_scp(HoneyPotCommand):
return data
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
if (
self.protocol.terminal.stdinlogOpen
and self.protocol.terminal.stdinlogFile

View File

@ -19,7 +19,7 @@ class Command_service(HoneyPotCommand):
By Giannis Papaioannou <giannispapcod7@gmail.com>
"""
def status_all(self):
def status_all(self) -> None:
"""
more services can be added here.
"""
@ -86,11 +86,11 @@ class Command_service(HoneyPotCommand):
for line in output:
self.write(line + "\n")
def help(self):
def help(self) -> None:
output = "Usage: service < option > | --status-all | [ service_name [ command | --full-restart ] ]"
self.write(output + "\n")
def call(self):
def call(self) -> None:
try:
opts, args = getopt.gnu_getopt(
self.args, "h", ["help", "status-all", "full-restart"]

View File

@ -9,7 +9,7 @@ from __future__ import annotations
import re
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from cowrie.shell.command import HoneyPotCommand
@ -23,10 +23,10 @@ class Command_sleep(HoneyPotCommand):
pattern = re.compile(r"(\d+)[mhs]?")
def done(self):
def done(self) -> None:
self.exit()
def start(self):
def start(self) -> None:
if len(self.args) == 1:
m = re.match(r"(\d+)[mhs]?", self.args[0])
if m:

View File

@ -11,7 +11,7 @@ import socket
import time
from typing import Callable
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.python import log
from cowrie.core.config import CowrieConfig
@ -39,14 +39,14 @@ class Command_ssh(HoneyPotCommand):
host: str
callbacks: list[Callable]
def valid_ip(self, address):
def valid_ip(self, address: str) -> bool:
try:
socket.inet_aton(address)
return True
except Exception:
return False
def start(self):
def start(self) -> None:
try:
options = "-1246AaCfgKkMNnqsTtVvXxYb:c:D:e:F:i:L:l:m:O:o:p:R:S:w:"
optlist, args = getopt.getopt(self.args, options)
@ -110,7 +110,7 @@ class Command_ssh(HoneyPotCommand):
self.write("Are you sure you want to continue connecting (yes/no)? ")
self.callbacks = [self.yesno, self.wait]
def yesno(self, line):
def yesno(self, line: str) -> None:
self.write(
"Warning: Permanently added '{}' (RSA) to the \
list of known hosts.\n".format(
@ -120,10 +120,10 @@ class Command_ssh(HoneyPotCommand):
self.write(f"{self.user}@{self.host}'s password: ")
self.protocol.password_input = True
def wait(self, line):
def wait(self, line: str) -> None:
reactor.callLater(2, self.finish, line) # type: ignore[attr-defined]
def finish(self, line):
def finish(self, line: str) -> None:
self.pause = False
rests = self.host.strip().split(".")
if len(rests) and rests[0].isalpha():
@ -144,7 +144,7 @@ class Command_ssh(HoneyPotCommand):
self.write(f"Last login: {time.ctime(time.time() - 123123)} from 192.168.9.4\n")
self.exit()
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg("INPUT (ssh):", line)
if len(self.callbacks):
self.callbacks.pop(0)(line)

View File

@ -65,17 +65,17 @@ Options:
class Command_sudo(HoneyPotCommand):
def short_help(self):
def short_help(self) -> None:
for ln in sudo_shorthelp:
self.errorWrite(f"{ln}\n")
self.exit()
def long_help(self):
def long_help(self) -> None:
for ln in sudo_longhelp:
self.errorWrite(f"{ln}\n")
self.exit()
def version(self):
def version(self) -> None:
self.errorWrite(
"""Sudo version 1.8.5p2
Sudoers policy plugin version 1.8.5p2
@ -84,7 +84,7 @@ class Command_sudo(HoneyPotCommand):
)
self.exit()
def start(self):
def start(self) -> None:
start_value = None
parsed_arguments = []
for count in range(0, len(self.args)):

View File

@ -23,7 +23,7 @@ class Command_tar(HoneyPotCommand):
if p and not self.fs.exists(p):
self.fs.mkdir(p, 0, 0, 4096, f.mode, f.mtime)
def call(self):
def call(self) -> None:
if len(self.args) < 2:
self.write("tar: You must specify one of the `-Acdtrux' options\n")
self.write("Try `tar --help' or `tar --usage' for more information.\n")

View File

@ -9,6 +9,7 @@ from __future__ import annotations
import getopt
import os
from typing import Optional
from twisted.python import log
@ -28,7 +29,7 @@ class Command_tee(HoneyPotCommand):
writtenBytes = 0
ignoreInterupts = False
def start(self):
def start(self) -> None:
try:
optlist, args = getopt.gnu_getopt(
self.args, "aip", ["help", "append", "version"]
@ -75,28 +76,28 @@ class Command_tee(HoneyPotCommand):
self.output(self.input_data)
self.exit()
def write_to_file(self, data):
def write_to_file(self, data: bytes) -> None:
self.writtenBytes += len(data)
for outf in self.teeFiles:
self.fs.update_size(outf, self.writtenBytes)
def output(self, input):
def output(self, inb: Optional[bytes]) -> None:
"""
This is the tee output, if no file supplied
"""
if "decode" in dir(input):
input = input.decode("UTF-8")
if not isinstance(input, str):
pass
if inb:
inp = inb.decode("utf-8")
else:
return
lines = input.split("\n")
lines = inp.split("\n")
if lines[-1] == "":
lines.pop()
for line in lines:
self.write(line + "\n")
self.write_to_file(line + "\n")
self.write_to_file(line.encode("utf-8") + b"\n")
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
"""
This function logs standard input from the user send to tee
"""
@ -107,21 +108,21 @@ class Command_tee(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
self.output(line)
self.output(line.encode("utf-8"))
def handle_CTRL_C(self):
def handle_CTRL_C(self) -> None:
if not self.ignoreInterupts:
log.msg("Received CTRL-C, exiting..")
self.write("^C\n")
self.exit()
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
"""
ctrl-d is end-of-file, time to terminate
"""
self.exit()
def help(self):
def help(self) -> None:
self.write(
"""Usage: tee [OPTION]... [FILE]...
Copy standard input to each FILE, and also to standard output.

View File

@ -31,7 +31,7 @@ class Command_tftp(HoneyPotCommand):
file_to_get: str
limit_size = CowrieConfig.getint("honeypot", "download_limit_size", fallback=0)
def makeTftpRetrieval(self):
def makeTftpRetrieval(self) -> None:
progresshook = Progress(self).progresshook
self.artifactFile = Artifact(self.file_to_get)
@ -89,7 +89,7 @@ class Command_tftp(HoneyPotCommand):
self.file_to_get, self.protocol.user.uid, self.protocol.user.gid
)
def start(self):
def start(self) -> None:
parser = CustomParser(self)
parser.prog = "tftp"
parser.add_argument("hostname", nargs="?", default=None)

View File

@ -21,7 +21,7 @@ class Command_ulimit(HoneyPotCommand):
ulimit: usage: ulimit [-SHacdfilmnpqstuvx] [limit]
"""
def call(self):
def call(self) -> None:
# Parse options or display no files
try:
opts, args = getopt.getopt(self.args, "SHacdfilmnpqstuvx")

View File

@ -13,29 +13,29 @@ from cowrie.shell.command import HoneyPotCommand
commands = {}
def hardware_platform():
def hardware_platform() -> str:
return CowrieConfig.get("shell", "hardware_platform", fallback="x86_64")
def kernel_name():
def kernel_name() -> str:
return CowrieConfig.get("shell", "kernel_name", fallback="Linux")
def kernel_version():
def kernel_version() -> str:
return CowrieConfig.get("shell", "kernel_version", fallback="3.2.0-4-amd64")
def kernel_build_string():
def kernel_build_string() -> str:
return CowrieConfig.get(
"shell", "kernel_build_string", fallback="#1 SMP Debian 3.2.68-1+deb7u1"
)
def operating_system():
def operating_system() -> str:
return CowrieConfig.get("shell", "operating_system", fallback="GNU/Linux")
def uname_help():
def uname_help() -> str:
return """Usage: uname [OPTION]...
Print certain system information. With no OPTION, same as -s.
@ -58,25 +58,25 @@ or available locally via: info '(coreutils) uname invocation'\n
"""
def uname_get_some_help():
def uname_get_some_help() -> str:
return "Try 'uname --help' for more information."
def uname_fail_long(arg):
def uname_fail_long(arg: str) -> str:
return f"uname: unrecognized option '{arg}'\n{uname_get_some_help()}\n"
def uname_fail_short(arg):
def uname_fail_short(arg: str) -> str:
return f"uname: invalid option -- '{arg}'\n{uname_get_some_help()}\n"
def uname_fail_extra(arg):
def uname_fail_extra(arg: str) -> str:
# Note: These are apostrophes, not single quotation marks.
return f"uname: extra operand {arg}\n{uname_get_some_help()}\n"
class Command_uname(HoneyPotCommand):
def full_uname(self):
def full_uname(self) -> str:
return "{} {} {} {} {} {}\n".format(
kernel_name(),
self.protocol.hostname,
@ -86,7 +86,7 @@ class Command_uname(HoneyPotCommand):
operating_system(),
)
def call(self):
def call(self) -> None:
opts = {
"name": False,
"release": False,

View File

@ -54,7 +54,7 @@ class Command_uniq(HoneyPotCommand):
last_line = None
def start(self):
def start(self) -> None:
if "--help" in self.args:
self.writeBytes(UNIQ_HELP.encode())
self.exit()
@ -66,7 +66,7 @@ class Command_uniq(HoneyPotCommand):
self.grep_input(line)
self.exit()
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.command.input",
realm="uniq",
@ -75,10 +75,10 @@ class Command_uniq(HoneyPotCommand):
)
self.grep_input(line.encode())
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()
def grep_input(self, line):
def grep_input(self, line: bytes) -> None:
if not line == self.last_line:
self.writeBytes(line + b"\n")
self.last_line = line

View File

@ -16,7 +16,7 @@ commands = {}
class Command_unzip(HoneyPotCommand):
def mkfullpath(self, path, f):
def mkfullpath(self, path: str) -> None:
l, d = path.split("/"), []
while len(l):
d.append(l.pop(0))
@ -24,7 +24,7 @@ class Command_unzip(HoneyPotCommand):
if not self.fs.exists(dir):
self.fs.mkdir(dir, 0, 0, 4096, 33188)
def call(self):
def call(self) -> None:
if len(self.args) == 0 or self.args[0].startswith("-"):
output = (
"UnZip 6.00 of 20 April 2009, by Debian. Original by Info-ZIP.\n"
@ -116,7 +116,7 @@ class Command_unzip(HoneyPotCommand):
if f.is_dir():
self.fs.mkdir(dest, 0, 0, 4096, 33188)
elif not f.is_dir():
self.mkfullpath(os.path.dirname(dest), f)
self.mkfullpath(os.path.dirname(dest))
self.fs.mkfile(dest, 0, 0, f.file_size, 33188)
else:
log.msg(f" skipping: {f.filename}\n")

View File

@ -12,7 +12,7 @@ commands = {}
class Command_uptime(HoneyPotCommand):
def call(self):
def call(self) -> None:
self.write(
"%s up %s, 1 user, load average: 0.00, 0.00, 0.00\n"
% (time.strftime("%H:%M:%S"), utils.uptime(self.protocol.uptime()))

View File

@ -7,9 +7,9 @@ This module contains the wc commnad
"""
from __future__ import annotations
import getopt
import re
from typing import Tuple
from twisted.python import log
@ -23,7 +23,7 @@ class Command_wc(HoneyPotCommand):
wc command
"""
def version(self):
def version(self) -> None:
self.writeBytes(b"wc (GNU coreutils) 8.30\n")
self.writeBytes(b"Copyright (C) 2018 Free Software Foundation, Inc.\n")
self.writeBytes(
@ -36,7 +36,7 @@ class Command_wc(HoneyPotCommand):
self.writeBytes(b"\n")
self.writeBytes(b"Written by Paul Rubin and David MacKenzie.\n")
def help(self):
def help(self) -> None:
self.writeBytes(b"Usage: wc [OPTION]... [FILE]...\n")
self.writeBytes(
b"Print newline, word, and byte counts for each FILE, and a total line if\n"
@ -61,22 +61,22 @@ class Command_wc(HoneyPotCommand):
self.writeBytes(b"\t-h\tdisplay this help and exit\n")
self.writeBytes(b"\t-v\toutput version information and exit\n")
def wc_get_contents(self, filename, optlist):
def wc_get_contents(self, filename: str, optlist: list[Tuple[str, str]]) -> None:
try:
contents = self.fs.file_contents(filename)
self.wc_application(contents, optlist)
except Exception:
self.errorWrite(f"wc: {filename}: No such file or directory\n")
def wc_application(self, contents, optlist):
def wc_application(self, contents: bytes, optlist: list[Tuple[str, str]]) -> None:
for opt, arg in optlist:
if opt == "-l":
contentsplit = contents.split(b"\n")
self.write(f"{len(contentsplit) - 1}\n")
elif opt == "-w":
contentsplit = re.sub(
" +", " ", contents.decode().strip("\n").strip()
).split(" ")
contentsplit = re.sub(b" +", b" ", contents.strip(b"\n").strip()).split(
b" "
)
self.write(f"{len(contentsplit)}\n")
elif opt == "-m" or opt == "-c":
self.write(f"{len(contents)}\n")
@ -85,7 +85,7 @@ class Command_wc(HoneyPotCommand):
else:
self.help()
def start(self):
def start(self) -> None:
if not self.args:
self.exit()
return
@ -119,7 +119,7 @@ class Command_wc(HoneyPotCommand):
self.exit()
def lineReceived(self, line):
def lineReceived(self, line: str) -> None:
log.msg(
eventid="cowrie.command.input",
realm="wc",
@ -127,7 +127,7 @@ class Command_wc(HoneyPotCommand):
format="INPUT (%(realm)s): %(input)s",
)
def handle_CTRL_D(self):
def handle_CTRL_D(self) -> None:
self.exit()

View File

@ -7,7 +7,7 @@ import getopt
import ipaddress
import os
import time
from typing import Optional
from typing import Any, Optional
from twisted.internet import error
from twisted.python import compat, log
@ -22,7 +22,7 @@ from cowrie.shell.command import HoneyPotCommand
commands = {}
def tdiff(seconds):
def tdiff(seconds: int) -> str:
t = seconds
days = int(t / (24 * 60 * 60))
t -= days * 24 * 60 * 60
@ -41,15 +41,16 @@ def tdiff(seconds):
return s
def sizeof_fmt(num):
def sizeof_fmt(num: float) -> str:
for x in ["bytes", "K", "M", "G", "T"]:
if num < 1024.0:
return f"{num}{x}"
num /= 1024.0
raise Exception
# Luciano Ramalho @ http://code.activestate.com/recipes/498181/
def splitthousands(s, sep=","):
def splitthousands(s: str, sep: str = ",") -> str:
if len(s) <= 3:
return s
return splitthousands(s[:-3], sep) + sep + s[-3:]
@ -72,7 +73,7 @@ class Command_wget(HoneyPotCommand):
host: str
started: float
def start(self):
def start(self) -> None:
url: str
try:
optlist, args = getopt.getopt(self.args, "cqO:P:", ["header="])
@ -90,7 +91,7 @@ class Command_wget(HoneyPotCommand):
self.exit()
return
self.outfile: str = None
self.outfile = None
self.quiet = False
for opt in optlist:
if opt[0] == "-O":
@ -112,7 +113,10 @@ class Command_wget(HoneyPotCommand):
urldata = compat.urllib_parse.urlparse(url)
self.host = urldata.hostname
if urldata.hostname:
self.host = urldata.hostname
else:
pass
# TODO: need to do full name resolution in case someon passes DNS name pointing to local address
try:
@ -132,7 +136,7 @@ class Command_wget(HoneyPotCommand):
if self.outfile != "-":
self.outfile = self.fs.resolve_path(self.outfile, self.protocol.cwd)
path = os.path.dirname(self.outfile)
path = os.path.dirname(self.outfile) # type: ignore
if not path or not self.fs.exists(path) or not self.fs.isdir(path):
self.errorWrite(
"wget: {}: Cannot open: No such file or directory\n".format(
@ -155,7 +159,7 @@ class Command_wget(HoneyPotCommand):
self.deferred.addCallback(self.success)
self.deferred.addErrback(self.error)
def wgetDownload(self, url):
def wgetDownload(self, url: str) -> Any:
"""
Download `url`
"""
@ -223,6 +227,7 @@ class Command_wget(HoneyPotCommand):
"""
partial collect
"""
eta: float
self.currentlength += len(data)
if self.limit_size > 0 and self.currentlength > self.limit_size:
log.msg(
@ -248,7 +253,7 @@ class Command_wget(HoneyPotCommand):
("%s>" % (int(39.0 / 100.0 * percent) * "=")).ljust(39),
splitthousands(str(int(self.currentlength))).ljust(12),
self.speed / 1000,
tdiff(eta),
tdiff(int(eta)),
)
if not self.quiet:
self.errorWrite(s.ljust(self.proglen))

View File

@ -11,7 +11,7 @@ class Command_which(HoneyPotCommand):
# Do not resolve args
resolve_args = False
def call(self):
def call(self) -> None:
"""
Look up all the arguments on PATH and print each (first) result
"""

View File

@ -10,10 +10,10 @@ from __future__ import annotations
import hashlib
import random
import re
from typing import Any, Dict
from typing import Any, Callable, Dict, Optional
from twisted.internet import defer
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.python import log
@ -25,9 +25,9 @@ commands = {}
class Command_faked_package_class_factory:
@staticmethod
def getCommand(name):
def getCommand(name: str) -> Callable:
class Command_faked_installation(HoneyPotCommand):
def call(self):
def call(self) -> None:
self.write(f"{name}: Segmentation fault\n")
return Command_faked_installation
@ -39,9 +39,10 @@ class Command_yum(HoneyPotCommand):
suppports only the 'install PACKAGE' command & 'moo'.
Any installed packages, places a 'Segfault' at /usr/bin/PACKAGE.'''
"""
packages: Dict[str, Dict[str, Any]] = {}
def start(self):
def start(self) -> None:
if len(self.args) == 0:
self.do_help()
elif len(self.args) > 0 and self.args[0] == "version":
@ -51,10 +52,10 @@ class Command_yum(HoneyPotCommand):
else:
self.do_locked()
def sleep(self, time, time2=None):
def sleep(self, time: float, time2: Optional[float] = None) -> defer.Deferred:
d: defer.Deferred = defer.Deferred()
if time2:
time = random.randint(time * 100, time2 * 100) / 100.0
time = random.randint(int(time * 100), int(time2 * 100)) / 100.0
reactor.callLater(time, d.callback, None) # type: ignore[attr-defined]
return d
@ -315,7 +316,7 @@ Options:
self.write("Complete!\n")
self.exit()
def do_locked(self):
def do_locked(self) -> None:
self.errorWrite(
"Loaded plugins: changelog, kernel-module, ovl, priorities, tsflags, versionlock\n"
)

View File

@ -59,8 +59,8 @@ class Artifact:
self.close()
return True
def write(self, bytes: bytes) -> None:
self.fp.write(bytes)
def write(self, data: bytes) -> None:
self.fp.write(data)
def fileno(self) -> Any:
return self.fp.fileno()

View File

@ -47,7 +47,8 @@ class UserDB:
dblines: list[str]
try:
with open(
"{}/userdb.txt".format(CowrieConfig.get("honeypot", "etc_path"))
"{}/userdb.txt".format(CowrieConfig.get("honeypot", "etc_path")),
encoding="ascii",
) as db:
dblines = db.readlines()
except OSError:
@ -79,12 +80,11 @@ class UserDB:
return False
def match_rule(
self, rule: Union[bytes, Pattern[bytes]], input: bytes
self, rule: Union[bytes, Pattern[bytes]], data: bytes
) -> Union[bool, bytes]:
if isinstance(rule, bytes):
return rule in [b"*", input]
else:
return bool(rule.search(input))
return rule in [b"*", data]
return bool(rule.search(data))
def re_or_bytes(self, rule: bytes) -> Union[Pattern[bytes], bytes]:
"""
@ -93,7 +93,7 @@ class UserDB:
@param login: rule
@type login: bytes
"""
res = re.match(br"/(.+)/(i)?$", rule)
res = re.match(rb"/(.+)/(i)?$", rule)
if res:
return re.compile(res.group(1), re.IGNORECASE if res.group(2) else 0)
@ -156,7 +156,7 @@ class AuthRandom:
Load user vars from json file
"""
if path.isfile(self.uservar_file):
with open(self.uservar_file) as fp:
with open(self.uservar_file, encoding="utf-8") as fp:
try:
self.uservar = json.load(fp)
except Exception:
@ -168,7 +168,7 @@ class AuthRandom:
"""
data = self.uservar
# Note: this is subject to races between cowrie logins
with open(self.uservar_file, "w") as fp:
with open(self.uservar_file, "w", encoding="utf-8") as fp:
json.dump(data, fp)
def checklogin(self, thelogin: bytes, thepasswd: bytes, src_ip: str) -> bool:
@ -202,9 +202,8 @@ class AuthRandom:
auth = True
self.savevars()
return auth
else:
ipinfo["max"] = randint(self.mintry, self.maxtry)
log.msg("first time for {}, need: {}".format(src_ip, ipinfo["max"]))
ipinfo["max"] = randint(self.mintry, self.maxtry)
log.msg("first time for {}, need: {}".format(src_ip, ipinfo["max"]))
else:
if userpass in cache:
ipinfo = self.uservar[src_ip]

View File

@ -74,8 +74,7 @@ class HoneypotPasswordChecker:
credentials.username, credentials.password, credentials.ip
):
return defer.succeed(credentials.username)
else:
return defer.fail(UnauthorizedLogin())
return defer.fail(UnauthorizedLogin())
elif hasattr(credentials, "pamConversion"):
return self.checkPamUser(
credentials.username, credentials.pamConversion, credentials.ip
@ -87,12 +86,12 @@ class HoneypotPasswordChecker:
return r.addCallback(self.cbCheckPamUser, username, ip)
def cbCheckPamUser(self, responses, username, ip):
for (response, zero) in responses:
for (response, _) in responses:
if self.checkUserPass(username, response, ip):
return defer.succeed(username)
return defer.fail(UnauthorizedLogin())
def checkUserPass(self, theusername, thepassword, ip):
def checkUserPass(self, theusername: bytes, thepassword: bytes, ip: str) -> bool:
# UserDB is the default auth_class
authname = auth.UserDB
@ -117,11 +116,11 @@ class HoneypotPasswordChecker:
password=thepassword,
)
return True
else:
log.msg(
eventid="cowrie.login.failed",
format="login attempt [%(username)s/%(password)s] failed",
username=theusername,
password=thepassword,
)
return False
log.msg(
eventid="cowrie.login.failed",
format="login attempt [%(username)s/%(password)s] failed",
username=theusername,
password=thepassword,
)
return False

View File

@ -35,7 +35,7 @@ import time
from os import environ
from typing import Any, Pattern
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.logger import formatTime
from cowrie.core.config import CowrieConfig

View File

@ -43,7 +43,7 @@ from time import sleep, time
from treq import post
from twisted.internet import defer, threads
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.python import log
from twisted.web import http
@ -92,7 +92,7 @@ class Output(output.Output):
self.logbook.sleep_until = t_wake
# and we set an alarm so the reactor knows when he can drag
# us back out of bed
reactor.callLater(t_wake - t_now, self.logbook.wakeup) # type: ignore[attr-defined]
reactor.callLater(t_wake - t_now, self.logbook.wakeup)
del self.logbook["sleeping"]
del self.logbook["sleep_until"]
@ -219,7 +219,7 @@ class LogBook(dict):
# This is the method we pass in a callLater() before we go to sleep.
self.sleeping = False
self.sleep_until = 0
self.recall = reactor.callLater(CLEAN_DUMP_SCHED, self.cleanup_and_dump_state) # type: ignore[attr-defined]
self.recall = reactor.callLater(CLEAN_DUMP_SCHED, self.cleanup_and_dump_state)
log.msg(
eventid="cowrie.abuseipdb.wakeup",
format="AbuseIPDB plugin resuming activity after receiving "
@ -299,7 +299,7 @@ class LogBook(dict):
self.dump_state()
if mode == 0 and not self.sleeping:
self.recall = reactor.callLater( # type: ignore[attr-defined]
self.recall = reactor.callLater(
CLEAN_DUMP_SCHED, self.cleanup_and_dump_state
)
@ -314,7 +314,7 @@ class LogBook(dict):
for k, v in self.items():
dump[k] = v
reactor.callInThread(self.write_dump_file, dump) # type: ignore[attr-defined]
reactor.callInThread(self.write_dump_file, dump)
def write_dump_file(self, dump):
# Check self._writing; waits for release; timeout after 10 seconds.
@ -472,7 +472,7 @@ class Reporter:
self.logbook.sleeping = True
self.logbook.sleep_until = time() + retry
reactor.callLater(retry, self.logbook.wakeup) # type: ignore[attr-defined]
reactor.callLater(retry, self.logbook.wakeup)
# It's not serious if we don't, but it's best to call the clean-up
# after logbook.sleeping has been set to True. The clean-up method
# checks for this flag and will use the wake-up time rather than

View File

@ -19,7 +19,7 @@ from cowrie.core.config import CowrieConfig
class Output(cowrie.core.output.Output):
def start(self):
def start(self) -> None:
self.url = CowrieConfig.get("output_datadog", "url").encode("utf8")
self.api_key = CowrieConfig.get("output_datadog", "api_key", fallback="").encode("utf8")
if len(self.api_key) == 0:
@ -30,7 +30,7 @@ class Output(cowrie.core.output.Output):
contextFactory = WebClientContextFactory()
self.agent = client.Agent(reactor, contextFactory)
def stop(self):
def stop(self) -> None:
pass
def write(self, logentry):

View File

@ -17,12 +17,12 @@ from cowrie.core.config import CowrieConfig
class Output(cowrie.core.output.Output):
def start(self):
def start(self) -> None:
self.url = CowrieConfig.get("output_discord", "url").encode("utf8")
contextFactory = WebClientContextFactory()
self.agent = client.Agent(reactor, contextFactory)
def stop(self):
def stop(self) -> None:
pass
def write(self, logentry):

View File

@ -14,7 +14,7 @@ import time
import dateutil.parser
import requests
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.internet import threads
from twisted.python import log
@ -167,6 +167,6 @@ class Output(cowrie.core.output.Output):
if failed:
# Something went wrong, we need to add them to batch.
reactor.callFromThread(self.transmission_error, batch) # type: ignore[attr-defined]
reactor.callFromThread(self.transmission_error, batch)
req.addCallback(check_response)

View File

@ -18,12 +18,12 @@ from cowrie.core.config import CowrieConfig
class Output(cowrie.core.output.Output):
def start(self):
def start(self) -> None:
self.url = CowrieConfig.get("output_graylog", "url").encode("utf8")
contextFactory = WebClientContextFactory()
self.agent = client.Agent(reactor, contextFactory)
def stop(self):
def stop(self) -> None:
pass
def write(self, logentry):

View File

@ -10,7 +10,7 @@ import logging
from hpfeeds.twisted import ClientSessionService
from twisted.internet import endpoints, ssl
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.python import log
import cowrie.core.output

View File

@ -4,6 +4,8 @@ Send downloaded/uplaoded files to S3 (or compatible)
from __future__ import annotations
from typing import Any
from configparser import NoOptionError
from botocore.exceptions import ClientError
@ -21,9 +23,9 @@ class Output(cowrie.core.output.Output):
s3 output
"""
def start(self):
def start(self) -> None:
self.bucket = CowrieConfig.get("output_s3", "bucket")
self.seen = set()
self.seen: set[str] = set()
self.session = get_session()
try:
@ -46,10 +48,10 @@ class Output(cowrie.core.output.Output):
verify=CowrieConfig.getboolean("output_s3", "verify", fallback=True),
)
def stop(self):
def stop(self) -> None:
pass
def write(self, entry):
def write(self, entry: dict[str, Any]) -> None:
if entry["eventid"] == "cowrie.session.file_download":
self.upload(entry["shasum"], entry["outfile"])

View File

@ -12,7 +12,7 @@ import json
from io import BytesIO
from typing import Any
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.internet.ssl import ClientContextFactory
from twisted.python import log
from twisted.web import client, http_headers
@ -31,7 +31,7 @@ class Output(cowrie.core.output.Output):
agent: Any
url: bytes
def start(self):
def start(self) -> None:
self.token = CowrieConfig.get("output_splunk", "token")
self.url = CowrieConfig.get("output_splunk", "url").encode("utf8")
self.index = CowrieConfig.get("output_splunk", "index", fallback=None)
@ -42,7 +42,7 @@ class Output(cowrie.core.output.Output):
# contextFactory.method = TLSv1_METHOD
self.agent = client.Agent(reactor, contextFactory)
def stop(self):
def stop(self) -> None:
pass
def write(self, logentry):

View File

@ -41,7 +41,7 @@ from urllib.parse import urlencode, urlparse
from zope.interface import implementer
from twisted.internet import defer
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.internet.ssl import ClientContextFactory
from twisted.python import log
from twisted.web import client, http_headers
@ -71,7 +71,7 @@ class Output(cowrie.core.output.Output):
str, datetime.datetime
] = {} # url and last time succesfully submitted
def start(self):
def start(self) -> None:
"""
Start output plugin
"""
@ -96,7 +96,7 @@ class Output(cowrie.core.output.Output):
)
self.agent = client.Agent(reactor, WebClientContextFactory())
def stop(self):
def stop(self) -> None:
"""
Stop output plugin
"""

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import os
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.python import log

View File

@ -26,7 +26,7 @@ from twisted.conch.ssh.filetransfer import (
from twisted.python import log
from twisted.python.compat import nativeString
import cowrie.shell.pwd as pwd
from cowrie.shell import pwd
from cowrie.core.config import CowrieConfig

View File

@ -97,10 +97,9 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
self.setTimeout(timeout)
# Source IP of client in user visible reports (can be fake or real)
try:
self.clientIP = CowrieConfig.get("honeypot", "fake_addr")
except Exception:
self.clientIP = self.realClientIP
self.clientIP = CowrieConfig.get(
"honeypot", "fake_addr", fallback=self.realClientIP
)
# Source IP of server in user visible reports (can be fake or real)
if CowrieConfig.has_option("honeypot", "internet_facing_ip"):
@ -139,7 +138,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
class Command_txtcmd(command.HoneyPotCommand):
def call(self):
log.msg(f'Reading txtcmd from "{txt}"')
with open(txt) as f:
with open(txt, encoding="utf-8") as f:
self.write(f.read())
return Command_txtcmd
@ -151,7 +150,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
return True if cmd in self.commands else False
def getCommand(self, cmd, paths):
if not len(cmd.strip()):
if not cmd.strip():
return None
path = None
if cmd in self.commands:
@ -186,7 +185,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
"""
line = line.decode("utf8")
if len(self.cmdstack):
if self.cmdstack:
self.cmdstack[-1].lineReceived(line)
else:
log.msg(f"discarding input {line}")
@ -329,15 +328,15 @@ class HoneyPotInteractiveProtocol(HoneyPotBaseProtocol, recvline.HistoricRecvLin
return recvline.RecvLine.handle_RETURN(self)
def handle_CTRL_C(self):
if len(self.cmdstack):
if self.cmdstack:
self.cmdstack[-1].handle_CTRL_C()
def handle_CTRL_D(self):
if len(self.cmdstack):
if self.cmdstack:
self.cmdstack[-1].handle_CTRL_D()
def handle_TAB(self):
if len(self.cmdstack):
if self.cmdstack:
self.cmdstack[-1].handle_TAB()
def handle_CTRL_K(self):

View File

@ -55,7 +55,7 @@ class Passwd:
Load /etc/passwd
"""
self.passwd = []
with open(self.passwd_file) as f:
with open(self.passwd_file, encoding="ascii") as f:
while True:
rawline = f.readline()
if not rawline:
@ -165,7 +165,7 @@ class Group:
Load /etc/group
"""
self.group = []
with open(self.group_file) as f:
with open(self.group_file, encoding="ascii") as f:
while True:
rawline = f.readline()
if not rawline:
@ -178,7 +178,7 @@ class Group:
if line.startswith("#"):
continue
(gr_name, gr_passwd, gr_gid, gr_mem) = line.split(":")
(gr_name, _, gr_gid, gr_mem) = line.split(":")
e: dict[str, Union[str, int]] = {}
e["gr_name"] = gr_name

View File

@ -33,8 +33,8 @@ import json
import random
from configparser import NoOptionError
import twisted.python.log as log
from twisted.cred.portal import IRealm
from twisted.python import log
from cowrie.core.config import CowrieConfig
from cowrie.shell import fs
@ -69,7 +69,7 @@ class CowrieServer:
"""
Reads process output from JSON file.
"""
with open(file) as f:
with open(file, encoding="utf-8") as f:
cmdoutput = json.load(f)
return cmdoutput

View File

@ -56,7 +56,7 @@ class CowrieSSHFactory(factory.SSHFactory):
Special delivery to the loggers to avoid scope problems
"""
args["sessionno"] = "S{}".format(args["sessionno"])
for output in self.tac.output_plugins: # type: ignore[attr-defined]
for output in self.tac.output_plugins:
output.logDispatch(**args)
def startFactory(self):

View File

@ -37,7 +37,7 @@ from hashlib import md5
from twisted.conch.ssh import transport
from twisted.conch.ssh.common import getNS
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.protocols.policies import TimeoutMixin
from twisted.python import log, randbytes
@ -56,6 +56,7 @@ class FrontendSSHTransport(transport.SSHServerTransport, TimeoutMixin):
"""
buf: bytes
ourVersionString: bytes
gotVersion: bool
# TODO merge this with HoneyPotSSHTransport(transport.SSHServerTransport, TimeoutMixin)
# maybe create a parent class with common methods for the two

View File

@ -40,7 +40,7 @@ class HoneyPotTelnetFactory(protocol.ServerFactory):
Special delivery to the loggers to avoid scope problems
"""
args["sessionno"] = "T{}".format(str(args["sessionno"]))
for output in self.tac.output_plugins: # type: ignore[attr-defined]
for output in self.tac.output_plugins:
output.logDispatch(**args)
def startFactory(self):

View File

@ -11,7 +11,7 @@ import time
import uuid
from twisted.conch.telnet import TelnetTransport
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.protocols.policies import TimeoutMixin
from twisted.python import log
@ -22,7 +22,7 @@ from cowrie.telnet_proxy.handler import TelnetHandler
# object is added for Python 2.7 compatibility (#1198) - as is super with args
class FrontendTelnetTransport(TelnetTransport, TimeoutMixin):
class FrontendTelnetTransport(TimeoutMixin, TelnetTransport):
def __init__(self):
super().__init__()

View File

@ -14,7 +14,7 @@ from cowrie.core.realm import HoneyPotRealm
from cowrie.ssh.factory import CowrieSSHFactory
from twisted.cred import portal
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
# from cowrie.test.proxy_compare import ProxyTestCommand
@ -67,7 +67,7 @@ class ProxyTests(unittest.TestCase):
# ################################################# #
# setup SSH backend
self.factory_shell_ssh = create_ssh_factory("shell")
self.shell_server_ssh = reactor.listenTCP( # type: ignore[attr-defined]
self.shell_server_ssh = reactor.listenTCP(
self.PORT_BACKEND_SSH, self.factory_shell_ssh
)
@ -83,7 +83,7 @@ class ProxyTests(unittest.TestCase):
# setup SSH proxy
self.factory_proxy_ssh = create_ssh_factory("proxy")
self.proxy_server_ssh = reactor.listenTCP( # type: ignore[attr-defined]
self.proxy_server_ssh = reactor.listenTCP(
self.PORT_PROXY_SSH, self.factory_proxy_ssh
)

View File

@ -8,7 +8,7 @@ from cowrie.core.utils import create_endpoint_services, durationHuman, get_endpo
from twisted.application.service import MultiService
from twisted.internet import protocol
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
def get_config(config_string: str) -> configparser.ConfigParser:

View File

@ -39,7 +39,7 @@ from twisted._version import __version__ as __twisted_version__
from twisted.application import service
from twisted.application.service import IServiceMaker
from twisted.cred import portal
from twisted.internet import reactor # type: ignore
from twisted.internet import reactor
from twisted.logger import ILogObserver, globalLogPublisher
from twisted.plugin import IPlugin
from twisted.python import log, usage
@ -253,7 +253,7 @@ Makes a Cowrie SSH/Telnet honeypot.
if self.enableTelnet:
f = cowrie.telnet.factory.HoneyPotTelnetFactory(backend, self.pool_handler)
f.tac = self # type: ignore
f.tac = self
f.portal = portal.Portal(core.realm.HoneyPotRealm())
f.portal.registerChecker(core.checkers.HoneypotPasswordChecker())

View File

@ -31,9 +31,10 @@ deps =
allowlist_externals =
yamllint
commands =
flake8 --ignore E203,E501,W503 --count --statistics {toxinidir}/src
- flake8 --ignore E203,E501,W503 --count --statistics {toxinidir}/src
# - twistedchecker -d W9002,W9202,W9204,W9208,W9402,C0301,C0103,W9001,C9302,W9401 {toxinidir}/src
yamllint {toxinidir}
- yamllint {toxinidir}
- pylint {toxinidir}/src
basepython = python3.10