telemeta.management.commands.telemeta-import-collection-from-dir module
from optparse import make_option
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.files.base import ContentFile
from django.contrib.auth.models import User
from telemeta.models import *
from telemeta.util.unaccent import unaccent
import os
import re

try:
    from django.utils.text import slugify
except ImportError:
    def slugify(string):
        """Fallback used when ``django.utils.text.slugify`` cannot be imported.

        Removes parentheses and commas, then replaces spaces with underscores.
        """
        killed_chars = re.sub(r'[\(\),]', '', string)
        return re.sub(' ', '_', killed_chars)


def beautify(string):
    """Return ``string`` without its extension and with '_' replaced by ' '."""
    return os.path.splitext(string)[0].replace('_', ' ')


class Command(BaseCommand):
    """Import every file found under a source directory into one collection.

    The collection is looked up (or created) from ``--collection-code``; one
    item per file is then looked up (or created) with the code
    ``<collection code>_<file name without extension>``.
    """

    help = "import media files from a directory into a collection"
    media_root = os.path.normpath(settings.MEDIA_ROOT)

    option_list = BaseCommand.option_list + (
        make_option('-d', '--dry-run',
                    action='store_true',
                    dest='dry-run',
                    help='Do NOT write anything'),
        make_option('-f', '--force',
                    action='store_true',
                    dest='force',
                    help='Force overwrite data'),
        make_option('-s', '--source',
                    dest='source_dir',
                    help='define the source directory'),
        make_option('-l', '--log',
                    dest='log',
                    help='define log file'),
        make_option('-p', '--pattern',
                    dest='pattern',
                    help='define the pattern'),
        make_option('-u', '--username',
                    dest='user',
                    help='define the username'),
        make_option('-c', '--collection-code',
                    action='store',
                    dest='collection_code',
                    default='default',
                    metavar='<code>',
                    help='collection code'),
        make_option('-t', '--collection-title',
                    action='store',
                    dest='collection_title',
                    default='default',
                    metavar='<title>',
                    help='collection title'),
    )

    def write_file(self, item, media):
        """Attach the file at path ``media`` to ``item``.

        Nothing happens when ``media`` does not exist, or when ``item``
        already has a file and ``--force`` was not given.

        When the source directory lies inside MEDIA_ROOT the item simply
        points at the file through its path relative to MEDIA_ROOT; otherwise
        the file content is read and saved through ``item.file.save``.
        With ``--dry-run`` only the progress message is printed.

        :param item: object exposing ``file``, ``save()`` and
            ``set_revision()`` (a MediaItem when called from ``handle``).
        :param media: path of the file to attach.
        """
        if not os.path.exists(media):
            return
        if item.file and not self.force:
            return

        # Compare on a path-component boundary: a plain substring test would
        # also match unrelated directories such as "<MEDIA_ROOT>_old".
        inside_media_root = (
            self.source_dir == self.media_root
            or self.source_dir.startswith(self.media_root + os.sep)
        )

        if not inside_media_root:
            print("file not in MEDIA_ROOT, copying...")
            if not self.dry_run:
                filename = os.path.basename(media)
                # Binary mode so media bytes are never newline-translated;
                # the context manager closes the handle even if read() fails.
                with open(media, 'rb') as source:
                    file_content = ContentFile(source.read())
                item.file.save(filename, file_content)
                item.save()
        else:
            print("file in MEDIA_ROOT, linking...")
            if not self.dry_run:
                item.file = os.path.relpath(media, self.media_root)
                item.save()

        # Presumably set_revision() persists a revision record, so it is
        # skipped on dry runs, which must not write anything.
        if self.user and not self.dry_run:
            item.set_revision(self.user)

    def handle(self, *args, **options):
        """Walk the source directory and import each file as an item.

        :raises CommandError: when no source directory was given.
        """
        source_dir = options.get('source_dir')
        if not source_dir:
            raise CommandError("a source directory is required (-s/--source)")

        self.source_dir = os.path.abspath(source_dir)
        self.collection_code = options.get('collection_code')
        self.collection_title = options.get('collection_title')
        self.dry_run = options.get('dry-run')
        self.user = None
        self.pattern = options.get('pattern')
        self.force = options.get('force')

        # -u/--username stores its value under dest='user'; 'username' is
        # still honoured for callers passing it as a keyword programmatically.
        username = options.get('user') or options.get('username')
        if username:
            users = User.objects.filter(username=username)
            if users:
                self.user = users[0]

        # NOTE(review): the title is filled with the collection *code* and
        # --collection-title is never used -- confirm whether that is intended.
        # NOTE(review): get_or_create() writes to the database even when
        # --dry-run is set.
        collection, created = MediaCollection.objects.get_or_create(
            code=self.collection_code, title=self.collection_code)
        if created:
            collection.public_access = 'full'
            collection.save()
            print('Collection created: ' + self.collection_code)
        else:
            print('Using collection: ' + collection.code)

        for root, _dirs, files in os.walk(self.source_dir):
            for filename in files:
                path = os.path.join(root, filename)
                filename_pre = os.path.splitext(filename)[0]
                item_code = collection.code + '_' + filename_pre
                item, created = MediaItem.objects.get_or_create(
                    collection=collection, code=item_code)
                if created:
                    item.title = filename_pre
                    item.public_access = 'full'
                    self.write_file(item, path)
                    item.save()
                    print('item created: ' + item.code)
                else:
                    print('item already exists: ' + item.code)
