Rundeck | py library

I have a Rundeck setup running on Linux with MySQL as its database. All my code and scripts are written in Python, so I thought maintenance on Rundeck would be easier if I had a Rundeck library in Python. This library gives me easy access to the Rundeck details that I need for my maintenance tasks.

USE CASE
The first maintenance task I wanted to do on Rundeck was to delete old executions, because the DB was getting bigger and the logs on the machine were also occupying a lot of space. Since I have lots of recurring jobs, I thought it would be better to also have a maintenance job that deletes old executions after a specific number of days.

Please note that the code uses several of my own library modules (such as the logger and Cquery for MySQL queries). You will have to replace those with your own library functions. If you don't have an equivalent library, you will need to change the respective areas in the code to make this work. If you run into any difficulty, please leave a comment and I will try to help you.


"""
This module defines a class of Rundeck helper methods that query the Rundeck DB directly. This is better than using the Rundeck API.
---------------
change-history
---------------
1.0|21-oct-2015|vsubr|created

"""
__version__ = '1.0'

#-------  foxtel/AWS App related  --------
import foxconf.cfg    as cfg
import foxpy.lib.utils      as utl
from   foxpy.lib.logger  import MyLogger
from   foxpy.lib.commondb import CommonDbQuery as Cquery

# Rundeck project whose executions are selected by the history-purge query.
# NOTE(review): 'prodcution' looks like a misspelling of 'production' -- confirm it
# matches the project name stored in the Rundeck DB before changing it.
RUNDECK_PROD_PROJECT_NAME = 'prodcution'
# Executions completed at least this many days ago are selected for deletion.
RUNDECK_EXEC_HISTORY_RETENTION_DAYS = 45


# Module-level logger used by the Rundeck class.
logger = MyLogger(pTag=1)
#-----------------------------------------------------------------------------------------------------------------------
class Rundeck(object):
    """
    Helper around the Rundeck database: lists projects, jobs and executions,
    and purges old execution history (DB rows plus log files).

    NOTE(review): every query below is built by interpolating values into the
    SQL text. Pass trusted values only; switch to bound parameters if the
    Cquery helper supports them.

    :usage:
     r = Rundeck()
     print(r.get_project_names())
     for j in r.get_job_uuid_list(p_project='dev_vsubr'):
         print(j)
     print(r.get_job_exec_list(p_job_uuid='b26fa86c-6599-4d0f-aec3-63159e8b019c'))
     print(r.get_job_exec_detail(p_job_uuid='b26fa86c-6599-4d0f-aec3-63159e8b019c', p_exec_id='7'))

    """
    def __init__(self):
        # Query helper used by all the read-only lookups of this class.
        self._o_mysql_qry = Cquery(cfg.RUNDECK_MYSQLDB_TAG)
#-----------------------------------------------------------------------------------------------------------------------
    def get_project_names(self):
        """
        :return: arr
                Array of project names (the `name` column of the `project` table)
        """
        l_sql = "SELECT name FROM project"
        return self._o_mysql_qry.fun_fetchArrayOfOneValue(p_sql=l_sql)
#-----------------------------------------------------------------------------------------------------------------------
    def get_job_list(self, p_project):
        """
        List the jobs (rows of `scheduled_execution`) of one project.

        :param p_project: project name to filter on
        :return: arrd
            array of dict with job attributes
            (workflow_id, id, uuid, version, description)
        """
        # NOTE(review): p_project is interpolated into the SQL text -- trusted values only.
        l_sql = " select a.workflow_id, id, uuid, version, description " \
                "  from scheduled_execution a " \
                " where  project = '%s'" % (p_project,)

        return self._o_mysql_qry.fun_fetchall(p_sql=l_sql)
#-----------------------------------------------------------------------------------------------------------------------
    def get_job_details(self, p_project, p_job_uuid):
        """
        Look up a single job of a project by its uuid.

        :param p_project: project name the job belongs to
        :param p_job_uuid: uuid of the job to find
        :return: dict
            dict with job attributes, or {} when no job has that uuid
        """
        for d_job in self.get_job_list(p_project=p_project):
            if d_job['uuid'] == p_job_uuid:
                return d_job
        return {}
#-----------------------------------------------------------------------------------------------------------------------
    def get_job_uuid_list(self, p_project):
        """
        :param p_project: project name to filter on
        :return: arr
            array with the uuid of every job of the project
        """
        larrd_jobs = self.get_job_list(p_project=p_project)
        logger.log(larrd_jobs)
        return [d_job['uuid'] for d_job in larrd_jobs]
#-----------------------------------------------------------------------------------------------------------------------
    def get_job_exec_list(self, p_job_uuid):
        """
        List the executions recorded for one job.

        :param p_job_uuid: uuid of the job
        :return: arrd
            array of dict, one per execution, with keys exec_id,
            scheduled_execution_id, workflow_id, version, cancelled,
            outputfilepath, status, date_started, date_completed
        """
        # The `execution` table is left unqualified, like every other query in
        # this class (the subquery already relied on the connection's default schema).
        # NOTE(review): p_job_uuid is interpolated into the SQL text -- trusted values only.
        l_sql = """select id exec_id, scheduled_execution_id, workflow_id, version, cancelled, outputfilepath, status,
                          date_started, date_completed
                     from execution a
                    where (project,scheduled_execution_id) = ( select project, id
                                                                from  scheduled_execution
                                                               where uuid = '%s')""" % (p_job_uuid,)

        return self._o_mysql_qry.fun_fetchall(p_sql=l_sql)
