telemeta.management.commands.telemeta-import-corpus-from-dir module
from optparse import make_option
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.files.base import ContentFile
from django.contrib.auth.models import User
from telemeta.models import *
from telemeta.util.unaccent import unaccent
import os
import re
import glob

try:
    from django.utils.text import slugify
except ImportError:
    def slugify(string):
        """Fallback slug: drop parentheses and commas, spaces become '_'."""
        killed_chars = re.sub(r'[\(\),]', '', string)
        return re.sub(' ', '_', killed_chars)


def beautify(string):
    """Return *string* without its extension, underscores turned into spaces."""
    return os.path.splitext(string)[0].replace('_', ' ')


def cleanup_dir(root_dir):
    """Recursively normalise the names of the directories below *root_dir*.

    In every sub-directory name spaces become underscores and the
    substrings ``son_`` and ``son`` are removed.  Only the directory's own
    name is rewritten, never the parent part of the path, so a space or
    "son" in *root_dir* itself cannot send the rename elsewhere.
    """
    for resource in os.listdir(root_dir):
        path = os.path.join(root_dir, resource)
        if not os.path.isdir(path):
            continue
        new_name = resource.replace(' ', '_').replace('son_', '').replace('son', '')
        # A name made only of the removed substrings would become empty:
        # keep the directory as it is in that case.
        new_path = os.path.join(root_dir, new_name) if new_name else path
        if new_path != path:
            os.rename(path, new_path)
        cleanup_dir(new_path)


def trim_list(list):
    """Return a new list with the truthy elements of *list*, in order."""
    return [item for item in list if item]


def reset():
    """Delete every MediaItem, then every MediaCollection, one by one."""
    for i in MediaItem.objects.all():
        i.delete()
    for c in MediaCollection.objects.all():
        c.delete()