Module variables
var ITEM_PUBLIC_ACCESS_CHOICES
var ITEM_TRANSODING_STATUS
var PUBLIC_ACCESS_CHOICES
var SCOPE_CHOICES
var TYPE_CHOICES
var app_name
var code_linesep
var collection_code_regex
var collection_published_code_regex
var collection_unpublished_code_regex
var default_decoding
var default_encoding
var engine
var eol
var ext
var item_code_regex
var item_published_code_regex
var item_unpublished_code_regex
var mime_type
var private_extra_types
var public_extra_types
var resource_code_regex
var strict_code
Functions
def beautify(
string)
def beautify(string):
    """Turn a file name into a display label.

    The extension (as split off by ``os.path.splitext``) is dropped and every
    underscore in the remaining stem becomes a space.
    """
    stem, _extension = os.path.splitext(string)
    return stem.replace('_', ' ')
Classes
class Command
class Command(BaseCommand):
    """Import every file found under a source directory into one collection.

    The collection is looked up (or created) from ``--collection-code``; one
    item per file is then looked up (or created) with the code
    ``<collection code>_<file name without extension>``.
    """

    help = "import media files from a directory into a collection"
    media_root = os.path.normpath(settings.MEDIA_ROOT)

    option_list = BaseCommand.option_list + (
        make_option('-d', '--dry-run',
                    action='store_true',
                    dest='dry-run',
                    help='Do NOT write anything'),
        make_option('-f', '--force',
                    action='store_true',
                    dest='force',
                    help='Force overwrite data'),
        make_option('-s', '--source',
                    dest='source_dir',
                    help='define the source directory'),
        make_option('-l', '--log',
                    dest='log',
                    help='define log file'),
        make_option('-p', '--pattern',
                    dest='pattern',
                    help='define the pattern'),
        make_option('-u', '--username',
                    dest='user',
                    help='define the username'),
        make_option('-c', '--collection-code',
                    action='store',
                    dest='collection_code',
                    default='default',
                    metavar='<code>',
                    help='collection code'),
        make_option('-t', '--collection-title',
                    action='store',
                    dest='collection_title',
                    default='default',
                    metavar='<title>',
                    help='collection title'),
    )

    def write_file(self, item, media):
        """Attach the file at path ``media`` to ``item``.

        Nothing happens when ``media`` does not exist, or when ``item``
        already has a file and ``--force`` was not given.

        When the source directory lies inside MEDIA_ROOT the item simply
        points at the file through its path relative to MEDIA_ROOT; otherwise
        the file content is read and saved through ``item.file.save``.
        With ``--dry-run`` only the progress message is printed.

        :param item: object exposing ``file``, ``save()`` and
            ``set_revision()`` (a MediaItem when called from ``handle``).
        :param media: path of the file to attach.
        """
        if not os.path.exists(media):
            return
        if item.file and not self.force:
            return

        # Compare on a path-component boundary: a plain substring test would
        # also match unrelated directories such as "<MEDIA_ROOT>_old".
        inside_media_root = (
            self.source_dir == self.media_root
            or self.source_dir.startswith(self.media_root + os.sep)
        )

        if not inside_media_root:
            print("file not in MEDIA_ROOT, copying...")
            if not self.dry_run:
                filename = os.path.basename(media)
                # Binary mode so media bytes are never newline-translated;
                # the context manager closes the handle even if read() fails.
                with open(media, 'rb') as source:
                    file_content = ContentFile(source.read())
                item.file.save(filename, file_content)
                item.save()
        else:
            print("file in MEDIA_ROOT, linking...")
            if not self.dry_run:
                item.file = os.path.relpath(media, self.media_root)
                item.save()

        # Presumably set_revision() persists a revision record, so it is
        # skipped on dry runs, which must not write anything.
        if self.user and not self.dry_run:
            item.set_revision(self.user)

    def handle(self, *args, **options):
        """Walk the source directory and import each file as an item.

        :raises CommandError: when no source directory was given.
        """
        source_dir = options.get('source_dir')
        if not source_dir:
            raise CommandError("a source directory is required (-s/--source)")

        self.source_dir = os.path.abspath(source_dir)
        self.collection_code = options.get('collection_code')
        self.collection_title = options.get('collection_title')
        self.dry_run = options.get('dry-run')
        self.user = None
        self.pattern = options.get('pattern')
        self.force = options.get('force')

        # -u/--username stores its value under dest='user'; 'username' is
        # still honoured for callers passing it as a keyword programmatically.
        username = options.get('user') or options.get('username')
        if username:
            users = User.objects.filter(username=username)
            if users:
                self.user = users[0]

        # NOTE(review): the title is filled with the collection *code* and
        # --collection-title is never used -- confirm whether that is intended.
        # NOTE(review): get_or_create() writes to the database even when
        # --dry-run is set.
        collection, created = MediaCollection.objects.get_or_create(
            code=self.collection_code, title=self.collection_code)
        if created:
            collection.public_access = 'full'
            collection.save()
            print('Collection created: ' + self.collection_code)
        else:
            print('Using collection: ' + collection.code)

        for root, _dirs, files in os.walk(self.source_dir):
            for filename in files:
                path = os.path.join(root, filename)
                filename_pre = os.path.splitext(filename)[0]
                item_code = collection.code + '_' + filename_pre
                item, created = MediaItem.objects.get_or_create(
                    collection=collection, code=item_code)
                if created:
                    item.title = filename_pre
                    item.public_access = 'full'
                    self.write_file(item, path)
                    item.save()
                    print('item created: ' + item.code)
                else:
                    print('item already exists: ' + item.code)