#-----------------------------------------------------------------------------------------------------------------------
    def get_job_exec_detail(self, p_job_uuid, p_exec_id):
        """
        Look up a single execution of a job by its execution id.

        :param p_job_uuid: uuid of the job
        :param p_exec_id: execution id, given either as a number or as a string
        :return: dict
            dict with the execution attributes, or {} when not found
        """
        # Compare the string forms: the usage example passes the id as a string
        # ('7'), which would never equal a numeric id coming back from the DB.
        l_wanted_id = str(p_exec_id)
        for d_exec_details in self.get_job_exec_list(p_job_uuid=p_job_uuid):
            if str(d_exec_details['exec_id']) == l_wanted_id:
                return d_exec_details
        return {}
#-----------------------------------------------------------------------------------------------------------------------
    def delete_exec_logs(self, p_rdlog_path_filename):
        """
        Remove the log file of an execution and its companion state file.

        :param p_rdlog_path_filename: path of the .rdlog file; None or '' means
                                      the execution has no log file and nothing is done
        :return: None
        """
        if not p_rdlog_path_filename:
            # No output path recorded for this execution: nothing to remove.
            return

        # The state file name is derived from the log file name.
        l_state_path_filename = p_rdlog_path_filename.replace('.rdlog', '.state.json')

        logger.log("Deleting log files =%s , %s" % (p_rdlog_path_filename, l_state_path_filename))
        utl.remove_file_if_exists(p_src_path_filename=p_rdlog_path_filename)
        utl.remove_file_if_exists(p_src_path_filename=l_state_path_filename)
#-----------------------------------------------------------------------------------------------------------------------
    def delete_exec_history_above_ret_days(self, p_project=None, p_retention_days=None):
        """
        Delete the executions of a project that completed before the retention
        window: their log files, their `execution` row and their `base_report` rows.

        :param p_project: project to purge; defaults to RUNDECK_PROD_PROJECT_NAME
        :param p_retention_days: keep executions completed within this many days;
                                 defaults to RUNDECK_EXEC_HISTORY_RETENTION_DAYS
        :return: dict
            number of deleted executions per job name (key None groups the
            executions that have no matching scheduled_execution row)
        :raises ValueError: when p_retention_days is negative or not an integer value
        """
        if p_project is None:
            p_project = RUNDECK_PROD_PROJECT_NAME
        if p_retention_days is None:
            p_retention_days = RUNDECK_EXEC_HISTORY_RETENTION_DAYS

        # A negative value would turn the cut-off into a future date and purge
        # every completed execution, so refuse it outright.
        l_retention_days = int(p_retention_days)
        if l_retention_days < 0:
            raise ValueError("p_retention_days must be >= 0, got %s" % (p_retention_days,))

        lo_mysql_qry = Cquery(cfg.RUNDECK_MYSQLDB_TAG)

        # NOTE(review): p_project is interpolated into the SQL text -- trusted values only.
        l_sql = """select e.id exec_id, e.scheduled_execution_id, e.workflow_id, e.version,
                          e.date_completed, e.date_started, e.outputfilepath, e.status,
                          se.job_name, se.description, se.uuid
                     from execution e
                          LEFT OUTER JOIN scheduled_execution se on( e.scheduled_execution_id = se.id)
                    where e.project = '%s'
                      /*and se.job_name in('send_email_frm_email_queue.py','process_filesdb.py')*/
                      and e.date_completed <= DATE_ADD(now(),INTERVAL -%s DAY)
                 order by se.uuid, e.scheduled_execution_id, e.date_completed""" %\
                                        (p_project, l_retention_days)

        # Deleted-execution count per job name. Because of the LEFT OUTER JOIN
        # job_name can be None; tallying by key keeps those rows in their own
        # bucket instead of folding them into a neighbouring job.
        ld_result = {}

        # NOTE(review): rows are deleted through the same query object that is
        # still being iterated -- confirm Cquery supports this, otherwise
        # materialise the rows first.
        for d_rowx in lo_mysql_qry.fun_fetchone_iter(p_sql=l_sql):

            l_exec_id = d_rowx['exec_id']
            l_job_name = d_rowx['job_name']
            l_uuid = d_rowx['uuid']
            l_rdlog_path_filename = d_rowx['outputfilepath']

            l_del_cnt = ld_result.get(l_job_name, 0) + 1
            ld_result[l_job_name] = l_del_cnt

            logger.info("---- %s. Deleting exex_id=%s | uuid=%s | job_name=%s" % (l_del_cnt, l_exec_id, l_uuid, l_job_name))

            logger.log("Deleting log file=%s" % (l_rdlog_path_filename,))
            self.delete_exec_logs(p_rdlog_path_filename=l_rdlog_path_filename)

            logger.log("Deleting executions using exec_id=%s" % (l_exec_id,))
            lo_mysql_qry.executeQuery("DELETE FROM execution WHERE id = %s" % (l_exec_id,))

            logger.log("Deleting base_report using exec_id=%s" % (l_exec_id,))
            lo_mysql_qry.executeQuery("DELETE FROM base_report WHERE jc_exec_id = %s" % (l_exec_id,))
        #end for

        logger.info(ld_result)
        return ld_result
#-----------------------------------------------------------------------------------------------------------------------
    def delete_orphan_job_history(self):
        """
        Placeholder: not implemented yet, currently does nothing.
        """
        pass

This is the bin script that runs the cleanup:
   # Build the helper and purge the executions that are past the retention window.
   Rundeck().delete_exec_history_above_ret_days()

Comments

Popular posts from this blog

Tableau - Accessing Tableau's DB

Tableau : Convert ESRI shapes into Tableau Format

Tableau: Convert Oracle Spatial Data into Tableau Format