# $Id$
# a simple infogami maintenance bot helper class

import cookielib, urllib2, urllib, mimetools
import os, sys, time
import tempfile

import htmlload

VERSION = "0.5"

DEFAULT_USER_AGENT = (
    "infogamibot/%s python/%s (effbot.org)" % (VERSION, sys.version.split()[0])
    )

ET = htmlload.ET

# urllib2 stuff to build a cookie-aware loader with access to the jar
# (it's not just the word "jar" that feels a bit javaish here...)

jar = cookielib.CookieJar()
handler = urllib2.HTTPCookieProcessor(jar)
opener = urllib2.build_opener(handler)
urllib2.install_opener(opener)

##
# Dictionary whose items can also be read and written as attributes.
# Reading a missing key through attribute access raises AttributeError.

class Result(dict):

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        # store in the dictionary itself, so that attribute access and
        # item access always see the same value
        self[key] = value

##
# (Internal) Wraps a cache file.  The header block at the start of the
# file is parsed into the <b>headers</b> attribute when the wrapper is
# created; all other attributes are delegated to the wrapped file.

class FileWrapper:

    def __init__(self, file):
        self.file = file
        self.headers = mimetools.Message(file)

    def __getattr__(self, attr):
        return getattr(self.file, attr)

##
# Creates a Bot instance for a given site.
#
# @param site Site name.  This can either be an infogami URL, a
#     site name, or a "username:password@site" specifier.  If a
#     specifier is given, the {@link login} method is called to
#     set the login cookie.  If the specifier contains a user name
#     but no password, the password is read from the terminal.
# @param cache Cache expiry time.  If given, the bot will fetch pages
#     from the cache if they've been inspected within the given number
#     of seconds.  If omitted, the cache is disabled.
# @param cachedir Cache directory.  If omitted, defaults to the site's
#     hostname, in the current directory (e.g. "./pytut.infogami.com").

