Merge remote-tracking branch 'Snuf/master' into pwnagotchi-torch

# Conflicts:
#	bin/pwnagotchi
#	builder/data/usr/bin/pwnlib
#	pwnagotchi/ai/__init__.py
This commit is contained in:
Jeroen Oudshoorn
2023-07-24 21:20:37 +02:00
19 changed files with 670 additions and 41 deletions

View File

@ -69,7 +69,7 @@ def do_auto_mode(agent):
# deauth all client stations in order to get a full handshake
for sta in ap['clients']:
agent.deauth(ap, sta)
time.sleep(1)
time.sleep(1) # delay to not trigger nexmon firmware bugs
# An interesting effect of this:
#

View File

@ -25,20 +25,27 @@ reload_brcm() {
if ! modprobe -r brcmfmac; then
return 1
fi
sleep 1
if ! modprobe brcmfmac; then
return 1
fi
sleep 2
iw dev wlan0 set power_save off
return 0
}
# starts mon0
start_monitor_interface() {
airmon-ng start wlan0
iw dev wlan0 set power_save off
iw phy "$(iw phy | head -1 | cut -d" " -f2)" interface add mon0 type monitor
sleep 2
ifconfig wlan0 down
ifconfig mon0 up
}
# stops mon0
stop_monitor_interface() {
airmon-ng stop wlan0
ifconfig mon0 down && iw dev mon0 del
}
# returns 0 if the specificed network interface is up
@ -67,12 +74,12 @@ is_auto_mode() {
# if usb0 is up, we're in MANU
if is_interface_up usb0; then
return 1
return 0
fi
# if eth0 is up (for other boards), we're in MANU
if is_interface_up eth0; then
return 1
return 0
fi
# no override, but none of the interfaces is up -> AUTO

View File

@ -9,7 +9,7 @@ from pwnagotchi._version import __version__
_name = None
config = None
_cpu_stats = {}
def set_name(new_name):
if new_name is None:
@ -83,13 +83,18 @@ def _cpu_stat():
return list(map(int,fp.readline().split()[1:]))
def cpu_load():
def cpu_load(tag=None):
"""
Returns the current cpuload
"""
parts0 = _cpu_stat()
time.sleep(0.1)
if tag and tag in _cpu_stats.keys():
parts0 = _cpu_stats[tag]
else:
parts0 = _cpu_stat()
time.sleep(0.1) # only need to sleep when no tag
parts1 = _cpu_stat()
if tag: _cpu_stats[tag] = parts1
parts_diff = [p1 - p0 for (p0, p1) in zip(parts0, parts1)]
user, nice, sys, idle, iowait, irq, softirq, steal, _guest, _guest_nice = parts_diff
idle_sum = idle + iowait
@ -162,3 +167,4 @@ def reboot(mode=None):
os.system("sync")
os.system("shutdown -r now")
os.system("service pwnagotchi restart")

View File

@ -312,18 +312,47 @@ class Agent(Client, Automata, AsyncAdvertiser, AsyncTrainer):
def _fetch_stats(self):
while True:
s = self.session()
self._update_uptime(s)
self._update_advertisement(s)
self._update_peers()
self._update_counters()
self._update_handshakes(0)
time.sleep(1)
try:
s = self.session()
except Exception as err:
logging.error("[agent:_fetch_stats] self.session: %s" % repr(err))
try:
self._update_uptime(s)
except Exception as err:
logging.error("[agent:_fetch_stats] self.update_uptimes: %s" % repr(err))
try:
self._update_advertisement(s)
except Exception as err:
logging.error("[agent:_fetch_stats] self.update_advertisements: %s" % repr(err))
try:
self._update_peers()
except Exception as err:
logging.error("[agent:_fetch_stats] self.update_peers: %s" % repr(err))
try:
self._update_counters()
except Exception as err:
logging.error("[agent:_fetch_stats] self.update_counters: %s" % repr(err))
try:
self._update_handshakes(0)
except Exception as err:
logging.error("[agent:_fetch_stats] self.update_handshakes: %s" % repr(err))
time.sleep(5)
async def _on_event(self, msg):
found_handshake = False
jmsg = json.loads(msg)
# give plugins access to the events
try:
plugins.on('bcap_%s' % re.sub(r"[^a-z0-9_]+", "_", jmsg['tag'].lower()), self, jmsg)
except Exception as err:
logging.error("Processing event: %s" % err)
if jmsg['tag'] == 'wifi.client.handshake':
filename = jmsg['data']['file']
sta_mac = jmsg['data']['station']
@ -356,12 +385,13 @@ class Agent(Client, Automata, AsyncAdvertiser, AsyncTrainer):
self.run('events.clear')
while True:
logging.debug("polling events ...")
logging.debug("[agent:_event_poller] polling events ...")
try:
loop.create_task(self.start_websocket(self._on_event))
loop.run_forever()
logging.debug("[agent:_event_poller] loop loop loop")
except Exception as ex:
logging.debug("Error while polling via websocket (%s)", ex)
logging.debug("[agent:_event_poller] Error while polling via websocket (%s)", ex)
def start_event_polling(self):
# start a thread and pass in the mainloop

View File

@ -14,10 +14,11 @@ def load(config, agent, epoch, from_disk=True):
try:
begin = time.time()
logging.info("[ai] bootstrapping dependencies ...")
start = time.time()
SB_BACKEND = "stable_baselines3"
SB_BACKEND = "stable_baselines3";
try:
from stable_baselines3 import A2C
@ -51,7 +52,6 @@ def load(config, agent, epoch, from_disk=True):
start = time.time()
import pwnagotchi.ai.gym as wrappers
logging.debug("[ai] gym wrapper imported in %.2fs" % (time.time() - start))
env = wrappers.Environment(agent, epoch)
@ -79,5 +79,5 @@ def load(config, agent, epoch, from_disk=True):
except Exception as e:
logging.exception("error while starting AI (%s)", e)
logging.warning("[ai] AI not loaded!")
return False
logging.warning("[ai] AI not loaded!")
return False

View File

@ -177,7 +177,7 @@ class Epoch(object):
self.bored_for = 0
now = time.time()
cpu = pwnagotchi.cpu_load()
cpu = pwnagotchi.cpu_load("epoch")
mem = pwnagotchi.mem_usage()
temp = pwnagotchi.temperature()

View File

@ -77,9 +77,12 @@ class Stats(object):
})
temp = "%s.tmp" % self.path
back = "%s.bak" % self.path
with open(temp, 'wt') as fp:
fp.write(data)
if os.path.isfile(self.path):
os.replace(self.path, back)
os.replace(temp, self.path)
@ -174,7 +177,13 @@ class AsyncTrainer(object):
logging.info("[ai] learning for %d epochs ..." % epochs_per_episode)
try:
self.set_training(True, epochs_per_episode)
# back up brain file before starting new training set
if os.path.isfile(self._nn_path):
back = "%s.bak" % self._nn_path
os.replace(self._nn_path, back)
self._view.set("mode", " ai")
self._model.learn(total_timesteps=epochs_per_episode, callback=self.on_ai_training_step)
self._view.set("mode", " AI")
except Exception as e:
logging.exception("[ai] error while training (%s)", e)
finally:

