Introduction

This is developer documentation for Dexy. It is far from complete but is under active development.

The sources for this documentation are part of the main dexy repository which is http://github.com/dexy/dexy

Please visit http://dexy.it/docs/ for a list of all available documentation.

Data and Storage

Data objects capture the state of Dexy documents at a given stage in filter processing, and Storage objects handle persistence of data objects. A Dexy document will have a data object corresponding to its initial state (the original file to be processed), plus a data object representing the state after each filter is processed. Each data object has a corresponding storage object. Filter implementations will call methods on data classes to append and save data, and to read data from the previous stage in processing or other document inputs.

For example, here’s the process method from the ExampleProcessMethod class, which reads input from the input_data object, representing the previous data state, and saving the modified data using the set_data method of the output_data object:

    def process(self):
        output = "Dexy processed the text '%s'" % self.input_data
        self.output_data.set_data(output)

Different data classes can expose methods which users and filters can access to write or read data in useful ways, and different storage classes can implement different storage options.

Data Types

The main role of data types is to provide convenient interfaces to the type of data being stored.

The basic data class is Generic which is designed to store binary data or unstructured text data. In this case the _data attribute contains the raw binary data or text.

def test_generic_data_stores_string():
    with wrap() as wrapper:
        doc = Doc("hello.txt",
                wrapper,
                [],
                contents="hello"
                )

        wrapper.run_docs(doc)
        data = doc.output_data()

        assert data.alias == 'generic'
        assert data._data == "hello"

The next class is Sectioned which holds ordered, named sections. These are usually named sections in a document. When data is loaded in memory, the _data attribute holds a list of dictionaries. The first entry contains metadata for the document, the subsequent dictionaries contain each section’s content and metadata.

def test_sectioned_data_stores_list_of_dicts():
    with wrap() as wrapper:
        contents=[
                {},
                {
                    "name" : "Welcome",
                    "contents" : "This is the first section."
                }
            ]

        doc = Doc("hello.txt",
                wrapper,
                [],
                data_type="sectioned",
                contents=contents
                )

        wrapper.run_docs(doc)
        data = doc.output_data()

        assert data.alias == 'sectioned'
        assert data._data == contents
        assert data['Welcome']['contents'] == "This is the first section."
        assert data[0]['contents'] == "This is the first section."

Another class is KeyValue which holds dictionary-type data. Unlike sectioned data, there is no requirement for the entries to be kept in order, and there is no arbitrary metadata for each section. There are several options for storage including JSON data files and Sqlite databases.

The KeyValue type cannot be initialized with a dictionary. Instead an empty data object needs to be initialized and then the append method must be used to add keys and values. This ensures that the behavior of the data type is the same whether the backend is a JSON file or a sqlite database.

def test_keyvalue_data_stores_dict():
    with wrap() as wrapper:
        doc = Doc("hello.json",
                wrapper,
                [],
                data_type="keyvalue",
                contents="dummy contents"
                )

        wrapper.run_docs(doc)
        data = doc.output_data()

        assert data.alias == 'keyvalue'
        assert data.keys() == []

        data.append("foo", 123)
        data.append("bar", 456)

        assert sorted(data.keys()) == ["bar", "foo"]

Additional classes are available which are intended to provide custom interfaces to data. For example the BeautifulSoupData type makes it easy to apply the BeautifulSoup HTML parser to content, and so in documents you can run queries using CSS selectors to extract a part of the document you are interested in.

Creating a plugin with a custom data type is a possible alternative to writing a filter. With a filter, the results of processing a whole document are cached. With a custom data type, you can run code on just a piece of a document you are interested in, but the results will not be cached, the calculation will be re-run each time the template document is processed. Using custom data types mean you put more logic in your template documents.

Accessing Data Objects

Within a filter, you can access the previous step’s output (this step’s input) using the input_data attribute.

The filter output is stored in the output_data attribute.

The data Method

In some classes, you can directly access stored data via the data() method.

    def data(self):
        if (not self._data) or self._data == [{}]:
            self.load_data()
        return self._data

Usually you will not call data() directly but will call a method designed to work with the stored data in a transparent way, like str(), keys(), or iter().

Different data types in Dexy will implement common methods such as str(), keys() and iter() appropriately for the type of data they are wrapping.

Where it is meaningless to access the data() object, the method should raise an Exception, as for the KeyValue data type:

    def data(self):
        raise Exception("No data method for KeyValue type data.")

Loading and Reading Data

Data will typically be loaded automatically when needed by calling the data() method, directly or indirectly. Other methods which access data should do so via the data() method. If necessary, the data() method calls the load_data() method, which in turn calls the storage’s read_data() since knowledge of where the data is stored and how to load it is the responsibility of the Storage object.

Here’s the load_data() method from the base Data class:

    def load_data(self, this=None):
        try:
            self._data = self.storage.read_data()
        except IOError:
            msg = "no data in file '%s' for %s (wrapper state '%s', data state '%s')"
            msgargs = (self.storage.data_file(), self.key,
                    self.wrapper.state, self.state)
            raise dexy.exceptions.InternalDexyProblem(msg % msgargs)

String-Like Access

Where it makes sense, the __unicode__ method should return the contents of a document as a unicode string.

    def __unicode__(self):
        if isinstance(self.data(), unicode):
            return self.data()
        elif not self.data():
            return unicode(None)
        else:
            return self.wrapper.decode_encoded(self.data())
    def __unicode__(self):
        return u"\n".join(unicode(v) for v in self.values() if unicode(v))

The str method will convert results of __unicode__ to an instance of str type:

    def __str__(self):
        return unicode(self).encode("utf-8", errors="strict")

Dictionary-Like Access

Data types should support dictionary-style access where appropriate.

Although the Generic type does not have sections, it implements dictionary-style access as though it consists of a single section with name "1". This way filters can be written which can process either Generic or Sectioned data as input data.

TODO: Example of filter which makes use of Generic’s dummy sections. Pyg filter maybe?

