1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Adds CSV dialect detection to the bulk import view (#13563)

* Adds CSV dialect detection to the bulk import view #13239

* Restricts dialect detection to a sane set of delimiters #13239

* adds csv delimiter tests #13239

* Adds a CSV delimiter field to the import form

* Passes the selected delimiter to the _clean_csv method #13239

* fix tests for csv import #13239

* fix tests for csv import #13239

* fix tests for csv import #13239

* fix tests for csv import #13239

* Improve auto-detection of import data format

* Misc cleanup

* Include tab as a supported delimiting character for auto-detection

* Move delimiting chars to a separate constant for easy reference

---------

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
This commit is contained in:
Abhimanyu Saharan
2023-09-13 02:18:40 +05:30
committed by GitHub
parent 39cb9c32d6
commit b7cfb2f7d9
9 changed files with 123 additions and 15 deletions

View File

@@ -7,10 +7,10 @@ from django import forms
from django.utils.translation import gettext as _
from core.forms.mixins import SyncedDataMixin
from utilities.choices import ImportFormatChoices
from utilities.choices import CSVDelimiterChoices, ImportFormatChoices, ImportMethodChoices
from utilities.constants import CSV_DELIMITERS
from utilities.forms.utils import parse_csv
from .mixins import BootstrapMixin
from ..choices import ImportMethodChoices
class BulkImportForm(BootstrapMixin, SyncedDataMixin, forms.Form):
@@ -24,13 +24,20 @@ class BulkImportForm(BootstrapMixin, SyncedDataMixin, forms.Form):
help_text=_("Enter object data in CSV, JSON or YAML format.")
)
upload_file = forms.FileField(
label="Data file",
label=_("Data file"),
required=False
)
format = forms.ChoiceField(
choices=ImportFormatChoices,
initial=ImportFormatChoices.AUTO
)
csv_delimiter = forms.ChoiceField(
choices=CSVDelimiterChoices,
initial=CSVDelimiterChoices.AUTO,
label=_("CSV delimiter"),
help_text=_("The character which delimits CSV fields. Applies only to CSV format."),
required=False
)
data_field = 'data'
@@ -54,13 +61,18 @@ class BulkImportForm(BootstrapMixin, SyncedDataMixin, forms.Form):
# Determine the data format
if self.cleaned_data['format'] == ImportFormatChoices.AUTO:
format = self._detect_format(data)
if self.cleaned_data['csv_delimiter'] != CSVDelimiterChoices.AUTO:
# Specifying the CSV delimiter implies CSV format
format = ImportFormatChoices.CSV
else:
format = self._detect_format(data)
else:
format = self.cleaned_data['format']
# Process data according to the selected format
if format == ImportFormatChoices.CSV:
self.cleaned_data['data'] = self._clean_csv(data)
delimiter = self.cleaned_data.get('csv_delimiter', CSVDelimiterChoices.AUTO)
self.cleaned_data['data'] = self._clean_csv(data, delimiter=delimiter)
elif format == ImportFormatChoices.JSON:
self.cleaned_data['data'] = self._clean_json(data)
elif format == ImportFormatChoices.YAML:
@@ -78,7 +90,10 @@ class BulkImportForm(BootstrapMixin, SyncedDataMixin, forms.Form):
return ImportFormatChoices.JSON
if data.startswith('---') or data.startswith('- '):
return ImportFormatChoices.YAML
if ',' in data.split('\n', 1)[0]:
# Look for any of the CSV delimiters in the first line (ignoring the default 'auto' choice)
first_line = data.split('\n', 1)[0]
csv_delimiters = CSV_DELIMITERS.values()
if any(x in first_line for x in csv_delimiters):
return ImportFormatChoices.CSV
except IndexError:
pass
@@ -86,12 +101,31 @@ class BulkImportForm(BootstrapMixin, SyncedDataMixin, forms.Form):
'format': _('Unable to detect data format. Please specify.')
})
def _clean_csv(self, data):
def _clean_csv(self, data, delimiter=CSVDelimiterChoices.AUTO):
"""
Clean CSV-formatted data. The first row will be treated as column headers.
"""
# Determine the CSV dialect
if delimiter == CSVDelimiterChoices.AUTO:
# This uses a rough heuristic to detect the CSV dialect based on the presence of supported delimiting
# characters. If the data is malformed, we'll fall back to the default Excel dialect.
delimiters = ''.join(CSV_DELIMITERS.values())
try:
dialect = csv.Sniffer().sniff(data.strip(), delimiters=delimiters)
except csv.Error:
dialect = csv.excel
elif delimiter in (CSVDelimiterChoices.COMMA, CSVDelimiterChoices.SEMICOLON):
dialect = csv.excel
dialect.delimiter = delimiter
elif delimiter == CSVDelimiterChoices.TAB:
dialect = csv.excel_tab
else:
raise forms.ValidationError({
'csv_delimiter': _('Invalid CSV delimiter'),
})
stream = StringIO(data.strip())
reader = csv.reader(stream)
reader = csv.reader(stream, dialect=dialect)
headers, records = parse_csv(reader)
# Set CSV headers for reference by the model form