class Command(BaseCommand):
    """Import a directory tree of media files into a corpus."""

    help = "import media files from a directory to a corpus"
    args = "root_dir"
    media_formats = ['mp3']
    image_formats = ['png', 'jpg']
    text_formats = ['txt']
    media_root = settings.MEDIA_ROOT
    dry_run = False
    # write_file() reads this flag; it was never declared before.
    force = False
    # NOTE: evaluated when the class body runs (i.e. at import time).
    user = User.objects.get(username='admin')

    def write_file(self, item, media):
        """Attach the file at path *media* to *item*.

        Nothing happens when *media* does not exist, or when *item* already
        has a file and ``force`` is off.  If ``media_root`` is not a
        substring of ``source_dir`` the content is copied through
        ``item.file.save``; otherwise ``item.file`` is set to the path with
        ``media_root`` removed.  ``dry_run`` suppresses both writes.
        """
        filename = media.split(os.sep)[-1]
        print(media)
        if not os.path.exists(media):
            return
        if item.file and not self.force:
            return
        if self.media_root not in self.source_dir:
            print("file not in MEDIA_ROOT, copying...")
            # Binary mode: media must not go through newline translation;
            # the context manager closes the handle even if saving fails.
            with open(media, 'rb') as f:
                if not self.dry_run:
                    item.file.save(filename, ContentFile(f.read()))
                    item.save()
        else:
            print("file in MEDIA_ROOT, linking...")
            path = media.replace(self.media_root, '')
            if not self.dry_run:
                item.file = path
                item.save()
        if self.user:
            item.set_revision(self.user)

    def handle(self, *args, **options):
        """Import the directory given as last positional argument.

        The root directory becomes a ``MediaCorpus``.  Each sub-directory
        ("chapter") containing a tab-separated ``.txt`` file becomes a
        ``MediaCollection``: the first line of that file supplies the
        chapter title (second field), every other line maps a directory
        name to ``[title, track, ...]``.  Each file below the chapter whose
        extension is in ``media_formats`` becomes a ``MediaItem`` named
        after the directory holding it; images beside it are attached as
        ``MediaItemRelated``.  Files are renamed to a slug of their name.

        WARNING: ``reset()`` runs first and deletes all existing items and
        collections.

        Raises:
            CommandError: if no directory argument was given.
        """
        if not args:
            # Validate before reset() so a bad invocation wipes nothing.
            raise CommandError("missing argument: %s" % self.args)

        # NOT4PROD!!
        reset()

        # Trailing separators would make the corpus name empty.
        root_dir = args[-1].rstrip(os.sep) or args[-1]
        self.source_dir = root_dir
        print(self.source_dir)
        print(self.media_root)
        cleanup_dir(self.source_dir)

        corpus_name = os.path.split(root_dir)[-1]
        corpus_id = slugify(unicode(corpus_name))
        matches = MediaCorpus.objects.filter(code=corpus_id)
        if matches:
            corpus = matches[0]
        else:
            corpus = MediaCorpus(code=corpus_id)
            corpus.title = corpus_name
            corpus.save()

        for chapter in os.listdir(self.source_dir):
            chapter_dir = os.path.join(self.source_dir, chapter)
            if not os.path.isdir(chapter_dir):
                # Plain files in the corpus root cannot be listed as chapters.
                continue

            metadata = {}
            chapter_title = None
            for filename in os.listdir(chapter_dir):
                path = os.path.join(chapter_dir, filename)
                if not os.path.isfile(path) or os.path.splitext(filename)[1] != '.txt':
                    continue
                with open(path, 'r') as f:
                    for i, line in enumerate(f):
                        # Strip the line ending too, so it never leaks into titles.
                        data = re.split(r'\t+', line.rstrip('\r\n\t'))
                        if len(data) < 2:
                            continue
                        if i == 0:
                            chapter_title = data[1]
                            print(chapter_title)
                        else:
                            metadata[data[0]] = data[1:]

            if chapter_title is None:
                print('no chapter title found in ' + chapter_dir + ', skipping')
                continue
            print(metadata)

            collection_id = corpus_id + '_' + slugify(unicode(chapter))
            collection_title = chapter.replace('_', ' ') + ' - ' + chapter_title
            print(collection_title)
            matches = MediaCollection.objects.filter(code=collection_id, title=collection_title)
            if matches:
                collection = matches[0]
            else:
                collection = MediaCollection(code=collection_id)
                collection.title = collection_title
                collection.save()
            if collection not in corpus.children.all():
                corpus.children.add(collection)

            for filename in os.listdir(chapter_dir):
                path = os.path.join(chapter_dir, filename)
                if os.path.isfile(path) and os.path.splitext(filename)[1] == '.jpg':
                    related_path = path.replace(self.media_root, '')
                    MediaCollectionRelated.objects.get_or_create(collection=collection,
                                                                 file=related_path)

            for root, _dirs, files in os.walk(chapter_dir):
                root_list = root.split(os.sep)
                item_name = root_list[-1]
                related_dir = os.sep.join(root_list[-4:])
                for media_file in files:
                    path = os.path.join(root, media_file)
                    print(path)
                    if media_file.startswith('.'):
                        # Hidden files are never media; leave them untouched.
                        continue

                    # Slugify the stem only and keep the real extension, so
                    # the dot survives whatever the extension length is.
                    stem, ext = os.path.splitext(media_file.decode('utf8'))
                    slug = slugify(stem)
                    new_media_file = slug + ext.lower() if slug else media_file
                    print(new_media_file)
                    if new_media_file != media_file:
                        new_media_path = os.path.join(root, new_media_file)
                        os.rename(path, new_media_path)
                        media_file = new_media_file
                        print('renaming: ' + media_file)
                        path = new_media_path

                    media_ext = os.path.splitext(media_file)[1][1:]
                    if not media_ext or media_ext not in self.media_formats:
                        continue

                    data = metadata.get(item_name)
                    if not data:
                        print('no metadata for ' + item_name + ', skipping ' + path)
                        continue

                    item_id = collection_id + '_' + slugify(unicode(item_name))
                    item, _created = MediaItem.objects.get_or_create(collection=collection,
                                                                     code=item_id)
                    item.old_code = item_name
                    self.write_file(item, path)
                    title = data[0].split('.')
                    item.title = title[0].replace('\n', '')
                    print(data)
                    if len(data) > 1:
                        item.track = data[1].replace('\n', '')
                    if len(title) > 1:
                        item.comment = '. '.join(title[1:])
                    item.save()

                    for related_file in os.listdir(root):
                        related_ext = os.path.splitext(related_file)[1][1:]
                        if related_ext not in self.image_formats:
                            continue
                        related_path = related_dir + os.sep + related_file
                        related, _created = MediaItemRelated.objects.get_or_create(
                            item=item, file=related_path)
                        if len(data) > 2:
                            related.title = item.track
                        related.set_mime_type()
                        related.save()
