本文主要介绍 Ambari 集成自定义服务的详细过程,包括制作 RPM 包、metainfo 文件、配置文件、安装、启动、停止等脚本,以及快速链接。
概述
本文主要介绍 Ambari 集成自定义服务的方法,这里以自研组件 Ftp-Transport 为例做介绍。
实验环境
- 操作系统:Centos 7.6
 
- Ambari版本:Ambari 2.7.1
 
- HDP版本:HDP 3.0.1
 
- 已安装 Ambari 环境,具体可参考:Ambari 安装详细步骤
 
制作 RPM 包
因为 Ambari 安装 包的时候都是以 RPM 包的形式安装,所以尽量将自研服务制作成 RPM 包,具体制作方法可以参考ZooKeeper从源码到RPM包制作过程详解, 里面有详细的打包步骤,这里不再赘述,直接看最终 rpm 包。

制作本地源
在原有源的基础上增加 ftp-transport 源
1 2
   | cd /var/www/html/ambari/HDP/centos7/3.0.1.0 mkdir ftp-transport
   | 
 
复制rpm包到ftp-transport文件夹

在原有 repodata 文件夹的同级目录中执行如下命令
1 2 3
   | cd /var/www/html/ambari/HDP/centos7/3.0.1.0 rm -rf repodata/ createrepo ./
   | 
 
在所有节点执行如下命令
1 2
   | yum clean all yum makecache
   | 
 
在任意节点通过执行 yum search service_name 来查看软件源是否制作好,如
1
   | yum search ftp-transport
   | 
 

出现类似这样的信息即为制作本地源完成。
创建 stacks 文件
目录结构
这里只介绍 Ambari 集成自定义服务所需的最少文件为例,最终的文件目录结构如下:

这些文件在源码中的位置为:ambari-server\src\main\resources\stacks\HDP\3.0\services\FTP_TRANSPORT,现在介绍主要的文件内容。
metainfo.xm 文件是非常重要的一个文件,具体介绍可以参考:Ambari 中 metainfo.xml 文件解析
这里的 metainfo.xm 文件全部内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
   | <?xml version="1.0"?>
  <metainfo>   <schemaVersion>2.0</schemaVersion>   <services>     <service>       <name>FTP_TRANSPORT</name>       <displayName>Ftp-Transport</displayName>       <comment>Ftp-Transport是一种基于FTP协议的文件数据传输增强工具。</comment>       <version>1.0.0</version>       <components>         <component>           <name>FTP_TRANSPORT</name>           <displayName>Ftp-Transport</displayName>           <category>MASTER</category>           <cardinality>1</cardinality>           <versionAdvertised>true</versionAdvertised>           <commandScript>             <script>scripts/ftp_transport.py</script>             <scriptType>PYTHON</scriptType>             <timeout>1200</timeout>           </commandScript>           <logs>             <log>               <logId>ftp_transport</logId>               <primary>true</primary>             </log>           </logs>         </component>       </components>
        <osSpecifics>         <osSpecific>           <osFamily>any</osFamily>           <packages>             <package>               <name>ftp-transport</name>             </package>           </packages>         </osSpecific>       </osSpecifics>
        <requiredServices>         <service>REDIS</service>       </requiredServices>
        <configuration-dependencies>         <config-type>ftp-transport-env</config-type>       </configuration-dependencies>
        <quickLinksConfigurations>         <quickLinksConfiguration>           <fileName>quicklinks.json</fileName>           <default>true</default>         </quickLinksConfiguration>       </quickLinksConfigurations>
      </service>   </services> </metainfo>
   | 
 
因为 Ftp-Transport 依赖 Redis,所以这里增加了
1 2 3
   | <requiredServices>   <service>REDIS</service> </requiredServices>
   | 
 
因为有配置文件 ftp-transport-env,所以
1 2 3
   | <configuration-dependencies>   <config-type>ftp-transport-env</config-type> </configuration-dependencies>
   | 
 
因为有快速链接,所以
1 2 3 4 5 6
   | <quickLinksConfigurations>   <quickLinksConfiguration>     <fileName>quicklinks.json</fileName>     <default>true</default>   </quickLinksConfiguration> </quickLinksConfigurations>
   | 
 
配置文件
配置文件都在 configuration 文件夹内,在页面的配置参数中展示,值的属性可以作一些简单的约束校验,详情可以参考:Ambari 服务配置校验和配置推荐介绍 文章的 “value-attributes” 部分介绍。
脚本
脚本都在 scripts 文件夹内,这里有 params.py 和 ftp_transport.py 两个 Python 脚本,params.py 主要是获取参数配置, ftp_transport.py 主要执行服务的安装、启停、状态监控等。
params.py
params.py 里只列举部分内容,如:
1 2 3
   | config = Script.get_config() # Logger.info("config:{0}".format(config)) Logger.info('Starting config params ...')
   | 
 