View File

@ -31,8 +31,10 @@ class Client(object):
self.websocket = "ws://%s:%s@%s:%d/api" % (username, password, hostname, port)
self.auth = HTTPBasicAuth(username, password)
def session(self):
r = requests.get("%s/session" % self.url, auth=self.auth)
# session takes optional argument to pull a sub-dictionary
# ex.: "session/wifi", "session/ble"
def session(self, sess="session"):
r = requests.get("%s/%s" % (self.url, sess), auth=self.auth)
return decode(r)
async def start_websocket(self, consumer):

View File

@ -46,13 +46,13 @@ def toggle_plugin(name, enable=True):
if name not in pwnagotchi.config['main']['plugins']:
pwnagotchi.config['main']['plugins'][name] = dict()
pwnagotchi.config['main']['plugins'][name]['enabled'] = enable
save_config(pwnagotchi.config, '/etc/pwnagotchi/config.toml')
if not enable and name in loaded:
if getattr(loaded[name], 'on_unload', None):
loaded[name].on_unload(view.ROOT)
del loaded[name]
if pwnagotchi.config:
save_config(pwnagotchi.config, '/etc/pwnagotchi/config.toml')
return True
if enable and name in database and name not in loaded:
@ -64,6 +64,8 @@ def toggle_plugin(name, enable=True):
one(name, 'config_changed', pwnagotchi.config)
one(name, 'ui_setup', view.ROOT)
one(name, 'ready', view.ROOT._agent)
if pwnagotchi.config:
save_config(pwnagotchi.config, '/etc/pwnagotchi/config.toml')
return True
return False