Module variables
var ITEM_PUBLIC_ACCESS_CHOICES
var ITEM_TRANSODING_STATUS
var PUBLIC_ACCESS_CHOICES
var SCOPE_CHOICES
var TYPE_CHOICES
var app_name
var code_linesep
var collection_code_regex
var collection_published_code_regex
var collection_unpublished_code_regex
var default_decoding
var default_encoding
var engine
var eol
var ext
var item_code_regex
var item_published_code_regex
var item_unpublished_code_regex
var mime_type
var private_extra_types
var public_extra_types
var resource_code_regex
var strict_code
Functions
def beautify(
string)
def beautify(string):
    """Return *string* without its file extension, underscores as spaces."""
    stem, _extension = os.path.splitext(string)
    return stem.replace('_', ' ')
def cleanup_dir(
root_dir)
def cleanup_dir(root_dir):
    """Recursively normalise the names of the directories below *root_dir*.

    In every sub-directory name spaces become underscores and the
    substrings ``son_`` and ``son`` are removed.  Only the directory's own
    name is rewritten, never the parent part of the path, so a space or
    "son" in *root_dir* itself cannot send the rename to a different
    (possibly non-existent) parent.  Plain files are left alone.

    Raises:
        OSError: propagated from ``os.listdir`` / ``os.rename``.
    """
    for resource in os.listdir(root_dir):
        path = os.path.join(root_dir, resource)
        if not os.path.isdir(path):
            continue
        new_name = resource.replace(' ', '_').replace('son_', '').replace('son', '')
        # A name made only of the removed substrings would become empty:
        # keep the directory as it is in that case.
        new_path = os.path.join(root_dir, new_name) if new_name else path
        if new_path != path:
            os.rename(path, new_path)
        cleanup_dir(new_path)
def reset(
)
def reset():
    """Delete every MediaItem, then every MediaCollection.

    Objects are removed one at a time through their own ``delete()``
    method, items first.
    """
    for model in (MediaItem, MediaCollection):
        for obj in model.objects.all():
            obj.delete()
def trim_list(
list)
def trim_list(list):
    """Return a new list with the truthy elements of *list*, in order.

    The argument is not modified.  The parameter keeps its original name
    (which shadows the builtin) so keyword callers continue to work.
    """
    return [item for item in list if item]
