一、准备工作

2.1.1 准备Ansible环境

1、准备两台机器,机器配置无需太高,比如1c2g,这里建议大家使用阿里云按量付费机器。

image-20250927105933601

image-20250927105944437

image-20250927105958970

image-20250927110017516

image-20250927110033792

2、安装Ansible

两台机器都执行

说明:以下命令为阿里云alibaba cloud linux系统

dnf install python38 python38-pip  ##新版本ansible需要python3.8或更高版本
pip3.8 install pip --upgrade  ##升级pip版本
pip3.8 install ansible  ##用pip安装ansible

3、配置Ansible

两台机器,其中一台机器作为控制端,另外一台作为被控制端

设置主机名

hostnamectl set-hostname aming01   ##控制端
hostnamectl set-hostname aming02   ##被控制端

编写hosts,两台机器都执行

编辑/etc/hosts 文件,最后面增加

192.168.118.34 aming01
192.168.118.35 aming02

做免密登录,在控制端执行

## 生成密钥对
ssh-keygen -t rsa

## 将公钥拷贝到被控制机
ssh-copy-id root@aming01   ## 本机到本机的免密
ssh-copy-id root@aming02  ## 本机到被控制端

创建主机清单配置,控制端执行

mkdir -p /etc/ansible
vi /etc/ansible/hosts  ##最后面增加
[coze]
aming01 ansible_python_interpreter=/usr/bin/python3.8
aming02 ansible_python_interpreter=/usr/bin/python3.8

测试

ansible all --list-hosts   ##列出所有主机组合主机
ansible aming02 -m command -a 'hostname'  ##查看aming02主机名

2.1.2 编写ansible api服务脚本并开启API

1、在Ansible控制端编写api服务脚本

创建目录

mkdir /opt/ansible_api
cd /opt/ansible_api

编写脚本 main.py,内容如下:

from flask import Flask, request, jsonify
import ansible_runner
import os
import tempfile
import logging
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader

app = Flask(__name__)

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_TOKEN = "<your-api-token>"
DEFAULT_INVENTORY_PATH = "/etc/ansible/hosts"  # 系统默认inventory文件路径
PLAYBOOK_DIR = "/opt/ansible_api/playbook/"  # playbook文件所在目录

# 确保playbook目录存在
if not os.path.exists(PLAYBOOK_DIR):
    os.makedirs(PLAYBOOK_DIR)
    logger.info(f"Created playbook directory: {os.path.abspath(PLAYBOOK_DIR)}")

def read_file_content(file_obj):
    """安全读取文件对象内容"""
    if file_obj is None:
        return ""
    if hasattr(file_obj, 'read'):
        try:
            return file_obj.read()
        except Exception as e:
            logger.error(f"Error reading file: {e}")
            return str(file_obj)
    return str(file_obj)

def get_inventory_groups(inventory_path):
    """从inventory文件中获取所有组名"""
    try:
        loader = DataLoader()
        inventory_manager = InventoryManager(loader=loader, sources=inventory_path)
        groups = list(inventory_manager.groups.keys())
        logger.info(f"Found groups in inventory: {groups}")
        return groups
    except Exception as e:
        logger.error(f"Failed to get inventory groups: {e}")
        return []

def get_inventory_hosts(inventory_path, group_name=None):
    """从inventory文件中获取主机列表"""
    try:
        loader = DataLoader()
        inventory_manager = InventoryManager(loader=loader, sources=inventory_path)

        if group_name and group_name in inventory_manager.groups:
            group = inventory_manager.groups[group_name]
            hosts = [host.name for host in group.hosts]
            logger.info(f"Found hosts in group '{group_name}': {hosts}")
            return hosts
        else:
            # 获取所有主机
            hosts = [host.name for host in inventory_manager.get_hosts()]
            logger.info(f"Found all hosts in inventory: {hosts}")
            return hosts
    except Exception as e:
        logger.error(f"Failed to get inventory hosts: {e}")
        return []

