Automation#

When dealing with any sort of repetitive tasks like:

  • uploading a large number of files

  • creating many packages

  • preserving your data periodically

  • analyzing information across packages

you can use the API of CKAN (the core software stack of ERIC) to automate these tasks with a programming language of your choosing.

Authentication#

For many things you might want to automate (like uploading data) you will need to authenticate yourself to the system, so that CKAN can check if you’re authorized to for instance upload data to a certain package. For that you will need a token. If you do not yet have one please contact rdm@eawag.ch and we will generate one for you.

Important

CKAN has some limitations when uploading large files (>8GB). If that is what you’re after please contact rdm@eawag.ch.

Examples#

Below you will find some examples on how to use the API with Python.

Retrieving information about a package#

For this we can use the package_show endpoint the CKAN API offers.

First we’ll define a function that can request information from the CKAN API and returns a dictionary:

import json
from urllib.request import urlopen, Request

def request_json_data(url: str, token: str | None =None) -> dict:
    headers = {} if token is None else {'Authorization': token}
    with urlopen(Request(url, headers=headers)) as response:
        return json.loads(response.read().decode())

Then we can request the data. As we’re reading from a public dataset we do not need an API key. Mind the composition of the url:

host = "https://opendata.eawag.ch/"  # The url of the public data repository
api_endpoint = "api/3/action/package_show"
endpoint_parameter = "id"
parameter_value = "data-for-geringste-konzentrationen-grosste-wirkung"

url = f"{host}{api_endpoint}?{endpoint_parameter}={parameter_value}"

package_data = request_json_data(url)
package_data
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[2], line 8
      4 parameter_value = "data-for-geringste-konzentrationen-grosste-wirkung"
      6 url = f"{host}{api_endpoint}?{endpoint_parameter}={parameter_value}"
----> 8 package_data = request_json_data(url)
      9 package_data

Cell In[1], line 6, in request_json_data(url, token)
      4 def request_json_data(url: str, token: str | None =None) -> dict:
      5     headers = {} if token is None else {'Authorization': token}
----> 6     with urlopen(Request(url, headers=headers)) as response:
      7         return json.loads(response.read().decode())

File /usr/local/lib/python3.11/urllib/request.py:216, in urlopen(url, data, timeout, cafile, capath, cadefault, context)
    214 else:
    215     opener = _opener
--> 216 return opener.open(url, data, timeout)

File /usr/local/lib/python3.11/urllib/request.py:519, in OpenerDirector.open(self, fullurl, data, timeout)
    516     req = meth(req)
    518 sys.audit('urllib.Request', req.full_url, req.data, req.headers, req.get_method())
--> 519 response = self._open(req, data)
    521 # post-process response
    522 meth_name = protocol+"_response"

File /usr/local/lib/python3.11/urllib/request.py:536, in OpenerDirector._open(self, req, data)
    533     return result
    535 protocol = req.type
--> 536 result = self._call_chain(self.handle_open, protocol, protocol +
    537                           '_open', req)
    538 if result:
    539     return result

File /usr/local/lib/python3.11/urllib/request.py:496, in OpenerDirector._call_chain(self, chain, kind, meth_name, *args)
    494 for handler in handlers:
    495     func = getattr(handler, meth_name)
--> 496     result = func(*args)
    497     if result is not None:
    498         return result

File /usr/local/lib/python3.11/urllib/request.py:1391, in HTTPSHandler.https_open(self, req)
   1390 def https_open(self, req):
-> 1391     return self.do_open(http.client.HTTPSConnection, req,
   1392         context=self._context, check_hostname=self._check_hostname)

File /usr/local/lib/python3.11/urllib/request.py:1348, in AbstractHTTPHandler.do_open(self, http_class, req, **http_conn_args)
   1346 try:
   1347     try:
-> 1348         h.request(req.get_method(), req.selector, req.data, headers,
   1349                   encode_chunked=req.has_header('Transfer-encoding'))
   1350     except OSError as err: # timeout error
   1351         raise URLError(err)

File /usr/local/lib/python3.11/http/client.py:1303, in HTTPConnection.request(self, method, url, body, headers, encode_chunked)
   1300 def request(self, method, url, body=None, headers={}, *,
   1301             encode_chunked=False):
   1302     """Send a complete request to the server."""
-> 1303     self._send_request(method, url, body, headers, encode_chunked)

