Skip to content

Commit da24730

Browse files
Chibinmy-ship-it
authored andcommitted
Accommodate for AF_INET6 when doing a motion layer IPC teardown
Previously on commit 70306db18e2, we removed pg_getaddrinfo_all for signal handlers. However, in doing so, the capability of supporting both AF_INET6 and AF_INET was lost; this responsibility must now be handled by us. The commit mentioned above fixed the issue for AF_INET (IPv4), but not for AF_INET6 (IPv6). This commit addresses the situation for both AF_INET and AF_INET6. Reviewed-by: Soumyadeep Chakraborty <[email protected]>
1 parent 1c9b883 commit da24730

1 file changed

Lines changed: 100 additions & 58 deletions

File tree

contrib/interconnect/udp/ic_udpifc.c

Lines changed: 100 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -705,8 +705,8 @@ typedef struct ICStatistics
705705
/* Statistics for UDP interconnect. */
706706
static ICStatistics ic_statistics;
707707

708-
static struct addrinfo udp_dummy_packet_addrinfo;
709-
static struct sockaddr udp_dummy_packet_sockaddr;
708+
/* Cached sockaddr of the listening udp socket */
709+
static struct sockaddr_storage udp_dummy_packet_sockaddr;
710710

711711
/* UDP listen fd */
712712
int UDP_listenerFd;
@@ -735,10 +735,15 @@ static void setRxThreadError(int eno);
735735
static void resetRxThreadError(void);
736736
static void SendDummyPacket(void);
737737

738+
static void ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len);
739+
#if defined(__darwin__)
740+
#define s6_addr32 __u6_addr.__u6_addr32
741+
static void ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest);
742+
#endif
738743
static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort);
739744
static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type);
740745
static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort,
741-
int *txFamily, struct addrinfo *listenerAddrinfo, struct sockaddr *listenerSockaddr);
746+
int *txFamily, struct sockaddr_storage *listenerSockaddr);
742747
static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState *transportStates,
743748
ExecSlice *sendSlice,
744749
int *pOutgoingCount);
@@ -1576,7 +1581,7 @@ resetRxThreadError()
15761581
* Setup udp listening socket.
15771582
*/
15781583
static void
1579-
setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct addrinfo *listenerAddrinfo, struct sockaddr *listenerSockaddr)
1584+
setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct sockaddr_storage *listenerSockaddr)
15801585
{
15811586
struct addrinfo *addrs = NULL;
15821587
struct addrinfo *addr;
@@ -1697,16 +1702,6 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil
16971702
if (!addr || ic_socket == PGINVALID_SOCKET)
16981703
goto startup_failed;
16991704

1700-
/*
1701-
* cache the successful addrinfo and sockaddr of the listening socket, so
1702-
* we can use this information to connect to the listening socket.
1703-
*/
1704-
if (listenerAddrinfo != NULL && listenerSockaddr != NULL )
1705-
{
1706-
memcpy(listenerAddrinfo, addr, sizeof(udp_dummy_packet_addrinfo));
1707-
memcpy(listenerSockaddr, addr->ai_addr, sizeof(udp_dummy_packet_sockaddr));
1708-
}
1709-
17101705
/* Memorize the socket fd, kernel assigned port and address family */
17111706
*listenerSocketFd = ic_socket;
17121707
if (listenerAddr.ss_family == AF_INET6)
@@ -1720,6 +1715,13 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil
17201715
*txFamily = AF_INET;
17211716
}
17221717

1718+
/*
1719+
* cache the successful sockaddr of the listening socket, so
1720+
* we can use this information to connect to the listening socket.
1721+
*/
1722+
if (listenerSockaddr != NULL)
1723+
memcpy(listenerSockaddr, &listenerAddr, sizeof(struct sockaddr_storage));
1724+
17231725
/* Set up socket non-blocking mode */
17241726
if (!pg_set_noblock(ic_socket))
17251727
{
@@ -1852,9 +1854,8 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort)
18521854
/*
18531855
* setup listening socket and sending socket for Interconnect.
18541856
*/
1855-
setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily,
1856-
&udp_dummy_packet_addrinfo, &udp_dummy_packet_sockaddr);
1857-
setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, NULL, NULL);
1857+
setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily, &udp_dummy_packet_sockaddr);
1858+
setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, NULL);
18581859