Ancestors (in MRO)
- Command
- django.core.management.base.BaseCommand
- __builtin__.object
Class variables
var args
var can_import_settings
var help
var leave_locale_alone
var media_root
var option_list
var output_transaction
var requires_model_validation
Methods
def __init__(
self)
def __init__(self):
    """Store the object returned by ``color_style()`` as ``self.style``."""
    style = color_style()
    self.style = style
def create_parser(
self, prog_name, subcommand)
Create and return the OptionParser which will be used to parse the arguments to this command.
def create_parser(self, prog_name, subcommand):
    """
    Build the ``OptionParser`` that parses this command's arguments.

    The parser takes its program name from ``prog_name``, its usage text from
    ``self.usage(subcommand)``, its version from ``self.get_version()`` and
    its options from ``self.option_list``.
    """
    parser_settings = {
        'prog': prog_name,
        'usage': self.usage(subcommand),
        'version': self.get_version(),
        'option_list': self.option_list,
    }
    return OptionParser(**parser_settings)
def execute(
self, *args, **options)
Try to execute this command, performing model validation if needed (as controlled by the attribute self.requires_model_validation, except if force-skipped).
def execute(self, *args, **options):
    """
    Try to execute this command, performing model validation if
    needed (as controlled by the attribute
    ``self.requires_model_validation``, except if force-skipped).

    ``self.stdout`` / ``self.stderr`` are (re)bound from the ``stdout`` and
    ``stderr`` options, falling back to ``sys.stdout`` / ``sys.stderr``.
    Unless ``self.leave_locale_alone`` is set, the 'en-us' translation is
    activated for the duration of the call and the previously active
    language is re-activated afterwards, even if ``handle()`` raises.
    Any truthy value returned by ``handle()`` is written to ``self.stdout``.

    Raises ``CommandError`` when ``leave_locale_alone`` is false while
    ``can_import_settings`` is false.
    """
    self.stdout = OutputWrapper(options.get('stdout', sys.stdout))
    self.stderr = OutputWrapper(options.get('stderr', sys.stderr), self.style.ERROR)

    if self.can_import_settings:
        from django.conf import settings

    # Stays None when the locale is left alone, which tells the ``finally``
    # clause below that there is nothing to restore.
    saved_locale = None
    if not self.leave_locale_alone:
        # Only mess with locales if we can assume we have a working
        # settings file, because django.utils.translation requires settings
        # (The final saying about whether the i18n machinery is active will be
        # found in the value of the USE_I18N setting)
        if not self.can_import_settings:
            raise CommandError("Incompatible values of 'leave_locale_alone' "
                               "(%s) and 'can_import_settings' (%s) command "
                               "options." % (self.leave_locale_alone,
                                             self.can_import_settings))
        # Switch to US English, because django-admin.py creates database
        # content like permissions, and those shouldn't contain any
        # translations.
        from django.utils import translation
        saved_locale = translation.get_language()
        translation.activate('en-us')

    try:
        if self.requires_model_validation and not options.get('skip_validation'):
            self.validate()
        output = self.handle(*args, **options)
        if output:
            # When output_transaction is set, the output is wrapped between
            # the backend's start-transaction SQL (if any) and "COMMIT;".
            if self.output_transaction:
                # This needs to be imported here, because it relies on
                # settings.
                from django.db import connections, DEFAULT_DB_ALIAS
                connection = connections[options.get('database', DEFAULT_DB_ALIAS)]
                if connection.ops.start_transaction_sql():
                    self.stdout.write(self.style.SQL_KEYWORD(connection.ops.start_transaction_sql()))
            self.stdout.write(output)
            if self.output_transaction:
                self.stdout.write('\n' + self.style.SQL_KEYWORD("COMMIT;"))
    finally:
        # Restore the language that was active before the 'en-us' switch.
        if saved_locale is not None:
            translation.activate(saved_locale)