Classes
class Command
class Command(BaseCommand):
    """Import a directory tree of media files into a corpus."""

    help = "import media files from a directory to a corpus"
    args = "root_dir"
    media_formats = ['mp3']
    image_formats = ['png', 'jpg']
    text_formats = ['txt']
    media_root = settings.MEDIA_ROOT
    dry_run = False
    # write_file() reads this flag; it was never declared before.
    force = False
    # NOTE: evaluated when the class body runs (i.e. at import time).
    user = User.objects.get(username='admin')

    def write_file(self, item, media):
        """Attach the file at path *media* to *item*.

        Nothing happens when *media* does not exist, or when *item* already
        has a file and ``force`` is off.  If ``media_root`` is not a
        substring of ``source_dir`` the content is copied through
        ``item.file.save``; otherwise ``item.file`` is set to the path with
        ``media_root`` removed.  ``dry_run`` suppresses both writes.
        """
        filename = media.split(os.sep)[-1]
        print(media)
        if not os.path.exists(media):
            return
        if item.file and not self.force:
            return
        if self.media_root not in self.source_dir:
            print("file not in MEDIA_ROOT, copying...")
            # Binary mode: media must not go through newline translation;
            # the context manager closes the handle even if saving fails.
            with open(media, 'rb') as f:
                if not self.dry_run:
                    item.file.save(filename, ContentFile(f.read()))
                    item.save()
        else:
            print("file in MEDIA_ROOT, linking...")
            path = media.replace(self.media_root, '')
            if not self.dry_run:
                item.file = path
                item.save()
        if self.user:
            item.set_revision(self.user)

    def handle(self, *args, **options):
        """Import the directory given as last positional argument.

        The root directory becomes a ``MediaCorpus``.  Each sub-directory
        ("chapter") containing a tab-separated ``.txt`` file becomes a
        ``MediaCollection``: the first line of that file supplies the
        chapter title (second field), every other line maps a directory
        name to ``[title, track, ...]``.  Each file below the chapter whose
        extension is in ``media_formats`` becomes a ``MediaItem`` named
        after the directory holding it; images beside it are attached as
        ``MediaItemRelated``.  Files are renamed to a slug of their name.

        WARNING: ``reset()`` runs first and deletes all existing items and
        collections.

        Raises:
            CommandError: if no directory argument was given.
        """
        if not args:
            # Validate before reset() so a bad invocation wipes nothing.
            raise CommandError("missing argument: %s" % self.args)

        # NOT4PROD!!
        reset()

        # Trailing separators would make the corpus name empty.
        root_dir = args[-1].rstrip(os.sep) or args[-1]
        self.source_dir = root_dir
        print(self.source_dir)
        print(self.media_root)
        cleanup_dir(self.source_dir)

        corpus_name = os.path.split(root_dir)[-1]
        corpus_id = slugify(unicode(corpus_name))
        matches = MediaCorpus.objects.filter(code=corpus_id)
        if matches:
            corpus = matches[0]
        else:
            corpus = MediaCorpus(code=corpus_id)
            corpus.title = corpus_name
            corpus.save()

        for chapter in os.listdir(self.source_dir):
            chapter_dir = os.path.join(self.source_dir, chapter)
            if not os.path.isdir(chapter_dir):
                # Plain files in the corpus root cannot be listed as chapters.
                continue

            metadata = {}
            chapter_title = None
            for filename in os.listdir(chapter_dir):
                path = os.path.join(chapter_dir, filename)
                if not os.path.isfile(path) or os.path.splitext(filename)[1] != '.txt':
                    continue
                with open(path, 'r') as f:
                    for i, line in enumerate(f):
                        # Strip the line ending too, so it never leaks into titles.
                        data = re.split(r'\t+', line.rstrip('\r\n\t'))
                        if len(data) < 2:
                            continue
                        if i == 0:
                            chapter_title = data[1]
                            print(chapter_title)
                        else:
                            metadata[data[0]] = data[1:]

            if chapter_title is None:
                print('no chapter title found in ' + chapter_dir + ', skipping')
                continue
            print(metadata)

            collection_id = corpus_id + '_' + slugify(unicode(chapter))
            collection_title = chapter.replace('_', ' ') + ' - ' + chapter_title
            print(collection_title)
            matches = MediaCollection.objects.filter(code=collection_id, title=collection_title)
            if matches:
                collection = matches[0]
            else:
                collection = MediaCollection(code=collection_id)
                collection.title = collection_title
                collection.save()
            if collection not in corpus.children.all():
                corpus.children.add(collection)

            for filename in os.listdir(chapter_dir):
                path = os.path.join(chapter_dir, filename)
                if os.path.isfile(path) and os.path.splitext(filename)[1] == '.jpg':
                    related_path = path.replace(self.media_root, '')
                    MediaCollectionRelated.objects.get_or_create(collection=collection,
                                                                 file=related_path)

            for root, _dirs, files in os.walk(chapter_dir):
                root_list = root.split(os.sep)
                item_name = root_list[-1]
                related_dir = os.sep.join(root_list[-4:])
                for media_file in files:
                    path = os.path.join(root, media_file)
                    print(path)
                    if media_file.startswith('.'):
                        # Hidden files are never media; leave them untouched.
                        continue

                    # Slugify the stem only and keep the real extension, so
                    # the dot survives whatever the extension length is.
                    stem, ext = os.path.splitext(media_file.decode('utf8'))
                    slug = slugify(stem)
                    new_media_file = slug + ext.lower() if slug else media_file
                    print(new_media_file)
                    if new_media_file != media_file:
                        new_media_path = os.path.join(root, new_media_file)
                        os.rename(path, new_media_path)
                        media_file = new_media_file
                        print('renaming: ' + media_file)
                        path = new_media_path

                    media_ext = os.path.splitext(media_file)[1][1:]
                    if not media_ext or media_ext not in self.media_formats:
                        continue

                    data = metadata.get(item_name)
                    if not data:
                        print('no metadata for ' + item_name + ', skipping ' + path)
                        continue

                    item_id = collection_id + '_' + slugify(unicode(item_name))
                    item, _created = MediaItem.objects.get_or_create(collection=collection,
                                                                     code=item_id)
                    item.old_code = item_name
                    self.write_file(item, path)
                    title = data[0].split('.')
                    item.title = title[0].replace('\n', '')
                    print(data)
                    if len(data) > 1:
                        item.track = data[1].replace('\n', '')
                    if len(title) > 1:
                        item.comment = '. '.join(title[1:])
                    item.save()

                    for related_file in os.listdir(root):
                        related_ext = os.path.splitext(related_file)[1][1:]
                        if related_ext not in self.image_formats:
                            continue
                        related_path = related_dir + os.sep + related_file
                        related, _created = MediaItemRelated.objects.get_or_create(
                            item=item, file=related_path)
                        if len(data) > 2:
                            related.title = item.track
                        related.set_mime_type()
                        related.save()
