Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
502 changes: 502 additions & 0 deletions doc/source/arch_speculative_actions.rst

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions doc/source/main_architecture.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ This section provides details on the overall BuildStream architecture.
arch_caches
arch_sandboxing
arch_remote_execution
arch_speculative_actions

171 changes: 171 additions & 0 deletions src/buildstream/_artifactcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,69 @@ def _pull_artifact_storage(self, element, key, artifact_digest, remote, pull_bui

return True

# pull_artifact_proto():
#
# Pull only the artifact proto (metadata) for an element by key.
#
# This is a lightweight pull that fetches just the artifact proto
# from the remote, without fetching files, buildtrees, or other
# large blobs. Used by the speculative actions priming path to
# retrieve the SA digest reference from a previous build's artifact.
#
# Args:
#     element (Element): The element whose artifact proto to pull
#     key (str): The cache key to pull by (typically the weak key)
#
# Returns:
#     (bool): True if the proto was pulled, False if not found
#
def pull_artifact_proto(self, element, key):
    project = element._get_project()

    artifact_name = element.get_artifact_name(key=key)
    uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name)

    index_remotes, storage_remotes = self.get_remotes(project.name, False)

    # Resolve the artifact name to a digest via index remotes
    artifact_digest = None
    for remote in index_remotes:
        remote.init()
        try:
            response = remote.fetch_blob([uri])
            if response:
                artifact_digest = response.blob_digest
                break
        except AssetCacheError:
            continue

    if not artifact_digest:
        return False

    # Fetch the artifact blob via casd (handles remote fetching).
    #
    # Try every storage remote, not just the first: the digest resolved
    # by an index remote may only be present on a later storage remote,
    # mirroring how the index loop above tries each remote in turn.
    fetched = False
    for remote in storage_remotes:
        try:
            self.cas.fetch_blobs(remote, [artifact_digest])
            fetched = True
            break
        except (BlobNotFound, CASRemoteError):
            continue

    if not fetched:
        return False

    # Parse and write the artifact proto to local cache
    try:
        artifact = artifact_pb2.Artifact()
        with self.cas.open(artifact_digest, "rb") as f:
            artifact.ParseFromString(f.read())

        artifact_path = os.path.join(self._basedir, artifact_name)
        os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
        with utils.save_file_atomic(artifact_path, mode="wb") as f:
            f.write(artifact.SerializeToString())

        return True
    except OSError:
        # FileNotFoundError is a subclass of OSError, so a single
        # handler covers both a missing local blob and any filesystem
        # failure while writing the proto.
        return False

# _query_remote()
#
# Args:
Expand All @@ -473,3 +536,111 @@ def _query_remote(self, ref, remote):
return bool(response)
except AssetCacheError as e:
raise ArtifactError("{}".format(e), temporary=True) from e

# store_speculative_actions():
#
# Store SpeculativeActions for an element's artifact.
#
# Stores using both the artifact proto field (backward compat) and
# a weak key reference (stable across dependency version changes).
#
# Args:
#     artifact (Artifact): The artifact to attach speculative actions to
#     spec_actions (SpeculativeActions): The speculative actions proto
#     weak_key (str): Optional weak cache key for stable lookup
#
def store_speculative_actions(self, artifact, spec_actions, weak_key=None):

    # Store the speculative actions proto in CAS
    spec_actions_digest = self.cas.store_proto(spec_actions)

    # Set the speculative_actions field on the artifact proto
    artifact_proto = artifact._get_proto()
    artifact_proto.speculative_actions.CopyFrom(spec_actions_digest)

    # Save the updated artifact proto under all keys (strong + weak).
    # The artifact was originally stored under both keys; we must update
    # both so that lookup_speculative_actions_by_weak_key() can find the
    # SA when the strong key changes but the weak key remains stable.
    element = artifact._element
    keys = {artifact.get_extract_key()}

    # Honor the caller-supplied weak_key — it was previously accepted
    # but silently ignored — and fall back to the key recorded on the
    # artifact itself.
    effective_weak_key = weak_key or artifact.weak_key
    if effective_weak_key:
        keys.add(effective_weak_key)

    serialized = artifact_proto.SerializeToString()
    for key in keys:
        ref = element.get_artifact_name(key)
        proto_path = os.path.join(self._basedir, ref)
        # Write atomically, consistent with how artifact protos are
        # written elsewhere in this cache; a plain open()/write() could
        # leave a truncated proto behind if interrupted.
        os.makedirs(os.path.dirname(proto_path), exist_ok=True)
        with utils.save_file_atomic(proto_path, mode="wb") as f:
            f.write(serialized)

