Search
These methods form the interface between Archivy and its search backends (Elasticsearch or ripgrep).
add_to_index(model)
Adds dataobj to given index. If an object with the given id already exists, it will be updated.

- **index** - String of the ES index. Archivy uses `dataobj` by default.
- **model** - Instance of `archivy.models.Dataobj`, the object you want to index.
Source code in archivy/search.py
def add_to_index(model):
    """
    Indexes a dataobj in Elasticsearch; an existing document with the same id is updated.

    Params:
    - **model** - Instance of `archivy.models.Dataobj`, the object you want to index.

    Returns True on success, or None when no ES client is configured.
    """
    es = get_elastic_client()
    if not es:
        return
    # index every field the model declares as searchable
    document = {attr: getattr(model, attr) for attr in model.__searchable__}
    es.index(
        index=current_app.config["SEARCH_CONF"]["index_name"],
        id=model.id,
        body=document,
    )
    return True
parse_ripgrep_line(line)
Parses a line of ripgrep JSON output
Source code in archivy/search.py
def parse_ripgrep_line(line):
    """
    Parses one line of ripgrep JSON output.

    Returns a ``(data, event_type)`` tuple for "begin" and "match" events,
    or None for any other event type.
    """
    event = json.loads(line)
    kind = event["type"]
    if kind == "begin":
        # note filenames look like "<id>-<title>.md"; underscores stand in for spaces
        stem_parts = (
            Path(event["data"]["path"]["text"]).parts[-1].replace(".md", "").split("-")
        )
        payload = {
            "title": stem_parts[-1].replace("_", " "),
            "matches": [],
            "id": int(stem_parts[0]),
        }
    elif kind == "match":
        payload = event["data"]["lines"]["text"].strip()
    else:
        # only begin and match events matter; endings carry nothing we need
        return None
    return (payload, kind)
query_es_index(query, strict=False)
Returns search results for your given query.
Specify strict=True if you want only exact results (in case you're using ES).
Source code in archivy/search.py
def query_es_index(query, strict=False):
    """
    Returns Elasticsearch search results for the given query.

    Params:
    - **query** - the search string.
    - **strict** - when True, keep only hits whose highlighted matches
      literally contain ``query`` as a substring (no analyzer tokenization).

    Returns a list of dicts with "id", "title" and (when highlights exist)
    "matches" keys, or [] when no ES client is configured.
    """
    es = get_elastic_client()
    if not es:
        return []
    search = es.search(
        index=current_app.config["SEARCH_CONF"]["index_name"],
        body={
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["*"],
                    "analyzer": "rebuilt_standard",
                }
            },
            # fragment_size 0: highlight whole fields; empty tags: no HTML markup
            "highlight": {
                "fragment_size": 0,
                "fields": {
                    "content": {
                        "pre_tags": "",
                        "post_tags": "",
                    }
                },
            },
        },
    )
    hits = []
    for hit in search["hits"]["hits"]:
        formatted_hit = {"id": hit["_id"], "title": hit["_source"]["title"]}
        if "highlight" in hit:
            formatted_hit["matches"] = hit["highlight"]["content"]
            reformatted_match = " ".join(formatted_hit["matches"])
            # strict mode: drop hits whose highlights don't contain the raw query
            if strict and not (query in reformatted_match):
                continue
        hits.append(formatted_hit)
    return hits
query_ripgrep(query)
Uses ripgrep to search data with a simpler setup than ES. Returns a list of dicts with detailed matches.
Source code in archivy/search.py
def query_ripgrep(query):
    """
    Searches the data directory with ripgrep — a simpler setup than ES.

    Returns a list of hit dicts, sorted by number of matches (most first),
    or [] when ripgrep is not installed.
    """
    from archivy.data import get_data_dir

    if not which("rg"):
        return []
    cmd = ["rg", RG_MISC_ARGS, RG_FILETYPE, "--json", query, str(get_data_dir())]
    proc = run(cmd, stdout=PIPE, stderr=PIPE, timeout=60)
    results = []
    for raw_line in proc.stdout.decode().splitlines():
        event = parse_ripgrep_line(raw_line)
        if not event:
            continue
        payload, kind = event
        if kind == "begin":
            results.append(payload)
        elif kind == "match":
            # skip frontmatter lines (tags/title) so only body matches show up
            if not payload.startswith(("tags: [", "title:")):
                results[-1]["matches"].append(payload)
    # sort by number of matches
    return sorted(results, key=lambda hit: len(hit["matches"]), reverse=True)