def get_version(
self)
Return the Django version, which should be correct for all built-in Django commands. User-supplied commands should override this method.
def get_version(self):
    """
    Report the version string for this command.

    The value is whatever ``django.get_version()`` returns, which is right
    for Django's built-in commands; user-supplied commands should override
    this method.
    """
    version = django.get_version()
    return version
def handle(
self, *args, **options)
def handle(self, *args, **options):
    """Walk the source directory and import each file as an item.

    The collection is looked up (or created) from the ``collection_code``
    option. For every file below the source directory an item is looked up
    (or created) with the code
    ``<collection code>_<file name without extension>``; newly created
    items get their title set and the file attached via ``write_file``.

    :raises CommandError: when no source directory was given.
    """
    source_dir = options.get('source_dir')
    if not source_dir:
        # os.path.abspath(None) would otherwise fail with an opaque traceback.
        raise CommandError("a source directory is required (-s/--source)")

    self.source_dir = os.path.abspath(source_dir)
    self.collection_code = options.get('collection_code')
    self.collection_title = options.get('collection_title')
    self.dry_run = options.get('dry-run')
    self.user = None
    self.pattern = options.get('pattern')
    self.force = options.get('force')

    # -u/--username stores its value under dest='user'; 'username' is still
    # honoured for callers passing it as a keyword programmatically.
    username = options.get('user') or options.get('username')
    if username:
        users = User.objects.filter(username=username)
        if users:
            self.user = users[0]

    # NOTE(review): the title is filled with the collection *code* and
    # --collection-title is never used -- confirm whether that is intended.
    # NOTE(review): get_or_create() writes to the database even when
    # --dry-run is set.
    collection, created = MediaCollection.objects.get_or_create(
        code=self.collection_code, title=self.collection_code)
    if created:
        collection.public_access = 'full'
        collection.save()
        print('Collection created: ' + self.collection_code)
    else:
        print('Using collection: ' + collection.code)

    for root, _dirs, files in os.walk(self.source_dir):
        for filename in files:
            path = os.path.join(root, filename)
            filename_pre = os.path.splitext(filename)[0]
            item_code = collection.code + '_' + filename_pre
            item, created = MediaItem.objects.get_or_create(
                collection=collection, code=item_code)
            if created:
                item.title = filename_pre
                item.public_access = 'full'
                self.write_file(item, path)
                item.save()
                print('item created: ' + item.code)
            else:
                print('item already exists: ' + item.code)
def print_help(
self, prog_name, subcommand)
Print the help message for this command, derived from self.usage().
def print_help(self, prog_name, subcommand):
    """
    Show this command's help text.

    The text comes from the parser built by ``self.create_parser()``, whose
    usage string is derived from ``self.usage()``.
    """
    self.create_parser(prog_name, subcommand).print_help()
