1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

CSV import implemented using CSVFileField

This commit is contained in:
Alyssa Bigley
2021-06-04 10:27:19 -04:00
parent 6ff5a1db42
commit c2b2b059e6
2 changed files with 77 additions and 22 deletions

View File

@ -26,6 +26,7 @@ __all__ = (
'CSVChoiceField',
'CSVContentTypeField',
'CSVDataField',
'CSVFileField',
'CSVModelChoiceField',
'CSVTypedChoiceField',
'DynamicModelChoiceField',
@ -221,6 +222,77 @@ class CSVDataField(forms.CharField):
return value
class CSVFileField(forms.FileField):
"""
A CharField (rendered as a Textarea) which accepts CSV-formatted data. It returns data as a two-tuple: The first
item is a dictionary of column headers, mapping field names to the attribute by which they match a related object
(where applicable). The second item is a list of dictionaries, each representing a discrete row of CSV data.
:param from_form: The form from which the field derives its validation rules.
"""
def __init__(self, from_form, *args, **kwargs):
form = from_form()
self.model = form.Meta.model
self.fields = form.fields
self.required_fields = [
name for name, field in form.fields.items() if field.required
]
super().__init__(*args, **kwargs)
def to_python(self, file):
records = []
file.seek(0)
csv_str = file.read().decode('utf-8')
reader = csv.reader(csv_str.splitlines())
# Consume the first line of CSV data as column headers. Create a dictionary mapping each header to an optional
# "to" field specifying how the related object is being referenced. For example, importing a Device might use a
# `site.slug` header, to indicate the related site is being referenced by its slug.
headers = {}
for header in next(reader):
if '.' in header:
field, to_field = header.split('.', 1)
headers[field] = to_field
else:
headers[header] = None
# Parse CSV rows into a list of dictionaries mapped from the column headers.
for i, row in enumerate(reader, start=1):
if len(row) != len(headers):
raise forms.ValidationError(
f"Row {i}: Expected {len(headers)} columns but found {len(row)}"
)
row = [col.strip() for col in row]
record = dict(zip(headers.keys(), row))
records.append(record)
return headers, records
def validate(self, value):
headers, records = value
# Validate provided column headers
for field, to_field in headers.items():
if field not in self.fields:
raise forms.ValidationError(f'Unexpected column header "{field}" found.')
if to_field and not hasattr(self.fields[field], 'to_field_name'):
raise forms.ValidationError(f'Column "{field}" is not a related object; cannot use dots')
if to_field and not hasattr(self.fields[field].queryset.model, to_field):
raise forms.ValidationError(f'Invalid related object attribute for column "{field}": {to_field}')
# Validate required fields
for f in self.required_fields:
if f not in headers:
raise forms.ValidationError(f'Required column header "{f}" not found.')
return value
class CSVChoiceField(forms.ChoiceField):
"""
Invert the provided set of choices to take the human-friendly label as input, and return the database value.