Skip to content

Text file

TextFileConf

Bases: BaseModel

Text file Config.

Parameters:

Name Type Description Default
path str

Path to file or directory of files.

required
uri_term str

Attribute to use as uri.

'uri'
extra_terms list[KeyOutputKey]

List of extra attributes.

[]
Source code in stac_generator/plugins/inputs/text_file.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class TextFileConf(BaseModel):
    """Configuration for the ``text_file`` input plugin."""

    # A single file to read, or a directory whose immediate regular files
    # are all read (sub-directories are not descended into).
    path: str = Field(
        description="Path to file or directory of files.",
    )
    # Key looked up in each parsed JSON line; its value is emitted as "uri".
    uri_term: str = Field(
        default="uri",
        description="Attribute to use as uri.",
    )
    # For each entry, the value at ``key`` in the parsed line is copied to
    # ``output_key`` in the emitted record.
    extra_terms: list[KeyOutputKey] = Field(
        default=[],
        description="List of extra attributes.",
    )

TextFileInput

Bases: Input

Reads lines from file/files as a source for events.

Plugin name: text_file

Example Configuration

.. code-block:: yaml

- name: text_file
  conf:
    path: /path/to/files
Source code in stac_generator/plugins/inputs/text_file.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class TextFileInput(Input):
    """
    Reads lines from file/files as a source for events.

    Every non-blank line is parsed as a JSON object. One record is yielded
    per distinct line, containing the configured uri attribute plus any
    configured extra attributes.

    **Plugin name:** ``text_file``

    Example Configuration:
        .. code-block:: yaml

            - name: text_file
              conf:
                path: /path/to/files
    """

    config_class = TextFileConf

    def run(self):
        """Yield one output dict per unique JSON line found under ``conf.path``.

        If ``conf.path`` is a directory, every regular file directly inside
        it is read; otherwise ``conf.path`` itself is read as a single file.
        Duplicate lines are emitted only once across all files.

        Yields:
            dict: ``{"uri": <value of conf.uri_term>}`` extended with one
            ``output_key`` entry per item in ``conf.extra_terms``.

        Raises:
            json.JSONDecodeError: If a non-blank line is not valid JSON.
            KeyError: If a parsed line lacks ``conf.uri_term`` or the ``key``
                of one of the extra terms.
        """
        if isdir(self.conf.path):
            file_list = [
                join(self.conf.path, file)
                for file in listdir(self.conf.path)
                if isfile(join(self.conf.path, file))
            ]
        else:
            file_list = [self.conf.path]

        start = datetime.now()
        total_generated = 0
        unique_lines = set()

        for file in file_list:
            with open(file, "r", encoding="utf-8") as f:
                for raw_line in f:
                    # Normalise before de-duplicating: a final line without a
                    # trailing newline must match the same record seen with
                    # one elsewhere.
                    line = raw_line.strip()

                    # Blank lines carry no record and would make json.loads
                    # raise, so skip them along with already-seen lines.
                    if not line or line in unique_lines:
                        continue
                    unique_lines.add(line)

                    data = json.loads(line)
                    output = {"uri": data[self.conf.uri_term]}

                    for extra_term in self.conf.extra_terms:
                        output[extra_term.output_key] = data[extra_term.key]

                    yield output
                    total_generated += 1

        # Only reached once the consumer has exhausted the generator.
        end = datetime.now()
        print(f"Processed {total_generated} elasticsearch records in {end-start}")