Source code for libka.tritemiolib

#!/usr/bin/python3
# pylint: disable=line-too-long
# kate: space-indent on; indent-width 4; replace-tabs on; indent-mode python; remove-trailing-space modified;
# vim: expandtab ts=4
# pylint: enable=line-too-long

############################################################################
#   Copyright © 2018 José Manuel Santamaría Lema <panfaust@gmail.com>      #
#                                                                          #
#   This program is free software; you can redistribute it and/or modify   #
#   it under the terms of the GNU General Public License as published by   #
#   the Free Software Foundation; either version 2 of the License, or      #
#   (at your option) any later version.                                    #
############################################################################

"""
This module provides various functions to operate with tritemio's wannabuild
and reprepro setup.
"""


#pylint: disable=too-many-arguments,too-many-locals

import subprocess
import time
import sys

from libka.ka_configuration import get_ka_config_parser
from libka.ka_data_utils import read_packages_file
from libka.ka_print import ka_print_plain
from libka.ka_print import ka_print_error

def tritemio_exec_ssh(user, host, command, retry_number=10, retry_delay=3, exit_on_error=True):
    """
    Executes the given ssh command at tritemio.

    The command is run as 'ssh user@host <command...>' with stderr merged into
    stdout. Whenever ssh finishes with a non-zero exit status the call is
    repeated after sleeping retry_delay seconds, until retry_number attempts
    have been made.

    Arguments:
    user          -- remote user name
    host          -- remote host name
    command       -- list with the remote command and its arguments
    retry_number  -- maximum number of attempts; values lower than 1 are
                     treated as 1, so the command is always tried once
    retry_delay   -- seconds to wait between two attempts
    exit_on_error -- when every attempt failed: if True the process exits
                     with status 1, otherwise None is returned

    Returns the output of the command decoded as UTF-8 and stripped, or None
    if the command failed and exit_on_error is False.
    """
    #pylint: disable=line-too-long
    #FIXME: If we don't have the host 'tritemio' in ssh's known_hosts the
    #       subprocess.check_output(ssh_command) below is probably going to
    #       hang. Maybe we should add it to known_hosts in advance just in
    #       case? see:
    #       https://stackoverflow.com/questions/13597115/how-to-add-hostname-to-known-hosts-using-python
    #pylint: enable=line-too-long
    ssh_command = ['ssh', '%s@%s' % (user, host)] + command
    #Always try at least once, otherwise there would be no output to return
    #when the caller passes retry_number=0 (or a negative value).
    max_attempts = max(1, retry_number)
    output_bytes = b''
    for attempt in range(1, max_attempts + 1):
        try:
            output_bytes = subprocess.check_output(ssh_command, stderr=subprocess.STDOUT)
            break #exit the loop as soon as the ssh command is successful
        except subprocess.CalledProcessError as exception:
            if attempt < max_attempts:
                time.sleep(retry_delay)
                continue
            #Last attempt failed: report the error and give up
            ka_print_error(str(exception))
            ka_print_plain(exception.output.decode().strip())
            if exit_on_error:
                sys.exit(1)
            return None
    ssh_reply = output_bytes.decode("utf-8").strip()
    return ssh_reply
def tritemio_exec_reprepro(command, retry_number=10, retry_delay=3, exit_on_error=True):
    """
    Executes the given reprepro command at tritemio.

    The host, the ssh user and the reprepro base directory are read from the
    'tritemio repository' section of the ka configuration. The given command
    (a list of reprepro arguments) is prefixed with 'reprepro -b <base dir>'
    and executed through tritemio_exec_ssh(), which also receives
    retry_number, retry_delay and exit_on_error unchanged.

    Returns whatever tritemio_exec_ssh() returns.
    """
    repo_settings = get_ka_config_parser()['tritemio repository']
    repo_host = repo_settings['host_repo_fqdn']
    repo_user = repo_settings['host_ssh_user']
    repo_base_dir = repo_settings['host_repo_dir']

    #Full remote command line: reprepro, its base directory and the arguments
    remote_command = ['reprepro', '-b', repo_base_dir] + command

    return tritemio_exec_ssh(repo_user, repo_host, remote_command,
                             retry_number, retry_delay, exit_on_error)
