博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark ui acl 不生效的问题分析
阅读量:6998 次
发布时间:2019-06-27

本文共 14675 字,大约阅读时间需要 48 分钟。

spark ui acl 不生效的问题分析

 

按照 Spark 文档配置了 spark.acls.enable、spark.ui.view.acls 等参数后,再去访问 Spark Web UI,发现仍然可以访问,说明 ACL 没有生效。为什么没有生效呢?本人查看了 Spark 源码后发现,要让 ACL 生效,还需要配置 filter(即 spark.ui.filters)。源码如下:

/**
 * Holds the authentication and ACL settings read from a `SparkConf` and answers
 * permission checks for the UI (view) and for application control (modify).
 *
 * Construction has side effects: it resolves the ACL sets, generates or looks up the
 * authentication secret, installs a JVM-wide default `Authenticator` when authentication
 * is on, and builds the SSL socket factory used for the file server.
 *
 * @param sparkConf configuration that all settings are read from
 * @param ioEncryptionKey optional key returned as-is by [[getIOEncryptionKey]]
 */
private[spark] class SecurityManager(
    sparkConf: SparkConf,
    val ioEncryptionKey: Option[Array[Byte]] = None)
  extends Logging with SecretKeyHolder {

  import SecurityManager._

  // allow all users/groups to have view/modify permissions
  private val WILDCARD_ACL = "*"

  private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)

  // keep spark.ui.acls.enable for backwards compatibility with 1.0
  private var aclsOn =
    sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))

  // admin acls should be set before view or modify acls
  private var adminAcls: Set[String] =
    stringToSet(sparkConf.get("spark.admin.acls", ""))

  // admin group acls should be set before view or modify group acls
  private var adminAclsGroups: Set[String] =
    stringToSet(sparkConf.get("spark.admin.acls.groups", ""))

  private var viewAcls: Set[String] = _

  private var viewAclsGroups: Set[String] = _

  // list of users who have permission to modify the application. This should
  // apply to both UI and CLI for things like killing the application.
  private var modifyAcls: Set[String] = _

  private var modifyAclsGroups: Set[String] = _

  // always add the current user and SPARK_USER to the viewAcls
  private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
    Utils.getCurrentUserName())

  // The setters below read adminAcls / adminAclsGroups, so they must run after those fields
  // have been initialised above.
  setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
  setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))

  setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""))
  setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""))

  private val secretKey = generateSecretKey()

  logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
    "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
    "; users  with view permissions: " + viewAcls.toString() +
    "; groups with view permissions: " + viewAclsGroups.toString() +
    "; users  with modify permissions: " + modifyAcls.toString() +
    "; groups with modify permissions: " + modifyAclsGroups.toString())

  // Set our own authenticator to properly negotiate user/password for HTTP connections.
  // This is needed by the HTTP client fetching from the HttpServer. Put here so its
  // only set once.
  if (authOn) {
    Authenticator.setDefault(
      new Authenticator() {
        override def getPasswordAuthentication(): PasswordAuthentication = {
          // Both the requesting URL and its user-info part may be absent; and the user-info
          // may lack the ":password" part. In all of those cases no credentials are supplied
          // (null), instead of failing with an NPE / ArrayIndexOutOfBoundsException.
          val userInfo = Option(getRequestingURL()).flatMap(url => Option(url.getUserInfo()))
          userInfo.map(_.split(":", 2)) match {
            case Some(Array(user, password)) =>
              new PasswordAuthentication(user, password.toCharArray())
            case _ =>
              null
          }
        }
      }
    )
  }

  // the default SSL configuration - it will be used by all communication layers unless overwritten
  private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)

  // SSL configuration for the file server. This is used by Utils.setupSecureURLConnection().
  val fileServerSSLOptions = getSSLOptions("fs")

  val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
    val trustStoreManagers =
      for (trustStore <- fileServerSSLOptions.trustStore) yield {
        val input = Files.asByteSource(trustStore).openStream()

        try {
          val ks = KeyStore.getInstance(KeyStore.getDefaultType)
          ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray)

          val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
          tmf.init(ks)
          tmf.getTrustManagers
        } finally {
          input.close()
        }
      }

    // Only built (and the warning only logged) when no trust store is configured.
    lazy val credulousTrustStoreManagers = Array({
      logWarning("Using 'accept-all' trust manager for SSL connections.")
      new X509TrustManager {
        // NOTE(review): the X509TrustManager contract asks for a non-null array here;
        // left as null to keep the existing behaviour - confirm before changing.
        override def getAcceptedIssuers: Array[X509Certificate] = null

        override def checkClientTrusted(
            x509Certificates: Array[X509Certificate], s: String): Unit = {}

        override def checkServerTrusted(
            x509Certificates: Array[X509Certificate], s: String): Unit = {}
      }: TrustManager
    })

    require(fileServerSSLOptions.protocol.isDefined,
      "spark.ssl.protocol is required when enabling SSL connections.")

    val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.get)
    sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)

    // Accepts every host name.
    val hostVerifier = new HostnameVerifier {
      override def verify(s: String, sslSession: SSLSession): Boolean = true
    }

    (Some(sslContext.getSocketFactory), Some(hostVerifier))
  } else {
    (None, None)
  }

  /**
   * Parses the SSL options under `spark.ssl.<module>`, falling back to the default
   * `spark.ssl` options.
   *
   * @param module name of the module whose SSL namespace is read
   * @return the parsed options
   */
  def getSSLOptions(module: String): SSLOptions = {
    val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions))
    logDebug(s"Created SSL options for $module: $opts")
    opts
  }

  /**
   * Split a comma separated String, filter out any empty items, and return a Set of strings
   */
  private def stringToSet(list: String): Set[String] = {
    list.split(',').map(_.trim).filter(_.nonEmpty).toSet
  }

  /**
   * Admin acls should be set before the view or modify acls.  If you modify the admin
   * acls you should also set the view and modify acls again to pick up the changes.
   */
  def setViewAcls(defaultUsers: Set[String], allowedUsers: String): Unit = {
    viewAcls = adminAcls ++ defaultUsers ++ stringToSet(allowedUsers)
    logInfo("Changing view acls to: " + viewAcls.mkString(","))
  }

  /** Single-default-user variant of `setViewAcls(Set[String], String)`. */
  def setViewAcls(defaultUser: String, allowedUsers: String): Unit = {
    setViewAcls(Set[String](defaultUser), allowedUsers)
  }

  /**
   * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
   * acls groups you should also set the view and modify acls groups again to pick up the changes.
   */
  def setViewAclsGroups(allowedUserGroups: String): Unit = {
    viewAclsGroups = adminAclsGroups ++ stringToSet(allowedUserGroups)
    logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(","))
  }

  /**
   * Checking the existence of "*" is necessary as YARN can't recognize the "*" in "defaultuser,*"
   */
  def getViewAcls: String = {
    if (viewAcls.contains(WILDCARD_ACL)) WILDCARD_ACL else viewAcls.mkString(",")
  }

  /** Same wildcard collapsing as [[getViewAcls]], for the view groups. */
  def getViewAclsGroups: String = {
    if (viewAclsGroups.contains(WILDCARD_ACL)) WILDCARD_ACL else viewAclsGroups.mkString(",")
  }

  /**
   * Admin acls should be set before the view or modify acls.  If you modify the admin
   * acls you should also set the view and modify acls again to pick up the changes.
   */
  def setModifyAcls(defaultUsers: Set[String], allowedUsers: String): Unit = {
    modifyAcls = adminAcls ++ defaultUsers ++ stringToSet(allowedUsers)
    logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
  }

  /**
   * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
   * acls groups you should also set the view and modify acls groups again to pick up the changes.
   */
  def setModifyAclsGroups(allowedUserGroups: String): Unit = {
    modifyAclsGroups = adminAclsGroups ++ stringToSet(allowedUserGroups)
    logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(","))
  }

  /**
   * Checking the existence of "*" is necessary as YARN can't recognize the "*" in "defaultuser,*"
   */
  def getModifyAcls: String = {
    if (modifyAcls.contains(WILDCARD_ACL)) WILDCARD_ACL else modifyAcls.mkString(",")
  }

  /** Same wildcard collapsing as [[getModifyAcls]], for the modify groups. */
  def getModifyAclsGroups: String = {
    if (modifyAclsGroups.contains(WILDCARD_ACL)) WILDCARD_ACL else modifyAclsGroups.mkString(",")
  }

  /**
   * Admin acls should be set before the view or modify acls.  If you modify the admin
   * acls you should also set the view and modify acls again to pick up the changes.
   */
  def setAdminAcls(adminUsers: String): Unit = {
    adminAcls = stringToSet(adminUsers)
    logInfo("Changing admin acls to: " + adminAcls.mkString(","))
  }

  /**
   * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
   * acls groups you should also set the view and modify acls groups again to pick up the changes.
   */
  def setAdminAclsGroups(adminUserGroups: String): Unit = {
    adminAclsGroups = stringToSet(adminUserGroups)
    logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(","))
  }

  /** Turns ACL checking on or off. */
  def setAcls(aclSetting: Boolean): Unit = {
    aclsOn = aclSetting
    logInfo("Changing acls enabled to: " + aclsOn)
  }

  /** @return the IO encryption key passed to the constructor, if any */
  def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey

  /**
   * Generates or looks up the secret key.
   *
   * The way the key is stored depends on the Spark deployment mode. Yarn
   * uses the Hadoop UGI.
   *
   * For non-Yarn deployments, If the config variable is not set
   * we throw an exception.
   *
   * @return the secret, or null when authentication is disabled
   * @throws IllegalArgumentException in non-YARN mode when no secret is configured
   */
  private def generateSecretKey(): String = {
    if (!isAuthenticationEnabled()) {
      null
    } else if (SparkHadoopUtil.get.isYarnMode) {
      // In YARN mode, the secure cookie will be created by the driver and stashed in the
      // user's credentials, where executors can get it. The check for an array of size 0
      // is because of the test code in YarnSparkHadoopUtilSuite.
      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY)
      if (secretKey == null || secretKey.length == 0) {
        logDebug("generateSecretKey: yarn mode, secret key from credentials is null")
        val rnd = new SecureRandom()
        val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
        val secret = new Array[Byte](length)
        rnd.nextBytes(secret)

        val cookie = HashCodes.fromBytes(secret).toString()
        SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie)
        cookie
      } else {
        new Text(secretKey).toString
      }
    } else {
      // user must have set spark.authenticate.secret config
      // For Master/Worker, auth secret is in conf; for Executors, it is in env variable
      Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
        .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF))
        .getOrElse {
          throw new IllegalArgumentException(
            "Error: a secret key must be specified via the " +
              SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
        }
    }
  }

  /**
   * Check to see if Acls for the UI are enabled
   * @return true if UI authentication is enabled, otherwise false
   */
  def aclsEnabled(): Boolean = aclsOn

  /**
   * Checks the given user against the view acl and groups list to see if they have
   * authorization to view the UI. If the UI acls are disabled
   * via spark.acls.enable, all users have view access. If the user is null
   * it is assumed authentication is off and all users have access. Also if any one of the
   * UI acls or groups specify the WILDCARD(*) then all users have view access.
   *
   * @param user to see if is authorized
   * @return true is the user has permission, otherwise false
   */
  def checkUIViewPermissions(user: String): Boolean = {
    logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +
      viewAcls.mkString(",") + " viewAclsGroups=" + viewAclsGroups.mkString(","))
    if (!aclsEnabled() || user == null || viewAcls.contains(user) ||
        viewAcls.contains(WILDCARD_ACL) || viewAclsGroups.contains(WILDCARD_ACL)) {
      true
    } else {
      // Group lookup is only done when the cheaper user/wildcard checks did not grant access.
      val currentUserGroups = Utils.getCurrentUserGroups(sparkConf, user)
      logDebug("userGroups=" + currentUserGroups.mkString(","))
      viewAclsGroups.exists(currentUserGroups.contains(_))
    }
  }

  /**
   * Checks the given user against the modify acl and groups list to see if they have
   * authorization to modify the application. If the modify acls are disabled
   * via spark.acls.enable, all users have modify access. If the user is null
   * it is assumed authentication isn't turned on and all users have access. Also if any one
   * of the modify acls or groups specify the WILDCARD(*) then all users have modify access.
   *
   * @param user to see if is authorized
   * @return true is the user has permission, otherwise false
   */
  def checkModifyPermissions(user: String): Boolean = {
    logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +
      modifyAcls.mkString(",") + " modifyAclsGroups=" + modifyAclsGroups.mkString(","))
    if (!aclsEnabled() || user == null || modifyAcls.contains(user) ||
        modifyAcls.contains(WILDCARD_ACL) || modifyAclsGroups.contains(WILDCARD_ACL)) {
      true
    } else {
      // Group lookup is only done when the cheaper user/wildcard checks did not grant access.
      val currentUserGroups = Utils.getCurrentUserGroups(sparkConf, user)
      // Logged as a comma separated list, matching checkUIViewPermissions.
      logDebug("userGroups=" + currentUserGroups.mkString(","))
      modifyAclsGroups.exists(currentUserGroups.contains(_))
    }
  }

  /**
   * Check to see if authentication for the Spark communication protocols is enabled
   * @return true if authentication is enabled, otherwise false
   */
  def isAuthenticationEnabled(): Boolean = authOn

  /**
   * Checks whether network encryption should be enabled.
   * @return Whether to enable encryption when connecting to services that support it.
   */
  def isEncryptionEnabled(): Boolean = {
    sparkConf.get(NETWORK_ENCRYPTION_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
  }

  /**
   * Gets the user used for authenticating HTTP connections.
   * For now use a single hardcoded user.
   * @return the HTTP user as a String
   */
  def getHttpUser(): String = "sparkHttpUser"

  /**
   * Gets the user used for authenticating SASL connections.
   * For now use a single hardcoded user.
   * @return the SASL user as a String
   */
  def getSaslUser(): String = "sparkSaslUser"

  /**
   * Gets the secret key.
   * @return the secret key as a String if authentication is enabled, otherwise returns null
   */
  def getSecretKey(): String = secretKey

  // Default SecurityManager only has a single secret key, so ignore appId.
  override def getSaslUser(appId: String): String = getSaslUser()

  override def getSecretKey(appId: String): String = getSecretKey()
}

 

/**
 * Request filter that rejects a request with HTTP 403 (FORBIDDEN) when the remote user
 * reported by the HTTP request fails the security manager's UI view permission check.
 */
private[v1] class SecurityFilter extends ContainerRequestFilter with ApiRequestContext {

  /**
   * Aborts `req` with a FORBIDDEN response if the remote user may not view the UI;
   * otherwise leaves the request untouched.
   *
   * @param req the request being filtered
   */
  override def filter(req: ContainerRequestContext): Unit = {
    val user = httpRequest.getRemoteUser()
    val authorized = uiRoot.securityManager.checkUIViewPermissions(user)
    if (!authorized) {
      val forbidden = Response
        .status(Response.Status.FORBIDDEN)
        .entity(raw"""user "$user" is not authorized""")
        .build()
      req.abortWith(forbidden)
    }
  }
}

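只有配置了 filter,用户登录 Web UI 后,Spark 才能通过 httpRequest.getRemoteUser() 获取到用户信息并进行 ACL 判断。如果没有配置 filter,就获取不到访问 Web UI 的用户信息(getRemoteUser() 返回 null),而 checkUIViewPermissions 在 user == null 时会直接返回 true,因此 ACL 不会生效。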

转载于:https://www.cnblogs.com/chengjunhao/p/8806539.html

你可能感兴趣的文章