Confing 中包含几乎所有的配置参数信息,如果想获取哪个参数,可以打印日志看下,比如获取 ftp-transport 的 master 安装节点 hostname,
1 2
   | # master节点hostname或IP ftp_transport_master_hostname = config['clusterHostInfo']['ftp_transport_hosts'][0]
   | 
 
日志打印方法为
1 2 3 4 5
   | from resource_management.core.logger import Logger # 初始化 Logger.initialize_logger() ftp_transport_env = config['configurations']['ftp-transport-env'] Logger.info("ftp_transport_env:{0}".format(ftp_transport_env))
   | 
 
ftp_transport.py
这里主要介绍加载配置、安装、启动、停止、状态监控等功能。
这一步是必须的,否则无法获取当前的配置信息,之后操作会报错。
1 2 3 4 5 6
   | class Ftp_transport_master(Script):     def configure(self, env):         import params         Logger.info('Configure Starting Config Params ...')         env.set_params(params)         Logger.info('Configure Config Params.')
   | 
 
如无特殊需求,可以直接调用 Ambari 分装好的安装组件方法 self.install_packages(env),如:
1 2 3 4 5 6
   | def install(self, env):     import params     env.set_params(params)     Logger.info('Starting Install Ftp-Transport ...')     self.install_packages(env)     Logger.info('Installed Ftp-Transport!')
   | 
 
安装时会自动执行如下命令
1
   | 2019-08-09 16:42:51,817 - Installing package ftp-transport ('/usr/bin/yum -y install ftp-transport')
  | 
 
这个 ftp-transport 就是在 metainfo.xml中设置的 packages
1 2 3 4 5 6 7 8 9 10
   | <osSpecifics>   <osSpecific>     <osFamily>any</osFamily>     <packages>       <package>         <name>ftp-transport</name>       </package>     </packages>   </osSpecific> </osSpecifics>
   | 
 
启动服务命令,这里的启动有点复杂,主要是需要获取各种参数,实际是使用 java -jar 命令启动的,启动后需要写入 pid 文件,否则无法监控到当前服务状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
   | def start(self, env):         import params         # import redis_params         env.set_params(params)
          Logger.info('Starting Ftp-Transport ...')
          # 构建参数值         master_jar_path = os.path.join(params.ftp_transport_base_dir, params.ftp_transport_master_jar)         database_url = params.ftp_transport_database_url         database_user = params.ftp_transport_database_user         database_pwd = params.ftp_transport_database_password
          # 获取Ftp-Transport master节点的IP         master_ip = socket.gethostbyname(params.ftp_transport_master_hostname)
          worker_jar_name = params.ftp_transport_worker_jar_name         worker_start_shell_name = params.ftp_transport_worker_start_shell_name         worker_source_path = os.path.join(params.ftp_transport_base_dir, params.ftp_transport_worker_source_dir)         worker_goal_path = os.path.join(params.ftp_transport_base_dir, params.ftp_transport_worker_goal_dir)
          # 获取JAVA_HOME         java_home = params.java64_home         java_path = os.path.join(java_home, "bin/java")
          ftp_transport_log_path = os.path.join(params.ftp_transport_log_dir, params.ftp_transport_log_file)
          # 获取redis集群信息         print('zookeeper_host_port:'+params.zookeeper_host_port)         print('redis_instance_znode_dir:'+params.redis_instance_znode_dir)         redis_node = get_redis_instance_znode_list(params.zookeeper_host_port, params.redis_instance_znode_dir)         redis_instance_ip_port = ""         for i in range(len(redis_node)):             redis_instance_znode_path = os.path.join(params.redis_instance_znode_dir, redis_node[i])             redis_instance_dictObject = get_redis_znode_data(params.zookeeper_host_port, redis_instance_znode_path)             for j in range(len(redis_instance_dictObject)):                 redis_dir_name = redis_instance_dictObject.keys()[j]                 redis_ip = redis_instance_dictObject[redis_dir_name]['ip']                 redis_port = redis_instance_dictObject[redis_dir_name]['port']                 redis_instance_ip_port = redis_instance_ip_port + str(redis_ip) + ":" + str(redis_port) + " "
          print('redis_instance_ip_port:'+redis_instance_ip_port)
          # 构建启动master命令         start_master_cmd = "{0} -jar {1} --spring.datasource.url={2} --spring.datasource.username={3} " \                            "--spring.datasource.password={4} --worker.masterIP={5} --worker.jarGoalName={6} " \                            "--worker.startShellName={7} --worker.jarSourceFolder={8} --worker.jarGoalFolder={9} " \                            "--worker.java_home={10} --logging.file={11} --spring.redis.cluster.nodes={12}".format(             java_path, master_jar_path, database_url, database_user, database_pwd, master_ip, worker_jar_name,             worker_start_shell_name, worker_source_path, worker_goal_path, java_home, ftp_transport_log_path,             redis_instance_ip_port         )         Logger.info("start_master_cmd:{0}".format(start_master_cmd))
          # 判断文件夹是否存在         # log日志文件         if not os.path.exists(params.ftp_transport_log_dir):             os.makedirs(params.ftp_transport_log_dir)
          # pid文件         if not os.path.exists(params.ftp_transport_pid_dir):             os.makedirs(params.ftp_transport_pid_dir)
          ftp_transport_nohup_path = os.path.join(params.ftp_transport_log_dir, params.ftp_transport_nohup_file)         ftp_transport_pid_path = os.path.join(params.ftp_transport_pid_dir, params.ftp_transport_pid_file)         nohup_cmd = 'nohup {0} > {1} 2>&1 & echo $! > {2}'.format(start_master_cmd, ftp_transport_nohup_path, ftp_transport_pid_path)         # Execute(nohup_cmd, user=params.root_user)         try:             Execute(nohup_cmd, user=params.root_user)         except:             show_logs(ftp_transport_nohup_path, user=params.root_user)             raise
          Logger.info('Started Ftp-Transport !')
   | 
 
