5 Commits

Author SHA1 Message Date
7be73a7dff Make peername and sockname calls more robust across platforms
For whatever reason, macOS returns 4 values from conn.get_extra_info('peername') and conn.get_extra_info('sockname'), but Linux systems only return 2.  On the Mac, it's only the first two that we need anyway. Now we retrieve them all, no matter how many there are, and just use the first two so it will work on both platforms.
2025-01-28 10:39:12 -05:00
788bd26845 Now print exceptions to console when SSH connection is lost 2025-01-28 10:21:27 -05:00
cea5dc28a2 New command line options for prompts and config files.
* --prompt-file to specify a file from which to read the prompt.
* --prompt to specify a prompt string on the command line
* --config to specify an alternate config file
2025-01-27 13:20:41 -05:00
545d50f294 Added DECEIVE image to README 2025-01-23 11:16:53 -05:00
32441dc4c0 Merge pull request #1 from splunk/user-system-prompt
Streamline the prompting
2025-01-17 19:37:52 +00:00
3 changed files with 112 additions and 64 deletions

BIN
DECEIVE.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 600 KiB

View File

@ -1,5 +1,7 @@
# DECEIVE # DECEIVE
<img align="right" src="DECEIVE.png" alt="A cybercriminal interacts with a ghostly, AI-driven honeypot system">
DECEIVE, the **DECeption with Evaluative Integrated Validation Engine**, is a high-interaction, low-effort honeypot system. Unlike most high-interaction honeypots, DECEIVE doesn't provide attackers with access to any actual system. AI actually does all the work of simulating a realistic honeypot system based on a configurable system prompt that describes what type of system you want to simulate. Unlike many other high-interaction honeypots which require substantial effort to seed with realistic users, data, and applications, DECEIVE's AI backend will do all this for you, automatically. DECEIVE, the **DECeption with Evaluative Integrated Validation Engine**, is a high-interaction, low-effort honeypot system. Unlike most high-interaction honeypots, DECEIVE doesn't provide attackers with access to any actual system. AI actually does all the work of simulating a realistic honeypot system based on a configurable system prompt that describes what type of system you want to simulate. Unlike many other high-interaction honeypots which require substantial effort to seed with realistic users, data, and applications, DECEIVE's AI backend will do all this for you, automatically.
This version of DECEIVE simulates a Linux server via the SSH protocol. It will log all the user inputs, the outputs returned by the LLM backend, as well as a summary of each session after they end. It'll even tell you if it thinks a user's session was benign, suspicious, or outright malicious. This version of DECEIVE simulates a Linux server via the SSH protocol. It will log all the user inputs, the outputs returned by the LLM backend, as well as a summary of each session after they end. It'll even tell you if it thinks a user's session was benign, suspicious, or outright malicious.

View File

