* remove python2 support, assume 3.6+
* ignore .envrc
* type annotations
* rewrite cowrieconfig from singleton class to module variable
* add resumeproducing function
* name has become bytes (was str before)
* remove unreachable statement
* add typing for mock fake transport
* enable mypy check by default
This commit is contained in:
Michel Oosterhof
2021-04-03 23:53:44 +08:00
committed by GitHub
parent 0c775d0b61
commit ac2ad04925
87 changed files with 353 additions and 820 deletions

View File

@ -37,16 +37,16 @@ jobs:
command: |
. cowrie-env/bin/activate
flake8 --ignore E203,E501,W503 --count --application-import-names cowrie --statistics src
- run:
name: Mypy
command: |
. cowrie-env/bin/activate
mypy src
- run:
name: Twistedchecker
command: |
. cowrie-env/bin/activate
twistedchecker src || true
- run:
name: Mypy
command: |
. cowrie-env/bin/activate
mypy src || true
- run:
name: Build Python library
command: |

1
.gitignore vendored
View File

@ -12,6 +12,7 @@ docs/_build
build/
log/
.eggs/
.envrc
.direnv/
__pycache__/
*.py[cod]

View File

@ -20,8 +20,8 @@ install:
before_script:
# stop the build if there are Python syntax errors or undefined names
- flake8 --ignore E203,E501,W503 --count --application-import-names cowrie --statistics .
- mypy src
- twistedchecker src || true
- mypy src || true
- python setup.py build sdist bdist
- make -C docs html
script:

View File

