Source code for ghia.common

import asyncio
import configparser
from concurrent.futures.thread import ThreadPoolExecutor
import threading

import click
import re

from ghia.github import GitHub


[docs]class PrinterObserver:
[docs] @staticmethod def issue(owner, repo, issue): number, url = issue['number'], issue['html_url'] identifier = click.style(f'{owner}/{repo}#{number}', bold=True) click.echo(f'-> {identifier} ({url})')
[docs] @staticmethod def assignees(old, new): mi = click.style('-', fg='red', bold=True) pl = click.style('+', fg='green', bold=True) eq = click.style('=', fg='blue', bold=True) assignees = list(set(old).union(set(new))) assignees.sort(key=lambda a: a.lower()) for assignee in assignees: sign = eq if assignee not in old: sign = pl elif assignee not in new: sign = mi click.echo(f' {sign} {assignee}')
[docs] @staticmethod def fallbacked(label, added=True, enabled=True): if enabled: prefix = click.style('FALLBACK', fg='yellow', bold=True) click.echo(' ', nl=False) message = 'added label' if added else 'already has label' click.echo(f'{prefix}: {message} "{label}"')
[docs] @staticmethod def error(message, of_issue=False): prefix = click.style('ERROR', bold=True, fg='red') if of_issue: click.echo(' ', nl=False, err=True) click.echo(f'{prefix}: {message}', err=True)
def _strategy_append(found, old): return old + [a for a in found if a not in old] def _strategy_set(found, old): return found if len(old) == 0 else old def _strategy_change(found, old): return found def _match_title(pattern, issue): """ :param pattern: :param issue: :return: [True] if issue contains the pattern >>> _match_text('aaa',{'title': 'aaa','body': "bbb"}) >>> _match_text('aaa',{'title': 'bbb','body': "bbb"}) """ return re.search(pattern, issue['title'], re.IGNORECASE) def _match_text(pattern, issue): """ :param pattern: :param issue: :return: [True] if issue contains the pattern >>> _match_text('.*',{'body': 'aaa'}) <re.Match object; span=(0, 3), match='aaa'> >>> _match_text('aaa',{'body': "bbb"}) """ return re.search(pattern, issue['body'], re.IGNORECASE) def _match_label(pattern, issue): """ :param pattern: :param issue: :return: [True] if issue contains the pattern >>> _match_label('aaa', {'labels': [{'name': 'aaa'}]}) True >>> _match_label('aaa', {'labels': [{'name': 'bbb'}]}) False """ return any(re.search(pattern, label['name'], re.IGNORECASE) for label in issue['labels']) def _match_any(*args): """ :param pattern: :param issue: :return: [True] if issue contains the pattern >>> _match_any('aaa',{'labels': [{'name': 'aaa'}],'body': 'bbb','title': 'bbb'}) True >>> _match_any('aa',{'labels': [{'name': 'bbb'}],'body': 'bbb','title': 'bbb'}) False >>> _match_any('aa',{'labels': [{'name': 'bbb'}],'body': 'aa','title': 'bbb'}) <re.Match object; span=(0, 2), match='aa'> >>> _match_any('aa',{'labels': [{'name': 'bbb'}],'body': 'bb','title': 'aa'}) <re.Match object; span=(0, 2), match='aa'> """ return _match_title(*args) or _match_text(*args) or _match_label(*args)
[docs]class GHIA: STRATEGIES = { 'append': _strategy_append, 'set': _strategy_set, 'change': _strategy_change } DEFAULT_STRATEGY = 'append' ENVVAR_STRATEGY = 'GHIA_STRATEGY' ENVVAR_DRYRUN = 'GHIA_DRYRUN' ENVVAR_CONFIG = 'GHIA_CONFIG' MATCHERS = { 'any': _match_any, 'text': _match_text, 'title': _match_title, 'label': _match_label } def __init__(self, token, rules, fallback_label, dry_run, ghia_strategy, is_async=False): self.github = GitHub(token, is_async=is_async) self.rules = rules self.fallback_label = fallback_label self.fallback_enabled = fallback_label is not None self.real_run = not dry_run self.strategy = self.STRATEGIES[ghia_strategy] self.observers = dict() self.is_async = is_async self.print_lock = threading.Lock()
[docs] def add_observer(self, name, observer): self.observers[name] = observer
[docs] def remove_observer(self, name): del self.observers[name]
[docs] def call_observers(self, method, *args, **kwargs): for observer in self.observers.values(): getattr(observer, method)(*args, **kwargs)
@classmethod def _matches_pattern(cls, pattern, issue): t, p = pattern.split(':', 1) return cls.MATCHERS[t](p, issue) @classmethod def _matches(cls, patterns, issue): return any(cls._matches_pattern(pattern, issue) for pattern in patterns) def _find_assignees(self, issue): return [username for username, patterns in self.rules.items() if self._matches(patterns, issue) ] def _make_new_assignees(self, found, old): return self.strategy(found, old) def _update_assignees(self, owner, repo, issue, assignees): if self.real_run: if self.is_async: self.github.set_issue_assignees_async(owner, repo, issue['number'], assignees) else: self.github.set_issue_assignees(owner, repo, issue['number'], assignees) def _update_labels(self, owner, repo, issue, labels): if self.real_run: if self.is_async: self.github.set_issue_labels_async(owner, repo, issue['number'], labels) else: self.github.set_issue_labels(owner, repo, issue['number'], labels) def _create_fallback_label(self, owner, repo, issue): if self.fallback_label is None: return False labels = [label['name'] for label in issue['labels']] if self.fallback_label not in labels: labels.append(self.fallback_label) self._update_labels(owner, repo, issue, labels) return True else: return False
[docs] def run_issue(self, owner, repo, issue): self.call_observers('issue', owner, repo, issue) found_assignees = self._find_assignees(issue) old_assignees = [assignee['login'] for assignee in issue['assignees']] new_assignees = self.strategy(found_assignees, old_assignees) if old_assignees != new_assignees: # there is a change self._update_assignees(owner, repo, issue, new_assignees) self.call_observers('assignees', old_assignees, new_assignees) applied = False if len(new_assignees) == 0: # noone is assigned now applied = self._create_fallback_label(owner, repo, issue) with self.print_lock: self.call_observers('issue', owner, repo, issue) self.call_observers('assignees', old_assignees, new_assignees) if(len(new_assignees) == 0): self.call_observers('fallbacked', self.fallback_label,applied,self.fallback_enabled )
[docs] def run(self, slugs): if self.is_async: with ThreadPoolExecutor(max_workers=6, thread_name_prefix='GHIA')\ as executor: for owner, repo in slugs: executor.submit(self.run_inner, owner, repo) executor.shutdown(wait=True) print("done") else: for owner, repo in slugs: self.run_inner(owner, repo)
[docs] def run_inner(self, owner, repo): try: issues = self.github.issues(owner, repo) except Exception as e: print(str(e)) self.call_observers('error', f'Could not list issues ' f'for repository {owner}/{repo}') exit(10) return for issue in issues: try: self.run_issue(owner, repo, issue) except Exception as e: number = issue['number'] print(str(e)) self.call_observers('error', f'Could not update issue ' f'{owner}/{repo}#{number}', True)
[docs]def parse_rules(cfg): """ Parse labels to dict where label is key and list of patterns is corresponding value cfg: ConfigParser with loaded configuration of labels """ patterns = { username: list(filter(None, cfg['patterns'][username].splitlines())) for username in cfg['patterns'] } fallback = cfg.get('fallback', 'label', fallback=None) for user_patterns in patterns.values(): for pattern in user_patterns: t, p = pattern.split(':', 1) assert t in GHIA.MATCHERS.keys() return patterns, fallback
[docs]def get_rules(ctx, param, config_rules): """ Extract labels from labels config and do the checks config_rules: ConfigParser with loaded configuration of labels """ try: cfg_rules = configparser.ConfigParser() cfg_rules.optionxform = str cfg_rules.read_file(config_rules) return parse_rules(cfg_rules) except Exception: raise click.BadParameter('incorrect configuration format')
[docs]def get_token(ctx, param, config_auth): """ Extract token from auth config and do the checks config_auth: ConfigParser with loaded configuration of auth """ try: cfg_auth = configparser.ConfigParser() cfg_auth.read_file(config_auth) return cfg_auth.get('github', 'token') except Exception: raise click.BadParameter('incorrect configuration format')