File /usr/local/lib/python3.11/http/client.py:1349, in HTTPConnection._send_request(self, method, url, body, headers, encode_chunked)
   1345 if isinstance(body, str):
   1346     # RFC 2616 Section 3.7.1 says that text default has a
   1347     # default charset of iso-8859-1.
   1348     body = _encode(body, 'body')
-> 1349 self.endheaders(body, encode_chunked=encode_chunked)

File /usr/local/lib/python3.11/http/client.py:1298, in HTTPConnection.endheaders(self, message_body, encode_chunked)
   1296 else:
   1297     raise CannotSendHeader()
-> 1298 self._send_output(message_body, encode_chunked=encode_chunked)

File /usr/local/lib/python3.11/http/client.py:1058, in HTTPConnection._send_output(self, message_body, encode_chunked)
   1056 msg = b"\r\n".join(self._buffer)
   1057 del self._buffer[:]
-> 1058 self.send(msg)
   1060 if message_body is not None:
   1061 
   1062     # create a consistent interface to message_body
   1063     if hasattr(message_body, 'read'):
   1064         # Let file-like take precedence over byte-like.  This
   1065         # is needed to allow the current position of mmap'ed
   1066         # files to be taken into account.

File /usr/local/lib/python3.11/http/client.py:996, in HTTPConnection.send(self, data)
    994 if self.sock is None:
    995     if self.auto_open:
--> 996         self.connect()
    997     else:
    998         raise NotConnected()

File /usr/local/lib/python3.11/http/client.py:1468, in HTTPSConnection.connect(self)
   1465 def connect(self):
   1466     "Connect to a host on a given (SSL) port."
-> 1468     super().connect()
   1470     if self._tunnel_host:
   1471         server_hostname = self._tunnel_host

File /usr/local/lib/python3.11/http/client.py:962, in HTTPConnection.connect(self)
    960 """Connect to the host and port specified in __init__."""
    961 sys.audit("http.client.connect", self, self.host, self.port)
--> 962 self.sock = self._create_connection(
    963     (self.host,self.port), self.timeout, self.source_address)
    964 # Might fail in OSs that don't implement TCP_NODELAY
    965 try:

File /usr/local/lib/python3.11/socket.py:848, in create_connection(address, timeout, source_address, all_errors)
    846 if source_address:
    847     sock.bind(source_address)
--> 848 sock.connect(sa)
    849 # Break explicitly a reference cycle
    850 exceptions.clear()

KeyboardInterrupt: 

A lot of data is returned. Let’s only check your all resource links for this data package.

resource_urls = [resource["url"] for resource in package_data["result"]["resources"]]
resource_urls
['https://opendata.eawag.ch/dataset/50eafcd5-27c2-40a1-95d6-fc671262ee92/resource/016e7298-77dc-4a2d-b73e-1b68df23d038/download/readme.txt',
 'https://opendata.eawag.ch/dataset/50eafcd5-27c2-40a1-95d6-fc671262ee92/resource/f4e375c5-8cd5-4e7c-8ed2-1fe32409a002/download/pyrethroids2018.xlsx',
 'https://opendata.eawag.ch/dataset/50eafcd5-27c2-40a1-95d6-fc671262ee92/resource/35c4dcfb-a4bf-4dc4-82cf-e3360d0f08e8/download/pyrethroids2017.xlsx']

Downloading resources#

In our previous example we use the package_show endpoint the CKAN API to extract links of resources. In this example we will download those resources.

def download_resource(url: str, file_path:  str, token: str | None = None, chunk_size: int = 1024) -> None:
    headers = {} if token is None else {'Authorization': token}
    with urlopen(Request(url, headers=headers)) as response:
        with open(file_path, 'wb') as file:
            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                file.write(chunk)

With the download_resource function with can iterate the previously extracted resources and download them.

for url in resource_urls:
    file_path =f"/tmp/{url.split('/')[-1]}"
    download_resource(url, file_path)
    print(f"Successfully saved resource at: {file_path}")
Successfully saved resource at: /tmp/readme.txt
Successfully saved resource at: /tmp/pyrethroids2018.xlsx
Successfully saved resource at: /tmp/pyrethroids2017.xlsx

Uploading resources#

In this scenario we assume you created a package on ERIC/internal called data-for-project-x and now you want to upload your many resources.

Important

This procedure will require an API Token.

Note

