Introduction
This is developer documentation for Dexy. It is far from complete but is under active development.
The sources for this documentation are part of the main dexy repository at http://github.com/dexy/dexy
Please visit http://dexy.it/docs/ for a list of all available documentation.
Data and Storage
Data objects capture the state of Dexy documents at a given stage in filter processing, and Storage objects handle persistence of data objects. A Dexy document will have a data object corresponding to its initial state (the original file to be processed), plus a data object representing the state after each filter is processed. Each data object has a corresponding storage object. Filter implementations will call methods on data classes to append and save data, and to read data from the previous stage in processing or other document inputs.
For example, here’s the process method from the ExampleProcessMethod class, which reads input from the input_data object, representing the previous data state, and saves the modified data using the set_data method of the output_data object:
def process(self):
output = "Dexy processed the text '%s'" % self.input_data
self.output_data.set_data(output)
Different data classes can expose methods which users and filters can access to write or read data in useful ways, and different storage classes can implement different storage options.
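To make the division of responsibilities concrete, here is a minimal sketch of the pattern (the class and method names mirror dexy’s, but this is illustrative code, not the actual implementation):

```python
# Illustrative sketch of the Data/Storage split; not dexy's actual classes.
class MemoryStorage:
    """Stand-in backend; a real Storage object would use the filesystem."""
    def __init__(self):
        self._saved = None

    def write_data(self, data):
        self._saved = data

    def read_data(self):
        return self._saved


class Data:
    def __init__(self, storage):
        self.storage = storage
        self._data = None

    def set_data(self, data):
        # Shortcut to set and save data, as in the filter example above.
        self._data = data
        self.save()

    def save(self):
        self.storage.write_data(self._data)

    def data(self):
        # Load lazily from storage on first access.
        if self._data is None:
            self._data = self.storage.read_data()
        return self._data


storage = MemoryStorage()
Data(storage).set_data("hello")
```

A filter writes through set_data and never needs to know which backend persisted the bytes.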
Data Types
The main role of data types is to provide convenient interfaces to the type of data being stored.
The basic data class is Generic, which is designed to store binary data or unstructured text data. In this case the _data attribute contains the raw binary data or text.
def test_generic_data_stores_string():
with wrap() as wrapper:
doc = Doc("hello.txt",
wrapper,
[],
contents="hello"
)
wrapper.run_docs(doc)
data = doc.output_data()
assert data.alias == 'generic'
assert data._data == "hello"
The next class is Sectioned, which holds ordered, named sections. These are usually named sections in a document. When data is loaded in memory, the _data attribute holds a list of dictionaries. The first entry contains metadata for the document; the subsequent dictionaries contain each section’s content and metadata.
def test_sectioned_data_stores_list_of_dicts():
with wrap() as wrapper:
contents=[
{},
{
"name" : "Welcome",
"contents" : "This is the first section."
}
]
doc = Doc("hello.txt",
wrapper,
[],
data_type="sectioned",
contents=contents
)
wrapper.run_docs(doc)
data = doc.output_data()
assert data.alias == 'sectioned'
assert data._data == contents
assert data['Welcome']['contents'] == "This is the first section."
assert data[0]['contents'] == "This is the first section."
Another class is KeyValue, which holds dictionary-type data. Unlike sectioned data, there is no requirement for the entries to be kept in order, and there is no arbitrary metadata for each section. There are several options for storage, including JSON data files and SQLite databases.
The KeyValue type cannot be initialized with a dictionary. Instead, an empty data object must be initialized and then the append method used to add keys and values. This ensures that the behavior of the data type is the same whether the backend is a JSON file or an SQLite database.
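The constraint can be illustrated with a sketch of two interchangeable backends (hypothetical classes, not dexy’s storage implementations) sharing the same append/keys/value interface:

```python
import sqlite3

class DictBackend:
    """In-memory dict, standing in for a JSON-file backend."""
    def __init__(self):
        self._d = {}

    def append(self, key, value):
        self._d[key] = value

    def keys(self):
        return list(self._d)

    def value(self, key):
        return self._d[key]


class SqliteBackend:
    """The same interface over an in-memory sqlite table."""
    def __init__(self):
        self._conn = sqlite3.connect(":memory:")
        self._conn.execute("CREATE TABLE kvstore (key TEXT, value TEXT)")

    def append(self, key, value):
        self._conn.execute("INSERT INTO kvstore VALUES (?, ?)", (key, value))

    def keys(self):
        return [r[0] for r in self._conn.execute("SELECT key FROM kvstore")]

    def value(self, key):
        row = self._conn.execute(
            "SELECT value FROM kvstore WHERE key = ?", (key,)).fetchone()
        return row[0]


backends = [DictBackend(), SqliteBackend()]
for backend in backends:
    backend.append("foo", "123")
    backend.append("bar", "456")
```

Because callers only ever append, neither backend has to promise anything about ordering or bulk initialization.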
def test_keyvalue_data_stores_dict():
with wrap() as wrapper:
doc = Doc("hello.json",
wrapper,
[],
data_type="keyvalue",
contents="dummy contents"
)
wrapper.run_docs(doc)
data = doc.output_data()
assert data.alias == 'keyvalue'
assert data.keys() == []
data.append("foo", 123)
data.append("bar", 456)
assert sorted(data.keys()) == ["bar", "foo"]
Additional classes are available which are intended to provide custom interfaces to data. For example, the BeautifulSoupData type makes it easy to apply the BeautifulSoup HTML parser to content, so in documents you can run queries using CSS selectors to extract the part of the document you are interested in.
Creating a plugin with a custom data type is a possible alternative to writing a filter. With a filter, the results of processing a whole document are cached. With a custom data type, you can run code on just the piece of a document you are interested in, but the results will not be cached; the calculation will be re-run each time the template document is processed. Using custom data types means you put more logic in your template documents.
Accessing Data Objects
Within a filter, you can access the previous step’s output (this step’s input) using the input_data attribute. The filter output is stored in the output_data attribute.
The data Method
In some classes, you can directly access stored data via the data() method.
def data(self):
if (not self._data) or self._data == [{}]:
self.load_data()
return self._data
Usually you will not call data() directly but will call a method designed to work with the stored data in a transparent way, like str(), keys(), or iter().
Different data types in Dexy will implement common methods such as str(), keys(), and iter() appropriately for the type of data they are wrapping. Where it is meaningless to access the data() object, the method should raise an Exception, as for the KeyValue data type:
def data(self):
raise Exception("No data method for KeyValue type data.")
Loading and Reading Data
Data will typically be loaded automatically when needed by calling the data() method, directly or indirectly. Other methods which access data should do so via the data() method. If necessary, the data() method calls the load_data() method, which in turn calls the storage’s read_data(), since knowledge of where the data is stored and how to load it is the responsibility of the Storage object.
Here’s the load_data() method from the base Data class:
def load_data(self, this=None):
try:
self._data = self.storage.read_data()
except IOError:
msg = "no data in file '%s' for %s (wrapper state '%s', data state '%s')"
msgargs = (self.storage.data_file(), self.key,
self.wrapper.state, self.state)
raise dexy.exceptions.InternalDexyProblem(msg % msgargs)
String-Like Access
Where it makes sense, the __unicode__ method should return the contents of a document as a unicode string.
def __unicode__(self):
if isinstance(self.data(), unicode):
return self.data()
elif not self.data():
return unicode(None)
else:
return self.wrapper.decode_encoded(self.data())
def __unicode__(self):
return u"\n".join(unicode(v) for v in self.values() if unicode(v))
The __str__ method will convert the result of __unicode__ to an instance of str type:
def __str__(self):
return unicode(self).encode("utf-8", errors="strict")
Dictionary-Like Access
Data types should support dictionary-style access where appropriate.
Although the Generic type does not have sections, it implements dictionary-style access as though it consists of a single section with name "1". This way, filters can be written which can process either Generic or Sectioned data as input data.
TODO: Example of filter which makes use of Generic’s dummy sections. Pyg filter maybe?
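In the meantime, here is a standalone sketch (hypothetical classes, not dexy’s filter API) showing how code written against iteritems handles both cases identically:

```python
class GenericLike:
    """Stand-in for Generic data: one dummy section named '1'."""
    def __init__(self, text):
        self._text = text

    def iteritems(self):
        yield ('1', self._text)


class SectionedLike:
    """Stand-in for Sectioned data: ordered (name, contents) pairs."""
    def __init__(self, sections):
        self._sections = sections

    def iteritems(self):
        for name, contents in self._sections:
            yield (name, contents)


def upcase_sections(data):
    """A toy 'filter' that never needs to know which type it was given."""
    return [(name, contents.upper()) for name, contents in data.iteritems()]
```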
The iteritems method should yield key, value tuples.
Generic:
def iteritems(self):
"""
Iterable list of sections in document.
"""
yield ('1', self.data())
Sectioned:
def iteritems(self):
"""
Iterable list of sections in document.
"""
keys = self.keys()
values = self.values()
for i in range(len(keys)):
yield (keys[i], values[i])
KeyValue:
def iteritems(self):
"""
Iterable list of available keys.
"""
return self.storage.iteritems()
Sqlite3KeyValueStorage:
def iteritems(self):
self._cursor.execute("SELECT key, value from kvstore")
for k in self._cursor.fetchall():
yield (unicode(k[0]), k[1])
JsonKeyValueStorage:
def iteritems(self):
return self.data().iteritems()
The items method should return a list of key, value tuples:
Generic:
def items(self):
"""
List of sections in document.
"""
return [('1', self.data(),)]
Sectioned:
def items(self):
return [(key, value) for (key, value) in self.iteritems()]
KeyValue:
def items(self):
"""
List of available keys.
"""
return self.storage.items()
Sqlite3KeyValueStorage:
def items(self):
return [(key, value) for (key, value) in self.iteritems()]
JsonKeyValueStorage:
def items(self):
return self.data().items()
The keys method should return a list of keys:
Generic:
def keys(self):
"""
List of keys (section names) in document.
"""
return ['1']
Sectioned:
def keys(self):
return [a['name'] for a in self.data()[1:]]
KeyValue:
def keys(self):
return self.storage.keys()
Sqlite3KeyValueStorage:
def keys(self):
self._cursor.execute("SELECT key from kvstore")
return [unicode(k[0]) for k in self._cursor.fetchall()]
JsonKeyValueStorage:
def keys(self):
return self.data().keys()
To allow accessing elements using the [key] syntax, the __getitem__ method is implemented:
Generic:
def __getitem__(self, key):
if key == '1':
return self.data()
else:
try:
return self.data()[key]
except TypeError:
if self.ext == '.json':
return self.from_json()[key]
else:
raise
Sectioned:
def __getitem__(self, key):
try:
return self.data()[key+1]
except TypeError:
return self.value(key)
KeyValue:
def __getitem__(self, key):
return self.value(key)
Sqlite3KeyValueStorage value method:
def value(self, key):
self._cursor.execute("SELECT value from kvstore where key = ?", (key,))
row = self._cursor.fetchone()
if not row:
raise Exception("No value found for key '%s'" % key)
else:
return row[0]
JsonKeyValueStorage value method:
def value(self, key):
return self.data()[key]
The __setitem__ method can also be implemented, to allow appending and setting elements using the [key] syntax:
def __setitem__(self, key, value):
keyindex = self.keyindex(key)
if keyindex >= 0:
# Existing section.
assert self._data[keyindex+1]['name'] == key
self._data[keyindex+1]['contents'] = value
else:
# New section.
section_dict = {"name" : key, "contents" : value}
self._data.append(section_dict)
And the __delitem__ method allows using the del keyword to remove elements:
def __delitem__(self, key):
index = self.keyindex(key)
self.data().pop(index+1)
def test_sectioned_data_setitem_delitem():
with wrap() as wrapper:
contents=[
{},
{
"name" : "Welcome",
"contents" : "This is the first section."
}
]
doc = Doc("hello.txt",
wrapper,
[],
data_type="sectioned",
contents=contents
)
wrapper.run_docs(doc)
data = doc.output_data()
assert data.alias == 'sectioned'
assert len(data) == 1
# Add a new section
data["Conclusions"] = "This is the final section."
assert len(data) == 2
assert unicode(data['Welcome']) == "This is the first section."
assert unicode(data["Conclusions"]) == "This is the final section."
# Modify an existing section
data["Welcome"] = "This is the initial section."
assert len(data) == 2
assert unicode(data['Welcome']) == "This is the initial section."
assert unicode(data["Conclusions"]) == "This is the final section."
del data["Conclusions"]
assert len(data) == 1
assert data.keys() == ["Welcome"]
Custom Access Methods
Dictionary-style methods represent a generic interface which works for many situations, and so these methods are implemented across several data types.
Some data types will have additional methods for searching or formatting data. These can be seen using the dexy datas command with the -alias argument. The KeyValue data type implements a like method for running SQLite LIKE queries:
def like(self, key):
try:
return self.storage.like(key)
except AttributeError:
msg = "The `like()` method is not implemented for storage type '%s'"
msgargs = self.storage.alias
raise dexy.exceptions.UserFeedback(msg % msgargs)
Here’s the corresponding method on the Sqlite3KeyValueStorage class:
def like(self, key):
self._cursor.execute("SELECT value from kvstore where key LIKE ?", (key,))
row = self._cursor.fetchone()
if not row:
raise Exception("No value found for key '%s'" % key)
else:
return row[0]
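The query can be exercised against a throwaway in-memory table with the same layout as the kvstore snippets above (a sketch; the keys are invented):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE kvstore (key TEXT, value TEXT)")
cursor.executemany("INSERT INTO kvstore VALUES (?, ?)",
                   [("section-1", "intro"), ("section-2", "body"),
                    ("appendix-a", "notes")])

def like(key):
    # Mirrors the storage method above: return the first matching value.
    cursor.execute("SELECT value from kvstore where key LIKE ?", (key,))
    row = cursor.fetchone()
    if not row:
        raise Exception("No value found for key '%s'" % key)
    return row[0]
```

An exact key works like a plain lookup, while % wildcards match key patterns.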
Saving Data
The save() method must be called to persist data to disk. The set_data() shortcut method is also available; it sets data to the provided value and then calls save.
def set_data(self, data):
"""
Shortcut to set and save data.
"""
self._data = data
self.save()
Here’s the implementation for Generic:
def save(self):
if isinstance(self._data, unicode):
self.storage.write_data(self._data.encode("utf-8"))
else:
if self._data == None:
msg = "No data found for '%s', did you reference a file that doesn't exist?"
raise dexy.exceptions.UserFeedback(msg % self.key)
self.storage.write_data(self._data)
And here’s the implementation for Sectioned:
def save(self):
try:
self.storage.write_data(self._data)
except Exception as e:
msg = "Problem saving '%s': %s" % (self.key, str(e))
raise dexy.exceptions.InternalDexyProblem(msg)
And here’s the implementation for KeyValue:
def save(self):
try:
self.storage.persist()
except Exception as e:
msg = u"Problem saving '%s': %s" % (self.key, unicode(e))
raise dexy.exceptions.InternalDexyProblem(msg)
Here’s the persist method for Sqlite3KeyValueStorage:
def persist(self):
if self.connected_to == 'existing':
assert os.path.exists(self.data_file(read=False))
elif self.connected_to == 'working':
self.assert_location_is_in_project_dir(self.data_file(read=False))
self._storage.commit()
shutil.copyfile(self.working_file(), self.data_file(read=False))
else:
msg = "Unexpected 'connected_to' value %s"
msgargs = self.connected_to
raise InternalDexyProblem(msg % msgargs)
And for JsonKeyValueStorage:
def persist(self):
self.write_data(self._data)
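The 'working' branch of persist follows a write-to-scratch-then-copy pattern, which can be sketched in isolation (paths are illustrative):

```python
import os
import shutil
import tempfile

workdir = tempfile.mkdtemp()
working_file = os.path.join(workdir, "scratch.json")  # written during the run
data_file = os.path.join(workdir, "final.json")       # canonical location

# All writes during processing go to the working file...
with open(working_file, "w") as f:
    f.write('{"foo": 123}')

# ...and only persist() copies the finished result into place.
def persist():
    shutil.copyfile(working_file, data_file)

persist()
```

Until persist() runs, the canonical location is untouched, so a failed run cannot leave a half-written data file behind.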
Exporting Data
The output_to_file method writes data to a file. For Generic data, this will just be the raw data, which may be either binary data or a string:
def __unicode__(self):
if isinstance(self.data(), unicode):
return self.data()
elif not self.data():
return unicode(None)
else:
return self.wrapper.decode_encoded(self.data())
For Sectioned data, this writes the unicode-formatted data, so all sections are combined into a single document, which is usually what you want for final/canonical output at the end of processing:
def __unicode__(self):
return u"\n".join(unicode(v) for v in self.values() if unicode(v))
There’s no concept of a canonical output for KeyValue data; it’s intended for use in providing data to other documents.
Initializing Data Objects
Data objects do some initialization work in __init__:
def __init__(self, key, ext, storage_key, settings, wrapper):
self.key = key
self.ext = ext
self.storage_key = storage_key
self.wrapper = wrapper
self.initialize_settings(**settings)
self.update_settings(settings)
self._data = None
self.state = None
self.name = self.setting('canonical-name')
if not self.name:
msg = "Document must provide canonical-name setting to data."
raise InternalDexyProblem(msg)
elif self.name.startswith("("):
raise Exception()
self.transition('new')
And more in setup:
def setup(self):
self.setup_storage()
self.transition('ready')
The setup method can be customized, but it should always call the setup_storage method and transition the state to ready:
def setup_storage(self):
storage_type = self.storage_class_alias(self.ext)
instanceargs = (self.storage_key, self.ext, self.wrapper,)
self.storage = dexy.storage.Storage.create_instance(storage_type, *instanceargs)
self.storage.assert_location_is_in_project_dir(self.name)
if self.output_name():
self.storage.assert_location_is_in_project_dir(self.output_name())
self.storage.setup()
The storage_class_alias method is responsible for choosing the correct type of storage to use:
def storage_class_alias(self, file_ext):
return self.setting('storage-type')
By default this just reads the storage-type setting, but some classes may automatically determine the storage class based on file extension.
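Such a class might override the method along these lines (the aliases here are hypothetical, not dexy’s actual storage aliases):

```python
# Hypothetical override: pick a storage alias from the file extension
# rather than from a fixed 'storage-type' setting.
def storage_class_alias(file_ext):
    if file_ext == '.json':
        return 'jsonkv'
    return 'sqlite3kv'
```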
Reconstituting Data Objects
Data objects are designed to be instantiated as standalone objects after a dexy run, so their data can be used for reporting and querying. The dexy grep command works by loading data objects directly.
To do this, the arguments needed to initialize a data object are stored in batch metadata.
The args_to_data_init method is called when saving batch metadata to provide JSON-serializable initialization args for each data instance:
def args_to_data_init(self):
"""
Returns tuple of attributes to pass to create_instance.
"""
return (self.alias, self.key, self.ext, self.storage_key, self.setting_values())
Batch objects can then recreate data objects:
def data(self, doc_key, input_or_output='output'):
"""
Retrieves a data object given the doc key.
"""
doc_info = self.doc_info(doc_key)["%s-data" % input_or_output]
args = list(doc_info)
args.append(self.wrapper)
data = dexy.data.Data.create_instance(*args)
data.setup_storage()
if hasattr(data.storage, 'connect'):
data.storage.connect()
return data
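The round trip can be sketched with the stdlib json module; a plain dict registry stands in for the create_instance plugin machinery, and all names here are illustrative:

```python
import json

class FakeData:
    """Stand-in data class whose init args are JSON-serializable."""
    alias = "fake"

    def __init__(self, key, ext, storage_key):
        self.key, self.ext, self.storage_key = key, ext, storage_key

    def args_to_data_init(self):
        return (self.alias, self.key, self.ext, self.storage_key)


REGISTRY = {"fake": FakeData}  # stands in for the plugin registry

def create_instance(alias, *args):
    return REGISTRY[alias](*args)

# Serialize init args as batch metadata would...
serialized = json.dumps(FakeData("hello.txt", ".txt", "abc123").args_to_data_init())

# ...and reconstitute a standalone object from them later.
alias, *args = json.loads(serialized)
restored = create_instance(alias, *args)
```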
Storage Objects
Storage objects are primarily concerned with reading and writing data to the correct location on the file system or other form of storage, but sometimes they will handle queries and other methods in order to provide user transparency at the Data object level.
Filters
Wrapper & Batches
Parsers
Reporters
Website Reporter
The Website reporter publishes the same files as the Output reporter, but also provides utilities designed for websites, such as applying templates to HTML files and providing data to help users construct site navigation.
It doesn’t make sense to generate a website unless all dexy files are generated, so the website reporter doesn’t run if a specific target is chosen (many links would likely be broken too):
def run(self, wrapper):
self.wrapper=wrapper
self.setup()
if self.wrapper.target:
msg = "Not running website reporter because a target has been specified."
self.log_warn(msg)
return
for doc in wrapper.nodes.values():
if self.should_process(doc):
self.process_doc(doc)
self.log_debug("finished")
The setup method initializes some instance variables we will use later:
def setup(self):
self.keys_to_outfiles = []
self.locations = {}
self.create_reports_dir()
self.setup_navobj()
As we iterate over all the processed docs, the should_process method determines whether they are properly formed Doc objects which are "canonical":
def should_process(self, doc):
if not doc.key_with_class() in self.wrapper.batch.docs:
return False
elif not doc.state in ('ran', 'consolidated'):
return False
elif not hasattr(doc, 'output_data'):
return False
elif not doc.output_data().output_name():
return False
elif not doc.output_data().is_canonical_output():
msg = "skipping %s - not canonical"
self.log_debug(msg % doc.key)
return False
else:
return True
Then process_doc runs the code which writes the document, with any templates applied, to the report output directory:
def process_doc(self, doc):
self.log_debug("processing %s" % doc.key)
output_ext = doc.output_data().ext
if output_ext == ".html":
self.process_html(doc)
elif isinstance(doc.output_data(), dexy.data.Sectioned):
assert output_ext == ".json"
self.apply_and_render_template(doc)
else:
self.write_canonical_data(doc)
In the case of an HTML file, we may want to apply a template. This will depend on whether the document already appears to have an HTML header, and on the value of the ws-template setting, which may be a boolean or the name of a template to apply.
def process_html(self, doc):
if doc.setting('ws-template') == False:
self.log_debug(" ws-template is False for %s" % doc.key)
self.write_canonical_data(doc)
elif self.detect_html_header(doc) and not doc.setting('ws-template'):
self.log_debug(" found html tag in output of %s" % doc.key)
self.write_canonical_data(doc)
else:
self.apply_and_render_template(doc)
The header check is rather rudimentary:
def detect_html_header(self, doc):
fragments = ('<html', '<body', '<head')
return any(html_fragment
in unicode(doc.output_data())
for html_fragment in fragments)
The write_canonical_data method is inherited from the Output class, and is used when a document is not an HTML file or when a setting determines that HTML templates should not be applied:
def write_canonical_data(self, doc):
output_name = doc.output_data().output_name()
if output_name:
fp = os.path.join(self.setting('dir'), output_name)
if fp in self.locations:
self.log_warn("WARNING overwriting file %s" % fp)
else:
self.locations[fp] = []
self.locations[fp].append(doc.key)
parent_dir = os.path.dirname(fp)
try:
os.makedirs(parent_dir)
except os.error:
pass
self.log_debug(" writing %s to %s" % (doc.key, fp))
doc.output_data().output_to_file(fp)
In the other cases, the apply_and_render_template method is used:
def apply_and_render_template(self, doc):
template_info = self.template_file_and_path(doc)
template_file, template_path = template_info
env_data = self.template_environment(doc, template_path)
self.log_debug(" creating jinja environment")
env = self.jinja_environment(template_path)
self.log_debug(" loading jinja template at %s" % template_path)
template = env.get_template(template_path)
output_file = self.fix_ext(doc.output_data().output_name())
output_path = os.path.join(self.setting('dir'), output_file)
try:
os.makedirs(os.path.dirname(output_path))
except os.error:
pass
self.log_debug(" writing to %s" % (output_path))
template.stream(env_data).dump(output_path, encoding="utf-8")
Templates
Templates are ready-made dexy example projects, which can be generated via the gen command.
Command Line Interface
The command line interface for dexy is driven by python-modargs.
init.py
Any function ending with _command is automatically exposed as a command. To keep modules to a manageable size, the various dexy commands are defined in different modules, but all are imported into dexy.commands in the __init__.py file so they can be available to modargs within a single module:
from dexy.commands.info import links_command
from dexy.commands.cite import cite_command
from dexy.commands.parsers import parsers_command
from dexy.commands.conf import conf_command
from dexy.commands.dirs import cleanup_command
from dexy.commands.dirs import reset_command
from dexy.commands.dirs import setup_command
from dexy.commands.env import env_command
from dexy.commands.env import datas_command
from dexy.commands.env import plugins_command
from dexy.commands.fcmds import fcmd_command
from dexy.commands.fcmds import fcmds_command
from dexy.commands.filters import filters_command
from dexy.commands.filters import filters_command as filter_command
from dexy.commands.grep import grep_command
from dexy.commands.info import info_command
from dexy.commands.it import dexy_command
from dexy.commands.it import it_command
from dexy.commands.it import targets_command
from dexy.commands.nodes import nodes_command
from dexy.commands.reporters import reporters_command
from dexy.commands.reporters import reporters_command as reports_command
from dexy.commands.serve import serve_command
from dexy.commands.templates import gen_command
from dexy.commands.templates import template_command
from dexy.commands.templates import templates_command
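The _command convention itself is easy to illustrate with stdlib introspection (a sketch of the kind of lookup python-modargs performs, not its actual implementation):

```python
import inspect
import types

# Build a throwaway module with one conventionally-named function.
mod = types.ModuleType("fake_commands")

def filters_command():
    return "list of filters"

def helper():
    return "not a command"

mod.filters_command = filters_command
mod.helper = helper

def available_commands(module):
    """Names of functions ending in _command, with the suffix stripped."""
    return [name[:-len("_command")]
            for name, obj in inspect.getmembers(module, inspect.isfunction)
            if name.endswith("_command")]
```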
The run method in dexy.commands is listed in setup.py as a console script entry point:
entry_points = {
'console_scripts' : [
'dexy = dexy.commands:run'
],
'pygments.lexers' : [
'rst+django = dexy.filters.utils:RstDjangoLexer'
]
},
And this is the first method which will be called whenever a user enters a dexy
command:
def run():
capture_warnings()
parse_and_run_cmd(*resolve_argv())
The capture_warnings method just hides irrelevant warning messages from dexy users:
def capture_warnings():
"""
Capture deprecation messages and other irrelevant warnings in whatever way
is appropriate to the dexy version.
"""
if hasattr(logging, 'captureWarnings'):
logging.captureWarnings(True)
else:
warnings.filterwarnings("ignore",category=Warning)
Dexy can load a lot of different libraries as it runs various filters, and deprecation messages and similar warnings are confusing and annoying for end users.
The resolve_argv method is called next:
def resolve_argv():
"""
Do some processing of the user-provided arguments in argv before they go to
modargs so we can support commands defined in plugins.
"""
only_one_arg = (len(sys.argv) == 1)
second_arg_is_known_cmd = not only_one_arg and \
sys.argv[1] in args.available_commands(dexy_cmd_mod)
second_arg_is_option = not only_one_arg and \
sys.argv[1].startswith("-")
if only_one_arg or second_arg_is_known_cmd or second_arg_is_option:
return sys.argv[1:], dexy_cmd_mod, dexy_default_cmd
else:
cmd, subcmd, cmd_mod = resolve_plugin_cmd(sys.argv[1])
default_cmd = cmd.default_cmd or cmd.namespace
return [subcmd] + sys.argv[2:], cmd_mod, default_cmd
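The decision logic reduces to a small pure function over argv, sketched here with a hardcoded stand-in for args.available_commands:

```python
KNOWN_COMMANDS = {"filters", "conf", "help"}  # illustrative subset

def resolve(argv):
    """Return 'builtin' or 'plugin', mirroring the branches above."""
    only_one_arg = len(argv) == 1
    second_arg_is_known_cmd = not only_one_arg and argv[1] in KNOWN_COMMANDS
    second_arg_is_option = not only_one_arg and argv[1].startswith("-")
    if only_one_arg or second_arg_is_known_cmd or second_arg_is_option:
        return "builtin"
    return "plugin"
```

Anything that is not a known command or an option is assumed to name a plugin command.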
It’s possible for additional commands to be added to dexy via the plugin system. For example, the dexy-viewer plugin defines a ping command which can be called as follows:
$ dexy viewer:ping
pong
@patch.object(sys, 'argv', ['dexy', 'viewer:ping'])
@patch('sys.stdout', new_callable=StringIO)
def test_viewer_command(stdout):
dexy.commands.run()
assert "pong" in stdout.getvalue()
If necessary, the resolve_argv command calls resolve_plugin_cmd to look in dexy plugins for the requested command:
def resolve_plugin_cmd(raw_command_name):
"""
Take a command name like viewer:run and return the command method and
module object.
"""
if ":" in raw_command_name:
alias, subcommand = raw_command_name.split(":")
else:
alias, subcommand = raw_command_name, ''
try:
cmd = dexy.plugin.Command.create_instance(alias)
except cashew.exceptions.NoPlugin:
msg = """No command '%s' available.
Run `dexy help --all` to see list of available commands."""
msgargs = (alias)
sys.stderr.write(inspect.cleandoc(msg) % msgargs)
sys.stderr.write(os.linesep)
sys.exit(1)
mod_name = cmd.__module__
cmd_mod = args.load_module(mod_name)
return cmd, subcommand, cmd_mod
Returning to the run command we started with:
def run():
capture_warnings()
parse_and_run_cmd(*resolve_argv())
The parsed arguments, module object, and a default command are passed to parse_and_run_cmd, which delegates to the modargs command of the same name, and wraps the call with error handling to provide nicer error messages if there’s a problem:
def parse_and_run_cmd(argv, module, default_command):
try:
args.parse_and_run_command(argv, module, default_command)
except (dexy.exceptions.UserFeedback, cashew.exceptions.UserFeedback) as e:
msg = u"""Oops, there's a problem running your command.
Here is some more information:"""
sys.stderr.write(inspect.cleandoc(msg))
sys.stderr.write(os.linesep)
err_msg = unicode(e)
if err_msg:
sys.stderr.write(u"'%s'" % unicode(e))
else:
sys.stderr.write(u"Sorry, can't get text of error message.")
sys.stderr.write(os.linesep)
sys.exit(1)
except KeyboardInterrupt:
sys.stderr.write("stopping...")
sys.stderr.write(os.linesep)
sys.exit(1)
The help and version commands are also defined in the __init__.py file:
def help_command(
all=False, # List all available dexy commands (auto-generated).
on=False # Get help on a particular dexy command.
):
if all and not on:
print ""
args.help_command(prog, dexy_cmd_mod, dexy_default_cmd, on)
print ""
elif not on:
print ""
print "For help on the main `dexy` command, run `dexy help -on dexy`."
print ""
print "The dexy tool includes several different commands:"
print " `dexy help --all` lists all available commands"
print " `dexy help --on XXX` provides help on a specific command"
print ""
print "Commands for running dexy:"
print " `dexy` runs dexy"
print " `dexy setup` makes directories dexy needs"
print " `dexy cleanup` removes directories dexy has created"
print " `dexy reset` empties and resets dexy's working directories"
print ""
print "Commands which print lists of dexy features:"
print " `dexy filters` filters like |jinja |py |javac"
print " `dexy reports` reporters like `output` and `run`"
print " `dexy nodes` node types and their document settings"
print " `dexy datas` data types and available methods"
print " `dexy env` elements available in document templates"
print ""
print "Commands which print information about your project:"
print " (you need to be in the project dir and have run dexy already)"
print " `dexy grep` search for documents and keys in documents"
print " `dexy info` list metadata about a particular document"
print " `dexy targets` list target names you can run"
print " `dexy links` list all ways to refer to documents and sections"
print ""
print "Other commands:"
print " `dexy serve` start a local static web server to view generated docs"
print " `dexy help` you're reading it"
print " `dexy version` print the version of dexy software which is installed"
print ""
else:
try:
args.help_command(prog, dexy_cmd_mod, dexy_default_cmd, on)
except KeyError:
sys.stderr.write("Could not find help on '%s'." % on)
sys.stderr.write(os.linesep)
sys.exit(1)
def version_command():
"""
Print the version number of dexy.
"""
print "%s version %s" % (prog, DEXY_VERSION)
cite.py
The dexy cite command prints out a bibliographic citation for dexy.
def cite_command(
fmt='bibtex' # desired format of citation
):
"""
How to cite dexy in papers.
"""
if fmt == 'bibtex':
cite_bibtex()
else:
msg = "Don't know how to provide citation in '%s' format"
raise dexy.exceptions.UserFeedback(msg % fmt)
$ dexy cite
@misc{Dexy,
title = {Dexy: Reproducible Data Analysis and Document Automation Software, Version~1.0.0d},
author = {{Nelson, Ana}},
year = {2013},
url = {http://www.dexy.it/},
note = {http://orcid.org/0000-0003-2561-1564}
}
Currently only the bibtex format is supported:
def bibtex_text():
args = {
'version' : DEXY_VERSION,
'year' : datetime.date.today().year
}
return """@misc{Dexy,
title = {Dexy: Reproducible Data Analysis and Document Automation Software, Version~%(version)s},
author = {{Nelson, Ana}},
year = {%(year)s},
url = {http://www.dexy.it/},
note = {http://orcid.org/0000-0003-2561-1564}
}""" % args
conf.py
The main dexy command has a lot of command line options, and for convenience you can save the option values in a dexy.conf file in your project so you don’t have to remember and type them all the time. The dexy conf command generates an example file for you containing all the default dexy options:
def conf_command(
conf=defaults['config_file'], # name of config file to write to
p=False # whether to print to stdout rather than write to file
):
"""
Write a config file containing dexy's defaults.
"""
if file_exists(conf) and not p:
print inspect.cleandoc("""Config file %s already exists,
will print conf to stdout instead...""" % conf)
p = True
config = default_config()
# Can't specify config file name in config file.
del config['conf']
yaml_help = inspect.cleandoc("""# YAML config file for dexy.
# You can delete any lines you don't wish to customize.
# Options are same as command line options,
# for more info run 'dexy help -on dexy'.
""")
if p:
print yaml.dump(config, default_flow_style=False)
else:
with open(conf, "wb") as f:
if conf.endswith(".yaml") or conf.endswith(".conf"):
f.write(yaml_help)
f.write(os.linesep)
f.write(yaml.dump(config, default_flow_style=False))
elif conf.endswith(".json"):
json.dump(config, f, sort_keys=True, indent=4)
else:
msg = "Don't know how to write config file '%s'"
raise dexy.exceptions.UserFeedback(msg % conf)
print "Config file has been written to '%s'" % conf
$ dexy conf
Config file has been written to 'dexy.conf'
$ ls
commands.sh.txt dexy.conf
If a config file already exists, or if you use the -p flag, then config options are written to stdout instead of to a file.
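The JSON branch of the command can be sketched with the stdlib alone (the option names here are a small illustrative subset, not dexy’s full defaults):

```python
import json
import os
import tempfile

config = {"artifactsdir": ".dexy", "logdir": ".dexy/log"}  # illustrative subset

conf_path = os.path.join(tempfile.mkdtemp(), "dexy.json")
with open(conf_path, "w") as f:
    json.dump(config, f, sort_keys=True, indent=4)
```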
@patch.object(sys, 'argv', ['dexy', 'conf'])
@patch('sys.stdout', new_callable=StringIO)
def test_conf_command(stdout):
with tempdir():
dexy.commands.run()
assert os.path.exists("dexy.conf")
assert "has been written" in stdout.getvalue()
@patch.object(sys, 'argv', ['dexy', 'conf'])
@patch('sys.stdout', new_callable=StringIO)
def test_conf_command_if_path_exists(stdout):
with tempdir():
with open("dexy.conf", "w") as f:
f.write("foo")
assert os.path.exists("dexy.conf")
dexy.commands.run()
assert "dexy.conf already exists" in stdout.getvalue()
assert "artifactsdir" in stdout.getvalue()
@patch.object(sys, 'argv', ['dexy', 'conf', '-p'])
@patch('sys.stdout', new_callable=StringIO)
def test_conf_command_with_print_option(stdout):
with tempdir():
dexy.commands.run()
assert not os.path.exists("dexy.conf")
assert "artifactsdir" in stdout.getvalue()
dirs.py
These commands expose methods for creating and removing dexy’s working directories. The setup and cleanup commands create and remove working directories, respectively:
def setup_command(
__cli_options=False,
artifactsdir=defaults['artifacts_dir'], # Where dexy should store working files.
**kwargs):
"""
Create the directories dexy needs to run.
"""
wrapper = init_wrapper(locals())
wrapper.create_dexy_dirs()
def cleanup_command(
__cli_options=False,
artifactsdir=defaults['artifacts_dir'], # Where dexy should store working files.
logdir=defaults['log_dir'], # DEPRECATED
reports=True # Whether directories generated by reports should also be removed.
):
"""
Remove the directories which dexy created, including working directories
and reports.
"""
wrapper = init_wrapper(locals())
wrapper.remove_dexy_dirs()
wrapper.remove_reports_dirs(reports)
The reset command cleans out any working files and leaves you with a fresh setup:
def reset_command(
__cli_options=False,
artifactsdir=defaults['artifacts_dir'], # Where dexy should store working files.
logdir=defaults['log_dir'] # DEPRECATED
):
"""
Clean out the contents of dexy's cache and reports directories.
"""
wrapper = init_wrapper(locals())
wrapper.remove_dexy_dirs()
wrapper.remove_reports_dirs(keep_empty_dir=True)
wrapper.create_dexy_dirs()
fcmds.py
Filter commands give filters a way to expose information to users. For example, a filter wrapping an API could provide a command that lists the API's available methods.
def fcmds_command(
alias=False # Only print commands defined by this alias.
):
"""
Prints a list of available filter commands.
"""
if alias:
filter_instances = [dexy.filter.Filter.create_instance(alias)]
else:
filter_instances = dexy.filter.Filter
for filter_instance in filter_instances:
cmds = filter_instance.filter_commands()
if cmds:
print "filter alias:", filter_instance.alias
for command_name in sorted(cmds):
docs = inspect.getdoc(cmds[command_name])
if docs:
doc = docs.splitlines()[0]
print " %s # %s" % (command_name, doc)
else:
print " %s" % command_name
print ''
The fcmds_command lists filter commands:
$ dexy fcmds
filter alias: apis
create_keyfile
filter alias: botoup
create_keyfile
filter alias: htmlsections
css # Prints out CSS for the specified style.
sty # Prints out .sty file (latex) for the specified style.
filter alias: pyg
css # Prints out CSS for the specified style.
sty # Prints out .sty file (latex) for the specified style.
filter alias: wordpress
create_keyfile # Creates a key file for WordPress in the local directory.
list_categories # List available blog post categories.
list_methods # List API methods exposed by WordPress API.
$ dexy fcmds -alias pyg
filter alias: pyg
css # Prints out CSS for the specified style.
sty # Prints out .sty file (latex) for the specified style.
To run a filter command, you pass the filter alias and the command name:
$ dexy fcmd -alias pyg -cmd css | head
.hll { background-color: #ffffcc }
.c { color: #408080; font-style: italic } /* Comment */
.err { border: 1px solid #FF0000 } /* Error */
.k { color: #008000; font-weight: bold } /* Keyword */
.o { color: #666666 } /* Operator */
.cm { color: #408080; font-style: italic } /* Comment.Multiline */
.cp { color: #BC7A00 } /* Comment.Preproc */
.c1 { color: #408080; font-style: italic } /* Comment.Single */
.cs { color: #408080; font-style: italic } /* Comment.Special */
.gd { color: #A00000 } /* Generic.Deleted */
def fcmd_command(
alias=None, # The alias of the filter which defines the custom command
cmd=None, # The name of the command to run
**kwargs # Additional arguments to be passed to the command
):
"""
Run a filter command.
"""
filter_instance = dexy.filter.Filter.create_instance(alias)
cmd_name = "docmd_%s" % cmd
if not cmd_name in dir(filter_instance):
msg = "%s is not a valid command. There is no method %s defined in %s"
msgargs = (cmd, cmd_name, filter_instance.__class__.__name__)
raise dexy.exceptions.UserFeedback(msg % msgargs)
else:
instance_method = getattr(filter_instance, cmd_name)
# TODO use try/catch instead of inspect.ismethod
if inspect.ismethod(instance_method):
try:
instance_method.__func__(filter_instance, **kwargs)
except TypeError as e:
print e.message
print inspect.getargspec(instance_method.__func__)
print inspect.getdoc(instance_method.__func__)
raise
else:
msg = "expected %s to be an instance method of %s"
msgargs = (cmd_name, filter_instance.__class__.__name__)
raise dexy.exceptions.InternalDexyProblem(msg % msgargs)
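The dispatch above relies on a naming convention: a filter command named `css` is implemented as a method named `docmd_css`. A minimal sketch of that convention (ExampleFilter is hypothetical, not a real dexy filter):

```python
class ExampleFilter(object):
    # filter commands are methods whose names start with "docmd_";
    # `dexy fcmd -alias x -cmd css` dispatches to docmd_css()
    def docmd_css(self, style="default"):
        """Prints out CSS for the specified style."""
        return "/* css for %s */" % style

    def filter_commands(self):
        # collect command name -> method for every docmd_ method,
        # roughly what filter_instance.filter_commands() returns above
        prefix = "docmd_"
        return dict((name[len(prefix):], getattr(self, name))
                    for name in dir(self) if name.startswith(prefix))
```

`fcmds` lists the keys of this mapping; `fcmd` looks one up and calls it.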
filters.py
The filters module contains dexy’s command line reference for filters.
def filters_command(
alias="", # Print docs for this filter.
example=False, # Whether to run included examples (slower).
nocolor=False, # Skip syntax highlighting if showing source code.
source=False, # Print source code of filter.
versions=False # Print the installed version of external software (slower).
):
"""
Prints list of available filters or docs for a particular filter.
"""
if alias:
help_for_filter(alias, example, source, nocolor)
else:
list_filters(versions)
def help_for_filter(alias, run_example, show_source, nocolor):
instance = dexy.filter.Filter.create_instance(alias)
print ''
print instance.setting('help')
print ''
print "aliases: %s" % ", ".join(instance.setting('aliases'))
print "tags: %s" % ", ".join(instance.setting('tags'))
print ''
print "Converts from file formats:"
for ext in instance.setting('input-extensions'):
print " %s" % ext
print ''
print "Converts to file formats:"
for ext in instance.setting('output-extensions'):
print " %s" % ext
print ''
print('Settings:')
for k in sorted(instance._instance_settings):
if k in dexy.filter.Filter.nodoc_settings:
continue
if k in ('aliases', 'tags'):
continue
tup = instance._instance_settings[k]
print " %s" % k
for line in inspect.cleandoc(tup[0]).splitlines():
print " %s" % line
print " default value: %s" % tup[1]
print ''
examples = instance.setting('examples')
example_templates = {}
for alias in examples:
try:
template_instance = dexy.template.Template.create_instance(alias)
example_templates[alias] = template_instance
except dexy.exceptions.InactivePlugin:
pass
if examples:
print ''
print "Examples for this filter:"
for alias, template in example_templates.iteritems():
print ''
print " %s" % alias
print " %s" % inspect.getdoc(template.__class__)
if run_example:
for alias, template in example_templates.iteritems():
print ''
print ''
print "Running example: %s" % template.setting('help')
print ''
print ''
print template_text(template)
print ''
print "For online docs see http://dexy.it/ref/filters/%s" % alias
print ''
print "If you have suggestions or feedback about this filter,"
print "please contact info@dexy.it"
print ''
if show_source:
print ''
source_code = inspect.getsource(instance.__class__)
if nocolor:
print source_code
else:
formatter = pygments.formatters.TerminalFormatter()
lexer = PythonLexer()
print highlight(source_code, lexer, formatter)
def list_filters(versions):
print "Installed filters:"
for filter_instance in dexy.filter.Filter:
# Should we show this filter?
no_aliases = not filter_instance.setting('aliases')
no_doc = filter_instance.setting('nodoc')
not_dexy = not filter_instance.__class__.__module__.startswith("dexy.")
exclude = filter_instance.alias in extra_nodoc_aliseas
if no_aliases or no_doc or not_dexy or exclude:
continue
# generate version message
if versions:
if hasattr(filter_instance, 'version'):
version = filter_instance.version()
if version:
version_message = "Installed version: %s" % version
else:
msg = "'%s' failed, filter may not be available."
msgargs = filter_instance.version_command()
version_message = msg % msgargs
else:
version_message = ""
filter_help = " " + filter_instance.alias + \
" : " + filter_instance.setting('help').splitlines()[0]
if versions and version_message:
filter_help += " %s" % version_message
print filter_help
print ''
print "For more information about a particular filter,"
print "use the -alias flag and specify the filter alias."
print ''
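The visibility test at the top of list_filters can be restated as a single predicate. This sketch takes the relevant facts as plain arguments rather than reading them from a filter instance:

```python
def should_list_filter(alias, aliases, nodoc, module, extra_nodoc_aliases=()):
    # mirrors the conditions in list_filters: a filter is shown only if it
    # has aliases, is documented, ships with dexy itself, and isn't excluded
    if not aliases:
        return False
    if nodoc:
        return False
    if not module.startswith("dexy."):
        return False
    if alias in extra_nodoc_aliases:
        return False
    return True
```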
grep.py
The grep interface is a way to search on the command line for dexy docs and keys within docs. (The dexy viewer plugin presents similar information in a web-based interface.)
def grep_command(
__cli_options=False, # nodoc
contents=False, # print out the contents of each matched file
expr="", # An expression partially matching document name.
key="", # An exact document key
keyexpr="", # Only search for keys matching this expression
keylimit=10, # Maximum number of matching keys to print
keys=False, # List keys in documents
limit=10, # Maximum number of matching documents to print
lines=False, # maximum number of lines of content to print
**kwargs
):
"""
Search for documents and sections within documents.
Dexy must have already run successfully.
You can search for documents based on exact key or inexact expression. The
number of documents returned is controlled by --limit.
You can print all keys in found documents by requesting --keys, number of
results is controlled by --keylimit.
You can search the section names/keys in found documents by passing a
--keyexpr
You can print contents of documents by requesting --contents, number of
lines of content can be controlled by --lines.
This does not search contents of documents, just document names and
internal section names.
"""
artifactsdir = kwargs.get('artifactsdir', defaults['artifacts_dir'])
wrapper = init_wrapper(locals())
batch = Batch.load_most_recent(wrapper)
if not batch:
print "you need to run dexy first"
sys.exit(1)
else:
if expr:
matches = sorted([data for data in batch if expr in data.key],
key=attrgetter('key'))
elif key:
matches = sorted([data for data in batch if key == data.key],
key=attrgetter('key'))
else:
raise dexy.exceptions.UserFeedback("Must specify either expr or key")
n = len(matches)
if n > limit:
print "only printing first %s of %s total matches" % (limit, n)
matches = matches[0:limit]
for match in matches:
print_match(match, keys, keyexpr, contents, keylimit, lines)
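The matching logic is a substring match for -expr and an equality match for -key, with results sorted by key and truncated to the limit. A stand-alone sketch (FakeDoc is a stand-in for a dexy data object; only its key matters here):

```python
from operator import attrgetter

class FakeDoc(object):
    # stand-in for a dexy data object; real ones carry contents and storage too
    def __init__(self, key):
        self.key = key

def grep_batch(batch, expr="", key="", limit=10):
    if expr:
        # -expr: any document whose key contains the expression
        matches = sorted((d for d in batch if expr in d.key),
                         key=attrgetter("key"))
    elif key:
        # -key: exact key match only
        matches = sorted((d for d in batch if key == d.key),
                         key=attrgetter("key"))
    else:
        raise ValueError("Must specify either expr or key")
    return matches[:limit]
```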
The grep command calls print_match for each match:
def print_match(match, keys, keyexpr, contents, keylimit, lines):
print match.key, "\tcache key:", match.storage_key
if hasattr(match, 'keys'):
if keyexpr:
print_keys([key for key in match.keys() if keyexpr in key], keylimit, lines)
elif keys:
print_keys(match.keys(), keylimit, lines)
if contents:
if isinstance(match, Sectioned):
for section_name, section_contents in match.data().iteritems():
print " section: %s" % section_name
print
print_contents(section_contents, lines)
print
elif isinstance(match, KeyValue):
pass
elif isinstance(match, Generic):
try:
json.dumps(unicode(match))
print_contents(unicode(match), lines)
except UnicodeDecodeError:
print " not printable"
When it reaches a document which has keys, it may also print them (depending on options):
def print_keys(pkeys, keylimit, lines):
n = len(pkeys)
if n > keylimit:
pkeys = pkeys[0:keylimit]
for key in pkeys:
print ' ', key
if n > keylimit:
print " only printed first %s of %s total keys" % (keylimit, n)
The contents of files may also be printed:
def print_contents(text, lines):
text_lines = text.splitlines()
for i, line in enumerate(text_lines):
if lines and i > lines-1:
continue
print " ", line
if lines and lines < len(text_lines):
print " only printed first %s of %s total lines" % (lines, len(text_lines))
info.py
The info command lets you see where documents are cached and provides documentation about their available methods. You should know the doc key you want to search for; you can use dexy grep to help you find it.
The attributes listed in info_attrs and the methods listed in info_methods are displayed.
info_attrs = [
'name',
'ext',
'key'
]
info_methods = [
'title',
'basename',
'filesize',
'baserootname',
'parent_dir',
'long_name',
'web_safe_document_key'
]
storage_methods = []
def info_command(
__cli_options=False,
expr="", # An expression partially matching document name.
key="", # The exact document key.
ws=False, # Whether to print website reporter keys and values.
**kwargs
):
"""
Prints metadata about a dexy document.
Dexy must have already run successfully.
You can specify an exact document key or an expression which matches part
of a document name/key. The `dexy grep` command is available to help you
search for documents and print document contents.
"""
artifactsdir = kwargs.get('artifactsdir', defaults['artifacts_dir'])
wrapper = init_wrapper(locals())
wrapper.setup_log()
batch = Batch.load_most_recent(wrapper)
wrapper.batch = batch
if expr:
print "search expr:", expr
matches = sorted([data for data in batch if expr in data.key],
key=attrgetter('key'))
elif key:
matches = sorted([data for data in batch if key == data.key],
key=attrgetter('key'))
else:
raise dexy.exceptions.UserFeedback("Must specify either expr or key")
for match in matches:
print ""
print " Info for Document '%s'" % match.key
print ""
print " document output data type:", match.alias
print ""
print_indented("settings:", 2)
for k in sorted(match._instance_settings):
if not k in ('aliases', 'help'):
print_indented("%s: %s" % (k, match.setting(k)), 4)
print ""
print_indented("attributes:", 2)
for fname in sorted(info_attrs):
print_indented("%s: %s" % (fname, getattr(match, fname)), 4)
print ""
print_indented("methods:", 2)
for fname in sorted(info_methods):
print_indented("%s(): %s" % (fname, getattr(match, fname)()), 4)
print ""
if storage_methods:
print_indented("storage methods:", 2)
for fname in sorted(storage_methods):
print_indented("%s(): %s" % (fname, getattr(match.storage, fname)), 4)
print ''
if ws:
print_indented("website reporter methods:", 2)
print ''
reporter = dexy.reporter.Reporter.create_instance('ws')
reporter.wrapper = wrapper
reporter.setup_navobj()
reporter.help(match)
print ''
print_indented("active template plugins are:", 2)
print_indented(", ".join(reporter.setting('plugins')), 4)
print ''
else:
print_indented("For website reporter tags, run this command with -ws option", 4)
print ''
print_rewrapped("""For more information about methods available on this
data type run `dexy datas -alias %s`""" % match.alias)
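The distinction between info_attrs and info_methods is simply how each name is accessed: attributes are read directly with getattr, while methods are looked up and then called with no arguments. A sketch using a hypothetical stand-in for a matched data object:

```python
class FakeMatch(object):
    # stand-in data object; real dexy data objects have many more members
    name = "hello.txt"
    ext = ".txt"
    key = "hello.txt|pyg"
    def title(self):
        return "Hello"
    def filesize(self):
        return 5

info_attrs = ["name", "ext", "key"]
info_methods = ["title", "filesize"]

def describe(match):
    # attributes are read directly; methods are looked up and called
    lines = ["%s: %s" % (a, getattr(match, a)) for a in sorted(info_attrs)]
    lines += ["%s(): %s" % (m, getattr(match, m)()) for m in sorted(info_methods)]
    return lines
```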
it.py
This module contains the main command, which actually runs dexy.