停止服务,主要使用 kill -9 pid 命令,然后删除 pid 文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
   | def stop(self, env):     import params     env.set_params(params)     Logger.info('Stopping Ftp-Transport')
      # 检测pid内容状态     ftp_transport_pid_path = os.path.join(params.ftp_transport_pid_dir, params.ftp_transport_pid_file)     # 当 pid 文件存在时,kill 进程     if os.path.exists(ftp_transport_pid_path):         kill_cmd = 'kill -9 `cat {0}`'.format(ftp_transport_pid_path)         Execute(kill_cmd, user=params.root_user)         File(ftp_transport_pid_path, action="delete")
      Logger.info('Stopped Ftp-Transport')
   | 
 
使用 Ambari 自带的检测程序状态方法 check_process_status(os.path.join(params.ftp_transport_pid_dir, params.ftp_transport_pid_file)) 即可,它会先检查是否有 pid 文件,如果没有,就判定为服务停止;如果有 pid 文件,它会去尝试 kill 掉,但不会真的去 kill 掉,以此检测这个 pid 是否真的存在。
1 2 3 4 5 6
   | def status(self, env):     import params     env.set_params(params)     Logger.info('Checking ftp-transport Status ...')     check_process_status(os.path.join(params.ftp_transport_pid_dir, params.ftp_transport_pid_file))     Logger.info('Checked ftp-transport Status')
   | 
 
快速链接
在 quicklinks 文件夹里存放快速链接文件 quicklinks.json,文件内容为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
   | {   "name": "default",   "description": "Ftp-Transport-webUI default quick links configuration",   "configuration": {     "protocol":     {       "type":"http"     },
      "links": [       {         "name": "Ftp-Transport-webUI",         "label": "Ftp-Transport webUI",         "requires_user_name": "false",         "component_name": "FTP_TRANSPORT",         "url":"%@://%@:%@/ftpTransport",         "port":{           "http_default_port": "9800",           "regex": "^(\\d+)$",           "site": "ftp-transport-env"         }       }     ]   } }
  | 
 
在 metainfo.xm 文件里标注快速链接信息,在metainfo.xm已介绍过,如下:
1 2 3 4 5 6
   | <quickLinksConfigurations>   <quickLinksConfiguration>     <fileName>quicklinks.json</fileName>     <default>true</default>   </quickLinksConfiguration> </quickLinksConfigurations>
   | 
 
重启 Ambari
编写好 stacks 文件后,可以直接放到 Ambari server 的文件夹 /var/lib/ambari-server/resources/stacks/HDP/3.0/services/ 中,放置好后需要重启 Ambari。
重启 Ambari server
Server 节点:
重启 Ambari agent
全部节点:
安装
接下来进行安装流程,




至此,Ambari 集成服务完成。