Files
WPIQ/wo/cli/plugins/secure.py
Malin fa5bf17eb8
Some checks failed
CI / test WordOps (ubuntu-22.04) (push) Has been cancelled
CI / test WordOps (ubuntu-24.04) (push) Has been cancelled
feat: convert WordOps from Nginx to OpenLiteSpeed + LSPHP + LSCache
Complete conversion of the WordOps stack from Nginx + PHP-FPM to
OpenLiteSpeed + LSPHP + LSCache. This is a full rewrite across all 7
phases of the codebase:

- Foundation: OLS paths, variables, services, removed pynginxconfig dep
- Templates: 11 new OLS mustache templates, removed nginx-specific ones
- Stack: stack_pref, stack, stack_services, stack_upgrade, stack_migrate
- Site: site_functions, site, site_create, site_update
- Plugins: debug, info, log, clean rewritten for OLS
- SSL/ACME: acme.sh deploy uses lswsctrl, OLS vhssl blocks
- Other: secure, backup, clone, install script

Additional features:
- Debian 13 (trixie) support
- PHP 8.5 support
- WP Fort Knox mu-plugin integration (wo secure --lockdown/--unlock)
- --nginx CLI flag preserved for backward compatibility

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 18:55:16 +01:00

350 lines
14 KiB
Python