def tritemio_list_all_srcs(dist):
    """
    Returns a list of all source package names available in a given tritemio
    distribution.

    Runs 'reprepro -A source list <dist>' at tritemio and takes the second
    whitespace separated field of every output line as the package name.
    Lines with less than two fields (for instance blank lines) are skipped
    instead of aborting with an IndexError.
    """
    reprepro_reply = tritemio_exec_reprepro(['-A', 'source', 'list', dist])
    src_pkg_list = []
    for line in reprepro_reply.splitlines():
        #Find out the package name from a line like this:
        #ubuntu-exp3|main|source: umbrello 4:17.12.3-0ubuntu1+tritemio1
        fields = line.split()
        if len(fields) < 2:
            #Not a package line, there is no package name to extract
            continue
        src_pkg_list.append(fields[1])
    #Return the list with all the source package names
    return src_pkg_list
def tritemio_pkglist(release_type, dist):
    """
    Finds out a package list for a given tritemio distribution.

    release_type can be:
    - 'qt', 'frameworks', 'plasma' or 'applications': the list is read from
      the file 'tritemio/package-name-lists/<release_type>-<dist>'.
    - 'all': every source package available in the distribution.
    - 'other': the source packages available in the distribution which are
      not part of qt/frameworks/plasma/applications, for instance
      tritemio_pkglist('other','ubuntu-exp3').

    Raises ValueError for any other release type.
    """
    real_release_types = ['qt', 'frameworks', 'plasma', 'applications']
    list_file_template = 'tritemio/package-name-lists/%s-%s'

    if release_type in real_release_types:
        return read_packages_file(list_file_template % (release_type, dist))

    if release_type == 'all':
        return tritemio_list_all_srcs(dist)

    if release_type != 'other':
        raise ValueError('Wrong release type: %s' % release_type)

    #Virtual 'other' release type: start from everything available in the dist
    available_srcs = tritemio_list_all_srcs(dist)

    #Collect the names that belong to any of the real release types
    claimed_srcs = set()
    for real_release_type in real_release_types:
        claimed_srcs.update(read_packages_file(list_file_template % (real_release_type, dist)))

    #Whatever is not claimed by a real release type is supposed to be 'other'
    return [src_pkg for src_pkg in available_srcs if src_pkg not in claimed_srcs]
def tritemio_ls(package_name, arch='any'):
    """
    Returns the output of 'reprepro ls <package_name>' at tritemio.

    When arch is something different from 'any' the query is restricted to
    that architecture by passing '-A <arch>' to reprepro.
    """
    ls_arguments = ['ls', package_name]
    if arch != 'any':
        #Restrict the query to the requested architecture
        ls_arguments = ['-A', arch] + ls_arguments
    return tritemio_exec_reprepro(ls_arguments)
def tritemio_copysrc(origin_dist, destination_dist, src_package_name):
    """
    Copy the given source package from origin_dist to destination_dist.

    Note that reprepro is called with the destination first:
    'copysrc <destination_dist> <origin_dist> <src_package_name>'.
    The command is executed with exit_on_error=False and the result of
    tritemio_exec_reprepro() is returned as is.
    """
    copysrc_arguments = ['copysrc', destination_dist, origin_dist, src_package_name]
    copysrc_reply = tritemio_exec_reprepro(copysrc_arguments, exit_on_error=False)
    return copysrc_reply
def tritemio_removesrc(dist, src_package_name):
    """
    Removes the given source package from the given distribution.

    Executes 'reprepro removesrc <dist> <src_package_name>' with
    exit_on_error=False and returns the result of tritemio_exec_reprepro()
    as is.
    """
    removesrc_arguments = ['removesrc', dist, src_package_name]
    removesrc_reply = tritemio_exec_reprepro(removesrc_arguments, exit_on_error=False)
    return removesrc_reply
