# Copyright (c) 2007, 2020, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, # as published by the Free Software Foundation. # # This program is also distributed with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, as # designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an additional # permission to link the program and your derivative works with the # separately licensed software that they have included with MySQL. # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See # the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import re import time import getpass import threading import functools import random import string import os import sys import mforms from mforms import newBox, newLabel, newButton, newTextEntry, newTreeView, newTable, newRadioButton, newSelector, newPanel, newTabView, Utilities, newCheckBox, newImageBox, newScrollPanel, App from wb_admin_security_be import AdminSecurity, PrivilegeInfo, PrivilegeReverseDict, SecurityAdminRoles, WBSecurityValidationError from wb_common import PermissionDeniedError, to_unicode from workbench.log import log_error, log_debug3 import grt from workbench import db_utils from wb_admin_utils import weakcb, WbAdminTabBase, WbAdminValidationConnection, WbAdminValidationBase SCHEMA_OBJECT_RIGHTS = [ "Select_priv", "Insert_priv", "Update_priv", "Delete_priv", "Execute_priv", "Show_view_priv", ] SCHEMA_DDL_RIGHTS = [ "Create_priv", "Alter_priv", "References_priv", "Index_priv", "Create_view_priv", "Create_routine_priv", "Alter_routine_priv", "Event_priv", "Drop_priv", "Trigger_priv" ] SCHEMA_OTHER_RIGHTS = [ "Grant_priv", "Create_tmp_table_priv", "Lock_tables_priv", ] DEFAULT_AUTH_PLUGIN = "mysql_native_password" AUTHENTICATION_PLUGIN_TYPES = { "mysql_native_password" : {"name" : "Standard", "auth_string_label": "", "enable_host": True, "enable_password" : True, }, None : {"name" : "Standard", "auth_string_label": "", "enable_host": True, "enable_password" : True, }, "mysql_old_password" : {"name" : "Standard (old)", "auth_string_label": "", "enable_host": True, "enable_password" : True, }, "authentication_windows" : { "name": "Windows Native", "auth_string_label": "Supply the Windows username and/or group names that are allowed to use this account, separated by a comma (,)\nThis account will be usable by these users whenever they are logged into Windows, without additional passwords.\nExample: Administrator, %s" % (getpass.getuser() or "joe"), "enable_host": True, "enable_password" : False }, "authentication_pam" : {"name": "PAM", "auth_string_label": "Syntax: [,= [,= ...]]\nSee documentation for details.", "enable_host": True, "enable_password" : False }, "sha256_password" : {"name": "SHA256 Password", "auth_string_label": "", "enable_host": True, "enable_password" : True }, "*" : {"name": None, "auth_string_label": "See the plugin documentation for valid values and details.", "enable_host": True, "enable_password" : True } } def rLabel(text): l = newLabel(text) l.set_text_align(mforms.MiddleRight) return l def dLabel(text): l = newLabel(text) l.set_style(mforms.SmallHelpTextStyle) return l class ThreadedInputValidator(object): """This class validates the changes in the associated text entry widget displaying the result in a validation label Usage: Instantiate this class and set is callback method to be the callback of the associated text entry. """ def __init__(self, owner, text_entry, validation_label, ctrl_be, delay=1, colors=('#33aa33', '#aa3333')): self.owner = owner self.text_entry = text_entry self.validation_label = validation_label self.ctrl_be = ctrl_be self.delay = delay self.colors = colors self.last_keypress_time = None self.text_to_validate = None self.lock = threading.Lock() self.timer = None self.strength = ['Blank','Weak', 'Medium strength', 'Strong'] self.message = '%s password.' def update_validation_label(self, estimate): # Call this from the main thread color = self.colors[0 if estimate > 1 else 1] message = self.strength[0] if estimate == 0 else (self.message % self.strength[estimate]) self.validation_label.set_color(color) self.validation_label.set_text(message) self.owner.set_dirty() def callback(self): with self.lock: self.last_keypress_time = time.time() self.text_to_validate = self.text_entry.get_string_value() if not self.timer or not self.timer.is_alive(): self.timer = threading.Timer(self.delay, self.validate) self.timer.start() def shutdown(self): if self.timer: self.timer.cancel() def validate(self): def compute_password_strength(passwd): if passwd == '': return 0 if len(passwd) < 8: return 1 estimate = 1 if len(passwd) >= 12: estimate += 1 if re.search('\d+', passwd): estimate += 1 if re.search('[a-z]', passwd) and re.search('[A-Z]', passwd): estimate += 1 if re.search('(\W|_)+', passwd): estimate += 1 return 1 if estimate < 3 else ( 2 if estimate < 5 else 3 ) def query_password_strength(passwd): if passwd == '': return 0 result = self.ctrl_be.exec_query("SELECT VALIDATE_PASSWORD_STRENGTH('%s')" % db_utils.escape_sql_string(passwd)) if result and result.nextRow(): estimate = result.intByIndex(1) else: return compute_password_strength(passwd) return 1 if estimate < 50 else ( 2 if estimate < 75 else 3 ) query_strength = self.ctrl_be.is_sql_connected() and ('validate_password', 'VALIDATE PASSWORD') in self.ctrl_be.server_active_plugins strength_function = query_password_strength if query_strength else compute_password_strength while True: # If after processing the current text a new text to validate is present, then process it with self.lock: last_keypress_time = self.last_keypress_time text_to_validate = self.text_to_validate if time.time() - last_keypress_time >= self.delay: if text_to_validate is None: # delay have passed and there is no text to validate break # Leave this function thus stopping this thread with self.lock: self.text_to_validate = None estimate = strength_function(text_to_validate) mforms.Utilities.perform_from_main_thread(functools.partial(self.update_validation_label, estimate=estimate), False) time.sleep(0.2) mforms.Utilities.driver_shutdown() class AddSchemaPrivilegeForm(mforms.Form): def __init__(self, secman, user=""): mforms.Form.__init__(self, None, mforms.FormResizable | mforms.FormMinimizable) self.set_title("New Schema Privilege Definition") self.secman = secman box = newBox(False) box.set_padding(12) box.set_spacing(8) self.set_content(box) label = newLabel("Select the Schema for which the user '%s' will have the privileges you want to define." % user) box.add(label, False, True) panel = newPanel(mforms.TitledBoxPanel) panel.set_title("Schema") box.add(panel, True, True) table = newTable() panel.add(table) table.set_padding(8) table.set_row_count(3) table.set_column_count(3) table.set_row_spacing(8) table.set_column_spacing(4) self.schema1 = newRadioButton(mforms.RadioButton.new_id()) self.schema1.set_active(True) self.schema1.add_clicked_callback(self.schema_radio_changed) self.schema1.set_text("All Schema (%)") table.add(self.schema1, 0, 1, 0, 1, mforms.HFillFlag) table.add(dLabel("This rule will apply to any schema name."), 2, 3, 0, 1, mforms.VFillFlag|mforms.HFillFlag) self.schema2 = newRadioButton(self.schema1.group_id()) self.schema2.add_clicked_callback(self.schema_radio_changed) self.schema2.set_text("Schemas matching pattern:") table.add(self.schema2, 0, 1, 1, 2, mforms.HFillFlag) table.add(dLabel("This rule will apply to schemas that match the given name or pattern.\nYou may use _ and % as wildcards in a pattern.\nEscape these characters with \\ in case you want their literal value."), 2, 3, 1, 2, mforms.VFillFlag|mforms.HFillFlag) self.schema2entry = newTextEntry() table.add(self.schema2entry, 1, 2, 1, 2, mforms.HFillFlag|mforms.HExpandFlag) self.schema3 = newRadioButton(self.schema1.group_id()) self.schema3.add_clicked_callback(self.schema_radio_changed) self.schema3.set_text("Selected schema:") alignbox = mforms.newBox(False) alignbox.add(self.schema3, False, True) table.add(alignbox, 0, 1, 2, 3, mforms.VFillFlag|mforms.HFillFlag) label = dLabel("Select a specific schema name for the rule to apply to.") label.set_text_align(mforms.TopLeft) table.add(label, 2, 3, 2, 3, mforms.VFillFlag|mforms.HFillFlag) self.schema3sel = newSelector() table.add(self.schema3sel, 1, 2, 2, 3, mforms.HFillFlag|mforms.HExpandFlag) for item in self.secman.schema_names: self.schema3sel.add_item(item) bbox = newBox(True) box.add(bbox, False, True) bbox.set_spacing(8) self.ok = newButton() self.ok.set_text("OK") bbox.add_end(self.ok, False, True) self.cancel = newButton() self.cancel.set_text("Cancel") bbox.add_end(self.cancel, False, True) self.set_size(850, 500) self.schema_radio_changed() self.center() def schema_radio_changed(self): self.schema2entry.set_enabled(self.schema2.get_active()) self.schema3sel.set_enabled(self.schema3.get_active()) self.secman.async_refresh(self.update_schemas) def update_schemas(self): selected_schema = self.schema3.get_string_value() self.schema3sel.clear() for item in self.secman.schema_names: self.schema3sel.add_item(item) for index in range(0, self.schema3sel.get_item_count()): if self.schema3sel.get_item_title(index) == selected_schema: self.schema3sel.set_index(index) def run(self): if self.run_modal(self.ok, self.cancel): if self.schema1.get_active(): schema = "%" elif self.schema2.get_active(): schema = self.schema2entry.get_string_value() else: schema = db_utils.escape_schema_wildcards(self.schema3sel.get_string_value()) return schema return None ############################## class SecuritySchemaPrivileges(mforms.Box): def __init__(self, owner): mforms.Box.__init__(self, False) self.set_managed() self.set_release_on_add() self._owner = owner self.dirty = False self.suspend_layout() self.set_spacing(8) self.set_padding(8) self.schema_rights_checks = {} self.privs_list = newTreeView(mforms.TreeFlatList) self.privs_list.add_column(mforms.StringColumnType, "Schema", 150, True) self.privs_list.add_column(mforms.StringColumnType, "Privileges", 800, False) self.privs_list.end_columns() self.privs_list.add_changed_callback(self.schema_priv_selected) self.privs_list.set_min_size(-1, 100) self.add(self.privs_list, True, True) bbox = newBox(True) bbox.set_spacing(8) bbox.add(dLabel("Schema and Host fields may use % and _ wildcards.\nThe server will match specific entries before wildcarded ones."), False, True) self.add_entry_button = newButton() self.add_entry_button.set_text("Add Entry...") self.add_entry_button.set_name("Add Entry") bbox.add_end(self.add_entry_button, False, True) self.add_entry_button.add_clicked_callback(self.add_entry) self.del_entry_button = newButton() self.del_entry_button.set_text("Delete Entry") bbox.add_end(self.del_entry_button, False, True) self.del_entry_button.add_clicked_callback(self.del_entry) self.revoke_all_button = newButton() self.revoke_all_button.set_text("Revoke All Privileges") bbox.add_end(self.revoke_all_button, False, True) self.revoke_all_button.add_clicked_callback(self._owner.revoke_all) self.revoke_all_button.set_tooltip("Immediately remove all privileges from the account, from every object at all levels.\nThe account itself will be left untouched and logins will still be possible.") self.add(bbox, False, True) self.schema_priv_label = newLabel("") self.add(self.schema_priv_label, False, True) hbox = newBox(True) hbox.set_spacing(8) hbox.set_homogeneous(True) self.schema_object_privs_panel = panel = newPanel(mforms.TitledBoxPanel) panel.set_title("Object Rights") box = newBox(False) box.set_padding(8) self.object_rights_box = box panel.add(box) hbox.add(panel, True, True) self.schema_ddl_privs_panel = panel = newPanel(mforms.TitledBoxPanel) panel.set_title("DDL Rights") box = newBox(False) box.set_padding(8) self.ddl_rights_box = box panel.add(box) hbox.add(panel, True, True) self.schema_other_privs_panel = panel = newPanel(mforms.TitledBoxPanel) panel.set_title("Other Rights") box = newBox(False) box.set_padding(8) self.other_rights_box = box panel.add(box) hbox.add(panel, False, True) self.add(hbox, False, True) bottom_box = newBox(True) bottom_box.set_spacing(8) self.grant_all = newButton() self.grant_all.set_text('Select "ALL"') self.grant_all.set_name('Select All') bottom_box.add_end(self.grant_all, False, True) self.grant_all.add_clicked_callback(self.grant_all_schema_privs) self.revoke_all = newButton() self.revoke_all.set_text("Unselect All") bottom_box.add_end(self.revoke_all, False, True) self.revoke_all.add_clicked_callback(self.revoke_all_schema_privs) self.add(bottom_box, False, True) self.resume_layout() #### def update(self): if self.schema_rights_checks: return global_privilege_names = self._owner.owner.secman.global_privilege_names for name in SCHEMA_OBJECT_RIGHTS: if name not in global_privilege_names: continue cb = newCheckBox() label, desc = PrivilegeInfo.get(name, ("", None)) cb.set_text(label) if desc: cb.set_tooltip(desc) cb.add_clicked_callback(self.schema_priv_checked) self.object_rights_box.add(cb, False, False) self.schema_rights_checks[name] = cb for name in SCHEMA_DDL_RIGHTS: if name not in global_privilege_names: continue cb = newCheckBox() label, desc = PrivilegeInfo.get(name, ("", None)) cb.set_text(label) if desc: cb.set_tooltip(desc) cb.add_clicked_callback(self.schema_priv_checked) self.ddl_rights_box.add(cb, False, False) self.schema_rights_checks[name] = cb for name in SCHEMA_OTHER_RIGHTS: if name not in global_privilege_names: continue cb = newCheckBox() label, desc = PrivilegeInfo.get(name, ("", None)) cb.set_text(label) if desc: cb.set_tooltip(desc) cb.add_clicked_callback(self.schema_priv_checked) self.other_rights_box.add(cb, False, False) self.schema_rights_checks[name] = cb def schema_priv_checked(self): privs = [] for name, cb in self.schema_rights_checks.items(): if cb.get_active(): privs.append(name) sel = self.privs_list.get_selected_row() if sel >= 0: self._owner._selected_user.schema_privs.entries[sel].privileges = set(privs) plist = [PrivilegeInfo.get(p, " ")[0] for p in privs] plist.sort() self.privs_list.node_at_row(sel).set_string(1, ", ".join(plist) or "none") self._owner.set_dirty() def add_entry(self): addform = AddSchemaPrivilegeForm(self._owner.owner.secman, self._owner._selected_user.username) schema = addform.run() if schema is not None: entry = self._owner._selected_user.schema_privs.add_entry(schema, set()) self.refresh_entry_list(self._owner._selected_user.schema_privs) self.show_privileges(entry) self.privs_list.select_node(self.privs_list.node_at_row(len(self._owner._selected_user.schema_privs.entries)-1)) self.schema_priv_selected() self._owner.set_dirty() def del_entry(self): sel = self.privs_list.get_selected_node() if not sel: return self._owner._selected_user.schema_privs.del_entry(self.privs_list.row_for_node(sel)) sel.remove_from_parent() self.schema_priv_selected() self._owner.set_dirty() self.add_entry_button.set_enabled(True) self.del_entry_button.set_enabled(True) self._owner.refresh_button.set_enabled(True) def schema_priv_selected(self): sel = self.privs_list.get_selected_node() if not sel or not self._owner._selected_user: self.show_privileges(None) else: entry = self._owner._selected_user.schema_privs.entries[self.privs_list.row_for_node(sel)] self.show_privileges(entry) def grant_all_schema_privs(self): for name, cb in self.schema_rights_checks.items(): if name != "Grant_priv": cb.set_active(True) else: cb.set_active(False) self.schema_priv_checked() self._owner.set_dirty() def revoke_all_schema_privs(self): for cb in self.schema_rights_checks.values(): cb.set_active(False) self.schema_priv_checked() self._owner.set_dirty() def show_user(self, user): # make sure that the info is not outdated self.set_enabled(user != None) self._owner.unset_dirty() self.refresh_entry_list(user) if user is not None: self.add_entry_button.set_enabled(True) self.del_entry_button.set_enabled(False) self.show_privileges(None) def refresh_entry_list(self, user): self.privs_list.clear() for entry in user and user.entries or []: row = self.privs_list.add_node() row.set_string(0, entry.db) plist = [PrivilegeInfo.get(p, " ")[0] for p in entry.privileges] plist.sort() row.set_string(1, ", ".join(plist) or "none") def show_privileges(self, entry): self.schema_object_privs_panel.set_enabled(entry != None) self.schema_ddl_privs_panel.set_enabled(entry != None) self.schema_other_privs_panel.set_enabled(entry != None) if entry: db, privs = entry.db, entry.privileges text = "The user '%s'@'%s' " % (self._owner._selected_user.username, self._owner._selected_user.host) if '_' in db or '%' in db: if db == '%': text += "will have the following access rights to any schema:" else: text += "will have the following access rights to schemas matching '%s':" % db else: text += "will have the following access rights to the schema '%s':" % db self.schema_priv_label.set_text(text) self.del_entry_button.set_enabled(True) self.grant_all.set_enabled(True) self.revoke_all.set_enabled(True) for priv, check in self.schema_rights_checks.items(): check.set_active(priv in privs) else: self.schema_priv_label.set_text("") self.del_entry_button.set_enabled(False) self.grant_all.set_enabled(False) self.revoke_all.set_enabled(False) for priv, check in self.schema_rights_checks.items(): check.set_active(False) def refresh(self): self.show_user(None) class FirewallCommands: def __init__(self, owner): self.owner = owner self.ctrl_be = owner.ctrl_be class CommandType: simple_result = 0 simple_result_with_count = 1 multi_result = 2 def execute_command(self, command): affcted_record_count = 0 if not self.ctrl_be.sql_ping(): log_error("Executing firewall command. The connection is down.\n") Utilities.show_error("Execute firewall command", "The connection is down. Please check if you have connection to the server.", "Ok", "", "") return False, 0 result, affcted_record_count = self.ctrl_be.exec_sql(command) return result, affcted_record_count def execute_result_command(self, command): if not self.ctrl_be.sql_ping(): log_error("Executing firewall command. The connection is down.\n") Utilities.show_error("Execute firewall command", "The connection is down. Please check if you have connection to the server.", "Ok", "", "") return False result = self.ctrl_be.exec_query(command) if result == False: return False if result is None: log_error("Executing firewall command. There was no resultset from the command.\n") Utilities.show_error("Execute firewall command", "There was a problem executing a Firewall command.", "Ok", "", "") return False return result def execute_multiresult_command(self, command): if not self.ctrl_be.sql_ping(): log_error("Executing firewall command. The connection is down.\n") Utilities.show_error("Execute firewall command", "The connection is down. Please check if you have connection to the server.", "Ok", "", "") return False multi_result = self.ctrl_be.exec_query_multi_result(command) if multi_result == False: return False if not multi_result: log_error("Executing firewall command. There was no resultset from the command.\n") Utilities.show_error("Execute firewall command", "There was a problem executing a Firewall command.", "Ok", "", "") return False for result in multi_result: if not result: log_error("Executing firewall command with multiple resultsets. Resultset is 'None'.\n") Utilities.show_error("Execute firewall command", "Resultset is 'None'.", "Ok", "", "") return False return multi_result def get_user_rules(self, userhost): result = [] query_result = self.execute_result_command("SELECT RULE FROM mysql.firewall_whitelist WHERE USERHOST='%s'" % (userhost)) if not query_result: return result while query_result.nextRow(): result.append(to_unicode(query_result.stringByIndex(1))) return result def get_cached_user_rules(self, userhost): result = [] query_result = self.execute_result_command("SELECT RULE FROM information_schema.mysql_firewall_whitelist WHERE USERHOST='%s'" % (userhost)) if not query_result: return result while query_result.nextRow(): result.append(to_unicode(query_result.stringByIndex(1))) return result def get_rule_count(self, userhost): result = self.execute_result_command("SELECT COUNT(*) CNT FROM mysql.firewall_whitelist WHERE USERHOST='%s'" % (userhost)) if not result: return 0 result.nextRow() return result.stringByIndex(1) def get_cached_rule_count(self, userhost): result = self.execute_result_command("SELECT COUNT(*) CNT FROM information_schema.mysql_firewall_whitelist WHERE USERHOST='%s'" % (userhost)) result.nextRow() return result.stringByIndex(1) def reload_rules(self, userhost): self.execute_multiresult_command("CALL mysql.sp_reload_firewall_rules('%s')" % userhost) def delete_user_rule(self, userhost, rule): result, cnt = self.execute_command("DELETE FROM mysql.firewall_whitelist WHERE USERHOST='%s' AND RULE='%s'" % (userhost, db_utils.escape_sql_string(rule))) deleted_something = cnt > 0 if deleted_something: self.reload_rules(userhost) return deleted_something def add_user_rule(self, userhost, rule, normalized = False): if not normalized: firewall_rule = self.normalize_query(rule) else: firewall_rule = rule if firewall_rule: result = self.execute_command("INSERT INTO mysql.firewall_whitelist (USERHOST, RULE) VALUES ('%s', '%s')" % (userhost, db_utils.escape_sql_string(firewall_rule))) if result: self.reload_rules(userhost) return True log_error("Adding a firewall user rule failed to normalize the query. Probably, the inserted query does not translate to a firewall rule.\n") return False def normalize_query(self, query): query_result = self.execute_result_command("SELECT normalize_statement('%s')" % db_utils.escape_sql_string(query)) if not query_result: return False result = "" if query_result.nextRow(): data = query_result.stringByIndex(1) if data: result = to_unicode(data) if result == "": return False return result def get_user_mode(self, userhost): result = self.execute_result_command("SELECT mode FROM mysql.firewall_users WHERE userhost='%s'" % userhost) if not result: return False if result.nextRow(): return result.stringByName("mode") return "OFF" def set_user_mode(self, userhost, mode): multi_result = self.execute_multiresult_command("CALL mysql.sp_set_firewall_mode('%s', '%s')" % (userhost, mode)) if not multi_result: return False result = multi_result[len(multi_result) - 1] try: if result.nextRow(): if result.stringByIndex(1).startswith("ERROR"): log_error("Firewall: Failed to set user mode (user=%s, mode=%s): %s\n" % (userhost, mode, to_unicode(result.stringByIndex(1)))) return False log_debug3("Firewall: Returning 'True' due to lack of 'ERROR' records\n") return True except SystemError as err: # sp_set_firewall_mode return resultset only on error, but nextRow will throw exception if result is missing resultset. if self.owner.ctrl_be.target_version and self.owner.ctrl_be.target_version.is_supported_mysql_version_at_least(5, 6, 27): log_debug3("Firewall: Returning 'True' due to server ('%s') not responding with the 'OK' record any more\n" % self.owner.ctrl_be.target_version) return True import traceback log_error("Exception while setting firewall user mode: Expecting 'OK' record for this version ('%s')\n" % str(self.owner.ctrl_be.target_version)) except Exception as exc: import traceback log_error("Exception while setting firewall user mode: %s\n" % (traceback.format_exc())) log_error("Firewall: Failed to set user mode (user=%s, mode=%s): Unknow error\n" % (userhost, mode)) return False def reset_user(self, userhost): return self.set_user_mode(userhost, 'RESET') def is_enabled(self): resultset = self.owner.ctrl_be.exec_query("SHOW VARIABLES LIKE 'mysql_firewall_mode'") resultset.nextRow() result = resultset.unicodeByName("Value") return result == "ON" class FirewallUserInterfaceBase(mforms.Box): def __init__(self, owner): mforms.Box.__init__(self, False) self.rule_count = 0 self.owner = owner self.ctrl_be = owner.owner.ctrl_be def show_user(self, user, host, new_user): return def update_rules(self, user, host): return def get_rule_count(self): return self.rule_count def refresh_users(self, users): return def change_state(self): return def refresh_row(self, current_row, user, host): return def change_layout(self): return def tweak_user_list(self): return def tweak_tabs(self, tabView): return def save(self): return class FirewallUserInterfaceDummy(FirewallUserInterfaceBase): def __init__(self, owner): FirewallUserInterfaceBase.__init__(self, owner) return class FirewallAddRuleDialog(mforms.Form): def __init__(self, owner): mforms.Form.__init__(self, None, mforms.FormResizable | mforms.FormMinimizable) self.set_title("Add new rule or SQL statement") self.set_size(400, 300) self.center() self.content = mforms.newBox(False) self.set_content(self.content) self.content.add(mforms.newLabel("\nType the rule you want to add for this user.\n"), False, True) self.query_box = mforms.newTextBox(mforms.SmallScrollBars) self.query_box.set_size(400, 200) self.query_box.add_changed_callback(self.text_changed) self.content.add(self.query_box, True, True) self.normalized = False self.normalized_checkbox = mforms.newCheckBox(False) self.normalized_checkbox.set_text("This is a normalized rule") self.normalized_checkbox.add_clicked_callback(self.normalized_clicked) self.content.add(self.normalized_checkbox, True, True) button_box = mforms.newBox(True) self.content.add(button_box, False, True) button_box.set_spacing(8) self.ok_button = mforms.newButton() self.ok_button.set_text("OK") self.ok_button.set_enabled(False) self.ok_button.add_clicked_callback(self.ok_button_pressed) button_box.add_end(self.ok_button, False, True) self.cancel_button = mforms.newButton() self.cancel_button.set_text("Cancel") self.cancel_button.add_clicked_callback(self.cancel_button_pressed) button_box.add_end(self.cancel_button, False, True) def text_changed(self): if len(self.query_box.get_string_value()) > 0: self.ok_button.set_enabled(True) else: self.ok_button.set_enabled(False) def normalized_clicked(self): self.normalized = self.normalized_checkbox.get_active() def ok_button_pressed(self): self.end_modal(True) def cancel_button_pressed(self): self.end_modal(False) def run(self): return self.run_modal(None, None) class FirewallUserInterface(FirewallUserInterfaceBase): def __init__(self, owner): FirewallUserInterfaceBase.__init__(self, owner) self.set_managed() self.set_release_on_add() self.set_spacing(8) self.set_padding(8) self.commands = FirewallCommands(self) self.new_user = False # Firewall rules panel firewall_rules_main_box = mforms.newBox(False) firewall_rules_main_box.set_padding(12) firewall_rules_main_box.set_spacing(8) self.note = mforms.newLabel("") self.build_note() firewall_rules_main_box.add(self.note, False, True) info_box = mforms.newBox(True) info_label = mforms.newLabel("Manage the rules for the current user. Changing the mode to RECORDING will start collecting the SQL commands used by your application.\nWhen all the necessary rules were collected, you should set the mode to PROTECTING. You can then fine-tune the set of rules by adding or deleting them.") info_box.add(info_label, True, True) self.add(info_box, False, True) state_box = mforms.newBox(True) self.state = mforms.newSelector() self.state.add_item("OFF") self.state.add_item("PROTECTING") self.state.add_item("RECORDING") self.state.add_item("DETECTING") self.state.set_size(120, -1) self.state.add_changed_callback(self.change_state) state_box.add(mforms.newLabel("Mode:"), False, True) state_box.add(self.state, False, True) firewall_rules_main_box.add(state_box, False, True) # White list section white_list_box = mforms.newBox(True) white_list_box.set_spacing(8) self.white_list = mforms.newListBox(True) self.white_list.set_size(500, -1) white_list_button_box = mforms.newBox(False) self.white_list_add_button = mforms.newButton() self.white_list_add_button.set_text("Add") self.white_list_add_button.add_clicked_callback(self.add_button_click) self.white_list_add_from_file_button = mforms.newButton() self.white_list_add_from_file_button.set_text("Add From File") self.white_list_add_from_file_button.add_clicked_callback(self.add_from_file_button_click) self.white_list_save_to_file_button = mforms.newButton() self.white_list_save_to_file_button.set_text("Save To File") self.white_list_save_to_file_button.add_clicked_callback(self.save_to_file_button_click) self.white_list_delete_button = mforms.newButton() self.white_list_delete_button.set_text("Delete") self.white_list_delete_button.add_clicked_callback(self.delete_button_click) self.white_list_clear_button = mforms.newButton() self.white_list_clear_button.set_text("Reset") self.white_list_clear_button.add_clicked_callback(self.reset_button_click) white_list_button_box.add(self.white_list_add_button, False, True) white_list_button_box.add(self.white_list_delete_button, False, True) white_list_button_box.add(self.white_list_add_from_file_button, False, True) white_list_button_box.add(self.white_list_save_to_file_button, False, True) white_list_button_box.add(self.white_list_clear_button, False, True) white_list_box.add(self.white_list, True, True) white_list_box.add(white_list_button_box, False, True) self.available_rules_label = mforms.newLabel("Active rules:") firewall_rules_main_box.add(self.available_rules_label, False, True) firewall_rules_main_box.add(white_list_box, True, True) cache_list_box = mforms.newBox(True) self.cache_list = mforms.newListBox(True) self.cache_list.set_size(500, 150) cache_list_box.add(self.cache_list, True, True) self.cached_rules_label = mforms.newLabel("Rules being recorded:") firewall_rules_main_box.add(self.cached_rules_label, False, True) firewall_rules_main_box.add(cache_list_box, True, True) self.add(firewall_rules_main_box, True, True) def build_note(self): text = "" self.note.show(False) if not self.commands.is_enabled(): text = "The firewall is currently disabled. You can still manage user rules and modes, but changes will not have any effect until the firewall is enabled again." self.note.show(True) if self.new_user: text = "The user is not created yet. Please finish creating the user to makes changes here." self.note.show(True) text = "%s\n\n" % text self.note.set_text(text) def refresh_row(self, current_row, user, host): userhost = "%s@%s" % (db_utils.escape_sql_string(user), host) if self.commands.is_enabled(): current_row.set_string(2, str(self.commands.get_user_mode(userhost))) current_row.set_string(3, str(self.commands.get_rule_count(userhost))) current_row.set_string(4, str(self.commands.get_cached_rule_count(userhost))) else: current_row.set_string(2, 'disabled') current_row.set_string(3, 'disabled') current_row.set_string(4, 'disabled') def show_user(self, user, host, new_user): self.current_user = user self.current_host = host self.current_userhost = "%s@%s" % (db_utils.escape_sql_string(user), host) self.new_user = new_user self.set_enabled(not new_user) self.update_rules() result = 'disabled' if self.commands.is_enabled(): self.state.set_value(self.commands.get_user_mode(self.current_userhost)) return result def update_rules(self): self.white_list.clear() self.cache_list.clear() if not self.commands.is_enabled(): return self.available_rules_label.set_text("Active rules (%s) - These are the rules used in PROTECTED mode for this user" % str(self.commands.get_rule_count(self.current_userhost))) for rule in self.commands.get_user_rules(self.current_userhost): self.white_list.add_item(rule) self.cached_rules_label.set_text("Rules being recorded (%s) - These are the rules gathered while in RECORDING mode" % str(self.commands.get_cached_rule_count(self.current_userhost))) for rule in self.commands.get_cached_user_rules(self.current_userhost): self.cache_list.add_item(rule) self.build_note() def tweak_user_list(self): self.owner.user_list.add_column(mforms.StringColumnType, "FW State", 80, False) self.owner.user_list.add_column(mforms.StringColumnType, "# FW Rules", 80, False) self.owner.user_list.add_column(mforms.StringColumnType, "# FW Recorded", 80, False) def tweak_tabs(self, tabView): tabView.add_page(self, "Firewall Rules") def refresh_users(self, users): return #for user, host in users: #self.users.add_item(user) def change_state(self): self.owner.set_dirty() def add_button_click(self): dialog = FirewallAddRuleDialog(self) if dialog.run(): rule = dialog.query_box.get_string_value() result = self.commands.add_user_rule(self.current_userhost, rule, dialog.normalized) if not result: Utilities.show_error("Add user rule", "Add a new rule for this user failed to be inserted. Please check the log for more information.", "OK", "", "") self.owner.refresh() return def add_from_file_button_click(self): dialog = mforms.FileChooser(mforms.OpenFile) dialog.set_title("Load firewall rules") dialog.set_extensions("Firewall Rules (*.fwr)|*.fwr|SQL Statements (*.sql)|*.sql", ".fwr") if dialog.run_modal(): path = dialog.get_path() is_rules_file = True if path.endswith(".fwr") else False with open(path) as f: content = [x.strip('\n') for x in f.readlines()] if not self.commands.set_user_mode(self.current_userhost, "OFF"): return added_rules = [] for rule in content: if not self.commands.add_user_rule(self.current_userhost, rule, is_rules_file): break added_rules.append(rule) self.commands.set_user_mode(self.current_userhost, "RECORDING") self.white_list.add_items(added_rules) def save_to_file_button_click(self): dialog = mforms.FileChooser(mforms.SaveFile) dialog.set_title("Save firewall rules") dialog.set_extensions("Firewall Rules (*.fwr)|*.fwr", ".fwr") if dialog.run_modal(): f = open(dialog.get_path(), "w") for index in range(0, self.white_list.get_count()): rule = self.white_list.get_string_value_from_index(index) f.write("%s\n" % rule) f.close() def delete_button_click(self): indexes = self.white_list.get_selected_indices() deleted_indexes = [] for index in indexes: rule = self.white_list.get_string_value_from_index(index) result = self.commands.delete_user_rule(self.current_userhost, rule) if result: deleted_indexes.append(index) self.white_list.remove_indexes(deleted_indexes) def reset_button_click(self): result = Utilities.show_warning("Reset user rules", "Reseting the user rules will delete all rules that were collected", "Yes", "No", "") if result: if self.commands.reset_user(self.current_userhost): self.owner.refresh() def save(self): if not self.commands.set_user_mode(self.current_userhost, self.state.get_string_value()): Utilities.show_error("Setting user mode", "There was a problem setting the user mode. Please check the log for more details.", "OK", "", "") ############################# class SecurityAccount(mforms.Box): def __init__(self, owner): mforms.Box.__init__(self, False) self.set_managed() self.set_release_on_add() self.set_name("Security Account") self.owner = owner self.dirty = False self._selected_user = None self._selected_user_original = None self.suspend_layout() self.set_spacing(8) if 'mysql_firewall_mode' in list(self.owner.ctrl_be.server_variables.keys()): self.firewall_rules = FirewallUserInterface(self) else: self.firewall_rules = FirewallUserInterfaceDummy(self) self.owner.ctrl_be.add_me_for_event('firewall_installed', self) self.owner.ctrl_be.add_me_for_event('firewall_removed', self) self.splitter = mforms.newSplitter(True) bottom_box = newBox(True) bottom_box.set_spacing(8) self.add_button = newButton() self.add_button.set_text("Add Account") bottom_box.add(self.add_button, False, True) self.add_button.add_clicked_callback(self.add_account) self.del_button = newButton() self.del_button.set_text("Delete") bottom_box.add(self.del_button, False, True) self.del_button.add_clicked_callback(self.del_account) self.refresh_button = newButton() self.refresh_button.set_text("Refresh") self.refresh_button.add_clicked_callback(self.owner.page_active) bottom_box.add(self.refresh_button, False, True) self.save_button = newButton() self.save_button.set_text("Apply") bottom_box.add_end(self.save_button, False, True) self.save_button.add_clicked_callback(self.commit) self.revert_button = newButton() self.revert_button.set_text("Revert") bottom_box.add_end(self.revert_button, False, True) self.revert_button.add_clicked_callback(self.revert) self.owner.set_footer(bottom_box) self.account_list_box = newBox(False) self.account_list_box.set_spacing(8) self.account_list_box.set_size(220, -1) label = newLabel("User Accounts") self.account_list_box.add(label, False, True) self.user_list = None self.create_user_list() self.splitter.add(self.account_list_box, 200, True) # Right part self.content_box = abox = newBox(False) abox.set_spacing(8) self.account_label = newLabel("Select an account to edit or click Add Account to create a new one") self.account_label.set_style(mforms.BoldStyle) abox.add(self.account_label, False, True) self.tabView = newTabView(False) self.inner_tabview = self.tabView # Login Tab vbox = newBox(False) vbox.set_spacing(12) vbox.set_padding(12) table = newTable() vbox.add(table, False, True) table.set_row_count(9) table.set_column_count(3) table.set_row_spacing(8) table.set_column_spacing(8) self.username = newTextEntry() self.username.set_size(180, -1) self.username.add_changed_callback(self.set_dirty) if self.owner.ctrl_be.target_version and self.owner.ctrl_be.target_version.is_supported_mysql_version_at_least(5, 7, 8): self.username.set_max_length(32) # max username length for mysql since 5.7.8 else: self.username.set_max_length(16) # max username length for mysql before 5.7.8 self.password = newTextEntry(mforms.PasswordEntry) self.password.set_size(180, -1) self.password_advice = 'Consider using a password with 8 or more characters with\nmixed case letters, numbers and punctuation marks.' self.password_label = dLabel(self.password_advice) self.password_validator = ThreadedInputValidator(self, self.password, self.password_label, self.owner.ctrl_be) self.password.add_changed_callback(self.password_validator.callback) self.confirm = newTextEntry(mforms.PasswordEntry) self.confirm.set_size(180, -1) self.confirm.add_changed_callback(self.set_dirty) self.hostlimithost = newTextEntry() self.hostlimithost.set_size(180, -1) self.hostlimithost.add_changed_callback(self.hostlimithost_changed) table.add(rLabel("Login Name:"), 0, 1, 0, 1, mforms.HFillFlag) table.add(self.username, 1, 2, 0, 1, mforms.HFillFlag) table.add(dLabel("You may create multiple accounts with the same name\nto connect from different hosts."), 2, 3, 0, 1, mforms.HFillFlag|mforms.HExpandFlag) self.has_extra_plugins = len([p for p in self.active_plugins]) > 0 if self.has_extra_plugins: self.auth_type_label = rLabel("Authentication Type:") table.add(self.auth_type_label, 0, 1, 1, 2, mforms.HFillFlag) self.auth_type_sel = newSelector() self.auth_type_sel.set_size(180, -1) table.add(self.auth_type_sel, 1, 2, 1, 2, mforms.HFillFlag) table.add(dLabel("\nFor the standard password and/or host based authentication,\nselect 'Standard'.\n"), 2, 3, 1, 2, mforms.HFillFlag) self.auth_type_sel.add_changed_callback(self.auth_type_changed) self.auth_type_list = [] for plugin in self.active_plugins: if plugin in AUTHENTICATION_PLUGIN_TYPES: self.auth_type_sel.add_item(AUTHENTICATION_PLUGIN_TYPES[plugin]["name"]) else: self.auth_type_sel.add_item(plugin) self.auth_type_list.append(plugin) else: self.auth_type_list = None self.auth_string_param = None self.hostlimithost_caption = rLabel("Limit to Hosts Matching:") table.add(self.hostlimithost_caption, 0, 1, 2, 3, mforms.HFillFlag) table.add(self.hostlimithost, 1, 2, 2, 3, mforms.HFillFlag) self.hostlimit_box = newBox(True) self.hostlimithost_valid_icon = newImageBox() self.hostlimit_box.add(self.hostlimithost_valid_icon, False, False) self.password_caption2 = dLabel("% and _ wildcards may be used") self.hostlimit_box.add(self.password_caption2, True, True) table.add(self.hostlimit_box, 2, 3, 2, 3, mforms.HFillFlag | mforms.HExpandFlag | mforms.VFillFlag) self.password_caption = rLabel("Password:") table.add(self.password_caption, 0, 1, 3, 4, mforms.HFillFlag) table.add(self.password, 1, 2, 3, 4, mforms.HFillFlag) self.password_caption2 = dLabel("Type a password to reset it.") table.add(self.password_caption2, 2, 3, 3, 4, mforms.HFillFlag|mforms.HExpandFlag) table.add(self.password_label, 1, 3, 4, 5, mforms.HFillFlag) self.confirm_caption = rLabel("Confirm Password:") table.add(self.confirm_caption, 0, 1, 5, 6, mforms.HFillFlag) table.add(self.confirm, 1, 2, 5, 6, mforms.HFillFlag) self.confirm_caption2 = dLabel("Enter password again to confirm.") table.add(self.confirm_caption2, 2, 3, 5, 6, mforms.HFillFlag|mforms.HExpandFlag) if self.owner.ctrl_be.target_version and self.owner.ctrl_be.target_version.is_supported_mysql_version_at_least(5, 6, 7): self.expire_button = newButton() self.expire_button.set_text("Expire Password") self.expire_button.set_tooltip("Force user to change password after next login. The user will be unable to issue any command other than SET PASSWORD.") #bottom_box.add_end(self.expire_button, False, True) table.add(self.expire_button, 1, 2, 6, 7, mforms.HFillFlag) self.expire_button.add_clicked_callback(self.expire) else: self.expire_button = None table.add(newLabel(""), 1, 2, 7, 8, mforms.HFillFlag) if self.has_extra_plugins: self.auth_string_param = newTextEntry() self.auth_string_param.add_changed_callback(self.set_dirty) self.auth_string_label = rLabel("Authentication String:") self.auth_string_desc = dLabel("Authentication plugin specific parameters.") table.add(self.auth_string_label, 0, 1, 8, 9, mforms.HFillFlag) table.add(self.auth_string_param, 1, 2, 8, 9, mforms.HFillFlag) table.add(self.auth_string_desc, 2, 3, 8, 9, mforms.HFillFlag|mforms.HExpandFlag) self.auth_string_help = newLabel("") vbox.add(self.auth_string_help, False, True) self.bottom_message_hbox = newBox(True) self.bottom_message_hbox.set_spacing(8) self.bottom_message_caption = newLabel('') self.bottom_message_caption.set_color('#aa3333') self.upgrade_account_button = newButton() self.upgrade_account_button.set_text("Upgrade") self.upgrade_account_button.set_tooltip("Upgrade the authentication type of the account, from the obsolete format to the newer, safer one. You must set the password for the account before clicking this button.") self.upgrade_account_button.add_clicked_callback(self.upgrade_account) self.bottom_message_hbox.add(self.bottom_message_caption, True, True) self.bottom_message_hbox.add(self.upgrade_account_button, False, True) vbox.add_end(self.bottom_message_hbox, False) self.bottom_message_hbox.show(False) self.tabView.add_page(vbox, "Login") # Account Limits Tab table = newTable() table.set_padding(12) table.set_column_spacing(8) table.set_row_spacing(8) table.set_row_count(4) table.set_column_count(3) table.add(rLabel("Max. Queries:"), 0, 1, 0, 1, mforms.HFillFlag) self.max_questions = newTextEntry() self.max_questions.set_size(60, -1) self.max_questions.add_changed_callback(self.set_dirty) table.add(self.max_questions, 1, 2, 0, 1, mforms.HFillFlag) table.add(dLabel("Number of queries the account can execute within one hour."), 2, 3, 0, 1, mforms.HFillFlag|mforms.HExpandFlag) table.add(rLabel("Max. Updates:"), 0, 1, 1, 2, mforms.HFillFlag) self.max_updates = newTextEntry() self.max_updates.set_size(60, -1) self.max_updates.add_changed_callback(self.set_dirty) table.add(self.max_updates, 1, 2, 1, 2, mforms.HFillFlag) table.add(dLabel("Number of updates the account can execute within one hour."), 2, 3, 1, 2, mforms.HFillFlag|mforms.HExpandFlag) table.add(rLabel("Max. Connections:"), 0, 1, 2, 3, mforms.HFillFlag) self.max_connections = newTextEntry() self.max_connections.set_size(60, -1) self.max_connections.add_changed_callback(self.set_dirty) table.add(self.max_connections, 1, 2, 2, 3, mforms.HFillFlag) table.add(dLabel("The number of times the account can connect to the server per hour."), 2, 3, 2, 3, mforms.HFillFlag|mforms.HExpandFlag) table.add(rLabel("Concurrent Connections:"), 0, 1, 3, 4, mforms.HFillFlag) self.max_uconnections = newTextEntry() self.max_uconnections.set_size(60, -1) self.max_uconnections.add_changed_callback(self.set_dirty) table.add(self.max_uconnections, 1, 2, 3, 4, mforms.HFillFlag) table.add(dLabel("The number of simultaneous connections to the server the account can have."), 2, 3, 3, 4, mforms.HFillFlag|mforms.HExpandFlag) self.tabView.add_page(table, "Account Limits") # Administrative Roles Tab box = newBox(False) lbox = newBox(True) box.add(lbox, True, True) lbox.set_spacing(12) lbox.set_padding(12) self.revoke_all_button = newButton() self.revoke_all_button.set_text("Revoke All Privileges") box.add(self.revoke_all_button, False, True) self.revoke_all_button.add_clicked_callback(self.revoke_all) self.revoke_all_button.set_tooltip("Immediately remove all privileges from the account, from every object at all levels.\nThe account itself will be left untouched and logins will still be possible.") self.role_list = newTreeView(mforms.TreeFlatList) self.role_list.add_column(mforms.CheckColumnType, "", 30, True) self.role_list.add_column(mforms.StringColumnType, "Role", 150, False) self.role_list.add_column(mforms.StringColumnType, "Description", 300, False) self.role_list.end_columns() lbox.add(self.role_list, True, True) self.role_list.set_cell_edited_callback(self.role_list_toggled) self.role_priv_list = newTreeView(mforms.TreeFlatList) self.role_priv_list.add_column(mforms.CheckColumnType, '', 30, True) self.role_priv_list.add_column(mforms.StringColumnType, "Global Privileges", 180, False) self.role_priv_list.end_columns() self.role_priv_list.set_size(220, -1) self.role_priv_list.set_cell_edited_callback(self.role_priv_list_toggled) lbox.add(self.role_priv_list, False, True) self.tabView.add_page(box, "Administrative Roles") self.schema_privs = SecuritySchemaPrivileges(self) self.tabView.add_page(self.schema_privs, "Schema Privileges") self.firewall_rules.tweak_tabs(self.tabView) abox.add(self.tabView, True, True) if sys.platform.lower() == "darwin": # No scrollbox on macOS as this is not needed and breaks selection. self.splitter.add(abox, 200) else: scroller = newScrollPanel(0) scroller.add(abox) self.splitter.add(scroller, 200) self.add(self.splitter, True, True) self.resume_layout() mforms.Utilities.add_timeout(0.1, lambda self=self: self.splitter.set_divider_position(240)) self.current_action = "" self.user_selected() self.relayout() def create_user_list(self): if self.user_list: self.account_list_box.remove(self.user_list) self.user_list = None self.user_list = newTreeView(mforms.TreeFlatList) self.user_list.add_column(mforms.StringColumnType, "User", 120, False) self.user_list.add_column(mforms.StringColumnType, "From Host", 100, False) self.firewall_rules.tweak_user_list() self.user_list.end_columns() self.user_list.add_changed_callback(self.user_selected) self.user_list.set_allow_sorting(True) self.account_list_box.add(self.user_list, True, True) def firewall_installed_event(self): if isinstance(self.firewall_rules, FirewallUserInterfaceDummy): self.firewall_rules = FirewallUserInterface(self) self.create_user_list() self.firewall_rules.tweak_tabs(self.tabView) self.refresh() def firewall_removed_event(self): if isinstance(self.firewall_rules, FirewallUserInterface): view = self.firewall_rules self.firewall_rules = FirewallUserInterfaceDummy(self) self.tabView.remove_page(view) self.create_user_list() self.refresh() def shutdown(self): self.password_validator.shutdown() @property def active_plugins(self): return [name for name, ptype in self.owner.ctrl_be.server_active_plugins if ptype == "AUTHENTICATION"] def selected_plugin_type(self): if self.auth_type_list: return self.auth_type_list[max(self.auth_type_sel.get_selected_index(), 0)] return None def update_enable_state_for_auth_type(self, auth_type, is_new_user): info = AUTHENTICATION_PLUGIN_TYPES.get(auth_type) if not info: info = AUTHENTICATION_PLUGIN_TYPES["*"] if info: self.password_caption.set_enabled(bool(info["enable_password"])) self.password_caption2.set_enabled(bool(info["enable_password"])) self.password_label.set_enabled(bool(info["enable_password"])) self.password.set_enabled(bool(info["enable_password"])) self.confirm.set_enabled(bool(info["enable_password"])) self.confirm_caption.set_enabled(bool(info["enable_password"])) self.confirm_caption2.set_enabled(bool(info["enable_password"])) self.hostlimithost.set_enabled(bool(info["enable_host"])) self.hostlimithost_caption.set_enabled(bool(info["enable_host"])) if self.auth_string_param: if info["auth_string_label"]: self.auth_string_help.show(True) self.auth_string_help.set_text(info["auth_string_label"]) self.auth_string_label.show(True) self.auth_string_param.show(True) self.auth_string_desc.show(True) if is_new_user: self.auth_string_label.set_enabled(True) self.auth_string_param.set_enabled(True) else: self.auth_string_label.set_enabled(False) self.auth_string_param.set_enabled(False) else: self.auth_string_help.show(False) self.auth_string_label.show(False) self.auth_string_param.show(False) self.auth_string_desc.show(False) def set_hostlimithost(self, value): self.hostlimithost.set_value(value) self.validate_hostlimithost() def hostlimithost_changed(self): self.validate_hostlimithost() self.set_dirty() def auth_type_changed(self): if not self.has_extra_plugins: return auth_type = self.selected_plugin_type() # this method is only called for new users (because auth-type can't be changed for existing users) self.update_enable_state_for_auth_type(auth_type, True) info = AUTHENTICATION_PLUGIN_TYPES.get(auth_type) if not info: info = AUTHENTICATION_PLUGIN_TYPES["*"] if info: if not info["enable_password"]: self.password.set_value("") self.confirm.set_value("") if not info["enable_host"]: self.set_hostlimithost("%") self.set_dirty() def refresh_role_list(self): if self._selected_user: self.role_list.clear() roles = self._selected_user.admin_roles TheRoles = SecurityAdminRoles if not self._selected_user.is_custom_role_needed else SecurityAdminRoles + [ ('Custom', 'custom role', []) ] for name, desc, privs in TheRoles: row = self.role_list.add_node() row.set_bool(0, name in roles) row.set_string(1, name) row.set_string(2, desc) def role_priv_list_toggled(self, node, col, value): if self._selected_user: node.set_bool(col, value == '1') priv = node.get_string(1) self._selected_user.toggle_priv(PrivilegeReverseDict[priv], value == '1') self.refresh_role_list() self.set_dirty() def role_list_toggled(self, node, col, value): if self._selected_user: node.set_int(col, int(value)) role = node.get_string(1) self._selected_user.toggle_role(role, value == "1") self.refresh_role_list() self.set_dirty() self.refresh_priv_list() def host_limit_clicked(self): self.hostlimithost.set_enabled(self.hostlimit.get_active()) self.set_dirty() def user_selected(self): sel = self.user_list.get_selected_node() user = "" host = "" #if self.current_action == "delete account": if self.current_action == "commit account": if not self._selected_user: return user = self._selected_user.username host = self._selected_user.host elif sel: user, host = eval(sel.get_tag()) self._selected_user = None self._selected_user_original = None self.show_user(None) #self.dup_button.set_enabled(False) self.del_button.set_enabled(False) if sel: self.account_label.set_text("Details for account %s@%s" % (user, host)) if self.owner.secman.is_zombie(user, host): self.show_zombie_user(user, host) else: try: self.owner.secman.async_get_account(self.show_user, user, host) except Exception as e: if str(e).startswith("Could not load account information for"): log_debug3("Unable to load the account information for %s@%s. Probably the user was not created in the server yet and revert was pressed." % (user, host)) return else: self.account_label.set_text("Select an account to edit or click [Add Account] to create a new one") def show_zombie_user(self, user, host): self.content_box.set_enabled(True) self.inner_tabview.show(False) if self.expire_button: self.expire_button.set_enabled(False) #self.dup_button.set_enabled(False) self.del_button.set_enabled(True) privs = self.owner.secman.get_zombie_privs(user, host) self.account_label.set_text("Account %s@%s does not exist but it still has privileges defined for the following objects:\n %s\n\nClick the [Delete] button to completely remove the account." % (user, host, "\n ".join(privs))) def show_user(self, user): self.inner_tabview.show(True) sel = self.user_list.get_selected_node() if not sel and user: return if user and to_unicode(eval(sel.get_tag())[0]) != user.username: return self.content_box.set_enabled(user != None) self.revoke_all_button.set_enabled(bool(user != None and user.is_commited)) self.unset_dirty() self._selected_user = user self._selected_user_original = user and user.snapshot_for_revert() if user: #self.dup_button.set_enabled(True) self.del_button.set_enabled(True) if self.expire_button: self.expire_button.set_enabled(bool(user.is_commited and not user.password_expired)) self.username.set_value(user.username) self.password.set_value(user.password or "") self.confirm.set_value(user.password or "") self.set_hostlimithost(user.host) if self.auth_type_list: try: index = self.auth_type_list.index(user.auth_plugin or DEFAULT_AUTH_PLUGIN) except ValueError: mforms.Utilities.show_warning("Invalid Authentication Plugin", "User %s has plugin type %s, which is not listed as a known authentication plugin by the server." % (user.formatted_name(), user.auth_plugin), "OK", "", "") return self.auth_type_sel.set_selected(index) # changing auth type not supported atm self.auth_type_sel.set_enabled(not user.is_commited) self.auth_string_param.set_value(user.auth_string or "") self.update_enable_state_for_auth_type(user.auth_plugin or None, not user.is_commited) self.max_questions.set_value(str(user.max_questions)) self.max_updates.set_value(str(user.max_updates)) self.max_connections.set_value(str(user.max_connections)) self.max_uconnections.set_value(str(user.max_user_connections)) self.refresh_priv_list() self.refresh_role_list() self.schema_privs.show_user(user.schema_privs) self.firewall_rules.show_user(user.username, user.host, False if self._selected_user and self._selected_user.is_commited else True) else: self.username.set_value("") self.password.set_value("") self.confirm.set_value("") if self.expire_button: self.expire_button.set_enabled(False) self.set_hostlimithost("") if self.auth_string_param: self.auth_string_param.set_value("") self.max_questions.set_value("") self.max_updates.set_value("") self.max_connections.set_value("") self.max_uconnections.set_value("") for i in range(self.role_list.count()): role_list_node = self.role_list.node_at_row(i) if role_list_node: self.role_list.node_at_row(i).set_bool(0, False) self.schema_privs.show_user(None) self.password_label.set_text(self.password_advice) self.setup_bottom_message_box(user) def _find_user_position(self, user, host): users_count = len(self.owner.secman.account_names) for row in range(users_count): node = self.user_list.node_at_row(row) if (user, host) == ( node.get_string(0), node.get_string(1) ): return row return users_count - 1 # Control shouldn't get here def add_account(self): self.current_action = "add account" account = self.owner.secman.create_account() self.refresh() self.inner_tabview.set_active_tab(0) pos = self._find_user_position(account.username, account.host) if pos is not None and pos >= 0: self.user_list.select_node(self.user_list.node_at_row(pos)) self.user_selected() self.set_dirty() self.add_button.set_enabled(False) self.del_button.set_enabled(False) self.refresh_button.set_enabled(False) self.current_action = "" def upgrade_account(self): def generate_password(length = 8): chars = string.ascii_letters + string.digits + '!@#$%^&*()' random.seed = (os.urandom(1024)) return ''.join(random.choice(chars) for i in range(length)) def change_password(password, confirm_password): self._selected_user.password = password self._selected_user.confirm_password = confirm_password try: self._selected_user.upgrade_password_format() except WBSecurityValidationError as exc: Utilities.show_error("Upgrade", str(exc), "OK", "", "") return except Exception as exc: import traceback log_error("Exception while upgrading account auth type: %s\n" % traceback.format_exc()) Utilities.show_error("Error Upgrading Authentication Method", str(exc), "OK", "", "") return if self.password.get_string_value() == self._selected_user._orig_password: upgrade_account_msg = ["A password must be provided for the account (either the original or a new one) before it can be upgraded."] reset_to_expire_caption = "" if self.owner.ctrl_be.target_version and self.owner.ctrl_be.target_version.is_supported_mysql_version_at_least(5, 6, 0): upgrade_account_msg.append("Click [Reset to Expired] to set a random password and expire it, so the user will have to reset it next time they login.") reset_to_expire_caption = "Reset to Expired" result = Utilities.show_warning("Upgrade Authentication Method", "\n".join(upgrade_account_msg), "OK", "", reset_to_expire_caption) if result == mforms.ResultOther: gen_password = generate_password() change_password(gen_password, gen_password) Utilities.show_message("Random Generated Password", 'Password changed to: %s' % gen_password, "OK", "", "") self.expire() else: return else: change_password(self.password.get_string_value(), self.confirm.get_string_value()) self.reload_user(False) def dup_account(self): if self._selected_user: account = self.owner.secman.copy_account(self._selected_user) self.refresh() pos = self._find_user_position(account.username, account.host) if pos and pos >= 0: self.user_list.select_node(self.user_list.node_at_row(pos)) self.user_selected() def del_account(self): self.current_action = "delete account" if self._selected_user: if not self._selected_user.is_commited or Utilities.show_message("Delete Account", "The account '%s' will be permanently removed. Please confirm."%(self._selected_user.formatted_name()), "Delete", "Cancel", "") == mforms.ResultOk: the_name = self._selected_user.formatted_name() try: self.owner.secman.delete_account(self._selected_user) except Exception as e: log_error("Exception while removing account: %s\n" % str(e)) title, message = e.args[:2] if len(e.args) > 1 else ('Error:', str(e)) Utilities.show_error("Delete account", '%s\n%s' % (title, message), 'OK', '', '') self.current_action = "" return self._selected_user = None self._selected_user_original = None self.user_list.clear_selection() self.owner.do_refresh() self.user_selected() mforms.App.get().set_status_text("Account '%s' was deleted" % the_name) else: user = self.user_list.get_selected_node() if user: username, host = eval(user.get_tag()) try: class tmpuser(object): pass t = tmpuser() t.username = username t.host = host t.is_commited = True user.remove_from_parent() self.owner.secman.delete_account(t) except Exception as e: log_error("Exception while removing zombi account: %s\n" % str(e)) title, message = e.args[:2] if len(e.args) > 1 else ('Error:', str(e)) Utilities.show_error("Delete account", '%s\n%s' % (title, message), 'OK', '', '') self.current_action = "" return self.add_button.set_enabled(True) self.del_button.set_enabled(True) self.refresh_button.set_enabled(True) self.current_action = "" def refresh(self): selected_row = None su = self._selected_user suo = self._selected_user_original self.user_list.freeze_refresh() self.user_list.clear() self.firewall_rules.refresh_users(self.owner.secman.account_names) for user, host, loaded in self.owner.secman.account_names: user = to_unicode(user) row = self.user_list.add_node() if self.owner.secman.is_zombie(user, host): row.set_string(0, "(!) "+(user or "")) else: row.set_string(0, user or "") row.set_string(1, host) row.set_tag(repr((user, host))) # Change row with firewall values self.firewall_rules.refresh_row(row, user, host) if su and (user == su.username and host == su.host): selected_row = row self.user_list.thaw_refresh() self._selected_user = su self._selected_user_original = suo if selected_row is not None: self.user_list.select_node(selected_row) self.user_selected() self.password_label.set_text(self.password_advice) self.setup_bottom_message_box(su) def refresh_priv_list(self): self.role_priv_list.clear() if self._selected_user: all_supported_privs = sorted( [val[0] for key, val in PrivilegeInfo.items() if key in self.owner.secman.global_privilege_names] ) privs = self._selected_user.raw_privilege_names for priv in all_supported_privs: row = self.role_priv_list.add_node() row.set_bool(0, priv in privs) row.set_string(1, priv) def validate_hostlimithost(self): host = self.hostlimithost.get_string_value() self.valid_name = True self.valid_ipv6 = False if len(host) > 255: self.valid_name = False subnet_mask = '' if '/' in host: host, _, subnet_mask = host.rpartition('/') if not subnet_mask: self.valid_name = False if self.valid_name: allowed = re.compile(r"([a-z]|([%_]\.))(([\-_\.]?[a-z0-9]+)*)[a-z0-9]$", re.IGNORECASE) allowed_ipv4 = re.compile(r"^(((%|_)?|25[0-5%_]|(%|_)?|2[0-4%_][0-9%_]|[01%_]?[0-9%_][0-9%_]?)\.){0,3}((%|_)?|25[0-5%_]|2[0-4%_][0-9%_]|[01%_]?[0-9%_][0-9%_]?)$") allowed_ipv6 = re.compile(r"^\s*(?!.*::.*::)(?:(?!:)|:(?=:))(?:[0-9a-f%_]{0,4}(?:(?<=::)|(? 1 else ('Error:', str(e)) Utilities.show_error(title, message, 'OK', '', '') self.refresh() def revoke_all(self): if self._selected_user: if Utilities.show_message("Revoke All Privileges", "Please confirm revocation of all privileges for the account '%s'@'%s'.\nNote: the account itself will be maintained.\n\nAdd new privileges afterwards or the user will not be able to access any schema object."%(self._selected_user.username, self._selected_user.host), "Revoke", "Cancel", "") == mforms.ResultOk: try: self._selected_user.revoke_all() self._selected_user.load(self._selected_user.username, self._selected_user.host) self.show_user(self._selected_user) except Exception as e: title, message = e.args[:2] if len(e.args) > 1 else ('Error:', str(e)) Utilities.show_error(title, message, 'OK', '', '') def revert(self): if self._selected_user_original: self.show_user(self.owner.secman.revert_account(self._selected_user, self._selected_user_original)) if not self._selected_user.is_commited: self.owner.secman.delete_account(self._selected_user) self.refresh() if not self.user_list.get_selected_node(): self.user_selected() self.add_button.set_enabled(True) self.del_button.set_enabled(True) self.refresh_button.set_enabled(True) self.owner.do_refresh() def commit(self): self.current_action = "commit account" #if self._selected_user: username = to_unicode(self.username.get_string_value()) host = to_unicode(self.hostlimithost.get_string_value()) if not self.valid_name: Utilities.show_error('Invalid host specification', 'The host specification "%s" is not valid. Please correct it and try again.' % host, 'OK', '', '') self.current_action = "" return is_new_user = not self._selected_user.is_commited password_unneeded = False self.password_label.set_text("Password has expired. User must change password to use the account." if self._selected_user.password_expired else self.password_advice) plugin_info = AUTHENTICATION_PLUGIN_TYPES.get(self.selected_plugin_type(), {}) if self.has_extra_plugins and not plugin_info.get("enable_password", True): password_unneeded = True if is_new_user and not self.password.get_string_value() and not password_unneeded: if Utilities.show_warning("Save Account Changes", "It is a security hazard to create an account with no password.\nPlease confirm creation of '%s'@'%s' with no password."%(username, host), "Create", "Cancel", "") != mforms.ResultOk: self.current_action = "" return lcase_host = host.lower() if lcase_host != host: if Utilities.show_message_and_remember("Save Account Changes", "MySQL only allows lowercase characters for hostnames, the account host will be updated accordingly.", "Ok", "", "","wb.admin.warn_ucase_hostnames", "Don't show this message again"): self.hostlimithost.set_value(lcase_host) username_to_rename = self._selected_user.username host_to_rename = self._selected_user.host self._selected_user.username = to_unicode(self.username.get_string_value()) self._selected_user.password = to_unicode(self.password.get_string_value()) self._selected_user.confirm_password = to_unicode(self.confirm.get_string_value()) #if self.hostlimit.get_active(): self._selected_user.host = to_unicode(self.hostlimithost.get_string_value()) # else: # self._selected_user.host = "%" try: self._selected_user.max_questions = int(self.max_questions.get_string_value()) if self._selected_user.max_questions < 0: raise ValueError except ValueError: Utilities.show_error('Wrong Value for Max. Queries', 'Cannot convert "%s" to a valid non-negative integer.\nPlease correct this value and try again.' % self.max_questions.get_string_value(), 'OK', '', '') self.current_action = "" return try: self._selected_user.max_updates = int(self.max_updates.get_string_value()) if self._selected_user.max_updates < 0: raise ValueError except ValueError: Utilities.show_error('Wrong Value for Max. Updates', 'Cannot convert "%s" to a valid non-negative integer.\nPlease correct this value and try again.' % self.max_updates.get_string_value(), 'OK', '', '') self.current_action = "" return try: self._selected_user.max_connections = int(self.max_connections.get_string_value()) if self._selected_user.max_connections < 0: raise ValueError except ValueError: Utilities.show_error('Wrong Value for Max. Connections', 'Cannot convert "%s" to a valid non-negative integer.\nPlease correct this value and try again.' % self.max_connections.get_string_value(), 'OK', '', '') self.current_action = "" return try: self._selected_user.max_user_connections = int(self.max_uconnections.get_string_value()) if self._selected_user.max_user_connections < 0: raise ValueError except ValueError: Utilities.show_error('Wrong Value for Concurrent Connections', 'Cannot convert "%s" to a valid non-negative integer.\nPlease correct this value and try again.' % self.max_uconnections.get_string_value(), 'OK', '', '') self.current_action = "" return if is_new_user and self.has_extra_plugins: self._selected_user.auth_plugin = self.selected_plugin_type() self._selected_user.auth_string = None if self._selected_user.auth_plugin and ((self._selected_user.auth_plugin in AUTHENTICATION_PLUGIN_TYPES) or (self._selected_user.auth_plugin in self.active_plugins)): self._selected_user.auth_string = self.auth_string_param.get_string_value() try: self._selected_user.save() except WBSecurityValidationError as exc: Utilities.show_error("Save Account Changes", str(exc), "OK", "", "") self.current_action = "" return except PermissionDeniedError as exc: Utilities.show_error("Permission Errors", str(exc), "OK", "", "") self.current_action = "" return except Exception as exc: import traceback log_error("Exception while saving account: %s\n" % traceback.format_exc()) Utilities.show_error("Error Saving Account", str(exc), "OK", "", "") self.current_action = "" return try: self.firewall_rules.save() except Exception as exc: import traceback log_error("Exception while saving account: %s\n" % traceback.format_exc()) Utilities.show_error("Error Saving Account", str(exc), "OK", "", "") self.current_action = "" return self.reload_user(is_new_user) self.owner.secman.async_refresh(self.refresh) self.add_button.set_enabled(True) self.del_button.set_enabled(True) self.refresh_button.set_enabled(True) self.current_action = "" def update(self): self.schema_privs.update() def setup_bottom_message_box(self, user): if not user: return caption = '' if user.old_authentication: caption = '''This account is using the pre-mysql-4.1.1 password hashing type. The user will not be able to login if the secure_auth option is enabled. Please click [Upgrade Account] to fix that. Either the account password must be provided to reset it or a new password must be supplied.''' elif user.password_expired: caption = 'Password has expired. User must change password to use the account.' elif not user.username: caption = 'This is an anonymous account. It is usually advisable to delete this account.' elif user.blank_password : caption = 'No password is set for this account.' self.bottom_message_caption.set_text(caption) self.bottom_message_hbox.show(bool(user.old_authentication or user.password_expired or user.blank_password or not user.username)) self.upgrade_account_button.show(bool(user.old_authentication)) def reload_user(self, is_new_user): the_name = self._selected_user.formatted_name() self._selected_user.load(self._selected_user.username, self._selected_user.host) self.unset_dirty() self.owner.do_refresh() if is_new_user: mforms.App.get().set_status_text("Created account '%s'" % the_name) else: mforms.App.get().set_status_text("Updated account '%s'" % the_name) ############################# class WbAdminValidationPermissions(WbAdminValidationBase): def __init__(self, owner): super().__init__("The account you are currently using does not have sufficient privileges to view the client connections.") self.owner = owner def validate(self): try: self.owner.secman.async_refresh(self.owner.async_callback) except PermissionDeniedError: return False return True class WbAdminSecurity(WbAdminTabBase): _schema_priv_entries = [] #ui_created = False @classmethod def wba_register(cls, admin_context): admin_context.register_page(cls, "Management", "Users and Privileges", "Users and Privileges") @classmethod def identifier(cls): return "admin_manage_privs" def shutdown(self): self.account_tab.shutdown() def __init__(self, ctrl_be, instance_info, main_view): WbAdminTabBase.__init__(self, ctrl_be, instance_info, main_view) self.account_tab = SecurityAccount(self) self.secman = AdminSecurity(self.ctrl_be) self.add_validation(WbAdminValidationConnection(ctrl_be)) self.add_validation(WbAdminValidationPermissions(self)) self.set_standard_header("title_users.png", self.instance_info.name, "Users and Privileges") self.set_managed() self.set_release_on_add() self.firewall_enabled = False def firewall_status_changed(self): current_status = self.ctrl_be.server_variables.get('mysql_firewall_mode') changed = current_status != self.firewall_enabled self.firewall_enabled = current_status return changed def create_ui(self): self.secman.async_refresh(self.do_refresh) try: anon_accounts = [ (user, host, loaded) for user, host, loaded in self.secman.account_names if user==''] if anon_accounts and not self.__dict__.get('already_asked_for_anon_accounts', False): logged_username = self.instance_info.db_connection_params.parameterValues['userName'] logged_servername = self.instance_info.db_connection_params.hostIdentifier privs = self.secman.get_valid_privileges() except: privs = [] if 'DELETE' in privs or 'CREATE USER' in privs: if Utilities.show_message_and_remember('Anonymous accounts detected', 'Anonymous accounts were detected in the server %s.\nAnonymous accounts can cause great confusion and are also a potential security issue and are advised to be removed. Would you like Workbench to delete them now?.' % logged_servername, 'Delete', 'Leave Accounts', '', 'wb.admin.delete_anonymous_accounts:' + logged_username + '@' + logged_servername, "Don't show this message again") == mforms.ResultOk: for name, host in anon_accounts: self.secman.async_get_account(self.secman.delete_account, name, host) else: Utilities.show_message_and_remember('Anonymous accounts detected', 'Anonymous accounts were detected in the server %s.\nAnonymous accounts can cause great confusion and are also a potential security issue and are advised to be removed. Please ask a DBA to delete them.' % logged_servername, 'OK', '', '', 'wb.admin.delete_anonymous_accounts:' + logged_username + '|no_privileges|' + '@' + logged_servername, "Don't show this message again") self.already_asked_for_anon_accounts = True return self.account_tab def update_ui(self): pass def async_callback(self): pass def do_refresh(self): if not self.account_tab.dirty: self.account_tab.refresh() self.account_tab.update()