def run_from_argv(
self, argv)
Set up any environment changes requested (e.g., Python path and Django settings), then run this command. If the command raises a CommandError, intercept it and print it sensibly to stderr. If the --traceback option is present or the raised Exception is not CommandError, raise it.
def run_from_argv(self, argv):
    """
    Parse ``argv`` and run this command.

    ``argv[0]`` and ``argv[1]`` name the program and the subcommand; the
    remaining entries are parsed as options and arguments, handed to
    ``handle_default_options()`` and then to ``self.execute()``.

    A ``CommandError`` raised by the command is written to stderr as
    ``<class name>: <message>`` and the process exits with status 1. Any
    other exception, or any exception at all when the ``traceback`` option
    is set, propagates unchanged.
    """
    prog_name, subcommand = argv[0], argv[1]
    parser = self.create_parser(prog_name, subcommand)
    options, args = parser.parse_args(argv[2:])
    handle_default_options(options)
    try:
        self.execute(*args, **options.__dict__)
    except Exception as exc:
        must_propagate = options.traceback or not isinstance(exc, CommandError)
        if must_propagate:
            raise

        # self.stderr is not guaranteed to be set here
        fallback_stderr = OutputWrapper(sys.stderr, self.style.ERROR)
        stderr = getattr(self, 'stderr', fallback_stderr)
        stderr.write('%s: %s' % (exc.__class__.__name__, exc))
        sys.exit(1)
def usage(
self, subcommand)
Return a brief description of how to use this command, by default from the attribute self.help.
def usage(self, subcommand):
    """
    Build the usage text for this command.

    The first line is ``%prog <subcommand> [options] <self.args>``; when
    ``self.help`` is non-empty it is appended after a blank line.
    """
    synopsis = '%%prog %s [options] %s' % (subcommand, self.args)
    if not self.help:
        return synopsis
    return '%s\n\n%s' % (synopsis, self.help)
def validate(
self, app=None, display_num_errors=False)
Validates the given app, raising CommandError for any errors.
If app is None, then this will validate all installed apps.
def validate(self, app=None, display_num_errors=False):
    """
    Run model validation for ``app`` (``None`` means all installed apps).

    Raises ``CommandError`` carrying the collected error text when any
    validation error is reported. Otherwise, when ``display_num_errors`` is
    true, the error count is written to ``self.stdout``.
    """
    from django.core.management.validation import get_validation_errors

    report = StringIO()
    num_errors = get_validation_errors(report, app)
    if num_errors:
        report.seek(0)
        error_text = report.read()
        raise CommandError("One or more models did not validate:\n%s" % error_text)
    if not display_num_errors:
        return
    plural_suffix = '' if num_errors == 1 else 's'
    self.stdout.write("%s error%s found" % (num_errors, plural_suffix))
def write_file(
self, item, media)
def write_file(self, item, media):
    """Attach the file at path ``media`` to ``item``.

    Nothing happens when ``media`` does not exist, or when ``item`` already
    has a file and ``self.force`` is not set.

    When ``self.source_dir`` lies inside ``self.media_root`` the item simply
    points at the file through its path relative to MEDIA_ROOT; otherwise
    the file content is read and saved through ``item.file.save``. When
    ``self.dry_run`` is set only the progress message is printed.

    :param item: object exposing ``file``, ``save()`` and ``set_revision()``.
    :param media: path of the file to attach.
    """
    if not os.path.exists(media):
        return
    if item.file and not self.force:
        return

    # Compare on a path-component boundary: a plain substring test would
    # also match unrelated directories such as "<MEDIA_ROOT>_old".
    inside_media_root = (
        self.source_dir == self.media_root
        or self.source_dir.startswith(self.media_root + os.sep)
    )

    if not inside_media_root:
        print("file not in MEDIA_ROOT, copying...")
        if not self.dry_run:
            filename = os.path.basename(media)
            # Binary mode so media bytes are never newline-translated; the
            # context manager closes the handle even if read() fails.
            with open(media, 'rb') as source:
                file_content = ContentFile(source.read())
            item.file.save(filename, file_content)
            item.save()
    else:
        print("file in MEDIA_ROOT, linking...")
        if not self.dry_run:
            item.file = os.path.relpath(media, self.media_root)
            item.save()

    # Presumably set_revision() persists a revision record, so it is skipped
    # on dry runs, which must not write anything.
    if self.user and not self.dry_run:
        item.set_revision(self.user)