Ancestors (in MRO)
- Command
- django.core.management.base.BaseCommand
- __builtin__.object
Class variables
var args
var can_import_settings
var dry_run
var help
var image_formats
var leave_locale_alone
var media_formats
var media_root
var option_list
var output_transaction
var requires_model_validation
var text_formats
var user
Methods
def __init__(
self)
def __init__(self):
    """Store the object returned by ``color_style()`` as ``self.style``."""
    palette = color_style()
    self.style = palette
def create_parser(
self, prog_name, subcommand)
Create and return the `OptionParser` which will be used to parse the arguments to this command.
def create_parser(self, prog_name, subcommand):
    """
    Build the ``OptionParser`` that parses this command's arguments.

    The parser is configured with *prog_name*, the text of
    ``self.usage(subcommand)``, ``self.get_version()`` and
    ``self.option_list``.
    """
    usage_text = self.usage(subcommand)
    version_text = self.get_version()
    declared_options = self.option_list
    return OptionParser(prog=prog_name,
                        usage=usage_text,
                        version=version_text,
                        option_list=declared_options)
def execute(
self, *args, **options)
Try to execute this command, performing model validation if needed (as controlled by the attribute `self.requires_model_validation`, except if force-skipped).
def execute(self, *args, **options):
    """
    Try to execute this command, performing model validation if
    needed (as controlled by the attribute
    ``self.requires_model_validation``, except if force-skipped).

    Wraps ``options['stdout']`` / ``options['stderr']`` (defaulting to the
    ``sys`` streams) in ``OutputWrapper``, optionally switches the active
    translation to ``en-us`` for the duration of the call, runs
    ``self.handle(*args, **options)`` and writes whatever it returns to
    ``self.stdout``.

    Raises ``CommandError`` when ``leave_locale_alone`` is false while
    ``can_import_settings`` is also false.
    """
    self.stdout = OutputWrapper(options.get('stdout', sys.stdout))
    self.stderr = OutputWrapper(options.get('stderr', sys.stderr), self.style.ERROR)

    if self.can_import_settings:
        from django.conf import settings

    # Stays None unless the locale is switched below; the ``finally``
    # clause uses that to decide whether anything must be restored.
    saved_locale = None
    if not self.leave_locale_alone:
        # Only mess with locales if we can assume we have a working
        # settings file, because django.utils.translation requires settings
        # (The final saying about whether the i18n machinery is active will be
        # found in the value of the USE_I18N setting)
        if not self.can_import_settings:
            raise CommandError("Incompatible values of 'leave_locale_alone' "
                               "(%s) and 'can_import_settings' (%s) command "
                               "options." % (self.leave_locale_alone,
                                             self.can_import_settings))
        # Switch to US English, because django-admin.py creates database
        # content like permissions, and those shouldn't contain any
        # translations.
        from django.utils import translation
        saved_locale = translation.get_language()
        translation.activate('en-us')

    try:
        if self.requires_model_validation and not options.get('skip_validation'):
            self.validate()
        output = self.handle(*args, **options)
        if output:
            if self.output_transaction:
                # This needs to be imported here, because it relies on
                # settings.
                from django.db import connections, DEFAULT_DB_ALIAS
                connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
                if connection.ops.start_transaction_sql():
                    self.stdout.write(self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()))
            self.stdout.write(output)
            if self.output_transaction:
                self.stdout.write('\n' + self.style.SQL_KEYWORD("COMMIT;"))
    finally:
        # Restore the previous language even when handle() raised.
        if saved_locale is not None:
            translation.activate(saved_locale)