The iteritems method should yield key, value tuples.

Generic:

    def iteritems(self):
        """
        Iterable list of sections in document.
        """
        yield ('1', self.data())

Sectioned:

    def iteritems(self):
        """
        Iterable list of sections in document.
        """
        keys = self.keys()
        values = self.values()
        for i in range(len(keys)):
            yield (keys[i], values[i])

KeyValue:

    def iteritems(self):
        """
        Iterable list of available keys.
        """
        return self.storage.iteritems()

Sqlite3KeyValueStorage:

    def iteritems(self):
        self._cursor.execute("SELECT key, value from kvstore")
        for k in self._cursor.fetchall():
            yield (unicode(k[0]), k[1])

JsonKeyValueStorage:

    def iteritems(self):
        return self.data().iteritems()

The items method should return a list of key, value tuples:

Generic:

    def items(self):
        """
        List of sections in document.
        """
        return [('1', self.data(),)]

Sectioned:

    def items(self):
        return [(key, value) for (key, value) in self.iteritems()]

KeyValue:

    def items(self):
        """
        List of available keys.
        """
        return self.storage.items()

Sqlite3KeyValueStorage:

    def items(self):
        return [(key, value) for (key, value) in self.iteritems()]

JsonKeyValueStorage:

    def items(self):
        return self.data().items()

The keys method should return a list of keys:

Generic:

    def keys(self):
        """
        List of keys (section names) in document.
        """
        return ['1']

Sectioned:

    def keys(self):
        return [a['name'] for a in self.data()[1:]]

KeyValue:

    def keys(self):
        return self.storage.keys()

Sqlite3KeyValueStorage:

    def keys(self):
        self._cursor.execute("SELECT key from kvstore")
        return [unicode(k[0]) for k in self._cursor.fetchall()]

JsonKeyValueStorage:

    def keys(self):
        return self.data().keys()

To allow accessing elements using the [key] syntax, the __getitem__ method is implemented:

Generic:

    def __getitem__(self, key):
        if key == '1':
            return self.data()
        else:
            try:
                return self.data()[key]
            except TypeError:
                if self.ext == '.json':
                    return self.from_json()[key]
                else:
                    raise

Sectioned:

    def __getitem__(self, key):
        try:
            return self.data()[key+1]
        except TypeError:
            return self.value(key)

KeyValue:

    def __getitem__(self, key):
        return self.value(key)

Sqlite3KeyValueStorage value method:

    def value(self, key):
        self._cursor.execute("SELECT value from kvstore where key = ?", (key,))
        row = self._cursor.fetchone()
        if not row:
            raise Exception("No value found for key '%s'" % key)
        else:
            return row[0]

JsonKeyValueStorage value method:

    def value(self, key):
        return self.data()[key]

The __setitem__ method can also be implemented, to allow appending and setting elements using the [key] syntax:

    def __setitem__(self, key, value):
        keyindex = self.keyindex(key)
        if keyindex >= 0:
            # Existing section.
            assert self._data[keyindex+1]['name'] == key
            self._data[keyindex+1]['contents'] = value
        else:
            # New section.
            section_dict = {"name" : key, "contents" : value}
            self._data.append(section_dict)

And the __delitem__ method allows using the del keyword to remove elements:

    def __delitem__(self, key):
        index = self.keyindex(key)
        self.data().pop(index+1)

def test_sectioned_data_setitem_delitem():
    with wrap() as wrapper:
        contents=[
                {},
                {
                    "name" : "Welcome",
                    "contents" : "This is the first section."
                }
            ]

        doc = Doc("hello.txt",
                wrapper,
                [],
                data_type="sectioned",
                contents=contents
                )

        wrapper.run_docs(doc)
        data = doc.output_data()

        assert data.alias == 'sectioned'
        assert len(data) == 1

        # Add a new section
        data["Conclusions"] = "This is the final section."

        assert len(data) == 2

        assert unicode(data['Welcome']) == "This is the first section."
        assert unicode(data["Conclusions"]) == "This is the final section."

        # Modify an existing section
        data["Welcome"] = "This is the initial section."

        assert len(data) == 2

        assert unicode(data['Welcome']) == "This is the initial section."
        assert unicode(data["Conclusions"]) == "This is the final section."

        del data["Conclusions"]

        assert len(data) == 1
        assert data.keys() == ["Welcome"]

Desired Feature: Implement setitem for KeyValue data

Allow appending and setting elements using [key] = value syntax.

  • Updated at: 2013-11-20T01:32:25Z

  • Assigned to: None

  • Milestone: None

Desired Feature: Implement delitem for KeyValue data

Allow removing elements using del dict[key] syntax.

  • Updated at: 2013-11-20T01:37:44Z

  • Assigned to: None

  • Milestone: None

Custom Access Methods

Dictionary-style methods represent a generic interface which works for many situations, and so these methods are implemented across several data types.

Some data types will have additional methods for searching or formatting data. These can be seen using the dexy datas command with -alias argument.

The KeyValue data type implements a like method for using sqlite like queries:

    def like(self, key):
        try:
            return self.storage.like(key)
        except AttributeError:
            msg = "The `like()` method is not implemented for storage type '%s'"
            msgargs = self.storage.alias
            raise dexy.exceptions.UserFeedback(msg % msgargs)

Here’s the corresponding method on the Sqlite3KeyValueStorage class:

    def like(self, key):
        self._cursor.execute("SELECT value from kvstore where key LIKE ?", (key,))
        row = self._cursor.fetchone()
        if not row:
            raise Exception("No value found for key '%s'" % key)
        else:
            return row[0]

Saving Data

The save() method must be called to persist data to disk.

The set_data() shortcut method is also available, it sets data to the provided value and then calls save.

    def set_data(self, data):
        """
        Shortcut to set and save data.
        """
        self._data = data
        self.save()

