Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ impl DeploymentStore {
schema: &InputSchema,
deployment: DeploymentCreate,
site: Arc<Site>,
graft_base: Option<Arc<Layout>>,
replace: bool,
on_sync: OnSync,
index_def: Option<IndexList>,
Expand Down Expand Up @@ -217,27 +216,14 @@ impl DeploymentStore {
let query = format!("create schema {}", &site.namespace);
conn.batch_execute(&query).await?;

let layout = Layout::create_relational_schema(
let _ = Layout::create_relational_schema(
conn,
site.clone(),
schema,
entities_with_causality_region.into_iter().collect(),
index_def,
)
.await?;
// See if we are grafting and check that the graft is permissible
if let Some(base) = graft_base {
let errors = layout.can_copy_from(&base);
if !errors.is_empty() {
return Err(StoreError::Unknown(anyhow!(
"The subgraph `{}` cannot be used as the graft base \
for `{}` because the schemas are incompatible:\n - {}",
&base.catalog.site.namespace,
&layout.catalog.site.namespace,
errors.join("\n - ")
)));
}
}

// Create data sources table
if site.schema_version.private_data_sources() {
Expand Down
101 changes: 58 additions & 43 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use graph::{
use graph::{derive::CheapClone, futures03::future::join_all};

use crate::{
catalog::Catalog,
deployment::{OnSync, SubgraphHealth},
primary::{self, DeploymentId, Mirror as PrimaryMirror, Primary, Site},
relational::{
Expand Down Expand Up @@ -88,7 +89,7 @@ impl Shard {
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
{
return Err(StoreError::InvalidIdentifier(format!(
"shard name `{}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'", name
"shard name `{name}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'"
)));
}
Ok(Shard(name))
Expand Down Expand Up @@ -339,7 +340,7 @@ impl SubgraphStore {
self.evict(schema.id())?;
let graft_base = deployment.graft_base.as_ref();

let (site, exists, node_id) = {
let (site, deployment_store, node_id) = {
// We need to deal with two situations:
// (1) We are really creating a new subgraph; it therefore needs
// to go in the shard and onto the node that the placement
Expand All @@ -351,42 +352,63 @@ impl SubgraphStore {
// assignment that we used last time to avoid creating
// the same deployment in another shard
let (shard, node_id) = self.place(&name, &network_name, node_id).await?;
let mut conn = self.primary_conn().await?;
let (site, site_was_created) = conn
.allocate_site(shard, schema.id(), network_name, graft_base)
.await?;
let node_id = conn.assigned_node(&site).await?.unwrap_or(node_id);
(site, !site_was_created, node_id)
};
let site = Arc::new(site);

// if the deployment already exists, we don't need to perform any copying
// so we can set graft_base to None
// if it doesn't exist, we need to copy the graft base to the new deployment
let graft_base_layout = if !exists {
let graft_base = match deployment.graft_base.as_ref() {
Some(base) => Some(self.layout(base).await?),
None => None,
};

if let Some(graft_base) = &graft_base {
self.primary_conn()
.await?
.record_active_copy(graft_base.site.as_ref(), site.as_ref())
.await?;
}
graft_base
} else {
None
let mut pconn = self.primary_conn().await?;
pconn
.transaction(|pconn| {
async {
let (site, site_was_created) = pconn
.allocate_site(shard, schema.id(), network_name, graft_base)
.await?;
let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id);
let site = Arc::new(site);
let deployment_store = self
.stores
.get(&site.shard)
.ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?;

if site_was_created {
if let Some(graft_base) = graft_base {
// Ensure that the graft base exists
let base_layout = self.layout(graft_base).await?;
let mut shard_conn =
deployment_store.get_replica_conn(ReplicaId::Main).await?;
let entities_with_causality_region =
deployment.manifest.entities_with_causality_region.clone();
let catalog = Catalog::for_creation(
&mut shard_conn,
site.cheap_clone(),
entities_with_causality_region.into_iter().collect(),
)
.await?;
let layout = Layout::new(site.cheap_clone(), schema, catalog)?;

let errors = layout.can_copy_from(&base_layout);
if !errors.is_empty() {
return Err(StoreError::Unknown(anyhow!(
"The subgraph `{}` cannot be used as the graft base \
for `{}` because the schemas are incompatible:\n - {}",
&base_layout.catalog.site.namespace,
&layout.catalog.site.namespace,
errors.join("\n - ")
)));
}

pconn
.record_active_copy(base_layout.site.as_ref(), site.as_ref())
.await?;
}
}

Ok((site, deployment_store, node_id))
}
.scope_boxed()
})
.await?
};

// Create the actual databases schema and metadata entries
let deployment_store = self
.stores
.get(&site.shard)
.ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?;

let index_def = if let Some(graft) = &graft_base.clone() {
let index_def = if let Some(graft) = graft_base {
if let Some(site) = self.sites.get(graft) {
let store = self
.stores
Expand All @@ -406,7 +428,6 @@ impl SubgraphStore {
schema,
deployment,
site.clone(),
graft_base_layout,
replace,
OnSync::None,
index_def,
Expand Down Expand Up @@ -731,18 +752,15 @@ impl Inner {

if src.id == dst.id {
return Err(StoreError::Unknown(anyhow!(
"can not copy deployment {} onto itself",
src_loc
"can not copy deployment {src_loc} onto itself"
)));
}
// The very last thing we do when we set up a copy here is assign it
// to a node. Therefore, if `dst` is already assigned, this function
// should not have been called.
if let Some(node) = self.mirror.assigned_node(dst.as_ref()).await? {
return Err(StoreError::Unknown(anyhow!(
"can not copy into deployment {} since it is already assigned to node `{}`",
dst_loc,
node
"can not copy into deployment {dst_loc} since it is already assigned to node `{node}`"
)));
}
let deployment = src_store.load_deployment(src.clone()).await?;
Expand All @@ -758,8 +776,6 @@ impl Inner {
history_blocks_override: None,
};

let graft_base = self.layout(&src.deployment).await?;

self.primary_conn()
.await?
.record_active_copy(src.as_ref(), dst.as_ref())
Expand All @@ -776,7 +792,6 @@ impl Inner {
&src_layout.input_schema,
deployment,
dst.clone(),
Some(graft_base),
false,
on_sync,
Some(index_def),
Expand Down