def tritemio_bin_nmu(dist, arch, src_package_name, changelog_entry='Rebuild.'):
    """
    Performs a binNMU of the given package.

    First 'wanna-build --info' is queried (through ssh, using the host and
    user from the 'tritemio database' configuration section) to find out the
    current version and binNMU number of the package, then
    'wanna-build --binNMU' is executed with the binNMU number increased by
    one and changelog_entry as message.

    Returns the output of the binNMU command, or an empty string if the info
    query failed or didn't report a 'Version' field. If the binNMU command
    itself fails the process exits (exit_on_error=True).
    """
    #Load configuration
    ka_config = get_ka_config_parser()
    #Read configuration variables
    wannabuild_ssh_user = ka_config['tritemio database']['host_ssh_user']
    wannabuild_ssh_host = ka_config['tritemio database']['host_ssh_fqdn']
    #Get the package information from wannabuild
    info_command = ['wanna-build', '-d', dist, '-A', arch, '--info', src_package_name]
    ssh_info_reply = tritemio_exec_ssh(wannabuild_ssh_user, wannabuild_ssh_host,
                                       info_command, exit_on_error=False)
    #Abort if the wanna-build command failed
    if ssh_info_reply is None:
        return ''
    #Parse the ssh info reply in order to get a dictionary with fields and values
    ssh_info_reply_dict = dict()
    for line in ssh_info_reply.splitlines():
        field_name, separator, field_value = line.partition(':')
        if not separator:
            #Lines without a colon (e.g. blank lines) carry no field, skip
            #them instead of failing with an IndexError
            continue
        ssh_info_reply_dict[field_name.strip()] = field_value.strip()
    #Find out the current version
    if 'Version' not in ssh_info_reply_dict:
        #Abort if we couldn't find info for the given package
        ka_print_error('Package %s not found' % src_package_name)
        return ''
    src_package_version = ssh_info_reply_dict['Version']
    #Find out the current binNMU number and increase it
    bin_nmu_number = 0
    if 'Binary-NMU-Version' in ssh_info_reply_dict:
        bin_nmu_number = int(ssh_info_reply_dict['Binary-NMU-Version'])
    bin_nmu_number += 1
    #Exec the wannabuild binNMU command and return its output
    #NOTE(review): ssh hands the command words to the remote shell joined by
    #spaces, so a changelog_entry with spaces or shell metacharacters
    #presumably has to arrive here already quoted - verify against callers.
    bin_nmu_command = ['wanna-build', '-d', dist, '-A', arch,
                       '--binNMU', str(bin_nmu_number),
                       '--message', changelog_entry,
                       '%s_%s' % (src_package_name, src_package_version)]
    ssh_bin_nmu_reply = tritemio_exec_ssh(wannabuild_ssh_user, wannabuild_ssh_host,
                                          bin_nmu_command, exit_on_error=True)
    return ssh_bin_nmu_reply
def tritemio_give_back(dist, arch, src_package_name):
    """
    Performs a give-back of the given package.

    First 'wanna-build --info' is queried (through ssh, using the host and
    user from the 'tritemio database' configuration section) to find out the
    current version of the package, then 'wanna-build --give-back' is
    executed for '<src_package_name>_<version>'.

    Returns the output of the give-back command, or an empty string if the
    info query failed or didn't report a 'Version' field. If the give-back
    command itself fails the process exits (exit_on_error=True).
    """
    #Load configuration
    ka_config = get_ka_config_parser()
    #Read configuration variables
    wannabuild_ssh_user = ka_config['tritemio database']['host_ssh_user']
    wannabuild_ssh_host = ka_config['tritemio database']['host_ssh_fqdn']
    #Get the package information from wannabuild
    info_command = ['wanna-build', '-d', dist, '-A', arch, '--info', src_package_name]
    ssh_info_reply = tritemio_exec_ssh(wannabuild_ssh_user, wannabuild_ssh_host,
                                       info_command, exit_on_error=False)
    #Abort if the wanna-build command failed
    if ssh_info_reply is None:
        return ''
    #Parse the ssh info reply in order to get a dictionary with fields and values
    ssh_info_reply_dict = dict()
    for line in ssh_info_reply.splitlines():
        field_name, separator, field_value = line.partition(':')
        if not separator:
            #Lines without a colon (e.g. blank lines) carry no field, skip
            #them instead of failing with an IndexError
            continue
        ssh_info_reply_dict[field_name.strip()] = field_value.strip()
    #Find out the current version
    if 'Version' not in ssh_info_reply_dict:
        #Abort if we couldn't find info for the given package
        ka_print_error('Package %s not found' % src_package_name)
        return ''
    src_package_version = ssh_info_reply_dict['Version']
    #Exec the wannabuild give-back command and return its output
    give_back_command = ['wanna-build', '-d', dist, '-A', arch, '--give-back',
                         '%s_%s' % (src_package_name, src_package_version)]
    ssh_give_back_reply = tritemio_exec_ssh(wannabuild_ssh_user, wannabuild_ssh_host,
                                            give_back_command, exit_on_error=True)
    return ssh_give_back_reply