def get_version(
self)
Return the Django version, which should be correct for all built-in Django commands. User-supplied commands should override this method.
def get_version(self):
    """
    Return the value of ``django.get_version()``.

    That is the right answer for the commands bundled with Django;
    user-supplied commands should override this method.
    """
    version = django.get_version()
    return version
def handle(
self, *args, **options)
def handle(self, *args, **options):
    """Import the directory given as last positional argument.

    The root directory becomes a ``MediaCorpus``.  Each sub-directory
    ("chapter") containing a tab-separated ``.txt`` file becomes a
    ``MediaCollection``: the first line of that file supplies the chapter
    title (second field), every other line maps a directory name to
    ``[title, track, ...]``.  Each file below the chapter whose extension
    is in ``media_formats`` becomes a ``MediaItem`` named after the
    directory holding it; images beside it are attached as
    ``MediaItemRelated``.  Files are renamed to a slug of their name.

    WARNING: ``reset()`` runs first and deletes all existing items and
    collections.

    Raises:
        CommandError: if no directory argument was given.
    """
    if not args:
        # Validate before reset() so a bad invocation wipes nothing.
        raise CommandError("missing argument: %s" % self.args)

    # NOT4PROD!!
    reset()

    # Trailing separators would make the corpus name empty.
    root_dir = args[-1].rstrip(os.sep) or args[-1]
    self.source_dir = root_dir
    print(self.source_dir)
    print(self.media_root)
    cleanup_dir(self.source_dir)

    corpus_name = os.path.split(root_dir)[-1]
    corpus_id = slugify(unicode(corpus_name))
    matches = MediaCorpus.objects.filter(code=corpus_id)
    if matches:
        corpus = matches[0]
    else:
        corpus = MediaCorpus(code=corpus_id)
        corpus.title = corpus_name
        corpus.save()

    for chapter in os.listdir(self.source_dir):
        chapter_dir = os.path.join(self.source_dir, chapter)
        if not os.path.isdir(chapter_dir):
            # Plain files in the corpus root cannot be listed as chapters.
            continue

        metadata = {}
        chapter_title = None
        for filename in os.listdir(chapter_dir):
            path = os.path.join(chapter_dir, filename)
            if not os.path.isfile(path) or os.path.splitext(filename)[1] != '.txt':
                continue
            with open(path, 'r') as f:
                for i, line in enumerate(f):
                    # Strip the line ending too, so it never leaks into titles.
                    data = re.split(r'\t+', line.rstrip('\r\n\t'))
                    if len(data) < 2:
                        continue
                    if i == 0:
                        chapter_title = data[1]
                        print(chapter_title)
                    else:
                        metadata[data[0]] = data[1:]

        if chapter_title is None:
            print('no chapter title found in ' + chapter_dir + ', skipping')
            continue
        print(metadata)

        collection_id = corpus_id + '_' + slugify(unicode(chapter))
        collection_title = chapter.replace('_', ' ') + ' - ' + chapter_title
        print(collection_title)
        matches = MediaCollection.objects.filter(code=collection_id, title=collection_title)
        if matches:
            collection = matches[0]
        else:
            collection = MediaCollection(code=collection_id)
            collection.title = collection_title
            collection.save()
        if collection not in corpus.children.all():
            corpus.children.add(collection)

        for filename in os.listdir(chapter_dir):
            path = os.path.join(chapter_dir, filename)
            if os.path.isfile(path) and os.path.splitext(filename)[1] == '.jpg':
                related_path = path.replace(self.media_root, '')
                MediaCollectionRelated.objects.get_or_create(collection=collection,
                                                             file=related_path)

        for root, _dirs, files in os.walk(chapter_dir):
            root_list = root.split(os.sep)
            item_name = root_list[-1]
            related_dir = os.sep.join(root_list[-4:])
            for media_file in files:
                path = os.path.join(root, media_file)
                print(path)
                if media_file.startswith('.'):
                    # Hidden files are never media; leave them untouched.
                    continue

                # Slugify the stem only and keep the real extension, so the
                # dot survives whatever the extension length is.
                stem, ext = os.path.splitext(media_file.decode('utf8'))
                slug = slugify(stem)
                new_media_file = slug + ext.lower() if slug else media_file
                print(new_media_file)
                if new_media_file != media_file:
                    new_media_path = os.path.join(root, new_media_file)
                    os.rename(path, new_media_path)
                    media_file = new_media_file
                    print('renaming: ' + media_file)
                    path = new_media_path

                media_ext = os.path.splitext(media_file)[1][1:]
                if not media_ext or media_ext not in self.media_formats:
                    continue

                data = metadata.get(item_name)
                if not data:
                    print('no metadata for ' + item_name + ', skipping ' + path)
                    continue

                item_id = collection_id + '_' + slugify(unicode(item_name))
                item, _created = MediaItem.objects.get_or_create(collection=collection,
                                                                 code=item_id)
                item.old_code = item_name
                self.write_file(item, path)
                title = data[0].split('.')
                item.title = title[0].replace('\n', '')
                print(data)
                if len(data) > 1:
                    item.track = data[1].replace('\n', '')
                if len(title) > 1:
                    item.comment = '. '.join(title[1:])
                item.save()

                for related_file in os.listdir(root):
                    related_ext = os.path.splitext(related_file)[1][1:]
                    if related_ext not in self.image_formats:
                        continue
                    related_path = related_dir + os.sep + related_file
                    related, _created = MediaItemRelated.objects.get_or_create(
                        item=item, file=related_path)
                    if len(data) > 2:
                        related.title = item.track
                    related.set_mime_type()
                    related.save()