# lookup_speculative_actions_by_weak_key():
#
# Look up SpeculativeActions by element and weak key.
#
# Loads the artifact proto stored under the weak key ref and reads
# its speculative_actions digest. This works even when the element
# is not cached under its strong key (the common priming scenario:
# dependency changed, strong key differs, but weak key is stable
# so the artifact from the previous build is still reachable).
#
# Args:
#     element (Element): The element to look up SA for
#     weak_key (str): The weak cache key
#
# Returns:
#     SpeculativeActions proto or None if not available
#
def lookup_speculative_actions_by_weak_key(self, element, weak_key):
    from ._protos.buildstream.v2 import speculative_actions_pb2
    from ._protos.buildstream.v2 import artifact_pb2

    if not weak_key:
        return None

    # Load the artifact proto stored under the weak key ref
    artifact_ref = element.get_artifact_name(key=weak_key)
    proto_path = os.path.join(self._basedir, artifact_ref)
    try:
        # Open read-only: the previous "r+b" mode requested write
        # access for a pure read, which fails on a read-only cache
        # directory.
        with open(proto_path, mode="rb") as f:
            artifact_proto = artifact_pb2.Artifact()
            artifact_proto.ParseFromString(f.read())
    except FileNotFoundError:
        return None

    # Read the speculative_actions digest from the artifact proto
    if not artifact_proto.HasField("speculative_actions"):
        return None

    return self.cas.fetch_proto(
        artifact_proto.speculative_actions, speculative_actions_pb2.SpeculativeActions
    )

# get_speculative_actions():
#
# Retrieve SpeculativeActions for an element's artifact.
#
# Reads the speculative_actions digest recorded on the artifact
# proto and fetches the corresponding proto from CAS. This method
# does not consult the weak-key path; callers wanting the weak-key
# lookup use lookup_speculative_actions_by_weak_key() instead.
#
# Args:
#     artifact (Artifact): The artifact to get speculative actions from
#
# Returns:
#     SpeculativeActions proto or None if not available
#
def get_speculative_actions(self, artifact):
    from ._protos.buildstream.v2 import speculative_actions_pb2

    # Load from artifact proto's speculative_actions digest field
    artifact_proto = artifact._get_proto()
    if not artifact_proto:
        return None

    # Check if speculative_actions field is set
    if not artifact_proto.HasField("speculative_actions"):
        return None

    # Fetch the speculative actions from CAS
    return self.cas.fetch_proto(artifact_proto.speculative_actions, speculative_actions_pb2.SpeculativeActions)
94 changes: 94 additions & 0 deletions src/buildstream/_cas/cascache.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,100 @@ def _send_directory(self, remote, digest):
def get_cache_usage(self):
return self._cache_usage_monitor.get_cache_usage()

# fetch_proto():
#
# Fetch a protobuf message from CAS by digest and parse it.
#
# Args:
#     digest (Digest): The digest of the proto message
#     proto_class: The protobuf message class to parse into
#
# Returns:
#     The parsed protobuf message, or None if not found or corrupt
#
def fetch_proto(self, digest, proto_class):
    # A missing or empty digest cannot reference a blob
    if not digest or not digest.hash:
        return None

    try:
        with self.open(digest, mode="rb") as f:
            data = f.read()
    except FileNotFoundError:
        # Blob not present in the local cache
        return None

    proto_instance = proto_class()
    try:
        proto_instance.ParseFromString(data)
    except Exception as e:
        # Narrow the previous blanket `except Exception` (which hid
        # real programming errors) to protobuf decode failures only:
        # a corrupt blob yields None, anything else propagates.
        from google.protobuf.message import DecodeError

        if isinstance(e, DecodeError):
            return None
        raise
    return proto_instance

# store_proto():
#
# Serialize a protobuf message and add the resulting blob to CAS.
#
# Args:
#     proto: The protobuf message instance
#     instance_name (str): Optional casd instance_name for remote CAS
#
# Returns:
#     (Digest): The digest of the stored proto
#
def store_proto(self, proto, instance_name=None):
    serialized = proto.SerializeToString()
    digest = self.add_object(buffer=serialized, instance_name=instance_name)
    return digest

# fetch_action():
#
# Fetch a REAPI Action proto from CAS by digest.
#
# Args:
#     action_digest (Digest): The digest of the Action
#
# Returns:
#     Action proto or None if not found
#
def fetch_action(self, action_digest):
    # Thin wrapper over the generic proto fetcher, bound to the
    # remote-execution Action message type.
    return self.fetch_proto(action_digest, proto_class=remote_execution_pb2.Action)

# store_action():
#
# Store a REAPI Action proto in CAS.
#
# Args:
#     action (Action): The Action proto
#     instance_name (str): Optional casd instance_name
#
# Returns:
#     (Digest): The digest of the stored action
#
def store_action(self, action, instance_name=None):
    # An Action is an ordinary proto message, so the generic storer
    # handles serialization and CAS insertion.
    return self.store_proto(action, instance_name=instance_name)

# fetch_directory_proto():
#
# Fetch a single Directory proto from CAS by digest (only the one
# Directory message — not the full tree of directories it references).
#
# Args:
#     directory_digest (Digest): The digest of the Directory
#
# Returns:
#     Directory proto or None if not found
#
def fetch_directory_proto(self, directory_digest):
    return self.fetch_proto(directory_digest, remote_execution_pb2.Directory)