18591860
/* Initialize receive control data. */
18601861
resetMainThreadWaiting(&rx_control_info.mainWaitingState);
@@ -1962,7 +1963,6 @@ static inline void CleanupMotionUDPIFC(void)
19621963
ICSenderPort = 0;
19631964
ICSenderFamily = 0;
19641965

1965-
memset(&udp_dummy_packet_addrinfo, 0, sizeof(udp_dummy_packet_addrinfo));
19661966
memset(&udp_dummy_packet_sockaddr, 0, sizeof(udp_dummy_packet_sockaddr));
19671967

19681968
#ifdef USE_ASSERT_CHECKING
@@ -3269,30 +3269,8 @@ setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportS
32693269
*/
32703270
if (pEntry->txfd_family == AF_INET6)
32713271
{
3272-
struct sockaddr_storage temp;
3273-
const struct sockaddr_in *in = (const struct sockaddr_in *) &conn->peer;
3274-
struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp;
3275-
3276-
memset(&temp, 0, sizeof(temp));
3277-
32783272
elog(DEBUG1, "We are inet6, remote is inet. Converting to v4 mapped address.");
3279-
3280-
/* Construct a V4-to-6 mapped address. */
3281-
temp.ss_family = AF_INET6;
3282-
in6_new->sin6_family = AF_INET6;
3283-
in6_new->sin6_port = in->sin_port;
3284-
in6_new->sin6_flowinfo = 0;
3285-
3286-
memset(&in6_new->sin6_addr, '\0', sizeof(in6_new->sin6_addr));
3287-
/* in6_new->sin6_addr.s6_addr16[5] = 0xffff; */
3288-
((uint16 *) &in6_new->sin6_addr)[5] = 0xffff;
3289-
/* in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr; */
3290-
memcpy(((char *) &in6_new->sin6_addr) + 12, &(in->sin_addr), 4);
3291-
in6_new->sin6_scope_id = 0;
3292-
3293-
/* copy it back */
3294-
memcpy(&conn->peer, &temp, sizeof(struct sockaddr_in6));
3295-
conn->peer_len = sizeof(struct sockaddr_in6);
3273+
ConvertToIPv4MappedAddr(&conn->peer, &conn->peer_len);
32963274
}
32973275
else
32983276
{
@@ -7989,28 +7967,94 @@ WaitInterconnectQuitUDPIFC(void)
79897967
ic_control_info.threadCreated = false;
79907968
}
79917969