Here’s the implementation for Generic:

    def save(self):
        if isinstance(self._data, unicode):
            self.storage.write_data(self._data.encode("utf-8"))
        else:
            if self._data == None:
                msg = "No data found for '%s', did you reference a file that doesn't exist?"
                raise dexy.exceptions.UserFeedback(msg % self.key)
            self.storage.write_data(self._data)

And here’s the implementations for Sectioned:

    def save(self):
        try:
            self.storage.write_data(self._data)
        except Exception as e:
            msg = "Problem saving '%s': %s" % (self.key, str(e))
            raise dexy.exceptions.InternalDexyProblem(msg)

And here’s the implementation for KeyValue:

    def save(self):
        try:
            self.storage.persist()
        except Exception as e:
            msg = u"Problem saving '%s': %s" % (self.key, unicode(e))
            raise dexy.exceptions.InternalDexyProblem(msg)
    def persist(self):
        if self.connected_to == 'existing':
            assert os.path.exists(self.data_file(read=False))
        elif self.connected_to == 'working':
            self.assert_location_is_in_project_dir(self.data_file(read=False))
            self._storage.commit()
            shutil.copyfile(self.working_file(), self.data_file(read=False))
        else:
            msg = "Unexpected 'connected_to' value %s"
            msgargs = self.connected_to
            raise InternalDexyProblem(msg % msgargs)

Here’s the persist method for Sqlite3KeyValueStorage:

    def persist(self):
        if self.connected_to == 'existing':
            assert os.path.exists(self.data_file(read=False))
        elif self.connected_to == 'working':
            self.assert_location_is_in_project_dir(self.data_file(read=False))
            self._storage.commit()
            shutil.copyfile(self.working_file(), self.data_file(read=False))
        else:
            msg = "Unexpected 'connected_to' value %s"
            msgargs = self.connected_to
            raise InternalDexyProblem(msg % msgargs)

And for JsonKeyValueStorage:

    def persist(self):
        self.write_data(self._data)

Exporting Data

The output_to_file method writes data to a file. For Generic data this will just be the raw data which may either be binary data or a string:

    def __unicode__(self):
        if isinstance(self.data(), unicode):
            return self.data()
        elif not self.data():
            return unicode(None)
        else:
            return self.wrapper.decode_encoded(self.data())

For Sectioned data, this writes the unicode-formatted data, so all sections are combined into a single document, which is usually what you want for final/canonical output at the end of processing:

    def __unicode__(self):
        return u"\n".join(unicode(v) for v in self.values() if unicode(v))

There’s no concept of a canonical output for KeyValue data, it’s intended for use in providing data to other documents.

Initializing Data Objects

Data objects do some initialization work in __init__:

    def __init__(self, key, ext, storage_key, settings, wrapper):
        self.key = key
        self.ext = ext
        self.storage_key = storage_key

        self.wrapper = wrapper
        self.initialize_settings(**settings)
        self.update_settings(settings)

        self._data = None
        self.state = None
        self.name = self.setting('canonical-name')
        if not self.name:
            msg = "Document must provide canonical-name setting to data."
            raise InternalDexyProblem(msg)
        elif self.name.startswith("("):
            raise Exception()

        self.transition('new')

And more in setup:

    def setup(self):
        self.setup_storage()
        self.transition('ready')

The setup method can be customized but it should always call the setup_storage method and transition the state to ready:

    def setup_storage(self):
        storage_type = self.storage_class_alias(self.ext)
        instanceargs = (self.storage_key, self.ext, self.wrapper,)
        self.storage = dexy.storage.Storage.create_instance(storage_type, *instanceargs)

        self.storage.assert_location_is_in_project_dir(self.name)

        if self.output_name():
            self.storage.assert_location_is_in_project_dir(self.output_name())

        self.storage.setup()

The storage_class_alias method is responsible for choosing the correct type of storage to use:

    def storage_class_alias(self, file_ext):
        return self.setting('storage-type')

By default this just reads the storage-type setting, but some classes may automatically determine the class based on file extension.

Reconstituting Data Objects

Data objects are designed to be instantiated as standalone objects after a dexy run, so their data can be used for reporting and querying. The dexy grep command works by loading data objects directly.

To do this, the arguments needed to initialize a data object are stored in batch metadata.

The args_to_data_init method is called when saving batch metadata to provide JSON-serializable initialization args for each data instance:

    def args_to_data_init(self):
        """
        Returns tuple of attributes to pass to create_instance.
        """
        return (self.alias, self.key, self.ext, self.storage_key, self.setting_values())

batch objects can then recreate data objects:

    def data(self, doc_key, input_or_output='output'):
        """
        Retrieves a data object given the doc key.
        """
        doc_info = self.doc_info(doc_key)["%s-data" % input_or_output]
        args = list(doc_info)
        args.append(self.wrapper)
        data = dexy.data.Data.create_instance(*args)
        data.setup_storage()
        if hasattr(data.storage, 'connect'):
            data.storage.connect()
        return data

Storage Objects

Storage objects are primarily concerned with reading and writing data to the correct location on the file system or other form of storage, but sometimes they will handle queries and other methods in order to provide user transparency at the Data object leveel.

Filters

Wrapper & Batches

Parsers

Reporters

Website Reporter

The Website reporter publishes the same files as the Output reporter, but also provides utilities designed for websites, such as applying templates to HTML files and providing data to help users construct site navigation.

It doesn’t make sense to generate a website unless all dexy files are generated, so the website reporter doesn’t run if a specific target is chosen (many links would likely be broken too):

    def run(self, wrapper):
        self.wrapper=wrapper
        self.setup()

        if self.wrapper.target:
            msg = "Not running website reporter because a target has been specified."
            self.log_warn(msg)
            return

        for doc in wrapper.nodes.values():
            if self.should_process(doc):
                self.process_doc(doc)

        self.log_debug("finished")

The setup method initializes some instance variables we will use later:

    def setup(self):
        self.keys_to_outfiles = []
        self.locations = {}
        self.create_reports_dir()
        self.setup_navobj()