Uploads will take longer that the implemented progress bar shows. The progress bar will reach 100% after about 1/4 of the time need for the process to finish. The reasons are very technical, if your want to know why please click below. You will only notice this for large files.

For ease of use we will install 3 libraries via pip install ...:

  • requests

  • requests_toolbelt

  • tqdm

The function below can be used to upload your data.

import pathlib

import tqdm
import requests
from requests_toolbelt.multipart.encoder import (
    MultipartEncoder,
    MultipartEncoderMonitor,
)


class TqdmProgressCallback:
    def __init__(self, total_size, filename):
        self.bar = tqdm.tqdm(
            total=total_size,
            unit="B",
            unit_scale=True,
            desc=f"Uploading {filename}",
        )
    def __call__(self, monitor):
        self.bar.update(monitor.bytes_read - self.bar.n)
        self.bar.refresh()

    def close(self):
        self.bar.close()

def upload_resource(
    file_path: pathlib.Path,
    package_id: str,
    token: str,
    description: str = "",
    resource_type: str = "Dataset",
    restricted_level: str = "public",
    state: str = "active",
    host: str = "https://data.eawag.ch",
    endpoint: str = "/api/3/action/resource_create",
):
    
    file_name = file_path.name
    file_size = file_path.stat().st_size
    with open(file_path, "rb") as file_stream:
        encoder = MultipartEncoder(
            fields={
                "upload": (
                    file_name,
                    file_stream,
                    "application/octet-stream",
                ),
                "package_id": package_id,
                "name": file_name,
                "description": description,
                "state": state,
                "size": str(file_size),
                "resource_type": resource_type,
                "restricted_level": restricted_level,
            }
        )

        progress_callback = TqdmProgressCallback(file_size, file_name)
        monitor = MultipartEncoderMonitor(encoder, progress_callback)
        
        headers = {"Authorization": token, "Content-Type": monitor.content_type}

        response = requests.post(
            f"{host}{endpoint}",
            data=monitor,
            headers=headers,
            auth=None,
            stream=True,
        )
        progress_callback.close()
        response.raise_for_status()

Note

File paths should be passed as pathlib.Path objects to the upload_resource function.

Let’s try it out. I prepared a folder full of test files.

/tmp/upload-test
├── random_file_1
├── random_file_10
├── random_file_11
├── random_file_12
├── random_file_13
├── random_file_14
├── random_file_15
├── random_file_2
├── random_file_3
├── random_file_4
├── random_file_5
├── random_file_6
├── random_file_7
├── random_file_8
└── random_file_9

1 directory, 15 files

In this example, we’ll iterate over the entire contents of the “tmp/upload-test” folder and upload the contents if it’s a file. To do this, we also need the package_id “data-for-project-x” that we want to upload to, and a valid token.

your_token = "..."  # you must provide your token here
your_package_id = "data-for-project-x"  # you must provide your token here
data_package_folder = pathlib.Path("/tmp/upload-test/")
for candidate in data_package_folder.iterdir():
    if not candidate.is_file():
        continue
    upload_resource(
        file_path=candidate,
        package_id=your_package_id,
        token = your_token,
        description = f"This is the description for file {candidate}",
    )
Uploading random_file_15: 15.7MB [00:01, 9.06MB/s]                        
Uploading random_file_14: 14.7MB [00:01, 7.55MB/s]                        
Uploading random_file_13: 13.6MB [00:01, 7.41MB/s]                        
Uploading random_file_12: 12.6MB [00:01, 7.69MB/s]                        
Uploading random_file_11: 11.5MB [00:01, 7.53MB/s]                        
Uploading random_file_10: 10.5MB [00:01, 7.33MB/s]                        
Uploading random_file_9: 9.44MB [00:01, 6.16MB/s]                         
Uploading random_file_8: 8.39MB [00:01, 5.47MB/s]                         
Uploading random_file_7: 7.34MB [00:01, 5.52MB/s]                         
Uploading random_file_6: 6.29MB [00:01, 5.13MB/s]                         
Uploading random_file_5: 5.24MB [00:01, 4.67MB/s]                         
Uploading random_file_4: 4.20MB [00:01, 3.74MB/s]                         
Uploading random_file_3: 3.15MB [00:01, 3.08MB/s]                         
Uploading random_file_2: 2.10MB [00:01, 1.71MB/s]                         
Uploading random_file_1: 1.05MB [00:01, 916kB/s]