7970+
/*
7971+
* If the socket was created AF_INET6, but the address we want to
7972+
* send to is IPv4 (AF_INET), we need to change the address
7973+
* format. On Linux, this is not necessary: glibc automatically
7974+
* handles this. But on MAC OSX and Solaris, we need to convert
7975+
* the IPv4 address to IPv4-mapped IPv6 address in AF_INET6 format.
7976+
*
7977+
* The comment above relies on getaddrinfo() via function getSockAddr to get
7978+
* the correct V4-mapped address. We need to be careful here as we need to
7979+
* ensure that the platform we are using is POSIX 1003-2001 compliant.
7980+
* Just to be on the safeside, we'll be keeping this function for
7981+
* now to be used for all platforms and not rely on POSIX.
7982+
*
7983+
* Since this can be called in a signal handler, we avoid the use of
7984+
* async-signal unsafe functions such as memset/memcpy
7985+
*/
7986+
static void
7987+
ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len)
7988+
{
7989+
const struct sockaddr_in *in = (const struct sockaddr_in *) sockaddr;
7990+
struct sockaddr_storage temp = {0};
7991+
struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp;
7992+
7993+
/* Construct a IPv4-to-IPv6 mapped address. */
7994+
temp.ss_family = AF_INET6;
7995+
in6_new->sin6_family = AF_INET6;
7996+
in6_new->sin6_port = in->sin_port;
7997+
in6_new->sin6_flowinfo = 0;
7998+
7999+
((uint16 *) &in6_new->sin6_addr)[5] = 0xffff;
8000+
8001+
in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr;
8002+
in6_new->sin6_scope_id = 0;
8003+
8004+
/* copy it back */
8005+
*sockaddr = temp;
8006+
*o_len = sizeof(struct sockaddr_in6);
8007+
}
8008+
8009+
#if defined(__darwin__)
8010+
/* macos does not accept :: as the destination, we will need to covert this to the IPv6 loopback */
8011+
static void
8012+
ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest)
8013+
{
8014+
char address[INET6_ADDRSTRLEN];
8015+
/* we want to terminate our own process, so this should be local */
8016+
const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &udp_dummy_packet_sockaddr;
8017+
inet_ntop(AF_INET6, &in6->sin6_addr, address, sizeof(address));
8018+
if (strcmp("::", address) == 0)
8019+
((struct sockaddr_in6 *)dest)->sin6_addr = in6addr_loopback;
8020+
}
8021+
#endif
8022+
79928023
/*
79938024
* Send a dummy packet to interconnect thread to exit poll() immediately
79948025
*/
79958026
static void
79968027
SendDummyPacket(void)
79978028
{
79988029
int ret;
7999-
in_port_t udp_listener_port;
80008030
char *dummy_pkt = "stop it";
80018031
int counter;
8002-
struct sockaddr_in *addr_in = NULL;
8003-
struct sockaddr_in dest_addr;
8004-
/*
8005-
* Get address info from interconnect udp listener port
8006-
*/
8007-
udp_listener_port = (Gp_listener_port >> 16) & 0x0ffff;
8032+
struct sockaddr_storage dest;
8033+
socklen_t dest_len;
80088034

8009-
addr_in = (struct sockaddr_in *) &udp_dummy_packet_sockaddr;
8010-
memset(&dest_addr, 0, sizeof(dest_addr));
8011-
dest_addr.sin_family = addr_in->sin_family;
8012-
dest_addr.sin_port = htons(udp_listener_port);
8013-
dest_addr.sin_addr.s_addr = addr_in->sin_addr.s_addr;
8035+
Assert(udp_dummy_packet_sockaddr.ss_family == AF_INET || udp_dummy_packet_sockaddr.ss_family == AF_INET6);
8036+
Assert(ICSenderFamily == AF_INET || ICSenderFamily == AF_INET6);
8037+
8038+
dest = udp_dummy_packet_sockaddr;
8039+
dest_len = (ICSenderFamily == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
8040+
8041+
if (ICSenderFamily == AF_INET6)
8042+
{
8043+
#if defined(__darwin__)
8044+
if (udp_dummy_packet_sockaddr.ss_family == AF_INET6)
8045+
ConvertIPv6WildcardToLoopback(&dest);
8046+
#endif
8047+
if (udp_dummy_packet_sockaddr.ss_family == AF_INET)
8048+
ConvertToIPv4MappedAddr(&dest, &dest_len);
8049+
}
8050+
8051+
if (ICSenderFamily == AF_INET && udp_dummy_packet_sockaddr.ss_family == AF_INET6)
8052+
{
8053+
/* the size of AF_INET6 is bigger than the side of IPv4, so
8054+
* converting from IPv6 to IPv4 may potentially not work. */
8055+
ereport(LOG, errmsg("sending dummy packet failed: cannot send from AF_INET to receiving on AF_INET6"));
8056+
return;
8057+
}
80148058

80158059
/*
80168060
* Send a dummy package to the interconnect listener, try 10 times.
@@ -8021,24 +8065,22 @@ SendDummyPacket(void)
80218065
while (counter < 10)
80228066
{
80238067
counter++;
8024-
ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, (struct sockaddr *) &dest_addr, sizeof(dest_addr));
8068+
ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, (struct sockaddr *) &dest, dest_len);
80258069
if (ret < 0)
80268070
{
80278071
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
80288072
continue;
80298073
else
80308074
{
8031-
elog(LOG, "send dummy packet failed, sendto failed: %m");
8075+
ereport(LOG, errmsg("send dummy packet failed, sendto failed: %m"));
80328076
return;
80338077
}
80348078
}
80358079
break;
80368080
}
80378081

80388082
if (counter >= 10)
8039-
{
8040-
elog(LOG, "send dummy packet failed, sendto failed with 10 times: %m");
8041-
}
8083+
ereport(LOG, errmsg("send dummy packet failed, sendto failed with 10 times: %m"));
80428084
}
80438085

80448086
void logChunkParseDetails(MotionConn *conn, uint32 ic_instance_id)

0 commit comments

Comments
 (0)