View File

@ -41,6 +41,7 @@ class MemTemp(plugins.Plugin):
ALLOWED_FIELDS = {
'mem': 'mem_usage',
'cpu': 'cpu_load',
'cpus': 'cpu_load_since',
'temp': 'cpu_temp',
'freq': 'cpu_freq'
}
@ -50,6 +51,7 @@ class MemTemp(plugins.Plugin):
FIELD_WIDTH = 4
def on_loaded(self):
self._last_cpu_load = self._cpu_stat()
logging.info("memtemp plugin loaded.")
def mem_usage(self):
@ -58,6 +60,28 @@ class MemTemp(plugins.Plugin):
def cpu_load(self):
return f"{int(pwnagotchi.cpu_load() * 100)}%"
def _cpu_stat(self):
"""
Returns the splitted first line of the /proc/stat file
"""
with open('/proc/stat', 'rt') as fp:
return list(map(int,fp.readline().split()[1:]))
def cpu_load_since(self):
"""
Returns the % load, since last time called
"""
parts0 = self._cpu_stat()
parts1 = self._last_cpu_load
self._last_cpu_load = parts0
parts_diff = [p1 - p0 for (p0, p1) in zip(parts0, parts1)]
user, nice, sys, idle, iowait, irq, softirq, steal, _guest, _guest_nice = parts_diff
idle_sum = idle + iowait
non_idle_sum = user + nice + sys + irq + softirq + steal
total = idle_sum + non_idle_sum
return f"{int(non_idle_sum / total * 100)}%"
def cpu_temp(self):
if self.options['scale'] == "fahrenheit":
temp = (pwnagotchi.temperature() * 9 / 5) + 32

View File

