您的位置:首页 > 数码常识数码常识

检测端口连通性测试命令(python测试端口连通)

2025-05-12人已围观

检测端口连通性测试命令(python测试端口连通)
  # -*- coding: utf-8 -*-#!/bin/env python#AUTHOR:karl#DATE:2018-1-19#VERSION:V1.0######################import timeimport osimport paramikoimport datetimeimport sysimport MySQLdbimport threading ########################################首次插入数据是REG=0,更新数据REG为其他#######################################REG=1private_key=paramiko.RSAKey.from_private_key_file('/home/appdeploy/.ssh/id_rsa')def Data_mysql(info):    try:        string=info        Pip=string.split(" ")[0]        Sip=string.split(" ")[1]        Dip=string.split(" ")[2]        Port=string.split(" ")[3]        Result=string.split(" ")[4].replace('n','')        con=MySQLdb.connect(host='localhost',user="root",passwd="******",db="zabbix",port=3306,charset="utf8")        dt=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")        cursor=con.cursor()        valuse=(dt,Result,Pip,Sip,Dip,Port)        if REG==0:            sql="""INSERT INTO Balant_telnet (create_time,telnet_result,telnet_physicalip,telnet_sourceip,telnet_desip,telnet_port) VALUES (%s,%s,%s,%s,%s,%s) """            result=cursor.execute(sql,valuse)        else:            sql="""UPDATE Balant_telnet set create_time='%s',telnet_result=%s where telnet_physicalip='%s' and telnet_sourceip='%s' and telnet_desip='%s' and telnet_port='%s';""" %(valuse)            result=cursor.execute(sql)        cursor.close()        con.commit()        con.close()    except MySQLdb.Error, e:        print "Error %d: %s" % (e.args[0], e.args[1])        sys.exit(1)def check_port(value,content):    ager={}    ager[value]=content    ssh=paramiko.SSHClient()    try:        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())        ssh.connect(value,port=22,username='appdeploy',pkey=private_key,timeout=5)        cmd="sh /home/appdeploy/monitor.sh {value}".format(value=ager[ager.keys()[index]])         stdin,stdout,stderr=ssh.exec_command(cmd)        for file_d in stdout.readlines():              Data_mysql(file_d)    except:        result=cmd+','+'failed'+'n'    ssh.close()#################################### 对配置文件进行参数解析###################################def get_parameter():    ager={}    value=[]    with open("monitor_config") as context:        for line in context:            while not "," in list(line):                host=line.strip()[1:-1]                value=[]                break            line=line.strip().split(",")            value=value+line            ager[str(host)]=value    return agerif __name__=='__main__':    threads=[]    parm=get_parameter()    files=range(len(parm.keys()))    for index,content in parm.items():        t=threading.Thread(target=check_port,args=(index,content))        threads.append(t)    for index in files:        threads[index].start()    for index in files:        threads[index].join()配置文件:

  python测试端口连通

  [10.117.194.23]

  10.117.194.77,10.116.41.82,9920

  10.117.194.77,10.116.47.12,1080

  10.117.194.77,10.116.45.56,1081

  [10.117.194.24]

  10.117.194.78,10.116.41.82,9920

  10.117.194.78,10.116.47.12,1080

  10.117.194.78,10.116.45.56,1081

  10.117.194.78,10.116.145.33,8001

  【】里面的IP 是物理IP,下面分别对应着应用的源IP,目标IP和测试端口号

  agent 直接用shell 实现.

  数据库中效果

  在zabbix 中 加入对表中数据的监控。在zabbix 的mysql 模板中加入Balant 的监控项,也可以加入图像,我们最后的结果是要展示在grafana 上进行展示。为1就说明应用端口是正常运行。

  上面就是小居数码小编今天给大家介绍的关于(python测试端口连通)的全部内容,希望可以帮助到你,想了解更多关于数码知识的问题,欢迎关注我们,并收藏,转发,分享。

  94%的朋友还想知道的:

  软件测试常用的性能测试指标有哪些(软件性能测试有哪些性能指标)

  linux80端口被占用了怎么办(linux80端口被占用怎么解决)

  软件测试的基本知识(软件测试过程中最基础性的测试)

  app测试使用的工具有哪些(移动应用测试工具有哪些)



  154622
 

很赞哦! ()

随机图文