Four fixes:
1. Replace admpass.sh calls with direct htpasswd writes — the script
is interactive-only (no --password flag) and hangs forever in
automation. Write admin htpasswd directly with openssl passwd.
2. Fix httpd_config.conf template — OLS requires virtualHost {} blocks
with vhRoot/configFile, not bare include of vhconf.conf files.
Add proper _backend virtualHost block, map it to Backend listener,
use self-signed cert for Secure listener until real certs exist.
3. Fix addOLSListenerMap to only add maps to Default and Secure
listeners (not Backend which is reserved for the admin panel).
4. Fix default PHP detection to read from wo.conf config instead
of picking first installed version (which would prefer php74).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
364 lines
15 KiB
Python
364 lines
15 KiB
Python
import getpass
import os
import shlex
import shutil

from cement.core.controller import CementBaseController, expose

from wo.core.fileutils import WOFileUtils
from wo.core.git import WOGit
from wo.core.logging import Log
from wo.core.random import RANDOM
from wo.core.services import WOService
from wo.core.shellexec import WOShellExec
from wo.core.template import WOTemplate
from wo.core.variables import WOVar
|
|
|
|
|
|
def wo_secure_hook(app):
    """Hook registered by load() for 'post_argument_parsing'; does nothing.

    :param app: object handed over by the hook caller; it is not used.
    :return: None
    """
    return None