@ -2,8 +2,9 @@ import logging
import json
import toml
import _thread
import pwnagotchi
from pwnagotchi import restart, plugins
from pwnagotchi.utils import save_config
from pwnagotchi.utils import save_config, merge_config
from flask import abort
from flask import render_template_string
@ -180,7 +181,10 @@ INDEX = """
<span><select id="selAddType"><option value="text">Text</option><option value="number">Number</option></select></span>
<span><button id="btnAdd" type="button" onclick="addOption()">+</button></span>
</div>
<button id="btnSave" type="button" onclick="saveConfig()">Save and restart</button>
<div id="divSaveTop">
<button id="btnSave" type="button" onclick="saveConfig()">Save and restart</button>
<button id="btnSave" type="button" onclick="saveConfigNoRestart()">Merge and Save (CAUTION)</button>
</div>
<div id="content"></div>
{% endblock %}
@ -240,6 +244,24 @@ INDEX = """
});
}
}
function saveConfigNoRestart(){
// get table
var table = document.getElementById("tableOptions");
if (table) {
var json = tableToJson(table);
sendJSON("webcfg/merge-save-config", json, function(response) {
if (response) {
if (response.status == "200") {
alert("Config got updated");
} else {
alert("Error while updating the config (err-code: " + response.status + ")");
}
}
});
}
}
var searchInput = document.getElementById("searchText");
searchInput.onkeyup = function() {
var filter, table, tr, td, i, txtValue;
@ -471,15 +493,18 @@ class WebConfig(plugins.Plugin):
def __init__(self):
self.ready = False
self.mode = 'MANU'
self._agent = None
def on_config_changed(self, config):
self.config = config
self.ready = True
def on_ready(self, agent):
self._agent = agent
self.mode = 'MANU' if agent.mode == 'manual' else 'AUTO'
def on_internet_available(self, agent):
self._agent = agent
self.mode = 'MANU' if agent.mode == 'manual' else 'AUTO'
def on_loaded(self):
@ -513,4 +538,18 @@ class WebConfig(plugins.Plugin):
except Exception as ex:
logging.error(ex)
return "config error", 500
elif path == "merge-save-config":
try:
self.config = merge_config(request.get_json(), self.config)
pwnagotchi.config = merge_config(request.get_json(), pwnagotchi.config)
logging.debug("PWNAGOTCHI CONFIG:\n%s" % repr(pwnagotchi.config))
if self._agent:
self._agent._config = merge_config(request.get_json(), self._agent._config)
logging.debug(" Agent CONFIG:\n%s" % repr(self._agent._config))
logging.debug(" Updated CONFIG:\n%s" % request.get_json())
save_config(request.get_json(), '/etc/pwnagotchi/config.toml') # test
return "success"
except Exception as ex:
logging.error("[webcfg mergesave] %s" % ex)
return "config error", 500
abort(404)

View File

@ -43,6 +43,9 @@ class Display(View):
def is_waveshare27inch(self):
return self._implementation.name == 'waveshare27inch'
def is_waveshare27inchPartial(self):
return self._implementation.name == 'waveshare27inchPartial'
def is_waveshare29inch(self):
return self._implementation.name == 'waveshare29inch'
@ -75,6 +78,9 @@ class Display(View):
def is_spotpear24inch(self):
return self._implementation.name == 'spotpear24inch'
def is_displayhatmini(self):
return self._implementation.name == 'displayhatmini'
def is_waveshare_any(self):
return self.is_waveshare_v1() or self.is_waveshare_v2()
@ -101,7 +107,8 @@ class Display(View):
while True:
self._canvas_next_event.wait()
self._canvas_next_event.clear()
if self._implementation.name != 'waveshare27inchPartial':
self._canvas_next_event.clear()
self._implementation.render(self._canvas_next)
def _on_view_rendered(self, img):

View File

@ -8,6 +8,7 @@ from pwnagotchi.ui.hw.waveshare1 import WaveshareV1
from pwnagotchi.ui.hw.waveshare2 import WaveshareV2
from pwnagotchi.ui.hw.waveshare3 import WaveshareV3
from pwnagotchi.ui.hw.waveshare27inch import Waveshare27inch
from pwnagotchi.ui.hw.waveshare27inchPartial import Waveshare27inchPartial
from pwnagotchi.ui.hw.waveshare29inch import Waveshare29inch
from pwnagotchi.ui.hw.waveshare144lcd import Waveshare144lcd
from pwnagotchi.ui.hw.waveshare154inch import Waveshare154inch
@ -15,6 +16,7 @@ from pwnagotchi.ui.hw.waveshare213d import Waveshare213d
from pwnagotchi.ui.hw.waveshare213bc import Waveshare213bc
from pwnagotchi.ui.hw.waveshare35lcd import Waveshare35lcd
from pwnagotchi.ui.hw.spotpear24inch import Spotpear24inch
from pwnagotchi.ui.hw.displayhatmini import DisplayHatMini
def display_for(config):
# config has been normalized already in utils.load_config
@ -48,6 +50,9 @@ def display_for(config):
elif config['ui']['display']['type'] == 'waveshare27inch':
return Waveshare27inch(config)
elif config['ui']['display']['type'] == 'waveshare27inchPartial':
return Waveshare27inchPartial(config)
elif config['ui']['display']['type'] == 'waveshare29inch':
return Waveshare29inch(config)
@ -68,3 +73,6 @@ def display_for(config):
elif config['ui']['display']['type'] == 'spotpear24inch':
return Spotpear24inch(config)
elif config['ui']['display']['type'] == 'displayhatmini':
return DisplayHatMini(config)

View File

@ -0,0 +1,44 @@
import logging
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.hw.base import DisplayImpl
class DisplayHatMini(DisplayImpl):
def __init__(self, config):
super(DisplayHatMini, self).__init__(config, 'displayhatmini')
self._display = None
def layout(self):
fonts.setup(12, 10, 12, 70, 25, 9)
self._layout['width'] = 320
self._layout['height'] = 240
self._layout['face'] = (35, 50)
self._layout['name'] = (5, 20)
self._layout['channel'] = (0, 0)
self._layout['aps'] = (40, 0)
self._layout['uptime'] = (240, 0)
self._layout['line1'] = [0, 14, 320, 14]
self._layout['line2'] = [0, 220, 320, 220]
self._layout['friend_face'] = (0, 130)
self._layout['friend_name'] = (40, 135)
self._layout['shakes'] = (0, 220)
self._layout['mode'] = (280, 220)
self._layout['status'] = {
'pos': (80, 160),
'font': fonts.status_font(fonts.Medium),
'max': 20
}
return self._layout
def initialize(self):
logging.info("initializing Display Hat Mini")
from pwnagotchi.ui.hw.libs.pimoroni.displayhatmini.ST7789 import ST7789
self._display = ST7789(0,1,9,13)
def render(self, canvas):
self._display.display(canvas)
def clear(self):
self._display.clear()

View File

@ -0,0 +1,360 @@
# Copyright (c) 2014 Adafruit Industries
# Author: Tony DiCola
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import numbers
import time
import numpy as np
import spidev
import RPi.GPIO as GPIO
__version__ = '0.0.4'
BG_SPI_CS_BACK = 0
BG_SPI_CS_FRONT = 1
SPI_CLOCK_HZ = 16000000
ST7789_NOP = 0x00
ST7789_SWRESET = 0x01
ST7789_RDDID = 0x04
ST7789_RDDST = 0x09
ST7789_SLPIN = 0x10
ST7789_SLPOUT = 0x11
ST7789_PTLON = 0x12
ST7789_NORON = 0x13
ST7789_INVOFF = 0x20
ST7789_INVON = 0x21
ST7789_DISPOFF = 0x28
ST7789_DISPON = 0x29
ST7789_CASET = 0x2A
ST7789_RASET = 0x2B
ST7789_RAMWR = 0x2C
ST7789_RAMRD = 0x2E
ST7789_PTLAR = 0x30
ST7789_MADCTL = 0x36
ST7789_COLMOD = 0x3A
ST7789_FRMCTR1 = 0xB1
ST7789_FRMCTR2 = 0xB2
ST7789_FRMCTR3 = 0xB3
ST7789_INVCTR = 0xB4
ST7789_DISSET5 = 0xB6
ST7789_GCTRL = 0xB7
ST7789_GTADJ = 0xB8
ST7789_VCOMS = 0xBB
ST7789_LCMCTRL = 0xC0
ST7789_IDSET = 0xC1
ST7789_VDVVRHEN = 0xC2
ST7789_VRHS = 0xC3
ST7789_VDVS = 0xC4
ST7789_VMCTR1 = 0xC5
ST7789_FRCTRL2 = 0xC6
ST7789_CABCCTRL = 0xC7
ST7789_RDID1 = 0xDA
ST7789_RDID2 = 0xDB
ST7789_RDID3 = 0xDC
ST7789_RDID4 = 0xDD
ST7789_GMCTRP1 = 0xE0
ST7789_GMCTRN1 = 0xE1
ST7789_PWCTR6 = 0xFC
class ST7789(object):
"""Representation of an ST7789 TFT LCD."""
def __init__(self, port, cs, dc, backlight, rst=None, width=320,
height=240, rotation=0, invert=True, spi_speed_hz=60 * 1000 * 1000,
offset_left=0,
offset_top=0):
"""Create an instance of the display using SPI communication.
Must provide the GPIO pin number for the D/C pin and the SPI driver.
Can optionally provide the GPIO pin number for the reset pin as the rst parameter.
:param port: SPI port number
:param cs: SPI chip-select number (0 or 1 for BCM
:param backlight: Pin for controlling backlight
:param rst: Reset pin for ST7789
:param width: Width of display connected to ST7789
:param height: Height of display connected to ST7789
:param rotation: Rotation of display connected to ST7789
:param invert: Invert display
:param spi_speed_hz: SPI speed (in Hz)
"""
if rotation not in [0, 90, 180, 270]:
raise ValueError("Invalid rotation {}".format(rotation))
if width != height and rotation in [90, 270]:
raise ValueError("Invalid rotation {} for {}x{} resolution".format(rotation, width, height))
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
self._spi = spidev.SpiDev(port, cs)
self._spi.mode = 0
self._spi.lsbfirst = False
self._spi.max_speed_hz = spi_speed_hz
self._dc = dc
self._rst = rst
self._width = width
self._height = height
self._rotation = rotation
self._invert = invert
self._offset_left = offset_left
self._offset_top = offset_top
# Set DC as output.
GPIO.setup(dc, GPIO.OUT)
# Setup backlight as output (if provided).
self._backlight = backlight
if backlight is not None:
GPIO.setup(backlight, GPIO.OUT)
GPIO.output(backlight, GPIO.LOW)
time.sleep(0.1)
GPIO.output(backlight, GPIO.HIGH)
# Setup reset as output (if provided).
if rst is not None:
GPIO.setup(self._rst, GPIO.OUT)
self.reset()
self._init()
def send(self, data, is_data=True, chunk_size=4096):
"""Write a byte or array of bytes to the display. Is_data parameter
controls if byte should be interpreted as display data (True) or command
data (False). Chunk_size is an optional size of bytes to write in a
single SPI transaction, with a default of 4096.
"""
# Set DC low for command, high for data.
GPIO.output(self._dc, is_data)
# Convert scalar argument to list so either can be passed as parameter.
if isinstance(data, numbers.Number):
data = [data & 0xFF]
# Write data a chunk at a time.
for start in range(0, len(data), chunk_size):
end = min(start + chunk_size, len(data))
self._spi.xfer(data[start:end])
def set_backlight(self, value):
"""Set the backlight on/off."""
if self._backlight is not None:
GPIO.output(self._backlight, value)
@property
def width(self):
return self._width if self._rotation == 0 or self._rotation == 180 else self._height
@property
def height(self):
return self._height if self._rotation == 0 or self._rotation == 180 else self._width
def command(self, data):
"""Write a byte or array of bytes to the display as command data."""
self.send(data, False)
def data(self, data):
"""Write a byte or array of bytes to the display as display data."""
self.send(data, True)
def reset(self):
"""Reset the display, if reset pin is connected."""
if self._rst is not None:
GPIO.output(self._rst, 1)
time.sleep(0.500)
GPIO.output(self._rst, 0)
time.sleep(0.500)
GPIO.output(self._rst, 1)
time.sleep(0.500)
def _init(self):
# Initialize the display.
self.command(ST7789_SWRESET) # Software reset
time.sleep(0.150) # delay 150 ms
self.command(ST7789_MADCTL)
self.data(0x70)
self.command(ST7789_FRMCTR2) # Frame rate ctrl - idle mode
self.data(0x0C)
self.data(0x0C)
self.data(0x00)
self.data(0x33)
self.data(0x33)
self.command(ST7789_COLMOD)
self.data(0x05)
self.command(ST7789_GCTRL)
self.data(0x14)
self.command(ST7789_VCOMS)
self.data(0x37)
self.command(ST7789_LCMCTRL) # Power control
self.data(0x2C)
self.command(ST7789_VDVVRHEN) # Power control
self.data(0x01)
self.command(ST7789_VRHS) # Power control
self.data(0x12)
self.command(ST7789_VDVS) # Power control
self.data(0x20)
self.command(0xD0)
self.data(0xA4)
self.data(0xA1)
self.command(ST7789_FRCTRL2)
self.data(0x0F)
self.command(ST7789_GMCTRP1) # Set Gamma
self.data(0xD0)
self.data(0x04)
self.data(0x0D)
self.data(0x11)
self.data(0x13)
self.data(0x2B)
self.data(0x3F)
self.data(0x54)
self.data(0x4C)
self.data(0x18)
self.data(0x0D)
self.data(0x0B)
self.data(0x1F)
self.data(0x23)
self.command(ST7789_GMCTRN1) # Set Gamma
self.data(0xD0)
self.data(0x04)
self.data(0x0C)
self.data(0x11)
self.data(0x13)
self.data(0x2C)
self.data(0x3F)
self.data(0x44)
self.data(0x51)
self.data(0x2F)
self.data(0x1F)
self.data(0x1F)
self.data(0x20)
self.data(0x23)
if self._invert:
self.command(ST7789_INVON) # Invert display
else:
self.command(ST7789_INVOFF) # Don't invert display
self.command(ST7789_SLPOUT)
self.command(ST7789_DISPON) # Display on
time.sleep(0.100) # 100 ms
def begin(self):
"""Set up the display
Deprecated. Included in __init__.
"""
pass
def set_window(self, x0=0, y0=0, x1=None, y1=None):
"""Set the pixel address window for proceeding drawing commands. x0 and
x1 should define the minimum and maximum x pixel bounds. y0 and y1
should define the minimum and maximum y pixel bound. If no parameters
are specified the default will be to update the entire display from 0,0
to width-1,height-1.
"""
if x1 is None:
x1 = self._width - 1
if y1 is None:
y1 = self._height - 1
y0 += self._offset_top
y1 += self._offset_top
x0 += self._offset_left
x1 += self._offset_left
self.command(ST7789_CASET) # Column addr set
self.data(x0 >> 8)
self.data(x0 & 0xFF) # XSTART
self.data(x1 >> 8)
self.data(x1 & 0xFF) # XEND
self.command(ST7789_RASET) # Row addr set
self.data(y0 >> 8)
self.data(y0 & 0xFF) # YSTART
self.data(y1 >> 8)
self.data(y1 & 0xFF) # YEND
self.command(ST7789_RAMWR) # write to RAM
def display(self, image):
"""Write the provided image to the hardware.
:param image: Should be RGB format and the same dimensions as the display hardware.
"""
# Set address bounds to entire display.
self.set_window()
# Convert image to 16bit RGB565 format and
# flatten into bytes.
pixelbytes = self.image_to_data(image, self._rotation)
# Write data to hardware.
for i in range(0, len(pixelbytes), 4096):
self.data(pixelbytes[i:i + 4096])
def image_to_data(self, image, rotation=0):
if not isinstance(image, np.ndarray):
image = np.array(image.convert('RGB'))
# Rotate the image
pb = np.rot90(image, rotation // 90).astype('uint16')
# Mask and shift the 888 RGB into 565 RGB
red = (pb[..., [0]] & 0xf8) << 8
green = (pb[..., [1]] & 0xfc) << 3
blue = (pb[..., [2]] & 0xf8) >> 3
# Stick 'em together
result = red | green | blue
# Output the raw bytes
return result.byteswap().tobytes()

View File

@ -0,0 +1,77 @@
import logging
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.hw.base import DisplayImpl
from PIL import Image
class Waveshare27inchPartial(DisplayImpl):
def __init__(self, config):
super(Waveshare27inchPartial, self).__init__(config, 'waveshare27inchPartial')
self._display = None
self.counter = 0
def layout(self):
fonts.setup(10, 9, 10, 35, 25, 9)
self._layout['width'] = 264
self._layout['height'] = 176
self._layout['face'] = (66, 27)
self._layout['name'] = (5, 73)
self._layout['channel'] = (0, 0)
self._layout['aps'] = (28, 0)
self._layout['uptime'] = (199, 0)
self._layout['line1'] = [0, 14, 264, 14]
self._layout['line2'] = [0, 162, 264, 162]
self._layout['friend_face'] = (0, 146)
self._layout['friend_name'] = (40, 146)
self._layout['shakes'] = (0, 163)
self._layout['mode'] = (239, 163)
self._layout['status'] = {
'pos': (38, 93),
'font': fonts.status_font(fonts.Medium),
'max': 40
}
return self._layout
def initialize(self):
logging.info("initializing waveshare v1 2.7 inch display")
from rpi_epd2in7.epd import EPD
self._display = EPD(fast_refresh=True)
self._display.init()
def render(self, canvas):
# have to rotate, because lib work with portrait mode only
# also I have 180 degrees screen rotation inn config, not tested with other valuesjk:w
rotated = canvas.rotate(90, expand=True)
if self.counter == 0:
self._display.smart_update(rotated)
# print invert
# print true image
elif self.counter % 35 == 0:
inverted_image = rotated.point(lambda x: 255-x)
self._display.display_partial_frame(inverted_image, 0, 0, 264, 176, fast=True)
self._display.display_partial_frame(rotated, 0, 0, 264, 176, fast=True)
# partial update full screen
elif self.counter % 7 == 0:
# face + text under
#self._display.display_partial_frame(rotated, 35, 35, 190, 115, fast=True)
# full screen partial update
self._display.display_partial_frame(rotated, 0, 0, 264, 176, fast=True)
# partial update face
self._display.display_partial_frame(rotated, 110, 84, 92, 40, fast=True)
if self.counter >= 100:
self.counter = 0
else:
self.counter += 1
def clear(self):
pass

View File

@ -40,7 +40,7 @@ class Waveshare35lcd(DisplayImpl):
from pwnagotchi.ui.hw.libs.fb import fb
self._display = fb
logging.info("initializing waveshare 3,5inch lcd display")
self._display.ready_fb(i=1)
self._display.ready_fb(i=0)
self._display.black_scr()
def render(self, canvas):

View File

@ -16,8 +16,10 @@ from pwnagotchi.ui.components import *
from pwnagotchi.ui.state import State
from pwnagotchi.voice import Voice
WHITE = 0xff
BLACK = 0x00
import RPi.GPIO as GPIO
WHITE = 0x181010
BLACK = 0xffcccc
ROOT = None
@ -122,7 +124,7 @@ class View(object):
while True:
try:
name = self._state.get('name')
self.set('name', name.rstrip('').strip() if '' in name else (name + ' '))
self.set('name', name.rstrip('-').strip() if '-' in name else (name + ' -'))
self.update()
except Exception as e:
logging.warning("non fatal error while updating view: %s" % e)
@ -245,9 +247,9 @@ class View(object):
def wait(self, secs, sleeping=True):
was_normal = self.is_normal()
part = secs / 10.0
part = secs/3.0
for step in range(0, 10):
for step in range(0, 3):
# if we weren't in a normal state before going
# to sleep, keep that face and status on for
# a while, otherwise the sleep animation will
@ -257,11 +259,13 @@ class View(object):
if secs > 1:
self.set('face', faces.SLEEP)
self.set('status', self._voice.on_napping(int(secs)))
plugins.on('sleep', self, secs)
else:
self.set('face', faces.SLEEP2)
self.set('status', self._voice.on_awakening())
else:
self.set('status', self._voice.on_waiting(int(secs)))
plugins.on('wait', self, secs)
good_mood = self._agent.in_good_mood()
if step % 2 == 0:
self.set('face', faces.LOOK_R_HAPPY if good_mood else faces.LOOK_R)
@ -371,7 +375,9 @@ class View(object):
state = self._state
changes = state.changes(ignore=self._ignore_changes)
if force or len(changes):
self._canvas = Image.new('1', (self._width, self._height), WHITE)
colormode = '1' if not 'colormode' in self._config['ui'] else self._config['ui']['colormode']
backgroundcolor = WHITE if not 'backgroundcolor' in self._config['ui'] else self._config['ui']['backgroundcolor']
self._canvas = Image.new(colormode, (self._width, self._height), backgroundcolor)
drawer = ImageDraw.Draw(self._canvas)
plugins.on('ui_update', self)

View File

@ -254,6 +254,9 @@ def load_config(args):
elif config['ui']['display']['type'] in ('ws_27inch', 'ws27inch', 'waveshare_27inch', 'waveshare27inch'):
config['ui']['display']['type'] = 'waveshare27inch'
elif config['ui']['display']['type'] in ('ws_27inchPartial', 'ws27inchPartial', 'waveshare_27inchPartial', 'waveshare27inchPartial'):
config['ui']['display']['type'] = 'waveshare27inchPartial'
elif config['ui']['display']['type'] in ('ws_29inch', 'ws29inch', 'waveshare_29inch', 'waveshare29inch'):
config['ui']['display']['type'] = 'waveshare29inch'
@ -283,7 +286,10 @@ def load_config(args):
elif config['ui']['display']['type'] in ('spotpear24inch'):
config['ui']['display']['type'] = 'spotpear24inch'
elif config['ui']['display']['type'] in ('displayhatmini'):
config['ui']['display']['type'] = 'displayhatmini'
else:
print("unsupported display type %s" % config['ui']['display']['type'])
sys.exit(1)
@ -304,11 +310,13 @@ def total_unique_handshakes(path):
def iface_channels(ifname):
channels = []
output = subprocess.getoutput("/sbin/iwlist %s freq" % ifname)
phy = subprocess.getoutput("/sbin/iw %s info | grep wiphy | cut -d ' ' -f 2" % ifname)
output = subprocess.getoutput("/sbin/iw phy%s channels | grep ' MHz' | sed 's/^.*\[//g' | sed s/\].*\$//g" % phy)
for line in output.split("\n"):
line = line.strip()
if line.startswith("Channel "):
channels.append(int(line.split()[1]))
channels.append(int(line))
return channels