def print_help(
self, prog_name, subcommand)
Print the help message for this command, derived from `self.usage()`.
def print_help(self, prog_name, subcommand):
    """
    Print this command's help text.

    The text comes from the parser built by ``self.create_parser()``.
    """
    self.create_parser(prog_name, subcommand).print_help()
def run_from_argv(
self, argv)
Set up any environment changes requested (e.g., Python path and Django settings), then run this command. If the command raises a `CommandError`, intercept it and print it sensibly to stderr. If the `--traceback` option is present or the raised `Exception` is not `CommandError`, raise it.
def run_from_argv(self, argv):
    """
    Parse *argv* and run the command.

    ``argv[0]`` and ``argv[1]`` are handed to ``create_parser()``; the rest
    is parsed into options and positional arguments, which are passed to
    ``handle_default_options()`` and ``execute()``.  A ``CommandError`` is
    written to stderr as ``ClassName: message`` followed by
    ``sys.exit(1)``; it is re-raised instead when ``options.traceback`` is
    set.  Any other exception always propagates.
    """
    prog_name, subcommand = argv[0], argv[1]
    parser = self.create_parser(prog_name, subcommand)
    options, args = parser.parse_args(argv[2:])
    handle_default_options(options)
    try:
        self.execute(*args, **options.__dict__)
    except Exception as e:
        if options.traceback:
            raise
        if not isinstance(e, CommandError):
            raise
        # self.stderr is not guaranteed to be set here
        fallback = OutputWrapper(sys.stderr, self.style.ERROR)
        stderr = getattr(self, 'stderr', fallback)
        message = '%s: %s' % (e.__class__.__name__, e)
        stderr.write(message)
        sys.exit(1)