As we iterate over all the processed docs, the should_process method determines whether they are properly formed Doc objects which are "canonical":

    def should_process(self, doc):
        if not doc.key_with_class() in self.wrapper.batch.docs:
            return False
        elif not doc.state in ('ran', 'consolidated'):
            return False
        elif not hasattr(doc, 'output_data'):
            return False
        elif not doc.output_data().output_name():
            return False
        elif not doc.output_data().is_canonical_output():
            msg = "skipping %s - not canonical"
            self.log_debug(msg % doc.key)
            return False
        else:
            return True

Then process_doc runs the code which writes the document, with any templates applied, to the report output directory:

    def process_doc(self, doc):
        self.log_debug("processing %s" % doc.key)

        output_ext = doc.output_data().ext

        if output_ext == ".html":
            self.process_html(doc)

        elif isinstance(doc.output_data(), dexy.data.Sectioned):
            assert output_ext == ".json"
            self.apply_and_render_template(doc)

        else:
            self.write_canonical_data(doc)

In the case of a HTML file, we may want to apply a template. This will depend on whether the document already appears to have a HTML header, and the value of the ws-template setting which may be a boolean or the name of a template to apply.

    def process_html(self, doc):
        if doc.setting('ws-template') == False:
            self.log_debug("  ws-template is False for %s" % doc.key)
            self.write_canonical_data(doc)

        elif self.detect_html_header(doc) and not doc.setting('ws-template'):
            self.log_debug("  found html tag in output of %s" % doc.key)
            self.write_canonical_data(doc)

        else:
            self.apply_and_render_template(doc)

The header check is rather rudimentary:

    def detect_html_header(self, doc):
        fragments = ('<html', '<body', '<head')
        return any(html_fragment
                      in unicode(doc.output_data())
                      for html_fragment in fragments)

The write_canonical_data method is inherited from the Output class, and is used when a document is not a HTML file or a setting determines that HTML templates should not be applied:

    def write_canonical_data(self, doc):
        output_name = doc.output_data().output_name()

        if output_name:
            fp = os.path.join(self.setting('dir'), output_name)

            if fp in self.locations:
                self.log_warn("WARNING overwriting file %s" % fp)
            else:
                self.locations[fp] = []
            self.locations[fp].append(doc.key)

            parent_dir = os.path.dirname(fp)
            try:
                os.makedirs(parent_dir)
            except os.error:
                pass

            self.log_debug("  writing %s to %s" % (doc.key, fp))

            doc.output_data().output_to_file(fp)

In the other cases, the apply_and_render_template method is used:

    def apply_and_render_template(self, doc):
        template_info = self.template_file_and_path(doc)
        template_file, template_path = template_info
        env_data = self.template_environment(doc, template_path)

        self.log_debug("  creating jinja environment")
        env = self.jinja_environment(template_path)

        self.log_debug("  loading jinja template at %s" % template_path)
        template = env.get_template(template_path)

        output_file = self.fix_ext(doc.output_data().output_name())
        output_path = os.path.join(self.setting('dir'), output_file)

        try:
            os.makedirs(os.path.dirname(output_path))
        except os.error:
            pass

        self.log_debug("  writing to %s" % (output_path))
        template.stream(env_data).dump(output_path, encoding="utf-8")

Templates

Templates are ways of creating dexy examples.

Command Line Interface

The command line interface for dexy is driven by python-modargs.

init.py

Any function ending with _command is automatically exposed as a command. To keep modules to a manageable size, the various dexy commands are defined in different modules, but all are imported into dexy.commands in the __init__.py file so they can be available to modargs within a single module:

from dexy.commands.info import links_command
from dexy.commands.cite import cite_command
from dexy.commands.parsers import parsers_command
from dexy.commands.conf import conf_command
from dexy.commands.dirs import cleanup_command
from dexy.commands.dirs import reset_command
from dexy.commands.dirs import setup_command
from dexy.commands.env import env_command
from dexy.commands.env import datas_command
from dexy.commands.env import plugins_command
from dexy.commands.fcmds import fcmd_command
from dexy.commands.fcmds import fcmds_command
from dexy.commands.filters import filters_command
from dexy.commands.filters import filters_command as filter_command
from dexy.commands.grep import grep_command
from dexy.commands.info import info_command
from dexy.commands.it import dexy_command
from dexy.commands.it import it_command
from dexy.commands.it import targets_command
from dexy.commands.nodes import nodes_command
from dexy.commands.reporters import reporters_command
from dexy.commands.reporters import reporters_command as reports_command
from dexy.commands.serve import serve_command
from dexy.commands.templates import gen_command
from dexy.commands.templates import template_command
from dexy.commands.templates import templates_command

The run method in dexy.commands is listed in setup.py as a console script entry point:

        entry_points = {
            'console_scripts' : [
                'dexy = dexy.commands:run'
                ],
            'pygments.lexers' : [
                'rst+django = dexy.filters.utils:RstDjangoLexer'
                ]
            },

And this is the first method which will be called whenever a user enters a dexy command:

def run():
    capture_warnings()
    parse_and_run_cmd(*resolve_argv())

The capture_warnings method just hides irrelevant warning messages from dexy users:

def capture_warnings():
    """
    Capture deprecation messages and other irrelevant warnings in whatever way
    is appropriate to the dexy version.
    """
    if hasattr(logging, 'captureWarnings'):
        logging.captureWarnings(True)
    else:
        warnings.filterwarnings("ignore",category=Warning)

Dexy can load a lot of different libraries as it runs various filters, and deprecation messages and similar warnings are confusing and annoying for end users.

The resolve_argv method is called next:

def resolve_argv():
    """
    Do some processing of the user-provided arguments in argv before they go to
    modargs so we can support commands defined in plugins.
    """
    only_one_arg = (len(sys.argv) == 1)
    second_arg_is_known_cmd = not only_one_arg and \
        sys.argv[1] in args.available_commands(dexy_cmd_mod)
    second_arg_is_option = not only_one_arg and \
        sys.argv[1].startswith("-")

    if only_one_arg or second_arg_is_known_cmd or second_arg_is_option:
        return sys.argv[1:], dexy_cmd_mod, dexy_default_cmd

    else:
        cmd, subcmd, cmd_mod = resolve_plugin_cmd(sys.argv[1])
        default_cmd = cmd.default_cmd or cmd.namespace
        return [subcmd] + sys.argv[2:], cmd_mod, default_cmd

It’s possible for additional commands to be added to dexy via the plugin system. For example, the dexy-viewer plugin defines a ping command which can be called as follows:

$ dexy viewer:ping
pong
@patch.object(sys, 'argv', ['dexy', 'viewer:ping'])
@patch('sys.stdout', new_callable=StringIO)
def test_viewer_command(stdout):
    dexy.commands.run()
    assert "pong" in stdout.getvalue()

If necessary the resolve_argv command calls resolve_plugin_command to look in dexy plugins for the requested command:

def resolve_plugin_cmd(raw_command_name):
    """
    Take a command name like viewer:run and return the command method and
    module object.
    """
    if ":" in raw_command_name:
        alias, subcommand = raw_command_name.split(":")
    else:
        alias, subcommand = raw_command_name, ''

    try:
        cmd = dexy.plugin.Command.create_instance(alias)
    except cashew.exceptions.NoPlugin:
        msg = """No command '%s' available.
        Run `dexy help --all` to see list of available commands."""
        msgargs = (alias)
        sys.stderr.write(inspect.cleandoc(msg) % msgargs)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    mod_name = cmd.__module__
    cmd_mod = args.load_module(mod_name)

    return cmd, subcommand, cmd_mod

Returning to the run command we started with:

def run():
    capture_warnings()
    parse_and_run_cmd(*resolve_argv())

The parsed arguments, module object and a default command are passed to parse_and_run_cmd which delegates to the modargs command of the same name, and wraps the call with error handling to provide nicer error messages if there’s a problem:

def parse_and_run_cmd(argv, module, default_command):
    try:
        args.parse_and_run_command(argv, module, default_command)
    except (dexy.exceptions.UserFeedback, cashew.exceptions.UserFeedback) as e:
        msg = u"""Oops, there's a problem running your command.
        Here is some more information:"""
        sys.stderr.write(inspect.cleandoc(msg))
        sys.stderr.write(os.linesep)

        err_msg = unicode(e)
        if err_msg:
            sys.stderr.write(u"'%s'" % unicode(e))
        else:
            sys.stderr.write(u"Sorry, can't get text of error message.")

        sys.stderr.write(os.linesep)
        sys.exit(1)

    except KeyboardInterrupt:
        sys.stderr.write("stopping...")
        sys.stderr.write(os.linesep)
        sys.exit(1)

The help and version commands are also defined in the __init__.py file:

def help_command(
        all=False, # List all available dexy commands (auto-generated).
        on=False # Get help on a particular dexy command.
    ):

    if all and not on:
        print ""
        args.help_command(prog, dexy_cmd_mod, dexy_default_cmd, on)
        print ""

    elif not on:
        print ""
        print "For help on the main `dexy` command, run `dexy help -on dexy`."
        print ""
        print "The dexy tool includes several different commands:"
        print "  `dexy help --all` lists all available commands"
        print "  `dexy help --on XXX` provides help on a specific command"
        print ""
        print "Commands for running dexy:"
        print "  `dexy` runs dexy"
        print "  `dexy setup` makes directories dexy needs"
        print "  `dexy cleanup` removes directories dexy has created"
        print "  `dexy reset` empties and resets dexy's working directories"
        print ""
        print "Commands which print lists of dexy features:"
        print "  `dexy filters` filters like |jinja |py |javac"
        print "  `dexy reports` reporters like `output` and `run`"
        print "  `dexy nodes` node types and their document settings"
        print "  `dexy datas` data types and available methods"
        print "  `dexy env` elements available in document templates"
        print ""
        print "Commands which print information about your project:"
        print "  (you need to be in the project dir and have run dexy already)"
        print "  `dexy grep` search for documents and keys in documents"
        print "  `dexy info` list metadata about a particular document"
        print "  `dexy targets` list target names you can run"
        print "  `dexy links` list all ways to refer to documents and sections"
        print ""
        print "Other commands:"
        print "  `dexy serve` start a local static web server to view generated docs"
        print "  `dexy help` you're reading it"
        print "  `dexy version` print the version of dexy software which is installed"
        print ""

    else:
        try:
            args.help_command(prog, dexy_cmd_mod, dexy_default_cmd, on)

        except KeyError:
            sys.stderr.write("Could not find help on '%s'." % on)
            sys.stderr.write(os.linesep)
            sys.exit(1)
def version_command():
    """
    Print the version number of dexy.
    """
    print "%s version %s" % (prog, DEXY_VERSION)

cite.py

The dexy cite command prints out a bibliographic citation for dexy.

def cite_command(
        fmt='bibtex' # desired format of citation
        ):
    """
    How to cite dexy in papers.
    """
    if fmt == 'bibtex':
        cite_bibtex()
    else:
        msg = "Don't know how to provide citation in '%s' format"
        raise dexy.exceptions.UserFeedback(msg % fmt)
