1.获取数据
python version 2.7
假设我们要获取的Jenkins job名字为test_flow,该job触发了另外两个Jenkins job
test1 和test2 job.
我们要获取test_flow job的运行时间,状态,number,jobname.
我们把功能性函数写在一个文件中 Build class类中,方便主函数调用
python-jenkins模块
build.py
import
jenkins
import
ssl
import
re
import
datetime
ssl
.
_create_default_https_context
=
ssl
.
_create_unverified_context
#build 类主要获取Jenkins job参数
class
Build
:
def
__init__
(
self
,
jobname
,
url
,
number
)
:
self
.
jobname
=
jobname
self
.
number
=
number
self
.
jenkins_url
=
url
self
.
jenkins_server
=
jenkins
.
Jenkins
(
self
.
jenkins_url
)
self
.
build_console
=
''
def
_getConsoleFromJenkins
(
self
)
:
self
.
build_console
=
self
.
jenkins_server
.
get_build_console_output
(
self
.
jobname
,
self
.
number
)
return
self
.
build_console
def
get_next_job_build_number
(
self
,
name
)
:
number
=
0
status
=
"Running"
if
self
.
build_console
==
''
:
self
.
_getConsoleFromJenkins
(
)
pattern
=
re
.
compile
(
r
'Starting building: '
+
str
(
name
)
+
' #(.*)'
)
match
=
re
.
search
(
pattern
,
self
.
build_console
,
0
)
if
match
:
numberstr
=
match
.
group
(
1
)
number
=
int
(
numberstr
)
status
=
"Completed"
return
{
"job"
:
name
,
"number"
:
number
,
"status"
:
status
}
def
_setAttrFromJenkins
(
self
)
:
try
:
build_info
=
self
.
jenkins_server
.
get_build_info
(
self
.
jobname
,
self
.
number
)
except
:
return
False
self
.
href
=
build_info
[
'url'
]
self
.
duration
=
build_info
[
'duration'
]
self
.
build_time
=
build_info
[
'timestamp'
]
if
build_info
[
'building'
]
:
self
.
status
=
"Running"
else
:
self
.
status
=
build_info
[
'result'
]
return
True
#获取jenkins job的各种信息然后组装到字典attr中
def
attr_to_dict
(
self
)
:
attr
=
dict
(
)
if
self
.
_setAttrFromJenkins
(
)
==
False
:
return
None
attr
[
'job'
]
=
self
.
jobname
attr
[
'number'
]
=
self
.
number
attr
[
'href'
]
=
self
.
href
attr
[
'duration'
]
=
int
(
self
.
duration
/
1000
)
attr
[
'status'
]
=
self
.
status
attr
[
'timestamp'
]
=
self
.
build_time
# cet time zone
attr
[
'build_time'
]
=
datetime
.
datetime
.
utcfromtimestamp
(
float
(
self
.
build_time
)
/
1000
+
3600
)
.
strftime
(
'%Y-%m-%d %H:%M:%S'
)
if
self
.
status
!=
"Running"
:
attr
[
'finish_time'
]
=
datetime
.
datetime
.
utcfromtimestamp
(
float
(
self
.
build_time
)
/
1000
+
3600
+
float
(
self
.
duration
)
/
1000
)
.
strftime
(
'%Y-%m-%d %H:%M:%S'
)
return
attr
class
Job
:
def
__init__
(
self
,
jobname
,
url
)
:
self
.
jenkins_job
=
jobname
self
.
jenkins_url
=
url
self
.
jenkins_server
=
jenkins
.
Jenkins
(
self
.
jenkins_url
)
#获取最后完成的number
def
get_last_completed
(
self
)
:
job_info
=
self
.
jenkins_server
.
get_job_info
(
self
.
jenkins_job
)
if
job_info
is
None
:
print
(
"yes it is none \n"
)
return
0
return
int
(
job_info
[
'lastCompletedBuild'
]
[
'number'
]
)
#获取jenkins job的已经运行的number队列
def
get_build_list
(
self
)
:
build
=
self
.
jenkins_server
.
get_job_info
(
self
.
jenkins_job
)
if
build
is
None
:
print
(
"yes it is none \n"
)
return
build
[
'builds'
]
2.es数据库的创建
获取的数据需要存储到es数据库中,这里用到的是Elasticsearch6.01版本
es.py
from
elasticsearch
import
Elasticsearch
class
Elastic
:
def
__init__
(
self
,
host
,
index
)
:
self
.
index
=
index
self
.
es
=
Elasticsearch
(
host
)
def
exists
(
self
)
:
return
self
.
es
.
indices
.
exists
(
self
.
index
)
def
create_index
(
self
)
:
build_mapping
=
{
'properties'
:
{
'timestamp'
:
{
'type'
:
'date'
}
,
}
}
create_index_body
=
{
'setting'
:
{
'number_of_shards'
:
1
,
'number_of_replicas'
:
0
}
,
'mappings'
:
{
'_doc'
:
{
'properties'
:
{
'job'
:
{
'type'
:
'keyword'
}
,
'timestamp'
:
{
'type'
:
'date'
}
,
'status'
:
{
'type'
:
'keyword'
}
,
'item'
:
{
'type'
:
'object'
,
'properties'
:
{
'test1'
:
build_mapping
,
}
}
}
}
}
}
try
:
self
.
es
.
indices
.
create
(
index
=
self
.
index
,
body
=
create_index_body
)
except
TransportError
as
e
:
if
e
.
error
==
'index_already_exists_exception'
:
pass
else
:
raise
def
save
(
self
,
body
)
:
try
:
self
.
es
.
index
(
self
.
index
,
doc_type
=
'_doc'
,
body
=
body
)
except
:
print
(
"save error !!!"
)
def
update
(
self
,
_id
,
body
)
:
try
:
self
.
es
.
index
(
self
.
index
,
doc_type
=
'_doc'
,
id
=
_id
,
body
=
body
)
except
:
print
(
"update error !!!"
)
3.主函数
getdata.py
#!/usr/local/bin/python
import
re
import
sys
import
argparse
from
elasticsearch
import
Elasticsearch
from
module
.
es
import
*
from
module
.
build
import
*
job_name
=
test_flow
jenkins_url
=
http
:
//
jenkins
.
test
.
com
/
jenkins
job
=
Job
(
job_name
,
jenkins_url
)
doc
=
dict
(
)
testjob
=
Build
(
job_name
,
jenkins_url
,
number
)
#建立对象
doc
=
testjob
.
attr_to_dict
(
)
#获取job参数存到字典中
es
.
save
(
doc
)
#把数据存储到es数据库中