def usage(
self, subcommand)
Return a brief description of how to use this command, by default from the attribute `self.help`.
def usage(self, subcommand):
    """
    Return a short usage string for this command.

    The synopsis line is built from *subcommand* and ``self.args``; when
    ``self.help`` is non-empty it is appended after a blank line.
    """
    synopsis = '%%prog %s [options] %s' % (subcommand, self.args)
    if not self.help:
        return synopsis
    return '%s\n\n%s' % (synopsis, self.help)
def validate(
self, app=None, display_num_errors=False)
Validates the given app, raising CommandError for any errors.
If app is None, then this will validate all installed apps.
def validate(self, app=None, display_num_errors=False):
    """
    Validate *app* with ``get_validation_errors``.

    *app* is passed through unchanged (``None`` by default).  Raises
    ``CommandError`` carrying the collected report when any error is
    counted; otherwise, if *display_num_errors* is true, writes the error
    count to ``self.stdout``.
    """
    from django.core.management.validation import get_validation_errors
    report = StringIO()
    num_errors = get_validation_errors(report, app)
    if num_errors:
        report.seek(0)
        error_text = report.read()
        raise CommandError("One or more models did not validate:\n%s" % error_text)
    if display_num_errors:
        plural = '' if num_errors == 1 else 's'
        self.stdout.write("%s error%s found" % (num_errors, plural))
def write_file(
self, item, media)
def write_file(self, item, media):
    """Attach the file at path *media* to *item*.

    Nothing happens when *media* does not exist, or when *item* already has
    a file and the optional ``force`` attribute is not set.  If
    ``self.media_root`` is not a substring of ``self.source_dir`` the
    content is copied through ``item.file.save``; otherwise ``item.file``
    is set to the path with ``media_root`` removed.  ``self.dry_run``
    suppresses both writes.  When a file was handled and ``self.user`` is
    set, ``item.set_revision(self.user)`` is called.
    """
    filename = media.split(os.sep)[-1]
    print(media)
    if not os.path.exists(media):
        return
    # ``force`` is not declared on the command class: treat a missing
    # attribute as "off" instead of failing with AttributeError as soon as
    # an item already has a file.
    if item.file and not getattr(self, 'force', False):
        return
    if self.media_root not in self.source_dir:
        print("file not in MEDIA_ROOT, copying...")
        # Binary mode: media must not go through newline translation; the
        # context manager closes the handle even if saving fails.
        with open(media, 'rb') as f:
            if not self.dry_run:
                item.file.save(filename, ContentFile(f.read()))
                item.save()
    else:
        print("file in MEDIA_ROOT, linking...")
        path = media.replace(self.media_root, '')
        if not self.dry_run:
            item.file = path
            item.save()
    if self.user:
        item.set_revision(self.user)