|
|
|
|
|
|
class WOSecureController(CementBaseController):
    """Controller behind ``wo secure``.

    Each ``--flag`` declared in ``Meta.arguments`` maps to one ``secure_*``
    method; ``default`` dispatches to every method whose flag was given.
    The optional positionals ``user_input`` / ``user_pass`` carry the value
    for the selected action (user name, port, IP list or site name).
    """

    class Meta:
        label = 'secure'
        stacked_on = 'base'
        stacked_type = 'nested'
        description = (
            'Secure command provide the ability to '
            'adjust settings for backend and to harden server security.')
        arguments = [
            (['--auth'],
                dict(help='secure backend authentification',
                     action='store_true')),
            (['--port'],
                dict(help='set backend port', action='store_true')),
            (['--ip'],
                dict(help='set backend whitelisted ip', action='store_true')),
            (['--sshport'], dict(
                help='set custom ssh port', action='store_true')),
            (['--ssh'], dict(
                help='harden ssh security', action='store_true')),
            (['--allowpassword'], dict(
                help='allow password authentification '
                'when hardening ssh security', action='store_true')),
            (['--lockdown'], dict(
                help='enable WP Fort Knox lockdown on a site',
                action='store_true')),
            (['--unlock'], dict(
                help='disable WP Fort Knox lockdown on a site',
                action='store_true')),
            (['--force'],
                dict(help='force execution without being prompt',
                     action='store_true')),
            (['user_input'],
                dict(help='user input', nargs='?', default=None)),
            (['user_pass'],
                dict(help='user pass', nargs='?', default=None))]
        usage = "wo secure [options]"

    @staticmethod
    def _normalize_port(value):
        """Return ``value`` as a decimal port string, or None if invalid.

        A valid port consists only of digits and lies in 1..65535.
        """
        text = str(value).strip()
        if not text.isdigit():
            return None
        try:
            number = int(text)
        except ValueError:
            # str.isdigit() accepts characters (e.g. superscripts) that
            # int() refuses.
            return None
        if 1 <= number <= 65535:
            return str(number)
        return None

    def _read_port(self, supplied, prompt, default):
        """Return a validated port, prompting until one is obtained.

        :param supplied: value given on the command line, or None/empty.
        :param prompt: text shown by input() when a value must be asked for.
        :param default: port used when the user just presses Enter.
        :return: the port as a decimal string.
        """
        answer = supplied
        while True:
            if not answer:
                answer = input(prompt).strip() or str(default)
            port = self._normalize_port(answer)
            if port is not None:
                return port
            Log.info(self, "Please enter a valid port number ")
            answer = None

    @staticmethod
    def _detect_ssh_port(config_path):
        """Return the port configured in an sshd_config file as a string.

        An active ``Port N`` directive wins; otherwise the first commented
        ``#Port N`` line is used; if neither is present '22' is returned.
        """
        commented = None
        with open(config_path, encoding='utf-8') as handle:
            for raw in handle:
                fields = raw.split()
                if len(fields) < 2 or not fields[1].isdigit():
                    continue
                keyword = fields[0].lower()
                if keyword == 'port':
                    return fields[1]
                if keyword == '#port' and commented is None:
                    commented = fields[1]
        return commented or '22'

    @expose(hide=True)
    def default(self):
        """Run every sub-command whose flag is set on the parsed arguments."""
        pargs = self.app.pargs
        if pargs.auth:
            self.secure_auth()
        if pargs.port:
            self.secure_port()
        if pargs.ip:
            self.secure_ip()
        if pargs.sshport:
            self.secure_ssh_port()
        if pargs.ssh:
            self.secure_ssh()
        if pargs.lockdown:
            self.secure_lockdown()
        if pargs.unlock:
            self.secure_unlock()

    @expose(hide=True)
    def secure_auth(self):
        """Write the HTTP auth credentials for the OLS admin and backend.

        User name and password come from ``user_input`` / ``user_pass`` or
        are asked for interactively; an empty answer falls back to
        ``WOVar.wo_user`` and a generated password respectively. Both
        htpasswd files are overwritten with a single apr1-hashed entry.
        """
        WOGit.add(self, [WOVar.wo_ols_conf_dir],
                  msg="Add OLS config to Git")
        pargs = self.app.pargs
        passwd = RANDOM.long(self)
        if not pargs.user_input:
            username = input("Provide HTTP authentication user "
                             "name [{0}] :".format(WOVar.wo_user))
            pargs.user_input = username
            if username == "":
                pargs.user_input = WOVar.wo_user
        if not pargs.user_pass:
            password = getpass.getpass("Provide HTTP authentication "
                                       "password [{0}] :".format(passwd))
            pargs.user_pass = password
            if password == "":
                pargs.user_pass = passwd
        # Set OLS admin + backend password directly
        # (admpass.sh is interactive-only and hangs in automation).
        # Every user-supplied value is shell-quoted and handed to printf as
        # an argument, so quotes, '$' or '%' in the credentials cannot break
        # or alter the command.
        htpasswd_cmd = (
            "printf '%s:%s\\n' {username} "
            "\"$(openssl passwd -apr1 {password} 2>/dev/null)\" "
            "> {target} 2>/dev/null")
        targets = (
            '/usr/local/lsws/admin/conf/htpasswd',
            '{conf}/htpasswd-wo'.format(conf=WOVar.wo_ols_conf_dir),
        )
        for target in targets:
            WOShellExec.cmd_exec(
                self,
                htpasswd_cmd.format(
                    username=shlex.quote(str(pargs.user_input)),
                    password=shlex.quote(str(pargs.user_pass)),
                    target=shlex.quote(target)),
                log=False)
        WOGit.add(self, [WOVar.wo_ols_conf_dir],
                  msg="Adding changed secure auth into Git")

    @expose(hide=True)
    def secure_port(self):
        """Change the port of the OLS backend listener and reload lsws.

        The port comes from ``user_input`` or an interactive prompt
        (default 22222) and must be a number in 1..65535.
        """
        WOGit.add(self, [WOVar.wo_ols_conf_dir],
                  msg="Add OLS config to Git")
        pargs = self.app.pargs
        pargs.user_input = self._read_port(
            pargs.user_input, "WordOps admin port [22222]:", 22222)
        # Update OLS backend listener port
        # NOTE(review): only the literal 'address *:22222' is rewritten, so
        # a listener whose port was already changed is not matched.
        httpd_conf = '{0}/httpd_config.conf'.format(WOVar.wo_ols_conf_dir)
        if os.path.isfile(httpd_conf):
            WOFileUtils.searchreplace(
                self, httpd_conf,
                'address *:22222',
                'address *:{0}'.format(pargs.user_input))
        WOGit.add(self, [WOVar.wo_ols_conf_dir],
                  msg="Adding changed secure port into Git")
        if not WOService.reload_service(self, 'lsws'):
            Log.error(self, "service lsws reload failed. "
                      "check issues with `{0} -t` command"
                      .format(WOVar.wo_ols_bin))
        Log.info(self, "Successfully port changed {port}"
                 .format(port=pargs.user_input))

    @expose(hide=True)
    def secure_ip(self):
        """Add IP addresses to the allowList of the backend vhost config.

        Addresses come from ``user_input`` or a prompt, comma separated;
        when nothing usable is entered, 127.0.0.1 is used.
        """
        WOGit.add(self, [WOVar.wo_ols_conf_dir],
                  msg="Add OLS config to Git")
        pargs = self.app.pargs
        if not pargs.user_input:
            ip = input("Enter the comma separated IP addresses "
                       "to white list [127.0.0.1]:")
            pargs.user_input = ip
        # Drop blank entries so an empty answer (or a stray comma) falls
        # back to the default announced in the prompt.
        user_ip = [entry.strip()
                   for entry in (pargs.user_input or '').split(',')
                   if entry.strip()]
        if not user_ip:
            user_ip = ['127.0.0.1']
        # Update OLS ACL configuration
        acl_conf = '{0}/22222/vhconf.conf'.format(WOVar.wo_ols_vhost_dir)
        if os.path.isfile(acl_conf):
            for ip_addr in user_ip:
                # Skip addresses already present in the file.
                if not WOFileUtils.grepcheck(self, acl_conf, ip_addr):
                    WOFileUtils.searchreplace(
                        self, acl_conf,
                        'allowList',
                        'allowList\n    {0}'.format(ip_addr))
        WOGit.add(self, [WOVar.wo_ols_conf_dir],
                  msg="Adding changed secure ip into Git")
        Log.info(self, "Successfully added IP address in access control")

    @expose(hide=True)
    def secure_lockdown(self):
        """Enable WP Fort Knox lockdown on a WordPress site.

        Copies /var/lib/wo/wp-fort-knox.php into the site's mu-plugins
        directory (created when missing) and chowns it to
        ``WOVar.wo_php_user``. The site name comes from ``user_input`` or a
        prompt.
        """
        pargs = self.app.pargs
        if not pargs.user_input:
            site_name = input("Enter the site name to lockdown: ")
            pargs.user_input = site_name

        site_name = pargs.user_input
        webroot = '{0}{1}'.format(WOVar.wo_webroot, site_name)
        mu_plugins_dir = '{0}/htdocs/wp-content/mu-plugins'.format(webroot)
        fort_knox_src = '/var/lib/wo/wp-fort-knox.php'

        # NOTE(review): the checks below do not return after Log.error, so
        # they presumably rely on Log.error aborting the command - confirm.
        if not os.path.isdir(webroot):
            Log.error(self, "Site {0} not found".format(site_name))

        # Check if it's a WordPress site
        if not os.path.isfile(
                '{0}/htdocs/wp-config.php'.format(webroot)):
            Log.error(self, "Site {0} is not a WordPress site"
                      .format(site_name))

        # Check if Fort Knox source exists
        if not os.path.isfile(fort_knox_src):
            Log.error(self, "WP Fort Knox plugin not found at {0}. "
                      "Please reinstall WordOps.".format(fort_knox_src))

        # Create mu-plugins directory if it doesn't exist
        if not os.path.isdir(mu_plugins_dir):
            WOFileUtils.mkdir(self, mu_plugins_dir)

        fort_knox_dest = '{0}/wp-fort-knox.php'.format(mu_plugins_dir)

        if os.path.isfile(fort_knox_dest):
            Log.info(self, "WP Fort Knox is already enabled for {0}"
                     .format(site_name))
            return

        Log.wait(self, "Enabling WP Fort Knox lockdown")
        shutil.copy2(fort_knox_src, fort_knox_dest)
        WOFileUtils.chown(
            self, fort_knox_dest,
            WOVar.wo_php_user, WOVar.wo_php_user)
        Log.valide(self, "Enabling WP Fort Knox lockdown")
        Log.info(self, "WP Fort Knox enabled for {0}\n"
                 "  File modifications and plugin management "
                 "are now disabled in wp-admin.\n"
                 "  Use WP-CLI for all administrative tasks.\n"
                 "  To disable: wo secure --unlock {0}"
                 .format(site_name))

    @expose(hide=True)
    def secure_unlock(self):
        """Disable WP Fort Knox lockdown on a WordPress site.

        Removes wp-fort-knox.php from the site's mu-plugins directory when
        it is present. The site name comes from ``user_input`` or a prompt.
        """
        pargs = self.app.pargs
        if not pargs.user_input:
            site_name = input("Enter the site name to unlock: ")
            pargs.user_input = site_name

        site_name = pargs.user_input
        webroot = '{0}{1}'.format(WOVar.wo_webroot, site_name)
        fort_knox_path = ('{0}/htdocs/wp-content/mu-plugins/'
                          'wp-fort-knox.php'.format(webroot))

        # NOTE(review): presumably Log.error aborts the command - confirm.
        if not os.path.isdir(webroot):
            Log.error(self, "Site {0} not found".format(site_name))

        if not os.path.isfile(fort_knox_path):
            Log.info(self, "WP Fort Knox is not enabled for {0}"
                     .format(site_name))
            return

        Log.wait(self, "Disabling WP Fort Knox lockdown")
        WOFileUtils.rm(self, fort_knox_path)
        Log.valide(self, "Disabling WP Fort Knox lockdown")
        Log.info(self, "WP Fort Knox disabled for {0}\n"
                 "  Plugin management is now available in wp-admin."
                 .format(site_name))

    @expose(hide=True)
    def secure_ssh(self):
        """Harden ssh security.

        Redeploys /etc/ssh/sshd_config from the 'sshd.mustache' template,
        keeping the currently configured port, and restarts ssh. Asks for
        confirmation unless ``--force`` or ``--allowpassword`` is given.
        """
        pargs = self.app.pargs
        if not pargs.force and not pargs.allowpassword:
            start_secure = input('Are you sure you to want to'
                                 ' harden SSH security ?'
                                 '\nSSH login with password will not '
                                 'be possible anymore. Please make sure '
                                 'you are already using SSH Keys.\n'
                                 'Harden SSH security [y/N]')
            if start_secure != "Y" and start_secure != "y":
                Log.error(self, "Not hardening SSH security")
        if os.path.exists('/etc/ssh'):
            WOGit.add(self, ["/etc/ssh"],
                      msg="Adding SSH into Git")
        Log.debug(self, "check if /etc/ssh/sshd_config exist")
        if os.path.isfile('/etc/ssh/sshd_config'):
            Log.debug(self, "looking for the current ssh port")
            # Parse real Port directives only, so lines that merely contain
            # "Port" (e.g. GatewayPorts) are ignored and a file without any
            # Port line falls back to 22 instead of crashing.
            current_ssh_port = self._detect_ssh_port('/etc/ssh/sshd_config')
            sudo_user = os.getenv('SUDO_USER') or ''
            wo_allowpassword = 'yes' if pargs.allowpassword else 'no'
            data = dict(sshport=current_ssh_port, allowpass=wo_allowpassword,
                        user=sudo_user)
            WOTemplate.deploy(self, '/etc/ssh/sshd_config',
                              'sshd.mustache', data)
            WOGit.add(self, ["/etc/ssh"],
                      msg="Adding changed SSH port into Git")
            if not WOService.restart_service(self, 'ssh'):
                Log.error(self, "service SSH restart failed.")
            Log.info(self, "Successfully harden SSH security")
        else:
            Log.error(self, "SSH config file not found")

    @expose(hide=True)
    def secure_ssh_port(self):
        """Change SSH port.

        The port comes from ``user_input`` or a prompt (default 22) and
        must be a number in 1..65535. The Port line of
        /etc/ssh/sshd_config is rewritten, the port is allowed in UFW when
        UFW is enabled, and ssh is restarted.
        """
        WOGit.add(self, ["/etc/ssh"],
                  msg="Adding changed SSH port into Git")
        pargs = self.app.pargs
        pargs.user_input = self._read_port(
            pargs.user_input, "Server SSH port [22]:", 22)
        if WOFileUtils.grepcheck(self, '/etc/ssh/sshd_config', '#Port'):
            sed_expr = "s/#Port.*/Port {port}/"
        else:
            # Anchored so that only the Port directive is rewritten, not
            # other directives containing "Port" such as GatewayPorts.
            sed_expr = "s/^[[:space:]]*Port[[:space:]].*/Port {port}/"
        WOShellExec.cmd_exec(
            self, "sed -i \"{expr}\" /etc/ssh/sshd_config"
            .format(expr=sed_expr.format(port=pargs.user_input)))
        # allow new ssh port if ufw is enabled
        if os.path.isfile('/etc/ufw/ufw.conf'):
            if WOFileUtils.grepcheck(
                    self, '/etc/ufw/ufw.conf', 'ENABLED=yes'):
                try:
                    # rate-limited rule for the new SSH port
                    WOShellExec.cmd_exec(
                        self, 'ufw limit {0}'.format(pargs.user_input))
                    WOShellExec.cmd_exec(
                        self, 'ufw reload')
                except Exception as e:
                    Log.debug(self, "{0}".format(e))
                    Log.error(self, "Unable to add UFW rule")
        # add ssh into git
        WOGit.add(self, ["/etc/ssh"],
                  msg="Adding changed SSH port into Git")
        # restart ssh service
        if not WOService.restart_service(self, 'ssh'):
            Log.error(self, "service SSH restart failed.")
        Log.info(self, "Successfully changed SSH port to {port}"
                 .format(port=pargs.user_input))
|
|
|
|
|
|
def load(app):
    """Plugin entry point: register the secure controller and its hook.

    :param app: application object exposing ``handler.register`` and
        ``hook.register``.
    """
    # Make the `secure` controller available to the application.
    app.handler.register(WOSecureController)
    # Run wo_secure_hook once arguments have been parsed.
    app.hook.register('post_argument_parsing', wo_secure_hook)
|