$ dexy cite
@misc{Dexy,
    title = {Dexy: Reproducible Data Analysis and Document Automation Software, Version~1.0.0d},
    author = {{Nelson, Ana}},
    year = {2013},
    url = {http://www.dexy.it/},
    note = {http://orcid.org/0000-0003-2561-1564}
}

Currently the bibtex format is supported:

def bibtex_text():
    args = {
            'version' : DEXY_VERSION,
            'year' : datetime.date.today().year
            }

    return """@misc{Dexy,
    title = {Dexy: Reproducible Data Analysis and Document Automation Software, Version~%(version)s},
    author = {{Nelson, Ana}},
    year = {%(year)s},
    url = {http://www.dexy.it/},
    note = {http://orcid.org/0000-0003-2561-1564}
}""" % args

conf.py

The main dexy command has a lot of command line options, and for convenience you can save the option values in a dexy.conf file in your project so you don’t have to remember and type them all the time. The dexy conf command generates an example file for you containing all the default dexy options:

def conf_command(
        conf=defaults['config_file'], # name of config file to write to
        p=False # whether to print to stdout rather than write to file
        ):
    """
    Write a config file containing dexy's defaults.
    """
    if file_exists(conf) and not p:
        print inspect.cleandoc("""Config file %s already exists,
        will print conf to stdout instead...""" % conf)
        p = True

    config = default_config()

    # Can't specify config file name in config file.
    del config['conf']

    yaml_help = inspect.cleandoc("""# YAML config file for dexy.
        # You can delete any lines you don't wish to customize.
        # Options are same as command line options,
        # for more info run 'dexy help -on dexy'.
        """)

    if p:
        print yaml.dump(config, default_flow_style=False)
    else:
        with open(conf, "wb") as f:
            if conf.endswith(".yaml") or conf.endswith(".conf"):
                f.write(yaml_help)
                f.write(os.linesep)
                f.write(yaml.dump(config, default_flow_style=False))
            elif conf.endswith(".json"):
                json.dump(config, f, sort_keys=True, indent=4)
            else:
                msg = "Don't know how to write config file '%s'"
                raise dexy.exceptions.UserFeedback(msg % conf)

        print "Config file has been written to '%s'" % conf
$ dexy conf
Config file has been written to 'dexy.conf'
$ ls
commands.sh.txt  dexy.conf

If a config file already exists, or if you use the -p flag, then config options are written to stdout instead of written to a file.

@patch.object(sys, 'argv', ['dexy', 'conf'])
@patch('sys.stdout', new_callable=StringIO)
def test_conf_command(stdout):
    with tempdir():
        dexy.commands.run()
        assert os.path.exists("dexy.conf")
        assert "has been written" in stdout.getvalue()

@patch.object(sys, 'argv', ['dexy', 'conf'])
@patch('sys.stdout', new_callable=StringIO)
def test_conf_command_if_path_exists(stdout):
    with tempdir():
        with open("dexy.conf", "w") as f:
            f.write("foo")
        assert os.path.exists("dexy.conf")
        dexy.commands.run()
        assert "dexy.conf already exists" in stdout.getvalue()
        assert "artifactsdir" in stdout.getvalue()

@patch.object(sys, 'argv', ['dexy', 'conf', '-p'])
@patch('sys.stdout', new_callable=StringIO)
def test_conf_command_with_print_option(stdout):
    with tempdir():
        dexy.commands.run()
        assert not os.path.exists("dexy.conf")
        assert "artifactsdir" in stdout.getvalue()

dirs.py

These commands expose methods for creating and removing dexy’s working directories.

The setup and cleanup command create and remove working directories respectively:

def setup_command(
        __cli_options=False,
        artifactsdir=defaults['artifacts_dir'], # Where dexy should store working files.
        **kwargs):
    """
    Create the directories dexy needs to run.
    """
    wrapper = init_wrapper(locals())
    wrapper.create_dexy_dirs()
def cleanup_command(
        __cli_options=False,
        artifactsdir=defaults['artifacts_dir'], # Where dexy should store working files.
        logdir=defaults['log_dir'], # DEPRECATED
        reports=True # Whether directories generated by reports should also be removed.
        ):
    """
    Remove the directories which dexy created, including working directories
    and reports.
    """
    wrapper = init_wrapper(locals())
    wrapper.remove_dexy_dirs()
    wrapper.remove_reports_dirs(reports)

The reset command cleans out any working files and leaves you with a fresh setup:

def reset_command(
        __cli_options=False,
        artifactsdir=defaults['artifacts_dir'], # Where dexy should store working files.
        logdir=defaults['log_dir'] # DEPRECATED
        ):
    """
    Clean out the contents of dexy's cache and reports directories.
    """
    wrapper = init_wrapper(locals())
    wrapper.remove_dexy_dirs()
    wrapper.remove_reports_dirs(keep_empty_dir=True)
    wrapper.create_dexy_dirs()

fcmds.py

Filter commands are intended to be a way for filters to expose some information to users. For example, filter commands could provide a way for an API to list available methods.

def fcmds_command(
        alias=False # Only print commands defined by this alias.
        ):
    """
    Prints a list of available filter commands.
    """
    if alias:
        filter_instances = [dexy.filter.Filter.create_instance(alias)]
    else:
        filter_instances = dexy.filter.Filter

    for filter_instance in filter_instances:
        cmds = filter_instance.filter_commands()
        if cmds:
            print "filter alias:", filter_instance.alias
            for command_name in sorted(cmds):
                docs = inspect.getdoc(cmds[command_name])
                if docs:
                    doc = docs.splitlines()[0]
                    print "    %s   # %s" % (command_name, doc)
                else:
                    print "    %s" % command_name
            print ''

The fcmds_command lists filter commands:

$ dexy fcmds
filter alias: apis
    create_keyfile

filter alias: botoup
    create_keyfile

filter alias: htmlsections
    css   # Prints out CSS for the specified style.
    sty   # Prints out .sty file (latex) for the specified style.

filter alias: pyg
    css   # Prints out CSS for the specified style.
    sty   # Prints out .sty file (latex) for the specified style.

filter alias: wordpress
    create_keyfile   # Creates a key file for WordPress in the local directory.
    list_categories   # List available blog post categories.
    list_methods   # List API methods exposed by WordPress API.

$ dexy fcmds -alias pyg
filter alias: pyg
    css   # Prints out CSS for the specified style.
    sty   # Prints out .sty file (latex) for the specified style.

To run a filter command you need to pass the alias and the command name:

$ dexy fcmd -alias pyg -cmd css | head
.hll { background-color: #ffffcc }
.c { color: #408080; font-style: italic } /* Comment */
.err { border: 1px solid #FF0000 } /* Error */
.k { color: #008000; font-weight: bold } /* Keyword */
.o { color: #666666 } /* Operator */
.cm { color: #408080; font-style: italic } /* Comment.Multiline */
.cp { color: #BC7A00 } /* Comment.Preproc */
.c1 { color: #408080; font-style: italic } /* Comment.Single */
.cs { color: #408080; font-style: italic } /* Comment.Special */
.gd { color: #A00000 } /* Generic.Deleted */
def fcmd_command(
        alias=None, # The alias of the filter which defines the custom command
        cmd=None, # The name of the command to run
        **kwargs # Additional arguments to be passed to the command
        ):
    """
    Run a filter command.
    """
    filter_instance = dexy.filter.Filter.create_instance(alias)
    cmd_name = "docmd_%s" % cmd

    if not cmd_name in dir(filter_instance):
        msg = "%s is not a valid command. There is no method %s defined in %s"
        msgargs = (cmd, cmd_name, filter_instance.__class__.__name__)
        raise dexy.exceptions.UserFeedback(msg % msgargs)

    else:
        instance_method = getattr(filter_instance, cmd_name)
        # TODO use try/catch instead of inspect.ismethod
        if inspect.ismethod(instance_method):
            try:
                instance_method.__func__(filter_instance, **kwargs)
            except TypeError as e:
                print e.message
                print inspect.getargspec(instance_method.__func__)
                print inspect.getdoc(instance_method.__func__)
                raise

        else:
            msg = "expected %s to be an instance method of %s"
            msgargs = (cmd_name, filter_instance.__class__.__name__)
            raise dexy.exceptions.InternalDexyProblem(msg % msgargs)

filters.py

The filters module contains dexy’s command line reference for filters.

def filters_command(
        alias="", # Print docs for this filter.
        example=False, # Whether to run included examples (slower).
        nocolor=False, # Skip syntax highlighting if showing source code.
        source=False, # Print source code of filter.
        versions=False # Print the installed version of external software (slower).
        ):
    """
    Prints list of available filters or docs for a particular filter.
    """
    if alias:
        help_for_filter(alias, example, source, nocolor)
    else:
        list_filters(versions)
def help_for_filter(alias, run_example, show_source, nocolor):
    instance = dexy.filter.Filter.create_instance(alias)

    print ''
    print instance.setting('help')

    print ''
    print "aliases: %s" % ", ".join(instance.setting('aliases'))
    print "tags: %s" % ", ".join(instance.setting('tags'))
    print ''

    print "Converts from file formats:"
    for ext in instance.setting('input-extensions'):
        print "   %s" % ext
    print ''

    print "Converts to file formats:"
    for ext in instance.setting('output-extensions'):
        print "   %s" % ext
    print ''

    print('Settings:')
    for k in sorted(instance._instance_settings):
        if k in dexy.filter.Filter.nodoc_settings:
            continue
        if k in ('aliases', 'tags'):
            continue

        tup = instance._instance_settings[k]
        print "    %s" % k

        for line in inspect.cleandoc(tup[0]).splitlines():
            print "        %s" % line

        print "        default value: %s" % tup[1]
        print ''

    examples = instance.setting('examples')
    example_templates = {}
    for alias in examples:
        try:
            template_instance = dexy.template.Template.create_instance(alias)
            example_templates[alias] = template_instance
        except dexy.exceptions.InactivePlugin:
            pass

    if examples:
        print ''
        print "Examples for this filter:"
        for alias, template in example_templates.iteritems():
            print ''
            print "  %s" % alias
            print "            %s" % inspect.getdoc(template.__class__)

        if run_example:
            for alias, template in example_templates.iteritems():
                print ''
                print ''
                print "Running example: %s" % template.setting('help')
                print ''
                print ''
                print template_text(template)

    print ''
    print "For online docs see http://dexy.it/ref/filters/%s" % alias
    print ''
    print "If you have suggestions or feedback about this filter,"
    print "please contact info@dexy.it"
    print ''

    if show_source:
        print ''
        source_code = inspect.getsource(instance.__class__)
        if nocolor:
            print source_code
        else:
            formatter = pygments.formatters.TerminalFormatter()
            lexer = PythonLexer()
            print highlight(source_code, lexer, formatter)
def list_filters(versions):
        print "Installed filters:"
        for filter_instance in dexy.filter.Filter:
            # Should we show this filter?
            no_aliases = not filter_instance.setting('aliases')
            no_doc = filter_instance.setting('nodoc')
            not_dexy = not filter_instance.__class__.__module__.startswith("dexy.")
            exclude = filter_instance.alias in extra_nodoc_aliseas

            if no_aliases or no_doc or not_dexy or exclude:
                continue

            # generate version message
            if versions:
                if hasattr(filter_instance, 'version'):
                    version = filter_instance.version()
                    if version:
                        version_message = "Installed version: %s" % version
                    else:
                        msg = "'%s' failed, filter may not be available."
                        msgargs = filter_instance.version_command()
                        version_message = msg % msgargs
                else:
                    version_message = ""


            filter_help = "  " + filter_instance.alias + \
                    " : " + filter_instance.setting('help').splitlines()[0]

            if versions and version_message:
                filter_help += " %s" % version_message

            print filter_help

        print ''
        print "For more information about a particular filter,"
        print "use the -alias flag and specify the filter alias."
        print ''

grep.py

The grep interface is a way to search on the command line for dexy docs and keys within docs. (The dexy viewer plugin presents similar information in a web-based interface.)

def grep_command(
        __cli_options=False, # nodoc
        contents=False, # print out the contents of each matched file
        expr="", # An expression partially matching document name.
        key="", # An exact document key
        keyexpr="", # Only search for keys matching this expression
        keylimit=10, # Maximum number of matching keys to print
        keys=False, # List keys in documents
        limit=10, # Maximum number of matching documents to print
        lines=False, # maximum number of lines of content to print
        **kwargs
        ):
    """
    Search for documents and sections within documents.

    Dexy must have already run successfully.

    You can search for documents based on exact key or inexpect expression. The
    number of documents returned is controlled by --limit.

    You can print all keys in found documents by requesting --keys, number of
    results is controlled by --keylimit.

    You can search the section names/keys in found documents by passing a
    --keyexpr

    You can print contents of documents by requesting --contents, number of
    lines of content can be controlled by --lines.

    This does not search contents of documents, just document names and
    internal section names.
    """

    artifactsdir = kwargs.get('artifactsdir', defaults['artifacts_dir'])
    wrapper = init_wrapper(locals())
    batch = Batch.load_most_recent(wrapper)

    if not batch:
        print "you need to run dexy first"
        sys.exit(1)
    else:
        if expr:
            matches = sorted([data for data in batch if expr in data.key],
                    key=attrgetter('key'))
        elif key:
            matches = sorted([data for data in batch if key == data.key],
                    key=attrgetter('key'))
        else:
            raise dexy.exceptions.UserFeedback("Must specify either expr or key")

        n = len(matches)
        if n > limit:
            print "only printing first %s of %s total matches" % (limit, n)
            matches = matches[0:limit]

        for match in matches:
            print_match(match, keys, keyexpr, contents, keylimit, lines)

The grep command calls print_match for each match:

def print_match(match, keys, keyexpr, contents, keylimit, lines):
    print match.key, "\tcache key:", match.storage_key

    if hasattr(match, 'keys'):
        if keyexpr:
            print_keys([key for key in match.keys() if keyexpr in key], keylimit, lines)
        elif keys:
            print_keys(match.keys(), keylimit, lines)

    if contents:
        if isinstance(match, Sectioned):
            for section_name, section_contents in match.data().iteritems():
                print "  section: %s" % section_name
                print
                print_contents(section_contents, lines)
                print
        elif isinstance(match, KeyValue):
            pass
        elif isinstance(match, Generic):
            try:
                json.dumps(unicode(match))
                print_contents(unicode(match), lines)
            except UnicodeDecodeError:
                print "  not printable"

Where it reaches a document which has keys, it may also print the keys (depending on options):

def print_keys(pkeys, keylimit, lines):
    n = len(pkeys)
    if n > keylimit:
        pkeys = pkeys[0:keylimit]

    for key in pkeys:
        print '  ', key

    if n > keylimit:
        print "  only printed first %s of %s total keys" % (keylimit, n)

And contents of files may also be printed:

def print_contents(text, lines):
    text_lines = text.splitlines()
    for i, line in enumerate(text_lines):
        if lines and i > lines-1:
            continue
        print "  ", line

    if lines and lines < len(text_lines):
        print "   only printed first %s of %s total lines" % (lines, len(text_lines))

info.py

The info command lets you see where documents are cached and to get documentation about their available methods. You should know the doc key you want to search for, you can use dexy grep to help you search.

The attributes listed in info_attrs and the methods listed in info_methods are displayed.

info_attrs = [
        'name',
        'ext',
        'key'
        ]

info_methods = [
        'title',
        'basename',
        'filesize',
        'baserootname',
        'parent_dir',
        'long_name',
        'web_safe_document_key'
        ]

storage_methods = []
def info_command(
        __cli_options=False,
        expr="", # An expression partially matching document name.
        key="", # The exact document key.
        ws=False, # Whether to print website reporter keys and values.
        **kwargs
        ):
    """
    Prints metadata about a dexy document.

    Dexy must have already run successfully.

    You can specify an exact document key or an expression which matches part
    of a document name/key. The `dexy grep` command is available to help you
    search for documents and print document contents.
    """
    artifactsdir = kwargs.get('artifactsdir', defaults['artifacts_dir'])
    wrapper = init_wrapper(locals())
    wrapper.setup_log()
    batch = Batch.load_most_recent(wrapper)
    wrapper.batch = batch

    if expr:
        print "search expr:", expr
        matches = sorted([data for data in batch if expr in data.key],
                key=attrgetter('key'))
    elif key:
        matches = sorted([data for data in batch if key == data.key],
                key=attrgetter('key'))
    else:
        raise dexy.exceptions.UserFeedback("Must specify either expr or key")

    for match in matches:
        print ""
        print "  Info for Document '%s'" % match.key
        print ""
        print "  document output data type:", match.alias
        print ""

        print_indented("settings:", 2)
        for k in sorted(match._instance_settings):
            if not k in ('aliases', 'help'):
                print_indented("%s: %s" % (k, match.setting(k)), 4)

        print ""
        print_indented("attributes:", 2)
        for fname in sorted(info_attrs):
            print_indented("%s: %s" % (fname, getattr(match, fname)), 4)
        print ""

        print_indented("methods:", 2)
        for fname in sorted(info_methods):
            print_indented("%s(): %s" % (fname, getattr(match, fname)()), 4)
        print ""

        if storage_methods:
            print_indented("storage methods:", 2)
            for fname in sorted(storage_methods):
                print_indented("%s(): %s" % (fname, getattr(match.storage, fname)), 4)
            print ''

        if ws:
            print_indented("website reporter methods:", 2)
            print ''
            reporter = dexy.reporter.Reporter.create_instance('ws')
            reporter.wrapper = wrapper
            reporter.setup_navobj()
            reporter.help(match)
            print ''
            print_indented("active template plugins are:", 2)
            print_indented(", ".join(reporter.setting('plugins')), 4)
            print ''


        else:
            print_indented("For website reporter tags, run this command with -ws option", 4)
            print ''


        print_rewrapped("""For more information about methods available on this
        data type run `dexy datas -alias %s`""" % match.alias)

it.py

The main command which actually runs dexy.