import getpass
import os
import shutil
from cement.core.controller import CementBaseController, expose
from wo.core.fileutils import WOFileUtils
from wo.core.git import WOGit
from wo.core.logging import Log
from wo.core.random import RANDOM
from wo.core.services import WOService
from wo.core.shellexec import WOShellExec
from wo.core.template import WOTemplate
from wo.core.variables import WOVar
def wo_secure_hook(app):
pass
class WOSecureController(CementBaseController):
class Meta:
label = 'secure'
stacked_on = 'base'
stacked_type = 'nested'
description = (
'Secure command provide the ability to '
'adjust settings for backend and to harden server security.')
arguments = [
(['--auth'],
dict(help='secure backend authentification',
action='store_true')),
(['--port'],
dict(help='set backend port', action='store_true')),
(['--ip'],
dict(help='set backend whitelisted ip', action='store_true')),
(['--sshport'], dict(
help='set custom ssh port', action='store_true')),
(['--ssh'], dict(
help='harden ssh security', action='store_true')),
(['--allowpassword'], dict(
help='allow password authentification '
'when hardening ssh security', action='store_true')),
(['--lockdown'], dict(
help='enable WP Fort Knox lockdown on a site',
action='store_true')),
(['--unlock'], dict(
help='disable WP Fort Knox lockdown on a site',
action='store_true')),
(['--force'],
dict(help='force execution without being prompt',
action='store_true')),
(['user_input'],
dict(help='user input', nargs='?', default=None)),
(['user_pass'],
dict(help='user pass', nargs='?', default=None))]
usage = "wo secure [options]"
@expose(hide=True)
def default(self):
pargs = self.app.pargs
if pargs.auth:
self.secure_auth()
if pargs.port:
self.secure_port()
if pargs.ip:
self.secure_ip()
if pargs.sshport:
self.secure_ssh_port()
if pargs.ssh:
self.secure_ssh()
if pargs.lockdown:
self.secure_lockdown()
if pargs.unlock:
self.secure_unlock()
@expose(hide=True)
def secure_auth(self):
"""This function secures authentication"""
WOGit.add(self, [WOVar.wo_ols_conf_dir],
msg="Add OLS config to Git")
pargs = self.app.pargs
passwd = RANDOM.long(self)
if not pargs.user_input:
username = input("Provide HTTP authentication user "
"name [{0}] :".format(WOVar.wo_user))
pargs.user_input = username
if username == "":
pargs.user_input = WOVar.wo_user
if not pargs.user_pass:
password = getpass.getpass("Provide HTTP authentication "
"password [{0}] :".format(passwd))
pargs.user_pass = password
if password == "":
pargs.user_pass = passwd
# Set OLS admin password using admpass.sh
WOShellExec.cmd_exec(
self, "/usr/local/lsws/admin/misc/admpass.sh "
"{username} {password}"
.format(username=pargs.user_input,
password=pargs.user_pass),
log=False)
WOGit.add(self, [WOVar.wo_ols_conf_dir],
msg="Adding changed secure auth into Git")
@expose(hide=True)
def secure_port(self):
"""This function Secures port"""
WOGit.add(self, [WOVar.wo_ols_conf_dir],
msg="Add OLS config to Git")
pargs = self.app.pargs
if pargs.user_input:
while ((not pargs.user_input.isdigit()) and
(not pargs.user_input < 65536)):
Log.info(self, "Please enter a valid port number ")
pargs.user_input = input("WordOps "
"admin port [22222]:")
else:
port = input("WordOps admin port [22222]:")
if port == "":
port = 22222
while ((not port.isdigit()) and (not port != "") and
(not port < 65536)):
Log.info(self, "Please Enter valid port number :")
port = input("WordOps admin port [22222]:")
pargs.user_input = port
# Update OLS backend listener port
httpd_conf = '{0}/httpd_config.conf'.format(WOVar.wo_ols_conf_dir)
if os.path.isfile(httpd_conf):
WOFileUtils.searchreplace(
self, httpd_conf,
'address *:22222',
'address *:{0}'.format(pargs.user_input))
WOGit.add(self, [WOVar.wo_ols_conf_dir],
msg="Adding changed secure port into Git")
if not WOService.reload_service(self, 'lsws'):
Log.error(self, "service lsws reload failed. "
"check issues with `{0} -t` command"
.format(WOVar.wo_ols_bin))
Log.info(self, "Successfully port changed {port}"
.format(port=pargs.user_input))
@expose(hide=True)
def secure_ip(self):
"""IP whitelisting"""
WOGit.add(self, [WOVar.wo_ols_conf_dir],
msg="Add OLS config to Git")
pargs = self.app.pargs
if not pargs.user_input:
ip = input("Enter the comma separated IP addresses "
"to white list [127.0.0.1]:")
pargs.user_input = ip
try:
user_ip = pargs.user_input.strip().split(',')
except Exception as e:
Log.debug(self, "{0}".format(e))
user_ip = ['127.0.0.1']
# Update OLS ACL configuration
acl_conf = '{0}/22222/vhconf.conf'.format(WOVar.wo_ols_vhost_dir)
if os.path.isfile(acl_conf):
for ip_addr in user_ip:
ip_addr = ip_addr.strip()
if not WOFileUtils.grepcheck(self, acl_conf, ip_addr):
WOFileUtils.searchreplace(
self, acl_conf,
'allowList',
'allowList\n {0}'.format(ip_addr))
WOGit.add(self, [WOVar.wo_ols_conf_dir],
msg="Adding changed secure ip into Git")
Log.info(self, "Successfully added IP address in access control")
@expose(hide=True)
def secure_lockdown(self):
"""Enable WP Fort Knox lockdown on a WordPress site"""
pargs = self.app.pargs
if not pargs.user_input:
site_name = input("Enter the site name to lockdown: ")
pargs.user_input = site_name
site_name = pargs.user_input
webroot = '{0}{1}'.format(WOVar.wo_webroot, site_name)
mu_plugins_dir = '{0}/htdocs/wp-content/mu-plugins'.format(webroot)
fort_knox_src = '/var/lib/wo/wp-fort-knox.php'
if not os.path.isdir(webroot):
Log.error(self, "Site {0} not found".format(site_name))
# Check if it's a WordPress site
if not os.path.isfile(
'{0}/htdocs/wp-config.php'.format(webroot)):
Log.error(self, "Site {0} is not a WordPress site"
.format(site_name))
# Check if Fort Knox source exists
if not os.path.isfile(fort_knox_src):
Log.error(self, "WP Fort Knox plugin not found at {0}. "
"Please reinstall WordOps.".format(fort_knox_src))
# Create mu-plugins directory if it doesn't exist
if not os.path.isdir(mu_plugins_dir):
WOFileUtils.mkdir(self, mu_plugins_dir)
fort_knox_dest = '{0}/wp-fort-knox.php'.format(mu_plugins_dir)
if os.path.isfile(fort_knox_dest):
Log.info(self, "WP Fort Knox is already enabled for {0}"
.format(site_name))
return
Log.wait(self, "Enabling WP Fort Knox lockdown")
shutil.copy2(fort_knox_src, fort_knox_dest)
WOFileUtils.chown(
self, fort_knox_dest,
WOVar.wo_php_user, WOVar.wo_php_user)
Log.valide(self, "Enabling WP Fort Knox lockdown")
Log.info(self, "WP Fort Knox enabled for {0}\n"
" File modifications and plugin management "
"are now disabled in wp-admin.\n"
" Use WP-CLI for all administrative tasks.\n"
" To disable: wo secure --unlock {0}"
.format(site_name))
@expose(hide=True)
def secure_unlock(self):
"""Disable WP Fort Knox lockdown on a WordPress site"""
pargs = self.app.pargs
if not pargs.user_input:
site_name = input("Enter the site name to unlock: ")
pargs.user_input = site_name
site_name = pargs.user_input
webroot = '{0}{1}'.format(WOVar.wo_webroot, site_name)
fort_knox_path = ('{0}/htdocs/wp-content/mu-plugins/'
'wp-fort-knox.php'.format(webroot))
if not os.path.isdir(webroot):
Log.error(self, "Site {0} not found".format(site_name))
if not os.path.isfile(fort_knox_path):
Log.info(self, "WP Fort Knox is not enabled for {0}"
.format(site_name))
return
Log.wait(self, "Disabling WP Fort Knox lockdown")
WOFileUtils.rm(self, fort_knox_path)
Log.valide(self, "Disabling WP Fort Knox lockdown")
Log.info(self, "WP Fort Knox disabled for {0}\n"
" Plugin management is now available in wp-admin."
.format(site_name))
@expose(hide=True)
def secure_ssh(self):
"""Harden ssh security"""
pargs = self.app.pargs
if not pargs.force and not pargs.allowpassword:
start_secure = input('Are you sure you to want to'
' harden SSH security ?'
'\nSSH login with password will not '
'be possible anymore. Please make sure '
'you are already using SSH Keys.\n'
'Harden SSH security [y/N]')
if start_secure != "Y" and start_secure != "y":
Log.error(self, "Not hardening SSH security")
if os.path.exists('/etc/ssh'):
WOGit.add(self, ["/etc/ssh"],
msg="Adding SSH into Git")
Log.debug(self, "check if /etc/ssh/sshd_config exist")
if os.path.isfile('/etc/ssh/sshd_config'):
Log.debug(self, "looking for the current ssh port")
for line in open('/etc/ssh/sshd_config', encoding='utf-8'):
if 'Port' in line:
ssh_line = line.strip()
break
port = (ssh_line).split(' ')
current_ssh_port = (port[1]).strip()
if os.getenv('SUDO_USER'):
sudo_user = os.getenv('SUDO_USER')
else:
sudo_user = ''
if pargs.allowpassword:
wo_allowpassword = 'yes'
else:
wo_allowpassword = 'no'
data = dict(sshport=current_ssh_port, allowpass=wo_allowpassword,
user=sudo_user)
WOTemplate.deploy(self, '/etc/ssh/sshd_config',
'sshd.mustache', data)
WOGit.add(self, ["/etc/ssh"],
msg="Adding changed SSH port into Git")
if not WOService.restart_service(self, 'ssh'):
Log.error(self, "service SSH restart failed.")
Log.info(self, "Successfully harden SSH security")
else:
Log.error(self, "SSH config file not found")
@expose(hide=True)
def secure_ssh_port(self):
"""Change SSH port"""
WOGit.add(self, ["/etc/ssh"],
msg="Adding changed SSH port into Git")
pargs = self.app.pargs
if pargs.user_input:
while ((not pargs.user_input.isdigit()) and
(not pargs.user_input < 65536)):
Log.info(self, "Please enter a valid port number ")
pargs.user_input = input("Server "
"SSH port [22]:")
if not pargs.user_input:
port = input("Server SSH port [22]:")
if port == "":
port = 22
while (not port.isdigit()) and (port != "") and (not port < 65536):
Log.info(self, "Please Enter valid port number :")
port = input("Server SSH port [22]:")
pargs.user_input = port
if WOFileUtils.grepcheck(self, '/etc/ssh/sshd_config', '#Port'):
WOShellExec.cmd_exec(self, "sed -i \"s/#Port.*/Port "
"{port}/\" /etc/ssh/sshd_config"
.format(port=pargs.user_input))
else:
WOShellExec.cmd_exec(self, "sed -i \"s/Port.*/Port "
"{port}/\" /etc/ssh/sshd_config"
.format(port=pargs.user_input))
# allow new ssh port if ufw is enabled
if os.path.isfile('/etc/ufw/ufw.conf'):
# add rule for proftpd with UFW
if WOFileUtils.grepcheck(
self, '/etc/ufw/ufw.conf', 'ENABLED=yes'):
try:
WOShellExec.cmd_exec(
self, 'ufw limit {0}'.format(pargs.user_input))
WOShellExec.cmd_exec(
self, 'ufw reload')
except Exception as e:
Log.debug(self, "{0}".format(e))
Log.error(self, "Unable to add UFW rule")
# add ssh into git
WOGit.add(self, ["/etc/ssh"],
msg="Adding changed SSH port into Git")
# restart ssh service
if not WOService.restart_service(self, 'ssh'):
Log.error(self, "service SSH restart failed.")
Log.info(self, "Successfully changed SSH port to {port}"
.format(port=pargs.user_input))
def load(app):
app.handler.register(WOSecureController)
app.hook.register('post_argument_parsing', wo_secure_hook)