# store_directory_proto():
#
# Store a single Directory proto in CAS (children referenced by the
# Directory are not stored by this call).
#
# Args:
#     directory (Directory): The Directory proto
#     instance_name (str): Optional casd instance_name
#
# Returns:
#     (Digest): The digest of the stored directory
#
def store_directory_proto(self, directory, instance_name=None):
    return self.store_proto(directory, instance_name=instance_name)


# _CASCacheUsage
#
Expand Down
18 changes: 16 additions & 2 deletions src/buildstream/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from ._remotespec import RemoteSpec, RemoteExecutionSpec
from ._sourcecache import SourceCache
from ._cas import CASCache, CASDProcessManager, CASLogLevel
from .types import _CacheBuildTrees, _PipelineSelection, _SchedulerErrorAction, _SourceUriPolicy
from .types import _CacheBuildTrees, _PipelineSelection, _SchedulerErrorAction, _SourceUriPolicy, _SpeculativeActionMode
from ._workspaces import Workspaces, WorkspaceProjectCache
from .node import Node, MappingNode

Expand Down Expand Up @@ -164,6 +164,9 @@ def __init__(self, *, use_casd: bool = True) -> None:
# What to do when a build fails in non interactive mode
self.sched_error_action: Optional[str] = None

# Speculative actions mode
self.speculative_actions_mode: _SpeculativeActionMode = _SpeculativeActionMode.NONE

# Maximum jobs per build
self.build_max_jobs: Optional[int] = None

Expand Down Expand Up @@ -451,13 +454,24 @@ def load(self, config: Optional[str] = None) -> None:

# Load scheduler config
scheduler = defaults.get_mapping("scheduler")
scheduler.validate_keys(["on-error", "fetchers", "builders", "pushers", "network-retries"])
scheduler.validate_keys(["on-error", "fetchers", "builders", "pushers", "network-retries", "speculative-actions"])
self.sched_error_action = scheduler.get_enum("on-error", _SchedulerErrorAction)
self.sched_fetchers = scheduler.get_int("fetchers")
self.sched_builders = scheduler.get_int("builders")
self.sched_pushers = scheduler.get_int("pushers")
self.sched_network_retries = scheduler.get_int("network-retries")

# Load speculative actions config
# Accepts mode string (none/prime-only/source-artifact/intra-element/full)
# or boolean for backward compatibility (True → full, False → none)
try:
self.speculative_actions_mode = scheduler.get_enum("speculative-actions", _SpeculativeActionMode)
except Exception:
self.speculative_actions_mode = (
_SpeculativeActionMode.FULL if scheduler.get_bool("speculative-actions")
else _SpeculativeActionMode.NONE
)

# Load build config
build = defaults.get_mapping("build")
build.validate_keys(["max-jobs", "retry-failed", "dependencies"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,8 +981,8 @@ message SymlinkNode {
// serializing, but care should be taken to avoid shortcuts. For instance,
// concatenating two messages to merge them may produce duplicate fields.
message Digest {
// The hash, represented as a lowercase hexadecimal string, padded with
// leading zeroes up to the hash function length.
// The hash. In the case of SHA-256, it will always be a lowercase hex string
// exactly 64 characters long.
string hash = 1;

// The size of the blob, in bytes.
Expand Down Expand Up @@ -1220,6 +1220,13 @@ message ActionResult {

// The details of the execution that originally produced this result.
ExecutedActionMetadata execution_metadata = 9;

// The digests of Actions that were executed as nested executions during
// this action (e.g., compiler invocations via recc). Each digest references
// an Action that was stored in the CAS during execution. This allows clients
// to retrieve the full dependency tree of actions that contributed to this
// result.
repeated Digest subactions = 99;
}

// An `OutputFile` is similar to a
Expand Down Expand Up @@ -1433,20 +1440,6 @@ message ExecuteRequest {
// length of the action digest hash and the digest functions announced
// in the server's capabilities.
DigestFunction.Value digest_function = 9;

// A hint to the server to request inlining stdout in the
// [ActionResult][build.bazel.remote.execution.v2.ActionResult] message.
bool inline_stdout = 10;

// A hint to the server to request inlining stderr in the
// [ActionResult][build.bazel.remote.execution.v2.ActionResult] message.
bool inline_stderr = 11;

// A hint to the server to inline the contents of the listed output files.
// Each path needs to exactly match one file path in either `output_paths` or
// `output_files` (DEPRECATED since v2.1) in the
// [Command][build.bazel.remote.execution.v2.Command] message.
repeated string inline_output_files = 12;
}

// A `LogFile` is a log stored in the CAS.
Expand Down Expand Up @@ -1682,7 +1675,7 @@ message BatchUpdateBlobsRequest {
bytes data = 2;

// The format of `data`. Must be `IDENTITY`/unspecified, or one of the
// compressors advertised by the
// compressors advertised by the
// [CacheCapabilities.supported_batch_compressors][build.bazel.remote.execution.v2.CacheCapabilities.supported_batch_compressors]
// field.
Compressor.Value compressor = 3;
Expand Down

Large diffs are not rendered by default.

Loading
Loading