Source code for plugins.gitrepo
from plugins.git.repo import Repo, Head, Commit
from plugins.git.errors import *
[docs]class GitCommit (Commit) :
[docs] def get_age (self) :
"""return relative time since the commit date.
note: If you want the absolute time, use self.committed_date instead"""
age = self.repo.git.show('--format="%ar"', self.sha)
return age.splitlines()[0].strip('"')
[docs] def get_tag(self) :
"""Check if commit correspond to a tag.
return Tag or None"""
for tag in self.repo.tags :
if tag.commit.sha == self.sha :
return tag
return None
[docs] def get_branch(self) :
"""Check if commit correspond to a head of branch
return Branch or None"""
for branch in self.repo.branches :
if branch.commit.sha == self.sha :
return branch
for remote in self.repo.remotes :
for branch in remote.refs :
if branch.remote_head == 'HEAD' :
continue
if branch.commit.sha == self.sha :
return branch
return None
[docs] def get_parent_branch(self) :
"""Return branch if commit is on head or a parent branch"""
branch = self.get_branch ()
if branch :
return branch
for branch in self.repo.branches :
for commit in self.repo.iter_commits (branch) :
if commit.sha == self.sha :
return branch
for remote in self.repo.remotes :
for branch in remote.refs :
if branch.remote_head == 'HEAD' :
continue
for commit in self.repo.iter_commits (branch) :
if commit.sha == self.sha :
return branch
return None
[docs]class GitRepoError(Exception) :
"""
Base Class for GitRepo Exception
"""
[docs]class GitNoBranchError(GitRepoError) :
"""
Thrown when head is in strange state (Not in any branch ?).
(Can be happened when rebasing for example)
"""
[docs]class GitRepo :
"""Git tools on top of GitPython"""
def __init__ (self) :
self._repo = None
self.refresh ()
[docs] def refresh (self) :
path = self.get_path () # child class must implement get_path()
if path is None :
self._repo = None
else :
try :
self._repo = Repo (path)
except (NoSuchPathError, InvalidGitRepositoryError) :
return
if not len (self._repo.heads) :
# no branch = only init'ed -> cannot work with it
self._repo = None
[docs] def get_current_branch (self) :
if self._repo is None :
return None
if self._repo.head.is_detached :
return None
return self._repo.active_branch
[docs] def get_branches (self) :
if self._repo is None :
return []
# TODO: return remotes ?
return self._repo.branches
[docs] def get_tracked_branch (self) :
if not self._repo.remotes :
return None
# TODO : get_tracked_branch (can be different of first one)
return 'remotes/%s/%s' % (self._repo.remotes[0].name, self.get_current_branch().name)
[docs] def is_tagged (self, branch=None) :
return len (self.get_head_tags (branch)) > 0
[docs] def get_latest_tag (self, branch=None) :
latest_tags = self.get_latest_tags (branch)
if not latest_tags :
return None
return latest_tags [0] # choose the first if many
[docs] def set_tag (self, tag) :
if self._repo :
self._repo.create_tag (tag)
[docs] def get_ref_branch (self, ref) :
"""return branch which contains ref"""
if ref.get ('type') == 'branch' :
return self._repo.branches [ref.get ('name')]
commit = ref.get ('commit')
if not commit :
return None
commit = GitCommit (self._repo, commit.sha)
return commit.get_parent_branch ()
[docs] def get_branch_refs (self, ref) :
"""return ordered tags and branch head for branch which contains ref"""
branch = self.get_ref_branch (ref)
if not branch :
return None
tags = self.get_tags ([branch])
tags.sort (key=lambda x: x.name)
refs = [ {'commit': tag.commit, 'name': tag.name, 'type': 'tag'} for tag in tags ]
if not self.is_tagged (branch) :
refs.append ({'commit': branch.commit, 'name': branch.name, 'type': 'branch'})
return refs
[docs] def start_branch (self, branch) :
raise NotImplementedError ("TODO: start_branch")
[docs] def get_current_commit (self) :
"""return SHA for HEAD"""
if self._repo is None :
return None
return GitCommit (self._repo, self._repo.head.commit.sha)
[docs] def get_current_ref (self) :
"""return dict {commit, name, type} for HEAD"""
if self._repo is None :
return {}
head = self.get_current_commit ()
if head.get_tag() :
return {'commit' : GitCommit (self._repo, head.sha), 'name' : head.get_tag().__str__(), 'type' : 'tag'}
elif head.get_branch() :
return {'commit' : GitCommit (self._repo, head.sha), 'name' : head.get_branch().__str__(), 'type' : 'branch'}
# Last commit is inevitably in a branch, or something is wrong..
raise GitNoBranchError('Cannot recognize head state')
[docs] def has_changes (self) :
if self._repo is None :
return False
return self._repo.is_dirty ()
[docs] def is_updated (self) :
raise NotImplementedError ("TODO: is_updated")
[docs] def is_published (self) :
if self._repo is None:
return False
current_branch = self.get_current_branch ()
if current_branch is None :
# TODO: handle correctly detached state (case of a tag)
if self.get_current_commit ().get_tag () :
return True
return False
remote_branch = self.get_tracked_branch ()
if remote_branch is None :
return False
if len (self._repo.git.cherry (remote_branch, current_branch)) :
return False
return True
[docs] def publish (self) :
# TODO: handle error correctly
try :
self._repo.git.push ()
except GitCommandError :
pass
[docs] def checkout (self, ref) :
#ref.checkout () # this checkout doesn't allow to checkout a tag
print "checkout to %s" % ref
# TODO: check if it is a remote branch
# TODO: check if a local branch track this remote
# TODO: create local branch
self._repo.git.checkout (ref)
[docs] def is_writeable (self) :
if self._repo is None :
return False
return not self._repo.head.is_detached
[docs] def commit (self, comment, amend=False) :
if not self.has_changes () :
return
if amend :
arg = "--amend"
else :
arg = "-m%s" % comment
self._repo.git.commit ('-a', arg)
[docs] def get_commit_message(self, ref=None, short=False) :
"""Return the last commit message or None if ref doesn't exist.
If short is enable, just return the summary message"""
if self._repo is None:
return None
commit = (ref.commit if ref else self._repo.head.commit)
return (commit.summary if short else commit.message)