query_ripgrep_tags()
Uses ripgrep to search for tags. Mandatory reference: https://xkcd.com/1171/
Source code in archivy/search.py
def query_ripgrep_tags():
    """
    Collects the set of all tags used in the data directory via ripgrep.

    Mandatory reference: https://xkcd.com/1171/
    """
    EMB_PATTERN = r"(^|\n| )#([-_a-zA-ZÀ-ÖØ-öø-ÿ0-9]+)#"
    from archivy.data import get_data_dir

    if not which("rg"):
        return []
    # embedded tags
    # io: case insensitive
    cmd = ["rg", "-Uio", RG_FILETYPE, RG_REGEX_ARG, EMB_PATTERN, str(get_data_dir())]
    proc = run(cmd, stdout=PIPE, stderr=PIPE, timeout=60)
    tags = set()
    for raw in proc.stdout.splitlines():
        # ripgrep prints "path:match"; keep the match and drop the '#' markers
        embedded = Path(raw.decode()).parts[-1].split(":")[-1]
        tags.add(embedded.replace("#", "").lstrip())
    # metadata (frontmatter) tags
    for note in search_frontmatter_tags():
        tags.update(note["tags"])
    return tags
remove_from_index(dataobj_id)
Removes object of given id
Source code in archivy/search.py
def remove_from_index(dataobj_id):
    """Removes the document with the given id from the ES index (no-op without a client)."""
    es = get_elastic_client()
    if es:
        es.delete(index=current_app.config["SEARCH_CONF"]["index_name"], id=dataobj_id)
search(query, strict=False)
Wrapper to search methods for different engines.
If using ES, specify strict=True if you only want results that strictly match the query, without parsing / tokenization.
Source code in archivy/search.py
def search(query, strict=False):
    """
    Wrapper around the engine-specific search methods.

    Params:
    - **query** - the search string.
    - **strict** - ES only: return only results that strictly match the
      query, without parsing / tokenization.

    Returns a list of hits; [] when no search engine is available.
    """
    engine = current_app.config["SEARCH_CONF"]["engine"]
    if engine == "elasticsearch":
        return query_es_index(query, strict=strict)
    # fall back to ripgrep whenever it is installed, even if not configured
    if engine == "ripgrep" or which("rg"):
        return query_ripgrep(query)
    # Fix: previously fell through and implicitly returned None; return []
    # to stay consistent with query_es_index / query_ripgrep.
    return []
search_frontmatter_tags(tag=None)
Returns a list of dataobj ids that have the given tag.
Source code in archivy/search.py
def search_frontmatter_tags(tag=None):
    """
    Returns a list of dataobj ids that have the given tag.

    When ``tag`` is None, returns every hit along with its full tag list.
    """
    from archivy.data import get_data_dir

    if not which("rg"):
        return []
    # matches a frontmatter block of the form "tags:\n- foo\n- bar"
    META_PATTERN = r"(^|\n)tags:(\n- [_a-zA-ZÀ-ÖØ-öø-ÿ0-9]+)+"
    hits = []
    rg_cmd = [
        "rg",
        "-Uo",
        RG_MISC_ARGS,
        RG_FILETYPE,
        "--json",
        RG_REGEX_ARG,
        META_PATTERN,
        str(get_data_dir()),
    ]
    rg = run(rg_cmd, stdout=PIPE, stderr=PIPE, timeout=60)
    output = rg.stdout.decode().splitlines()
    for line in output:
        parsed = parse_ripgrep_line(line)
        if not parsed:  # the event doesn't interest us
            continue
        if parsed[1] == "begin":
            hits.append(parsed[0])  # append current hit data
        if parsed[1] == "match":
            # strip the "- " bullets, then drop the first two chunks of the
            # split. NOTE(review): this assumes the multiline match always
            # includes the line preceding "tags:" plus "tags:" itself; a
            # "tags:" block at the very start of a file would lose its first
            # tag — confirm against ripgrep's JSON "lines" semantics.
            sanitized = parsed[0].replace("- ", "").split("\n")[2:]
            hits[-1]["tags"] = hits[-1].get("tags", []) + sanitized  # get tags
    if tag:
        hits = list(filter(lambda x: tag in x["tags"], hits))
    return hits