@ -1,11 +1,14 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from configparser import ConfigParser from configparser import ConfigParser
import argparse
import asyncio import asyncio
import asyncssh import asyncssh
import threading import threading
import sys import sys
import json import json
import os
import traceback
from typing import Optional from typing import Optional
import logging import logging
import datetime import datetime
@ -46,8 +49,18 @@ class MySSHServer(asyncssh.SSHServer):
def connection_made(self, conn: asyncssh.SSHServerConnection) -> None: def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
# Get the source and destination IPs and ports # Get the source and destination IPs and ports
(src_ip, src_port, _, _) = conn.get_extra_info('peername') peername = conn.get_extra_info('peername')
(dst_ip, dst_port, _, _) = conn.get_extra_info('sockname') sockname = conn.get_extra_info('sockname')
if peername is not None:
src_ip, src_port = peername[:2]
else:
src_ip, src_port = '-', '-'
if sockname is not None:
dst_ip, dst_port = sockname[:2]
else:
dst_ip, dst_port = '-', '-'
# Store the connection details in thread-local storage # Store the connection details in thread-local storage
thread_local.src_ip = src_ip thread_local.src_ip = src_ip
@ -61,6 +74,7 @@ class MySSHServer(asyncssh.SSHServer):
def connection_lost(self, exc: Optional[Exception]) -> None: def connection_lost(self, exc: Optional[Exception]) -> None:
if exc: if exc:
logger.error('SSH connection error', extra={"error": str(exc)}) logger.error('SSH connection error', extra={"error": str(exc)})
traceback.print_exception(exc)
else: else:
logger.info("SSH connection closed") logger.info("SSH connection closed")
# Ensure session summary is called on connection loss if attributes are set # Ensure session summary is called on connection loss if attributes are set
@ -285,10 +299,24 @@ def choose_llm():
return llm_model return llm_model
def get_prompts() -> dict: def get_prompts(prompt: Optional[str], prompt_file: Optional[str]) -> dict:
system_prompt = config['llm']['system_prompt'] system_prompt = config['llm']['system_prompt']
if prompt is not None:
if not prompt.strip():
print("Error: The prompt text cannot be empty.", file=sys.stderr)
sys.exit(1)
user_prompt = prompt
elif prompt_file:
if not os.path.exists(prompt_file):
print(f"Error: The specified prompt file '{prompt_file}' does not exist.", file=sys.stderr)
sys.exit(1)
with open(prompt_file, "r") as f:
user_prompt = f.read()
elif os.path.exists("prompt.txt"):
with open("prompt.txt", "r") as f: with open("prompt.txt", "r") as f:
user_prompt = f.read() user_prompt = f.read()
else:
raise ValueError("Either prompt or prompt_file must be provided.")
return { return {
"system_prompt": system_prompt, "system_prompt": system_prompt,
"user_prompt": user_prompt "user_prompt": user_prompt
@ -296,48 +324,61 @@ def get_prompts() -> dict:
#### MAIN #### #### MAIN ####
# Always use UTC for logging try:
logging.Formatter.formatTime = (lambda self, record, datefmt=None: datetime.datetime.fromtimestamp(record.created, datetime.timezone.utc).isoformat(sep="T",timespec="milliseconds")) # Parse command line arguments
parser = argparse.ArgumentParser(description='Start the SSH honeypot server.')
parser.add_argument('-c', '--config', type=str, default='config.ini', help='Path to the configuration file')
parser.add_argument('-p', '--prompt', type=str, help='The entire text of the prompt')
parser.add_argument('-f', '--prompt-file', type=str, default='prompt.txt', help='Path to the prompt file')
args = parser.parse_args()
# Read our configuration file # Check if the config file exists
config = ConfigParser() if not os.path.exists(args.config):
config.read("config.ini") print(f"Error: The specified config file '{args.config}' does not exist.", file=sys.stderr)
sys.exit(1)
# Read the user accounts from the configuration file # Always use UTC for logging
accounts = get_user_accounts() logging.Formatter.formatTime = (lambda self, record, datefmt=None: datetime.datetime.fromtimestamp(record.created, datetime.timezone.utc).isoformat(sep="T",timespec="milliseconds"))
# Set up the honeypot logger # Read our configuration file
logger = logging.getLogger(__name__) config = ConfigParser()
logger.setLevel(logging.INFO) config.read(args.config)
log_file_handler = logging.FileHandler(config['honeypot'].get("log_file", "ssh_log.log")) # Read the user accounts from the configuration file
logger.addHandler(log_file_handler) accounts = get_user_accounts()
log_file_handler.setFormatter(JSONFormatter()) # Set up the honeypot logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
f = ContextFilter() log_file_handler = logging.FileHandler(config['honeypot'].get("log_file", "ssh_log.log"))
logger.addFilter(f) logger.addHandler(log_file_handler)
# Now get access to the LLM log_file_handler.setFormatter(JSONFormatter())
prompts = get_prompts() f = ContextFilter()
llm_system_prompt = prompts["system_prompt"] logger.addFilter(f)
llm_user_prompt = prompts["user_prompt"]
llm = choose_llm() # Now get access to the LLM
llm_sessions = dict() prompts = get_prompts(args.prompt, args.prompt_file)
llm_system_prompt = prompts["system_prompt"]
llm_user_prompt = prompts["user_prompt"]
llm_trimmer = trim_messages( llm = choose_llm()
llm_sessions = dict()
llm_trimmer = trim_messages(
max_tokens=config['llm'].getint("trimmer_max_tokens", 64000), max_tokens=config['llm'].getint("trimmer_max_tokens", 64000),
strategy="last", strategy="last",
token_counter=llm, token_counter=llm,
include_system=True, include_system=True,
allow_partial=False, allow_partial=False,
start_on="human", start_on="human",
) )
llm_prompt = ChatPromptTemplate.from_messages( llm_prompt = ChatPromptTemplate.from_messages(
[ [
( (
"system", "system",
@ -349,25 +390,30 @@ llm_prompt = ChatPromptTemplate.from_messages(
), ),
MessagesPlaceholder(variable_name="messages"), MessagesPlaceholder(variable_name="messages"),
] ]
) )
llm_chain = ( llm_chain = (
RunnablePassthrough.assign(messages=itemgetter("messages") | llm_trimmer) RunnablePassthrough.assign(messages=itemgetter("messages") | llm_trimmer)
| llm_prompt | llm_prompt
| llm | llm
) )
with_message_history = RunnableWithMessageHistory( with_message_history = RunnableWithMessageHistory(
llm_chain, llm_chain,
llm_get_session_history, llm_get_session_history,
input_messages_key="messages" input_messages_key="messages"
) )
# Thread-local storage for connection details # Thread-local storage for connection details
thread_local = threading.local() thread_local = threading.local()
# Kick off the server! # Kick off the server!
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
loop.run_until_complete(start_server()) loop.run_until_complete(start_server())
loop.run_forever() loop.run_forever()
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
traceback.print_exc()
sys.exit(1)