@app.route('/run', methods=['POST'])
def run_playbook():
    # 验证Token
    if request.headers.get('X-API-Token') != API_TOKEN:
        return jsonify({"error": "Unauthorized"}), 401

    # 获取请求参数
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "Invalid JSON"}), 400
    except Exception as e:
        logger.error(f"JSON parse error: {e}")
        return jsonify({"error": f"JSON parse error: {str(e)}"}), 400

    playbook = data.get('playbook')
    inventory = data.get('inventory', 'default')  # 默认使用系统inventory

    # 验证必填参数
    if not playbook:
        return jsonify({"error": "Missing playbook"}), 400
    if not inventory:
        return jsonify({"error": "Missing inventory"}), 400

    # 处理playbook参数 - 安全检查并构建完整路径
    # 移除路径中的任何目录部分,只保留文件名
    playbook_filename = os.path.basename(playbook)
    playbook_path = os.path.join(PLAYBOOK_DIR, playbook_filename)

    # 检查playbook文件是否存在
    if not os.path.isfile(playbook_path):
        return jsonify({
            "error": f"Playbook file '{playbook_filename}' not found in {os.path.abspath(PLAYBOOK_DIR)}",
            "available_playbooks": os.listdir(PLAYBOOK_DIR) if os.path.exists(PLAYBOOK_DIR) else []
        }), 400

    # 创建临时工作目录
    with tempfile.TemporaryDirectory() as tmp_dir:
        # 处理inventory参数
        limit = None  # 用于限制执行的主机或组

        if isinstance(inventory, str):
            # 检查系统inventory文件是否存在
            if os.path.exists(DEFAULT_INVENTORY_PATH):
                # 获取系统inventory中的所有组
                system_groups = get_inventory_groups(DEFAULT_INVENTORY_PATH)

                if inventory.lower() in ['all', 'default']:
                    # 使用系统默认inventory,不限制组
                    inventory_path = DEFAULT_INVENTORY_PATH
                    logger.info(f"Using system default inventory: {inventory_path}")
                elif inventory in system_groups:
                    # 使用系统inventory,并限制到指定组
                    inventory_path = DEFAULT_INVENTORY_PATH
                    limit = inventory
                    logger.info(f"Using system inventory with group limit: {limit}")
                elif os.path.isfile(inventory):
                    # 使用指定的inventory文件路径
                    inventory_path = inventory
                    logger.info(f"Using specified inventory file: {inventory_path}")
                else:
                    # 检查是否是主机列表(逗号分隔)
                    hosts = [host.strip() for host in inventory.split(',')]

                    # 检查是否有保留字"all"
                    if any(host.lower() == 'all' for host in hosts):
                        return jsonify({"error": "Cannot use 'all' as a hostname. It is a reserved word in Ansible."}), 400

                    # 检查是否有系统inventory中的组名
                    if any(host in system_groups for host in hosts):
                        return jsonify({
                            "error": f"Cannot use group names as hostnames. Found group names in: {hosts}. "
                                    f"Use the group name directly to target all hosts in that group."
                        }), 400

                    # 创建临时inventory文件
                    inv_content = f"[all]\n" + "\n".join(hosts)
                    inv_path = os.path.join(tmp_dir, "inventory")
                    with open(inv_path, 'w') as f:
                        f.write(inv_content)
                    inventory_path = inv_path
                    logger.info(f"Created temporary inventory with hosts: {hosts}")
            else:
                # 系统inventory文件不存在,只能使用主机列表或指定文件
                if os.path.isfile(inventory):
                    inventory_path = inventory
                    logger.info(f"Using specified inventory file: {inventory_path}")
                else:
                    # 检查是否是主机列表(逗号分隔)
                    hosts = [host.strip() for host in inventory.split(',')]

                    # 检查是否有保留字"all"
                    if any(host.lower() == 'all' for host in hosts):
                        return jsonify({"error": "Cannot use 'all' as a hostname. It is a reserved word in Ansible."}), 400

                    # 创建临时inventory文件
                    inv_content = f"[all]\n" + "\n".join(hosts)
                    inv_path = os.path.join(tmp_dir, "inventory")
                    with open(inv_path, 'w') as f:
                        f.write(inv_content)
                    inventory_path = inv_path
                    logger.info(f"Created temporary inventory with hosts: {hosts}")

        # 执行Ansible
        try:
            logger.info(f"Running playbook: {playbook_path}")
            logger.info(f"Inventory: {inventory_path}")
            if limit:
                logger.info(f"Limiting to group/host: {limit}")

            # 使用正确的参数调用ansible_runner(移除了extravars参数)
            result = ansible_runner.run(
                private_data_dir=tmp_dir,
                playbook=playbook_path,  # 使用完整路径
                inventory=inventory_path,
                limit=limit,  # 添加limit参数
                quiet=True  # 禁止输出到控制台
            )

            # 获取输出内容
            stdout_content = read_file_content(result.stdout)
            stderr_content = read_file_content(result.stderr)

            # 获取统计信息
            stats = {}
            if hasattr(result, 'stats'):
                stats = result.stats

            logger.info(f"Playbook execution completed with RC: {result.rc}")

            return jsonify({
                "status": "success",
                "rc": result.rc,
                "stdout": stdout_content,
                "stderr": stderr_content,
                "stats": stats
            })

        except Exception as e:
            logger.error(f"Ansible execution failed: {e}", exc_info=True)
            return jsonify({
                "error": f"Ansible execution failed: {str(e)}",
                "traceback": str(e.__traceback__) if hasattr(e, '__traceback__') else None
            }), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