@ -128,12 +128,10 @@ class LibvirtBackendService:
# - no snapshot dir was defined (using cowrie's root dir) - should not happen but prevent it
if (
(
not CowrieConfig().getboolean(
not CowrieConfig.getboolean(
"backend_pool", "save_snapshots", fallback=True
)
or CowrieConfig().get(
"backend_pool", "snapshot_path", fallback=None
)
or CowrieConfig.get("backend_pool", "snapshot_path", fallback=None)
is None
)
and os.path.exists(snapshot)

View File

@ -21,19 +21,17 @@ def create_guest(connection, mac_address, guest_unique_id):
# get guest configurations
configuration_file = os.path.join(
CowrieConfig().get(
CowrieConfig.get(
"backend_pool", "config_files_path", fallback="share/pool_configs"
),
CowrieConfig().get(
"backend_pool", "guest_config", fallback="default_guest.xml"
),
CowrieConfig.get("backend_pool", "guest_config", fallback="default_guest.xml"),
)
version_tag = CowrieConfig().get("backend_pool", "guest_tag", fallback="guest")
base_image = CowrieConfig().get("backend_pool", "guest_image_path")
hypervisor = CowrieConfig().get("backend_pool", "guest_hypervisor", fallback="qemu")
memory = CowrieConfig().getint("backend_pool", "guest_memory", fallback=128)
qemu_machine = CowrieConfig().get(
version_tag = CowrieConfig.get("backend_pool", "guest_tag", fallback="guest")
base_image = CowrieConfig.get("backend_pool", "guest_image_path")
hypervisor = CowrieConfig.get("backend_pool", "guest_hypervisor", fallback="qemu")
memory = CowrieConfig.getint("backend_pool", "guest_memory", fallback=128)
qemu_machine = CowrieConfig.get(
"backend_pool", "guest_qemu_machine", fallback="pc-q35-3.1"
)
@ -47,13 +45,13 @@ def create_guest(connection, mac_address, guest_unique_id):
os._exit(1)
# only in some cases, like wrt
kernel_image = CowrieConfig().get("backend_pool", "guest_kernel_image", fallback="")
kernel_image = CowrieConfig.get("backend_pool", "guest_kernel_image", fallback="")
# get a directory to save snapshots, even if temporary
try:
# guest configuration, to be read by qemu, needs an absolute path
snapshot_path = backend_pool.util.to_absolute_path(
CowrieConfig().get("backend_pool", "snapshot_path")
CowrieConfig.get("backend_pool", "snapshot_path")
)
except NoOptionError:
snapshot_path = os.getcwd()

View File

@ -14,10 +14,10 @@ def create_filter(connection):
import libvirt
filter_file = os.path.join(
CowrieConfig().get(
CowrieConfig.get(
"backend_pool", "config_files_path", fallback="share/pool_configs"
),
CowrieConfig().get(
CowrieConfig.get(
"backend_pool", "nw_filter_config", fallback="default_filter.xml"
),
)
@ -41,10 +41,10 @@ def create_network(connection, network_table):
# TODO support more interfaces and therefore more IP space to allow > 253 guests
network_file = os.path.join(
CowrieConfig().get(
CowrieConfig.get(
"backend_pool", "config_files_path", fallback="share/pool_configs"
),
CowrieConfig().get(
CowrieConfig.get(
"backend_pool", "network_config", fallback="default_network.xml"
),
)

View File

@ -16,18 +16,14 @@ from cowrie.core.config import CowrieConfig
class PoolServer(Protocol):
def __init__(self, factory):
self.factory = factory
self.local_pool = (
CowrieConfig().get("proxy", "pool", fallback="local") == "local"
)
self.pool_only = CowrieConfig().getboolean(
self.local_pool = CowrieConfig.get("proxy", "pool", fallback="local") == "local"
self.pool_only = CowrieConfig.getboolean(
"backend_pool", "pool_only", fallback=False
)
self.use_nat = CowrieConfig().getboolean(
"backend_pool", "use_nat", fallback=True
)
self.use_nat = CowrieConfig.getboolean("backend_pool", "use_nat", fallback=True)
if self.use_nat:
self.nat_public_ip = CowrieConfig().get("backend_pool", "nat_public_ip")
self.nat_public_ip = CowrieConfig.get("backend_pool", "nat_public_ip")
def dataReceived(self, data):
res_op = struct.unpack("!c", bytes([data[0]]))[
@ -78,10 +74,10 @@ class PoolServer(Protocol):
guest_id=guest_id,
)
ssh_port = CowrieConfig().getint(
ssh_port = CowrieConfig.getint(
"backend_pool", "guest_ssh_port", fallback=22
)
telnet_port = CowrieConfig().getint(
telnet_port = CowrieConfig.getint(
"backend_pool", "guest_telnet_port", fallback=23
)

View File

@ -52,22 +52,18 @@ class PoolService:
self.share_guests = True
# file configs
self.ssh_port = CowrieConfig().getint(
self.ssh_port = CowrieConfig.getint(
"backend_pool", "guest_ssh_port", fallback=-1
)
self.telnet_port = CowrieConfig().getint(
self.telnet_port = CowrieConfig.getint(
"backend_pool", "guest_telnet_port", fallback=-1
)
self.local_pool = (
CowrieConfig().get("proxy", "pool", fallback="local") == "local"
)
self.pool_only = CowrieConfig().getboolean(
self.local_pool = CowrieConfig.get("proxy", "pool", fallback="local") == "local"
self.pool_only = CowrieConfig.getboolean(
"backend_pool", "pool_only", fallback=False
)
self.use_nat = CowrieConfig().getboolean(
"backend_pool", "use_nat", fallback=True
)
self.use_nat = CowrieConfig.getboolean("backend_pool", "use_nat", fallback=True)
# detect invalid config
if not self.ssh_port > 0 and not self.telnet_port > 0:
@ -96,7 +92,7 @@ class PoolService:
threads.deferToThread(self.producer_loop)
# recycle myself after some time
recycle_period = CowrieConfig().getint(
recycle_period = CowrieConfig.getint(
"backend_pool", "recycle_period", fallback=-1
)
if recycle_period > 0:

View File

@ -13,7 +13,7 @@ class PasswordAuth(userauth.SSHUserAuthClient):
class CommandChannel(channel.SSHChannel):
name = "session"
name = b"session"
def __init__(self, command, done_deferred, callback, *args, **kwargs):
super().__init__(*args, **kwargs)

View File

@ -11,6 +11,7 @@ limited implementation that only supports `print` command.
import getopt
import re
from typing import Dict, List
from twisted.python import log
@ -26,7 +27,7 @@ class command_awk(HoneyPotCommand):
"""
# code is an array of dictionaries contain the regexes to match and the code to execute
code = []
code: List[Dict[str, str]] = []
def start(self):
try:

View File

@ -10,6 +10,7 @@ import getopt
import random
import re
import time
from typing import Callable, Dict
from twisted.internet import error, reactor
from twisted.python import failure, log
@ -18,7 +19,7 @@ from cowrie.core import utils
from cowrie.shell.command import HoneyPotCommand
from cowrie.shell.honeypot import HoneyPotShell
commands = {}
commands: Dict[str, Callable] = {}
class command_whoami(HoneyPotCommand):

View File

@ -178,8 +178,8 @@ class command_curl(HoneyPotCommand):
curl command
"""
limit_size = CowrieConfig().getint("honeypot", "download_limit_size", fallback=0)
download_path = CowrieConfig().get("honeypot", "download_path")
limit_size = CowrieConfig.getint("honeypot", "download_limit_size", fallback=0)
download_path = CowrieConfig.get("honeypot", "download_path")
def start(self):
try:
@ -272,8 +272,8 @@ class command_curl(HoneyPotCommand):
self, fakeoutfile, url, outputfile, *args, **kwargs
)
out_addr = None
if CowrieConfig().has_option("honeypot", "out_addr"):
out_addr = (CowrieConfig().get("honeypot", "out_addr"), 0)
if CowrieConfig.has_option("honeypot", "out_addr"):
out_addr = (CowrieConfig.get("honeypot", "out_addr"), 0)
if scheme == "https":
context_factory = ssl.optionsForClientTLS(hostname=host)

View File

@ -7,6 +7,7 @@ dd commands
import re
from typing import Dict
from twisted.python import log
@ -21,7 +22,7 @@ class command_dd(HoneyPotCommand):
dd command
"""
ddargs = {}
ddargs: Dict[str, str] = {}
def start(self):
if not self.args or self.args[0] == ">":

View File

@ -11,13 +11,14 @@ import copy
import getopt
import os.path
import re
from typing import Callable, Dict
from twisted.python import log
import cowrie.shell.fs as fs
from cowrie.shell.command import HoneyPotCommand
commands = {}
commands: Dict[str, Callable] = {}
class command_grep(HoneyPotCommand):

View File

@ -80,7 +80,7 @@ class command_ftpget(HoneyPotCommand):
ftpget command
"""
download_path = CowrieConfig().get("honeypot", "download_path")
download_path = CowrieConfig.get("honeypot", "download_path")
def help(self):
self.write(
@ -212,8 +212,8 @@ Download a file via FTP
def ftp_download(self):
out_addr = ("", 0)
if CowrieConfig().has_option("honeypot", "out_addr"):
out_addr = (CowrieConfig().get("honeypot", "out_addr"), 0)
if CowrieConfig.has_option("honeypot", "out_addr"):
out_addr = (CowrieConfig.get("honeypot", "out_addr"), 0)
ftp = FTP(source_address=out_addr)

View File

@ -194,7 +194,7 @@ gcc version {} (Debian {}-5)""".format(
re.sub("[^A-Za-z0-9]", "_", outfile),
)
safeoutfile = os.path.join(
CowrieConfig().get("honeypot", "download_path"), tmp_fname
CowrieConfig.get("honeypot", "download_path"), tmp_fname
)
# Data contains random garbage from an actual file, so when

View File

@ -101,7 +101,7 @@ usage: nc [-46bCDdhjklnrStUuvZz] [-I length] [-i interval] [-O length]
out_addr = None
try:
out_addr = (CowrieConfig().get("honeypot", "out_addr"), 0)
out_addr = (CowrieConfig.get("honeypot", "out_addr"), 0)
except Exception:
out_addr = ("0.0.0.0", 0)

View File

@ -47,8 +47,8 @@ class command_scp(HoneyPotCommand):
scp command
"""
download_path = CowrieConfig().get("honeypot", "download_path")
download_path_uniq = CowrieConfig().get(
download_path = CowrieConfig.get("honeypot", "download_path")
download_path_uniq = CowrieConfig.get(
"honeypot", "download_path_uniq", fallback=download_path
)

View File

@ -46,7 +46,7 @@ class command_ssh(HoneyPotCommand):
for opt in optlist:
if opt[0] == "-V":
self.write(
CowrieConfig().get(
CowrieConfig.get(
"shell",
"ssh_version",
fallback="OpenSSH_7.9p1, OpenSSL 1.1.1a 20 Nov 2018",

View File

@ -8,6 +8,7 @@ tee command
import getopt
import os
from typing import List
from twisted.python import log
@ -23,7 +24,7 @@ class command_tee(HoneyPotCommand):
"""
append = False
teeFiles = []
teeFiles: List[str] = []
writtenBytes = 0
ignoreInterupts = False

View File

@ -32,7 +32,7 @@ class command_tftp(HoneyPotCommand):
port = 69
hostname = None
file_to_get = None
limit_size = CowrieConfig().getint("honeypot", "download_limit_size", fallback=0)
limit_size = CowrieConfig.getint("honeypot", "download_limit_size", fallback=0)
def makeTftpRetrieval(self):
progresshook = Progress(self).progresshook

View File

@ -13,25 +13,25 @@ commands = {}
def hardware_platform():
return CowrieConfig().get("shell", "hardware_platform", fallback="x86_64")
return CowrieConfig.get("shell", "hardware_platform", fallback="x86_64")
def kernel_name():
return CowrieConfig().get("shell", "kernel_name", fallback="Linux")
return CowrieConfig.get("shell", "kernel_name", fallback="Linux")
def kernel_version():
return CowrieConfig().get("shell", "kernel_version", fallback="3.2.0-4-amd64")
return CowrieConfig.get("shell", "kernel_version", fallback="3.2.0-4-amd64")
def kernel_build_string():
return CowrieConfig().get(
return CowrieConfig.get(
"shell", "kernel_build_string", fallback="#1 SMP Debian 3.2.68-1+deb7u1"
)
def operating_system():
return CowrieConfig().get("shell", "operating_system", fallback="GNU/Linux")
return CowrieConfig.get("shell", "operating_system", fallback="GNU/Linux")
def uname_help():

View File

@ -55,8 +55,8 @@ class command_wget(HoneyPotCommand):
wget command
"""
limit_size = CowrieConfig().getint("honeypot", "download_limit_size", fallback=0)
downloadPath = CowrieConfig().get("honeypot", "download_path")
limit_size = CowrieConfig.getint("honeypot", "download_limit_size", fallback=0)
downloadPath = CowrieConfig.get("honeypot", "download_path")
def start(self):
try:
@ -157,8 +157,8 @@ class command_wget(HoneyPotCommand):
)
out_addr = None
if CowrieConfig().has_option("honeypot", "out_addr"):
out_addr = (CowrieConfig().get("honeypot", "out_addr"), 0)
if CowrieConfig.has_option("honeypot", "out_addr"):
out_addr = (CowrieConfig.get("honeypot", "out_addr"), 0)
if scheme == b"https":
context_factory = ssl.optionsForClientTLS(hostname=host)

View File

@ -32,7 +32,7 @@ from cowrie.core.config import CowrieConfig
class Artifact:
artifactDir = CowrieConfig().get("honeypot", "download_path")
artifactDir = CowrieConfig.get("honeypot", "download_path")
def __init__(self, label):
self.label = label

View File

@ -42,7 +42,7 @@ class UserDB:
try:
with open(
"{}/userdb.txt".format(CowrieConfig().get("honeypot", "etc_path"))
"{}/userdb.txt".format(CowrieConfig.get("honeypot", "etc_path"))
) as db:
userdb = db.readlines()
except OSError:
@ -120,8 +120,8 @@ class AuthRandom:
self.mintry, self.maxtry, self.maxcache = 2, 5, 10
# Are there auth_class parameters?
if CowrieConfig().has_option("honeypot", "auth_class_parameters"):
parameters = CowrieConfig().get("honeypot", "auth_class_parameters")
if CowrieConfig.has_option("honeypot", "auth_class_parameters"):
parameters = CowrieConfig.get("honeypot", "auth_class_parameters")
parlist = parameters.split(",")
if len(parlist) == 3:
self.mintry = int(parlist[0])
@ -133,7 +133,7 @@ class AuthRandom:
log.msg(f"maxtry < mintry, adjusting maxtry to: {self.maxtry}")
self.uservar = {}
self.uservar_file = "{}/auth_random.json".format(
CowrieConfig().get("honeypot", "state_path")
CowrieConfig.get("honeypot", "state_path")
)
self.loadvars()

View File

@ -97,8 +97,8 @@ class HoneypotPasswordChecker:
authname = auth.UserDB
# Is the auth_class defined in the config file?
if CowrieConfig().has_option("honeypot", "auth_class"):
authclass = CowrieConfig().get("honeypot", "auth_class")
if CowrieConfig.has_option("honeypot", "auth_class"):
authclass = CowrieConfig.get("honeypot", "auth_class")
authmodule = "cowrie.core.auth"
# Check if authclass exists in this module

View File

@ -15,23 +15,6 @@ def to_environ_key(key):
return key.upper()
class CowrieConfig:
"""
Singleton class for configuration data
"""
__instance = None
def __new__(cls):
if CowrieConfig.__instance is None:
CowrieConfig.__instance = object.__new__(EnvironmentConfigParser)
CowrieConfig.__instance.__init__(
interpolation=configparser.ExtendedInterpolation()
)
CowrieConfig.__instance.read(get_config_path())
return CowrieConfig.__instance
class EnvironmentConfigParser(configparser.ConfigParser):
"""
ConfigParser with additional option to read from environment variables
@ -79,3 +62,6 @@ def get_config_path():
return found_confs
print("Config file not found")
CowrieConfig = readConfigFile(get_config_path())

View File

@ -90,3 +90,6 @@ class UsernamePasswordIP:
self.username = username
self.password = password
self.ip = ip
def checkPassword(self, password):
return self.password == password

View File

@ -92,7 +92,7 @@ class Output:
# Need these for each individual transport, or else the session numbers overlap
self.sshRegex = re.compile(".*SSHTransport,([0-9]+),[0-9a-f:.]+$")
self.telnetRegex = re.compile(".*TelnetTransport,([0-9]+),[0-9a-f:.]+$")
self.sensor = CowrieConfig().get(
self.sensor = CowrieConfig.get(
"honeypot", "sensor_name", fallback=socket.gethostname()
)

View File

@ -3,8 +3,6 @@
# See the COPYRIGHT file for more information
import sys
from twisted.application import internet
from twisted.internet import endpoints
@ -113,14 +111,7 @@ def get_endpoints_from_section(cfg, section, default_port):
def create_endpoint_services(reactor, parent, listen_endpoints, factory):
for listen_endpoint in listen_endpoints:
# work around http://twistedmatrix.com/trac/ticket/8422
if sys.version_info.major < 3:
endpoint = endpoints.serverFromString(
reactor, listen_endpoint.encode("utf-8")
)
else:
endpoint = endpoints.serverFromString(reactor, listen_endpoint)
endpoint = endpoints.serverFromString(reactor, listen_endpoint)
service = internet.StreamServerEndpointService(endpoint, factory)
# FIXME: Use addService on parent ?

View File

@ -5,6 +5,7 @@
import hashlib
import os
import time
from typing import List, Set
from twisted.conch.insults import insults
from twisted.python import log
@ -22,14 +23,14 @@ class LoggingServerProtocol(insults.ServerProtocol):
redirlogOpen = False # it will be set at core/protocol.py
stdinlogOpen = False
ttylogOpen = False
ttylogPath = CowrieConfig().get("honeypot", "ttylog_path")
downloadPath = CowrieConfig().get("honeypot", "download_path")
ttylogEnabled = CowrieConfig().getboolean("honeypot", "ttylog", fallback=True)
bytesReceivedLimit = CowrieConfig().getint(
ttylogPath = CowrieConfig.get("honeypot", "ttylog_path")
downloadPath = CowrieConfig.get("honeypot", "download_path")
ttylogEnabled = CowrieConfig.getboolean("honeypot", "ttylog", fallback=True)
bytesReceivedLimit = CowrieConfig.getint(
"honeypot", "download_limit_size", fallback=0
)
bytesReceived = 0
redirFiles = set()
redirFiles: Set[List[str]] = set()
def __init__(self, prot=None, *a, **kw):
insults.ServerProtocol.__init__(self, prot, *a, **kw)

View File

@ -37,7 +37,6 @@ from collections import deque
from datetime import datetime
from json.decoder import JSONDecodeError
from pathlib import Path
from sys import version_info
from time import sleep, time
from treq import post
@ -63,17 +62,13 @@ REREPORT_MINIMUM = 900
class Output(output.Output):
def start(self):
self.tolerance_attempts = CowrieConfig().getint(
self.tolerance_attempts = CowrieConfig.getint(
"output_abuseipdb", "tolerance_attempts", fallback=10
)
self.state_path = CowrieConfig().get("output_abuseipdb", "dump_path")
self.state_path = CowrieConfig.get("output_abuseipdb", "dump_path")
self.state_path = Path(*(d for d in self.state_path.split("/")))
self.state_dump = self.state_path / DUMP_FILE
if version_info.minor < 6:
# PathLike object not compatible with with open in python < 3.6
self.state_dump = str(self.state_dump)
self.logbook = LogBook(self.tolerance_attempts, self.state_dump)
# Pass our instance of LogBook() to Reporter() so we don't end up
# working with different records.
@ -204,10 +199,10 @@ class LogBook(dict):
self.sleeping = False
self.sleep_until = 0
self.tolerance_attempts = tolerance_attempts
self.tolerance_window = 60 * CowrieConfig().getint(
self.tolerance_window = 60 * CowrieConfig.getint(
"output_abuseipdb", "tolerance_window", fallback=120
)
self.rereport_after = 3600 * CowrieConfig().getfloat(
self.rereport_after = 3600 * CowrieConfig.getfloat(
"output_abuseipdb", "rereport_after", fallback=24
)
if self.rereport_after < REREPORT_MINIMUM:
@ -348,7 +343,7 @@ class Reporter:
self.headers = {
"User-Agent": "Cowrie Honeypot AbuseIPDB plugin",
"Accept": "application/json",
"Key": CowrieConfig().get("output_abuseipdb", "api_key"),
"Key": CowrieConfig.get("output_abuseipdb", "api_key"),
}
def report_ip_single(self, ip, t, uname):

View File

@ -31,8 +31,8 @@ class Output(cowrie.core.output.Output):
"""
Start output plugin
"""
self.apiKey = CowrieConfig().get("output_cowrie", "api_key", fallback=None)
self.debug = CowrieConfig().getboolean("output_cowrie", "debug", fallback=False)
self.apiKey = CowrieConfig.get("output_cowrie", "api_key", fallback=None)
self.debug = CowrieConfig.getboolean("output_cowrie", "debug", fallback=False)
def emit(self, event):
"""

View File

@ -23,10 +23,10 @@ class Output(cowrie.core.output.Output):
def start(
self,
):
self.user = CowrieConfig().get("output_csirtg", "username") or USERNAME
self.feed = CowrieConfig().get("output_csirtg", "feed") or FEED
self.token = CowrieConfig().get("output_csirtg", "token") or TOKEN
self.description = CowrieConfig().get(
self.user = CowrieConfig.get("output_csirtg", "username") or USERNAME
self.feed = CowrieConfig.get("output_csirtg", "feed") or FEED
self.token = CowrieConfig.get("output_csirtg", "token") or TOKEN
self.description = CowrieConfig.get(
"output_csirtg", "description", fallback=DESCRIPTION
)
self.context = {}

View File

@ -32,11 +32,7 @@ Send downloaded/uplaoded files to Cuckoo
import os
try:
from urllib.parse import urlparse, urljoin
except ImportError:
from urlparse import urlparse, urljoin
from urllib.parse import urljoin, urlparse
import requests
from requests.auth import HTTPBasicAuth
@ -54,10 +50,10 @@ class Output(cowrie.core.output.Output):
"""
Start output plugin
"""
self.url_base = CowrieConfig().get("output_cuckoo", "url_base").encode("utf-8")
self.api_user = CowrieConfig().get("output_cuckoo", "user")
self.api_passwd = CowrieConfig().get("output_cuckoo", "passwd", raw=True)
self.cuckoo_force = int(CowrieConfig().getboolean("output_cuckoo", "force"))
self.url_base = CowrieConfig.get("output_cuckoo", "url_base").encode("utf-8")
self.api_user = CowrieConfig.get("output_cuckoo", "user")
self.api_passwd = CowrieConfig.get("output_cuckoo", "passwd", raw=True)
self.cuckoo_force = int(CowrieConfig.getboolean("output_cuckoo", "force"))
def stop(self):
"""

View File

@ -27,12 +27,10 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
self.auth_key = CowrieConfig().get("output_dshield", "auth_key")
self.userid = CowrieConfig().get("output_dshield", "userid")
self.batch_size = CowrieConfig().getint("output_dshield", "batch_size")
self.debug = CowrieConfig().getboolean(
"output_dshield", "debug", fallback=False
)
self.auth_key = CowrieConfig.get("output_dshield", "auth_key")
self.userid = CowrieConfig.get("output_dshield", "userid")
self.batch_size = CowrieConfig.getint("output_dshield", "batch_size")
self.debug = CowrieConfig.getboolean("output_dshield", "debug", fallback=False)
self.batch = [] # This is used to store login attempts in batches
def stop(self):

View File

@ -13,25 +13,25 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
self.host = CowrieConfig().get("output_elasticsearch", "host")
self.port = CowrieConfig().get("output_elasticsearch", "port")
self.index = CowrieConfig().get("output_elasticsearch", "index")
self.type = CowrieConfig().get("output_elasticsearch", "type")
self.pipeline = CowrieConfig().get("output_elasticsearch", "pipeline")
self.host = CowrieConfig.get("output_elasticsearch", "host")
self.port = CowrieConfig.get("output_elasticsearch", "port")
self.index = CowrieConfig.get("output_elasticsearch", "index")
self.type = CowrieConfig.get("output_elasticsearch", "type")
self.pipeline = CowrieConfig.get("output_elasticsearch", "pipeline")
# new options (creds + https)
self.username = CowrieConfig().get(
self.username = CowrieConfig.get(
"output_elasticsearch", "username", fallback=None
)
self.password = CowrieConfig().get(
self.password = CowrieConfig.get(
"output_elasticsearch", "password", fallback=None
)
self.use_ssl = CowrieConfig().getboolean(
self.use_ssl = CowrieConfig.getboolean(
"output_elasticsearch", "ssl", fallback=False
)
self.ca_certs = CowrieConfig().get(
self.ca_certs = CowrieConfig.get(
"output_elasticsearch", "ca_certs", fallback=None
)
self.verify_certs = CowrieConfig().getboolean(
self.verify_certs = CowrieConfig.getboolean(
"output_elasticsearch", "verify_certs", fallback=True
)

View File

@ -277,11 +277,11 @@ class Output(cowrie.core.output.Output):
def start(self):
log.msg("Early version of hpfeeds-output, untested!")
server = CowrieConfig().get("output_hpfeeds", "server")
port = CowrieConfig().getint("output_hpfeeds", "port")
ident = CowrieConfig().get("output_hpfeeds", "identifier")
secret = CowrieConfig().get("output_hpfeeds", "secret")
debug = CowrieConfig().getboolean("output_hpfeeds", "debug")
server = CowrieConfig.get("output_hpfeeds", "server")
port = CowrieConfig.getint("output_hpfeeds", "port")
ident = CowrieConfig.get("output_hpfeeds", "identifier")
secret = CowrieConfig.get("output_hpfeeds", "secret")
debug = CowrieConfig.getboolean("output_hpfeeds", "debug")
self.client = hpclient(server, port, ident, secret, debug)
self.meta = {}

View File

@ -27,25 +27,25 @@ class Output(cowrie.core.output.Output):
"WARNING: Beta version of new hpfeeds enabled. This will become hpfeeds in a future release."
)
if CowrieConfig().has_option("output_hpfeeds3", "channel"):
self.channel = CowrieConfig().get("output_hpfeeds3", "channel")
if CowrieConfig.has_option("output_hpfeeds3", "channel"):
self.channel = CowrieConfig.get("output_hpfeeds3", "channel")
if CowrieConfig().has_option("output_hpfeeds3", "endpoint"):
endpoint = CowrieConfig().get("output_hpfeeds3", "endpoint")
if CowrieConfig.has_option("output_hpfeeds3", "endpoint"):
endpoint = CowrieConfig.get("output_hpfeeds3", "endpoint")
else:
server = CowrieConfig().get("output_hpfeeds3", "server")
port = CowrieConfig().getint("output_hpfeeds3", "port")
server = CowrieConfig.get("output_hpfeeds3", "server")
port = CowrieConfig.getint("output_hpfeeds3", "port")
if CowrieConfig().has_option("output_hpfeeds3", "tlscert"):
with open(CowrieConfig().get("output_hpfeeds3", "tlscert")) as fp:
if CowrieConfig.has_option("output_hpfeeds3", "tlscert"):
with open(CowrieConfig.get("output_hpfeeds3", "tlscert")) as fp:
authority = ssl.Certificate.loadPEM(fp.read())
options = ssl.optionsForClientTLS(server, authority)
endpoint = endpoints.SSL4ClientEndpoint(reactor, server, port, options)
else:
endpoint = endpoints.HostnameEndpoint(reactor, server, port)
ident = CowrieConfig().get("output_hpfeeds3", "identifier")
secret = CowrieConfig().get("output_hpfeeds3", "secret")
ident = CowrieConfig.get("output_hpfeeds3", "identifier")
secret = CowrieConfig.get("output_hpfeeds3", "secret")
self.meta = {}

View File

@ -15,9 +15,9 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
host = CowrieConfig().get("output_influx", "host", fallback="")
port = CowrieConfig().getint("output_influx", "port", fallback=8086)
ssl = CowrieConfig().getboolean("output_influx", "ssl", fallback=False)
host = CowrieConfig.get("output_influx", "host", fallback="")
port = CowrieConfig.getint("output_influx", "port", fallback=8086)
ssl = CowrieConfig.getboolean("output_influx", "ssl", fallback=False)
self.client = None
try:
@ -30,23 +30,23 @@ class Output(cowrie.core.output.Output):
log.msg("output_influx: cannot instantiate client!")
return
if CowrieConfig().has_option(
if CowrieConfig.has_option(
"output_influx", "username"
) and CowrieConfig().has_option("output_influx", "password"):
username = CowrieConfig().get("output_influx", "username")
password = CowrieConfig().get("output_influx", "password", raw=True)
) and CowrieConfig.has_option("output_influx", "password"):
username = CowrieConfig.get("output_influx", "username")
password = CowrieConfig.get("output_influx", "password", raw=True)
self.client.switch_user(username, password)
try:
dbname = CowrieConfig().get("output_influx", "database_name")
dbname = CowrieConfig.get("output_influx", "database_name")
except Exception:
dbname = "cowrie"
retention_policy_duration_default = "12w"
retention_policy_name = dbname + "_retention_policy"
if CowrieConfig().has_option("output_influx", "retention_policy_duration"):
retention_policy_duration = CowrieConfig().get(
if CowrieConfig.has_option("output_influx", "retention_policy_duration"):
retention_policy_duration = CowrieConfig.get(
"output_influx", "retention_policy_duration"
)

View File

@ -41,10 +41,10 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
self.epoch_timestamp = CowrieConfig().getboolean(
self.epoch_timestamp = CowrieConfig.getboolean(
"output_jsonlog", "epoch_timestamp", fallback=False
)
fn = CowrieConfig().get("output_jsonlog", "logfile")
fn = CowrieConfig.get("output_jsonlog", "logfile")
dirs = os.path.dirname(fn)
base = os.path.basename(fn)
self.outfile = cowrie.python.logfile.CowrieDailyLogFile(

View File

@ -42,8 +42,8 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
self.format = CowrieConfig().get("output_localsyslog", "format")
facilityString = CowrieConfig().get("output_localsyslog", "facility")
self.format = CowrieConfig.get("output_localsyslog", "format")
facilityString = CowrieConfig.get("output_localsyslog", "facility")
self.facility = vars(syslog)["LOG_" + facilityString]
self.syslog = twisted.python.syslog.SyslogObserver(
prefix="cowrie", facility=self.facility

View File

@ -33,11 +33,8 @@ More info https://malshare.com/doc.php
import os
from urllib.parse import urlparse
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
import requests
from twisted.python import log
@ -57,7 +54,7 @@ class Output(cowrie.core.output.Output):
"""
Start output plugin
"""
self.apiKey = CowrieConfig().get("output_malshare", "api_key")
self.apiKey = CowrieConfig.get("output_malshare", "api_key")
def stop(self):
"""

View File

@ -1,4 +1,3 @@
import sys
import warnings
from functools import wraps
from pathlib import Path
@ -42,17 +41,16 @@ class Output(cowrie.core.output.Output):
"""
Start output plugin
"""
misp_url = CowrieConfig().get("output_misp", "base_url")
misp_key = CowrieConfig().get("output_misp", "api_key")
misp_url = CowrieConfig.get("output_misp", "base_url")
misp_key = CowrieConfig.get("output_misp", "api_key")
misp_verifycert = (
"true" == CowrieConfig().get("output_misp", "verify_cert").lower()
"true" == CowrieConfig.get("output_misp", "verify_cert").lower()
)
self.misp_api = PyMISP(
url=misp_url, key=misp_key, ssl=misp_verifycert, debug=False
)
self.is_python2 = sys.version_info[0] < 3
self.debug = CowrieConfig().getboolean("output_misp", "debug", fallback=False)
self.publish = CowrieConfig().getboolean(
self.debug = CowrieConfig.getboolean("output_misp", "debug", fallback=False)
self.publish = CowrieConfig.getboolean(
"output_misp", "publish_event", fallback=False
)
@ -88,10 +86,6 @@ class Output(cowrie.core.output.Output):
controller="attributes", type_attribute=attribute_type, value=searchterm
)
# legacy PyMISP returns the Attribute wrapped in a response
if self.is_python2:
result = result["response"]
if result["Attribute"]:
return result["Attribute"][0]
else:
@ -99,40 +93,24 @@ class Output(cowrie.core.output.Output):
@ignore_warnings
def create_new_event(self, entry):
if self.is_python2:
self.misp_api.upload_sample(
entry["shasum"],
entry["outfile"],
None,
distribution=1,
info="File uploaded to Cowrie ({})".format(entry["sensor"]),
analysis=0,
threat_level_id=2,
)
else:
attribute = MISPAttribute()
attribute.type = "malware-sample"
attribute.value = entry["shasum"]
attribute.data = Path(entry["outfile"])
attribute.comment = "File uploaded to Cowrie ({})".format(entry["sensor"])
attribute.expand = "binary"
event = MISPEvent()
event.info = "File uploaded to Cowrie ({})".format(entry["sensor"])
event.attributes = [attribute]
event.run_expansions()
if self.publish:
event.publish()
result = self.misp_api.add_event(event)
if self.debug:
log.msg("Event creation result: \n%s" % result)
attribute = MISPAttribute()
attribute.type = "malware-sample"
attribute.value = entry["shasum"]
attribute.data = Path(entry["outfile"])
attribute.comment = "File uploaded to Cowrie ({})".format(entry["sensor"])
attribute.expand = "binary"
event = MISPEvent()
event.info = "File uploaded to Cowrie ({})".format(entry["sensor"])
event.attributes = [attribute]
event.run_expansions()
if self.publish:
event.publish()
result = self.misp_api.add_event(event)
if self.debug:
log.msg("Event creation result: \n%s" % result)
@ignore_warnings
def add_sighting(self, entry, attribute):
if self.is_python2:
self.misp_api.sighting(
uuid=attribute["uuid"], source="{} (Cowrie)".format(entry["sensor"])
)
else:
sighting = MISPSighting()
sighting.source = "{} (Cowrie)".format(entry["sensor"])
self.misp_api.add_sighting(sighting, attribute)
sighting = MISPSighting()
sighting.source = "{} (Cowrie)".format(entry["sensor"])
self.misp_api.add_sighting(sighting, attribute)

View File

@ -26,8 +26,8 @@ class Output(cowrie.core.output.Output):
log.msg(f"mongo error - {e}")
def start(self):
db_addr = CowrieConfig().get("output_mongodb", "connection_string")
db_name = CowrieConfig().get("output_mongodb", "database")
db_addr = CowrieConfig.get("output_mongodb", "connection_string")
db_name = CowrieConfig.get("output_mongodb", "database")
try:
self.mongo_client = pymongo.MongoClient(db_addr)

View File

@ -47,15 +47,15 @@ class Output(cowrie.core.output.Output):
db = None
def start(self):
self.debug = CowrieConfig().getboolean("output_mysql", "debug", fallback=False)
port = CowrieConfig().getint("output_mysql", "port", fallback=3306)
self.debug = CowrieConfig.getboolean("output_mysql", "debug", fallback=False)
port = CowrieConfig.getint("output_mysql", "port", fallback=3306)
try:
self.db = ReconnectingConnectionPool(
"MySQLdb",
host=CowrieConfig().get("output_mysql", "host"),
db=CowrieConfig().get("output_mysql", "database"),
user=CowrieConfig().get("output_mysql", "username"),
passwd=CowrieConfig().get("output_mysql", "password", raw=True),
host=CowrieConfig.get("output_mysql", "host"),
db=CowrieConfig.get("output_mysql", "database"),
user=CowrieConfig.get("output_mysql", "username"),
passwd=CowrieConfig.get("output_mysql", "password", raw=True),
port=port,
cp_min=1,
cp_max=1,

View File

@ -22,26 +22,26 @@ class Output(cowrie.core.output.Output):
"""
Initialize pymisp module and ObjectWrapper (Abstract event and object creation)
"""
host = CowrieConfig().get("output_redis", "host")
port = CowrieConfig().get("output_redis", "port")
host = CowrieConfig.get("output_redis", "host")
port = CowrieConfig.get("output_redis", "port")
try:
db = CowrieConfig().get("output_redis", "db")
db = CowrieConfig.get("output_redis", "db")
except NoOptionError:
db = 0
try:
password = CowrieConfig().get("output_redis", "password")
password = CowrieConfig.get("output_redis", "password")
except NoOptionError:
password = None
self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)
self.keyname = CowrieConfig().get("output_redis", "keyname")
self.keyname = CowrieConfig.get("output_redis", "keyname")
try:
self.send_method = SEND_METHODS[
CowrieConfig().get("output_redis", "send_method")
CowrieConfig.get("output_redis", "send_method")
]
except (NoOptionError, KeyError):
self.send_method = SEND_METHODS["lpush"]

View File

@ -18,11 +18,11 @@ class Output(cowrie.core.output.Output):
# noinspection PyAttributeOutsideInit
def start(self):
self.host = CowrieConfig().get(RETHINK_DB_SEGMENT, "host")
self.port = CowrieConfig().getint(RETHINK_DB_SEGMENT, "port")
self.db = CowrieConfig().get(RETHINK_DB_SEGMENT, "db")
self.table = CowrieConfig().get(RETHINK_DB_SEGMENT, "table")
self.password = CowrieConfig().get(RETHINK_DB_SEGMENT, "password", raw=True)
self.host = CowrieConfig.get(RETHINK_DB_SEGMENT, "host")
self.port = CowrieConfig.getint(RETHINK_DB_SEGMENT, "port")
self.db = CowrieConfig.get(RETHINK_DB_SEGMENT, "db")
self.table = CowrieConfig.get(RETHINK_DB_SEGMENT, "table")
self.password = CowrieConfig.get(RETHINK_DB_SEGMENT, "password", raw=True)
self.connection = r.connect(
host=self.host, port=self.port, db=self.db, password=self.password
)

View File

@ -18,9 +18,7 @@ class Output(cowrie.core.output.Output):
"""
Start Output Plugin
"""
self.timeout = [
CowrieConfig().getint("output_reversedns", "timeout", fallback=3)
]
self.timeout = [CowrieConfig.getint("output_reversedns", "timeout", fallback=3)]
def stop(self):
"""

View File

@ -21,17 +21,17 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
self.bucket = CowrieConfig().get("output_s3", "bucket")
self.bucket = CowrieConfig.get("output_s3", "bucket")
self.seen = set()
self.session = get_session()
try:
if CowrieConfig().get("output_s3", "access_key_id") and CowrieConfig().get(
if CowrieConfig.get("output_s3", "access_key_id") and CowrieConfig.get(
"output_s3", "secret_access_key"
):
self.session.set_credentials(
CowrieConfig().get("output_s3", "access_key_id"),
CowrieConfig().get("output_s3", "secret_access_key"),
CowrieConfig.get("output_s3", "access_key_id"),
CowrieConfig.get("output_s3", "secret_access_key"),
)
except NoOptionError:
log.msg(
@ -40,9 +40,9 @@ class Output(cowrie.core.output.Output):
self.client = self.session.create_client(
"s3",
region_name=CowrieConfig().get("output_s3", "region"),
endpoint_url=CowrieConfig().get("output_s3", "endpoint", fallback=None),
verify=CowrieConfig().getboolean("output_s3", "verify", fallback=True),
region_name=CowrieConfig.get("output_s3", "region"),
endpoint_url=CowrieConfig.get("output_s3", "endpoint", fallback=None),
verify=CowrieConfig.getboolean("output_s3", "verify", fallback=True),
)
def stop(self):

View File

@ -42,8 +42,8 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
self.slack_channel = CowrieConfig().get("output_slack", "channel")
self.slack_token = CowrieConfig().get("output_slack", "token")
self.slack_channel = CowrieConfig.get("output_slack", "channel")
self.slack_token = CowrieConfig.get("output_slack", "token")
def stop(self):
pass

View File

@ -11,8 +11,8 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
self.timeout = CowrieConfig().getint("output_socketlog", "timeout")
addr = CowrieConfig().get("output_socketlog", "address")
self.timeout = CowrieConfig.getint("output_socketlog", "timeout")
addr = CowrieConfig.get("output_socketlog", "address")
self.host = addr.split(":")[0]
self.port = int(addr.split(":")[1])

View File

@ -30,14 +30,12 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
self.token = CowrieConfig().get("output_splunk", "token")
self.url = CowrieConfig().get("output_splunk", "url").encode("utf8")
self.index = CowrieConfig().get("output_splunk", "index", fallback=None)
self.source = CowrieConfig().get("output_splunk", "source", fallback=None)
self.sourcetype = CowrieConfig().get(
"output_splunk", "sourcetype", fallback=None
)
self.host = CowrieConfig().get("output_splunk", "host", fallback=None)
self.token = CowrieConfig.get("output_splunk", "token")
self.url = CowrieConfig.get("output_splunk", "url").encode("utf8")
self.index = CowrieConfig.get("output_splunk", "index", fallback=None)
self.source = CowrieConfig.get("output_splunk", "source", fallback=None)
self.sourcetype = CowrieConfig.get("output_splunk", "sourcetype", fallback=None)
self.host = CowrieConfig.get("output_splunk", "host", fallback=None)
contextFactory = WebClientContextFactory()
# contextFactory.method = TLSv1_METHOD
self.agent = client.Agent(reactor, contextFactory)

View File

@ -19,7 +19,7 @@ class Output(cowrie.core.output.Output):
Need to be started with check_same_thread=False. See
https://twistedmatrix.com/trac/ticket/3629.
"""
sqliteFilename = CowrieConfig().get("output_sqlite", "db_file")
sqliteFilename = CowrieConfig.get("output_sqlite", "db_file")
try:
self.db = adbapi.ConnectionPool(
"sqlite3", database=sqliteFilename, check_same_thread=False

View File

@ -38,8 +38,8 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
self.format = CowrieConfig().get("output_textlog", "format")
self.outfile = open(CowrieConfig().get("output_textlog", "logfile"), "a")
self.format = CowrieConfig.get("output_textlog", "format")
self.outfile = open(CowrieConfig.get("output_textlog", "logfile"), "a")
def stop(self):
pass

View File

@ -34,12 +34,7 @@ Send SSH logins to Virustotal
import datetime
import json
import os
try:
from urllib.parse import urlparse, urlencode
except ImportError:
from urllib import urlencode
from urlparse import urlparse
from urllib.parse import urlencode, urlparse
from twisted.internet import defer, reactor
from twisted.internet.ssl import ClientContextFactory
@ -67,23 +62,23 @@ class Output(cowrie.core.output.Output):
"""
Start output plugin
"""
self.apiKey = CowrieConfig().get("output_virustotal", "api_key")
self.debug = CowrieConfig().getboolean(
self.apiKey = CowrieConfig.get("output_virustotal", "api_key")
self.debug = CowrieConfig.getboolean(
"output_virustotal", "debug", fallback=False
)
self.upload = CowrieConfig().getboolean(
self.upload = CowrieConfig.getboolean(
"output_virustotal", "upload", fallback=True
)
self.comment = CowrieConfig().getboolean(
self.comment = CowrieConfig.getboolean(
"output_virustotal", "comment", fallback=True
)
self.scan_file = CowrieConfig().getboolean(
self.scan_file = CowrieConfig.getboolean(
"output_virustotal", "scan_file", fallback=True
)
self.scan_url = CowrieConfig().getboolean(
self.scan_url = CowrieConfig.getboolean(
"output_virustotal", "scan_url", fallback=False
)
self.commenttext = CowrieConfig().get(
self.commenttext = CowrieConfig.get(
"output_virustotal", "commenttext", fallback=COMMENT
)
self.agent = client.Agent(reactor, WebClientContextFactory())
@ -110,9 +105,7 @@ class Output(cowrie.core.output.Output):
def _is_new_shasum(self, shasum):
# Get the downloaded file's modification time
shasumfile = os.path.join(
CowrieConfig().get("honeypot", "download_path"), shasum
)
shasumfile = os.path.join(CowrieConfig.get("honeypot", "download_path"), shasum)
file_modification_time = datetime.datetime.fromtimestamp(
os.stat(shasumfile).st_mtime
)
@ -450,6 +443,9 @@ class StringProducer:
def pauseProducing(self):
pass
def resumeProducing(self):
pass
def stopProducing(self):
pass

View File

@ -61,10 +61,10 @@ class Output(cowrie.core.output.Output):
"""
def start(self):
server = CowrieConfig().get("output_xmpp", "server")
user = CowrieConfig().get("output_xmpp", "user")
password = CowrieConfig().get("output_xmpp", "password")
muc = CowrieConfig().get("output_xmpp", "muc")
server = CowrieConfig.get("output_xmpp", "server")
user = CowrieConfig.get("output_xmpp", "user")
password = CowrieConfig.get("output_xmpp", "password")
muc = CowrieConfig.get("output_xmpp", "muc")
resource = "".join([choice(string.ascii_letters) for i in range(8)])
jid = user + "/" + resource
application = service.Application("honeypot")
@ -72,7 +72,7 @@ class Output(cowrie.core.output.Output):
def run(self, application, jidstr, password, muc, server):
self.xmppclient = XMPPClient(JID(jidstr), password)
if CowrieConfig().getboolean("output_xmpp", "debug", fallback=False):
if CowrieConfig.getboolean("output_xmpp", "debug", fallback=False):
self.xmppclient.logTraffic = True
(user, host, resource) = jid.parse(jidstr)
self.muc = XMPPLoggerProtocol(muc, server, user + "-" + resource)

View File

@ -29,11 +29,11 @@ class PoolClient(Protocol):
"""
Used only by the PoolHandler on the first connection, to set the pool up.
"""
max_vms = CowrieConfig().getint("proxy", "pool_max_vms", fallback=2)
vm_unused_timeout = CowrieConfig().getint(
max_vms = CowrieConfig.getint("proxy", "pool_max_vms", fallback=2)
vm_unused_timeout = CowrieConfig.getint(
"proxy", "pool_vm_unused_timeout", fallback=600
)
share_guests = CowrieConfig().getboolean(
share_guests = CowrieConfig.getboolean(
"proxy", "pool_share_guests", fallback=True
)

View File

@ -30,7 +30,7 @@ class CowrieDailyLogFile(logfile.DailyLogFile):
def logger():
dir = CowrieConfig().get("honeypot", "log_path", fallback="log")
dir = CowrieConfig.get("honeypot", "log_path", fallback="log")
logfile = CowrieDailyLogFile("cowrie.log", dir)
# use Z for UTC (Zulu) time, it's shorter.

View File

@ -40,11 +40,11 @@ class CowrieUser(avatar.ConchUser):
self.home = pwentry["pw_dir"]
# SFTP support enabled only when option is explicitly set
if CowrieConfig().getboolean("ssh", "sftp_enabled", fallback=False):
if CowrieConfig.getboolean("ssh", "sftp_enabled", fallback=False):
self.subsystemLookup[b"sftp"] = conchfiletransfer.FileTransferServer
# SSH forwarding disabled only when option is explicitly set
if CowrieConfig().getboolean("ssh", "forwarding", fallback=True):
if CowrieConfig.getboolean("ssh", "forwarding", fallback=True):
self.channelLookup[
b"direct-tcpip"
] = forwarding.cowrieOpenConnectForwardingClient

View File

@ -8,8 +8,8 @@ This module contains code to run a command
import os
import re
import shlex
import stat
import sys
import time
from twisted.internet import error
@ -18,12 +18,6 @@ from twisted.python import failure, log
from cowrie.core.config import CowrieConfig
from cowrie.shell import fs
# From Python3.6 we get the new shlex version
if sys.version_info.major >= 3 and sys.version_info.minor >= 6:
import shlex
else:
from cowrie.shell import shlex
class HoneyPotCommand:
"""
@ -71,7 +65,7 @@ class HoneyPotCommand:
re.sub("[^A-Za-z0-9]", "_", self.outfile),
)
self.safeoutfile = os.path.join(
CowrieConfig().get("honeypot", "download_path"), tmp_fname
CowrieConfig.get("honeypot", "download_path"), tmp_fname
)
perm = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
try:

View File

@ -37,7 +37,7 @@ class CowrieSFTPFile:
transfer_completed = 0
bytesReceived = 0
bytesReceivedLimit = CowrieConfig().getint(
bytesReceivedLimit = CowrieConfig.getint(
"honeypot", "download_limit_size", fallback=0
)

View File

@ -5,7 +5,7 @@
try:
import cPickle as pickle
except Exception:
import pickle
import pickle # type: ignore
import errno
import fnmatch
@ -88,10 +88,10 @@ class HoneyPotFilesystem:
def __init__(self, fs, arch, home):
try:
with open(CowrieConfig().get("shell", "filesystem"), "rb") as f:
with open(CowrieConfig.get("shell", "filesystem"), "rb") as f:
self.fs = pickle.load(f)
except UnicodeDecodeError:
with open(CowrieConfig().get("shell", "filesystem"), "rb") as f:
with open(CowrieConfig.get("shell", "filesystem"), "rb") as f:
self.fs = pickle.load(f, encoding="utf8")
except Exception as e:
log.err(e, "ERROR: Failed to load filesystem")
@ -110,7 +110,7 @@ class HoneyPotFilesystem:
# Get the honeyfs path from the config file and explore it for file
# contents:
self.init_honeyfs(CowrieConfig().get("honeypot", "contents_path"))
self.init_honeyfs(CowrieConfig.get("honeypot", "contents_path"))
def init_honeyfs(self, honeyfs_path):
"""
@ -295,7 +295,7 @@ class HoneyPotFilesystem:
return ""
elif f[A_TYPE] == T_FILE and f[A_MODE] & stat.S_IXUSR:
return open(
CowrieConfig().get("honeypot", "share_path") + "/arch/" + self.arch,
CowrieConfig.get("honeypot", "share_path") + "/arch/" + self.arch,
"rb",
).read()
@ -328,7 +328,6 @@ class HoneyPotFilesystem:
dir = self.get_path(os.path.dirname(path.strip("/")))
except IndexError:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), path)
return False
dir.append(
[os.path.basename(path), T_DIR, uid, gid, size, mode, ctime, [], None, None]
)
@ -401,7 +400,7 @@ class HoneyPotFilesystem:
# strip executable bit
hostmode = mode & ~(111)
hostfile = "{}/{}_sftp_{}".format(
CowrieConfig().get("honeypot", "download_path"),
CowrieConfig.get("honeypot", "download_path"),
time.strftime("%Y%m%d-%H%M%S"),
re.sub("[^A-Za-z0-9]", "_", filename),
)
@ -429,7 +428,7 @@ class HoneyPotFilesystem:
return True
if self.tempfiles[fd] is not None:
shasum = hashlib.sha256(open(self.tempfiles[fd], "rb").read()).hexdigest()
shasumfile = CowrieConfig().get("honeypot", "download_path") + "/" + shasum
shasumfile = CowrieConfig.get("honeypot", "download_path") + "/" + shasum
if os.path.exists(shasumfile):
os.remove(self.tempfiles[fd])
else:

View File

@ -5,7 +5,7 @@
import copy
import os
import re
import sys
import shlex
from twisted.internet import error
from twisted.python import failure, log
@ -14,12 +14,6 @@ from twisted.python.compat import iterbytes
from cowrie.core.config import CowrieConfig
from cowrie.shell import fs
# From Python3.6 we get the new shlex version
if sys.version_info.major >= 3 and sys.version_info.minor >= 6:
import shlex
else:
from cowrie.shell import shlex
class HoneyPotShell:
def __init__(self, protocol, interactive=True, redirect=False):
@ -337,8 +331,8 @@ class HoneyPotShell:
return
prompt = ""
if CowrieConfig().has_option("honeypot", "prompt"):
prompt = CowrieConfig().get("honeypot", "prompt")
if CowrieConfig.has_option("honeypot", "prompt"):
prompt = CowrieConfig.get("honeypot", "prompt")
prompt += " "
else:
cwd = self.protocol.cwd

View File

@ -92,18 +92,18 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
log.msg(eventid="cowrie.session.params", arch=self.user.server.arch)
timeout = CowrieConfig().getint("honeypot", "interactive_timeout", fallback=180)
timeout = CowrieConfig.getint("honeypot", "interactive_timeout", fallback=180)
self.setTimeout(timeout)
# Source IP of client in user visible reports (can be fake or real)
try:
self.clientIP = CowrieConfig().get("honeypot", "fake_addr")
self.clientIP = CowrieConfig.get("honeypot", "fake_addr")
except Exception:
self.clientIP = self.realClientIP
# Source IP of server in user visible reports (can be fake or real)
if CowrieConfig().has_option("honeypot", "internet_facing_ip"):
self.kippoIP = CowrieConfig().get("honeypot", "internet_facing_ip")
if CowrieConfig.has_option("honeypot", "internet_facing_ip"):
self.kippoIP = CowrieConfig.get("honeypot", "internet_facing_ip")
else:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@ -170,7 +170,7 @@ class HoneyPotBaseProtocol(insults.TerminalProtocol, TimeoutMixin):
break
txt = os.path.normpath(
"{}/txtcmds/{}".format(CowrieConfig().get("honeypot", "share_path"), path)
"{}/txtcmds/{}".format(CowrieConfig.get("honeypot", "share_path"), path)
)
if os.path.exists(txt) and os.path.isfile(txt):
return self.txtcmd(txt)

View File

@ -42,9 +42,7 @@ class Passwd:
passwords.
"""
passwd_file = "{}/etc/passwd".format(
CowrieConfig().get("honeypot", "contents_path")
)
passwd_file = "{}/etc/passwd".format(CowrieConfig.get("honeypot", "contents_path"))
def __init__(self):
self.load()
@ -153,7 +151,7 @@ class Group:
/etc/group.
"""
group_file = "{}/etc/group".format(CowrieConfig().get("honeypot", "contents_path"))
group_file = "{}/etc/group".format(CowrieConfig.get("honeypot", "contents_path"))
def __init__(self):
self.load()

View File

@ -49,13 +49,12 @@ class CowrieServer:
fs = None
process = None
avatars = []
hostname = CowrieConfig().get("honeypot", "hostname")
hostname = CowrieConfig.get("honeypot", "hostname")
def __init__(self, realm):
try:
arches = [
arch.strip() for arch in CowrieConfig().get("shell", "arch").split(",")
arch.strip() for arch in CowrieConfig.get("shell", "arch").split(",")
]
self.arch = random.choice(arches)
except NoOptionError:
@ -79,7 +78,7 @@ class CowrieServer:
try:
self.process = self.getCommandOutput(
CowrieConfig().get("shell", "processes")
CowrieConfig.get("shell", "processes")
)["command"]["ps"]
except NoOptionError:
self.process = None

View File

@ -1,368 +0,0 @@
"""A lexical analyzer class for simple shell-like syntaxes."""
# Module and documentation by Eric S. Raymond, 21 Dec 1998
# Input stacking and error message cleanup added by ESR, March 2000
# push_source() and pop_source() made explicit by ESR, January 2001.
# Posix compliance, split(), string arguments, and
# iterator interface by Gustavo Niemeyer, April 2003.
# changes to tokenize more like Posix shells by Vinay Sajip, January 2012.
import os
import re
import sys
from collections import deque
from io import StringIO
__all__ = ["shlex", "split", "quote"]
class shlex:
"""
A lexical analyzer class for simple shell-like syntaxes.
"""
def __init__(
self, instream=None, infile=None, posix=False, punctuation_chars=False
):
if instream is not None:
instream = StringIO(instream)
if instream is not None:
self.instream = instream
self.infile = infile
else:
self.instream = sys.stdin
self.infile = None
self.posix = posix
if posix:
self.eof = None
else:
self.eof = ""
self.commenters = "#"
self.wordchars = (
"abcdfeghijklmnopqrstuvwxyz" "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"
)
self.whitespace = " \t\r\n"
self.whitespace_split = False
self.quotes = "'\""
self.escape = "\\"
self.escapedquotes = '"'
self.state = " "
self.pushback = deque()
self.lineno = 1
self.debug = 0
self.token = ""
self.filestack = deque()
self.source = None
if not punctuation_chars:
punctuation_chars = ""
elif punctuation_chars is True:
punctuation_chars = "();<>|&"
self.punctuation_chars = punctuation_chars
if punctuation_chars:
# _pushback_chars is a push back queue used by lookahead logic
self._pushback_chars = deque()
# these chars added because allowed in file names, args, wildcards
# self.wordchars += '@%{}~-./*?=$:+^'
self.wordchars += "~-./*?="
# remove any punctuation chars from wordchars
self.wordchars = "".join(
c for c in self.wordchars if c not in self.punctuation_chars
)
for c in punctuation_chars:
if c in self.wordchars:
self.wordchars.remove(c)
if self.debug:
print("shlex: reading from %s, line %d" % (self.instream, self.lineno))
def push_token(self, tok):
"""
Push a token onto the stack popped by the get_token method" \
"""
if self.debug >= 1:
print("shlex: pushing token " + repr(tok))
self.pushback.appendleft(tok)
def push_source(self, newstream, newfile=None):
"Push an input source onto the lexer's input source stack."
if isinstance(newstream, str):
newstream = StringIO(newstream)
self.filestack.appendleft((self.infile, self.instream, self.lineno))
self.infile = newfile
self.instream = newstream
self.lineno = 1
if self.debug:
if newfile is not None:
print(f"shlex: pushing to file {self.infile}")
else:
print(f"shlex: pushing to stream {self.instream}")
def pop_source(self):
"""
Pop the input source stack.
"""
self.instream.close()
(self.infile, self.instream, self.lineno) = self.filestack.popleft()
if self.debug:
print(f"shlex: popping to {self.instream}, line {self.lineno}")
self.state = " "
def get_token(self):
"""
Get a token from the input stream (or from stack if it's nonempty)
"""
if self.pushback:
tok = self.pushback.popleft()
if self.debug >= 1:
print("shlex: popping token " + repr(tok))
return tok
# No pushback. Get a token.
raw = self.read_token()
# Handle inclusions
if self.source is not None:
while raw == self.source:
spec = self.sourcehook(self.read_token())
if spec:
(newfile, newstream) = spec
self.push_source(newstream, newfile)
raw = self.get_token()
# Maybe we got EOF instead?
while raw == self.eof:
if not self.filestack:
return self.eof
else:
self.pop_source()
raw = self.get_token()
# Neither inclusion nor EOF
if self.debug >= 1:
if raw != self.eof:
print("shlex: token=" + repr(raw))
else:
print("shlex: token=EOF")
return raw
def read_token(self):
quoted = False
escapedstate = " "
while True:
if self.punctuation_chars and self._pushback_chars:
nextchar = self._pushback_chars.pop()
else:
nextchar = self.instream.read(1)
if nextchar == "\n":
self.lineno += 1
if self.debug >= 3:
print(
"shlex: in state {!r} I see character: {!r}".format(
self.state, nextchar
)
)
if self.state is None:
self.token = "" # past end of file
break
elif self.state == " ":
if not nextchar:
self.state = None # end of file
break
elif nextchar in self.whitespace:
if self.debug >= 2:
print("shlex: I see whitespace in whitespace state")
if self.token or (self.posix and quoted):
break # emit current token
else:
continue
elif nextchar in self.commenters:
self.instream.readline()
self.lineno += 1
elif self.posix and nextchar in self.escape:
escapedstate = "a"
self.state = nextchar
elif nextchar in self.wordchars:
self.token = nextchar
self.state = "a"
elif nextchar in self.punctuation_chars:
self.token = nextchar
self.state = "c"
elif nextchar in self.quotes:
if not self.posix:
self.token = nextchar
self.state = nextchar
elif self.whitespace_split:
self.token = nextchar
self.state = "a"
else:
self.token = nextchar
if self.token or (self.posix and quoted):
break # emit current token
else:
continue
elif self.state in self.quotes:
quoted = True
if not nextchar: # end of file
if self.debug >= 2:
print("shlex: I see EOF in quotes state")
# XXX what error should be raised here?
raise ValueError("No closing quotation")
if nextchar == self.state:
if not self.posix:
self.token += nextchar
self.state = " "
break
else:
self.state = "a"
elif (
self.posix
and nextchar in self.escape
and self.state in self.escapedquotes
):
escapedstate = self.state
self.state = nextchar
else:
self.token += nextchar
elif self.state in self.escape:
if not nextchar: # end of file
if self.debug >= 2:
print("shlex: I see EOF in escape state")
# XXX what error should be raised here?
raise ValueError("No escaped character")
# In posix shells, only the quote itself or the escape
# character may be escaped within quotes.
if (
escapedstate in self.quotes
and nextchar != self.state
and nextchar != escapedstate
):
self.token += self.state
self.token += nextchar
self.state = escapedstate
elif self.state in ("a", "c"):
if not nextchar:
self.state = None # end of file
break
elif nextchar in self.whitespace:
if self.debug >= 2:
print("shlex: I see whitespace in word state")
self.state = " "
if self.token or (self.posix and quoted):
break # emit current token
else:
continue
elif nextchar in self.commenters:
self.instream.readline()
self.lineno += 1
if self.posix:
self.state = " "
if self.token or (self.posix and quoted):
break # emit current token
else:
continue
elif self.posix and nextchar in self.quotes:
self.state = nextchar
elif self.posix and nextchar in self.escape:
escapedstate = "a"
self.state = nextchar
elif self.state == "c":
if nextchar in self.punctuation_chars:
self.token += nextchar
else:
if nextchar not in self.whitespace:
self._pushback_chars.append(nextchar)
self.state = " "
break
elif (
nextchar in self.wordchars
or nextchar in self.quotes
or self.whitespace_split
):
self.token += nextchar
else:
if self.punctuation_chars:
self._pushback_chars.append(nextchar)
else:
self.pushback.appendleft(nextchar)
if self.debug >= 2:
print("shlex: I see punctuation in word state")
self.state = " "
if self.token or (self.posix and quoted):
break # emit current token
else:
continue
result = self.token
self.token = ""
if self.posix and not quoted and result == "":
result = None
if self.debug > 1:
if result:
print("shlex: raw token=" + repr(result))
else:
print("shlex: raw token=EOF")
return result
def sourcehook(self, newfile):
"Hook called on a filename to be sourced."
if newfile[0] == '"':
newfile = newfile[1:-1]
# This implements cpp-like semantics for relative-path inclusion.
if isinstance(self.infile, str) and not os.path.isabs(newfile):
newfile = os.path.join(os.path.dirname(self.infile), newfile)
return (newfile, open(newfile))
def error_leader(self, infile=None, lineno=None):
"Emit a C-compiler-like, Emacs-friendly error-message leader."
if infile is None:
infile = self.infile
if lineno is None:
lineno = self.lineno
return '"%s", line %d: ' % (infile, lineno)
def __iter__(self):
return self
def __next__(self):
token = self.get_token()
if token == self.eof:
raise StopIteration
return token
# For Python 2.x
next = __next__
def split(s, comments=False, posix=True):
lex = shlex(s, posix=posix)
lex.whitespace_split = True
if not comments:
lex.commenters = ""
return list(lex)
# No ASCII in P2.x
_find_unsafe = re.compile(r"[^\w@%+=:,./-]").search
def quote(s):
"""
Return a shell-escaped version of the string *s*.
"""
if not s:
return "''"
if _find_unsafe(s) is None:
return s
# use single quotes, and put single quotes into double quotes
# the string $'b is then quoted as '$'"'"'b'
return "'" + s.replace("'", "'\"'\"'") + "'"
if __name__ == "__main__":
if len(sys.argv) == 1:
lexer = shlex()
else:
file = sys.argv[1]
lexer = shlex(open(file), file)
while 1:
tt = lexer.get_token()
if tt:
print("Token: " + repr(tt))
else:
break

View File

@ -28,10 +28,10 @@ class CowrieSSHChannel(channel.SSHChannel):
bytesWritten = 0
name = b"cowrie-ssh-channel"
startTime = None
ttylogPath = CowrieConfig().get("honeypot", "log_path")
downloadPath = CowrieConfig().get("honeypot", "download_path")
ttylogEnabled = CowrieConfig().getboolean("honeypot", "ttylog", fallback=True)
bytesReceivedLimit = CowrieConfig().getint(
ttylogPath = CowrieConfig.get("honeypot", "log_path")
downloadPath = CowrieConfig.get("honeypot", "download_path")
ttylogEnabled = CowrieConfig.getboolean("honeypot", "ttylog", fallback=True)
bytesReceivedLimit = CowrieConfig.getint(
"honeypot", "download_limit_size", fallback=0
)

View File

@ -35,7 +35,7 @@ class CowrieSSHFactory(factory.SSHFactory):
publicKeys = None
primes = None
tac = None # gets set later
ourVersionString = CowrieConfig().get(
ourVersionString = CowrieConfig.get(
"ssh", "version", fallback="SSH-2.0-OpenSSH_6.0p1 Debian-4+deb7u2"
)
@ -83,7 +83,7 @@ class CowrieSSHFactory(factory.SSHFactory):
pass
# this can come from backend in the future, check HonSSH's slim client
self.ourVersionString = CowrieConfig().get(
self.ourVersionString = CowrieConfig.get(
"ssh", "version", fallback="SSH-2.0-OpenSSH_6.0p1 Debian-4+deb7u2"
)
@ -123,8 +123,7 @@ class CowrieSSHFactory(factory.SSHFactory):
try:
t.supportedCiphers = [
i.encode("utf-8")
for i in CowrieConfig().get("ssh", "ciphers").split(",")
i.encode("utf-8") for i in CowrieConfig.get("ssh", "ciphers").split(",")
]
except NoOptionError:
# Reorder supported ciphers to resemble current openssh more
@ -142,7 +141,7 @@ class CowrieSSHFactory(factory.SSHFactory):
try:
t.supportedMACs = [
i.encode("utf-8") for i in CowrieConfig().get("ssh", "macs").split(",")
i.encode("utf-8") for i in CowrieConfig.get("ssh", "macs").split(",")
]
except NoOptionError:
# SHA1 and MD5 are considered insecure now. Use better algos
@ -158,7 +157,7 @@ class CowrieSSHFactory(factory.SSHFactory):
try:
t.supportedCompressions = [
i.encode("utf-8")
for i in CowrieConfig().get("ssh", "compression").split(",")
for i in CowrieConfig.get("ssh", "compression").split(",")
]
except NoOptionError:
t.supportedCompressions = [b"zlib@openssh.com", b"zlib", b"none"]

View File

@ -29,12 +29,10 @@ def cowrieOpenConnectForwardingClient(remoteWindow, remoteMaxPacket, data, avata
)
# Forward redirect
redirectEnabled = CowrieConfig().getboolean(
"ssh", "forward_redirect", fallback=False
)
redirectEnabled = CowrieConfig.getboolean("ssh", "forward_redirect", fallback=False)
if redirectEnabled:
redirects = {}
items = CowrieConfig().items("ssh")
items = CowrieConfig.items("ssh")
for i in items:
if i[0].startswith("forward_redirect_"):
destPort = i[0].split("_")[-1]
@ -58,10 +56,10 @@ def cowrieOpenConnectForwardingClient(remoteWindow, remoteMaxPacket, data, avata
)
# TCP tunnel
tunnelEnabled = CowrieConfig().getboolean("ssh", "forward_tunnel", fallback=False)
tunnelEnabled = CowrieConfig.getboolean("ssh", "forward_tunnel", fallback=False)
if tunnelEnabled:
tunnels = {}
items = CowrieConfig().items("ssh")
items = CowrieConfig.items("ssh")
for i in items:
if i[0].startswith("forward_tunnel_"):
destPort = i[0].split("_")[-1]

View File

@ -15,8 +15,8 @@ from cowrie.core.config import CowrieConfig
def getRSAKeys():
publicKeyFile = CowrieConfig().get("ssh", "rsa_public_key")
privateKeyFile = CowrieConfig().get("ssh", "rsa_private_key")
publicKeyFile = CowrieConfig.get("ssh", "rsa_public_key")
privateKeyFile = CowrieConfig.get("ssh", "rsa_private_key")
if not (os.path.exists(publicKeyFile) and os.path.exists(privateKeyFile)):
log.msg("Generating new RSA keypair...")
from cryptography.hazmat.backends import default_backend
@ -40,8 +40,8 @@ def getRSAKeys():
def getDSAKeys():
publicKeyFile = CowrieConfig().get("ssh", "dsa_public_key")
privateKeyFile = CowrieConfig().get("ssh", "dsa_private_key")
publicKeyFile = CowrieConfig.get("ssh", "dsa_public_key")
privateKeyFile = CowrieConfig.get("ssh", "dsa_private_key")
if not (os.path.exists(publicKeyFile) and os.path.exists(privateKeyFile)):
log.msg("Generating new DSA keypair...")
from cryptography.hazmat.backends import default_backend

View File

@ -27,10 +27,10 @@ class HoneyPotSSHTransport(transport.SSHServerTransport, TimeoutMixin):
startTime = None
gotVersion = False
ipv4rex = re.compile(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$")
auth_timeout = CowrieConfig().getint(
auth_timeout = CowrieConfig.getint(
"honeypot", "authentication_timeout", fallback=120
)
interactive_timeout = CowrieConfig().getint(
interactive_timeout = CowrieConfig.getint(
"honeypot", "interactive_timeout", fallback=300
)

View File

@ -28,7 +28,7 @@ class HoneyPotSSHUserAuthServer(userauth.SSHUserAuthServer):
def serviceStarted(self):
self.interfaceToMethod[credentials.IUsername] = b"none"
self.interfaceToMethod[credentials.IUsernamePasswordIP] = b"password"
keyboard = CowrieConfig().getboolean(
keyboard = CowrieConfig.getboolean(
"ssh", "auth_keyboard_interactive_enabled", fallback=False
)
@ -49,9 +49,7 @@ class HoneyPotSSHUserAuthServer(userauth.SSHUserAuthServer):
return
self.bannerSent = True
try:
issuefile = (
CowrieConfig().get("honeypot", "contents_path") + "/etc/issue.net"
)
issuefile = CowrieConfig.get("honeypot", "contents_path") + "/etc/issue.net"
data = open(issuefile).read()
except OSError:
return

View File

@ -73,8 +73,8 @@ class BackendSSHTransport(transport.SSHClientTransport, TimeoutMixin):
# we authenticate with the backend using the credentials provided
# TODO create the account in the backend before (contact the pool of VMs for example)
# so these credentials from the config may not be needed after all
username = CowrieConfig().get("proxy", "backend_user").encode()
password = CowrieConfig().get("proxy", "backend_pass").encode()
username = CowrieConfig.get("proxy", "backend_user").encode()
password = CowrieConfig.get("proxy", "backend_pass").encode()
log.msg(f"Will auth with backend: {username}/{password}")
self.sendPacket(5, bin_string_to_hex(b"ssh-userauth"))

View File

@ -50,8 +50,8 @@ class ExecTerm(base_protocol.BaseProtocol):
self.channelId = channelId
self.startTime = time.time()
self.ttylogPath = CowrieConfig().get("honeypot", "ttylog_path")
self.ttylogEnabled = CowrieConfig().getboolean(
self.ttylogPath = CowrieConfig.get("honeypot", "ttylog_path")
self.ttylogEnabled = CowrieConfig.getboolean(
"honeypot", "ttylog", fallback=True
)
self.ttylogSize = 0

View File

@ -113,7 +113,7 @@ class SSH(base_protocol.BaseProtocol):
direction = "BACKEND -> PROXY"
# log raw packets if user sets so
if CowrieConfig().getboolean("proxy", "log_raw", fallback=False):
if CowrieConfig.getboolean("proxy", "log_raw", fallback=False):
log.msg(
eventid="cowrie.proxy.ssh",
format="%(direction)s - %(packet)s - %(payload)s",
@ -188,9 +188,7 @@ class SSH(base_protocol.BaseProtocol):
if channel_type == b"session":
# if using an interactive session reset frontend timeout
self.server.setTimeout(
CowrieConfig().getint(
"honeypot", "interactive_timeout", fallback=300
)
CowrieConfig.getint("honeypot", "interactive_timeout", fallback=300)
)
self.create_channel(parent, channel_id, channel_type)
@ -205,7 +203,7 @@ class SSH(base_protocol.BaseProtocol):
src_ip = self.extract_string()
src_port = self.extract_int(4)
if CowrieConfig().getboolean("ssh", "forwarding"):
if CowrieConfig.getboolean("ssh", "forwarding"):
log.msg(
eventid="cowrie.direct-tcpip.request",
format="direct-tcp connection request to %(dst_ip)s:%(dst_port)s "
@ -295,7 +293,7 @@ class SSH(base_protocol.BaseProtocol):
subsystem = self.extract_string()
if subsystem == b"sftp":
if CowrieConfig().getboolean("ssh", "sftp_enabled"):
if CowrieConfig.getboolean("ssh", "sftp_enabled"):
channel["name"] = "[SFTP" + str(channel["serverID"]) + "]"
# self.out.channel_opened(the_uuid, channel['name'])
channel["session"] = sftp.SFTP(the_uuid, channel["name"], self)
@ -351,7 +349,7 @@ class SSH(base_protocol.BaseProtocol):
elif packet == "SSH_MSG_GLOBAL_REQUEST":
channel_type = self.extract_string()
if channel_type == b"tcpip-forward":
if not CowrieConfig().getboolean(["ssh", "forwarding"]):
if not CowrieConfig.getboolean(["ssh", "forwarding"]):
self.sendOn = False
self.send_back(parent, 82, "")

View File

@ -49,8 +49,8 @@ class Term(base_protocol.BaseProtocol):
self.channelId = channelId
self.startTime = time.time()
self.ttylogPath = CowrieConfig().get("honeypot", "ttylog_path")
self.ttylogEnabled = CowrieConfig().getboolean(
self.ttylogPath = CowrieConfig.get("honeypot", "ttylog_path")
self.ttylogEnabled = CowrieConfig.getboolean(
"honeypot", "ttylog", fallback=True
)
self.ttylogSize = 0

View File

@ -112,7 +112,7 @@ class FrontendSSHTransport(transport.SSHServerTransport, TimeoutMixin):
# if we have a pool connect to it and later request a backend, else just connect to a simple backend
# when pool is set we can just test self.pool_interface to the same effect of getting the CowrieConfig
proxy_backend = CowrieConfig().get("proxy", "backend", fallback="simple")
proxy_backend = CowrieConfig.get("proxy", "backend", fallback="simple")
if proxy_backend == "pool":
# request a backend
@ -121,8 +121,8 @@ class FrontendSSHTransport(transport.SSHServerTransport, TimeoutMixin):
d.addErrback(self.pool_connection_error)
else:
# simply a proxy, no pool
backend_ip = CowrieConfig().get("proxy", "backend_ssh_host")
backend_port = CowrieConfig().getint("proxy", "backend_ssh_port")
backend_ip = CowrieConfig.get("proxy", "backend_ssh_host")
backend_port = CowrieConfig.getint("proxy", "backend_ssh_port")
self.connect_to_backend(backend_ip, backend_port)
def pool_connection_error(self, reason):
@ -164,7 +164,7 @@ class FrontendSSHTransport(transport.SSHServerTransport, TimeoutMixin):
# this timeout is replaced with `interactive_timeout` in ssh.py
self.setTimeout(
CowrieConfig().getint("honeypot", "authentication_timeout", fallback=120)
CowrieConfig.getint("honeypot", "authentication_timeout", fallback=120)
)
def connect_to_backend(self, ip, port):

View File

@ -43,7 +43,7 @@ class HoneyPotTelnetFactory(protocol.ServerFactory):
def startFactory(self):
try:
honeyfs = CowrieConfig().get("honeypot", "contents_path")
honeyfs = CowrieConfig.get("honeypot", "contents_path")
issuefile = honeyfs + "/etc/issue.net"
self.banner = open(issuefile, "rb").read()
except OSError:

View File

@ -23,7 +23,7 @@ class CowrieTelnetTransport(TelnetTransport, TimeoutMixin):
self.startTime = time.time()
self.setTimeout(
CowrieConfig().getint("honeypot", "authentication_timeout", fallback=120)
CowrieConfig.getint("honeypot", "authentication_timeout", fallback=120)
)
log.msg(

View File

@ -98,7 +98,7 @@ class HoneyPotTelnetAuthProtocol(AuthenticatingTelnetProtocol):
# Remove the short timeout of the login prompt.
self.transport.setTimeout(
CowrieConfig().getint("honeypot", "interactive_timeout", fallback=300)
CowrieConfig.getint("honeypot", "interactive_timeout", fallback=300)
)
# replace myself with avatar protocol

View File

@ -45,28 +45,22 @@ class TelnetHandler:
self.client = None
# definitions from config
self.spoofAuthenticationData = CowrieConfig().getboolean(
self.spoofAuthenticationData = CowrieConfig.getboolean(
"proxy", "telnet_spoof_authentication"
)
self.backendLogin = CowrieConfig().get("proxy", "backend_user").encode()
self.backendPassword = CowrieConfig().get("proxy", "backend_pass").encode()
self.backendLogin = CowrieConfig.get("proxy", "backend_user").encode()
self.backendPassword = CowrieConfig.get("proxy", "backend_pass").encode()
self.usernameInNegotiationRegex = (
CowrieConfig()
.get("proxy", "telnet_username_in_negotiation_regex", raw=True)
.encode()
)
self.usernamePromptRegex = (
CowrieConfig()
.get("proxy", "telnet_username_prompt_regex", raw=True)
.encode()
)
self.passwordPromptRegex = (
CowrieConfig()
.get("proxy", "telnet_password_prompt_regex", raw=True)
.encode()
)
self.usernameInNegotiationRegex = CowrieConfig.get(
"proxy", "telnet_username_in_negotiation_regex", raw=True
).encode()
self.usernamePromptRegex = CowrieConfig.get(
"proxy", "telnet_username_prompt_regex", raw=True
).encode()
self.passwordPromptRegex = CowrieConfig.get(
"proxy", "telnet_password_prompt_regex", raw=True
).encode()
# telnet state
self.currentCommand = b""
@ -92,8 +86,8 @@ class TelnetHandler:
# tty logging
self.startTime = time.time()
self.ttylogPath = CowrieConfig().get("honeypot", "ttylog_path")
self.ttylogEnabled = CowrieConfig().getboolean(
self.ttylogPath = CowrieConfig.get("honeypot", "ttylog_path")
self.ttylogEnabled = CowrieConfig.getboolean(
"honeypot", "ttylog", fallback=True
)
self.ttylogSize = 0
@ -146,7 +140,7 @@ class TelnetHandler:
for packet in self.backend_buffer:
self.client.transport.write(packet)
# log raw packets if user sets so
if CowrieConfig().getboolean("proxy", "log_raw", fallback=False):
if CowrieConfig.getboolean("proxy", "log_raw", fallback=False):
log.msg(b"to_backend - " + data)
if self.ttylogEnabled and self.authStarted:
@ -168,7 +162,7 @@ class TelnetHandler:
self.server.transport.write(data)
# log raw packets if user sets so
if CowrieConfig().getboolean("proxy", "log_raw", fallback=False):
if CowrieConfig.getboolean("proxy", "log_raw", fallback=False):
log.msg(b"to_frontend - " + data)
if self.ttylogEnabled and self.authStarted:
@ -292,9 +286,7 @@ class TelnetHandler:
passwordToSend = self.backendPassword
self.authDone = True
self.server.setTimeout(
CowrieConfig().getint(
"honeypot", "interactive_timeout", fallback=300
)
CowrieConfig.getint("honeypot", "interactive_timeout", fallback=300)
)
else:
log.msg("Sending invalid auth to backend")

View File

@ -72,7 +72,7 @@ class FrontendTelnetTransport(TelnetTransport, TimeoutMixin):
# if we have a pool connect to it and later request a backend, else just connect to a simple backend
# when pool is set we can just test self.pool_interface to the same effect of getting the config
proxy_backend = CowrieConfig().get("proxy", "backend", fallback="simple")
proxy_backend = CowrieConfig.get("proxy", "backend", fallback="simple")
if proxy_backend == "pool":
# request a backend
@ -81,8 +81,8 @@ class FrontendTelnetTransport(TelnetTransport, TimeoutMixin):
d.addErrback(self.pool_connection_error)
else:
# simply a proxy, no pool
backend_ip = CowrieConfig().get("proxy", "backend_telnet_host")
backend_port = CowrieConfig().getint("proxy", "backend_telnet_port")
backend_ip = CowrieConfig.get("proxy", "backend_telnet_host")
backend_port = CowrieConfig.getint("proxy", "backend_telnet_port")
self.connect_to_backend(backend_ip, backend_port)
def pool_connection_error(self, reason):
@ -122,7 +122,7 @@ class FrontendTelnetTransport(TelnetTransport, TimeoutMixin):
self.startTime = time.time()
self.setTimeout(
CowrieConfig().getint("honeypot", "authentication_timeout", fallback=120)
CowrieConfig.getint("honeypot", "authentication_timeout", fallback=120)
)
def connect_to_backend(self, ip, port):

View File

@ -3,6 +3,7 @@
# Copyright (c) 2016 Dave Germiquet
# See LICENSE for details.
from typing import Callable, Dict, List, Optional, Set
from twisted.conch.insults import insults
from twisted.test import proto_helpers
@ -11,7 +12,7 @@ from twisted.test import proto_helpers
class Container:
"""
This class is placeholder for creating a fake interface
@var host Client fake infomration
@var host Client fake information
@var port Fake Port for connection
@var otherVersionString version
@var
@ -20,6 +21,13 @@ class Container:
otherVersionString = "1.0"
transportId = "test-suite"
id = "test-suite"
sessionno = 1
starttime = 0
session: Optional["Container"]
sessions: Dict[int, str] = {}
conn: Optional["Container"]
transport: Optional["Container"]
factory: Optional["Container"]
def getPeer(self):
"""
@ -43,7 +51,7 @@ class FakeTransport(proto_helpers.StringTransport):
# Thanks to TerminalBuffer (some code was taken from twisted Terminal Buffer)
redirFiles = set()
redirFiles: Set[List[str]] = set()
width = 80
height = 24
void = object()
@ -78,7 +86,7 @@ class FakeTransport(proto_helpers.StringTransport):
TAB = "\x09"
BACKSPACE = "\x08"
modes = {}
modes: Dict[str, Callable] = {}
# '\x01': self.handle_HOME, # CTRL-A
# '\x02': self.handle_LEFT, # CTRL-B
@ -109,7 +117,7 @@ class FakeTransport(proto_helpers.StringTransport):
transport.session.conn.transport.factory.sessions = {}
transport.session.conn.transport.factory.starttime = 0
factory = Container()
session = {}
session: Dict[str, str] = {}
def abortConnection(self):
self.aborting = True

View File

@ -29,6 +29,7 @@
import os
import sys
from typing import List
from backend_pool.pool_server import PoolServerFactory
@ -66,8 +67,7 @@ class Options(usage.Options):
"""
# The '-c' parameters is currently ignored
optParameters = []
optParameters: List[str] = []
optFlags = [["help", "h", "Display this help and exit."]]
@ -96,15 +96,13 @@ class CowrieServiceMaker:
self.pool_handler = None
# ssh is enabled by default
self.enableSSH = CowrieConfig().getboolean("ssh", "enabled", fallback=True)
self.enableSSH = CowrieConfig.getboolean("ssh", "enabled", fallback=True)
# telnet is disabled by default
self.enableTelnet = CowrieConfig().getboolean(
"telnet", "enabled", fallback=False
)
self.enableTelnet = CowrieConfig.getboolean("telnet", "enabled", fallback=False)
# pool is disabled by default, but need to check this setting in case user only wants to run the pool
self.pool_only = CowrieConfig().getboolean(
self.pool_only = CowrieConfig.getboolean(
"backend_pool", "pool_only", fallback=False
)
@ -128,7 +126,7 @@ Makes a Cowrie SSH/Telnet honeypot.
print("ERROR: You must not run cowrie as root!")
sys.exit(1)
tz = CowrieConfig().get("honeypot", "timezone", fallback="UTC")
tz = CowrieConfig.get("honeypot", "timezone", fallback="UTC")
# `system` means use the system time zone
if tz != "system":
os.environ["TZ"] = tz
@ -158,10 +156,10 @@ Makes a Cowrie SSH/Telnet honeypot.
# Load output modules
self.output_plugins = []
for x in CowrieConfig().sections():
for x in CowrieConfig.sections():
if not x.startswith("output_"):
continue
if CowrieConfig().getboolean(x, "enabled") is False:
if CowrieConfig.getboolean(x, "enabled") is False:
continue
engine = x.split("_")[1]
try:
@ -188,17 +186,15 @@ Makes a Cowrie SSH/Telnet honeypot.
# initialise VM pool handling - only if proxy AND pool set to enabled, and pool is to be deployed here
# or also enabled if pool_only is true
backend_type = CowrieConfig().get("honeypot", "backend", fallback="shell")
proxy_backend = CowrieConfig().get("proxy", "backend", fallback="simple")
backend_type = CowrieConfig.get("honeypot", "backend", fallback="shell")
proxy_backend = CowrieConfig.get("proxy", "backend", fallback="simple")
if (backend_type == "proxy" and proxy_backend == "pool") or self.pool_only:
# in this case we need to set some kind of pool connection
local_pool = (
CowrieConfig().get("proxy", "pool", fallback="local") == "local"
)
pool_host = CowrieConfig().get("proxy", "pool_host", fallback="127.0.0.1")
pool_port = CowrieConfig().getint("proxy", "pool_port", fallback=6415)
local_pool = CowrieConfig.get("proxy", "pool", fallback="local") == "local"
pool_host = CowrieConfig.get("proxy", "pool_host", fallback="127.0.0.1")
pool_port = CowrieConfig.getint("proxy", "pool_port", fallback=6415)
if local_pool or self.pool_only:
# start a pool locally
@ -206,7 +202,7 @@ Makes a Cowrie SSH/Telnet honeypot.
f.tac = self
listen_endpoints = get_endpoints_from_section(
CowrieConfig(), "backend_pool", 6415
CowrieConfig, "backend_pool", 6415
)
create_endpoint_services(reactor, self.topService, listen_endpoints, f)
@ -224,7 +220,7 @@ Makes a Cowrie SSH/Telnet honeypot.
return self.topService
def pool_ready(self):
backend = CowrieConfig().get("honeypot", "backend", fallback="shell")
backend = CowrieConfig.get("honeypot", "backend", fallback="shell")
# this method is never called if self.pool_only is False,
# since we do not start the pool handler that would call it
@ -235,16 +231,14 @@ Makes a Cowrie SSH/Telnet honeypot.
factory.portal.registerChecker(core.checkers.HoneypotPublicKeyChecker())
factory.portal.registerChecker(core.checkers.HoneypotPasswordChecker())
if CowrieConfig().getboolean("ssh", "auth_none_enabled", fallback=False):
if CowrieConfig.getboolean("ssh", "auth_none_enabled", fallback=False):
factory.portal.registerChecker(core.checkers.HoneypotNoneChecker())
if CowrieConfig().has_section("ssh"):
listen_endpoints = get_endpoints_from_section(
CowrieConfig(), "ssh", 2222
)
if CowrieConfig.has_section("ssh"):
listen_endpoints = get_endpoints_from_section(CowrieConfig, "ssh", 2222)
else:
listen_endpoints = get_endpoints_from_section(
CowrieConfig(), "honeypot", 2222
CowrieConfig, "honeypot", 2222
)
create_endpoint_services(
@ -257,9 +251,7 @@ Makes a Cowrie SSH/Telnet honeypot.
f.portal = portal.Portal(core.realm.HoneyPotRealm())
f.portal.registerChecker(core.checkers.HoneypotPasswordChecker())
listen_endpoints = get_endpoints_from_section(
CowrieConfig(), "telnet", 2223
)
listen_endpoints = get_endpoints_from_section(CowrieConfig, "telnet", 2223)
create_endpoint_services(reactor, self.topService, listen_endpoints, f)

View File

@ -48,11 +48,12 @@ basepython = python3.8
description = run Mypy (static type checker)
deps =
mypy==0.812
mypy-zope==0.2.13
-r{toxinidir}/requirements.txt
-r{toxinidir}/requirements-dev.txt
commands =
mypy \
--cache-dir="{toxworkdir}/mypy_cache" \
--config-file="{toxinidir}/mypy.ini" \
{tty:--pretty:} \
{posargs:src}