Add starter code of Project: Exploring Near-Earth Objects

This commit is contained in:
2025-12-26 17:19:08 -08:00
parent 241df7c9d3
commit 137a13ec61
20 changed files with 92427 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
"""Let Python know that the `tests/` folder is a package for Test Discovery [1].
[1]: https://docs.python.org/3/library/unittest.html#unittest-test-discovery
"""

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,62 @@
"""Check that the data files exist and are readable, nonempty, and well-formatted.
To run these tests from the project root, run:
$ python3 -m unittest --verbose tests.test_data_files
These tests should pass on the starter code.
"""
import collections
import csv
import json
import os
import pathlib
import unittest
# The root of the project, containing `main.py`.
PROJECT_ROOT = pathlib.Path(__file__).parent.parent.resolve()
class TestDataFiles(unittest.TestCase):
def setUp(self):
self.data_root = PROJECT_ROOT / 'data'
self.neo_file = self.data_root / 'neos.csv'
self.cad_file = self.data_root / 'cad.json'
def test_data_files_exist(self):
self.assertTrue(self.neo_file.exists())
self.assertTrue(self.cad_file.exists())
def test_data_files_are_readable(self):
self.assertTrue(os.access(self.neo_file, os.R_OK))
self.assertTrue(os.access(self.cad_file, os.R_OK))
def test_data_files_are_not_empty(self):
try:
self.assertTrue(self.neo_file.stat().st_size > 0, "Empty NEO file.")
self.assertTrue(self.cad_file.stat().st_size > 0, "Empty CAD file.")
except OSError:
self.fail("Unexpected OSError.")
def test_data_files_are_well_formatted(self):
# Check that the NEO data is CSV-formatted.
try:
with self.neo_file.open() as f:
# Consume the entire sequence into length-0 deque.
collections.deque(csv.reader(f), maxlen=0)
except csv.Error as err:
raise self.failureException(f"{self.neo_file!r} is not a well-formated CSV.") from err
# Check that the CAD data is JSON-formatted.
try:
with self.cad_file.open() as f:
json.load(f)
json.loads(self.cad_file.read_text())
except json.JSONDecodeError as err:
raise self.failureException(f"{self.cad_file!r} is not a valid JSON document.") from err
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,119 @@
"""Check that an `NEODatabase` can be constructed and responds to inspect queries.
The `NEODatabase` constructor should cross-link NEOs and their close approaches,
as well as prepare any additional metadata needed to support the `get_neo_by_*`
methods.
To run these tests from the project root, run:
$ python3 -m unittest --verbose tests.test_database
These tests should pass when Task 2 is complete.
"""
import pathlib
import math
import unittest
from extract import load_neos, load_approaches
from database import NEODatabase
# Paths to the test data files.
TESTS_ROOT = (pathlib.Path(__file__).parent).resolve()
TEST_NEO_FILE = TESTS_ROOT / 'test-neos-2020.csv'
TEST_CAD_FILE = TESTS_ROOT / 'test-cad-2020.json'
class TestDatabase(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.neos = load_neos(TEST_NEO_FILE)
cls.approaches = load_approaches(TEST_CAD_FILE)
cls.db = NEODatabase(cls.neos, cls.approaches)
def test_database_construction_links_approaches_to_neos(self):
for approach in self.approaches:
self.assertIsNotNone(approach.neo)
def test_database_construction_ensures_each_neo_has_an_approaches_attribute(self):
for neo in self.neos:
self.assertTrue(hasattr(neo, 'approaches'))
def test_database_construction_ensures_neos_collectively_exhaust_approaches(self):
approaches = set()
for neo in self.neos:
approaches.update(neo.approaches)
self.assertEqual(approaches, set(self.approaches))
def test_database_construction_ensures_neos_mutually_exclude_approaches(self):
seen = set()
for neo in self.neos:
for approach in neo.approaches:
if approach in seen:
self.fail(f"{approach} appears in the approaches of multiple NEOs.")
seen.add(approach)
def test_get_neo_by_designation(self):
cerberus = self.db.get_neo_by_designation('1865')
self.assertIsNotNone(cerberus)
self.assertEqual(cerberus.designation, '1865')
self.assertEqual(cerberus.name, 'Cerberus')
self.assertEqual(cerberus.diameter, 1.2)
self.assertEqual(cerberus.hazardous, False)
adonis = self.db.get_neo_by_designation('2101')
self.assertIsNotNone(adonis)
self.assertEqual(adonis.designation, '2101')
self.assertEqual(adonis.name, 'Adonis')
self.assertEqual(adonis.diameter, 0.60)
self.assertEqual(adonis.hazardous, True)
tantalus = self.db.get_neo_by_designation('2102')
self.assertIsNotNone(tantalus)
self.assertEqual(tantalus.designation, '2102')
self.assertEqual(tantalus.name, 'Tantalus')
self.assertEqual(tantalus.diameter, 1.649)
self.assertEqual(tantalus.hazardous, True)
def test_get_neo_by_designation_neos_with_year(self):
bs_2020 = self.db.get_neo_by_designation('2020 BS')
self.assertIsNotNone(bs_2020)
self.assertEqual(bs_2020.designation, '2020 BS')
self.assertEqual(bs_2020.name, None)
self.assertTrue(math.isnan(bs_2020.diameter))
self.assertEqual(bs_2020.hazardous, False)
py1_2020 = self.db.get_neo_by_designation('2020 PY1')
self.assertIsNotNone(py1_2020)
self.assertEqual(py1_2020.designation, '2020 PY1')
self.assertEqual(py1_2020.name, None)
self.assertTrue(math.isnan(py1_2020.diameter))
self.assertEqual(py1_2020.hazardous, False)
def test_get_neo_by_designation_missing(self):
nonexistent = self.db.get_neo_by_designation('not-real-designation')
self.assertIsNone(nonexistent)
def test_get_neo_by_name(self):
lemmon = self.db.get_neo_by_name('Lemmon')
self.assertIsNotNone(lemmon)
self.assertEqual(lemmon.designation, '2013 TL117')
self.assertEqual(lemmon.name, 'Lemmon')
self.assertTrue(math.isnan(lemmon.diameter))
self.assertEqual(lemmon.hazardous, False)
jormungandr = self.db.get_neo_by_name('Jormungandr')
self.assertIsNotNone(jormungandr)
self.assertEqual(jormungandr.designation, '471926')
self.assertEqual(jormungandr.name, 'Jormungandr')
self.assertTrue(math.isnan(jormungandr.diameter))
self.assertEqual(jormungandr.hazardous, True)
def test_get_neo_by_name_missing(self):
nonexistent = self.db.get_neo_by_name('not-real-name')
self.assertIsNone(nonexistent)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,122 @@
"""Check that data can be extracted from structured data files.
The `load_neos` function should load a collection of `NearEarthObject`s from a
CSV file, and the `load_approaches` function should load a collection of
`CloseApproach` objects from a JSON file.
To run these tests from the project root, run:
$ python3 -m unittest --verbose tests.test_extract
These tests should pass when Task 2 is complete.
"""
import collections.abc
import datetime
import pathlib
import math
import unittest
from extract import load_neos, load_approaches
from models import NearEarthObject, CloseApproach
TESTS_ROOT = (pathlib.Path(__file__).parent).resolve()
TEST_NEO_FILE = TESTS_ROOT / 'test-neos-2020.csv'
TEST_CAD_FILE = TESTS_ROOT / 'test-cad-2020.json'
class TestLoadNEOs(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.neos = load_neos(TEST_NEO_FILE)
cls.neos_by_designation = {neo.designation: neo for neo in cls.neos}
@classmethod
def get_first_neo_or_none(cls):
try:
# Don't use __getitem__ in case the object is a set or a stream.
return next(iter(cls.neos))
except StopIteration:
return None
def test_neos_are_collection(self):
self.assertIsInstance(self.neos, collections.abc.Collection)
def test_neos_contain_near_earth_objects(self):
neo = self.get_first_neo_or_none()
self.assertIsNotNone(neo)
self.assertIsInstance(neo, NearEarthObject)
def test_neos_contain_all_elements(self):
self.assertEqual(len(self.neos), 4226)
def test_neos_contain_2019_SC8_no_name_no_diameter(self):
self.assertIn('2019 SC8', self.neos_by_designation)
neo = self.neos_by_designation['2019 SC8']
self.assertEqual(neo.designation, '2019 SC8')
self.assertEqual(neo.name, None)
self.assertTrue(math.isnan(neo.diameter))
self.assertEqual(neo.hazardous, False)
def test_asclepius_has_name_no_diameter(self):
self.assertIn('4581', self.neos_by_designation)
neo = self.neos_by_designation['4581']
self.assertEqual(neo.designation, '4581')
self.assertEqual(neo.name, 'Asclepius')
self.assertTrue(math.isnan(neo.diameter))
self.assertEqual(neo.hazardous, True)
def test_adonis_is_potentially_hazardous(self):
self.assertIn('2101', self.neos_by_designation)
neo = self.neos_by_designation['2101']
self.assertEqual(neo.designation, '2101')
self.assertEqual(neo.name, 'Adonis')
self.assertEqual(neo.diameter, 0.6)
self.assertEqual(neo.hazardous, True)
class TestLoadApproaches(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.approaches = load_approaches(TEST_CAD_FILE)
@classmethod
def get_first_approach_or_none(cls):
try:
# Don't __getitem__, in case it's a set or a stream.
return next(iter(cls.approaches))
except StopIteration:
return None
def test_approaches_are_collection(self):
self.assertIsInstance(self.approaches, collections.abc.Collection)
def test_approaches_contain_close_approaches(self):
approach = self.get_first_approach_or_none()
self.assertIsNotNone(approach)
self.assertIsInstance(approach, CloseApproach)
def test_approaches_contain_all_elements(self):
self.assertEqual(len(self.approaches), 4700)
def test_approach_time_is_datetime(self):
approach = self.get_first_approach_or_none()
self.assertIsNotNone(approach)
self.assertIsInstance(approach.time, datetime.datetime)
def test_approach_distance_is_float(self):
approach = self.get_first_approach_or_none()
self.assertIsNotNone(approach)
self.assertIsInstance(approach.distance, float)
def test_approach_velocity_is_float(self):
approach = self.get_first_approach_or_none()
self.assertIsNotNone(approach)
self.assertIsInstance(approach.velocity, float)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,54 @@
"""Check that the `limit` function limits iterables.
To run these tests from the project root, run:
$ python3 -m unittest --verbose tests.test_limit
It isn't guaranteed that `limit` is a generator function - it's possible to
implement it imperatively with the tools from the `itertools` module.
These tests should pass when Task 3c is complete.
"""
import collections.abc
import unittest
from filters import limit
class TestLimit(unittest.TestCase):
def setUp(self):
self.iterable = tuple(range(5))
def test_limit_iterable_with_limit(self):
self.assertEqual(tuple(limit(self.iterable, 3)), (0, 1, 2))
def test_limit_iterable_without_limit(self):
self.assertEqual(tuple(limit(self.iterable)), (0, 1, 2, 3, 4))
self.assertEqual(tuple(limit(self.iterable, 0)), (0, 1, 2, 3, 4))
self.assertEqual(tuple(limit(self.iterable, None)), (0, 1, 2, 3, 4))
def test_limit_iterator_with_smaller_limit(self):
self.assertEqual(tuple(limit(iter(self.iterable), 3)), (0, 1, 2))
def test_limit_iterator_with_matching_limit(self):
self.assertEqual(tuple(limit(iter(self.iterable), 5)), (0, 1, 2, 3, 4))
def test_limit_iterator_with_larger_limit(self):
self.assertEqual(tuple(limit(iter(self.iterable), 10)), (0, 1, 2, 3, 4))
def test_limit_iterator_without_limit(self):
self.assertEqual(tuple(limit(iter(self.iterable))), (0, 1, 2, 3, 4))
self.assertEqual(tuple(limit(iter(self.iterable), 0)), (0, 1, 2, 3, 4))
self.assertEqual(tuple(limit(iter(self.iterable), None)), (0, 1, 2, 3, 4))
def test_limit_produces_an_iterable(self):
self.assertIsInstance(limit(self.iterable, 3), collections.abc.Iterable)
self.assertIsInstance(limit(self.iterable, 5), collections.abc.Iterable)
self.assertIsInstance(limit(self.iterable, 10), collections.abc.Iterable)
self.assertIsInstance(limit(self.iterable), collections.abc.Iterable)
self.assertIsInstance(limit(self.iterable, 0), collections.abc.Iterable)
self.assertIsInstance(limit(self.iterable, None), collections.abc.Iterable)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,49 @@
"""Check that the Python version is at least up to a minimum threshold of 3.6.
The instructions explicitly invoke each command using `python3` on the command
line, but a student's local setup might not default to using Python 3.6+, which
is required for this project. Additionally, some students may accidentally be in
the habit of using bare `python`, which could invoke Python 2.x if their
environment isn't set up correctly.
Other modules in this project aggressively assume Python 3.6+, so this unit test
is our only cession to the possibility that students might be running a lower
version of Python.
To run these tests from the project root, run:
$ python3 -m unittest --verbose tests.test_python_version
These tests should (successfully) fail, but not crash, when invoked with Python 2:
$ /usr/bin/python2.7 -m unittest --verbose tests.test_python_version
"""
import sys
import unittest
class TestPythonVersion(unittest.TestCase):
"""Check that the Python version is >= 3.6."""
def test_python_version_is_at_least_3_6(self):
self.assertTrue(sys.version_info >= (3, 6),
msg="""Unsupported Python version.
It looks like you're using a version of Python that's too old.
This project requires Python 3.6+. You're currently using Python {}.{}.{}.
Make sure that you have a compatible version of Python and that you're using
`python3` at the command-line (or that your environment resolves `python` to
some Python3.6+ version if you have a custom setup).
Remember, you can always ask Python to display its version with:
$ python3 -V
Python 3.X.Y
""".format(*sys.version_info[:3]))
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,533 @@
"""Check that `query`ing an `NEODatabase` accurately produces close approaches.
There are a plethora of ways to combine the arguments to `create_filters`, which
correspond to different command-line options. This modules tests the options in
isolation, in pairs, and in more complicated combinations. Althought the tests
are not entirely exhaustive, any implementation that passes all of these tests
is most likely up to snuff.
To run these tests from the project root, run::
$ python3 -m unittest --verbose tests.test_query
These tests should pass when Tasks 3a and 3b are complete.
"""
import datetime
import pathlib
import unittest
from database import NEODatabase
from extract import load_neos, load_approaches
from filters import create_filters
TESTS_ROOT = (pathlib.Path(__file__).parent).resolve()
TEST_NEO_FILE = TESTS_ROOT / 'test-neos-2020.csv'
TEST_CAD_FILE = TESTS_ROOT / 'test-cad-2020.json'
class TestQuery(unittest.TestCase):
# Set longMessage to True to enable lengthy diffs between set comparisons.
longMessage = False
@classmethod
def setUpClass(cls):
cls.neos = load_neos(TEST_NEO_FILE)
cls.approaches = load_approaches(TEST_CAD_FILE)
cls.db = NEODatabase(cls.neos, cls.approaches)
def test_query_all(self):
expected = set(self.approaches)
self.assertGreater(len(expected), 0)
filters = create_filters()
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
###############################################
# Single filters and pairs of related filters #
###############################################
def test_query_approaches_on_march_2(self):
date = datetime.date(2020, 3, 2)
expected = set(
approach for approach in self.approaches
if approach.time.date() == date
)
self.assertGreater(len(expected), 0)
filters = create_filters(date=date)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_after_april(self):
start_date = datetime.date(2020, 4, 1)
expected = set(
approach for approach in self.approaches
if start_date <= approach.time.date()
)
self.assertGreater(len(expected), 0)
filters = create_filters(start_date=start_date)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_before_july(self):
end_date = datetime.date(2020, 6, 30)
expected = set(
approach for approach in self.approaches
if approach.time.date() <= end_date
)
self.assertGreater(len(expected), 0)
filters = create_filters(end_date=end_date)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_in_march(self):
start_date = datetime.date(2020, 3, 1)
end_date = datetime.date(2020, 3, 31)
expected = set(
approach for approach in self.approaches
if start_date <= approach.time.date() <= end_date
)
self.assertGreater(len(expected), 0)
filters = create_filters(start_date=start_date, end_date=end_date)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_conflicting_date_bounds(self):
start_date = datetime.date(2020, 10, 1)
end_date = datetime.date(2020, 4, 1)
expected = set()
filters = create_filters(start_date=start_date, end_date=end_date)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_bounds_and_a_specific_date(self):
start_date = datetime.date(2020, 2, 1)
date = datetime.date(2020, 3, 2)
end_date = datetime.date(2020, 4, 1)
expected = set(
approach for approach in self.approaches
if approach.time.date() == date
)
self.assertGreater(len(expected), 0)
filters = create_filters(date=date, start_date=start_date, end_date=end_date)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_max_distance(self):
distance_max = 0.4
expected = set(
approach for approach in self.approaches
if approach.distance <= distance_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(distance_max=distance_max)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_min_distance(self):
distance_min = 0.1
expected = set(
approach for approach in self.approaches
if distance_min <= approach.distance
)
self.assertGreater(len(expected), 0)
filters = create_filters(distance_min=distance_min)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_max_distance_and_min_distance(self):
distance_max = 0.4
distance_min = 0.1
expected = set(
approach for approach in self.approaches
if distance_min <= approach.distance <= distance_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(distance_min=distance_min, distance_max=distance_max)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_max_distance_and_min_distance_conflicting(self):
distance_max = 0.1
distance_min = 0.4
expected = set()
filters = create_filters(distance_min=distance_min, distance_max=distance_max)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_max_velocity(self):
velocity_max = 20
expected = set(
approach for approach in self.approaches
if approach.velocity <= velocity_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(velocity_max=velocity_max)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_min_velocity(self):
velocity_min = 10
expected = set(
approach for approach in self.approaches
if velocity_min <= approach.velocity
)
self.assertGreater(len(expected), 0)
filters = create_filters(velocity_min=velocity_min)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_max_velocity_and_min_velocity(self):
velocity_max = 20
velocity_min = 10
expected = set(
approach for approach in self.approaches
if velocity_min <= approach.velocity <= velocity_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(velocity_min=velocity_min, velocity_max=velocity_max)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_max_velocity_and_min_velocity_conflicting(self):
velocity_max = 10
velocity_min = 20
expected = set()
filters = create_filters(velocity_min=velocity_min, velocity_max=velocity_max)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_max_diameter(self):
diameter_max = 1.5
expected = set(
approach for approach in self.approaches
if approach.neo.diameter <= diameter_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(diameter_max=diameter_max)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_min_diameter(self):
diameter_min = 0.5
expected = set(
approach for approach in self.approaches
if diameter_min <= approach.neo.diameter
)
self.assertGreater(len(expected), 0)
filters = create_filters(diameter_min=diameter_min)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_max_diameter_and_min_diameter(self):
diameter_max = 1.5
diameter_min = 0.5
expected = set(
approach for approach in self.approaches
if diameter_min <= approach.neo.diameter <= diameter_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(diameter_min=diameter_min, diameter_max=diameter_max)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_max_diameter_and_min_diameter_conflicting(self):
diameter_max = 0.5
diameter_min = 1.5
expected = set()
filters = create_filters(diameter_min=diameter_min, diameter_max=diameter_max)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_hazardous(self):
expected = set(
approach for approach in self.approaches
if approach.neo.hazardous
)
self.assertGreater(len(expected), 0)
filters = create_filters(hazardous=True)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_with_not_hazardous(self):
expected = set(
approach for approach in self.approaches
if not approach.neo.hazardous
)
self.assertGreater(len(expected), 0)
filters = create_filters(hazardous=False)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
###########################
# Combinations of filters #
###########################
def test_query_approaches_on_march_2_with_max_distance(self):
date = datetime.date(2020, 3, 2)
distance_max = 0.4
expected = set(
approach for approach in self.approaches
if approach.time.date() == date
and approach.distance <= distance_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(date=date, distance_max=distance_max)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_on_march_2_with_min_distance(self):
date = datetime.date(2020, 3, 2)
distance_min = 0.1
expected = set(
approach for approach in self.approaches
if approach.time.date() == date
and distance_min <= approach.distance
)
self.assertGreater(len(expected), 0)
filters = create_filters(date=date, distance_min=distance_min)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_in_march_with_min_distance_and_max_distance(self):
start_date = datetime.date(2020, 3, 1)
end_date = datetime.date(2020, 3, 31)
distance_max = 0.4
distance_min = 0.1
expected = set(
approach for approach in self.approaches
if start_date <= approach.time.date() <= end_date
and distance_min <= approach.distance <= distance_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(
start_date=start_date, end_date=end_date,
distance_min=distance_min, distance_max=distance_max,
)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_in_march_with_distance_bounds_and_max_velocity(self):
start_date = datetime.date(2020, 3, 1)
end_date = datetime.date(2020, 3, 31)
distance_max = 0.4
distance_min = 0.1
velocity_max = 20
expected = set(
approach for approach in self.approaches
if start_date <= approach.time.date() <= end_date
and distance_min <= approach.distance <= distance_max
and approach.velocity <= velocity_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(
start_date=start_date, end_date=end_date,
distance_min=distance_min, distance_max=distance_max,
velocity_max=velocity_max
)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_in_march_with_distance_and_velocity_bounds(self):
start_date = datetime.date(2020, 3, 1)
end_date = datetime.date(2020, 3, 31)
distance_max = 0.4
distance_min = 0.1
velocity_max = 20
velocity_min = 10
expected = set(
approach for approach in self.approaches
if start_date <= approach.time.date() <= end_date
and distance_min <= approach.distance <= distance_max
and velocity_min <= approach.velocity <= velocity_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(
start_date=start_date, end_date=end_date,
distance_min=distance_min, distance_max=distance_max,
velocity_min=velocity_min, velocity_max=velocity_max
)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_in_spring_with_distance_and_velocity_bounds_and_max_diameter(self):
start_date = datetime.date(2020, 3, 1)
end_date = datetime.date(2020, 5, 31)
distance_max = 0.5
distance_min = 0.05
velocity_max = 25
velocity_min = 5
diameter_max = 1.5
expected = set(
approach for approach in self.approaches
if start_date <= approach.time.date() <= end_date
and distance_min <= approach.distance <= distance_max
and velocity_min <= approach.velocity <= velocity_max
and approach.neo.diameter <= diameter_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(
start_date=start_date, end_date=end_date,
distance_min=distance_min, distance_max=distance_max,
velocity_min=velocity_min, velocity_max=velocity_max,
diameter_max=diameter_max
)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_in_spring_with_distance_velocity_and_diameter_bounds(self):
start_date = datetime.date(2020, 3, 1)
end_date = datetime.date(2020, 5, 31)
distance_max = 0.5
distance_min = 0.05
velocity_max = 25
velocity_min = 5
diameter_max = 1.5
diameter_min = 0.5
expected = set(
approach for approach in self.approaches
if start_date <= approach.time.date() <= end_date
and distance_min <= approach.distance <= distance_max
and velocity_min <= approach.velocity <= velocity_max
and diameter_min <= approach.neo.diameter <= diameter_max
)
self.assertGreater(len(expected), 0)
filters = create_filters(
start_date=start_date, end_date=end_date,
distance_min=distance_min, distance_max=distance_max,
velocity_min=velocity_min, velocity_max=velocity_max,
diameter_min=diameter_min, diameter_max=diameter_max
)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_in_spring_with_all_bounds_and_potentially_hazardous_neos(self):
start_date = datetime.date(2020, 3, 1)
end_date = datetime.date(2020, 5, 31)
distance_max = 0.5
distance_min = 0.05
velocity_max = 25
velocity_min = 5
diameter_max = 1.5
diameter_min = 0.5
expected = set(
approach for approach in self.approaches
if start_date <= approach.time.date() <= end_date
and distance_min <= approach.distance <= distance_max
and velocity_min <= approach.velocity <= velocity_max
and diameter_min <= approach.neo.diameter <= diameter_max
and approach.neo.hazardous
)
self.assertGreater(len(expected), 0)
filters = create_filters(
start_date=start_date, end_date=end_date,
distance_min=distance_min, distance_max=distance_max,
velocity_min=velocity_min, velocity_max=velocity_max,
diameter_min=diameter_min, diameter_max=diameter_max,
hazardous=True
)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
def test_query_approaches_in_spring_with_all_bounds_and_not_potentially_hazardous_neos(self):
start_date = datetime.date(2020, 3, 1)
end_date = datetime.date(2020, 5, 31)
distance_max = 0.5
distance_min = 0.05
velocity_max = 25
velocity_min = 5
diameter_max = 1.5
diameter_min = 0.5
expected = set(
approach for approach in self.approaches
if start_date <= approach.time.date() <= end_date
and distance_min <= approach.distance <= distance_max
and velocity_min <= approach.velocity <= velocity_max
and diameter_min <= approach.neo.diameter <= diameter_max
and not approach.neo.hazardous
)
self.assertGreater(len(expected), 0)
filters = create_filters(
start_date=start_date, end_date=end_date,
distance_min=distance_min, distance_max=distance_max,
velocity_min=velocity_min, velocity_max=velocity_max,
diameter_min=diameter_min, diameter_max=diameter_max,
hazardous=False
)
received = set(self.db.query(filters))
self.assertEqual(expected, received, msg="Computed results do not match expected results.")
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,230 @@
"""Check that streams of results can be written to files.
The `write_to_csv` and `write_to_json` methods should follow a specific output
format, described in the project instructions.
There's some sketchy file-like manipulation in order to avoid writing anything
to disk and avoid letting a context manager in the implementation eagerly close
the in-memory file - so be warned that the workaround is gnarly.
To run these tests from the project root, run:
$ python3 -m unittest --verbose tests.test_write
These tests should pass when Task 4 is complete.
"""
import collections
import collections.abc
import contextlib
import csv
import datetime
import io
import json
import pathlib
import unittest
import unittest.mock
from extract import load_neos, load_approaches
from database import NEODatabase
from write import write_to_csv, write_to_json
TESTS_ROOT = (pathlib.Path(__file__).parent).resolve()
TEST_NEO_FILE = TESTS_ROOT / 'test-neos-2020.csv'
TEST_CAD_FILE = TESTS_ROOT / 'test-cad-2020.json'
def build_results(n):
neos = tuple(load_neos(TEST_NEO_FILE))
approaches = tuple(load_approaches(TEST_CAD_FILE))
# Only needed to link together these objects.
NEODatabase(neos, approaches)
return approaches[:n]
@contextlib.contextmanager
def UncloseableStringIO(value=''):
"""A context manager for an uncloseable `io.StringIO`.
This produces an almost-normal `io.StringIO`, except the `close` method has
been patched out with a no-op. The context manager takes care of restoring
the monkeypatch and closing the buffer, but this prevents other nested
context managers (such as `open` from the implementation of `write_to_*`)
from preemptively closing the `StringIO` before we can rewind it and read
its value.
"""
buf = io.StringIO(value)
buf._close = buf.close
buf.close = lambda: False
yield buf
buf.close = buf._close
delattr(buf, '_close')
buf.close()
class TestWriteToCSV(unittest.TestCase):
@classmethod
@unittest.mock.patch('write.open')
def setUpClass(cls, mock_file):
results = build_results(5)
with UncloseableStringIO() as buf:
mock_file.return_value = buf
try:
write_to_csv(results, None)
except csv.Error as err:
raise cls.failureException("Unable to write results to CSV.") from err
except ValueError as err:
raise cls.failureException("Unexpected failure while writing to CSV.") from err
else:
# Rewind the unclosed buffer to save its contents.
buf.seek(0)
cls.value = buf.getvalue()
def test_csv_data_is_well_formed(self):
# Now, we have the value in memory, and can _actually_ start testing.
buf = io.StringIO(self.value)
# Check that the output is well-formed.
try:
# Consume the output and immediately discard it.
collections.deque(csv.DictReader(buf), maxlen=0)
except csv.Error as err:
raise self.failureException("write_to_csv produced an invalid CSV format.") from err
def test_csv_data_has_header(self):
try:
self.assertTrue(csv.Sniffer().has_header(self.value))
return
except csv.Error as err:
raise self.failureException("Unable to sniff for headers.") from err
def test_csv_data_has_five_rows(self):
# Now, we have the value in memory, and can _actually_ start testing.
buf = io.StringIO(self.value)
# Check that the output is well-formed.
try:
reader = csv.DictReader(buf)
rows = tuple(reader)
except csv.Error as err:
raise self.failureException("write_to_csv produced an invalid CSV format.") from err
self.assertEqual(len(rows), 5)
def test_csv_data_header_matches_requirements(self):
# Now, we have the value in memory, and can _actually_ start testing.
buf = io.StringIO(self.value)
# Check that the output is well-formed.
try:
reader = csv.DictReader(buf)
rows = tuple(reader)
except csv.Error as err:
raise self.failureException("write_to_csv produced an invalid CSV format.") from err
fieldnames = ('datetime_utc', 'distance_au', 'velocity_km_s', 'designation', 'name', 'diameter_km', 'potentially_hazardous')
self.assertGreater(len(rows), 0)
self.assertSetEqual(set(fieldnames), set(rows[0].keys()))
class TestWriteToJSON(unittest.TestCase):
@classmethod
@unittest.mock.patch('write.open')
def setUpClass(cls, mock_file):
results = build_results(5)
with UncloseableStringIO() as buf:
mock_file.return_value = buf
try:
write_to_json(results, None)
except csv.Error as err:
raise cls.failureException("Unable to write results to CSV.") from err
except ValueError as err:
raise cls.failureException("Unexpected failure while writing to CSV.") from err
else:
# Rewind the unclosed buffer to fetch the contents saved to "disk".
buf.seek(0)
cls.value = buf.getvalue()
def test_json_data_is_well_formed(self):
# Now, we have the value in memory, and can _actually_ start testing.
buf = io.StringIO(self.value)
try:
json.load(buf)
except json.JSONDecodeError as err:
raise self.failureException("write_to_json produced an invalid JSON document") from err
def test_json_data_is_a_sequence(self):
buf = io.StringIO(self.value)
try:
data = json.load(buf)
except json.JSONDecodeError as err:
raise self.failureException("write_to_json produced an invalid JSON document") from err
self.assertIsInstance(data, collections.abc.Sequence)
def test_json_data_has_five_elements(self):
buf = io.StringIO(self.value)
try:
data = json.load(buf)
except json.JSONDecodeError as err:
raise self.failureException("write_to_json produced an invalid JSON document") from err
self.assertEqual(len(data), 5)
def test_json_element_is_associative(self):
buf = io.StringIO(self.value)
try:
data = json.load(buf)
except json.JSONDecodeError as err:
raise self.failureException("write_to_json produced an invalid JSON document") from err
approach = data[0]
self.assertIsInstance(approach, collections.abc.Mapping)
def test_json_element_has_nested_attributes(self):
buf = io.StringIO(self.value)
try:
data = json.load(buf)
except json.JSONDecodeError as err:
raise self.failureException("write_to_json produced an invalid JSON document") from err
approach = data[0]
self.assertIn('datetime_utc', approach)
self.assertIn('distance_au', approach)
self.assertIn('velocity_km_s', approach)
self.assertIn('neo', approach)
neo = approach['neo']
self.assertIn('designation', neo)
self.assertIn('name', neo)
self.assertIn('diameter_km', neo)
self.assertIn('potentially_hazardous', neo)
def test_json_element_decodes_to_correct_types(self):
buf = io.StringIO(self.value)
try:
data = json.load(buf)
except json.JSONDecodeError as err:
raise self.failureException("write_to_json produced an invalid JSON document") from err
approach = data[0]
try:
datetime.datetime.strptime(approach['datetime_utc'], '%Y-%m-%d %H:%M')
except ValueError:
self.fail("The `datetime_utc` key isn't in YYYY-MM-DD HH:MM` format.")
self.assertIsInstance(approach['distance_au'], float)
self.assertIsInstance(approach['velocity_km_s'], float)
self.assertIsInstance(approach['neo']['designation'], str)
self.assertNotEqual(approach['neo']['name'], 'None')
if approach['neo']['name']:
self.assertIsInstance(approach['neo']['name'], str)
self.assertIsInstance(approach['neo']['diameter_km'], float)
self.assertIsInstance(approach['neo']['potentially_hazardous'], bool)
if __name__ == '__main__':
unittest.main()