class Bot:

    ##
    # User agent string.

    user_agent = DEFAULT_USER_AGENT

    ##
    # Login status.  A true value if logged in, a false value
    # otherwise.  If the value is None, the status is unknown (either
    # because you haven't logged in, or because the last login attempt
    # returned an unclear status).

    logged_in = None

    def __init__(self, site, cache=0, cachedir=None):
        self.cache = cache
        self.cachedir = cachedir
        if site.startswith("http://") or site.startswith("https://"):
            # full URL; use as is
            self.site = site.rstrip("/")
            return
        username = password = None
        if "@" in site:
            # split on the last "@", so the password may contain one
            username, site = site.rsplit("@", 1)
            if ":" in username:
                username, password = username.split(":", 1)
        self.site = "http://%s.infogami.com" % site
        if username:
            if not password:
                import getpass
                password = getpass.getpass(
                    "Enter password for %s@%s: " % (username, site)
                    )
            self.login(username, password)

    ##
    # Opens the given page as a raw HTTP stream.
    #
    # @param page Page path, relative to the server root.
    # @param data Data to post to the server.  If given, the client
    #     issues a POST request instead of a GET request, using the
    #     given data as the payload.
    # @param extra_headers Optional dictionary of additional request
    #     headers.  These override the default headers.
    # @return A file-like object with a <b>headers</b> attribute.
    #     This is either the HTTP response, or a cached copy of the
    #     page.

    def open_raw_page(self, page, data=None, extra_headers=None):
        cached = None
        if not data and self.cache:
            cached, mtime = self.open_cache_page(page)
            if cached is not None and mtime >= time.time() - self.cache:
                return cached # fresh enough; no need to ask the server
        url = self.site + "/" + page.lstrip("/")
        headers = {
            "user-agent": self.user_agent,
            }
        if extra_headers:
            headers.update(extra_headers)
        try:
            http_file = urllib2.urlopen(urllib2.Request(url, data, headers))
        except Exception:
            # don't leak the stale cache file if the request fails
            if cached is not None:
                cached.close()
            raise
        if cached is not None:
            # stale cache entry; reuse it if the server reports the
            # same modification time as the one stored in the cache
            try:
                last_modified = http_file.headers["last-modified"]
            except KeyError:
                pass
            else:
                if cached.headers.get("last-modified") == last_modified:
                    http_file.close()
                    return cached
            cached.close()
        return http_file

    ##
    # (Internal) Opens the given page from the cache, if present.
    #
    # @param page Page path, relative to the server root.
    # @param mode File mode.  In "r" mode, the file is wrapped in a
    #     {@link FileWrapper}.  In "w" mode, the cache directory is
    #     created if necessary.
    # @param suffix Optional suffix to add to the cache file name.
    # @return A (file, modification time) tuple, or (None, None) if
    #     the file could not be opened.

    def open_cache_page(self, page, mode="r", suffix=None):
        url = self.site + "/" + page.lstrip("/")
        for scheme in ("http://", "https://"):
            if url.startswith(scheme):
                url = url[len(scheme):]
                break
        # the hostname is the default cache directory
        cachedir, path = url.split("/", 1)
        if self.cachedir:
            cachedir = self.cachedir
        if mode == "w" and not os.path.isdir(cachedir):
            os.makedirs(cachedir)
        # quote everything (including slashes) to get a flat file name
        filename = os.path.join(cachedir, urllib.quote(path, "")) + ".cache"
        if suffix:
            filename = filename + suffix
        fp = None
        try:
            fp = open(filename, mode)
            mtime = os.fstat(fp.fileno()).st_mtime
            if mode == "r":
                fp = FileWrapper(fp)
            return fp, mtime
        except IOError:
            if fp is not None:
                fp.close() # opened, but could not be inspected
            return None, None

    ##
    # Fetches the given page as a parsed HTML element structure.
    #
    # @param page Page path, relative to the server root.
    # @param data Data to post to the server.  If given, the client
    #     issues a POST request instead of a GET request, using the
    #     given data as the payload.
    # @param extra_headers Optional dictionary of additional request
    #     headers.
    # @return An element tree.  The response headers are available
    #     via the tree's <b>headers</b> attribute.

    def get_html(self, page, data=None, extra_headers=None):
        source = self.open_raw_page(page, data, extra_headers)
        try:
            content = source.read()
            headers = source.headers
        finally:
            source.close()
        # hackhackhack!  should use elementtidy (or somesuch) if available
        # (the loader wants a file name, so go via a private temporary
        # file, and remove it when done)
        fd, temp_name = tempfile.mkstemp(suffix=".html")
        try:
            temp_file = os.fdopen(fd, "wb")
            try:
                temp_file.write(content)
            finally:
                temp_file.close()
            tree = htmlload.load(temp_name, encoding="utf-8")
        finally:
            try:
                os.remove(temp_name)
            except OSError:
                pass # best effort
        tree.headers = headers
        if self.cache and not data:
            self._update_cache(page, tree)
        return tree

    ##
    # (Internal) Stores a parsed page in the cache.  The page is
    # written to a temporary file, which is then renamed to the final
    # cache file name.  Failures to open or rename the file are
    # ignored.

    def _update_cache(self, page, tree):
        cache_file, mtime = self.open_cache_page(page, "w", ".tmp")
        if cache_file is None:
            return
        try:
            # header block, blank line, page body
            cache_file.write(str(tree.headers))
            cache_file.write("\n")
            tree.write(cache_file)
        finally:
            cache_file.close()
        name, ext = os.path.splitext(cache_file.name)
        if ext != ".tmp":
            return
        # remove the old file first; rename doesn't overwrite existing
        # files on all platforms
        try:
            os.remove(name)
        except OSError:
            pass
        try:
            os.rename(cache_file.name, name)
        except OSError:
            pass

    ##
    # Posts a form to the server, and returns the response as a parsed
    # HTML element structure.
    #
    # @def post_html(page, **options)
    # @param path Page path, relative to the server root.
    # @keyparam options Form values.
    # @return An element tree.

    # note: the page argument uses a mangled name, presumably to avoid
    # conflicts with form fields passed as keyword arguments
    def post_html(self, __page, **options):
        return self.get_html(__page, urllib.urlencode(options))

    ##
    # Logs in to the server.
    #
    # @param username Infogami user name.
    # @param password Password.
    # @param path Optional path for the returned page.  If omitted,
    #     the site's front page is returned.
    # @return The new login status (same as the <b>logged_in</b>
    #     attribute).

    def login(self, username, password, path="/"):
        html = self.post_html("/_account/in",
            path=path, username=username, password=password,
            what="Log In"
            )
        self.logged_in = None # unknown
        for elem in html.getiterator():
            if elem.get("class") != "login":
                continue
            # simple check: if we get a non-hidden input link, assume
            # we're not logged in
            if elem.tag == "input" and elem.get("type") != "hidden":
                self.logged_in = False
                break
            elif elem.tag == "p":
                if elem.text and elem.text.startswith("logged in as"):
                    self.logged_in = True
                    break
        return self.logged_in

    ##
    # Logs out from the server.
    #
    # @param path Optional path for the returned page.  If omitted,
    #     the site's front page is returned.
    # @return An element tree.

    def logout(self, path="/"):
        return self.post_html("/_account/out", path=path)

    ##
    # Gets the Markdown source code for a given page.  You must be
    # logged in as a user with edit permissions to use this function.
    #
    # @param page Page path, relative to the server root.  The path
    #     should start with a slash.
    # @param revision Optional revision number.  If omitted, the
    #     source for the current revision is returned.
    # @return A result object with title and body attributes.  The
    #     title and the body may both be None, if they could not be
    #     found.

    def get_source(self, page, revision=None):
        extra = ""
        if revision is not None:
            extra = "?r=" + str(revision)
        tree = self.get_html("/_edit/" + page.lstrip("/") + extra)
        result = Result(title=None, body=None, headers=tree.headers)
        # pick up the values from the edit form
        for elem in tree.getiterator():
            if elem.tag == "input" and elem.get("name") == "title":
                result.title = elem.get("value")
            elif elem.tag == "textarea" and elem.get("name") == "body":
                result.body = elem.text
        return result

    ##
    # Gets the rendered body for a given page.
    #
    # @param page Page path, relative to the server root.  The path
    #     should start with a slash.
    # @param revision Optional revision number.  If omitted, the
    #     body for the current revision is returned.
    # @return A result object with title and body attributes,
    #     containing element trees.  The title and the body may both
    #     be None, if not found.

    def get_body(self, page, revision=None):
        extra = ""
        if revision is not None:
            extra = "?r=" + str(revision)
        tree = self.get_html("/" + page.lstrip("/") + extra)
        result = Result(page=tree, title=None, body=None, headers=tree.headers)
        for elem in tree.getiterator():
            if elem.get("id") != "content":
                continue
            # process content tag
            for child in elem:
                if child.tag in ("h1", "h2") and result.title is None:
                    result.title = ET.Element("title")
                    result.title.text = child.text
                elif child.get("class") == "page":
                    # copy everything but the dateline
                    result.body = ET.Element("body")
                    for item in child:
                        if item.get("class") != "dateline":
                            result.body.append(item)
            break # only look at the first content element
        return result

    ##
    # Gets the revision history for a given page.
    #
    # @param page Page path, relative to the server root.
    # @return A list of (revision, age, user, host) tuples, or None
    #     if the history table could not be found.

    def get_history(self, page):
        tree = self.get_html("/_history/" + page.lstrip("/"))
        for elem in tree.getiterator("table"):
            if elem.get("id") == "main":
                break
        else:
            return None
        table = elem.find(".//table")
        # note: must compare against None here; an element without
        # children is false when tested as a boolean
        if table is None:
            return None
        result = []
        for tr in table.findall("tr"):
            if len(tr) != 5:
                continue
            who = tr[4].text or ""
            try:
                user, host = who.split()
            except ValueError:
                # no user name; the cell only contains the host
                user = "anonymous"
                host = who.strip()
            result.append((tr[0].findtext("a"), tr[3].text, user, host))
        return result

    ##
    # Gets a list of pages from an index page.
    #
    # @param index_page Index page to parse.  If omitted, this defaults
    #     to the site index.
    # @return A list of (href, title) tuples.

    def get_index(self, index_page="/_special/index"):
        tree = self.get_html(index_page)
        index = []
        for elem in tree.findall(".//td"):
            if elem.get("id") != "body":
                continue
            # wipe the datelines, to get rid of any links inside them
            for e in elem.getiterator():
                if e.get("class") == "dateline":
                    e.clear()
            for a in elem.findall(".//a"):
                if a.get("href") != "/":
                    index.append((a.get("href"), a.text))
        return index

    ##
    # Updates a page.
    #
    # @param page Page path, relative to the server root.
    # @param title New title.  If None, the current title is kept.
    # @param body New Markdown source.  If None, the current body
    #     is kept.
    # @return None if the page hasn't changed, an element structure
    #     if it was updated.

    def update(self, page, title, body):
        cache = self.cache
        self.cache = 0 # disable caching
        try:
            result = self.get_source(page)
        finally:
            self.cache = cache
        if title is None:
            title = result.title
        if body is None:
            body = result.body
        def same(a, b):
            # ignore whitespace
            return (a or "").strip() == (b or "").strip()
        if same(title, result.title) and same(body, result.body):
            return None # no update
        return self.post_html(
            "/_edit/" + page.strip("/"),
            title=title, body=body
            )

    ##
    # (Debugging) Print a list of cookies.

    def dump_cookies(self):
        for cookie in jar:
            sys.stderr.write(repr(cookie) + "\n")

##
# Helper to "escape" literal strings for markdown.  This is incomplete.
#
# @param text Literal text.
# @return The text, with underscores and asterisks escaped.

def escape(text):
    text = text.replace("_", "\\_")
    text = text.replace("*", "\\*")
    # add more stuff here
    return text