
Object store

ObjectStoreConf

Bases: BaseModel

Object Store Config.

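Parameters:

Name            Type       Description                                                   Default
url             str        Endpoint URL of the object store.                             required
buckets         list[str]  Names of buckets to scan. If empty, all buckets are scanned.  []
prefix          str        Key prefix that listed objects must start with.               required
delimiter       str        Delimiter passed to the S3 listing to group keys.             required
session_kwargs  dict       Keyword arguments passed to boto3.session.Session.            {}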
Source code in stac_generator/plugins/inputs/object_store.py
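from pydantic import BaseModel, Field


class ObjectStoreConf(BaseModel):
    """Object Store Config."""

    url: str = Field(
        description="Endpoint URL of the object store.",
    )
    buckets: list[str] = Field(
        # An empty list means "scan every bucket" (see ObjectStoreInput.run).
        default_factory=list,
        description="Names of buckets to scan. If empty, all buckets are scanned.",
    )
    prefix: str = Field(
        description="Key prefix that listed objects must start with.",
    )
    delimiter: str = Field(
        description="Delimiter passed to the S3 listing to group keys.",
    )
    session_kwargs: dict = Field(
        default={},
        description="Keyword arguments passed to boto3.session.Session.",
    )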
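To make the required/optional split of these parameters concrete, here is a small stdlib sketch. `ObjectStoreConfSketch` is a hypothetical dataclass that mirrors the fields of `ObjectStoreConf` (it does none of pydantic's type validation), and `EXAMPLE_CONF` is the `conf` section of the example configuration written as a Python dict. It assumes an empty-list default for `buckets`.

```python
from dataclasses import dataclass, field


@dataclass
class ObjectStoreConfSketch:
    """Hypothetical stdlib mirror of ObjectStoreConf (no type validation)."""

    url: str  # required: endpoint URL of the object store
    prefix: str  # required: key prefix to list under
    delimiter: str  # required: delimiter for the S3 listing
    # Optional fields use factories so each instance gets its own list/dict.
    buckets: list[str] = field(default_factory=list)
    session_kwargs: dict = field(default_factory=dict)


# The ``conf`` section of the example configuration, as a Python dict.
EXAMPLE_CONF = {
    "url": "https://cedadev-o.s3-ext.jc.rl.ac.uk",
    "session_kwargs": {
        "aws_access_key_id": "ACCESS_KEY",
        "aws_secret_access_key": "SECRET_KEY",
    },
    "buckets": ["my_bucket"],
    "prefix": "directory_or_file",
    "delimiter": ".zarr/",
}

conf = ObjectStoreConfSketch(**EXAMPLE_CONF)
print(conf.buckets)  # ['my_bucket']

# Omitting ``buckets`` falls back to an empty list, which the plugin's
# run() treats as "scan every bucket visible to the session".
print(ObjectStoreConfSketch(url="https://example.org", prefix="", delimiter="").buckets)  # []
```

Passing a key the sketch does not define, such as `endpoint_url`, raises `TypeError` here; with the real pydantic model the required `url` would instead be reported as missing.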

ObjectStoreInput

Bases: Input

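Scans an S3-compatible object store at the configured endpoint URL: each bucket is listed under the configured prefix and delimiter, and one event is produced per object returned. If buckets is empty, every bucket visible to the session is scanned.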

Plugin name: object_store

Example Configuration

name: object_store
conf:
  url: https://cedadev-o.s3-ext.jc.rl.ac.uk
  session_kwargs:
    aws_access_key_id: ACCESS_KEY
    aws_secret_access_key: SECRET_KEY
  buckets:
    - my_bucket
  prefix: directory_or_file
  delimiter: .zarr/
Source code in stac_generator/plugins/inputs/object_store.py
class ObjectStoreInput(Input):
    """
    Scans an S3-compatible object store at the configured endpoint URL: each
    bucket is listed under ``prefix`` and ``delimiter``, and one event is
    produced per object returned. If ``buckets`` is empty, every bucket
    visible to the session is scanned.

    **Plugin name:** ``object_store``

    Example Configuration:
        .. code-block:: yaml

            name: object_store
            conf:
              url: https://cedadev-o.s3-ext.jc.rl.ac.uk
              session_kwargs:
                aws_access_key_id: ACCESS_KEY
                aws_secret_access_key: SECRET_KEY
              buckets:
                - my_bucket
              prefix: directory_or_file
              delimiter: .zarr/

    """

    config_class = ObjectStoreConf

    def run(self):
        # Build an S3 resource against the configured endpoint.
        session = boto3.session.Session(**self.conf.session_kwargs)
        s3 = session.resource(
            "s3",
            endpoint_url=self.conf.url,
        )

        # Scan only the named buckets, or every bucket visible to the session.
        buckets = (
            [s3.Bucket(bucket) for bucket in self.conf.buckets]
            if self.conf.buckets
            else s3.buckets.all()
        )

        for bucket in buckets:
            total_files = 0

            # Iterates over the objects (not the common prefixes) returned
            # by listing the bucket with the configured prefix and delimiter.
            for obj in bucket.objects.filter(
                Prefix=self.conf.prefix, Delimiter=self.conf.delimiter
            ):
                yield {
                    "uri": f"{self.conf.url}/{bucket.name}/{obj.key}",
                    "client": s3.meta.client,
                }
                total_files += 1

            LOGGER.info("Processed %s files from %s", total_files, bucket.name)
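To make the interaction between `prefix` and `delimiter` concrete, here is a minimal in-memory sketch of how an S3 listing with `Prefix` and `Delimiter` partitions keys, assuming the standard S3 `ListObjects` semantics. `list_objects` and `object_events` are hypothetical helpers, not part of stac_generator or boto3, and the `client` entry of each event is omitted. As far as we understand boto3, the `bucket.objects.filter(...)` collection iterates only over the listed object keys, so `run` would emit events for the first list returned here and not for the common prefixes.

```python
def list_objects(keys, prefix, delimiter):
    """Return (object keys, common prefixes) as an S3 listing would (sketch)."""
    contents, common_prefixes = [], []
    for key in sorted(keys):
        if not key.startswith(prefix):
            continue  # outside the requested prefix
        rest = key[len(prefix):]
        if delimiter and delimiter in rest:
            # Keys containing the delimiter after the prefix are rolled up
            # into a common prefix instead of being listed individually.
            rolled_up = prefix + rest[: rest.index(delimiter) + len(delimiter)]
            if rolled_up not in common_prefixes:
                common_prefixes.append(rolled_up)
        else:
            contents.append(key)
    return contents, common_prefixes


def object_events(url, bucket_name, keys, prefix, delimiter):
    """Build event URIs in the same format as ObjectStoreInput.run (sketch)."""
    contents, _ = list_objects(keys, prefix, delimiter)
    return [{"uri": f"{url}/{bucket_name}/{key}"} for key in contents]


keys = [
    "data/a.zarr/.zattrs",
    "data/a.zarr/0.0",
    "data/readme.txt",
    "other/x.nc",
]
print(list_objects(keys, "data/", ".zarr/"))
# (['data/readme.txt'], ['data/a.zarr/'])
print(object_events("https://example.org", "my_bucket", keys, "data/", ".zarr/"))
# [{'uri': 'https://example.org/my_bucket/data/readme.txt'}]
```

Under these assumptions, a `.zarr/` delimiter groups the contents of each Zarr store into a common prefix rather than yielding them as individual objects, which is worth confirming against a real bucket before relying on it.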