2、安装依赖模块

python3.8 -m pip install  flask  ansible-runner

3、开启服务

nohup python3.8  main.py >> ansible_api.log 2>>ansible_api.log &

4、编写测试playbook

创建playbook目录,以后所有的playbook文件全部放到这里

mkdir /opt/ansible_api/playbook
cd /opt/ansible_api/playbook

vi test_playbook.yml #内容为

- name: Simple Example Playbook
  hosts: all
  become: yes  # 使用sudo权限执行任务

  tasks:
    - name: Create a directory
      ansible.builtin.file:
        path: /tmp/ansible_test
        state: directory
        mode: '0755' 

5、使用curl测试接口是否成功

curl -X POST  http://localhost:5000/run \
 -H "Content-Type: application/json" \
 -H "X-API-Token: <your-api-token>"  \
 -d '{
       "playbook": "test_playbook.yml", 
       "inventory": "aming01"
     }'

2.1.3 编写playbook

起初阶段,我们可以先自行手动编写playbook,后期可以借助LLM自动生成。

几个常见的场景:重启某服务、执行一个shell脚本、查看系统负载、查看磁盘使用情况等

1、重启服务

在/opt/ansible_api/playbook目录下创建 restart_nginx.yml

---
- name: Restart Nginx with systemd module
  hosts: all
  become: yes

  tasks:
    - name: Restart nginx service
      ansible.builtin.systemd:
        name: nginx
        state: restarted

2、执行shell脚本

在/opt/ansible_api/playbook目录下创建shell_script.yml

说明:shell脚本在ansible控制端,会自动传输到远程

---
- name: Execute shell script
  hosts: all
  tasks:
    - name: Run local script on remote hosts
      script: /path/to/local_script.sh  # 本地脚本路径
      args:
        chdir: /tmp  # 可选:指定远程执行目录
      register: result
      changed_when: false  # 避免误报 changed 状态

    - name: Print script output
      debug:
        var: result.stdout

3、查看系统负载

在/opt/ansible_api/playbook目录下创建get_systemload.yml

---
- name: Check System Load Average
  hosts: all
  gather_facts: no  # 禁用 facts 收集以提高效率
  tasks:
    - name: Get system load via uptime
      command: uptime
      register: uptime_result
      changed_when: false  # 此命令不会改变系统状态

    - name: Display load information
      debug:
        msg: |
          Host: {{ inventory_hostname }}
          System Info: {{ uptime_result.stdout }}
          Load Average (1m/5m/15m): {{ uptime_result.stdout | regex_replace('.*load average: (.*)', '\\1') }}

4、查看磁盘

在/opt/ansible_api/playbook目录下创建get_diskinfo.yml

---
- name: Check Disk Usage
  hosts: all
  vars:
    warn_threshold: 80    # 警告阈值(百分比)
    crit_threshold: 90    # 严重阈值(百分比)
    exclude_fs:           # 排除的文件系统类型
      - tmpfs
      - devtmpfs
      - squashfs
      - overlay
      - nfs
      - cifs
  tasks:
    - name: Get disk usage information
      shell: |
        df -h | awk 'NR>1 {
          gsub(/%/, "", $5);
          if ($5 >= {{ crit_threshold }}) {
            printf "\033[1;31mCRITICAL\033[0m";
          } else if ($5 >= {{ warn_threshold }}) {
            printf "\033[1;33mWARNING\033[0m";
          } else {
            printf "\033[1;32mOK\033[0m";
          }
          printf " %s %s %s %s%% %s\n", $1, $2, $3, $5, $6;
        }' | grep -vE '{{ exclude_fs | join("|") }}'
      register: disk_info
      changed_when: false
      failed_when: false

    - name: Display disk usage report
      debug:
        msg: |
          ===== Disk Usage Report for {{ inventory_hostname }} =====
          Status  Filesystem      Size    Used    Use%    Mountpoint
          ---------------------------------------------------------
          {{ disk_info.stdout }}

          {% if disk_info.stderr %}
          Errors encountered:
          {{ disk_info.stderr }}
          {% endif %}