Automation#
When dealing with repetitive tasks such as:
uploading a large number of files
creating many packages
preserving your data periodically
analyzing information across packages
you can use the API of CKAN (the core software stack of ERIC) to automate these tasks with a programming language of your choosing.
Authentication#
For many things you might want to automate (like uploading data) you will need to authenticate yourself to the system, so that CKAN can check whether you are authorized to, for instance, upload data to a certain package. For that you will need an API token. If you do not yet have one, please contact rdm@eawag.ch and we will generate one for you.
Important
CKAN has some limitations when uploading large files (>8 GB). If that is what you're after, please contact rdm@eawag.ch.
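In practice, the token travels as the Authorization header of every request. A minimal sketch of how that looks (the token value and package id below are placeholders, not real ones):

```python
from urllib.request import Request

TOKEN = "xxxx"  # placeholder; use the token you received from rdm@eawag.ch

# Build a request with the token attached as the Authorization header.
req = Request(
    "https://data.eawag.ch/api/3/action/package_show?id=my-package",  # hypothetical package id
    headers={"Authorization": TOKEN},
)
print(req.get_header("Authorization"))
```

All examples below follow this same pattern: for public data the header can be omitted, for anything restricted it must be present.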
Examples#
Below you will find some examples on how to use the API with Python.
Retrieving information about a package#
For this we can use the package_show endpoint that the CKAN API offers.
First we'll define a function that requests information from the CKAN API and returns a dictionary:
import json
from urllib.request import urlopen, Request

def request_json_data(url: str, token: str | None = None) -> dict:
    headers = {} if token is None else {"Authorization": token}
    with urlopen(Request(url, headers=headers)) as response:
        return json.loads(response.read().decode())
Then we can request the data. As we're reading from a public dataset, we do not need an API token. Mind the composition of the URL:
host = "https://opendata.eawag.ch/"  # The URL of the public data repository
api_endpoint = "api/3/action/package_show"
endpoint_parameter = "id"
parameter_value = "data-for-geringste-konzentrationen-grosste-wirkung"
url = f"{host}{api_endpoint}?{endpoint_parameter}={parameter_value}"
package_data = request_json_data(url)
package_data
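Every CKAN API response is wrapped in a small envelope with a success flag and the actual payload under result. A defensive sketch of accessing it, using a stand-in dictionary so the snippet runs on its own (in practice package_data comes from the request above):

```python
# Stand-in for the dictionary returned by request_json_data above;
# CKAN responses always carry "success" and "result" keys.
package_data = {"success": True, "result": {"name": "example", "resources": []}}

# Check the flag before touching the payload.
if not package_data.get("success"):
    raise RuntimeError(f"API call failed: {package_data.get('error')}")

title = package_data["result"]["name"]
print(title)  # -> example
```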
A lot of data is returned. Let's extract only the resource links for this data package.
resource_urls = [resource["url"] for resource in package_data["result"]["resources"]]
resource_urls
['https://opendata.eawag.ch/dataset/50eafcd5-27c2-40a1-95d6-fc671262ee92/resource/016e7298-77dc-4a2d-b73e-1b68df23d038/download/readme.txt',
'https://opendata.eawag.ch/dataset/50eafcd5-27c2-40a1-95d6-fc671262ee92/resource/f4e375c5-8cd5-4e7c-8ed2-1fe32409a002/download/pyrethroids2018.xlsx',
'https://opendata.eawag.ch/dataset/50eafcd5-27c2-40a1-95d6-fc671262ee92/resource/35c4dcfb-a4bf-4dc4-82cf-e3360d0f08e8/download/pyrethroids2017.xlsx']
Downloading resources#
In the previous example we used the package_show endpoint of the CKAN API to extract the links of resources. In this example we will download those resources.
def download_resource(url: str, file_path: str, token: str | None = None, chunk_size: int = 1024) -> None:
    headers = {} if token is None else {"Authorization": token}
    with urlopen(Request(url, headers=headers)) as response:
        with open(file_path, "wb") as file:
            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                file.write(chunk)
With the download_resource function we can iterate over the previously extracted resources and download them.
for url in resource_urls:
    file_path = f"/tmp/{url.split('/')[-1]}"
    download_resource(url, file_path)
    print(f"Successfully saved resource at: {file_path}")
Successfully saved resource at: /tmp/readme.txt
Successfully saved resource at: /tmp/pyrethroids2018.xlsx
Successfully saved resource at: /tmp/pyrethroids2017.xlsx
Uploading resources#
In this scenario we assume you created a package on ERIC/internal called data-for-project-x and now you want to upload your many resources.
Important
This procedure will require an API Token.
Note
Uploads will take longer than the implemented progress bar shows. The progress bar will reach 100% after about a quarter of the time needed for the process to finish. The reasons are very technical; if you want to know why, please click below. You will only notice this for large files.
Technical reasons!
A file’s journey from your computer across the network to its final “resting place” passes through several proxies. Each of these proxies passes the data on to the next. The time shown in the progress bar is the time taken to upload the data to the first proxy. The additional time you have to wait is the time it takes for the various other proxies to copy the data from one to the next.
For ease of use we will install three libraries via pip install ...:
requests
requests_toolbelt
tqdm
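The full install command, combining the three packages listed above (versions unpinned; pin them as needed for reproducibility):

```shell
pip install requests requests_toolbelt tqdm
```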
The function below can be used to upload your data.
import pathlib

import tqdm
import requests
from requests_toolbelt.multipart.encoder import (
    MultipartEncoder,
    MultipartEncoderMonitor,
)

class TqdmProgressCallback:
    def __init__(self, total_size, filename):
        self.bar = tqdm.tqdm(
            total=total_size,
            unit="B",
            unit_scale=True,
            desc=f"Uploading {filename}",
        )

    def __call__(self, monitor):
        self.bar.update(monitor.bytes_read - self.bar.n)
        self.bar.refresh()

    def close(self):
        self.bar.close()

def upload_resource(
    file_path: pathlib.Path,
    package_id: str,
    token: str,
    description: str = "",
    resource_type: str = "Dataset",
    restricted_level: str = "public",
    state: str = "active",
    host: str = "https://data.eawag.ch",
    endpoint: str = "/api/3/action/resource_create",
):
    file_name = file_path.name
    file_size = file_path.stat().st_size
    with open(file_path, "rb") as file_stream:
        encoder = MultipartEncoder(
            fields={
                "upload": (
                    file_name,
                    file_stream,
                    "application/octet-stream",
                ),
                "package_id": package_id,
                "name": file_name,
                "description": description,
                "state": state,
                "size": str(file_size),
                "resource_type": resource_type,
                "restricted_level": restricted_level,
            }
        )
        progress_callback = TqdmProgressCallback(file_size, file_name)
        monitor = MultipartEncoderMonitor(encoder, progress_callback)
        headers = {"Authorization": token, "Content-Type": monitor.content_type}
        response = requests.post(
            f"{host}{endpoint}",
            data=monitor,
            headers=headers,
            stream=True,
        )
        progress_callback.close()
        response.raise_for_status()
Note
File paths should be passed as pathlib.Path objects to the upload_resource function.
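If you have plain string paths, wrap them first; a quick sketch (the path below is just the example folder used later):

```python
import pathlib

# upload_resource expects a pathlib.Path, not a str.
p = pathlib.Path("/tmp/upload-test/random_file_1")
print(p.name)  # -> random_file_1
```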
Let’s try it out. I prepared a folder full of test files.
/tmp/upload-test
├── random_file_1
├── random_file_10
├── random_file_11
├── random_file_12
├── random_file_13
├── random_file_14
├── random_file_15
├── random_file_2
├── random_file_3
├── random_file_4
├── random_file_5
├── random_file_6
├── random_file_7
├── random_file_8
└── random_file_9
1 directory, 15 files
In this example, we'll iterate over the entire contents of the “/tmp/upload-test” folder and upload each entry if it's a file. To do this, we also need the id of the package “data-for-project-x” that we want to upload to, and a valid token.
your_token = "..."  # you must provide your token here
your_package_id = "data-for-project-x"  # the id of the package you want to upload to

data_package_folder = pathlib.Path("/tmp/upload-test/")
for candidate in data_package_folder.iterdir():
    if not candidate.is_file():
        continue
    upload_resource(
        file_path=candidate,
        package_id=your_package_id,
        token=your_token,
        description=f"This is the description for file {candidate}",
    )
Uploading random_file_15: 15.7MB [00:01, 9.06MB/s]
Uploading random_file_14: 14.7MB [00:01, 7.55MB/s]
Uploading random_file_13: 13.6MB [00:01, 7.41MB/s]
Uploading random_file_12: 12.6MB [00:01, 7.69MB/s]
Uploading random_file_11: 11.5MB [00:01, 7.53MB/s]
Uploading random_file_10: 10.5MB [00:01, 7.33MB/s]
Uploading random_file_9: 9.44MB [00:01, 6.16MB/s]
Uploading random_file_8: 8.39MB [00:01, 5.47MB/s]
Uploading random_file_7: 7.34MB [00:01, 5.52MB/s]
Uploading random_file_6: 6.29MB [00:01, 5.13MB/s]
Uploading random_file_5: 5.24MB [00:01, 4.67MB/s]
Uploading random_file_4: 4.20MB [00:01, 3.74MB/s]
Uploading random_file_3: 3.15MB [00:01, 3.08MB/s]
Uploading random_file_2: 2.10MB [00:01, 1.71MB/s]
Uploading random_file_1: 1.05MB [00:01, 916kB/s]
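After the loop finishes you can verify the uploads, for example by fetching the package again via the package_show endpoint and counting its resources. A sketch using a stand-in response so it runs on its own (in practice package_data comes from request_json_data as shown earlier):

```python
# Stand-in for a package_show response after uploading the 15 example files;
# in practice, fetch this via request_json_data(url, token=your_token).
package_data = {
    "success": True,
    "result": {"resources": [{"name": f"random_file_{i}"} for i in range(1, 16)]},
}

# Count resources in the package to confirm every file arrived.
uploaded = [r["name"] for r in package_data["result"]["resources"]]
print(len(uploaded))  # -> 15
```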